1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::RwLock;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use kuzu::{Connection, Database, SystemConfig, Value};
8use uuid::Uuid;
9
10use crate::schema::{
11 Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
12 SyncOp, SyncState,
13};
14use crate::store::Store;
15
/// In-memory cache of relation lists, keyed by the node id and the optional relation-type
/// filter that a lookup was performed with.
type RelationsCache = HashMap<(Uuid, Option<RelationType>), Vec<Relation>>;
17
/// Outcome categories for applying an operation.
///
/// NOTE(review): nothing in the visible part of this file constructs or matches on this
/// enum — presumably it is reported when replaying sync entries; confirm against the caller.
pub enum ApplyOutcome {
    Created,
    Updated,
    Deleted,
    Skipped,
}
24
/// Store implementation backed by a KùzuDB [`Database`].
pub struct KuzuStore {
    /// Database handle; every operation creates its own connection from it.
    db: Database,
    /// Identifier of the local machine, written into the sync-log entries and tombstones
    /// this store creates.
    machine_id: String,
    /// Cached relation lookups, guarded by a read/write lock.
    relations_cache: RwLock<RelationsCache>,
}
34
/// Builds the comma-separated `RETURN` projection for a `Memory` node bound to `alias`.
///
/// The twelve columns are always emitted in the same fixed order, each prefixed with
/// `alias` and a dot, and joined with `", "`.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 12] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
        "updated_at",
    ];

    let mut projection = String::new();
    for (position, column) in COLUMNS.iter().enumerate() {
        if position > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
42
/// Converts a stored project-path string into an `Option`: the empty string becomes
/// `None`, anything else is returned as an owned `Some`.
fn project_path_from_db(s: &str) -> Option<String> {
    match s {
        "" => None,
        path => Some(path.to_owned()),
    }
}
46
47impl KuzuStore {
48 pub fn open(path: &Path, machine_id: String) -> Result<Self> {
49 let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
50 let store = Self {
51 db,
52 machine_id,
53 relations_cache: RwLock::new(HashMap::new()),
54 };
55 store.init_schema()?;
56 Ok(store)
57 }
58
59 pub fn in_memory(machine_id: String) -> Result<Self> {
60 let db =
61 Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
62 let store = Self {
63 db,
64 machine_id,
65 relations_cache: RwLock::new(HashMap::new()),
66 };
67 store.init_schema()?;
68 Ok(store)
69 }
70
71 fn evict_relations_for(&self, id: Uuid) {
72 let mut cache = self.relations_cache.write().unwrap();
73 cache.retain(|(key_id, _), _| *key_id != id);
74 }
75
76 fn clear_relations_cache(&self) {
82 self.relations_cache.write().unwrap().clear();
83 }
84
85 pub fn machine_id(&self) -> &str {
86 &self.machine_id
87 }
88
89 fn conn(&self) -> Result<Connection<'_>> {
90 Connection::new(&self.db).context("creating connection")
91 }
92
93 pub fn diagnostic(&self) -> Result<String> {
94 let conn = self.conn()?;
95
96 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
97 let total: i64 = result
98 .next()
99 .map(|r| match &r[0] {
100 Value::Int64(v) => *v,
101 _ => -1,
102 })
103 .unwrap_or(-1);
104
105 let mut result = conn.query(
106 "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
107 )?;
108 let with_emb: i64 = result
109 .next()
110 .map(|r| match &r[0] {
111 Value::Int64(v) => *v,
112 _ => -1,
113 })
114 .unwrap_or(-1);
115
116 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
117 let zeros_str = format!("[{}]", zeros.join(","));
118 let idx_query = format!(
119 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
120 zeros_str
121 );
122 let idx_status = match conn.query(&idx_query) {
123 Ok(mut r) => match r.next() {
124 Some(row) => format!("ok (distance: {:?})", &row[0]),
125 None => "ok (0 results)".to_string(),
126 },
127 Err(e) => format!("error: {e}"),
128 };
129
130 Ok(format!(
131 "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
132 ))
133 }
134
    /// Creates every node table, relationship table and the vector index this store uses,
    /// and applies in-place column additions to the `Memory` table.
    ///
    /// All `CREATE ... IF NOT EXISTS` statements are mandatory and propagate their error
    /// with context. The `ALTER TABLE`, back-fill and `CREATE_VECTOR_INDEX` statements are
    /// best-effort: their result is discarded with `.ok()`.
    ///
    /// # Errors
    /// Fails if the connection cannot be created, if any mandatory `CREATE` statement
    /// fails, or if the `SyncLog` migration fails.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;

        // --- Node tables -------------------------------------------------------------
        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Memory(
                id STRING PRIMARY KEY,
                content STRING,
                embedding FLOAT[384],
                memory_type STRING,
                confidence FLOAT,
                created_at STRING,
                last_accessed STRING,
                access_count INT64,
                source STRING,
                source_id STRING,
                project_path STRING
            );",
        )
        .context("creating Memory table")?;

        // Best-effort column addition; the error is ignored (e.g. when the column is
        // already part of the table created above).
        conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Machine(
                id STRING PRIMARY KEY,
                name STRING
            );",
        )
        .context("creating Machine table")?;

        // `machine_id` and `updated_at` are not in the CREATE statement above, so they are
        // only ever added through these best-effort ALTERs.
        conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();

        conn.query("ALTER TABLE Memory ADD updated_at STRING DEFAULT '';").ok();
        // Back-fill: rows still carrying the empty default get `created_at` as `updated_at`.
        conn.query("MATCH (m:Memory) WHERE m.updated_at = '' SET m.updated_at = m.created_at;").ok();

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Entity(
                id STRING PRIMARY KEY,
                name STRING,
                entity_type STRING,
                embedding FLOAT[384],
                aliases STRING[]
            );",
        )
        .context("creating Entity table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Conversation(
                id STRING PRIMARY KEY,
                source STRING,
                machine_id STRING,
                started_at STRING,
                project_path STRING
            );",
        )
        .context("creating Conversation table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
                file_path STRING PRIMARY KEY,
                file_hash STRING,
                ingested_at STRING,
                memory_count INT64,
                source STRING
            );",
        )
        .context("creating IngestLog table")?;

        // --- Relationship tables -----------------------------------------------------
        conn.query(
            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
                FROM Memory TO Memory,
                strength FLOAT,
                context STRING
            );",
        )
        .context("creating RELATES_TO rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
                FROM Memory TO Entity,
                position INT64
            );",
        )
        .context("creating MENTIONS rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
                FROM Memory TO Conversation,
                transformation STRING
            );",
        )
        .context("creating DERIVED_FROM rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
                FROM Memory TO Memory,
                model STRING
            );",
        )
        .context("creating DISTILLED_FROM rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
                FROM Memory TO Memory,
                resolution STRING
            );",
        )
        .context("creating CONTRADICTS rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
                FROM Memory TO Memory
            );",
        )
        .context("creating REINFORCES rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
                FROM Memory TO Memory,
                reason STRING
            );",
        )
        .context("creating SUPERSEDES rel")?;

        // --- Sync / bookkeeping tables -----------------------------------------------
        // Creates the SyncLog table (dropping an old-layout one first if present).
        self.migrate_sync_log(&conn).context("migrating SyncLog table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS SyncState(
                peer_id STRING PRIMARY KEY,
                last_seq INT64,
                last_sync_at STRING
            );",
        )
        .context("creating SyncState table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
                memory_id STRING PRIMARY KEY,
                distilled_id STRING,
                consolidated_at STRING,
                model STRING
            );",
        )
        .context("creating ConsolidationLog table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
                id STRING PRIMARY KEY,
                last_sync_seq INT64,
                exported_at STRING,
                vault_path STRING,
                pages_created INT64,
                pages_updated INT64,
                memories_processed INT64
            );",
        )
        .context("creating WikiExportLog table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Tombstone(
                node_id STRING PRIMARY KEY,
                node_type STRING,
                deleted_at STRING,
                machine_id STRING);",
        )
        .context("creating Tombstone table")?;

        // Best-effort: the error is ignored, e.g. when the index already exists.
        conn.query(
            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
        )
        .ok();

        Ok(())
    }
309
    /// Ensures the `SyncLog` table exists with the current column layout.
    ///
    /// The layout is detected by probing: a query returning `s.id` succeeding means the
    /// current layout is present. Otherwise, if a query returning `s.seq` succeeds, the
    /// table is treated as the old layout: it is dropped and every `SyncState.last_seq`
    /// is reset to 0 (best-effort). Finally the table is created if it does not exist.
    ///
    /// # Errors
    /// Fails if dropping the old table or creating the new one fails.
    fn migrate_sync_log(&self, conn: &Connection<'_>) -> Result<()> {
        // Probe for the current layout: only it has an `id` column.
        let new_schema_present = conn
            .query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;")
            .is_ok();
        if !new_schema_present {
            // Probe for the old layout, which is identified by its `seq` column.
            let old_schema_present = conn
                .query("MATCH (s:SyncLog) RETURN s.seq LIMIT 1;")
                .is_ok();
            if old_schema_present {
                conn.query("DROP TABLE SyncLog;")?;
                // The log was discarded, so stored peer positions are reset; failure here
                // is ignored.
                conn.query("MATCH (s:SyncState) SET s.last_seq = 0;").ok();
            }
        }
        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
                id STRING PRIMARY KEY,
                local_seq INT64,
                origin_machine_id STRING,
                origin_seq INT64,
                op STRING,
                node_type STRING,
                node_id STRING,
                timestamp STRING,
                data STRING
            );",
        )?;
        Ok(())
    }
