use std::collections::HashMap;
use std::path::Path;
use std::sync::RwLock;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use kuzu::{Connection, Database, SystemConfig, Value};
use uuid::Uuid;
use crate::schema::{
Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
SyncOp, SyncState,
};
use crate::store::Store;
/// Cache of outgoing relations, keyed by `(source node id, relation-type filter)`;
/// a `None` filter caches the "all relation types" result for that node.
type RelationsCache = HashMap<(Uuid, Option<RelationType>), Vec<Relation>>;
/// Result of applying a replicated sync operation to the local store.
///
/// Derives the common value traits so callers can log (`{:?}`), compare and copy
/// outcomes without matching on them by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyOutcome {
    /// A new node was written.
    Created,
    /// An existing node was modified.
    Updated,
    /// A node was removed.
    Deleted,
    /// Nothing was changed (e.g. the node already existed or was tombstoned).
    Skipped,
}
/// Graph-backed memory store built on an embedded KùzuDB database.
pub struct KuzuStore {
    /// Underlying database; connections are created per operation via `conn()`.
    db: Database,
    /// Identifier of this machine, written as the origin of locally created sync-log entries.
    machine_id: String,
    /// Read-through cache for `get_relations`, invalidated on relation/memory writes.
    relations_cache: RwLock<RelationsCache>,
}
/// Builds the comma-separated `RETURN` column list for a `Memory` node bound to
/// `alias`, in the fixed order `row_to_memory` consumes (12 columns, no embedding).
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 12] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
        "updated_at",
    ];
    let mut out = String::new();
    for (index, column) in COLUMNS.iter().enumerate() {
        if index > 0 {
            out.push_str(", ");
        }
        out.push_str(alias);
        out.push('.');
        out.push_str(column);
    }
    out
}
/// Maps the database's empty-string encoding of "no project" back to `None`.
fn project_path_from_db(s: &str) -> Option<String> {
    (!s.is_empty()).then(|| s.to_owned())
}
impl KuzuStore {
pub fn open(path: &Path, machine_id: String) -> Result<Self> {
let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
let store = Self {
db,
machine_id,
relations_cache: RwLock::new(HashMap::new()),
};
store.init_schema()?;
Ok(store)
}
pub fn in_memory(machine_id: String) -> Result<Self> {
let db =
Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
let store = Self {
db,
machine_id,
relations_cache: RwLock::new(HashMap::new()),
};
store.init_schema()?;
Ok(store)
}
fn evict_relations_for(&self, id: Uuid) {
let mut cache = self.relations_cache.write().unwrap();
cache.retain(|(key_id, _), _| *key_id != id);
}
fn clear_relations_cache(&self) {
self.relations_cache.write().unwrap().clear();
}
pub fn machine_id(&self) -> &str {
&self.machine_id
}
fn conn(&self) -> Result<Connection<'_>> {
Connection::new(&self.db).context("creating connection")
}
pub fn diagnostic(&self) -> Result<String> {
let conn = self.conn()?;
let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
let total: i64 = result
.next()
.map(|r| match &r[0] {
Value::Int64(v) => *v,
_ => -1,
})
.unwrap_or(-1);
let mut result = conn.query(
"MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
)?;
let with_emb: i64 = result
.next()
.map(|r| match &r[0] {
Value::Int64(v) => *v,
_ => -1,
})
.unwrap_or(-1);
let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
let zeros_str = format!("[{}]", zeros.join(","));
let idx_query = format!(
"CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
zeros_str
);
let idx_status = match conn.query(&idx_query) {
Ok(mut r) => match r.next() {
Some(row) => format!("ok (distance: {:?})", &row[0]),
None => "ok (0 results)".to_string(),
},
Err(e) => format!("error: {e}"),
};
Ok(format!(
"total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
))
}
fn init_schema(&self) -> Result<()> {
let conn = self.conn()?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS Memory(
id STRING PRIMARY KEY,
content STRING,
embedding FLOAT[384],
memory_type STRING,
confidence FLOAT,
created_at STRING,
last_accessed STRING,
access_count INT64,
source STRING,
source_id STRING,
project_path STRING
);",
)
.context("creating Memory table")?;
conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
conn.query(
"CREATE NODE TABLE IF NOT EXISTS Machine(
id STRING PRIMARY KEY,
name STRING
);",
)
.context("creating Machine table")?;
conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
conn.query("ALTER TABLE Memory ADD updated_at STRING DEFAULT '';").ok();
conn.query("MATCH (m:Memory) WHERE m.updated_at = '' SET m.updated_at = m.created_at;").ok();
conn.query(
"CREATE NODE TABLE IF NOT EXISTS Entity(
id STRING PRIMARY KEY,
name STRING,
entity_type STRING,
embedding FLOAT[384],
aliases STRING[]
);",
)
.context("creating Entity table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS Conversation(
id STRING PRIMARY KEY,
source STRING,
machine_id STRING,
started_at STRING,
project_path STRING
);",
)
.context("creating Conversation table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS IngestLog(
file_path STRING PRIMARY KEY,
file_hash STRING,
ingested_at STRING,
memory_count INT64,
source STRING
);",
)
.context("creating IngestLog table")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS RELATES_TO(
FROM Memory TO Memory,
strength FLOAT,
context STRING
);",
)
.context("creating RELATES_TO rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS MENTIONS(
FROM Memory TO Entity,
position INT64
);",
)
.context("creating MENTIONS rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
FROM Memory TO Conversation,
transformation STRING
);",
)
.context("creating DERIVED_FROM rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
FROM Memory TO Memory,
model STRING
);",
)
.context("creating DISTILLED_FROM rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
FROM Memory TO Memory,
resolution STRING
);",
)
.context("creating CONTRADICTS rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS REINFORCES(
FROM Memory TO Memory
);",
)
.context("creating REINFORCES rel")?;
conn.query(
"CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
FROM Memory TO Memory,
reason STRING
);",
)
.context("creating SUPERSEDES rel")?;
self.migrate_sync_log(&conn).context("migrating SyncLog table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS SyncState(
peer_id STRING PRIMARY KEY,
last_seq INT64,
last_sync_at STRING
);",
)
.context("creating SyncState table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
memory_id STRING PRIMARY KEY,
distilled_id STRING,
consolidated_at STRING,
model STRING
);",
)
.context("creating ConsolidationLog table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
id STRING PRIMARY KEY,
last_sync_seq INT64,
exported_at STRING,
vault_path STRING,
pages_created INT64,
pages_updated INT64,
memories_processed INT64
);",
)
.context("creating WikiExportLog table")?;
conn.query(
"CREATE NODE TABLE IF NOT EXISTS Tombstone(
node_id STRING PRIMARY KEY,
node_type STRING,
deleted_at STRING,
machine_id STRING);",
)
.context("creating Tombstone table")?;
conn.query(
"CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
)
.ok();
Ok(())
}
fn migrate_sync_log(&self, conn: &Connection<'_>) -> Result<()> {
let new_schema_present = conn
.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;")
.is_ok();
if !new_schema_present {
let old_schema_present = conn
.query("MATCH (s:SyncLog) RETURN s.seq LIMIT 1;")
.is_ok();
if old_schema_present {
conn.query("DROP TABLE SyncLog;")?;
conn.query("MATCH (s:SyncState) SET s.last_seq = 0;").ok();
}
}
conn.query(
"CREATE NODE TABLE IF NOT EXISTS SyncLog(
id STRING PRIMARY KEY,
local_seq INT64,
origin_machine_id STRING,
origin_seq INT64,
op STRING,
node_type STRING,
node_id STRING,
timestamp STRING,
data STRING
);",
)?;
Ok(())
}
fn with_transaction<T>(&self, f: impl FnOnce(&Connection<'_>) -> Result<T>) -> Result<T> {
let conn = self.conn()?;
conn.query("BEGIN TRANSACTION;")?;
match f(&conn) {
Ok(v) => {
conn.query("COMMIT;")?;
Ok(v)
}
Err(e) => {
let _ = conn.query("ROLLBACK;");
Err(e)
}
}
}
fn append_sync_log(
&self,
conn: &Connection<'_>,
op: SyncOp,
node_type: SyncNodeType,
node_id: &str,
data: Option<&str>,
origin_machine_id: &str,
origin_seq: Option<i64>,
) -> Result<()> {
let mut r = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
let local_seq: i64 = match r.next() {
Some(row) => match &row[0] {
Value::Int64(n) => n + 1,
_ => 1,
},
None => 1,
};
let origin_seq = origin_seq.unwrap_or(local_seq);
let id = format!("{}:{}", origin_machine_id, origin_seq);
let id_esc = escape_cypher(&id);
let mut dup = conn.query(&format!(
"MATCH (s:SyncLog {{id: '{id_esc}'}}) RETURN s.id LIMIT 1;"
))?;
if dup.next().is_some() {
return Ok(());
}
let timestamp = chrono::Utc::now().to_rfc3339();
let op_str = match op {
SyncOp::Create => "create",
SyncOp::Update => "update",
SyncOp::Delete => "delete",
};
let nt_str = match node_type {
SyncNodeType::Memory => "memory",
SyncNodeType::Entity => "entity",
SyncNodeType::Conversation => "conversation",
SyncNodeType::Relation => "relation",
};
let origin_machine_esc = escape_cypher(origin_machine_id);
let node_id_esc = escape_cypher(node_id);
let data_esc = data.map(escape_cypher).unwrap_or_default();
conn.query(&format!(
"CREATE (:SyncLog {{id:'{id_esc}', local_seq:{local_seq}, \
origin_machine_id:'{origin_machine_esc}', origin_seq:{origin_seq}, \
op:'{op_str}', node_type:'{nt_str}', node_id:'{node_id_esc}', \
timestamp:'{timestamp}', data:'{data_esc}'}});"
))?;
Ok(())
}
pub fn tombstone_exists(&self, id: Uuid) -> Result<bool> {
let conn = self.conn()?;
let id_esc = escape_cypher(&id.to_string());
let mut result = conn.query(&format!(
"MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
))?;
Ok(result.next().is_some())
}
fn upsert_tombstone(
&self,
conn: &Connection<'_>,
node_id: &str,
node_type: &str,
machine_id: &str,
) -> Result<()> {
let node_id_esc = escape_cypher(node_id);
let node_type_esc = escape_cypher(node_type);
let machine_id_esc = escape_cypher(machine_id);
let deleted_at = chrono::Utc::now().to_rfc3339();
conn.query(&format!(
"MERGE (t:Tombstone {{node_id: '{node_id_esc}'}}) \
SET t.node_type = '{node_type_esc}', \
t.deleted_at = '{deleted_at}', \
t.machine_id = '{machine_id_esc}';"
))?;
Ok(())
}
}
impl Store for KuzuStore {
    /// Inserts `memory` as a new `Memory` node and appends a local `Create` sync-log
    /// entry carrying its JSON, both in one transaction.
    ///
    /// If JSON serialisation fails, the entry is still written with an empty payload.
    fn store_memory(&self, memory: &Memory) -> Result<()> {
        let data = serde_json::to_string(memory).ok();
        self.with_transaction(|conn| {
            self.create_memory_node(conn, memory)?;
            self.append_sync_log(
                conn,
                SyncOp::Create,
                SyncNodeType::Memory,
                &memory.id.to_string(),
                data.as_deref(),
                &self.machine_id,
                None,
            )
        })
    }

