1use std::path::Path;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use kuzu::{Connection, Database, SystemConfig, Value};
6use uuid::Uuid;
7
8use crate::schema::{
9 Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
10 SyncOp, SyncState,
11};
12use crate::store::Store;
13
/// Outcome of an apply operation against the store.
///
/// NOTE(review): the code producing this value is outside this view; the
/// variant descriptions below follow the variant names — confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// A new node or edge was created.
    Created,
    /// An existing node or edge was updated.
    Updated,
    /// A node or edge was deleted.
    Deleted,
    /// Nothing was changed.
    Skipped,
}
20
21pub struct KuzuStore {
22 db: Database,
23 machine_id: String,
24}
25
/// Builds the Cypher projection list of every `Memory` column read back from
/// query results, each qualified with the node variable `alias`
/// (e.g. `"m.id, m.content, ..."`).
///
/// Column order matters: result rows are decoded positionally.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 12] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
        "updated_at",
    ];

    let mut projection = String::new();
    for (position, column) in COLUMNS.iter().enumerate() {
        if position > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
33
/// Converts a stored `project_path` value into an `Option`.
///
/// The database represents "no project" as the empty string (the column is
/// added with `DEFAULT ''` and writers store `""` for `None`). An empty input
/// therefore yields `None`; any other text is copied into `Some`.
fn project_path_from_db(s: &str) -> Option<String> {
    (!s.is_empty()).then(|| s.to_owned())
}
37
38impl KuzuStore {
39 pub fn open(path: &Path, machine_id: String) -> Result<Self> {
40 let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
41 let store = Self { db, machine_id };
42 store.init_schema()?;
43 Ok(store)
44 }
45
46 pub fn in_memory(machine_id: String) -> Result<Self> {
47 let db =
48 Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49 let store = Self { db, machine_id };
50 store.init_schema()?;
51 Ok(store)
52 }
53
54 pub fn machine_id(&self) -> &str {
55 &self.machine_id
56 }
57
58 fn conn(&self) -> Result<Connection<'_>> {
59 Connection::new(&self.db).context("creating connection")
60 }
61
62 pub fn diagnostic(&self) -> Result<String> {
63 let conn = self.conn()?;
64
65 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
66 let total: i64 = result
67 .next()
68 .map(|r| match &r[0] {
69 Value::Int64(v) => *v,
70 _ => -1,
71 })
72 .unwrap_or(-1);
73
74 let mut result = conn.query(
75 "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
76 )?;
77 let with_emb: i64 = result
78 .next()
79 .map(|r| match &r[0] {
80 Value::Int64(v) => *v,
81 _ => -1,
82 })
83 .unwrap_or(-1);
84
85 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
86 let zeros_str = format!("[{}]", zeros.join(","));
87 let idx_query = format!(
88 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
89 zeros_str
90 );
91 let idx_status = match conn.query(&idx_query) {
92 Ok(mut r) => match r.next() {
93 Some(row) => format!("ok (distance: {:?})", &row[0]),
94 None => "ok (0 results)".to_string(),
95 },
96 Err(e) => format!("error: {e}"),
97 };
98
99 Ok(format!(
100 "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
101 ))
102 }
103
104 fn init_schema(&self) -> Result<()> {
105 let conn = self.conn()?;
106
107 conn.query(
108 "CREATE NODE TABLE IF NOT EXISTS Memory(
109 id STRING PRIMARY KEY,
110 content STRING,
111 embedding FLOAT[384],
112 memory_type STRING,
113 confidence FLOAT,
114 created_at STRING,
115 last_accessed STRING,
116 access_count INT64,
117 source STRING,
118 source_id STRING,
119 project_path STRING
120 );",
121 )
122 .context("creating Memory table")?;
123
124 conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
125
126 conn.query(
127 "CREATE NODE TABLE IF NOT EXISTS Machine(
128 id STRING PRIMARY KEY,
129 name STRING
130 );",
131 )
132 .context("creating Machine table")?;
133
134 conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
135
136 conn.query("ALTER TABLE Memory ADD updated_at STRING DEFAULT '';").ok();
137 conn.query("MATCH (m:Memory) WHERE m.updated_at = '' SET m.updated_at = m.created_at;").ok();
138
139 conn.query(
140 "CREATE NODE TABLE IF NOT EXISTS Entity(
141 id STRING PRIMARY KEY,
142 name STRING,
143 entity_type STRING,
144 embedding FLOAT[384],
145 aliases STRING[]
146 );",
147 )
148 .context("creating Entity table")?;
149
150 conn.query(
151 "CREATE NODE TABLE IF NOT EXISTS Conversation(
152 id STRING PRIMARY KEY,
153 source STRING,
154 machine_id STRING,
155 started_at STRING,
156 project_path STRING
157 );",
158 )
159 .context("creating Conversation table")?;
160
161 conn.query(
162 "CREATE NODE TABLE IF NOT EXISTS IngestLog(
163 file_path STRING PRIMARY KEY,
164 file_hash STRING,
165 ingested_at STRING,
166 memory_count INT64,
167 source STRING
168 );",
169 )
170 .context("creating IngestLog table")?;
171
172 conn.query(
173 "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
174 FROM Memory TO Memory,
175 strength FLOAT,
176 context STRING
177 );",
178 )
179 .context("creating RELATES_TO rel")?;
180
181 conn.query(
182 "CREATE REL TABLE IF NOT EXISTS MENTIONS(
183 FROM Memory TO Entity,
184 position INT64
185 );",
186 )
187 .context("creating MENTIONS rel")?;
188
189 conn.query(
190 "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
191 FROM Memory TO Conversation,
192 transformation STRING
193 );",
194 )
195 .context("creating DERIVED_FROM rel")?;
196
197 conn.query(
198 "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
199 FROM Memory TO Memory,
200 model STRING
201 );",
202 )
203 .context("creating DISTILLED_FROM rel")?;
204
205 conn.query(
206 "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
207 FROM Memory TO Memory,
208 resolution STRING
209 );",
210 )
211 .context("creating CONTRADICTS rel")?;
212
213 conn.query(
214 "CREATE REL TABLE IF NOT EXISTS REINFORCES(
215 FROM Memory TO Memory
216 );",
217 )
218 .context("creating REINFORCES rel")?;
219
220 conn.query(
221 "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
222 FROM Memory TO Memory,
223 reason STRING
224 );",
225 )
226 .context("creating SUPERSEDES rel")?;
227
228 self.migrate_sync_log(&conn).context("migrating SyncLog table")?;
229
230 conn.query(
231 "CREATE NODE TABLE IF NOT EXISTS SyncState(
232 peer_id STRING PRIMARY KEY,
233 last_seq INT64,
234 last_sync_at STRING
235 );",
236 )
237 .context("creating SyncState table")?;
238
239 conn.query(
240 "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
241 memory_id STRING PRIMARY KEY,
242 distilled_id STRING,
243 consolidated_at STRING,
244 model STRING
245 );",
246 )
247 .context("creating ConsolidationLog table")?;
248
249 conn.query(
250 "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
251 id STRING PRIMARY KEY,
252 last_sync_seq INT64,
253 exported_at STRING,
254 vault_path STRING,
255 pages_created INT64,
256 pages_updated INT64,
257 memories_processed INT64
258 );",
259 )
260 .context("creating WikiExportLog table")?;
261
262 conn.query(
263 "CREATE NODE TABLE IF NOT EXISTS Tombstone(
264 node_id STRING PRIMARY KEY,
265 node_type STRING,
266 deleted_at STRING,
267 machine_id STRING);",
268 )
269 .context("creating Tombstone table")?;
270
271 conn.query(
272 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
273 )
274 .ok();
275
276 Ok(())
277 }
278
279 fn migrate_sync_log(&self, conn: &Connection<'_>) -> Result<()> {
280 let new_schema_present = conn
285 .query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;")
286 .is_ok();
287 if !new_schema_present {
288 let old_schema_present = conn
289 .query("MATCH (s:SyncLog) RETURN s.seq LIMIT 1;")
290 .is_ok();
291 if old_schema_present {
292 conn.query("DROP TABLE SyncLog;")?;
293 conn.query("MATCH (s:SyncState) SET s.last_seq = 0;").ok();
294 }
295 }
296 conn.query(
297 "CREATE NODE TABLE IF NOT EXISTS SyncLog(
298 id STRING PRIMARY KEY,
299 local_seq INT64,
300 origin_machine_id STRING,
301 origin_seq INT64,
302 op STRING,
303 node_type STRING,
304 node_id STRING,
305 timestamp STRING,
306 data STRING
307 );",
308 )?;
309 Ok(())
310 }
311
312 fn with_transaction<T>(&self, f: impl FnOnce(&Connection<'_>) -> Result<T>) -> Result<T> {
313 let conn = self.conn()?;
314 conn.query("BEGIN TRANSACTION;")?;
315 match f(&conn) {
316 Ok(v) => {
317 conn.query("COMMIT;")?;
318 Ok(v)
319 }
320 Err(e) => {
321 let _ = conn.query("ROLLBACK;");
322 Err(e)
323 }
324 }
325 }
326
327 fn append_sync_log(
328 &self,
329 conn: &Connection<'_>,
330 op: SyncOp,
331 node_type: SyncNodeType,
332 node_id: &str,
333 data: Option<&str>,
334 origin_machine_id: &str,
335 origin_seq: Option<i64>,
336 ) -> Result<()> {
337 let mut r = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
338 let local_seq: i64 = match r.next() {
339 Some(row) => match &row[0] {
340 Value::Int64(n) => n + 1,
341 _ => 1,
342 },
343 None => 1,
344 };
345 let origin_seq = origin_seq.unwrap_or(local_seq);
346 let id = format!("{}:{}", origin_machine_id, origin_seq);
347 let id_esc = escape_cypher(&id);
348 let mut dup = conn.query(&format!(
349 "MATCH (s:SyncLog {{id: '{id_esc}'}}) RETURN s.id LIMIT 1;"
350 ))?;
351 if dup.next().is_some() {
352 return Ok(());
353 }
354 let timestamp = chrono::Utc::now().to_rfc3339();
355 let op_str = match op {
356 SyncOp::Create => "create",
357 SyncOp::Update => "update",
358 SyncOp::Delete => "delete",
359 };
360 let nt_str = match node_type {
361 SyncNodeType::Memory => "memory",
362 SyncNodeType::Entity => "entity",
363 SyncNodeType::Conversation => "conversation",
364 SyncNodeType::Relation => "relation",
365 };
366 let origin_machine_esc = escape_cypher(origin_machine_id);
367 let node_id_esc = escape_cypher(node_id);
368 let data_esc = data.map(escape_cypher).unwrap_or_default();
369 conn.query(&format!(
370 "CREATE (:SyncLog {{id:'{id_esc}', local_seq:{local_seq}, \
371 origin_machine_id:'{origin_machine_esc}', origin_seq:{origin_seq}, \
372 op:'{op_str}', node_type:'{nt_str}', node_id:'{node_id_esc}', \
373 timestamp:'{timestamp}', data:'{data_esc}'}});"
374 ))?;
375 Ok(())
376 }
377
378 pub fn tombstone_exists(&self, id: Uuid) -> Result<bool> {
379 let conn = self.conn()?;
380 let id_esc = escape_cypher(&id.to_string());
381 let mut result = conn.query(&format!(
382 "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
383 ))?;
384 Ok(result.next().is_some())
385 }
386
387 fn upsert_tombstone(
388 &self,
389 conn: &Connection<'_>,
390 node_id: &str,
391 node_type: &str,
392 machine_id: &str,
393 ) -> Result<()> {
394 let node_id_esc = escape_cypher(node_id);
395 let node_type_esc = escape_cypher(node_type);
396 let machine_id_esc = escape_cypher(machine_id);
397 let deleted_at = chrono::Utc::now().to_rfc3339();
398 conn.query(&format!(
399 "MERGE (t:Tombstone {{node_id: '{node_id_esc}'}}) \
400 SET t.node_type = '{node_type_esc}', \
401 t.deleted_at = '{deleted_at}', \
402 t.machine_id = '{machine_id_esc}';"
403 ))?;
404 Ok(())
405 }
406}
407
408impl Store for KuzuStore {
409 fn store_memory(&self, memory: &Memory) -> Result<()> {
410 let data = serde_json::to_string(memory).ok();
411 self.with_transaction(|conn| {
412 self.create_memory_node(conn, memory)?;
413 self.append_sync_log(
414 conn,
415 SyncOp::Create,
416 SyncNodeType::Memory,
417 &memory.id.to_string(),
418 data.as_deref(),
419 &self.machine_id,
420 None,
421 )
422 })
423 }
424
425 fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
426 let conn = self.conn()?;
427 let query = format!(
428 "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
429 id, memory_return_cols("m")
430 );
431
432 let mut result = conn.query(&query)?;
433 match result.next() {
434 Some(row) => Ok(Some(row_to_memory(&row)?)),
435 None => Ok(None),
436 }
437 }
438
439 fn delete_memory(&self, id: Uuid) -> Result<()> {
440 let id_string = id.to_string();
441 self.with_transaction(|conn| {
442 conn.query(&format!(
443 "MATCH (m:Memory {{id: '{id_string}'}}) DETACH DELETE m;"
444 ))?;
445 self.upsert_tombstone(conn, &id_string, "memory", &self.machine_id)?;
446 self.append_sync_log(
447 conn,
448 SyncOp::Delete,
449 SyncNodeType::Memory,
450 &id_string,
451 None,
452 &self.machine_id,
453 None,
454 )
455 })
456 }
457
458 fn store_entity(&self, entity: &Entity) -> Result<()> {
459 let data = serde_json::to_string(entity).ok();
460 self.with_transaction(|conn| {
461 self.create_entity_node(conn, entity)?;
462 self.append_sync_log(
463 conn,
464 SyncOp::Create,
465 SyncNodeType::Entity,
466 &entity.id.to_string(),
467 data.as_deref(),
468 &self.machine_id,
469 None,
470 )
471 })
472 }
473
474 fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
475 let conn = self.conn()?;
476 let mut result = conn.query(&format!(
477 "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
478 id
479 ))?;
480
481 match result.next() {
482 Some(row) => Ok(Some(row_to_entity(&row)?)),
483 None => Ok(None),
484 }
485 }
486
487 fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
488 let conn = self.conn()?;
489 let name_escaped = escape_cypher(name);
490 let query = format!(
491 "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
492 name_escaped
493 );
494 let mut result = conn.query(&query)?;
495
496 match result.next() {
497 Some(row) => Ok(Some(row_to_entity(&row)?)),
498 None => Ok(None),
499 }
500 }
501
502 fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
503 let data = serde_json::to_string(conversation).ok();
504 self.with_transaction(|conn| {
505 self.create_conversation_node(conn, conversation)?;
506 self.append_sync_log(
507 conn,
508 SyncOp::Create,
509 SyncNodeType::Conversation,
510 &conversation.id.to_string(),
511 data.as_deref(),
512 &self.machine_id,
513 None,
514 )
515 })
516 }
517
518 fn store_relation(&self, relation: &Relation) -> Result<()> {
519 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
520 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
521 let data = serde_json::to_string(relation).ok();
522 self.with_transaction(|conn| {
523 self.create_relation_edge(conn, relation)?;
524 self.append_sync_log(
525 conn,
526 SyncOp::Create,
527 SyncNodeType::Relation,
528 &node_id,
529 data.as_deref(),
530 &self.machine_id,
531 None,
532 )
533 })
534 }
535
536 fn get_relations(
537 &self,
538 node_id: Uuid,
539 relation_type: Option<RelationType>,
540 ) -> Result<Vec<Relation>> {
541 let conn = self.conn()?;
542 let id = node_id.to_string();
543
544 let query = match relation_type {
545 Some(RelationType::RelatesTo) => format!(
546 "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
547 id
548 ),
549 Some(RelationType::Contradicts) => format!(
550 "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
551 id
552 ),
553 Some(RelationType::Reinforces) => format!(
554 "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
555 id
556 ),
557 Some(RelationType::Supersedes) => format!(
558 "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
559 id
560 ),
561 Some(RelationType::Mentions) => format!(
562 "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
563 id
564 ),
565 Some(RelationType::DerivedFrom) => format!(
566 "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
567 id
568 ),
569 Some(RelationType::DistilledFrom) => format!(
570 "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
571 id
572 ),
573 None => {
574 let mut relations = Vec::new();
575 let mem_to_mem = [
576 ("RELATES_TO", RelationType::RelatesTo),
577 ("REINFORCES", RelationType::Reinforces),
578 ("CONTRADICTS", RelationType::Contradicts),
579 ("SUPERSEDES", RelationType::Supersedes),
580 ("DISTILLED_FROM", RelationType::DistilledFrom),
581 ];
582 for (label, rt) in &mem_to_mem {
583 let q = format!(
584 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
585 id, label
586 );
587 if let Ok(mut result) = conn.query(&q) {
588 for row in &mut result {
589 let to_id_str = value_to_string(&row[0]);
590 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
591 relations.push(Relation {
592 from_id: node_id,
593 to_id,
594 relation_type: *rt,
595 strength: 1.0,
596 context: None,
597 });
598 }
599 }
600 }
601 for (label, target, rt) in [
602 ("MENTIONS", "Entity", RelationType::Mentions),
603 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
604 ] {
605 let q = format!(
606 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
607 id, label, target
608 );
609 if let Ok(mut result) = conn.query(&q) {
610 for row in &mut result {
611 let to_id_str = value_to_string(&row[0]);
612 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
613 relations.push(Relation {
614 from_id: node_id,
615 to_id,
616 relation_type: rt,
617 strength: 1.0,
618 context: None,
619 });
620 }
621 }
622 }
623 return Ok(relations);
624 }
625 };
626
627 let mut result = conn.query(&query)?;
628 let mut relations = Vec::new();
629
630 for row in &mut result {
631 let to_id_str = value_to_string(&row[0]);
632 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
633
634 relations.push(Relation {
635 from_id: node_id,
636 to_id,
637 relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
638 strength: 1.0,
639 context: None,
640 });
641 }
642
643 Ok(relations)
644 }
645
646 fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
647 let conn = self.conn()?;
648 let embedding_str = format_embedding(embedding);
649
650 let query = format!(
651 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
652 RETURN {}, distance;",
653 embedding_str, limit, memory_return_cols("node")
654 );
655
656 let mut result = conn.query(&query)?;
657 let mut results = Vec::new();
658
659 for row in &mut result {
660 let memory = row_to_memory(&row[..12])?;
661 let distance = value_to_f32(&row[12]);
662 let similarity = 1.0 - distance;
663 results.push((memory, similarity));
664 }
665
666 Ok(results)
667 }
668
669 fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
670 let conn = self.conn()?;
671 let query = format!(
672 "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
673 RETURN DISTINCT {};",
674 start_id, depth, memory_return_cols("m")
675 );
676
677 let mut result = conn.query(&query)?;
678 let mut results = Vec::new();
679
680 for row in &mut result {
681 let memory = row_to_memory(&row)?;
682 results.push((memory, Vec::new()));
683 }
684
685 Ok(results)
686 }
687
688 fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
689 let conn = self.conn()?;
690 let source_escaped = escape_cypher(source);
691 let query = format!(
692 "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
693 source_escaped, memory_return_cols("m")
694 );
695 let mut result = conn.query(&query)?;
696
697 let mut memories = Vec::new();
698 for row in &mut result {
699 memories.push(row_to_memory(&row)?);
700 }
701 Ok(memories)
702 }
703
704 fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
705 let conn = self.conn()?;
706 let type_str = format!("{:?}", memory_type).to_lowercase();
707 let query = format!(
708 "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
709 type_str, memory_return_cols("m")
710 );
711 let mut result = conn.query(&query)?;
712
713 let mut memories = Vec::new();
714 for row in &mut result {
715 memories.push(row_to_memory(&row)?);
716 }
717 Ok(memories)
718 }
719
720 fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
721 let conn = self.conn()?;
722 let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
723 let cutoff_str = cutoff.to_rfc3339();
724
725 let query = format!(
726 "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
727 cutoff_str, memory_return_cols("m")
728 );
729
730 let mut result = conn.query(&query)?;
731 let mut memories = Vec::new();
732 for row in &mut result {
733 memories.push(row_to_memory(&row)?);
734 }
735 Ok(memories)
736 }
737
738 fn update_memory(&self, memory: &Memory) -> Result<()> {
739 let id = memory.id.to_string();
740 let last_accessed = memory.last_accessed.to_rfc3339();
741 let updated_at = memory.updated_at.to_rfc3339();
742 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
743 let machine_id_escaped = escape_cypher(&memory.machine_id);
744 let data = serde_json::to_string(memory).ok();
745 self.with_transaction(|conn| {
746 let query = format!(
747 "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}', m.updated_at = '{}', m.machine_id = '{}';",
748 id, memory.confidence, last_accessed, memory.access_count, project_path_escaped, updated_at, machine_id_escaped
749 );
750 conn.query(&query)?;
751 self.append_sync_log(
752 conn,
753 SyncOp::Update,
754 SyncNodeType::Memory,
755 &id,
756 data.as_deref(),
757 &self.machine_id,
758 None,
759 )
760 })
761 }
762
763 fn record_access(&self, memory: &Memory) -> Result<()> {
764 let id_esc = escape_cypher(&memory.id.to_string());
765 let last_accessed = memory.last_accessed.to_rfc3339();
766 let last_accessed_esc = escape_cypher(&last_accessed);
767 let conn = self.conn()?;
768 conn.query(&format!(
769 "MATCH (m:Memory {{id: '{id_esc}'}}) SET \
770 m.last_accessed = '{last_accessed_esc}', \
771 m.access_count = {}, \
772 m.confidence = {};",
773 memory.access_count, memory.confidence
774 ))?;
775 Ok(())
776 }
777
778 fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
779 let conn = self.conn()?;
780 let query_escaped = escape_cypher(&query.to_lowercase());
781 let cypher = format!(
782 "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
783 query_escaped, memory_return_cols("m"), limit
784 );
785
786 let mut result = conn.query(&cypher)?;
787 let mut memories = Vec::new();
788 for row in &mut result {
789 memories.push(row_to_memory(&row)?);
790 }
791 Ok(memories)
792 }
793
794 fn memory_count(&self) -> Result<usize> {
795 let conn = self.conn()?;
796 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
797 match result.next() {
798 Some(row) => match &row[0] {
799 Value::Int64(n) => Ok(*n as usize),
800 _ => Ok(0),
801 },
802 None => Ok(0),
803 }
804 }
805}
806
807impl KuzuStore {
808 pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
809 if let Some(existing) = self.find_entity_by_name(name)? {
810 return Ok(existing);
811 }
812 let entity = Entity::new(name.to_string(), entity_type.to_string());
813 self.store_entity(&entity)?;
814 Ok(entity)
815 }
816
817 pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
818 let conn = self.conn()?;
819 let escaped = escape_cypher(content_prefix);
820 let query = format!(
821 "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
822 escaped
823 );
824 let mut result = conn.query(&query)?;
825 Ok(result.next().is_some())
826 }
827
828 pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
829 let conn = self.conn()?;
830 let query = format!(
831 "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
832 memory_return_cols("m"), limit
833 );
834 let mut result = conn.query(&query)?;
835 let mut memories = Vec::new();
836 for row in &mut result {
837 memories.push(row_to_memory(&row)?);
838 }
839 Ok(memories)
840 }
841
842 pub fn memories_created_between(
843 &self,
844 start: &DateTime<Utc>,
845 end: &DateTime<Utc>,
846 ) -> Result<Vec<Memory>> {
847 let conn = self.conn()?;
848 let query = format!(
849 "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
850 start.to_rfc3339(),
851 end.to_rfc3339(),
852 memory_return_cols("m")
853 );
854 let mut result = conn.query(&query)?;
855 let mut memories = Vec::new();
856 for row in &mut result {
857 memories.push(row_to_memory(&row)?);
858 }
859 Ok(memories)
860 }
861
862 pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
863 let conn = self.conn()?;
864 let now = Utc::now().to_rfc3339();
865 let query = format!(
866 "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
867 raw_id,
868 distilled_id,
869 now,
870 escape_cypher(model)
871 );
872 conn.query(&query)?;
873 Ok(())
874 }
875
876 pub fn consolidation_count(&self) -> Result<usize> {
877 let conn = self.conn()?;
878 let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
879 match result.next() {
880 Some(row) => match &row[0] {
881 Value::Int64(n) => Ok(*n as usize),
882 _ => Ok(0),
883 },
884 None => Ok(0),
885 }
886 }
887
888 pub fn delete_consolidated_raw(&self) -> Result<usize> {
889 let ids: Vec<Uuid> = {
890 let conn = self.conn()?;
891 let result = conn.query(
892 "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
893 )?;
894 let mut acc = Vec::new();
895 for row in result {
896 if let Value::String(id) = &row[0]
897 && let Ok(uuid) = Uuid::parse_str(id)
898 {
899 acc.push(uuid);
900 }
901 }
902 acc
903 };
904 let count = ids.len();
905 let machine_id = self.machine_id.clone();
906 for id in &ids {
907 let id_str = id.to_string();
908 let mid = machine_id.clone();
909 self.with_transaction(|conn| {
910 conn.query(&format!(
911 "MATCH (m:Memory {{id: '{id_str}'}}) DETACH DELETE m;"
912 ))?;
913 self.upsert_tombstone(conn, &id_str, "memory", &mid)?;
914 self.append_sync_log(
915 conn,
916 SyncOp::Delete,
917 SyncNodeType::Memory,
918 &id_str,
919 None,
920 &mid,
921 None,
922 )
923 })?;
924 }
925 Ok(count)
926 }
927
928 pub fn rebuild_vector_index(&self) -> Result<()> {
929 let conn = self.conn()?;
930 conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
931 .ok();
932 conn.query(
933 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
934 )?;
935 Ok(())
936 }
937
938 pub fn clear_ingest_log(&self) -> Result<usize> {
939 let conn = self.conn()?;
940 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
941 let count = match result.next() {
942 Some(row) => match &row[0] {
943 Value::Int64(n) => *n as usize,
944 _ => 0,
945 },
946 None => 0,
947 };
948 conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
949 Ok(count)
950 }
951
952 pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
953 let conn = self.conn()?;
954 let escaped = escape_cypher(source);
955 let mut result = conn.query(&format!(
956 "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
957 escaped
958 ))?;
959 let count = match result.next() {
960 Some(row) => match &row[0] {
961 Value::Int64(n) => *n as usize,
962 _ => 0,
963 },
964 None => 0,
965 };
966 conn.query(&format!(
967 "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
968 escaped
969 ))?;
970 Ok(count)
971 }
972
973 pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
974 let conn = self.conn()?;
975 let escaped = escape_cypher(file_path);
976 let mut result = conn.query(&format!(
977 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
978 escaped
979 ))?;
980 Ok(result.next().is_some())
981 }
982
983 pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
984 let conn = self.conn()?;
985 let escaped = escape_cypher(file_path);
986 let mut result = conn.query(&format!(
987 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
988 escaped
989 ))?;
990 match result.next() {
991 Some(row) => {
992 let stored_hash = value_to_string(&row[0]);
993 Ok(stored_hash != file_hash)
994 }
995 None => Ok(true),
996 }
997 }
998
999 pub fn mark_ingested(
1000 &self,
1001 file_path: &str,
1002 file_hash: &str,
1003 memory_count: usize,
1004 source: &str,
1005 ) -> Result<()> {
1006 let conn = self.conn()?;
1007 let path_escaped = escape_cypher(file_path);
1008 let hash_escaped = escape_cypher(file_hash);
1009 let source_escaped = escape_cypher(source);
1010 let now = chrono::Utc::now().to_rfc3339();
1011
1012 conn.query(&format!(
1013 "MERGE (l:IngestLog {{file_path: '{}'}})
1014 SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
1015 path_escaped, hash_escaped, now, memory_count as i64, source_escaped
1016 ))?;
1017 Ok(())
1018 }
1019
1020 pub fn ingested_file_count(&self) -> Result<usize> {
1021 let conn = self.conn()?;
1022 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1023 match result.next() {
1024 Some(row) => match &row[0] {
1025 Value::Int64(n) => Ok(*n as usize),
1026 _ => Ok(0),
1027 },
1028 None => Ok(0),
1029 }
1030 }
1031}
1032
1033impl KuzuStore {
1034 fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
1035 let id = memory.id.to_string();
1036 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1037 let created_at = memory.created_at.to_rfc3339();
1038 let last_accessed = memory.last_accessed.to_rfc3339();
1039 let embedding_str = format_embedding(&memory.embedding);
1040 let content_escaped = escape_cypher(&memory.content);
1041 let source_escaped = escape_cypher(&memory.source);
1042 let source_id_escaped = escape_cypher(&memory.source_id);
1043 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1044 let machine_id_escaped = escape_cypher(&memory.machine_id);
1045 let updated_at = memory.updated_at.to_rfc3339();
1046
1047 let query = format!(
1048 "CREATE (:Memory {{
1049 id: '{id}',
1050 content: '{content_escaped}',
1051 embedding: {embedding_str},
1052 memory_type: '{memory_type}',
1053 confidence: {confidence},
1054 created_at: '{created_at}',
1055 last_accessed: '{last_accessed}',
1056 access_count: {access_count},
1057 source: '{source_escaped}',
1058 source_id: '{source_id_escaped}',
1059 project_path: '{project_path_escaped}',
1060 machine_id: '{machine_id_escaped}',
1061 updated_at: '{updated_at}'
1062 }});",
1063 confidence = memory.confidence,
1064 access_count = memory.access_count,
1065 );
1066
1067 conn.query(&query)?;
1068 Ok(())
1069 }
1070
1071 fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
1072 let id = entity.id.to_string();
1073 let embedding_str = format_embedding(&entity.embedding);
1074 let aliases_str = format_string_array(&entity.aliases);
1075 let name_escaped = escape_cypher(&entity.name);
1076 let etype_escaped = escape_cypher(&entity.entity_type);
1077
1078 let query = format!(
1079 "CREATE (:Entity {{
1080 id: '{id}',
1081 name: '{name_escaped}',
1082 entity_type: '{etype_escaped}',
1083 embedding: {embedding_str},
1084 aliases: {aliases_str}
1085 }});"
1086 );
1087
1088 conn.query(&query)?;
1089 Ok(())
1090 }
1091
1092 fn create_conversation_node(
1093 &self,
1094 conn: &Connection<'_>,
1095 conversation: &Conversation,
1096 ) -> Result<()> {
1097 let id = conversation.id.to_string();
1098 let started_at = conversation.started_at.to_rfc3339();
1099 let source_escaped = escape_cypher(&conversation.source);
1100 let machine_escaped = escape_cypher(&conversation.machine_id);
1101 let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
1102
1103 let query = format!(
1104 "CREATE (:Conversation {{
1105 id: '{id}',
1106 source: '{source_escaped}',
1107 machine_id: '{machine_escaped}',
1108 started_at: '{started_at}',
1109 project_path: '{project_escaped}'
1110 }});"
1111 );
1112
1113 conn.query(&query)?;
1114 Ok(())
1115 }
1116
1117 fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
1118 let from_id = relation.from_id.to_string();
1119 let to_id = relation.to_id.to_string();
1120
1121 let query = match relation.relation_type {
1122 RelationType::RelatesTo => format!(
1123 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1124 from_id,
1125 to_id,
1126 relation.strength,
1127 relation.context.as_deref().unwrap_or("")
1128 ),
1129 RelationType::Mentions => format!(
1130 "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1131 from_id, to_id
1132 ),
1133 RelationType::DerivedFrom => format!(
1134 "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1135 from_id,
1136 to_id,
1137 relation.context.as_deref().unwrap_or("direct")
1138 ),
1139 RelationType::Contradicts => format!(
1140 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1141 from_id,
1142 to_id,
1143 relation.context.as_deref().unwrap_or("")
1144 ),
1145 RelationType::Reinforces => format!(
1146 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1147 from_id, to_id
1148 ),
1149 RelationType::DistilledFrom => format!(
1150 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1151 from_id,
1152 to_id,
1153 relation.context.as_deref().unwrap_or("")
1154 ),
1155 RelationType::Supersedes => format!(
1156 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1157 from_id,
1158 to_id,
1159 relation.context.as_deref().unwrap_or("")
1160 ),
1161 };
1162
1163 conn.query(&query)?;
1164 Ok(())
1165 }
1166}
1167
1168impl KuzuStore {
1169 pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1170 let conn = self.conn()?;
1171 let mut result = conn.query(&format!(
1172 "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1173 id
1174 ))?;
1175
1176 match result.next() {
1177 Some(row) => {
1178 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1179 let source = value_to_string(&row[1]);
1180 let machine_id = value_to_string(&row[2]);
1181 let started_at_str = value_to_string(&row[3]);
1182 let project_path = {
1183 let s = value_to_string(&row[4]);
1184 if s.is_empty() { None } else { Some(s) }
1185 };
1186 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1187 .map(|dt| dt.with_timezone(&chrono::Utc))
1188 .unwrap_or_else(|_| chrono::Utc::now());
1189
1190 Ok(Some(Conversation {
1191 id,
1192 source,
1193 machine_id,
1194 started_at,
1195 project_path,
1196 }))
1197 }
1198 None => Ok(None),
1199 }
1200 }
1201
1202 pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1203 let conn = self.conn()?;
1204 let query = format!(
1205 "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1206 id, memory_return_cols("m")
1207 );
1208
1209 let mut result = conn.query(&query)?;
1210 match result.next() {
1211 Some(row) => {
1212 let mut memory = row_to_memory(&row[..12])?;
1213 memory.embedding = value_to_f32_vec(&row[12]);
1214 Ok(Some(memory))
1215 }
1216 None => Ok(None),
1217 }
1218 }
1219
1220 pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1221 self.sync_log_page(after_seq, None)
1222 }
1223
1224 fn tombstone_exists_on(&self, conn: &Connection<'_>, id: Uuid) -> Result<bool> {
1225 let id_esc = escape_cypher(&id.to_string());
1226 let mut result = conn.query(&format!(
1227 "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
1228 ))?;
1229 Ok(result.next().is_some())
1230 }
1231
1232 fn normalize_incoming(mem: &mut Memory) {
1233 if mem.updated_at == DateTime::UNIX_EPOCH {
1234 mem.updated_at = mem.created_at;
1235 }
1236 }
1237
1238 pub fn apply_create_memory(
1239 &self,
1240 mem: &Memory,
1241 origin_machine_id: &str,
1242 origin_seq: i64,
1243 ) -> Result<ApplyOutcome> {
1244 let mut mem = mem.clone();
1245 Self::normalize_incoming(&mut mem);
1246 let data = serde_json::to_string(&mem).ok();
1247 let node_id = mem.id.to_string();
1248 self.with_transaction(|conn| {
1249 if self.tombstone_exists_on(conn, mem.id)? {
1250 return Ok(ApplyOutcome::Skipped);
1251 }
1252 if self.get_memory(mem.id)?.is_some() {
1253 return Ok(ApplyOutcome::Skipped);
1254 }
1255 self.create_memory_node(conn, &mem)?;
1256 self.append_sync_log(
1257 conn,
1258 SyncOp::Create,
1259 SyncNodeType::Memory,
1260 &node_id,
1261 data.as_deref(),
1262 origin_machine_id,
1263 Some(origin_seq),
1264 )?;
1265 Ok(ApplyOutcome::Created)
1266 })
1267 }
1268
1269 pub fn apply_update_memory(
1270 &self,
1271 incoming: &Memory,
1272 origin_machine_id: &str,
1273 origin_seq: i64,
1274 ) -> Result<(ApplyOutcome, Option<Memory>)> {
1275 let mut incoming = incoming.clone();
1276 Self::normalize_incoming(&mut incoming);
1277 let data = serde_json::to_string(&incoming).ok();
1278 let node_id = incoming.id.to_string();
1279 self.with_transaction(|conn| {
1280 match self.get_memory(incoming.id)? {
1281 None => {
1282 if self.tombstone_exists_on(conn, incoming.id)? {
1283 return Ok((ApplyOutcome::Skipped, None));
1284 }
1285 self.create_memory_node(conn, &incoming)?;
1286 self.append_sync_log(
1287 conn,
1288 SyncOp::Create,
1289 SyncNodeType::Memory,
1290 &node_id,
1291 data.as_deref(),
1292 origin_machine_id,
1293 Some(origin_seq),
1294 )?;
1295 Ok((ApplyOutcome::Created, Some(incoming)))
1296 }
1297 Some(existing) => {
1298 let incoming_wins = if incoming.updated_at != existing.updated_at {
1299 incoming.updated_at > existing.updated_at
1300 } else if incoming.access_count != existing.access_count {
1301 incoming.access_count > existing.access_count
1302 } else {
1303 incoming.machine_id > existing.machine_id
1304 };
1305
1306 if !incoming_wins {
1307 return Ok((ApplyOutcome::Skipped, None));
1308 }
1309
1310 let id = incoming.id.to_string();
1311 let content_escaped = escape_cypher(&incoming.content);
1312 let memory_type = format!("{:?}", incoming.memory_type).to_lowercase();
1313 let created_at = incoming.created_at.to_rfc3339();
1314 let last_accessed = incoming.last_accessed.to_rfc3339();
1315 let updated_at = incoming.updated_at.to_rfc3339();
1316 let source_escaped = escape_cypher(&incoming.source);
1317 let source_id_escaped = escape_cypher(&incoming.source_id);
1318 let project_path_escaped =
1319 escape_cypher(incoming.project_path.as_deref().unwrap_or(""));
1320 let machine_id_escaped = escape_cypher(&incoming.machine_id);
1321 conn.query(&format!(
1324 "MATCH (m:Memory {{id: '{id}'}}) SET \
1325 m.content = '{content_escaped}', \
1326 m.memory_type = '{memory_type}', \
1327 m.confidence = {confidence}, \
1328 m.created_at = '{created_at}', \
1329 m.last_accessed = '{last_accessed}', \
1330 m.access_count = {access_count}, \
1331 m.source = '{source_escaped}', \
1332 m.source_id = '{source_id_escaped}', \
1333 m.project_path = '{project_path_escaped}', \
1334 m.machine_id = '{machine_id_escaped}', \
1335 m.updated_at = '{updated_at}';",
1336 confidence = incoming.confidence,
1337 access_count = incoming.access_count,
1338 ))?;
1339 self.append_sync_log(
1340 conn,
1341 SyncOp::Update,
1342 SyncNodeType::Memory,
1343 &node_id,
1344 data.as_deref(),
1345 origin_machine_id,
1346 Some(origin_seq),
1347 )?;
1348 Ok((ApplyOutcome::Updated, Some(incoming)))
1349 }
1350 }
1351 })
1352 }
1353
1354 pub fn apply_delete_memory(
1355 &self,
1356 node_id: &str,
1357 origin_machine_id: &str,
1358 origin_seq: i64,
1359 ) -> Result<ApplyOutcome> {
1360 self.with_transaction(|conn| {
1361 conn.query(&format!(
1362 "MATCH (m:Memory {{id: '{node_id}'}}) DETACH DELETE m;"
1363 ))?;
1364 self.upsert_tombstone(conn, node_id, "memory", origin_machine_id)?;
1365 self.append_sync_log(
1366 conn,
1367 SyncOp::Delete,
1368 SyncNodeType::Memory,
1369 node_id,
1370 None,
1371 origin_machine_id,
1372 Some(origin_seq),
1373 )?;
1374 Ok(ApplyOutcome::Deleted)
1375 })
1376 }
1377
1378 pub fn apply_create_conversation(
1379 &self,
1380 conv: &Conversation,
1381 origin_machine_id: &str,
1382 origin_seq: i64,
1383 ) -> Result<ApplyOutcome> {
1384 let data = serde_json::to_string(conv).ok();
1385 let node_id = conv.id.to_string();
1386 self.with_transaction(|conn| {
1387 if self.get_conversation(conv.id)?.is_some() {
1388 return Ok(ApplyOutcome::Skipped);
1389 }
1390 self.create_conversation_node(conn, conv)?;
1391 self.append_sync_log(
1392 conn,
1393 SyncOp::Create,
1394 SyncNodeType::Conversation,
1395 &node_id,
1396 data.as_deref(),
1397 origin_machine_id,
1398 Some(origin_seq),
1399 )?;
1400 Ok(ApplyOutcome::Created)
1401 })
1402 }
1403
1404 pub fn apply_upsert_conversation(
1405 &self,
1406 conv: &Conversation,
1407 origin_machine_id: &str,
1408 origin_seq: i64,
1409 ) -> Result<ApplyOutcome> {
1410 let data = serde_json::to_string(conv).ok();
1411 let node_id = conv.id.to_string();
1412 let source_escaped = escape_cypher(&conv.source);
1413 let machine_escaped = escape_cypher(&conv.machine_id);
1414 let project_escaped = escape_cypher(conv.project_path.as_deref().unwrap_or(""));
1415 let started_at = conv.started_at.to_rfc3339();
1416 self.with_transaction(|conn| {
1417 conn.query(&format!(
1418 "MERGE (c:Conversation {{id: '{node_id}'}}) SET \
1419 c.source = '{source_escaped}', \
1420 c.machine_id = '{machine_escaped}', \
1421 c.started_at = '{started_at}', \
1422 c.project_path = '{project_escaped}';"
1423 ))?;
1424 self.append_sync_log(
1425 conn,
1426 SyncOp::Update,
1427 SyncNodeType::Conversation,
1428 &node_id,
1429 data.as_deref(),
1430 origin_machine_id,
1431 Some(origin_seq),
1432 )?;
1433 Ok(ApplyOutcome::Updated)
1434 })
1435 }
1436
1437 pub fn apply_delete_conversation(
1438 &self,
1439 node_id: &str,
1440 origin_machine_id: &str,
1441 origin_seq: i64,
1442 ) -> Result<ApplyOutcome> {
1443 self.with_transaction(|conn| {
1444 conn.query(&format!(
1445 "MATCH (c:Conversation {{id: '{node_id}'}}) DETACH DELETE c;"
1446 ))?;
1447 self.append_sync_log(
1448 conn,
1449 SyncOp::Delete,
1450 SyncNodeType::Conversation,
1451 node_id,
1452 None,
1453 origin_machine_id,
1454 Some(origin_seq),
1455 )?;
1456 Ok(ApplyOutcome::Deleted)
1457 })
1458 }
1459
1460 pub fn apply_create_entity(
1461 &self,
1462 entity: &Entity,
1463 origin_machine_id: &str,
1464 origin_seq: i64,
1465 ) -> Result<ApplyOutcome> {
1466 let data = serde_json::to_string(entity).ok();
1467 let node_id = entity.id.to_string();
1468 self.with_transaction(|conn| {
1469 if self.get_entity(entity.id)?.is_some() {
1470 return Ok(ApplyOutcome::Skipped);
1471 }
1472 self.create_entity_node(conn, entity)?;
1473 self.append_sync_log(
1474 conn,
1475 SyncOp::Create,
1476 SyncNodeType::Entity,
1477 &node_id,
1478 data.as_deref(),
1479 origin_machine_id,
1480 Some(origin_seq),
1481 )?;
1482 Ok(ApplyOutcome::Created)
1483 })
1484 }
1485
1486 pub fn apply_upsert_entity(
1487 &self,
1488 entity: &Entity,
1489 origin_machine_id: &str,
1490 origin_seq: i64,
1491 ) -> Result<ApplyOutcome> {
1492 let data = serde_json::to_string(entity).ok();
1493 let node_id = entity.id.to_string();
1494 let name_escaped = escape_cypher(&entity.name);
1495 let etype_escaped = escape_cypher(&entity.entity_type);
1496 let embedding_str = format_embedding(&entity.embedding);
1497 let aliases_str = format_string_array(&entity.aliases);
1498 self.with_transaction(|conn| {
1499 conn.query(&format!(
1500 "MERGE (e:Entity {{id: '{node_id}'}}) SET \
1501 e.name = '{name_escaped}', \
1502 e.entity_type = '{etype_escaped}', \
1503 e.embedding = {embedding_str}, \
1504 e.aliases = {aliases_str};"
1505 ))?;
1506 self.append_sync_log(
1507 conn,
1508 SyncOp::Update,
1509 SyncNodeType::Entity,
1510 &node_id,
1511 data.as_deref(),
1512 origin_machine_id,
1513 Some(origin_seq),
1514 )?;
1515 Ok(ApplyOutcome::Updated)
1516 })
1517 }
1518
1519 pub fn apply_delete_entity(
1520 &self,
1521 node_id: &str,
1522 origin_machine_id: &str,
1523 origin_seq: i64,
1524 ) -> Result<ApplyOutcome> {
1525 self.with_transaction(|conn| {
1526 conn.query(&format!(
1527 "MATCH (e:Entity {{id: '{node_id}'}}) DETACH DELETE e;"
1528 ))?;
1529 self.append_sync_log(
1530 conn,
1531 SyncOp::Delete,
1532 SyncNodeType::Entity,
1533 node_id,
1534 None,
1535 origin_machine_id,
1536 Some(origin_seq),
1537 )?;
1538 Ok(ApplyOutcome::Deleted)
1539 })
1540 }
1541
1542 pub fn apply_create_relation(
1543 &self,
1544 relation: &Relation,
1545 origin_machine_id: &str,
1546 origin_seq: i64,
1547 ) -> Result<ApplyOutcome> {
1548 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1549 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1550 let data = serde_json::to_string(relation).ok();
1551 self.with_transaction(|conn| {
1552 match self.create_relation_edge(conn, relation) {
1553 Ok(()) => {
1554 self.append_sync_log(
1555 conn,
1556 SyncOp::Create,
1557 SyncNodeType::Relation,
1558 &node_id,
1559 data.as_deref(),
1560 origin_machine_id,
1561 Some(origin_seq),
1562 )?;
1563 Ok(ApplyOutcome::Created)
1564 }
1565 Err(_) => Ok(ApplyOutcome::Skipped),
1566 }
1567 })
1568 }
1569
1570 pub fn apply_delete_relation(
1571 &self,
1572 node_id: &str,
1573 relation: Option<&Relation>,
1574 origin_machine_id: &str,
1575 origin_seq: i64,
1576 ) -> Result<ApplyOutcome> {
1577 let Some(rel) = relation else {
1578 return Ok(ApplyOutcome::Skipped);
1581 };
1582 self.with_transaction(|conn| {
1583 let from_id = rel.from_id.to_string();
1584 let to_id = rel.to_id.to_string();
1585 let rel_label = match rel.relation_type {
1586 crate::schema::RelationType::RelatesTo => "RELATES_TO",
1587 crate::schema::RelationType::Mentions => "MENTIONS",
1588 crate::schema::RelationType::DerivedFrom => "DERIVED_FROM",
1589 crate::schema::RelationType::Contradicts => "CONTRADICTS",
1590 crate::schema::RelationType::Reinforces => "REINFORCES",
1591 crate::schema::RelationType::Supersedes => "SUPERSEDES",
1592 crate::schema::RelationType::DistilledFrom => "DISTILLED_FROM",
1593 };
1594 conn.query(&format!(
1595 "MATCH (a {{id: '{from_id}'}})-[r:{rel_label}]->(b {{id: '{to_id}'}}) DELETE r;"
1596 ))
1597 .ok();
1598 self.append_sync_log(
1599 conn,
1600 SyncOp::Delete,
1601 SyncNodeType::Relation,
1602 node_id,
1603 None,
1604 origin_machine_id,
1605 Some(origin_seq),
1606 )?;
1607 Ok(ApplyOutcome::Deleted)
1608 })
1609 }
1610
1611 pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1612 let conn = self.conn()?;
1613 let limit_clause = match limit {
1614 Some(n) => format!(" LIMIT {}", n),
1615 None => String::new(),
1616 };
1617 let query = format!(
1618 "MATCH (s:SyncLog) WHERE s.local_seq > {} \
1619 RETURN s.id, s.local_seq, s.origin_machine_id, s.origin_seq, s.op, s.node_type, s.node_id, s.timestamp, s.data \
1620 ORDER BY s.local_seq{};",
1621 after_seq, limit_clause
1622 );
1623
1624 let mut result = conn.query(&query)?;
1625 let mut entries = Vec::new();
1626
1627 for row in &mut result {
1628 let id = value_to_string(&row[0]);
1629 let local_seq = match &row[1] {
1630 Value::Int64(n) => *n,
1631 _ => 0,
1632 };
1633 let origin_machine_id = value_to_string(&row[2]);
1634 let origin_seq = match &row[3] {
1635 Value::Int64(n) => *n,
1636 _ => 0,
1637 };
1638 let op_str = value_to_string(&row[4]);
1639 let node_type_str = value_to_string(&row[5]);
1640 let node_id = value_to_string(&row[6]);
1641 let timestamp_str = value_to_string(&row[7]);
1642 let data_str = value_to_string(&row[8]);
1643
1644 let op = match op_str.as_str() {
1645 "create" => SyncOp::Create,
1646 "update" => SyncOp::Update,
1647 "delete" => SyncOp::Delete,
1648 _ => continue,
1649 };
1650 let node_type = match node_type_str.as_str() {
1651 "memory" => SyncNodeType::Memory,
1652 "entity" => SyncNodeType::Entity,
1653 "conversation" => SyncNodeType::Conversation,
1654 "relation" => SyncNodeType::Relation,
1655 _ => continue,
1656 };
1657 let timestamp = chrono::DateTime::parse_from_rfc3339(×tamp_str)
1658 .map(|dt| dt.with_timezone(&chrono::Utc))
1659 .unwrap_or_else(|_| chrono::Utc::now());
1660
1661 let data = if data_str.is_empty() {
1662 None
1663 } else {
1664 Some(data_str)
1665 };
1666
1667 entries.push(SyncEntry {
1668 id,
1669 local_seq,
1670 origin_machine_id,
1671 origin_seq,
1672 op,
1673 node_type,
1674 node_id,
1675 timestamp,
1676 data,
1677 });
1678 }
1679
1680 Ok(entries)
1681 }
1682
1683 pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1684 let conn = self.conn()?;
1685 let escaped = escape_cypher(peer_id);
1686 let mut result = conn.query(&format!(
1687 "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1688 escaped
1689 ))?;
1690
1691 match result.next() {
1692 Some(row) => {
1693 let peer_id = value_to_string(&row[0]);
1694 let last_seq = match &row[1] {
1695 Value::Int64(n) => *n as u64,
1696 _ => 0,
1697 };
1698 let last_sync_at_str = value_to_string(&row[2]);
1699 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1700 .map(|dt| dt.with_timezone(&chrono::Utc))
1701 .unwrap_or_else(|_| chrono::Utc::now());
1702
1703 Ok(Some(SyncState {
1704 peer_id,
1705 last_seq,
1706 last_sync_at,
1707 }))
1708 }
1709 None => Ok(None),
1710 }
1711 }
1712
1713 pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1714 let conn = self.conn()?;
1715 let peer_escaped = escape_cypher(&state.peer_id);
1716 let now = state.last_sync_at.to_rfc3339();
1717
1718 conn.query(&format!(
1719 "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1720 peer_escaped, state.last_seq, now
1721 ))?;
1722 Ok(())
1723 }
1724
1725 pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1726 let conn = self.conn()?;
1727 let mut result =
1728 conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1729 let mut states = Vec::new();
1730
1731 for row in &mut result {
1732 let peer_id = value_to_string(&row[0]);
1733 let last_seq = match &row[1] {
1734 Value::Int64(n) => *n as u64,
1735 _ => 0,
1736 };
1737 let last_sync_at_str = value_to_string(&row[2]);
1738 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1739 .map(|dt| dt.with_timezone(&chrono::Utc))
1740 .unwrap_or_else(|_| chrono::Utc::now());
1741
1742 states.push(SyncState {
1743 peer_id,
1744 last_seq,
1745 last_sync_at,
1746 });
1747 }
1748
1749 Ok(states)
1750 }
1751
1752 pub fn all_entities(&self) -> Result<Vec<Entity>> {
1753 let conn = self.conn()?;
1754 let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1755 let mut entities = Vec::new();
1756 for row in &mut result {
1757 entities.push(row_to_entity(&row)?);
1758 }
1759 Ok(entities)
1760 }
1761
1762 pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1763 let conn = self.conn()?;
1764 let query = format!(
1765 "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1766 entity_id, memory_return_cols("m")
1767 );
1768 let mut result = conn.query(&query)?;
1769 let mut memories = Vec::new();
1770 for row in &mut result {
1771 memories.push(row_to_memory(&row)?);
1772 }
1773 Ok(memories)
1774 }
1775
1776 pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1777 let conn = self.conn()?;
1778 let mut result = conn.query(
1779 &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1780 )?;
1781 let mut memories = Vec::new();
1782 for row in &mut result {
1783 memories.push(row_to_memory(&row)?);
1784 }
1785 Ok(memories)
1786 }
1787
1788 pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1789 let conn = self.conn()?;
1790 let query = format!(
1791 "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1792 entity_id, entity_id
1793 );
1794 let mut result = conn.query(&query)?;
1795 let mut names = Vec::new();
1796 for row in &mut result {
1797 names.push(value_to_string(&row[0]));
1798 }
1799 Ok(names)
1800 }
1801
1802 pub fn max_sync_seq(&self) -> Result<u64> {
1803 let conn = self.conn()?;
1804 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
1805 match result.next() {
1806 Some(row) => match &row[0] {
1807 Value::Int64(n) => Ok(*n as u64),
1808 _ => Ok(0),
1809 },
1810 None => Ok(0),
1811 }
1812 }
1813
1814 pub fn backfill_project_paths(&self) -> Result<u64> {
1815 let conn = self.conn()?;
1816 let mut result = conn.query(
1817 "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1818 )?;
1819 match result.next() {
1820 Some(row) => match &row[0] {
1821 Value::Int64(n) => Ok(*n as u64),
1822 _ => Ok(0),
1823 },
1824 None => Ok(0),
1825 }
1826 }
1827
1828 pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1829 let conn = self.conn()?;
1830 let escaped = escape_cypher(project_path);
1831 let cols = memory_return_cols("m");
1832 let query = format!(
1833 "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1834 escaped, cols
1835 );
1836 let mut result = conn.query(&query)?;
1837 let mut memories = Vec::new();
1838 for row in &mut result {
1839 memories.push(row_to_memory(&row)?);
1840 }
1841 Ok(memories)
1842 }
1843
1844 pub fn backfill_sync_log(&self) -> Result<u64> {
1845 let mut count = 0u64;
1846
1847 let memories: Vec<Memory> = {
1848 let conn = self.conn()?;
1849 let mut result = conn.query(&format!(
1850 "MATCH (m:Memory) RETURN {}, m.embedding;",
1851 memory_return_cols("m")
1852 ))?;
1853 let mut acc = Vec::new();
1854 for row in &mut result {
1855 let mut memory = row_to_memory(&row[..12])?;
1856 memory.embedding = value_to_f32_vec(&row[12]);
1857 acc.push(memory);
1858 }
1859 acc
1860 };
1861
1862 for memory in &memories {
1863 let origin_mid = memory.machine_id.clone();
1864 let data = serde_json::to_string(memory).ok();
1865 let node_id = memory.id.to_string();
1866 self.with_transaction(|conn| {
1867 self.append_sync_log(
1868 conn,
1869 SyncOp::Create,
1870 SyncNodeType::Memory,
1871 &node_id,
1872 data.as_deref(),
1873 &origin_mid,
1874 None,
1875 )
1876 })?;
1877 count += 1;
1878 }
1879
1880 let conversations: Vec<Conversation> = {
1881 let conn = self.conn()?;
1882 let mut result = conn.query(
1883 "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1884 )?;
1885 let mut acc = Vec::new();
1886 for row in &mut result {
1887 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1888 let source = value_to_string(&row[1]);
1889 let machine_id = value_to_string(&row[2]);
1890 let started_at_str = value_to_string(&row[3]);
1891 let project_path = {
1892 let s = value_to_string(&row[4]);
1893 if s.is_empty() { None } else { Some(s) }
1894 };
1895 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1896 .map(|dt| dt.with_timezone(&chrono::Utc))
1897 .unwrap_or_else(|_| chrono::Utc::now());
1898 acc.push(Conversation { id, source, machine_id, started_at, project_path });
1899 }
1900 acc
1901 };
1902
1903 for conv in &conversations {
1904 let origin_mid = conv.machine_id.clone();
1905 let data = serde_json::to_string(conv).ok();
1906 let node_id = conv.id.to_string();
1907 self.with_transaction(|conn| {
1908 self.append_sync_log(
1909 conn,
1910 SyncOp::Create,
1911 SyncNodeType::Conversation,
1912 &node_id,
1913 data.as_deref(),
1914 &origin_mid,
1915 None,
1916 )
1917 })?;
1918 count += 1;
1919 }
1920
1921 Ok(count)
1922 }
1923}
1924
1925impl KuzuStore {
1926 pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
1927 let conn = self.conn()?;
1928 let id_escaped = escape_cypher(id);
1929 let name_escaped = escape_cypher(name);
1930 conn.query(&format!(
1931 "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
1932 ))?;
1933 Ok(())
1934 }
1935
1936 pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
1937 let conn = self.conn()?;
1938 let id_escaped = escape_cypher(id);
1939 let mut result = conn.query(&format!(
1940 "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
1941 ))?;
1942 match result.next() {
1943 Some(row) => Ok(Some(value_to_string(&row[0]))),
1944 None => Ok(None),
1945 }
1946 }
1947
1948 pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
1949 let conn = self.conn()?;
1950 let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
1951 let mut map = std::collections::HashMap::new();
1952 for row in &mut result {
1953 map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
1954 }
1955 Ok(map)
1956 }
1957
1958 pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
1959 let conn = self.conn()?;
1960 let escaped = escape_cypher(machine_id);
1961 let mut result = conn.query(&format!(
1962 "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
1963 ))?;
1964 match result.next() {
1965 Some(row) => match &row[0] {
1966 Value::Int64(n) => Ok(*n as u64),
1967 _ => Ok(0),
1968 },
1969 None => Ok(0),
1970 }
1971 }
1972
1973 pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
1974 self.upsert_machine(&identity.id, &identity.name)?;
1975 let count = self.backfill_machine_id(&identity.id)?;
1976 if count > 0 {
1977 tracing::info!("backfilled machine_id on {count} existing memories");
1978 }
1979 Ok(())
1980 }
1981}
1982
/// Renders an embedding as a Cypher list literal, e.g. `[0.25,1.0]`.
///
/// Every component is emitted as a floating-point literal. `f32`'s `Display` prints
/// whole numbers without a fractional part (`1`, `0`, `-0`), so `.0` is appended in that
/// case. Otherwise a vector of whole-number components (e.g. an all-zero embedding
/// received from a peer) would be emitted as an integer list, unlike the `0.0`
/// placeholders used for an empty embedding.
///
/// An empty embedding is replaced by a zero vector of `EMPTY_EMBEDDING_DIM` (384)
/// components. Presumably this matches the dimension of the `embedding` column; TODO
/// confirm against the schema.
fn format_embedding(embedding: &[f32]) -> String {
    /// Length of the zero vector substituted for a missing embedding.
    const EMPTY_EMBEDDING_DIM: usize = 384;

    if embedding.is_empty() {
        return format!("[{}]", ["0.0"; EMPTY_EMBEDDING_DIM].join(","));
    }

    let parts: Vec<String> = embedding
        .iter()
        .map(|v| {
            let mut literal = v.to_string();
            // Only a finite whole number renders as bare digits with an optional sign.
            // NaN/inf are left as-is.
            if literal.bytes().all(|b| b == b'-' || b.is_ascii_digit()) {
                literal.push_str(".0");
            }
            literal
        })
        .collect();
    format!("[{}]", parts.join(","))
}
1991
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Every backslash and single quote is prefixed with a backslash; all other characters
/// are copied unchanged.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1995
1996fn format_string_array(items: &[String]) -> String {
1997 let parts: Vec<String> = items
1998 .iter()
1999 .map(|s| format!("'{}'", s.replace('\'', "''")))
2000 .collect();
2001 format!("[{}]", parts.join(","))
2002}
2003
2004fn value_to_string(val: &Value) -> String {
2005 match val {
2006 Value::String(s) => s.clone(),
2007 other => format!("{:?}", other),
2008 }
2009}
2010
2011fn value_to_f32(val: &Value) -> f32 {
2012 match val {
2013 Value::Float(f) => *f,
2014 Value::Double(d) => *d as f32,
2015 Value::Int64(i) => *i as f32,
2016 _ => 0.0,
2017 }
2018}
2019
2020fn value_to_f32_vec(val: &Value) -> Vec<f32> {
2021 match val {
2022 Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
2023 _ => Vec::new(),
2024 }
2025}
2026
2027fn row_to_memory(row: &[Value]) -> Result<Memory> {
2028 let id_str = value_to_string(&row[0]);
2029 let id = Uuid::parse_str(&id_str).unwrap_or_default();
2030 let content = value_to_string(&row[1]);
2031 let memory_type_str = value_to_string(&row[2]);
2032 let confidence = value_to_f32(&row[3]);
2033 let created_at_str = value_to_string(&row[4]);
2034 let last_accessed_str = value_to_string(&row[5]);
2035 let access_count = match &row[6] {
2036 Value::Int64(i) => *i as u32,
2037 _ => 0,
2038 };
2039 let source = value_to_string(&row[7]);
2040 let source_id = value_to_string(&row[8]);
2041 let project_path = project_path_from_db(&value_to_string(&row[9]));
2042 let machine_id = value_to_string(&row[10]);
2043 let updated_at_str = value_to_string(&row[11]);
2044
2045 let memory_type = match memory_type_str.as_str() {
2046 "episodic" => MemoryType::Episodic,
2047 "procedural" => MemoryType::Procedural,
2048 "decision" => MemoryType::Decision,
2049 "architecture" => MemoryType::Architecture,
2050 "debugging" => MemoryType::Debugging,
2051 "task" => MemoryType::Task,
2052 "question" => MemoryType::Question,
2053 _ => MemoryType::Semantic,
2054 };
2055
2056 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
2057 .map(|dt| dt.with_timezone(&chrono::Utc))
2058 .unwrap_or_else(|_| chrono::Utc::now());
2059
2060 let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
2061 .map(|dt| dt.with_timezone(&chrono::Utc))
2062 .unwrap_or_else(|_| chrono::Utc::now());
2063
2064 let updated_at = if updated_at_str.is_empty() {
2065 created_at
2066 } else {
2067 chrono::DateTime::parse_from_rfc3339(&updated_at_str)
2068 .map(|dt| dt.with_timezone(&chrono::Utc))
2069 .unwrap_or(created_at)
2070 };
2071
2072 Ok(Memory {
2073 id,
2074 content,
2075 embedding: Vec::new(),
2076 memory_type,
2077 confidence,
2078 created_at,
2079 last_accessed,
2080 access_count,
2081 source,
2082 source_id,
2083 project_path,
2084 machine_id,
2085 updated_at,
2086 })
2087}
2088
2089fn row_to_entity(row: &[Value]) -> Result<Entity> {
2090 let id_str = value_to_string(&row[0]);
2091 let id = Uuid::parse_str(&id_str).unwrap_or_default();
2092 let name = value_to_string(&row[1]);
2093 let entity_type = value_to_string(&row[2]);
2094
2095 Ok(Entity {
2096 id,
2097 name,
2098 entity_type,
2099 embedding: Vec::new(),
2100 aliases: Vec::new(),
2101 })
2102}
2103
// SAFETY: NOTE(review): these impls assert that `KuzuStore` (a `kuzu::Database` plus a
// `String`) may be moved to and shared between threads. Nothing in this file justifies
// that. It relies on `kuzu::Database` being internally synchronised, so that concurrent
// `Connection::new(&self.db)` calls from `conn()` on different threads are sound. Confirm
// against the Kùzu concurrency documentation before relying on this.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}
2106
#[cfg(test)]
mod tests {
    //! Regression tests for the `SyncLog` schema migration and for rollback behaviour of
    //! `with_transaction`.