342
343 fn with_transaction<T>(&self, f: impl FnOnce(&Connection<'_>) -> Result<T>) -> Result<T> {
344 let conn = self.conn()?;
345 conn.query("BEGIN TRANSACTION;")?;
346 match f(&conn) {
347 Ok(v) => {
348 conn.query("COMMIT;")?;
349 Ok(v)
350 }
351 Err(e) => {
352 let _ = conn.query("ROLLBACK;");
353 Err(e)
354 }
355 }
356 }
357
358 fn append_sync_log(
359 &self,
360 conn: &Connection<'_>,
361 op: SyncOp,
362 node_type: SyncNodeType,
363 node_id: &str,
364 data: Option<&str>,
365 origin_machine_id: &str,
366 origin_seq: Option<i64>,
367 ) -> Result<()> {
368 let mut r = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
369 let local_seq: i64 = match r.next() {
370 Some(row) => match &row[0] {
371 Value::Int64(n) => n + 1,
372 _ => 1,
373 },
374 None => 1,
375 };
376 let origin_seq = origin_seq.unwrap_or(local_seq);
377 let id = format!("{}:{}", origin_machine_id, origin_seq);
378 let id_esc = escape_cypher(&id);
379 let mut dup = conn.query(&format!(
380 "MATCH (s:SyncLog {{id: '{id_esc}'}}) RETURN s.id LIMIT 1;"
381 ))?;
382 if dup.next().is_some() {
383 return Ok(());
384 }
385 let timestamp = chrono::Utc::now().to_rfc3339();
386 let op_str = match op {
387 SyncOp::Create => "create",
388 SyncOp::Update => "update",
389 SyncOp::Delete => "delete",
390 };
391 let nt_str = match node_type {
392 SyncNodeType::Memory => "memory",
393 SyncNodeType::Entity => "entity",
394 SyncNodeType::Conversation => "conversation",
395 SyncNodeType::Relation => "relation",
396 };
397 let origin_machine_esc = escape_cypher(origin_machine_id);
398 let node_id_esc = escape_cypher(node_id);
399 let data_esc = data.map(escape_cypher).unwrap_or_default();
400 conn.query(&format!(
401 "CREATE (:SyncLog {{id:'{id_esc}', local_seq:{local_seq}, \
402 origin_machine_id:'{origin_machine_esc}', origin_seq:{origin_seq}, \
403 op:'{op_str}', node_type:'{nt_str}', node_id:'{node_id_esc}', \
404 timestamp:'{timestamp}', data:'{data_esc}'}});"
405 ))?;
406 Ok(())
407 }
408
409 pub fn tombstone_exists(&self, id: Uuid) -> Result<bool> {
410 let conn = self.conn()?;
411 let id_esc = escape_cypher(&id.to_string());
412 let mut result = conn.query(&format!(
413 "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
414 ))?;
415 Ok(result.next().is_some())
416 }
417
418 fn upsert_tombstone(
419 &self,
420 conn: &Connection<'_>,
421 node_id: &str,
422 node_type: &str,
423 machine_id: &str,
424 ) -> Result<()> {
425 let node_id_esc = escape_cypher(node_id);
426 let node_type_esc = escape_cypher(node_type);
427 let machine_id_esc = escape_cypher(machine_id);
428 let deleted_at = chrono::Utc::now().to_rfc3339();
429 conn.query(&format!(
430 "MERGE (t:Tombstone {{node_id: '{node_id_esc}'}}) \
431 SET t.node_type = '{node_type_esc}', \
432 t.deleted_at = '{deleted_at}', \
433 t.machine_id = '{machine_id_esc}';"
434 ))?;
435 Ok(())
436 }
437}
438
439impl Store for KuzuStore {
440 fn store_memory(&self, memory: &Memory) -> Result<()> {
441 let data = serde_json::to_string(memory).ok();
442 self.with_transaction(|conn| {
443 self.create_memory_node(conn, memory)?;
444 self.append_sync_log(
445 conn,
446 SyncOp::Create,
447 SyncNodeType::Memory,
448 &memory.id.to_string(),
449 data.as_deref(),
450 &self.machine_id,
451 None,
452 )
453 })
454 }
455
456 fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
457 let conn = self.conn()?;
458 let query = format!(
459 "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
460 id, memory_return_cols("m")
461 );
462
463 let mut result = conn.query(&query)?;
464 match result.next() {
465 Some(row) => Ok(Some(row_to_memory(&row)?)),
466 None => Ok(None),
467 }
468 }
469
470 fn delete_memory(&self, id: Uuid) -> Result<()> {
471 let id_string = id.to_string();
472 let result = self.with_transaction(|conn| {
473 conn.query(&format!(
474 "MATCH (m:Memory {{id: '{id_string}'}}) DETACH DELETE m;"
475 ))?;
476 self.upsert_tombstone(conn, &id_string, "memory", &self.machine_id)?;
477 self.append_sync_log(
478 conn,
479 SyncOp::Delete,
480 SyncNodeType::Memory,
481 &id_string,
482 None,
483 &self.machine_id,
484 None,
485 )
486 });
487 self.clear_relations_cache();
492 result
493 }
494
495 fn store_entity(&self, entity: &Entity) -> Result<()> {
496 let data = serde_json::to_string(entity).ok();
497 self.with_transaction(|conn| {
498 self.create_entity_node(conn, entity)?;
499 self.append_sync_log(
500 conn,
501 SyncOp::Create,
502 SyncNodeType::Entity,
503 &entity.id.to_string(),
504 data.as_deref(),
505 &self.machine_id,
506 None,
507 )
508 })
509 }
510
511 fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
512 let conn = self.conn()?;
513 let mut result = conn.query(&format!(
514 "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
515 id
516 ))?;
517
518 match result.next() {
519 Some(row) => Ok(Some(row_to_entity(&row)?)),
520 None => Ok(None),
521 }
522 }
523
524 fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
525 let conn = self.conn()?;
526 let name_escaped = escape_cypher(name);
527 let query = format!(
528 "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
529 name_escaped
530 );
531 let mut result = conn.query(&query)?;
532
533 match result.next() {
534 Some(row) => Ok(Some(row_to_entity(&row)?)),
535 None => Ok(None),
536 }
537 }
538
539 fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
540 let data = serde_json::to_string(conversation).ok();
541 self.with_transaction(|conn| {
542 self.create_conversation_node(conn, conversation)?;
543 self.append_sync_log(
544 conn,
545 SyncOp::Create,
546 SyncNodeType::Conversation,
547 &conversation.id.to_string(),
548 data.as_deref(),
549 &self.machine_id,
550 None,
551 )
552 })
553 }
554
555 fn store_relation(&self, relation: &Relation) -> Result<()> {
556 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
557 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
558 let data = serde_json::to_string(relation).ok();
559 let result = self.with_transaction(|conn| {
560 self.create_relation_edge(conn, relation)?;
561 self.append_sync_log(
562 conn,
563 SyncOp::Create,
564 SyncNodeType::Relation,
565 &node_id,
566 data.as_deref(),
567 &self.machine_id,
568 None,
569 )
570 });
571 self.evict_relations_for(relation.from_id);
574 result
575 }
576
577 fn get_relations(
578 &self,
579 node_id: Uuid,
580 relation_type: Option<RelationType>,
581 ) -> Result<Vec<Relation>> {
582 let cache_key = (node_id, relation_type);
583 if let Some(hit) = self.relations_cache.read().unwrap().get(&cache_key) {
584 return Ok(hit.clone());
585 }
586
587 let conn = self.conn()?;
588 let id = node_id.to_string();
589
590 let query = match relation_type {
591 Some(RelationType::RelatesTo) => format!(
592 "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
593 id
594 ),
595 Some(RelationType::Contradicts) => format!(
596 "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
597 id
598 ),
599 Some(RelationType::Reinforces) => format!(
600 "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
601 id
602 ),
603 Some(RelationType::Supersedes) => format!(
604 "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
605 id
606 ),
607 Some(RelationType::Mentions) => format!(
608 "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
609 id
610 ),
611 Some(RelationType::DerivedFrom) => format!(
612 "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
613 id
614 ),
615 Some(RelationType::DistilledFrom) => format!(
616 "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
617 id
618 ),
619 None => {
620 let mut relations = Vec::new();
621 let mem_to_mem = [
622 ("RELATES_TO", RelationType::RelatesTo),
623 ("REINFORCES", RelationType::Reinforces),
624 ("CONTRADICTS", RelationType::Contradicts),
625 ("SUPERSEDES", RelationType::Supersedes),
626 ("DISTILLED_FROM", RelationType::DistilledFrom),
627 ];
628 for (label, rt) in &mem_to_mem {
629 let q = format!(
630 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
631 id, label
632 );
633 if let Ok(mut result) = conn.query(&q) {
634 for row in &mut result {
635 let to_id_str = value_to_string(&row[0]);
636 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
637 relations.push(Relation {
638 from_id: node_id,
639 to_id,
640 relation_type: *rt,
641 strength: 1.0,
642 context: None,
643 });
644 }
645 }
646 }
647 for (label, target, rt) in [
648 ("MENTIONS", "Entity", RelationType::Mentions),
649 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
650 ] {
651 let q = format!(
652 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
653 id, label, target
654 );
655 if let Ok(mut result) = conn.query(&q) {
656 for row in &mut result {
657 let to_id_str = value_to_string(&row[0]);
658 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
659 relations.push(Relation {
660 from_id: node_id,
661 to_id,
662 relation_type: rt,
663 strength: 1.0,
664 context: None,
665 });
666 }
667 }
668 }
669 self.relations_cache
670 .write()
671 .unwrap()
672 .insert(cache_key, relations.clone());
673 return Ok(relations);
674 }
675 };
676
677 let mut result = conn.query(&query)?;
678 let mut relations = Vec::new();
679
680 for row in &mut result {
681 let to_id_str = value_to_string(&row[0]);
682 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
683
684 relations.push(Relation {
685 from_id: node_id,
686 to_id,
687 relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
688 strength: 1.0,
689 context: None,
690 });
691 }
692
693 self.relations_cache
694 .write()
695 .unwrap()
696 .insert(cache_key, relations.clone());
697 Ok(relations)
698 }
699
700 fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
701 let conn = self.conn()?;
702 let embedding_str = format_embedding(embedding);
703
704 let query = format!(
705 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
706 RETURN {}, distance;",
707 embedding_str, limit, memory_return_cols("node")
708 );
709
710 let mut result = conn.query(&query)?;
711 let mut results = Vec::new();
712
713 for row in &mut result {
714 let memory = row_to_memory(&row[..12])?;
715 let distance = value_to_f32(&row[12]);
716 let similarity = 1.0 - distance;
717 results.push((memory, similarity));
718 }
719
720 Ok(results)
721 }
722
723 fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
724 let conn = self.conn()?;
725 let query = format!(
726 "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
727 RETURN DISTINCT {};",
728 start_id, depth, memory_return_cols("m")
729 );
730
731 let mut result = conn.query(&query)?;
732 let mut results = Vec::new();
733
734 for row in &mut result {
735 let memory = row_to_memory(&row)?;
736 results.push((memory, Vec::new()));
737 }
738
739 Ok(results)
740 }
741
742 fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
743 let conn = self.conn()?;
744 let source_escaped = escape_cypher(source);
745 let query = format!(
746 "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
747 source_escaped, memory_return_cols("m")
748 );
749 let mut result = conn.query(&query)?;
750
751 let mut memories = Vec::new();
752 for row in &mut result {
753 memories.push(row_to_memory(&row)?);
754 }
755 Ok(memories)
756 }
757
758 fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
759 let conn = self.conn()?;
760 let type_str = format!("{:?}", memory_type).to_lowercase();
761 let query = format!(
762 "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
763 type_str, memory_return_cols("m")
764 );
765 let mut result = conn.query(&query)?;
766
767 let mut memories = Vec::new();
768 for row in &mut result {
769 memories.push(row_to_memory(&row)?);
770 }
771 Ok(memories)
772 }
773
774 fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
775 let conn = self.conn()?;
776 let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
777 let cutoff_str = cutoff.to_rfc3339();
778
779 let query = format!(
780 "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
781 cutoff_str, memory_return_cols("m")
782 );
783
784 let mut result = conn.query(&query)?;
785 let mut memories = Vec::new();
786 for row in &mut result {
787 memories.push(row_to_memory(&row)?);
788 }
789 Ok(memories)
790 }
791
792 fn update_memory(&self, memory: &Memory) -> Result<()> {
793 let id = memory.id.to_string();
794 let last_accessed = memory.last_accessed.to_rfc3339();
795 let updated_at = memory.updated_at.to_rfc3339();
796 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
797 let machine_id_escaped = escape_cypher(&memory.machine_id);
798 let data = serde_json::to_string(memory).ok();
799 self.with_transaction(|conn| {
800 let query = format!(
801 "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}', m.updated_at = '{}', m.machine_id = '{}';",
802 id, memory.confidence, last_accessed, memory.access_count, project_path_escaped, updated_at, machine_id_escaped
803 );
804 conn.query(&query)?;
805 self.append_sync_log(
806 conn,
807 SyncOp::Update,
808 SyncNodeType::Memory,
809 &id,
810 data.as_deref(),
811 &self.machine_id,
812 None,
813 )
814 })
815 }
816
817 fn record_access(&self, memory: &Memory) -> Result<()> {
818 let id_esc = escape_cypher(&memory.id.to_string());
819 let last_accessed = memory.last_accessed.to_rfc3339();
820 let last_accessed_esc = escape_cypher(&last_accessed);
821 let conn = self.conn()?;
822 conn.query(&format!(
823 "MATCH (m:Memory {{id: '{id_esc}'}}) SET \
824 m.last_accessed = '{last_accessed_esc}', \
825 m.access_count = {}, \
826 m.confidence = {};",
827 memory.access_count, memory.confidence
828 ))?;
829 Ok(())
830 }
831
832 fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
833 let conn = self.conn()?;
834 let query_escaped = escape_cypher(&query.to_lowercase());
835 let cypher = format!(
836 "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
837 query_escaped, memory_return_cols("m"), limit
838 );
839
840 let mut result = conn.query(&cypher)?;
841 let mut memories = Vec::new();
842 for row in &mut result {
843 memories.push(row_to_memory(&row)?);
844 }
845 Ok(memories)
846 }
847
848 fn memory_count(&self) -> Result<usize> {
849 let conn = self.conn()?;
850 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
851 match result.next() {
852 Some(row) => match &row[0] {
853 Value::Int64(n) => Ok(*n as usize),
854 _ => Ok(0),
855 },
856 None => Ok(0),
857 }
858 }
859
860 fn all_memory_ids(&self) -> Result<Vec<Uuid>> {
861 let conn = self.conn()?;
862 let mut result = conn.query("MATCH (m:Memory) RETURN m.id;")?;
863 let mut ids = Vec::new();
864 for row in &mut result {
865 let id_str = value_to_string(&row[0]);
866 ids.push(Uuid::parse_str(&id_str).unwrap_or_default());
867 }
868 Ok(ids)
869 }
870
871 fn all_relations(&self) -> Result<Vec<Relation>> {
872 let conn = self.conn()?;
873 let mut relations = Vec::new();
874
875 let mem_to_mem = [
876 ("RELATES_TO", RelationType::RelatesTo),
877 ("REINFORCES", RelationType::Reinforces),
878 ("CONTRADICTS", RelationType::Contradicts),
879 ("SUPERSEDES", RelationType::Supersedes),
880 ("DISTILLED_FROM", RelationType::DistilledFrom),
881 ];
882 for (label, rt) in &mem_to_mem {
883 let q = format!(
884 "MATCH (a:Memory)-[:{}]->(b:Memory) RETURN a.id, b.id;",
885 label
886 );
887 let mut result = conn.query(&q)?;
888 for row in &mut result {
889 let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
890 let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
891 relations.push(Relation {
892 from_id,
893 to_id,
894 relation_type: *rt,
895 strength: 1.0,
896 context: None,
897 });
898 }
899 }
900
901 for (label, target, rt) in [
902 ("MENTIONS", "Entity", RelationType::Mentions),
903 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
904 ] {
905 let q = format!(
906 "MATCH (a:Memory)-[:{}]->(b:{}) RETURN a.id, b.id;",
907 label, target
908 );
909 let mut result = conn.query(&q)?;
910 for row in &mut result {
911 let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
912 let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
913 relations.push(Relation {
914 from_id,
915 to_id,
916 relation_type: rt,
917 strength: 1.0,
918 context: None,
919 });
920 }
921 }
922
923 Ok(relations)
924 }
925}
926
927impl KuzuStore {
928 pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
929 if let Some(existing) = self.find_entity_by_name(name)? {
930 return Ok(existing);
931 }
932 let entity = Entity::new(name.to_string(), entity_type.to_string());
933 self.store_entity(&entity)?;
934 Ok(entity)
935 }
936
937 pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
938 let conn = self.conn()?;
939 let escaped = escape_cypher(content_prefix);
940 let query = format!(
941 "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
942 escaped
943 );
944 let mut result = conn.query(&query)?;
945 Ok(result.next().is_some())
946 }
947
948 pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
949 let conn = self.conn()?;
950 let query = format!(
951 "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
952 memory_return_cols("m"), limit
953 );
954 let mut result = conn.query(&query)?;
955 let mut memories = Vec::new();
956 for row in &mut result {
957 memories.push(row_to_memory(&row)?);
958 }
959 Ok(memories)
960 }
961
962 pub fn memories_created_between(
963 &self,
964 start: &DateTime<Utc>,
965 end: &DateTime<Utc>,
966 ) -> Result<Vec<Memory>> {
967 let conn = self.conn()?;
968 let query = format!(
969 "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
970 start.to_rfc3339(),
971 end.to_rfc3339(),
972 memory_return_cols("m")
973 );
974 let mut result = conn.query(&query)?;
975 let mut memories = Vec::new();
976 for row in &mut result {
977 memories.push(row_to_memory(&row)?);
978 }
979 Ok(memories)
980 }
981
982 pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
983 let conn = self.conn()?;
984 let now = Utc::now().to_rfc3339();
985 let query = format!(
986 "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
987 raw_id,
988 distilled_id,
989 now,
990 escape_cypher(model)
991 );
992 conn.query(&query)?;
993 Ok(())
994 }
995
996 pub fn consolidation_count(&self) -> Result<usize> {
997 let conn = self.conn()?;
998 let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
999 match result.next() {
1000 Some(row) => match &row[0] {
1001 Value::Int64(n) => Ok(*n as usize),
1002 _ => Ok(0),
1003 },
1004 None => Ok(0),
1005 }
1006 }
1007
1008 pub fn delete_consolidated_raw(&self) -> Result<usize> {
1009 let ids: Vec<Uuid> = {
1010 let conn = self.conn()?;
1011 let result = conn.query(
1012 "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
1013 )?;
1014 let mut acc = Vec::new();
1015 for row in result {
1016 if let Value::String(id) = &row[0]
1017 && let Ok(uuid) = Uuid::parse_str(id)
1018 {
1019 acc.push(uuid);
1020 }
1021 }
1022 acc
1023 };
1024 let count = ids.len();
1025 let machine_id = self.machine_id.clone();
1026 for id in &ids {
1027 let id_str = id.to_string();
1028 let mid = machine_id.clone();
1029 self.with_transaction(|conn| {
1030 conn.query(&format!(
1031 "MATCH (m:Memory {{id: '{id_str}'}}) DETACH DELETE m;"
1032 ))?;
1033 self.upsert_tombstone(conn, &id_str, "memory", &mid)?;
1034 self.append_sync_log(
1035 conn,
1036 SyncOp::Delete,
1037 SyncNodeType::Memory,
1038 &id_str,
1039 None,
1040 &mid,
1041 None,
1042 )
1043 })?;
1044 }
1045 if count > 0 {
1046 self.clear_relations_cache();
1047 }
1048 Ok(count)
1049 }
1050
1051 pub fn rebuild_vector_index(&self) -> Result<()> {
1052 let conn = self.conn()?;
1053 conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
1054 .ok();
1055 conn.query(
1056 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
1057 )?;
1058 Ok(())
1059 }
1060
1061 pub fn clear_ingest_log(&self) -> Result<usize> {
1062 let conn = self.conn()?;
1063 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1064 let count = match result.next() {
1065 Some(row) => match &row[0] {
1066 Value::Int64(n) => *n as usize,
1067 _ => 0,
1068 },
1069 None => 0,
1070 };
1071 conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
1072 Ok(count)
1073 }
1074
1075 pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
1076 let conn = self.conn()?;
1077 let escaped = escape_cypher(source);
1078 let mut result = conn.query(&format!(
1079 "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
1080 escaped
1081 ))?;
1082 let count = match result.next() {
1083 Some(row) => match &row[0] {
1084 Value::Int64(n) => *n as usize,
1085 _ => 0,
1086 },
1087 None => 0,
1088 };
1089 conn.query(&format!(
1090 "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
1091 escaped
1092 ))?;
1093 if count > 0 {
1094 self.clear_relations_cache();
1095 }
1096 Ok(count)
1097 }
1098
1099 pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
1100 let conn = self.conn()?;
1101 let escaped = escape_cypher(file_path);
1102 let mut result = conn.query(&format!(
1103 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
1104 escaped
1105 ))?;
1106 Ok(result.next().is_some())
1107 }
1108
1109 pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
1110 let conn = self.conn()?;
1111 let escaped = escape_cypher(file_path);
1112 let mut result = conn.query(&format!(
1113 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
1114 escaped
1115 ))?;
1116 match result.next() {
1117 Some(row) => {
1118 let stored_hash = value_to_string(&row[0]);
1119 Ok(stored_hash != file_hash)
1120 }
1121 None => Ok(true),
1122 }
1123 }
1124
1125 pub fn mark_ingested(
1126 &self,
1127 file_path: &str,
1128 file_hash: &str,
1129 memory_count: usize,
1130 source: &str,
1131 ) -> Result<()> {
1132 let conn = self.conn()?;
1133 let path_escaped = escape_cypher(file_path);
1134 let hash_escaped = escape_cypher(file_hash);
1135 let source_escaped = escape_cypher(source);
1136 let now = chrono::Utc::now().to_rfc3339();
1137
1138 conn.query(&format!(
1139 "MERGE (l:IngestLog {{file_path: '{}'}})
1140 SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
1141 path_escaped, hash_escaped, now, memory_count as i64, source_escaped
1142 ))?;
1143 Ok(())
1144 }
1145
1146 pub fn ingested_file_count(&self) -> Result<usize> {
1147 let conn = self.conn()?;
1148 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1149 match result.next() {
1150 Some(row) => match &row[0] {
1151 Value::Int64(n) => Ok(*n as usize),
1152 _ => Ok(0),
1153 },
1154 None => Ok(0),
1155 }
1156 }
1157}
1158
1159impl KuzuStore {
1160 fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
1161 let id = memory.id.to_string();
1162 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1163 let created_at = memory.created_at.to_rfc3339();
1164 let last_accessed = memory.last_accessed.to_rfc3339();
1165 let embedding_str = format_embedding(&memory.embedding);
1166 let content_escaped = escape_cypher(&memory.content);
1167 let source_escaped = escape_cypher(&memory.source);
1168 let source_id_escaped = escape_cypher(&memory.source_id);
1169 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1170 let machine_id_escaped = escape_cypher(&memory.machine_id);
1171 let updated_at = memory.updated_at.to_rfc3339();
1172
1173 let query = format!(
1174 "CREATE (:Memory {{
1175 id: '{id}',
1176 content: '{content_escaped}',
1177 embedding: {embedding_str},
1178 memory_type: '{memory_type}',
1179 confidence: {confidence},
1180 created_at: '{created_at}',
1181 last_accessed: '{last_accessed}',
1182 access_count: {access_count},
1183 source: '{source_escaped}',
1184 source_id: '{source_id_escaped}',
1185 project_path: '{project_path_escaped}',
1186 machine_id: '{machine_id_escaped}',
1187 updated_at: '{updated_at}'
1188 }});",
1189 confidence = memory.confidence,
1190 access_count = memory.access_count,
1191 );
1192
1193 conn.query(&query)?;
1194 Ok(())
1195 }
1196
1197 fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
1198 let id = entity.id.to_string();
1199 let embedding_str = format_embedding(&entity.embedding);
1200 let aliases_str = format_string_array(&entity.aliases);
1201 let name_escaped = escape_cypher(&entity.name);
1202 let etype_escaped = escape_cypher(&entity.entity_type);
1203
1204 let query = format!(
1205 "CREATE (:Entity {{
1206 id: '{id}',
1207 name: '{name_escaped}',
1208 entity_type: '{etype_escaped}',
1209 embedding: {embedding_str},
1210 aliases: {aliases_str}
1211 }});"
1212 );
1213
1214 conn.query(&query)?;
1215 Ok(())
1216 }
1217
1218 fn create_conversation_node(
1219 &self,
1220 conn: &Connection<'_>,
1221 conversation: &Conversation,
1222 ) -> Result<()> {
1223 let id = conversation.id.to_string();
1224 let started_at = conversation.started_at.to_rfc3339();
1225 let source_escaped = escape_cypher(&conversation.source);
1226 let machine_escaped = escape_cypher(&conversation.machine_id);
1227 let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
1228
1229 let query = format!(
1230 "CREATE (:Conversation {{
1231 id: '{id}',
1232 source: '{source_escaped}',
1233 machine_id: '{machine_escaped}',
1234 started_at: '{started_at}',
1235 project_path: '{project_escaped}'
1236 }});"
1237 );
1238
1239 conn.query(&query)?;
1240 Ok(())
1241 }
1242
1243 fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
1244 let from_id = relation.from_id.to_string();
1245 let to_id = relation.to_id.to_string();
1246
1247 let query = match relation.relation_type {
1248 RelationType::RelatesTo => format!(
1249 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1250 from_id,
1251 to_id,
1252 relation.strength,
1253 relation.context.as_deref().unwrap_or("")
1254 ),
1255 RelationType::Mentions => format!(
1256 "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1257 from_id, to_id
1258 ),
1259 RelationType::DerivedFrom => format!(
1260 "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1261 from_id,
1262 to_id,
1263 relation.context.as_deref().unwrap_or("direct")
1264 ),
1265 RelationType::Contradicts => format!(
1266 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1267 from_id,
1268 to_id,
1269 relation.context.as_deref().unwrap_or("")
1270 ),
1271 RelationType::Reinforces => format!(
1272 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1273 from_id, to_id
1274 ),
1275 RelationType::DistilledFrom => format!(
1276 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1277 from_id,
1278 to_id,
1279 relation.context.as_deref().unwrap_or("")
1280 ),
1281 RelationType::Supersedes => format!(
1282 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1283 from_id,
1284 to_id,
1285 relation.context.as_deref().unwrap_or("")
1286 ),
1287 };
1288
1289 conn.query(&query)?;
1290 Ok(())
1291 }
1292}
1293
1294impl KuzuStore {
1295 pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1296 let conn = self.conn()?;
1297 let mut result = conn.query(&format!(
1298 "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1299 id
1300 ))?;
1301
1302 match result.next() {
1303 Some(row) => {
1304 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1305 let source = value_to_string(&row[1]);
1306 let machine_id = value_to_string(&row[2]);
1307 let started_at_str = value_to_string(&row[3]);
1308 let project_path = {
1309 let s = value_to_string(&row[4]);
1310 if s.is_empty() { None } else { Some(s) }
1311 };
1312 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1313 .map(|dt| dt.with_timezone(&chrono::Utc))
1314 .unwrap_or_else(|_| chrono::Utc::now());
1315
1316 Ok(Some(Conversation {
1317 id,
1318 source,
1319 machine_id,
1320 started_at,
1321 project_path,
1322 }))
1323 }
1324 None => Ok(None),
1325 }
1326 }
1327
1328 pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1329 let conn = self.conn()?;
1330 let query = format!(
1331 "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1332 id, memory_return_cols("m")
1333 );
1334
1335 let mut result = conn.query(&query)?;
1336 match result.next() {
1337 Some(row) => {
1338 let mut memory = row_to_memory(&row[..12])?;
1339 memory.embedding = value_to_f32_vec(&row[12]);
1340 Ok(Some(memory))
1341 }
1342 None => Ok(None),
1343 }
1344 }
1345
1346 pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1347 self.sync_log_page(after_seq, None)
1348 }
1349
1350 fn tombstone_exists_on(&self, conn: &Connection<'_>, id: Uuid) -> Result<bool> {
1351 let id_esc = escape_cypher(&id.to_string());
1352 let mut result = conn.query(&format!(
1353 "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
1354 ))?;
1355 Ok(result.next().is_some())
1356 }
1357
1358 fn normalize_incoming(mem: &mut Memory) {
1359 if mem.updated_at == DateTime::UNIX_EPOCH {
1360 mem.updated_at = mem.created_at;
1361 }
1362 }
1363
1364 pub fn apply_create_memory(
1365 &self,
1366 mem: &Memory,
1367 origin_machine_id: &str,
1368 origin_seq: i64,
1369 ) -> Result<ApplyOutcome> {
1370 let mut mem = mem.clone();
1371 Self::normalize_incoming(&mut mem);
1372 let data = serde_json::to_string(&mem).ok();
1373 let node_id = mem.id.to_string();
1374 self.with_transaction(|conn| {
1375 if self.tombstone_exists_on(conn, mem.id)? {
1376 return Ok(ApplyOutcome::Skipped);
1377 }
1378 if self.get_memory(mem.id)?.is_some() {
1379 return Ok(ApplyOutcome::Skipped);
1380 }
1381 self.create_memory_node(conn, &mem)?;
1382 self.append_sync_log(
1383 conn,
1384 SyncOp::Create,
1385 SyncNodeType::Memory,
1386 &node_id,
1387 data.as_deref(),
1388 origin_machine_id,
1389 Some(origin_seq),
1390 )?;
1391 Ok(ApplyOutcome::Created)
1392 })
1393 }
1394
1395 pub fn with_write_transaction<T>(
1402 &self,
1403 f: impl FnOnce(&Connection<'_>) -> Result<T>,
1404 ) -> Result<T> {
1405 self.with_transaction(f)
1406 }
1407
1408 pub fn apply_create_memory_on_tx(
1409 &self,
1410 conn: &Connection<'_>,
1411 mem: &Memory,
1412 origin_machine_id: &str,
1413 origin_seq: i64,
1414 ) -> Result<ApplyOutcome> {
1415 let mut mem = mem.clone();
1416 Self::normalize_incoming(&mut mem);
1417 let data = serde_json::to_string(&mem).ok();
1418 let node_id = mem.id.to_string();
1419 if self.tombstone_exists_on(conn, mem.id)? {
1420 return Ok(ApplyOutcome::Skipped);
1421 }
1422 if self.get_memory(mem.id)?.is_some() {
1423 return Ok(ApplyOutcome::Skipped);
1424 }
1425 self.create_memory_node(conn, &mem)?;
1426 self.append_sync_log(
1427 conn,
1428 SyncOp::Create,
1429 SyncNodeType::Memory,
1430 &node_id,
1431 data.as_deref(),
1432 origin_machine_id,
1433 Some(origin_seq),
1434 )?;
1435 Ok(ApplyOutcome::Created)
1436 }
1437
1438 pub fn apply_create_conversation_on_tx(
1439 &self,
1440 conn: &Connection<'_>,
1441 conv: &Conversation,
1442 origin_machine_id: &str,
1443 origin_seq: i64,
1444 ) -> Result<ApplyOutcome> {
1445 let data = serde_json::to_string(conv).ok();
1446 let node_id = conv.id.to_string();
1447 if self.get_conversation(conv.id)?.is_some() {
1448 return Ok(ApplyOutcome::Skipped);
1449 }
1450 self.create_conversation_node(conn, conv)?;
1451 self.append_sync_log(
1452 conn,
1453 SyncOp::Create,
1454 SyncNodeType::Conversation,
1455 &node_id,
1456 data.as_deref(),
1457 origin_machine_id,
1458 Some(origin_seq),
1459 )?;
1460 Ok(ApplyOutcome::Created)
1461 }
1462
1463 pub fn apply_create_entity_on_tx(
1464 &self,
1465 conn: &Connection<'_>,
1466 entity: &Entity,
1467 origin_machine_id: &str,
1468 origin_seq: i64,
1469 ) -> Result<ApplyOutcome> {
1470 let data = serde_json::to_string(entity).ok();
1471 let node_id = entity.id.to_string();
1472 if self.get_entity(entity.id)?.is_some() {
1473 return Ok(ApplyOutcome::Skipped);
1474 }
1475 self.create_entity_node(conn, entity)?;
1476 self.append_sync_log(
1477 conn,
1478 SyncOp::Create,
1479 SyncNodeType::Entity,
1480 &node_id,
1481 data.as_deref(),
1482 origin_machine_id,
1483 Some(origin_seq),
1484 )?;
1485 Ok(ApplyOutcome::Created)
1486 }
1487
1488 pub fn apply_create_relation_on_tx(
1489 &self,
1490 conn: &Connection<'_>,
1491 relation: &Relation,
1492 origin_machine_id: &str,
1493 origin_seq: i64,
1494 ) -> Result<ApplyOutcome> {
1495 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1496 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1497 let data = serde_json::to_string(relation).ok();
1498 match self.create_relation_edge(conn, relation) {
1499 Ok(()) => {
1500 self.append_sync_log(
1501 conn,
1502 SyncOp::Create,
1503 SyncNodeType::Relation,
1504 &node_id,
1505 data.as_deref(),
1506 origin_machine_id,
1507 Some(origin_seq),
1508 )?;
1509 Ok(ApplyOutcome::Created)
1510 }
1511 Err(_) => Ok(ApplyOutcome::Skipped),
1514 }
1515 }
1516
1517 pub fn note_relation_created(&self, from_id: Uuid) {
1518 self.evict_relations_for(from_id);
1519 }
1520
1521 pub fn apply_update_memory(
1522 &self,
1523 incoming: &Memory,
1524 origin_machine_id: &str,
1525 origin_seq: i64,
1526 ) -> Result<(ApplyOutcome, Option<Memory>)> {
1527 let mut incoming = incoming.clone();
1528 Self::normalize_incoming(&mut incoming);
1529 let data = serde_json::to_string(&incoming).ok();
1530 let node_id = incoming.id.to_string();
1531 self.with_transaction(|conn| {
1532 match self.get_memory(incoming.id)? {
1533 None => {
1534 if self.tombstone_exists_on(conn, incoming.id)? {
1535 return Ok((ApplyOutcome::Skipped, None));
1536 }
1537 self.create_memory_node(conn, &incoming)?;
1538 self.append_sync_log(
1539 conn,
1540 SyncOp::Create,
1541 SyncNodeType::Memory,
1542 &node_id,
1543 data.as_deref(),
1544 origin_machine_id,
1545 Some(origin_seq),
1546 )?;
1547 Ok((ApplyOutcome::Created, Some(incoming)))
1548 }
1549 Some(existing) => {
1550 let incoming_wins = if incoming.updated_at != existing.updated_at {
1551 incoming.updated_at > existing.updated_at
1552 } else if incoming.access_count != existing.access_count {
1553 incoming.access_count > existing.access_count
1554 } else {
1555 incoming.machine_id > existing.machine_id
1556 };
1557
1558 if !incoming_wins {
1559 return Ok((ApplyOutcome::Skipped, None));
1560 }
1561
1562 let id = incoming.id.to_string();
1563 let content_escaped = escape_cypher(&incoming.content);
1564 let memory_type = format!("{:?}", incoming.memory_type).to_lowercase();
1565 let created_at = incoming.created_at.to_rfc3339();
1566 let last_accessed = incoming.last_accessed.to_rfc3339();
1567 let updated_at = incoming.updated_at.to_rfc3339();
1568 let source_escaped = escape_cypher(&incoming.source);
1569 let source_id_escaped = escape_cypher(&incoming.source_id);
1570 let project_path_escaped =
1571 escape_cypher(incoming.project_path.as_deref().unwrap_or(""));
1572 let machine_id_escaped = escape_cypher(&incoming.machine_id);
1573 conn.query(&format!(
1576 "MATCH (m:Memory {{id: '{id}'}}) SET \
1577 m.content = '{content_escaped}', \
1578 m.memory_type = '{memory_type}', \
1579 m.confidence = {confidence}, \
1580 m.created_at = '{created_at}', \
1581 m.last_accessed = '{last_accessed}', \
1582 m.access_count = {access_count}, \
1583 m.source = '{source_escaped}', \
1584 m.source_id = '{source_id_escaped}', \
1585 m.project_path = '{project_path_escaped}', \
1586 m.machine_id = '{machine_id_escaped}', \
1587 m.updated_at = '{updated_at}';",
1588 confidence = incoming.confidence,
1589 access_count = incoming.access_count,
1590 ))?;
1591 self.append_sync_log(
1592 conn,
1593 SyncOp::Update,
1594 SyncNodeType::Memory,
1595 &node_id,
1596 data.as_deref(),
1597 origin_machine_id,
1598 Some(origin_seq),
1599 )?;
1600 Ok((ApplyOutcome::Updated, Some(incoming)))
1601 }
1602 }
1603 })
1604 }
1605
1606 pub fn apply_delete_memory(
1607 &self,
1608 node_id: &str,
1609 origin_machine_id: &str,
1610 origin_seq: i64,
1611 ) -> Result<ApplyOutcome> {
1612 let result = self.with_transaction(|conn| {
1613 conn.query(&format!(
1614 "MATCH (m:Memory {{id: '{node_id}'}}) DETACH DELETE m;"
1615 ))?;
1616 self.upsert_tombstone(conn, node_id, "memory", origin_machine_id)?;
1617 self.append_sync_log(
1618 conn,
1619 SyncOp::Delete,
1620 SyncNodeType::Memory,
1621 node_id,
1622 None,
1623 origin_machine_id,
1624 Some(origin_seq),
1625 )?;
1626 Ok(ApplyOutcome::Deleted)
1627 });
1628 self.clear_relations_cache();
1631 result
1632 }
1633
1634 pub fn apply_create_conversation(
1635 &self,
1636 conv: &Conversation,
1637 origin_machine_id: &str,
1638 origin_seq: i64,
1639 ) -> Result<ApplyOutcome> {
1640 let data = serde_json::to_string(conv).ok();
1641 let node_id = conv.id.to_string();
1642 self.with_transaction(|conn| {
1643 if self.get_conversation(conv.id)?.is_some() {
1644 return Ok(ApplyOutcome::Skipped);
1645 }
1646 self.create_conversation_node(conn, conv)?;
1647 self.append_sync_log(
1648 conn,
1649 SyncOp::Create,
1650 SyncNodeType::Conversation,
1651 &node_id,
1652 data.as_deref(),
1653 origin_machine_id,
1654 Some(origin_seq),
1655 )?;
1656 Ok(ApplyOutcome::Created)
1657 })
1658 }
1659
1660 pub fn apply_upsert_conversation(
1661 &self,
1662 conv: &Conversation,
1663 origin_machine_id: &str,
1664 origin_seq: i64,
1665 ) -> Result<ApplyOutcome> {
1666 let data = serde_json::to_string(conv).ok();
1667 let node_id = conv.id.to_string();
1668 let source_escaped = escape_cypher(&conv.source);
1669 let machine_escaped = escape_cypher(&conv.machine_id);
1670 let project_escaped = escape_cypher(conv.project_path.as_deref().unwrap_or(""));
1671 let started_at = conv.started_at.to_rfc3339();
1672 self.with_transaction(|conn| {
1673 conn.query(&format!(
1674 "MERGE (c:Conversation {{id: '{node_id}'}}) SET \
1675 c.source = '{source_escaped}', \
1676 c.machine_id = '{machine_escaped}', \
1677 c.started_at = '{started_at}', \
1678 c.project_path = '{project_escaped}';"
1679 ))?;
1680 self.append_sync_log(
1681 conn,
1682 SyncOp::Update,
1683 SyncNodeType::Conversation,
1684 &node_id,
1685 data.as_deref(),
1686 origin_machine_id,
1687 Some(origin_seq),
1688 )?;
1689 Ok(ApplyOutcome::Updated)
1690 })
1691 }
1692
1693 pub fn apply_delete_conversation(
1694 &self,
1695 node_id: &str,
1696 origin_machine_id: &str,
1697 origin_seq: i64,
1698 ) -> Result<ApplyOutcome> {
1699 let result = self.with_transaction(|conn| {
1700 conn.query(&format!(
1701 "MATCH (c:Conversation {{id: '{node_id}'}}) DETACH DELETE c;"
1702 ))?;
1703 self.append_sync_log(
1704 conn,
1705 SyncOp::Delete,
1706 SyncNodeType::Conversation,
1707 node_id,
1708 None,
1709 origin_machine_id,
1710 Some(origin_seq),
1711 )?;
1712 Ok(ApplyOutcome::Deleted)
1713 });
1714 self.clear_relations_cache();
1718 result
1719 }
1720
1721 pub fn apply_create_entity(
1722 &self,
1723 entity: &Entity,
1724 origin_machine_id: &str,
1725 origin_seq: i64,
1726 ) -> Result<ApplyOutcome> {
1727 let data = serde_json::to_string(entity).ok();
1728 let node_id = entity.id.to_string();
1729 self.with_transaction(|conn| {
1730 if self.get_entity(entity.id)?.is_some() {
1731 return Ok(ApplyOutcome::Skipped);
1732 }
1733 self.create_entity_node(conn, entity)?;
1734 self.append_sync_log(
1735 conn,
1736 SyncOp::Create,
1737 SyncNodeType::Entity,
1738 &node_id,
1739 data.as_deref(),
1740 origin_machine_id,
1741 Some(origin_seq),
1742 )?;
1743 Ok(ApplyOutcome::Created)
1744 })
1745 }
1746
1747 pub fn apply_upsert_entity(
1748 &self,
1749 entity: &Entity,
1750 origin_machine_id: &str,
1751 origin_seq: i64,
1752 ) -> Result<ApplyOutcome> {
1753 let data = serde_json::to_string(entity).ok();
1754 let node_id = entity.id.to_string();
1755 let name_escaped = escape_cypher(&entity.name);
1756 let etype_escaped = escape_cypher(&entity.entity_type);
1757 let embedding_str = format_embedding(&entity.embedding);
1758 let aliases_str = format_string_array(&entity.aliases);
1759 self.with_transaction(|conn| {
1760 conn.query(&format!(
1761 "MERGE (e:Entity {{id: '{node_id}'}}) SET \
1762 e.name = '{name_escaped}', \
1763 e.entity_type = '{etype_escaped}', \
1764 e.embedding = {embedding_str}, \
1765 e.aliases = {aliases_str};"
1766 ))?;
1767 self.append_sync_log(
1768 conn,
1769 SyncOp::Update,
1770 SyncNodeType::Entity,
1771 &node_id,
1772 data.as_deref(),
1773 origin_machine_id,
1774 Some(origin_seq),
1775 )?;
1776 Ok(ApplyOutcome::Updated)
1777 })
1778 }
1779
1780 pub fn apply_delete_entity(
1781 &self,
1782 node_id: &str,
1783 origin_machine_id: &str,
1784 origin_seq: i64,
1785 ) -> Result<ApplyOutcome> {
1786 let result = self.with_transaction(|conn| {
1787 conn.query(&format!(
1788 "MATCH (e:Entity {{id: '{node_id}'}}) DETACH DELETE e;"
1789 ))?;
1790 self.append_sync_log(
1791 conn,
1792 SyncOp::Delete,
1793 SyncNodeType::Entity,
1794 node_id,
1795 None,
1796 origin_machine_id,
1797 Some(origin_seq),
1798 )?;
1799 Ok(ApplyOutcome::Deleted)
1800 });
1801 self.clear_relations_cache();
1805 result
1806 }
1807
1808 pub fn apply_create_relation(
1809 &self,
1810 relation: &Relation,
1811 origin_machine_id: &str,
1812 origin_seq: i64,
1813 ) -> Result<ApplyOutcome> {
1814 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1815 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1816 let data = serde_json::to_string(relation).ok();
1817 let result = self.with_transaction(|conn| {
1818 match self.create_relation_edge(conn, relation) {
1819 Ok(()) => {
1820 self.append_sync_log(
1821 conn,
1822 SyncOp::Create,
1823 SyncNodeType::Relation,
1824 &node_id,
1825 data.as_deref(),
1826 origin_machine_id,
1827 Some(origin_seq),
1828 )?;
1829 Ok(ApplyOutcome::Created)
1830 }
1831 Err(_) => Ok(ApplyOutcome::Skipped),
1832 }
1833 });
1834 self.evict_relations_for(relation.from_id);
1835 result
1836 }
1837
1838 pub fn apply_delete_relation(
1839 &self,
1840 node_id: &str,
1841 relation: Option<&Relation>,
1842 origin_machine_id: &str,
1843 origin_seq: i64,
1844 ) -> Result<ApplyOutcome> {
1845 let Some(rel) = relation else {
1846 return Ok(ApplyOutcome::Skipped);
1849 };
1850 let evict_id = rel.from_id;
1851 let result = self.with_transaction(|conn| {
1852 let from_id = rel.from_id.to_string();
1853 let to_id = rel.to_id.to_string();
1854 let rel_label = match rel.relation_type {
1855 crate::schema::RelationType::RelatesTo => "RELATES_TO",
1856 crate::schema::RelationType::Mentions => "MENTIONS",
1857 crate::schema::RelationType::DerivedFrom => "DERIVED_FROM",
1858 crate::schema::RelationType::Contradicts => "CONTRADICTS",
1859 crate::schema::RelationType::Reinforces => "REINFORCES",
1860 crate::schema::RelationType::Supersedes => "SUPERSEDES",
1861 crate::schema::RelationType::DistilledFrom => "DISTILLED_FROM",
1862 };
1863 conn.query(&format!(
1864 "MATCH (a {{id: '{from_id}'}})-[r:{rel_label}]->(b {{id: '{to_id}'}}) DELETE r;"
1865 ))
1866 .ok();
1867 self.append_sync_log(
1868 conn,
1869 SyncOp::Delete,
1870 SyncNodeType::Relation,
1871 node_id,
1872 None,
1873 origin_machine_id,
1874 Some(origin_seq),
1875 )?;
1876 Ok(ApplyOutcome::Deleted)
1877 });
1878 self.evict_relations_for(evict_id);
1879 result
1880 }
1881
1882 pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1883 let conn = self.conn()?;
1884 let limit_clause = match limit {
1885 Some(n) => format!(" LIMIT {}", n),
1886 None => String::new(),
1887 };
1888 let query = format!(
1889 "MATCH (s:SyncLog) WHERE s.local_seq > {} \
1890 RETURN s.id, s.local_seq, s.origin_machine_id, s.origin_seq, s.op, s.node_type, s.node_id, s.timestamp, s.data \
1891 ORDER BY s.local_seq{};",
1892 after_seq, limit_clause
1893 );
1894
1895 let mut result = conn.query(&query)?;
1896 let mut entries = Vec::new();
1897
1898 for row in &mut result {
1899 let id = value_to_string(&row[0]);
1900 let local_seq = match &row[1] {
1901 Value::Int64(n) => *n,
1902 _ => 0,
1903 };
1904 let origin_machine_id = value_to_string(&row[2]);
1905 let origin_seq = match &row[3] {
1906 Value::Int64(n) => *n,
1907 _ => 0,
1908 };
1909 let op_str = value_to_string(&row[4]);
1910 let node_type_str = value_to_string(&row[5]);
1911 let node_id = value_to_string(&row[6]);
1912 let timestamp_str = value_to_string(&row[7]);
1913 let data_str = value_to_string(&row[8]);
1914
1915 let op = match op_str.as_str() {
1916 "create" => SyncOp::Create,
1917 "update" => SyncOp::Update,
1918 "delete" => SyncOp::Delete,
1919 _ => continue,
1920 };
1921 let node_type = match node_type_str.as_str() {
1922 "memory" => SyncNodeType::Memory,
1923 "entity" => SyncNodeType::Entity,
1924 "conversation" => SyncNodeType::Conversation,
1925 "relation" => SyncNodeType::Relation,
1926 _ => continue,
1927 };
1928 let timestamp = chrono::DateTime::parse_from_rfc3339(×tamp_str)
1929 .map(|dt| dt.with_timezone(&chrono::Utc))
1930 .unwrap_or_else(|_| chrono::Utc::now());
1931
1932 let data = if data_str.is_empty() {
1933 None
1934 } else {
1935 Some(data_str)
1936 };
1937
1938 entries.push(SyncEntry {
1939 id,
1940 local_seq,
1941 origin_machine_id,
1942 origin_seq,
1943 op,
1944 node_type,
1945 node_id,
1946 timestamp,
1947 data,
1948 });
1949 }
1950
1951 Ok(entries)
1952 }
1953
1954 pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1955 let conn = self.conn()?;
1956 let escaped = escape_cypher(peer_id);
1957 let mut result = conn.query(&format!(
1958 "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1959 escaped
1960 ))?;
1961
1962 match result.next() {
1963 Some(row) => {
1964 let peer_id = value_to_string(&row[0]);
1965 let last_seq = match &row[1] {
1966 Value::Int64(n) => *n as u64,
1967 _ => 0,
1968 };
1969 let last_sync_at_str = value_to_string(&row[2]);
1970 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1971 .map(|dt| dt.with_timezone(&chrono::Utc))
1972 .unwrap_or_else(|_| chrono::Utc::now());
1973
1974 Ok(Some(SyncState {
1975 peer_id,
1976 last_seq,
1977 last_sync_at,
1978 }))
1979 }
1980 None => Ok(None),
1981 }
1982 }
1983
1984 pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1985 let conn = self.conn()?;
1986 let peer_escaped = escape_cypher(&state.peer_id);
1987 let now = state.last_sync_at.to_rfc3339();
1988
1989 conn.query(&format!(
1990 "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1991 peer_escaped, state.last_seq, now
1992 ))?;
1993 Ok(())
1994 }
1995
1996 pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1997 let conn = self.conn()?;
1998 let mut result =
1999 conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
2000 let mut states = Vec::new();
2001
2002 for row in &mut result {
2003 let peer_id = value_to_string(&row[0]);
2004 let last_seq = match &row[1] {
2005 Value::Int64(n) => *n as u64,
2006 _ => 0,
2007 };
2008 let last_sync_at_str = value_to_string(&row[2]);
2009 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
2010 .map(|dt| dt.with_timezone(&chrono::Utc))
2011 .unwrap_or_else(|_| chrono::Utc::now());
2012
2013 states.push(SyncState {
2014 peer_id,
2015 last_seq,
2016 last_sync_at,
2017 });
2018 }
2019
2020 Ok(states)
2021 }
2022
2023 pub fn all_entities(&self) -> Result<Vec<Entity>> {
2024 let conn = self.conn()?;
2025 let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
2026 let mut entities = Vec::new();
2027 for row in &mut result {
2028 entities.push(row_to_entity(&row)?);
2029 }
2030 Ok(entities)
2031 }
2032
2033 pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
2034 let conn = self.conn()?;
2035 let query = format!(
2036 "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
2037 entity_id, memory_return_cols("m")
2038 );
2039 let mut result = conn.query(&query)?;
2040 let mut memories = Vec::new();
2041 for row in &mut result {
2042 memories.push(row_to_memory(&row)?);
2043 }
2044 Ok(memories)
2045 }
2046
2047 pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
2048 let conn = self.conn()?;
2049 let mut result = conn.query(
2050 &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
2051 )?;
2052 let mut memories = Vec::new();
2053 for row in &mut result {
2054 memories.push(row_to_memory(&row)?);
2055 }
2056 Ok(memories)
2057 }
2058
2059 pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
2060 let conn = self.conn()?;
2061 let query = format!(
2062 "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
2063 entity_id, entity_id
2064 );
2065 let mut result = conn.query(&query)?;
2066 let mut names = Vec::new();
2067 for row in &mut result {
2068 names.push(value_to_string(&row[0]));
2069 }
2070 Ok(names)
2071 }
2072
2073 pub fn max_sync_seq(&self) -> Result<u64> {
2074 let conn = self.conn()?;
2075 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
2076 match result.next() {
2077 Some(row) => match &row[0] {
2078 Value::Int64(n) => Ok(*n as u64),
2079 _ => Ok(0),
2080 },
2081 None => Ok(0),
2082 }
2083 }
2084
2085 pub fn backfill_project_paths(&self) -> Result<u64> {
2086 let conn = self.conn()?;
2087 let mut result = conn.query(
2088 "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
2089 )?;
2090 match result.next() {
2091 Some(row) => match &row[0] {
2092 Value::Int64(n) => Ok(*n as u64),
2093 _ => Ok(0),
2094 },
2095 None => Ok(0),
2096 }
2097 }
2098
2099 pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
2100 let conn = self.conn()?;
2101 let escaped = escape_cypher(project_path);
2102 let cols = memory_return_cols("m");
2103 let query = format!(
2104 "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
2105 escaped, cols
2106 );
2107 let mut result = conn.query(&query)?;
2108 let mut memories = Vec::new();
2109 for row in &mut result {
2110 memories.push(row_to_memory(&row)?);
2111 }
2112 Ok(memories)
2113 }
2114
2115 pub fn all_memories_with_embeddings(&self) -> Result<Vec<Memory>> {
2116 let conn = self.conn()?;
2117 let mut result = conn.query(&format!(
2118 "MATCH (m:Memory) RETURN {}, m.embedding;",
2119 memory_return_cols("m")
2120 ))?;
2121 let mut acc = Vec::new();
2122 for row in &mut result {
2123 let mut memory = row_to_memory(&row[..12])?;
2126 memory.embedding = value_to_f32_vec(&row[12]);
2127 acc.push(memory);
2128 }
2129 Ok(acc)
2130 }
2131
2132 pub fn backfill_sync_log(&self) -> Result<u64> {
2133 let mut count = 0u64;
2134
2135 let memories = self.all_memories_with_embeddings()?;
2136
2137 for memory in &memories {
2138 let origin_mid = memory.machine_id.clone();
2139 let data = serde_json::to_string(memory).ok();
2140 let node_id = memory.id.to_string();
2141 self.with_transaction(|conn| {
2142 self.append_sync_log(
2143 conn,
2144 SyncOp::Create,
2145 SyncNodeType::Memory,
2146 &node_id,
2147 data.as_deref(),
2148 &origin_mid,
2149 None,
2150 )
2151 })?;
2152 count += 1;
2153 }
2154
2155 let conversations: Vec<Conversation> = {
2156 let conn = self.conn()?;
2157 let mut result = conn.query(
2158 "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
2159 )?;
2160 let mut acc = Vec::new();
2161 for row in &mut result {
2162 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
2163 let source = value_to_string(&row[1]);
2164 let machine_id = value_to_string(&row[2]);
2165 let started_at_str = value_to_string(&row[3]);
2166 let project_path = {
2167 let s = value_to_string(&row[4]);
2168 if s.is_empty() { None } else { Some(s) }
2169 };
2170 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
2171 .map(|dt| dt.with_timezone(&chrono::Utc))
2172 .unwrap_or_else(|_| chrono::Utc::now());
2173 acc.push(Conversation { id, source, machine_id, started_at, project_path });
2174 }
2175 acc
2176 };
2177
2178 for conv in &conversations {
2179 let origin_mid = conv.machine_id.clone();
2180 let data = serde_json::to_string(conv).ok();
2181 let node_id = conv.id.to_string();
2182 self.with_transaction(|conn| {
2183 self.append_sync_log(
2184 conn,
2185 SyncOp::Create,
2186 SyncNodeType::Conversation,
2187 &node_id,
2188 data.as_deref(),
2189 &origin_mid,
2190 None,
2191 )
2192 })?;
2193 count += 1;
2194 }
2195
2196 Ok(count)
2197 }
2198}
2199
2200impl KuzuStore {
2201 pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
2202 let conn = self.conn()?;
2203 let id_escaped = escape_cypher(id);
2204 let name_escaped = escape_cypher(name);
2205 conn.query(&format!(
2206 "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
2207 ))?;
2208 Ok(())
2209 }
2210
2211 pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
2212 let conn = self.conn()?;
2213 let id_escaped = escape_cypher(id);
2214 let mut result = conn.query(&format!(
2215 "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
2216 ))?;
2217 match result.next() {
2218 Some(row) => Ok(Some(value_to_string(&row[0]))),
2219 None => Ok(None),
2220 }
2221 }
2222
2223 pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
2224 let conn = self.conn()?;
2225 let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
2226 let mut map = std::collections::HashMap::new();
2227 for row in &mut result {
2228 map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
2229 }
2230 Ok(map)
2231 }
2232
2233 pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
2234 let conn = self.conn()?;
2235 let escaped = escape_cypher(machine_id);
2236 let mut result = conn.query(&format!(
2237 "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
2238 ))?;
2239 match result.next() {
2240 Some(row) => match &row[0] {
2241 Value::Int64(n) => Ok(*n as u64),
2242 _ => Ok(0),
2243 },
2244 None => Ok(0),
2245 }
2246 }
2247
2248 pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
2249 self.upsert_machine(&identity.id, &identity.name)?;
2250 let count = self.backfill_machine_id(&identity.id)?;
2251 if count > 0 {
2252 tracing::info!("backfilled machine_id on {count} existing memories");
2253 }
2254 Ok(())
2255 }
2256}
2257
/// Renders an embedding as a bracketed, comma-separated list literal.
///
/// An empty slice is rendered as 384 `0.0` components; otherwise each value is
/// written with its `Display` form.
fn format_embedding(embedding: &[f32]) -> String {
    let components: Vec<String> = if embedding.is_empty() {
        vec!["0.0".to_string(); 384]
    } else {
        embedding.iter().map(|value| value.to_string()).collect()
    };
    format!("[{}]", components.join(","))
}
2266
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Backslashes are doubled first, then each single quote is prefixed with a backslash, so
/// the second pass never re-escapes the output of the first.
fn escape_cypher(s: &str) -> String {
    s.replace('\\', "\\\\").replace('\'', "\\'")
}