    /// Loads one memory by id. The embedding column is not selected.
    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
        let conn = self.conn()?;
        let query = format!(
            "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
            id,
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        match result.next() {
            Some(row) => Ok(Some(row_to_memory(&row)?)),
            None => Ok(None),
        }
    }

    /// Detach-deletes the memory, records a tombstone and a local `Delete` sync
    /// entry in one transaction.
    ///
    /// The whole relations cache is cleared afterwards, even on failure, because
    /// edges from *other* nodes to this one may be cached.
    fn delete_memory(&self, id: Uuid) -> Result<()> {
        let id_string = id.to_string();
        let result = self.with_transaction(|conn| {
            conn.query(&format!(
                "MATCH (m:Memory {{id: '{id_string}'}}) DETACH DELETE m;"
            ))?;
            self.upsert_tombstone(conn, &id_string, "memory", &self.machine_id)?;
            self.append_sync_log(
                conn,
                SyncOp::Delete,
                SyncNodeType::Memory,
                &id_string,
                None,
                &self.machine_id,
                None,
            )
        });
        self.clear_relations_cache();
        result
    }

    /// Inserts `entity` and appends a local `Create` sync-log entry in one transaction.
    fn store_entity(&self, entity: &Entity) -> Result<()> {
        let data = serde_json::to_string(entity).ok();
        self.with_transaction(|conn| {
            self.create_entity_node(conn, entity)?;
            self.append_sync_log(
                conn,
                SyncOp::Create,
                SyncNodeType::Entity,
                &entity.id.to_string(),
                data.as_deref(),
                &self.machine_id,
                None,
            )
        })
    }

