use std::fmt;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use crate::commit::{MemoryMutation, OpId, TenantId};
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
#[serde(transparent)]
pub struct YantrikNodeId(pub u64);
impl YantrikNodeId {
pub const fn new(id: u64) -> Self {
Self(id)
}
pub const fn raw(&self) -> u64 {
self.0
}
}
impl fmt::Display for YantrikNodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "node-{}", self.0)
}
}
impl From<u64> for YantrikNodeId {
fn from(id: u64) -> Self {
Self(id)
}
}
impl From<YantrikNodeId> for u64 {
fn from(id: YantrikNodeId) -> Self {
id.0
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct YantrikNode {
pub addr: String,
}
impl YantrikNode {
pub fn new(addr: impl Into<String>) -> Self {
Self { addr: addr.into() }
}
}
impl fmt::Display for YantrikNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.addr)
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YantrikLogEntry {
pub tenant_id: TenantId,
pub op_id: OpId,
pub mutation: MemoryMutation,
}
impl YantrikLogEntry {
pub fn new(tenant_id: TenantId, op_id: OpId, mutation: MemoryMutation) -> Self {
Self {
tenant_id,
op_id,
mutation,
}
}
pub fn into_parts(self) -> (TenantId, OpId, MemoryMutation) {
(self.tenant_id, self.op_id, self.mutation)
}
pub fn mutation(&self) -> &MemoryMutation {
&self.mutation
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct YantrikRaftResponse {
pub term: u64,
pub tenant_log_index: u64,
pub applied_at_unix_micros: i64,
}
impl YantrikRaftResponse {
pub fn new(term: u64, tenant_log_index: u64, applied_at: SystemTime) -> Self {
let micros = applied_at
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.unwrap_or(0);
Self {
term,
tenant_log_index,
applied_at_unix_micros: micros,
}
}
}
openraft::declare_raft_types!(
pub YantrikRaftTypeConfig:
D = YantrikLogEntry,
R = YantrikRaftResponse,
NodeId = YantrikNodeId,
Node = YantrikNode,
Entry = openraft::Entry<YantrikRaftTypeConfig>,
SnapshotData = std::io::Cursor<Vec<u8>>,
AsyncRuntime = openraft::TokioRuntime,
);
#[cfg(test)]
mod tests {
use super::*;
fn upsert(rid: &str) -> MemoryMutation {
MemoryMutation::UpsertMemory {
rid: rid.into(),
text: "x".into(),
memory_type: "semantic".into(),
importance: 0.5,
valence: 0.0,
half_life: 168.0,
namespace: "default".into(),
certainty: 1.0,
domain: "general".into(),
source: "user".into(),
emotional_state: None,
embedding: None,
metadata: serde_json::json!({}),
}
}
#[test]
fn node_id_serde_is_transparent() {
let id = YantrikNodeId::new(7);
let json = serde_json::to_string(&id).unwrap();
assert_eq!(json, "7");
let back: YantrikNodeId = serde_json::from_str(&json).unwrap();
assert_eq!(id, back);
}
#[test]
fn node_id_display_includes_node_prefix() {
assert_eq!(YantrikNodeId::new(7).to_string(), "node-7");
}
#[test]
fn node_id_round_trips_through_u64_conversions() {
let id = YantrikNodeId::from(42_u64);
assert_eq!(id.raw(), 42);
assert_eq!(u64::from(id), 42);
}
#[test]
fn node_serde_round_trip() {
let n = YantrikNode::new("https://10.0.0.5:7100");
let json = serde_json::to_string(&n).unwrap();
let back: YantrikNode = serde_json::from_str(&json).unwrap();
assert_eq!(n, back);
assert_eq!(n.to_string(), "https://10.0.0.5:7100");
}
#[test]
fn log_entry_carries_all_three_required_fields() {
let tenant = TenantId::new(42);
let op_id = OpId::new_random();
let m = upsert("mem_a");
let e = YantrikLogEntry::new(tenant, op_id, m.clone());
assert_eq!(e.tenant_id, tenant);
assert_eq!(e.op_id, op_id);
assert_eq!(e.mutation(), &m);
let (t, o, mm) = e.into_parts();
assert_eq!(t, tenant);
assert_eq!(o, op_id);
assert_eq!(mm, m);
}
#[test]
fn log_entry_serde_round_trip_is_lossless() {
let entry = YantrikLogEntry::new(TenantId::new(1), OpId::new_random(), upsert("mem_a"));
let json = serde_json::to_string(&entry).unwrap();
let back: YantrikLogEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry, back);
}
#[test]
fn log_entry_with_tombstone_round_trips() {
let entry = YantrikLogEntry::new(
TenantId::new(1),
OpId::new_random(),
MemoryMutation::TombstoneMemory {
rid: "mem_a".into(),
reason: Some("test".into()),
requested_at_unix_micros: 1_700_000_000_000_000,
},
);
let json = serde_json::to_string(&entry).unwrap();
let back: YantrikLogEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry, back);
}
#[test]
fn raft_response_micros_round_trip() {
let when = std::time::UNIX_EPOCH + std::time::Duration::from_micros(1_700_000_000_000_000);
let r = YantrikRaftResponse::new(7, 42, when);
assert_eq!(r.term, 7);
assert_eq!(r.tenant_log_index, 42);
assert_eq!(r.applied_at_unix_micros, 1_700_000_000_000_000);
}
#[test]
fn raft_response_serde_round_trip() {
let r = YantrikRaftResponse {
term: 3,
tenant_log_index: 99,
applied_at_unix_micros: 1_700_000_000_000_000,
};
let json = serde_json::to_string(&r).unwrap();
let back: YantrikRaftResponse = serde_json::from_str(&json).unwrap();
assert_eq!(r, back);
}
#[test]
fn type_config_associated_types_align() {
fn _assert_assoc<C: openraft::RaftTypeConfig>() {}
_assert_assoc::<YantrikRaftTypeConfig>();
let _: <YantrikRaftTypeConfig as openraft::RaftTypeConfig>::D =
YantrikLogEntry::new(TenantId::new(1), OpId::new_random(), upsert("mem_a"));
let _: <YantrikRaftTypeConfig as openraft::RaftTypeConfig>::R = YantrikRaftResponse {
term: 0,
tenant_log_index: 0,
applied_at_unix_micros: 0,
};
let _: <YantrikRaftTypeConfig as openraft::RaftTypeConfig>::NodeId = YantrikNodeId::new(0);
let _: <YantrikRaftTypeConfig as openraft::RaftTypeConfig>::Node =
YantrikNode::new("http://localhost");
}
}