/// Renders `items` as a Cypher list literal of single-quoted strings, e.g. `['a','b']`.
///
/// Every item is escaped with [`escape_cypher`], the same backslash escaping used for all
/// other interpolated strings in this file. Doubling the quote (`''`) and leaving
/// backslashes untouched would let an item ending in `\` swallow its closing quote.
fn format_string_array(items: &[String]) -> String {
    let quoted: Vec<String> = items
        .iter()
        .map(|item| format!("'{}'", escape_cypher(item)))
        .collect();
    format!("[{}]", quoted.join(","))
}
2278
2279fn value_to_string(val: &Value) -> String {
2280 match val {
2281 Value::String(s) => s.clone(),
2282 other => format!("{:?}", other),
2283 }
2284}
2285
2286fn value_to_f32(val: &Value) -> f32 {
2287 match val {
2288 Value::Float(f) => *f,
2289 Value::Double(d) => *d as f32,
2290 Value::Int64(i) => *i as f32,
2291 _ => 0.0,
2292 }
2293}
2294
2295fn value_to_f32_vec(val: &Value) -> Vec<f32> {
2296 match val {
2297 Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
2298 _ => Vec::new(),
2299 }
2300}
2301
2302fn row_to_memory(row: &[Value]) -> Result<Memory> {
2303 let id_str = value_to_string(&row[0]);
2304 let id = Uuid::parse_str(&id_str).unwrap_or_default();
2305 let content = value_to_string(&row[1]);
2306 let memory_type_str = value_to_string(&row[2]);
2307 let confidence = value_to_f32(&row[3]);
2308 let created_at_str = value_to_string(&row[4]);
2309 let last_accessed_str = value_to_string(&row[5]);
2310 let access_count = match &row[6] {
2311 Value::Int64(i) => *i as u32,
2312 _ => 0,
2313 };
2314 let source = value_to_string(&row[7]);
2315 let source_id = value_to_string(&row[8]);
2316 let project_path = project_path_from_db(&value_to_string(&row[9]));
2317 let machine_id = value_to_string(&row[10]);
2318 let updated_at_str = value_to_string(&row[11]);
2319
2320 let memory_type = match memory_type_str.as_str() {
2321 "episodic" => MemoryType::Episodic,
2322 "procedural" => MemoryType::Procedural,
2323 "decision" => MemoryType::Decision,
2324 "architecture" => MemoryType::Architecture,
2325 "debugging" => MemoryType::Debugging,
2326 "task" => MemoryType::Task,
2327 "question" => MemoryType::Question,
2328 _ => MemoryType::Semantic,
2329 };
2330
2331 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
2332 .map(|dt| dt.with_timezone(&chrono::Utc))
2333 .unwrap_or_else(|_| chrono::Utc::now());
2334
2335 let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
2336 .map(|dt| dt.with_timezone(&chrono::Utc))
2337 .unwrap_or_else(|_| chrono::Utc::now());
2338
2339 let updated_at = if updated_at_str.is_empty() {
2340 created_at
2341 } else {
2342 chrono::DateTime::parse_from_rfc3339(&updated_at_str)
2343 .map(|dt| dt.with_timezone(&chrono::Utc))
2344 .unwrap_or(created_at)
2345 };
2346
2347 Ok(Memory {
2348 id,
2349 content,
2350 embedding: Vec::new(),
2351 memory_type,
2352 confidence,
2353 created_at,
2354 last_accessed,
2355 access_count,
2356 source,
2357 source_id,
2358 project_path,
2359 machine_id,
2360 updated_at,
2361 })
2362}
2363
2364fn row_to_entity(row: &[Value]) -> Result<Entity> {
2365 let id_str = value_to_string(&row[0]);
2366 let id = Uuid::parse_str(&id_str).unwrap_or_default();
2367 let name = value_to_string(&row[1]);
2368 let entity_type = value_to_string(&row[2]);
2369
2370 Ok(Entity {
2371 id,
2372 name,
2373 entity_type,
2374 embedding: Vec::new(),
2375 aliases: Vec::new(),
2376 })
2377}
2378
// SAFETY(review): these impls assert that `KuzuStore` may be sent to and shared between
// threads, but nothing in this part of the file shows why that holds. TODO confirm that
// every field (notably the kuzu `Database` handle) is thread-safe, and drop these impls if
// the auto traits would already apply without them.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}
2381
#[cfg(test)]
mod tests {
    use super::*;