    use super::*;

    /// `migrate_sync_log` must replace a legacy `SyncLog` table with the current schema
    /// and reset peer cursors. The legacy table is keyed by `seq` and has a single
    /// `machine_id` column. The check is that `SyncState.last_seq` becomes 0, here for
    /// one peer.
    #[test]
    fn migrate_sync_log_recreates_old_schema_and_resets_cursors() {
        let store = KuzuStore::in_memory("test-machine-migrate".to_string()).unwrap();
        let conn = store.conn().unwrap();

        // Swap the freshly initialised SyncLog for the legacy layout and seed one row.
        conn.query("DROP TABLE SyncLog;").unwrap();
        conn.query(
            "CREATE NODE TABLE SyncLog(\
             seq INT64 PRIMARY KEY, \
             op STRING, \
             node_type STRING, \
             node_id STRING, \
             machine_id STRING, \
             timestamp STRING, \
             data STRING\
             );",
        )
        .unwrap();
        conn.query(
            "CREATE (:SyncLog {seq: 1, op: 'create', node_type: 'memory', \
             node_id: 'abc', machine_id: 'old-machine', \
             timestamp: '2024-01-01T00:00:00Z', data: ''});",
        )
        .unwrap();

        // A non-zero peer cursor that the migration is expected to reset.
        conn.query(
            "MERGE (s:SyncState {peer_id: 'peer-x'}) \
             SET s.last_seq = 5, s.last_sync_at = '2024-01-01T00:00:00Z';",
        )
        .unwrap();

        store.migrate_sync_log(&conn).unwrap();

        // The current schema has an `id` column, which the legacy table lacked.
        assert!(
            conn.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;").is_ok(),
            "new-schema column s.id must be present after migration"
        );

        let mut r = conn
            .query(
                "MATCH (s:SyncState {peer_id: 'peer-x'}) RETURN s.last_seq;",
            )
            .unwrap();
        let last_seq = match r.next() {
            Some(row) => match &row[0] {
                Value::Int64(n) => *n,
                _ => panic!("unexpected value type for last_seq"),
            },
            None => panic!("SyncState row not found after migration"),
        };
        assert_eq!(last_seq, 0, "last_seq must be reset to 0 after migration");
    }