    /// Loads an entity by id (id, name and type columns only).
    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
        let conn = self.conn()?;
        let mut result = conn.query(&format!(
            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
            id
        ))?;
        match result.next() {
            Some(row) => Ok(Some(row_to_entity(&row)?)),
            None => Ok(None),
        }
    }

    /// Returns the first entity whose name equals `name` exactly, if any.
    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
        let conn = self.conn()?;
        let name_escaped = escape_cypher(name);
        let query = format!(
            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
            name_escaped
        );
        let mut result = conn.query(&query)?;
        match result.next() {
            Some(row) => Ok(Some(row_to_entity(&row)?)),
            None => Ok(None),
        }
    }

    /// Inserts `conversation` and appends a local `Create` sync-log entry in one transaction.
    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
        let data = serde_json::to_string(conversation).ok();
        self.with_transaction(|conn| {
            self.create_conversation_node(conn, conversation)?;
            self.append_sync_log(
                conn,
                SyncOp::Create,
                SyncNodeType::Conversation,
                &conversation.id.to_string(),
                data.as_deref(),
                &self.machine_id,
                None,
            )
        })
    }

    /// Creates the edge for `relation` and appends a `Create` sync entry whose node
    /// id is `"{from}:{to}:{type}"`. The cached relations of `from_id` are evicted
    /// afterwards, even on failure.
    fn store_relation(&self, relation: &Relation) -> Result<()> {
        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
        let data = serde_json::to_string(relation).ok();
        let result = self.with_transaction(|conn| {
            self.create_relation_edge(conn, relation)?;
            self.append_sync_log(
                conn,
                SyncOp::Create,
                SyncNodeType::Relation,
                &node_id,
                data.as_deref(),
                &self.machine_id,
                None,
            )
        });
        self.evict_relations_for(relation.from_id);
        result
    }

    /// Returns outgoing relations from `node_id`, optionally restricted to one type.
    ///
    /// Stored edge properties are read back: `strength`/`context` for `RelatesTo`,
    /// `resolution` (as `context`) for `Contradicts`, and `reason` (as `context`) for
    /// `Supersedes`. An empty stored string becomes `None`. Other types report
    /// `strength = 1.0` and `context = None`.
    ///
    /// Results are cached per `(node_id, relation_type)`. With `None`, each
    /// per-label query is best-effort: a failing label is skipped, not propagated.
    fn get_relations(
        &self,
        node_id: Uuid,
        relation_type: Option<RelationType>,
    ) -> Result<Vec<Relation>> {
        let cache_key = (node_id, relation_type);
        // Clone out of the cache so the read guard is released before querying.
        // The cache holds only derived data, so recovering a poisoned lock is safe.
        let cached = self
            .relations_cache
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .get(&cache_key)
            .cloned();
        if let Some(hit) = cached {
            return Ok(hit);
        }
        let conn = self.conn()?;
        let id = node_id.to_string();
        let relations: Vec<Relation> = match relation_type {
            Some(rt) => conn
                .query(&outgoing_relation_query(&id, rt))?
                .map(|row| row_to_outgoing_relation(&row, node_id, rt))
                .collect(),
            None => {
                let mut all = Vec::new();
                for rt in [
                    RelationType::RelatesTo,
                    RelationType::Reinforces,
                    RelationType::Contradicts,
                    RelationType::Supersedes,
                    RelationType::DistilledFrom,
                    RelationType::Mentions,
                    RelationType::DerivedFrom,
                ] {
                    if let Ok(result) = conn.query(&outgoing_relation_query(&id, rt)) {
                        all.extend(result.map(|row| row_to_outgoing_relation(&row, node_id, rt)));
                    }
                }
                all
            }
        };
        self.relations_cache
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(cache_key, relations.clone());
        Ok(relations)
    }

    /// Queries `memory_emb_idx` for the `limit` nearest memories to `embedding`.
    ///
    /// Each result is paired with `1.0 - distance`. Under the index's cosine metric
    /// (as created in `init_schema`), that value is the cosine similarity.
    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
        let conn = self.conn()?;
        let embedding_str = format_embedding(embedding);
        let query = format!(
            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
RETURN {}, distance;",
            embedding_str,
            limit,
            memory_return_cols("node")
        );
        let mut result = conn.query(&query)?;
        let mut results = Vec::new();
        for row in &mut result {
            // The first 12 columns come from `memory_return_cols`; the 13th is the distance.
            let memory = row_to_memory(&row[..12])?;
            let distance = value_to_f32(&row[12]);
            let similarity = 1.0 - distance;
            results.push((memory, similarity));
        }
        Ok(results)
    }

    /// Returns distinct memories reachable from `start_id` over 1..=`depth` outgoing
    /// `RELATES_TO`/`REINFORCES` edges. The per-memory relation lists are left empty.
    ///
    /// A `depth` of 0 reaches nothing and returns an empty list; the pattern
    /// `*1..0` would otherwise be an invalid query.
    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
        if depth == 0 {
            return Ok(Vec::new());
        }
        let conn = self.conn()?;
        let query = format!(
            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
RETURN DISTINCT {};",
            start_id,
            depth,
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        let mut results = Vec::new();
        for row in &mut result {
            let memory = row_to_memory(&row)?;
            results.push((memory, Vec::new()));
        }
        Ok(results)
    }

    /// Returns all memories whose `source` equals `source` exactly.
    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let source_escaped = escape_cypher(source);
        let query = format!(
            "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
            source_escaped,
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Returns all memories of `memory_type`, matched by its lowercased `Debug` name.
    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let type_str = format!("{:?}", memory_type).to_lowercase();
        let query = format!(
            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
            type_str,
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Returns memories not accessed for `threshold_days` days whose confidence is
    /// still above 0.05.
    ///
    /// Timestamps are compared as RFC 3339 strings. This assumes they were all written
    /// via `to_rfc3339` in UTC, as this file does.
    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
        let cutoff_str = cutoff.to_rfc3339();
        let query = format!(
            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
            cutoff_str,
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Updates the mutable metadata columns (confidence, access stats, project path,
    /// `updated_at`, `machine_id`) and appends a local `Update` sync entry, in one
    /// transaction. Content, type and embedding are not rewritten.
    fn update_memory(&self, memory: &Memory) -> Result<()> {
        let id = memory.id.to_string();
        let last_accessed = memory.last_accessed.to_rfc3339();
        let updated_at = memory.updated_at.to_rfc3339();
        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
        let machine_id_escaped = escape_cypher(&memory.machine_id);
        let data = serde_json::to_string(memory).ok();
        self.with_transaction(|conn| {
            let query = format!(
                "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}', m.updated_at = '{}', m.machine_id = '{}';",
                id,
                memory.confidence,
                last_accessed,
                memory.access_count,
                project_path_escaped,
                updated_at,
                machine_id_escaped
            );
            conn.query(&query)?;
            self.append_sync_log(
                conn,
                SyncOp::Update,
                SyncNodeType::Memory,
                &id,
                data.as_deref(),
                &self.machine_id,
                None,
            )
        })
    }

    /// Persists access statistics (`last_accessed`, `access_count`, `confidence`).
    /// Unlike `update_memory`, no sync-log entry is written and no transaction is used.
    fn record_access(&self, memory: &Memory) -> Result<()> {
        let id_esc = escape_cypher(&memory.id.to_string());
        let last_accessed = memory.last_accessed.to_rfc3339();
        let last_accessed_esc = escape_cypher(&last_accessed);
        let conn = self.conn()?;
        conn.query(&format!(
            "MATCH (m:Memory {{id: '{id_esc}'}}) SET \
             m.last_accessed = '{last_accessed_esc}', \
             m.access_count = {}, \
             m.confidence = {};",
            memory.access_count, memory.confidence
        ))?;
        Ok(())
    }

    /// Case-insensitive substring search over memory content, capped at `limit` rows.
    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let query_escaped = escape_cypher(&query.to_lowercase());
        let cypher = format!(
            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
            query_escaped,
            memory_return_cols("m"),
            limit
        );
        let mut result = conn.query(&cypher)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Total number of `Memory` nodes (0 if the count cannot be read as an integer).
    fn memory_count(&self) -> Result<usize> {
        let conn = self.conn()?;
        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
        match result.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => Ok(*n as usize),
                _ => Ok(0),
            },
            None => Ok(0),
        }
    }

    /// Ids of all memories. Unparseable ids are reported as the nil UUID.
    fn all_memory_ids(&self) -> Result<Vec<Uuid>> {
        let conn = self.conn()?;
        let mut result = conn.query("MATCH (m:Memory) RETURN m.id;")?;
        let mut ids = Vec::new();
        for row in &mut result {
            let id_str = value_to_string(&row[0]);
            ids.push(Uuid::parse_str(&id_str).unwrap_or_default());
        }
        Ok(ids)
    }

    /// Every edge in the graph as a `Relation`, with `strength = 1.0` and no context.
    /// Unparseable ids become the nil UUID.
    fn all_relations(&self) -> Result<Vec<Relation>> {
        let conn = self.conn()?;
        let mut relations = Vec::new();
        let mem_to_mem = [
            ("RELATES_TO", RelationType::RelatesTo),
            ("REINFORCES", RelationType::Reinforces),
            ("CONTRADICTS", RelationType::Contradicts),
            ("SUPERSEDES", RelationType::Supersedes),
            ("DISTILLED_FROM", RelationType::DistilledFrom),
        ];
        for (label, rt) in &mem_to_mem {
            let q = format!(
                "MATCH (a:Memory)-[:{}]->(b:Memory) RETURN a.id, b.id;",
                label
            );
            let mut result = conn.query(&q)?;
            for row in &mut result {
                let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
                let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
                relations.push(Relation {
                    from_id,
                    to_id,
                    relation_type: *rt,
                    strength: 1.0,
                    context: None,
                });
            }
        }
        for (label, target, rt) in [
            ("MENTIONS", "Entity", RelationType::Mentions),
            ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
        ] {
            let q = format!(
                "MATCH (a:Memory)-[:{}]->(b:{}) RETURN a.id, b.id;",
                label, target
            );
            let mut result = conn.query(&q)?;
            for row in &mut result {
                let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
                let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
                relations.push(Relation {
                    from_id,
                    to_id,
                    relation_type: rt,
                    strength: 1.0,
                    context: None,
                });
            }
        }
        Ok(relations)
    }
}

/// Builds the Cypher query listing outgoing `relation_type` edges from the memory
/// with id `id`. Column 0 is always `b.id`; property columns follow for the types
/// whose edge properties are surfaced by `row_to_outgoing_relation`.
fn outgoing_relation_query(id: &str, relation_type: RelationType) -> String {
    let (label, target, extra_cols) = match relation_type {
        RelationType::RelatesTo => ("RELATES_TO", "Memory", ", r.strength, r.context"),
        RelationType::Contradicts => ("CONTRADICTS", "Memory", ", r.resolution"),
        RelationType::Reinforces => ("REINFORCES", "Memory", ""),
        RelationType::Supersedes => ("SUPERSEDES", "Memory", ", r.reason"),
        RelationType::Mentions => ("MENTIONS", "Entity", ""),
        RelationType::DerivedFrom => ("DERIVED_FROM", "Conversation", ""),
        RelationType::DistilledFrom => ("DISTILLED_FROM", "Memory", ""),
    };
    format!("MATCH (a:Memory {{id: '{id}'}})-[r:{label}]->(b:{target}) RETURN b.id{extra_cols};")
}

/// Converts one row produced by `outgoing_relation_query` into a `Relation`.
/// Missing or empty text properties become `None`; a missing strength becomes 1.0.
fn row_to_outgoing_relation(row: &[Value], from_id: Uuid, relation_type: RelationType) -> Relation {
    let to_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
    let text_at = |index: usize| {
        row.get(index)
            .map(value_to_string)
            .filter(|s| !s.is_empty())
    };
    let (strength, context) = match relation_type {
        RelationType::RelatesTo => (row.get(1).map(value_to_f32), text_at(2)),
        RelationType::Contradicts | RelationType::Supersedes => (None, text_at(1)),
        RelationType::Reinforces
        | RelationType::Mentions
        | RelationType::DerivedFrom
        | RelationType::DistilledFrom => (None, None),
    };
    Relation {
        from_id,
        to_id,
        relation_type,
        // `Into::into` keeps this independent of the field's float width.
        strength: strength.map_or(1.0, Into::into),
        context,
    }
}
impl KuzuStore {
    /// Returns the entity named `name`, creating and storing a new one of
    /// `entity_type` if none exists.
    ///
    /// The lookup and the insert run as separate operations, so concurrent callers
    /// may both create an entity with the same name.
    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
        if let Some(existing) = self.find_entity_by_name(name)? {
            return Ok(existing);
        }
        let entity = Entity::new(name.to_string(), entity_type.to_string());
        self.store_entity(&entity)?;
        Ok(entity)
    }

    /// Returns whether any memory's content starts with `content_prefix`.
    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
        let conn = self.conn()?;
        let escaped = escape_cypher(content_prefix);
        let query = format!(
            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
            escaped
        );
        let mut result = conn.query(&query)?;
        Ok(result.next().is_some())
    }

    /// Up to `limit` memories with no `ConsolidationLog` row, excluding memories
    /// whose source is `'consolidation'`.
    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let query = format!(
            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
            memory_return_cols("m"),
            limit
        );
        let mut result = conn.query(&query)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Memories with `start <= created_at < end`, oldest first.
    /// Bounds are compared as RFC 3339 strings.
    pub fn memories_created_between(
        &self,
        start: &DateTime<Utc>,
        end: &DateTime<Utc>,
    ) -> Result<Vec<Memory>> {
        let conn = self.conn()?;
        let query = format!(
            "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
            start.to_rfc3339(),
            end.to_rfc3339(),
            memory_return_cols("m")
        );
        let mut result = conn.query(&query)?;
        let mut memories = Vec::new();
        for row in &mut result {
            memories.push(row_to_memory(&row)?);
        }
        Ok(memories)
    }

    /// Records that `raw_id` was consolidated into `distilled_id` by `model`.
    ///
    /// `ConsolidationLog.memory_id` is the primary key, so calling this twice for the
    /// same `raw_id` fails rather than overwriting.
    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
        let conn = self.conn()?;
        let now = Utc::now().to_rfc3339();
        let query = format!(
            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
            raw_id,
            distilled_id,
            now,
            escape_cypher(model)
        );
        conn.query(&query)?;
        Ok(())
    }

    /// Number of `ConsolidationLog` rows.
    pub fn consolidation_count(&self) -> Result<usize> {
        let conn = self.conn()?;
        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
        match result.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => Ok(*n as usize),
                _ => Ok(0),
            },
            None => Ok(0),
        }
    }

    /// Deletes raw memories that have been consolidated into a different memory.
    /// Each deletion is its own transaction, with a tombstone and a local `Delete`
    /// sync entry.
    ///
    /// Only raw memories that still exist are selected. Without that filter,
    /// re-running would re-tombstone already-deleted ids, append duplicate `Delete`
    /// sync entries and count them again. The relations cache is cleared whenever
    /// anything was attempted, including when a later deletion fails.
    ///
    /// Returns the number of memories deleted.
    ///
    /// # Errors
    /// Returns the first failing deletion's error; earlier deletions stay committed.
    pub fn delete_consolidated_raw(&self) -> Result<usize> {
        let ids: Vec<Uuid> = {
            let conn = self.conn()?;
            let result = conn.query(
                "MATCH (c:ConsolidationLog), (m:Memory) WHERE c.memory_id = m.id AND c.memory_id <> c.distilled_id RETURN c.memory_id;",
            )?;
            result
                .filter_map(|row| match &row[0] {
                    Value::String(id) => Uuid::parse_str(id).ok(),
                    _ => None,
                })
                .collect()
        };
        if ids.is_empty() {
            return Ok(0);
        }
        let outcome = ids.iter().try_for_each(|id| {
            let id_str = id.to_string();
            self.with_transaction(|conn| {
                conn.query(&format!(
                    "MATCH (m:Memory {{id: '{id_str}'}}) DETACH DELETE m;"
                ))?;
                self.upsert_tombstone(conn, &id_str, "memory", &self.machine_id)?;
                self.append_sync_log(
                    conn,
                    SyncOp::Delete,
                    SyncNodeType::Memory,
                    &id_str,
                    None,
                    &self.machine_id,
                    None,
                )
            })
        });
        // Some deletions may have committed even if a later one failed.
        self.clear_relations_cache();
        outcome?;
        Ok(ids.len())
    }

    /// Drops (ignoring errors, e.g. if absent) and recreates the cosine
    /// `memory_emb_idx` vector index on `Memory.embedding`.
    pub fn rebuild_vector_index(&self) -> Result<()> {
        let conn = self.conn()?;
        conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
            .ok();
        conn.query(
            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
        )?;
        Ok(())
    }

    /// Deletes every `IngestLog` row and returns how many were counted beforehand.
    pub fn clear_ingest_log(&self) -> Result<usize> {
        let conn = self.conn()?;
        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
        let count = match result.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => *n as usize,
                _ => 0,
            },
            None => 0,
        };
        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
        Ok(count)
    }

    /// Detach-deletes every memory whose `source` equals `source` and returns the
    /// count observed beforehand.
    ///
    /// NOTE(review): unlike `delete_memory`, this writes no tombstones or sync-log
    /// entries, so the deletions do not propagate to peers. This looks intentional
    /// (local re-ingest) but should be confirmed.
    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
        let conn = self.conn()?;
        let escaped = escape_cypher(source);
        let mut result = conn.query(&format!(
            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
            escaped
        ))?;
        let count = match result.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => *n as usize,
                _ => 0,
            },
            None => 0,
        };
        conn.query(&format!(
            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
            escaped
        ))?;
        if count > 0 {
            self.clear_relations_cache();
        }
        Ok(count)
    }

    /// Returns whether `file_path` has an `IngestLog` entry.
    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
        let conn = self.conn()?;
        let escaped = escape_cypher(file_path);
        let mut result = conn.query(&format!(
            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
            escaped
        ))?;
        Ok(result.next().is_some())
    }

    /// Returns `true` if `file_path` was never ingested or its stored hash differs
    /// from `file_hash`.
    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
        let conn = self.conn()?;
        let escaped = escape_cypher(file_path);
        let mut result = conn.query(&format!(
            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
            escaped
        ))?;
        match result.next() {
            Some(row) => {
                let stored_hash = value_to_string(&row[0]);
                Ok(stored_hash != file_hash)
            }
            None => Ok(true),
        }
    }

    /// Upserts the `IngestLog` entry for `file_path` with its hash, the current
    /// time, the number of memories produced and the source label.
    pub fn mark_ingested(
        &self,
        file_path: &str,
        file_hash: &str,
        memory_count: usize,
        source: &str,
    ) -> Result<()> {
        let conn = self.conn()?;
        let path_escaped = escape_cypher(file_path);
        let hash_escaped = escape_cypher(file_hash);
        let source_escaped = escape_cypher(source);
        let now = chrono::Utc::now().to_rfc3339();
        conn.query(&format!(
            "MERGE (l:IngestLog {{file_path: '{}'}})
SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
        ))?;
        Ok(())
    }

    /// Number of `IngestLog` rows.
    pub fn ingested_file_count(&self) -> Result<usize> {
        let conn = self.conn()?;
        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
        match result.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => Ok(*n as usize),
                _ => Ok(0),
            },
            None => Ok(0),
        }
    }
}
impl KuzuStore {
    /// Issues the `CREATE (:Memory ...)` statement for `memory` on `conn`.
    /// Text fields are escaped; `memory_type` is its lowercased `Debug` name and
    /// a missing `project_path` is stored as `''`.
    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
        let id = memory.id.to_string();
        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
        let created_at = memory.created_at.to_rfc3339();
        let last_accessed = memory.last_accessed.to_rfc3339();
        let embedding_str = format_embedding(&memory.embedding);
        let content_escaped = escape_cypher(&memory.content);
        let source_escaped = escape_cypher(&memory.source);
        let source_id_escaped = escape_cypher(&memory.source_id);
        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
        let machine_id_escaped = escape_cypher(&memory.machine_id);
        let updated_at = memory.updated_at.to_rfc3339();
        let query = format!(
            "CREATE (:Memory {{
id: '{id}',
content: '{content_escaped}',
embedding: {embedding_str},
memory_type: '{memory_type}',
confidence: {confidence},
created_at: '{created_at}',
last_accessed: '{last_accessed}',
access_count: {access_count},
source: '{source_escaped}',
source_id: '{source_id_escaped}',
project_path: '{project_path_escaped}',
machine_id: '{machine_id_escaped}',
updated_at: '{updated_at}'
}});",
            confidence = memory.confidence,
            access_count = memory.access_count,
        );
        conn.query(&query)?;
        Ok(())
    }

    /// Issues the `CREATE (:Entity ...)` statement for `entity` on `conn`.
    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
        let id = entity.id.to_string();
        let embedding_str = format_embedding(&entity.embedding);
        let aliases_str = format_string_array(&entity.aliases);
        let name_escaped = escape_cypher(&entity.name);
        let etype_escaped = escape_cypher(&entity.entity_type);
        let query = format!(
            "CREATE (:Entity {{
id: '{id}',
name: '{name_escaped}',
entity_type: '{etype_escaped}',
embedding: {embedding_str},
aliases: {aliases_str}
}});"
        );
        conn.query(&query)?;
        Ok(())
    }

    /// Issues the `CREATE (:Conversation ...)` statement for `conversation` on `conn`;
    /// a missing `project_path` is stored as `''`.
    fn create_conversation_node(
        &self,
        conn: &Connection<'_>,
        conversation: &Conversation,
    ) -> Result<()> {
        let id = conversation.id.to_string();
        let started_at = conversation.started_at.to_rfc3339();
        let source_escaped = escape_cypher(&conversation.source);
        let machine_escaped = escape_cypher(&conversation.machine_id);
        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
        let query = format!(
            "CREATE (:Conversation {{
id: '{id}',
source: '{source_escaped}',
machine_id: '{machine_escaped}',
started_at: '{started_at}',
project_path: '{project_escaped}'
}});"
        );
        conn.query(&query)?;
        Ok(())
    }

    /// Creates the edge described by `relation` between existing nodes on `conn`.
    ///
    /// `relation.context` is stored in the type-specific text property:
    /// `context`, `transformation` (default `"direct"`), `resolution`, `model` or
    /// `reason`. It is escaped, because it may contain quotes or other characters
    /// that would otherwise break out of the Cypher string literal. If either
    /// endpoint does not exist, the `MATCH` yields no rows and no edge is created.
    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
        let from_id = relation.from_id.to_string();
        let to_id = relation.to_id.to_string();
        let context_or = |default: &str| escape_cypher(relation.context.as_deref().unwrap_or(default));
        let query = match relation.relation_type {
            RelationType::RelatesTo => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
                from_id,
                to_id,
                relation.strength,
                context_or("")
            ),
            RelationType::Mentions => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
                from_id, to_id
            ),
            RelationType::DerivedFrom => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
                from_id,
                to_id,
                context_or("direct")
            ),
            RelationType::Contradicts => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
                from_id,
                to_id,
                context_or("")
            ),
            RelationType::Reinforces => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
                from_id, to_id
            ),
            RelationType::DistilledFrom => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
                from_id,
                to_id,
                context_or("")
            ),
            RelationType::Supersedes => format!(
                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
                from_id,
                to_id,
                context_or("")
            ),
        };
        conn.query(&query)?;
        Ok(())
    }
}
impl KuzuStore {
pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
let conn = self.conn()?;
let mut result = conn.query(&format!(
"MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
id
))?;
match result.next() {
Some(row) => {
let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
let source = value_to_string(&row[1]);
let machine_id = value_to_string(&row[2]);
let started_at_str = value_to_string(&row[3]);
let project_path = {
let s = value_to_string(&row[4]);
if s.is_empty() { None } else { Some(s) }
};
let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
Ok(Some(Conversation {
id,
source,
machine_id,
started_at,
project_path,
}))
}
None => Ok(None),
}
}
/// Loads one memory by id together with its stored embedding vector.
///
/// The query returns the twelve standard memory columns followed by
/// `m.embedding`. The first twelve are decoded by `row_to_memory`, and the
/// thirteenth becomes `Memory::embedding`. Returns `Ok(None)` when absent.
pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
    let conn = self.conn()?;
    let cols = memory_return_cols("m");
    let mut rows = conn.query(&format!(
        "MATCH (m:Memory {{id: '{id}'}}) RETURN {cols}, m.embedding;"
    ))?;
    let Some(row) = rows.next() else {
        return Ok(None);
    };
    let mut memory = row_to_memory(&row[..12])?;
    memory.embedding = value_to_f32_vec(&row[12]);
    Ok(Some(memory))
}
pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
self.sync_log_page(after_seq, None)
}
/// Reports whether a `Tombstone` node is recorded for `id`.
///
/// The query runs on the supplied connection, so it executes inside whatever
/// transaction the caller has open on `conn`.
fn tombstone_exists_on(&self, conn: &Connection<'_>, id: Uuid) -> Result<bool> {
    let lookup = format!(
        "MATCH (t:Tombstone {{node_id: '{}'}}) RETURN t.node_id LIMIT 1;",
        escape_cypher(&id.to_string())
    );
    let found = conn.query(&lookup)?.next().is_some();
    Ok(found)
}
/// Treats an `updated_at` equal to the Unix epoch as "unset" and replaces it
/// with `created_at`, so last-writer-wins comparisons have a real timestamp.
fn normalize_incoming(mem: &mut Memory) {
    let unset = mem.updated_at == DateTime::UNIX_EPOCH;
    if unset {
        mem.updated_at = mem.created_at;
    }
}
/// Applies a replicated memory `Create` in its own write transaction.
///
/// This delegates to [`Self::apply_create_memory_on_tx`], so the single-op and
/// batched code paths share one implementation and cannot drift apart.
///
/// Returns `Skipped` when the id is tombstoned or already present. Otherwise
/// it returns `Created` after inserting the node and appending a `Create`
/// sync-log entry attributed to `origin_machine_id` / `origin_seq`.
pub fn apply_create_memory(
    &self,
    mem: &Memory,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    self.with_transaction(|conn| {
        self.apply_create_memory_on_tx(conn, mem, origin_machine_id, origin_seq)
    })
}
pub fn with_write_transaction<T>(
&self,
f: impl FnOnce(&Connection<'_>) -> Result<T>,
) -> Result<T> {
self.with_transaction(f)
}
pub fn apply_create_memory_on_tx(
&self,
conn: &Connection<'_>,
mem: &Memory,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let mut mem = mem.clone();
Self::normalize_incoming(&mut mem);
let data = serde_json::to_string(&mem).ok();
let node_id = mem.id.to_string();
if self.tombstone_exists_on(conn, mem.id)? {
return Ok(ApplyOutcome::Skipped);
}
if self.get_memory(mem.id)?.is_some() {
return Ok(ApplyOutcome::Skipped);
}
self.create_memory_node(conn, &mem)?;
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Memory,
&node_id,
data.as_deref(),
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Created)
}
pub fn apply_create_conversation_on_tx(
&self,
conn: &Connection<'_>,
conv: &Conversation,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let data = serde_json::to_string(conv).ok();
let node_id = conv.id.to_string();
if self.get_conversation(conv.id)?.is_some() {
return Ok(ApplyOutcome::Skipped);
}
self.create_conversation_node(conn, conv)?;
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Conversation,
&node_id,
data.as_deref(),
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Created)
}
pub fn apply_create_entity_on_tx(
&self,
conn: &Connection<'_>,
entity: &Entity,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let data = serde_json::to_string(entity).ok();
let node_id = entity.id.to_string();
if self.get_entity(entity.id)?.is_some() {
return Ok(ApplyOutcome::Skipped);
}
self.create_entity_node(conn, entity)?;
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Entity,
&node_id,
data.as_deref(),
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Created)
}
pub fn apply_create_relation_on_tx(
&self,
conn: &Connection<'_>,
relation: &Relation,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
let data = serde_json::to_string(relation).ok();
match self.create_relation_edge(conn, relation) {
Ok(()) => {
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Relation,
&node_id,
data.as_deref(),
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Created)
}
Err(_) => Ok(ApplyOutcome::Skipped),
}
}
pub fn note_relation_created(&self, from_id: Uuid) {
self.evict_relations_for(from_id);
}
/// Applies a replicated memory `Update` with last-writer-wins conflict
/// resolution and returns the outcome plus the version that was applied.
///
/// The incoming copy is normalized first: an epoch `updated_at` becomes
/// `created_at`. Then:
///
/// * absent locally and tombstoned: ignored, returns `(Skipped, None)`;
/// * absent locally and not tombstoned: inserted and logged as a `Create`,
///   returns `(Created, Some(memory))`;
/// * present locally: the incoming copy wins only if its
///   `(updated_at, access_count, machine_id)` tuple is strictly greater than
///   the stored one (compared in that order).
///   - A winner overwrites every scalar column (`m.embedding` is not written)
///     and is logged as an `Update`, returning `(Updated, Some(memory))`.
///   - A loser returns `(Skipped, None)`.
pub fn apply_update_memory(
    &self,
    incoming: &Memory,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<(ApplyOutcome, Option<Memory>)> {
    let mut candidate = incoming.clone();
    Self::normalize_incoming(&mut candidate);
    let data = serde_json::to_string(&candidate).ok();
    let node_id = candidate.id.to_string();
    self.with_transaction(|conn| {
        let Some(existing) = self.get_memory(candidate.id)? else {
            // Never seen locally: create it unless a delete has already won.
            if self.tombstone_exists_on(conn, candidate.id)? {
                return Ok((ApplyOutcome::Skipped, None));
            }
            self.create_memory_node(conn, &candidate)?;
            self.append_sync_log(
                conn,
                SyncOp::Create,
                SyncNodeType::Memory,
                &node_id,
                data.as_deref(),
                origin_machine_id,
                Some(origin_seq),
            )?;
            return Ok((ApplyOutcome::Created, Some(candidate)));
        };

        // Lexicographic tuple order: newer `updated_at` wins, then the higher
        // `access_count`, then the larger `machine_id` as a deterministic tiebreak.
        let ours = (
            existing.updated_at,
            existing.access_count,
            existing.machine_id.as_str(),
        );
        let theirs = (
            candidate.updated_at,
            candidate.access_count,
            candidate.machine_id.as_str(),
        );
        if theirs <= ours {
            return Ok((ApplyOutcome::Skipped, None));
        }

        let overwrite = format!(
            "MATCH (m:Memory {{id: '{node_id}'}}) SET \
             m.content = '{content}', \
             m.memory_type = '{memory_type}', \
             m.confidence = {confidence}, \
             m.created_at = '{created_at}', \
             m.last_accessed = '{last_accessed}', \
             m.access_count = {access_count}, \
             m.source = '{source}', \
             m.source_id = '{source_id}', \
             m.project_path = '{project_path}', \
             m.machine_id = '{machine_id}', \
             m.updated_at = '{updated_at}';",
            content = escape_cypher(&candidate.content),
            memory_type = format!("{:?}", candidate.memory_type).to_lowercase(),
            confidence = candidate.confidence,
            created_at = candidate.created_at.to_rfc3339(),
            last_accessed = candidate.last_accessed.to_rfc3339(),
            access_count = candidate.access_count,
            source = escape_cypher(&candidate.source),
            source_id = escape_cypher(&candidate.source_id),
            project_path = escape_cypher(candidate.project_path.as_deref().unwrap_or("")),
            machine_id = escape_cypher(&candidate.machine_id),
            updated_at = candidate.updated_at.to_rfc3339(),
        );
        conn.query(&overwrite)?;
        self.append_sync_log(
            conn,
            SyncOp::Update,
            SyncNodeType::Memory,
            &node_id,
            data.as_deref(),
            origin_machine_id,
            Some(origin_seq),
        )?;
        Ok((ApplyOutcome::Updated, Some(candidate)))
    })
}
pub fn apply_delete_memory(
&self,
node_id: &str,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let result = self.with_transaction(|conn| {
conn.query(&format!(
"MATCH (m:Memory {{id: '{node_id}'}}) DETACH DELETE m;"
))?;
self.upsert_tombstone(conn, node_id, "memory", origin_machine_id)?;
self.append_sync_log(
conn,
SyncOp::Delete,
SyncNodeType::Memory,
node_id,
None,
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Deleted)
});
self.clear_relations_cache();
result
}
/// Applies a replicated conversation `Create` in its own write transaction.
///
/// This delegates to [`Self::apply_create_conversation_on_tx`], so the
/// single-op and batched paths share one implementation. Returns `Skipped`
/// when the conversation already exists, otherwise `Created`.
pub fn apply_create_conversation(
    &self,
    conv: &Conversation,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    self.with_transaction(|conn| {
        self.apply_create_conversation_on_tx(conn, conv, origin_machine_id, origin_seq)
    })
}
/// Creates or overwrites a conversation from replicated data.
///
/// A single `MERGE … SET` writes `source`, `machine_id`, `started_at` and
/// `project_path` (an empty string when `None`). The change is always logged
/// as an `Update` and the method returns `Updated`, even if `MERGE` had to
/// create the node.
pub fn apply_upsert_conversation(
    &self,
    conv: &Conversation,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    let node_id = conv.id.to_string();
    let data = serde_json::to_string(conv).ok();
    let upsert = format!(
        "MERGE (c:Conversation {{id: '{node_id}'}}) SET \
         c.source = '{source}', \
         c.machine_id = '{machine}', \
         c.started_at = '{started_at}', \
         c.project_path = '{project}';",
        source = escape_cypher(&conv.source),
        machine = escape_cypher(&conv.machine_id),
        started_at = conv.started_at.to_rfc3339(),
        project = escape_cypher(conv.project_path.as_deref().unwrap_or("")),
    );
    self.with_transaction(|conn| {
        conn.query(&upsert)?;
        self.append_sync_log(
            conn,
            SyncOp::Update,
            SyncNodeType::Conversation,
            &node_id,
            data.as_deref(),
            origin_machine_id,
            Some(origin_seq),
        )?;
        Ok(ApplyOutcome::Updated)
    })
}
pub fn apply_delete_conversation(
&self,
node_id: &str,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let result = self.with_transaction(|conn| {
conn.query(&format!(
"MATCH (c:Conversation {{id: '{node_id}'}}) DETACH DELETE c;"
))?;
self.append_sync_log(
conn,
SyncOp::Delete,
SyncNodeType::Conversation,
node_id,
None,
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Deleted)
});
self.clear_relations_cache();
result
}
/// Applies a replicated entity `Create` in its own write transaction.
///
/// This delegates to [`Self::apply_create_entity_on_tx`], so the single-op
/// and batched paths share one implementation. Returns `Skipped` when the
/// entity already exists, otherwise `Created`.
pub fn apply_create_entity(
    &self,
    entity: &Entity,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    self.with_transaction(|conn| {
        self.apply_create_entity_on_tx(conn, entity, origin_machine_id, origin_seq)
    })
}
/// Creates or overwrites an entity from replicated data.
///
/// A single `MERGE … SET` writes `name`, `entity_type`, `embedding` (rendered
/// by `format_embedding`) and `aliases` (rendered by `format_string_array`).
/// The change is always logged as an `Update` and the method returns
/// `Updated`.
pub fn apply_upsert_entity(
    &self,
    entity: &Entity,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    let node_id = entity.id.to_string();
    let data = serde_json::to_string(entity).ok();
    let upsert = format!(
        "MERGE (e:Entity {{id: '{node_id}'}}) SET \
         e.name = '{name}', \
         e.entity_type = '{etype}', \
         e.embedding = {embedding}, \
         e.aliases = {aliases};",
        name = escape_cypher(&entity.name),
        etype = escape_cypher(&entity.entity_type),
        embedding = format_embedding(&entity.embedding),
        aliases = format_string_array(&entity.aliases),
    );
    self.with_transaction(|conn| {
        conn.query(&upsert)?;
        self.append_sync_log(
            conn,
            SyncOp::Update,
            SyncNodeType::Entity,
            &node_id,
            data.as_deref(),
            origin_machine_id,
            Some(origin_seq),
        )?;
        Ok(ApplyOutcome::Updated)
    })
}
pub fn apply_delete_entity(
&self,
node_id: &str,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let result = self.with_transaction(|conn| {
conn.query(&format!(
"MATCH (e:Entity {{id: '{node_id}'}}) DETACH DELETE e;"
))?;
self.append_sync_log(
conn,
SyncOp::Delete,
SyncNodeType::Entity,
node_id,
None,
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Deleted)
});
self.clear_relations_cache();
result
}
/// Applies a replicated relation `Create` in its own write transaction, then
/// evicts cached relations for `relation.from_id`.
///
/// This delegates to [`Self::apply_create_relation_on_tx`], so the single-op
/// and batched paths share one implementation. The cache is evicted whether
/// or not the transaction succeeded; eviction only forces a fresh read.
pub fn apply_create_relation(
    &self,
    relation: &Relation,
    origin_machine_id: &str,
    origin_seq: i64,
) -> Result<ApplyOutcome> {
    let result = self.with_transaction(|conn| {
        self.apply_create_relation_on_tx(conn, relation, origin_machine_id, origin_seq)
    });
    self.evict_relations_for(relation.from_id);
    result
}
pub fn apply_delete_relation(
&self,
node_id: &str,
relation: Option<&Relation>,
origin_machine_id: &str,
origin_seq: i64,
) -> Result<ApplyOutcome> {
let Some(rel) = relation else {
return Ok(ApplyOutcome::Skipped);
};
let evict_id = rel.from_id;
let result = self.with_transaction(|conn| {
let from_id = rel.from_id.to_string();
let to_id = rel.to_id.to_string();
let rel_label = match rel.relation_type {
crate::schema::RelationType::RelatesTo => "RELATES_TO",
crate::schema::RelationType::Mentions => "MENTIONS",
crate::schema::RelationType::DerivedFrom => "DERIVED_FROM",
crate::schema::RelationType::Contradicts => "CONTRADICTS",
crate::schema::RelationType::Reinforces => "REINFORCES",
crate::schema::RelationType::Supersedes => "SUPERSEDES",
crate::schema::RelationType::DistilledFrom => "DISTILLED_FROM",
};
conn.query(&format!(
"MATCH (a {{id: '{from_id}'}})-[r:{rel_label}]->(b {{id: '{to_id}'}}) DELETE r;"
))
.ok();
self.append_sync_log(
conn,
SyncOp::Delete,
SyncNodeType::Relation,
node_id,
None,
origin_machine_id,
Some(origin_seq),
)?;
Ok(ApplyOutcome::Deleted)
});
self.evict_relations_for(evict_id);
result
}
/// Returns sync-log entries with `local_seq > after_seq`, ordered by
/// `local_seq` ascending and capped at `limit` rows when one is given.
///
/// Decoding rules:
/// * rows whose `op` or `node_type` is not recognised are skipped;
/// * non-integer sequence columns decode as 0;
/// * an unparseable timestamp falls back to the current time;
/// * an empty `data` column becomes `None`.
pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
    let conn = self.conn()?;
    let limit_clause = limit.map(|n| format!(" LIMIT {}", n)).unwrap_or_default();
    let query = format!(
        "MATCH (s:SyncLog) WHERE s.local_seq > {} \
         RETURN s.id, s.local_seq, s.origin_machine_id, s.origin_seq, s.op, s.node_type, s.node_id, s.timestamp, s.data \
         ORDER BY s.local_seq{};",
        after_seq, limit_clause
    );
    let mut result = conn.query(&query)?;
    let mut entries = Vec::new();
    for row in &mut result {
        let op = match value_to_string(&row[4]).as_str() {
            "create" => SyncOp::Create,
            "update" => SyncOp::Update,
            "delete" => SyncOp::Delete,
            _ => continue,
        };
        let node_type = match value_to_string(&row[5]).as_str() {
            "memory" => SyncNodeType::Memory,
            "entity" => SyncNodeType::Entity,
            "conversation" => SyncNodeType::Conversation,
            "relation" => SyncNodeType::Relation,
            _ => continue,
        };
        let local_seq = match &row[1] {
            Value::Int64(n) => *n,
            _ => 0,
        };
        let origin_seq = match &row[3] {
            Value::Int64(n) => *n,
            _ => 0,
        };
        let timestamp_str = value_to_string(&row[7]);
        let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
            .map(|dt| dt.with_timezone(&chrono::Utc))
            .unwrap_or_else(|_| chrono::Utc::now());
        let data_str = value_to_string(&row[8]);
        let data = if data_str.is_empty() { None } else { Some(data_str) };
        entries.push(SyncEntry {
            id: value_to_string(&row[0]),
            local_seq,
            origin_machine_id: value_to_string(&row[2]),
            origin_seq,
            op,
            node_type,
            node_id: value_to_string(&row[6]),
            timestamp,
            data,
        });
    }
    Ok(entries)
}
/// Reads the stored sync cursor for `peer_id`, or `None` if none exists.
///
/// A non-integer `last_seq` decodes as 0, and an unparseable `last_sync_at`
/// falls back to the current time.
pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
    let conn = self.conn()?;
    let lookup = format!(
        "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
        escape_cypher(peer_id)
    );
    let mut rows = conn.query(&lookup)?;
    let Some(row) = rows.next() else {
        return Ok(None);
    };
    let last_seq = if let Value::Int64(n) = &row[1] { *n as u64 } else { 0 };
    let last_sync_at = chrono::DateTime::parse_from_rfc3339(&value_to_string(&row[2]))
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(|_| chrono::Utc::now());
    Ok(Some(SyncState {
        peer_id: value_to_string(&row[0]),
        last_seq,
        last_sync_at,
    }))
}
/// Creates or overwrites the sync cursor for `state.peer_id`.
pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
    let conn = self.conn()?;
    let upsert = format!(
        "MERGE (s:SyncState {{peer_id: '{peer}'}}) SET s.last_seq = {seq}, s.last_sync_at = '{at}';",
        peer = escape_cypher(&state.peer_id),
        seq = state.last_seq,
        at = state.last_sync_at.to_rfc3339(),
    );
    conn.query(&upsert)?;
    Ok(())
}
/// Returns every stored sync cursor, using the same decoding fallbacks as
/// `get_sync_state`.
pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
    let conn = self.conn()?;
    let rows = conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
    let states = rows
        .map(|row| {
            let last_seq = if let Value::Int64(n) = &row[1] { *n as u64 } else { 0 };
            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&value_to_string(&row[2]))
                .map(|dt| dt.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc::now());
            SyncState {
                peer_id: value_to_string(&row[0]),
                last_seq,
                last_sync_at,
            }
        })
        .collect();
    Ok(states)
}
/// Returns every entity, decoding only id, name and type. Embedding and
/// aliases are left empty.
pub fn all_entities(&self) -> Result<Vec<Entity>> {
    let conn = self.conn()?;
    let rows = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
    let entities = rows
        .map(|row| row_to_entity(&row))
        .collect::<Result<Vec<_>>>()?;
    Ok(entities)
}
/// Returns every memory with a `MENTIONS` edge to the entity `entity_id`
/// (embeddings not loaded).
pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
    let conn = self.conn()?;
    let cols = memory_return_cols("m");
    let rows = conn.query(&format!(
        "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{entity_id}'}}) RETURN {cols};"
    ))?;
    let memories = rows
        .map(|row| row_to_memory(&row))
        .collect::<Result<Vec<_>>>()?;
    Ok(memories)
}
/// Returns every memory that has no `MENTIONS` edge to any entity
/// (embeddings not loaded).
pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
    let conn = self.conn()?;
    let cols = memory_return_cols("m");
    let rows = conn.query(&format!(
        "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {cols};"
    ))?;
    let memories = rows
        .map(|row| row_to_memory(&row))
        .collect::<Result<Vec<_>>>()?;
    Ok(memories)
}
/// Returns the distinct names of other entities mentioned by any memory that
/// also mentions `entity_id`.
pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
    let conn = self.conn()?;
    let rows = conn.query(&format!(
        "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{entity_id}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{entity_id}' RETURN DISTINCT e2.name;"
    ))?;
    let names = rows.map(|row| value_to_string(&row[0])).collect();
    Ok(names)
}
/// Returns the highest `local_seq` in the sync log. Returns 0 when the log is
/// empty, because `max` over no rows does not yield an integer.
pub fn max_sync_seq(&self) -> Result<u64> {
    let conn = self.conn()?;
    let first = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?.next();
    let highest = match first.as_ref().map(|row| &row[0]) {
        Some(Value::Int64(n)) => *n as u64,
        _ => 0,
    };
    Ok(highest)
}
/// Copies a conversation's non-empty `project_path` onto memories derived from
/// it whose own `project_path` is the empty string.
///
/// Returns the count reported by the query, or 0 if none is returned.
/// NOTE(review): a memory with several qualifying `DERIVED_FROM` edges may be
/// counted more than once; confirm this against Kùzu's `SET … RETURN count` semantics.
pub fn backfill_project_paths(&self) -> Result<u64> {
    const BACKFILL: &str = "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);";
    let conn = self.conn()?;
    let first = conn.query(BACKFILL)?.next();
    let touched = match first.as_ref().map(|row| &row[0]) {
        Some(Value::Int64(n)) => *n as u64,
        _ => 0,
    };
    Ok(touched)
}
/// Returns up to 20 memories whose `project_path` equals `project_path`
/// exactly (embeddings not loaded).
pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
    let conn = self.conn()?;
    let query = format!(
        "MATCH (m:Memory) WHERE m.project_path = '{path}' RETURN {cols} LIMIT 20;",
        path = escape_cypher(project_path),
        cols = memory_return_cols("m"),
    );
    let rows = conn.query(&query)?;
    let memories = rows
        .map(|row| row_to_memory(&row))
        .collect::<Result<Vec<_>>>()?;
    Ok(memories)
}
/// Returns every memory with its embedding loaded. Column 12 (`m.embedding`)
/// follows the twelve standard memory columns.
pub fn all_memories_with_embeddings(&self) -> Result<Vec<Memory>> {
    let conn = self.conn()?;
    let cols = memory_return_cols("m");
    let rows = conn.query(&format!("MATCH (m:Memory) RETURN {cols}, m.embedding;"))?;
    let memories = rows
        .map(|row| -> Result<Memory> {
            let mut memory = row_to_memory(&row[..12])?;
            memory.embedding = value_to_f32_vec(&row[12]);
            Ok(memory)
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(memories)
}
pub fn backfill_sync_log(&self) -> Result<u64> {
let mut count = 0u64;
let memories = self.all_memories_with_embeddings()?;
for memory in &memories {
let origin_mid = memory.machine_id.clone();
let data = serde_json::to_string(memory).ok();
let node_id = memory.id.to_string();
self.with_transaction(|conn| {
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Memory,
&node_id,
data.as_deref(),
&origin_mid,
None,
)
})?;
count += 1;
}
let conversations: Vec<Conversation> = {
let conn = self.conn()?;
let mut result = conn.query(
"MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
)?;
let mut acc = Vec::new();
for row in &mut result {
let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
let source = value_to_string(&row[1]);
let machine_id = value_to_string(&row[2]);
let started_at_str = value_to_string(&row[3]);
let project_path = {
let s = value_to_string(&row[4]);
if s.is_empty() { None } else { Some(s) }
};
let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
acc.push(Conversation { id, source, machine_id, started_at, project_path });
}
acc
};
for conv in &conversations {
let origin_mid = conv.machine_id.clone();
let data = serde_json::to_string(conv).ok();
let node_id = conv.id.to_string();
self.with_transaction(|conn| {
self.append_sync_log(
conn,
SyncOp::Create,
SyncNodeType::Conversation,
&node_id,
data.as_deref(),
&origin_mid,
None,
)
})?;
count += 1;
}
Ok(count)
}
}
impl KuzuStore {
    /// Creates the `Machine` node for `id` if it is absent and sets its name.
    pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
        let conn = self.conn()?;
        let upsert = format!(
            "MERGE (m:Machine {{id: '{}'}}) SET m.name = '{}';",
            escape_cypher(id),
            escape_cypher(name)
        );
        conn.query(&upsert)?;
        Ok(())
    }

    /// Looks up the name recorded for machine `id`, if any.
    pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
        let conn = self.conn()?;
        let lookup = format!("MATCH (m:Machine {{id: '{}'}}) RETURN m.name;", escape_cypher(id));
        let first = conn.query(&lookup)?.next();
        Ok(first.map(|row| value_to_string(&row[0])))
    }

    /// Returns every known machine as an `id -> name` map. If ids repeat, the
    /// later row wins.
    pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
        let conn = self.conn()?;
        let rows = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
        let machines = rows
            .map(|row| (value_to_string(&row[0]), value_to_string(&row[1])))
            .collect();
        Ok(machines)
    }

    /// Sets `machine_id` on every memory whose `machine_id` is the empty
    /// string. Returns the count reported by the query, or 0.
    pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
        let conn = self.conn()?;
        let backfill = format!(
            "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{}' RETURN count(m);",
            escape_cypher(machine_id)
        );
        let first = conn.query(&backfill)?.next();
        let touched = match first.as_ref().map(|row| &row[0]) {
            Some(Value::Int64(n)) => *n as u64,
            _ => 0,
        };
        Ok(touched)
    }

    /// Records this machine's identity, then claims every unattributed memory
    /// for it. Logs how many memories were claimed, if any.
    pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
        self.upsert_machine(&identity.id, &identity.name)?;
        match self.backfill_machine_id(&identity.id)? {
            0 => {}
            count => tracing::info!("backfilled machine_id on {count} existing memories"),
        }
        Ok(())
    }
}
/// Renders an embedding as a Cypher list literal such as `[0.5,1]`, using each
/// value's `Display` form.
///
/// An empty slice becomes a list of 384 `0.0` entries, so the column always
/// receives a full-width vector. 384 matches the dimension the tests in this
/// file use.
fn format_embedding(embedding: &[f32]) -> String {
    let body = if embedding.is_empty() {
        vec!["0.0"; 384].join(",")
    } else {
        embedding
            .iter()
            .map(f32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{body}]")
}
/// Escapes `s` so it can sit inside a single-quoted Cypher string literal.
///
/// Backslashes are doubled first, so the escapes added for quotes are not
/// themselves re-escaped. Each single quote then becomes `\'`.
fn escape_cypher(s: &str) -> String {
    s.replace('\\', "\\\\").replace('\'', "\\'")
}

/// Renders `items` as a Cypher list of single-quoted string literals, e.g.
/// `['a','b']`.
///
/// Every element goes through [`escape_cypher`], so it is quoted exactly like
/// the other strings this module splices into queries. SQL-style `''` doubling
/// is not a Cypher string escape: `'O''Brien'` does not parse as one literal,
/// and raw backslashes would start escape sequences.
fn format_string_array(items: &[String]) -> String {
    let parts: Vec<String> = items
        .iter()
        .map(|s| format!("'{}'", escape_cypher(s)))
        .collect();
    format!("[{}]", parts.join(","))
}
/// Converts a Kùzu value to a `String`.
///
/// * `STRING` values are returned as-is.
/// * SQL `NULL` becomes the empty string, which callers already treat as
///   "absent". For example, an empty `project_path` maps to `None` and empty
///   sync `data` maps to `None`. Without this arm, NULL would fall through to
///   `Debug` and yield text such as `Null(String)`, which callers would store
///   as a real value.
/// * Any other value is rendered with its `Debug` form.
fn value_to_string(val: &Value) -> String {
    match val {
        Value::String(s) => s.clone(),
        Value::Null(_) => String::new(),
        other => format!("{:?}", other),
    }
}
/// Reads a numeric Kùzu value as `f32`.
///
/// `FLOAT` is returned unchanged, while `DOUBLE` and `INT64` are cast. Any
/// other value yields `0.0`.
fn value_to_f32(val: &Value) -> f32 {
    if let Value::Float(f) = val {
        *f
    } else if let Value::Double(d) = val {
        *d as f32
    } else if let Value::Int64(i) = val {
        *i as f32
    } else {
        0.0
    }
}
/// Reads a Kùzu `ARRAY` or `LIST` as a `Vec<f32>`, converting each element
/// with `value_to_f32`. Any other value yields an empty vector.
fn value_to_f32_vec(val: &Value) -> Vec<f32> {
    let items = match val {
        Value::Array(_, items) | Value::List(_, items) => items,
        _ => return Vec::new(),
    };
    items.iter().map(value_to_f32).collect()
}
fn row_to_memory(row: &[Value]) -> Result<Memory> {
let id_str = value_to_string(&row[0]);
let id = Uuid::parse_str(&id_str).unwrap_or_default();
let content = value_to_string(&row[1]);
let memory_type_str = value_to_string(&row[2]);
let confidence = value_to_f32(&row[3]);
let created_at_str = value_to_string(&row[4]);
let last_accessed_str = value_to_string(&row[5]);
let access_count = match &row[6] {
Value::Int64(i) => *i as u32,
_ => 0,
};
let source = value_to_string(&row[7]);
let source_id = value_to_string(&row[8]);
let project_path = project_path_from_db(&value_to_string(&row[9]));
let machine_id = value_to_string(&row[10]);
let updated_at_str = value_to_string(&row[11]);
let memory_type = match memory_type_str.as_str() {
"episodic" => MemoryType::Episodic,
"procedural" => MemoryType::Procedural,
"decision" => MemoryType::Decision,
"architecture" => MemoryType::Architecture,
"debugging" => MemoryType::Debugging,
"task" => MemoryType::Task,
"question" => MemoryType::Question,
_ => MemoryType::Semantic,
};
let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
let updated_at = if updated_at_str.is_empty() {
created_at
} else {
chrono::DateTime::parse_from_rfc3339(&updated_at_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or(created_at)
};
Ok(Memory {
id,
content,
embedding: Vec::new(),
memory_type,
confidence,
created_at,
last_accessed,
access_count,
source,
source_id,
project_path,
machine_id,
updated_at,
})
}
/// Decodes an `(id, name, entity_type)` row into an `Entity`.
///
/// Embedding and aliases are left empty, and an invalid id becomes the nil UUID.
fn row_to_entity(row: &[Value]) -> Result<Entity> {
    let entity = Entity {
        id: Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default(),
        name: value_to_string(&row[1]),
        entity_type: value_to_string(&row[2]),
        embedding: Vec::new(),
        aliases: Vec::new(),
    };
    Ok(entity)
}
// SAFETY: NOTE(review) — these impls assert that `KuzuStore` may be moved to
// and shared between threads. Its fields are a `kuzu::Database`, a `String`
// and an `RwLock`-guarded relations cache. The query paths visible in this
// file build a fresh `Connection` per call via `conn()` instead of sharing one
// across calls. Whether `kuzu::Database` itself is safe to use from several
// threads is not established here. Confirm this against the kuzu crate
// before relying on these impls.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}
#[cfg(test)]
mod tests {
use super::*;
/// Replaces `SyncLog` with a legacy layout (a `seq` primary key and a single
/// `machine_id`, instead of the `id` / `local_seq` / `origin_*` columns that
/// `sync_log_page` reads). It then checks that `migrate_sync_log` leaves a
/// table exposing `s.id` and resets the peer's `SyncState.last_seq` to 0.
#[test]
fn migrate_sync_log_recreates_old_schema_and_resets_cursors() {
let store = KuzuStore::in_memory("test-machine-migrate".to_string()).unwrap();
let conn = store.conn().unwrap();
// Swap the freshly initialised table for the legacy schema.
conn.query("DROP TABLE SyncLog;").unwrap();
conn.query(
"CREATE NODE TABLE SyncLog(\
seq INT64 PRIMARY KEY, \
op STRING, \
node_type STRING, \
node_id STRING, \
machine_id STRING, \
timestamp STRING, \
data STRING\
);",
)
.unwrap();
conn.query(
"CREATE (:SyncLog {seq: 1, op: 'create', node_type: 'memory', \
node_id: 'abc', machine_id: 'old-machine', \
timestamp: '2024-01-01T00:00:00Z', data: ''});",
)
.unwrap();
// A non-zero cursor that the migration is expected to reset.
conn.query(
"MERGE (s:SyncState {peer_id: 'peer-x'}) \
SET s.last_seq = 5, s.last_sync_at = '2024-01-01T00:00:00Z';",
)
.unwrap();
store.migrate_sync_log(&conn).unwrap();
assert!(
conn.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;").is_ok(),
"new-schema column s.id must be present after migration"
);
let mut r = conn
.query(
"MATCH (s:SyncState {peer_id: 'peer-x'}) RETURN s.last_seq;",
)
.unwrap();
let last_seq = match r.next() {
Some(row) => match &row[0] {
Value::Int64(n) => *n,
_ => panic!("unexpected value type for last_seq"),
},
None => panic!("SyncState row not found after migration"),
};
assert_eq!(last_seq, 0, "last_seq must be reset to 0 after migration");
}
/// A closure that writes a `Memory` node and then returns `Err` must leave
/// neither the node nor any `SyncLog` rows behind, and the error must
/// propagate out of `with_transaction`.
#[test]
fn rollback_leaves_no_partial_state() {
let store = KuzuStore::in_memory("test-machine-rollback".to_string()).unwrap();
// 384 zeros: a full-width embedding literal for the Memory node.
let embedding = std::iter::repeat("0.0")
.take(384)
.collect::<Vec<_>>()
.join(", ");
let result: Result<()> = store.with_transaction(|conn| {
conn.query(&format!(
"CREATE (:Memory {{id: 'rollback-test-id', content: 'x', \
embedding: [{embedding}], memory_type: 'semantic', confidence: 1.0, \
created_at: '2024-01-01T00:00:00Z', last_accessed: '2024-01-01T00:00:00Z', \
access_count: 0, source: 'test', source_id: '', project_path: '', \
machine_id: 'x', updated_at: '2024-01-01T00:00:00Z'}});"
))?;
Err(anyhow::anyhow!("deliberate rollback"))
});
assert!(
result.is_err(),
"with_transaction must propagate the closure error"
);
let entries = store.sync_log_since(0).unwrap();
assert_eq!(entries.len(), 0, "no SyncLog rows may survive a rollback");
assert_eq!(
store.memory_count().unwrap(),
0,
"the Memory node created inside the transaction must be rolled back"
);
}
/// Builds a `Semantic` test memory with the given content and embedding.
fn memory_with_embedding(content: &str, embedding: Vec<f32>) -> Memory {
let mut m = Memory::new(
content.to_string(),
MemoryType::Semantic,
"test".to_string(),
String::new(),
);
m.embedding = embedding;
m
}
/// Storing a new `Reinforces` relation must raise the target's recall score.
/// That can only happen if the relations cache was invalidated on write
/// rather than serving the pre-relation (empty) list.
#[test]
fn recall_reflects_new_relation_after_cache_invalidation() {
use crate::query::{QueryEngine, QueryFilters, QueryRequest};
let store = KuzuStore::in_memory("test-machine-invalidate".to_string()).unwrap();
// Orthogonal one-hot embeddings keep the two memories distinguishable.
let mut emb_a = vec![0.0_f32; 384];
emb_a[0] = 1.0;
let mut emb_b = vec![0.0_f32; 384];
emb_b[1] = 1.0;
let target = memory_with_embedding("kuzu is the embedded graph store", emb_a.clone());
let neighbor = memory_with_embedding("an unrelated note about sync", emb_b);
store.store_memory(&target).unwrap();
store.store_memory(&neighbor).unwrap();
let request = QueryRequest {
text: "graph store".to_string(),
embedding: emb_a,
limit: 10,
filters: QueryFilters::default(),
};
// First recall also warms the relations cache for `target`.
let before = QueryEngine::new(&store).recall(&request).unwrap();
let before_score = before
.iter()
.find(|r| r.memory.id == target.id)
.expect("target must be recalled")
.score;
let rel = Relation {
from_id: target.id,
to_id: neighbor.id,
relation_type: RelationType::Reinforces,
strength: 1.0,
context: None,
};
store.store_relation(&rel).unwrap();
let after = QueryEngine::new(&store).recall(&request).unwrap();
let after_score = after
.iter()
.find(|r| r.memory.id == target.id)
.expect("target must still be recalled")
.score;
assert!(
after_score > before_score,
"recall must reflect the new relation: before={before_score} after={after_score}"
);
}
/// For every (node, relation-type filter) pair, the first `get_relations`
/// call (populating the cache) and the second (presumably served from the
/// cache) must return the same set of `to_id`s.
#[test]
fn get_relations_cached_matches_fresh_uncached_store() {
let machine = "test-machine-equivalence".to_string();
let warm = KuzuStore::in_memory(machine.clone()).unwrap();
let a = memory_with_embedding("memory a", {
let mut e = vec![0.0_f32; 384];
e[0] = 1.0;
e
});
let b = memory_with_embedding("memory b", {
let mut e = vec![0.0_f32; 384];
e[1] = 1.0;
e
});
let c = memory_with_embedding("memory c", {
let mut e = vec![0.0_f32; 384];
e[2] = 1.0;
e
});
for m in [&a, &b, &c] {
warm.store_memory(m).unwrap();
}
// a -> b (RelatesTo), a -> c (Reinforces), b -> c (Supersedes).
let rels = [
Relation {
from_id: a.id,
to_id: b.id,
relation_type: RelationType::RelatesTo,
strength: 0.7,
context: None,
},
Relation {
from_id: a.id,
to_id: c.id,
relation_type: RelationType::Reinforces,
strength: 1.0,
context: None,
},
Relation {
from_id: b.id,
to_id: c.id,
relation_type: RelationType::Supersedes,
strength: 1.0,
context: None,
},
];
for r in &rels {
warm.store_relation(r).unwrap();
}
// `None` means "all types"; `Mentions` has no edges here and exercises
// the empty-result path.
let types = [
None,
Some(RelationType::RelatesTo),
Some(RelationType::Reinforces),
Some(RelationType::Supersedes),
Some(RelationType::Mentions),
];
for id in [a.id, b.id, c.id] {
for rt in types {
let first = warm.get_relations(id, rt).unwrap();
let second = warm.get_relations(id, rt).unwrap();
assert_eq!(
first.len(),
second.len(),
"cache hit must match the live read for ({id}, {rt:?})"
);
// Compare as sorted sets so ordering differences do not matter.
let mut a_ids: Vec<Uuid> = first.iter().map(|r| r.to_id).collect();
let mut b_ids: Vec<Uuid> = second.iter().map(|r| r.to_id).collect();
a_ids.sort();
b_ids.sort();
assert_eq!(a_ids, b_ids);
}
}
}
}