    /// Replaces `SyncLog` with a hand-built table keyed by `seq`, runs `migrate_sync_log`, and
    /// checks that `s.id` is queryable afterwards and that the peer's `last_seq` is back to 0.
    #[test]
    fn migrate_sync_log_recreates_old_schema_and_resets_cursors() {
        let store = KuzuStore::in_memory("test-machine-migrate".to_string()).unwrap();
        let conn = store.conn().unwrap();

        // Executed in order: drop the current table, create the `seq`-keyed one, seed a single
        // log row, and give peer-x a non-zero cursor.
        let setup = [
            "DROP TABLE SyncLog;",
            "CREATE NODE TABLE SyncLog(\
             seq INT64 PRIMARY KEY, \
             op STRING, \
             node_type STRING, \
             node_id STRING, \
             machine_id STRING, \
             timestamp STRING, \
             data STRING\
             );",
            "CREATE (:SyncLog {seq: 1, op: 'create', node_type: 'memory', \
             node_id: 'abc', machine_id: 'old-machine', \
             timestamp: '2024-01-01T00:00:00Z', data: ''});",
            "MERGE (s:SyncState {peer_id: 'peer-x'}) \
             SET s.last_seq = 5, s.last_sync_at = '2024-01-01T00:00:00Z';",
        ];
        for statement in setup {
            conn.query(statement).unwrap();
        }

        store.migrate_sync_log(&conn).unwrap();

        let new_schema_probe = conn.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;");
        assert!(
            new_schema_probe.is_ok(),
            "new-schema column s.id must be present after migration"
        );

        let mut cursor_rows = conn
            .query("MATCH (s:SyncState {peer_id: 'peer-x'}) RETURN s.last_seq;")
            .unwrap();
        let cursor_row = cursor_rows
            .next()
            .unwrap_or_else(|| panic!("SyncState row not found after migration"));
        let last_seq = match cursor_row[0] {
            Value::Int64(n) => n,
            _ => panic!("unexpected value type for last_seq"),
        };
        assert_eq!(last_seq, 0, "last_seq must be reset to 0 after migration");
    }