    /// When the `with_transaction` closure returns an error, that error must propagate
    /// and nothing written inside the closure may survive. Both the Memory node and the
    /// SyncLog are checked.
    #[test]
    fn rollback_leaves_no_partial_state() {
        let store = KuzuStore::in_memory("test-machine-rollback".to_string()).unwrap();
        // 384 comma-separated zeros, spliced into the embedding list literal below.
        let embedding = std::iter::repeat("0.0")
            .take(384)
            .collect::<Vec<_>>()
            .join(", ");

        // Write a Memory node, then fail the closure on purpose.
        let result: Result<()> = store.with_transaction(|conn| {
            conn.query(&format!(
                "CREATE (:Memory {{id: 'rollback-test-id', content: 'x', \
                 embedding: [{embedding}], memory_type: 'semantic', confidence: 1.0, \
                 created_at: '2024-01-01T00:00:00Z', last_accessed: '2024-01-01T00:00:00Z', \
                 access_count: 0, source: 'test', source_id: '', project_path: '', \
                 machine_id: 'x', updated_at: '2024-01-01T00:00:00Z'}});"
            ))?;
            Err(anyhow::anyhow!("deliberate rollback"))
        });

        assert!(
            result.is_err(),
            "with_transaction must propagate the closure error"
        );

        let entries = store.sync_log_since(0).unwrap();
        assert_eq!(entries.len(), 0, "no SyncLog rows may survive a rollback");
        assert_eq!(
            store.memory_count().unwrap(),
            0,
            "the Memory node created inside the transaction must be rolled back"
        );
    }
}