    /// Creates a `Memory` node inside `with_transaction`, fails the closure on purpose, and
    /// checks that neither the node nor any sync-log entry is visible afterwards.
    #[test]
    fn rollback_leaves_no_partial_state() {
        let store = KuzuStore::in_memory("test-machine-rollback".to_string()).unwrap();
        let embedding = vec!["0.0"; 384].join(", ");
        let create_memory = format!(
            "CREATE (:Memory {{id: 'rollback-test-id', content: 'x', \
             embedding: [{embedding}], memory_type: 'semantic', confidence: 1.0, \
             created_at: '2024-01-01T00:00:00Z', last_accessed: '2024-01-01T00:00:00Z', \
             access_count: 0, source: 'test', source_id: '', project_path: '', \
             machine_id: 'x', updated_at: '2024-01-01T00:00:00Z'}});"
        );

        let result: Result<()> = store.with_transaction(|conn| {
            conn.query(&create_memory)?;
            Err(anyhow::anyhow!("deliberate rollback"))
        });

        assert!(
            result.is_err(),
            "with_transaction must propagate the closure error"
        );

        let entries = store.sync_log_since(0).unwrap();
        assert_eq!(entries.len(), 0, "no SyncLog rows may survive a rollback");

        let remaining = store.memory_count().unwrap();
        assert_eq!(
            remaining, 0,
            "the Memory node created inside the transaction must be rolled back"
        );
    }

    /// Test fixture: a semantic memory from source `test` carrying the given embedding.
    fn memory_with_embedding(content: &str, embedding: Vec<f32>) -> Memory {
        let mut memory = Memory::new(
            content.to_owned(),
            MemoryType::Semantic,
            "test".to_owned(),
            String::new(),
        );
        memory.embedding = embedding;
        memory
    }

    /// Recalls the target before and after a `Reinforces` relation is stored from it, and
    /// expects the second score to be strictly higher.
    #[test]
    fn recall_reflects_new_relation_after_cache_invalidation() {
        use crate::query::{QueryEngine, QueryFilters, QueryRequest};

        let store = KuzuStore::in_memory("test-machine-invalidate".to_string()).unwrap();

        // 384-dimensional vector with a single 1.0 at `axis`.
        let one_hot = |axis: usize| {
            let mut e = vec![0.0_f32; 384];
            e[axis] = 1.0;
            e
        };
        let emb_a = one_hot(0);
        let target = memory_with_embedding("kuzu is the embedded graph store", emb_a.clone());
        let neighbor = memory_with_embedding("an unrelated note about sync", one_hot(1));
        for memory in [&target, &neighbor] {
            store.store_memory(memory).unwrap();
        }

        let request = QueryRequest {
            text: "graph store".to_string(),
            embedding: emb_a,
            limit: 10,
            filters: QueryFilters::default(),
        };

        // Runs a fresh recall and returns the score of `target`, panicking with `missing`
        // when the target is not among the results.
        let target_score = |missing: &str| {
            QueryEngine::new(&store)
                .recall(&request)
                .unwrap()
                .iter()
                .find(|hit| hit.memory.id == target.id)
                .expect(missing)
                .score
        };

        let before_score = target_score("target must be recalled");

        let rel = Relation {
            from_id: target.id,
            to_id: neighbor.id,
            relation_type: RelationType::Reinforces,
            strength: 1.0,
            context: None,
        };
        store.store_relation(&rel).unwrap();

        let after_score = target_score("target must still be recalled");

        assert!(
            after_score > before_score,
            "recall must reflect the new relation: before={before_score} after={after_score}"
        );
    }

    /// Reads every (memory, relation-type filter) pair twice from the same store and checks
    /// that both reads return the same number of relations and the same set of `to_id`s.
    #[test]
    fn get_relations_cached_matches_fresh_uncached_store() {
        fn sorted(mut ids: Vec<Uuid>) -> Vec<Uuid> {
            ids.sort();
            ids
        }

        let warm = KuzuStore::in_memory("test-machine-equivalence".to_string()).unwrap();

        // 384-dimensional vector with a single 1.0 at `axis`.
        let one_hot = |axis: usize| {
            let mut e = vec![0.0_f32; 384];
            e[axis] = 1.0;
            e
        };
        let a = memory_with_embedding("memory a", one_hot(0));
        let b = memory_with_embedding("memory b", one_hot(1));
        let c = memory_with_embedding("memory c", one_hot(2));
        for m in [&a, &b, &c] {
            warm.store_memory(m).unwrap();
        }

        let link = |from: &Memory, to: &Memory, relation_type: RelationType, strength| Relation {
            from_id: from.id,
            to_id: to.id,
            relation_type,
            strength,
            context: None,
        };
        let rels = [
            link(&a, &b, RelationType::RelatesTo, 0.7),
            link(&a, &c, RelationType::Reinforces, 1.0),
            link(&b, &c, RelationType::Supersedes, 1.0),
        ];
        for r in &rels {
            warm.store_relation(r).unwrap();
        }

        // Unfiltered, the three stored relation types, and one type that was never stored.
        let types = [
            None,
            Some(RelationType::RelatesTo),
            Some(RelationType::Reinforces),
            Some(RelationType::Supersedes),
            Some(RelationType::Mentions),
        ];
        for id in [a.id, b.id, c.id] {
            for rt in types {
                let first = warm.get_relations(id, rt).unwrap();
                let second = warm.get_relations(id, rt).unwrap();
                assert_eq!(
                    first.len(),
                    second.len(),
                    "cache hit must match the live read for ({id}, {rt:?})"
                );
                let first_targets = sorted(first.iter().map(|r| r.to_id).collect());
                let second_targets = sorted(second.iter().map(|r| r.to_id).collect());
                assert_eq!(first_targets, second_targets);
            }
        }
    }
}