1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10 Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11 SyncOp, SyncState,
12};
13use crate::store::Store;
14
/// KùzuDB-backed implementation of [`Store`].
///
/// Every mutating `Store` trait operation also appends a row to the `SyncLog`
/// table, tagged with this instance's `machine_id` and a locally allocated
/// sequence number.
pub struct KuzuStore {
    /// Underlying database handle; a fresh `Connection` is created per operation.
    db: Database,
    /// Identifier of this machine, written into every `SyncLog` entry it creates.
    machine_id: String,
    /// Next `SyncLog.seq` to hand out (incremented with `fetch_add`).
    sync_seq: AtomicU64,
}
20
/// Builds the comma-separated `RETURN` column list for a `Memory` node bound to
/// `alias` (e.g. `"m.id, m.content, ..."`).
///
/// Rows produced with this list are decoded by `row_to_memory`, so the column
/// order here must stay in sync with that function.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 11] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
    ];

    let mut out = String::new();
    for (position, column) in COLUMNS.iter().enumerate() {
        if position > 0 {
            out.push_str(", ");
        }
        out.push_str(alias);
        out.push('.');
        out.push_str(column);
    }
    out
}
28
/// Converts a stored `project_path` column value back to an `Option`.
///
/// The store writes `""` when a record has no project path, so the empty
/// string maps to `None` and anything else is returned as an owned string.
fn project_path_from_db(s: &str) -> Option<String> {
    match s {
        "" => None,
        path => Some(path.to_owned()),
    }
}
32
33impl KuzuStore {
34 pub fn open(path: &Path, machine_id: String) -> Result<Self> {
35 let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
36 let store = Self {
37 db,
38 machine_id,
39 sync_seq: AtomicU64::new(1),
40 };
41 store.init_schema()?;
42 store.init_sync_seq()?;
43 Ok(store)
44 }
45
46 pub fn in_memory(machine_id: String) -> Result<Self> {
47 let db =
48 Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49 let store = Self {
50 db,
51 machine_id,
52 sync_seq: AtomicU64::new(1),
53 };
54 store.init_schema()?;
55 Ok(store)
56 }
57
58 pub fn machine_id(&self) -> &str {
59 &self.machine_id
60 }
61
62 fn conn(&self) -> Result<Connection<'_>> {
63 Connection::new(&self.db).context("creating connection")
64 }
65
66 pub fn diagnostic(&self) -> Result<String> {
67 let conn = self.conn()?;
68
69 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
70 let total: i64 = result
71 .next()
72 .map(|r| match &r[0] {
73 Value::Int64(v) => *v,
74 _ => -1,
75 })
76 .unwrap_or(-1);
77
78 let mut result = conn.query(
79 "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
80 )?;
81 let with_emb: i64 = result
82 .next()
83 .map(|r| match &r[0] {
84 Value::Int64(v) => *v,
85 _ => -1,
86 })
87 .unwrap_or(-1);
88
89 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
90 let zeros_str = format!("[{}]", zeros.join(","));
91 let idx_query = format!(
92 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
93 zeros_str
94 );
95 let idx_status = match conn.query(&idx_query) {
96 Ok(mut r) => match r.next() {
97 Some(row) => format!("ok (distance: {:?})", &row[0]),
98 None => "ok (0 results)".to_string(),
99 },
100 Err(e) => format!("error: {e}"),
101 };
102
103 Ok(format!(
104 "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
105 ))
106 }
107
108 fn init_schema(&self) -> Result<()> {
109 let conn = self.conn()?;
110
111 conn.query(
112 "CREATE NODE TABLE IF NOT EXISTS Memory(
113 id STRING PRIMARY KEY,
114 content STRING,
115 embedding FLOAT[384],
116 memory_type STRING,
117 confidence FLOAT,
118 created_at STRING,
119 last_accessed STRING,
120 access_count INT64,
121 source STRING,
122 source_id STRING,
123 project_path STRING
124 );",
125 )
126 .context("creating Memory table")?;
127
128 conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
129
130 conn.query(
131 "CREATE NODE TABLE IF NOT EXISTS Machine(
132 id STRING PRIMARY KEY,
133 name STRING
134 );",
135 )
136 .context("creating Machine table")?;
137
138 conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
139
140 conn.query(
141 "CREATE NODE TABLE IF NOT EXISTS Entity(
142 id STRING PRIMARY KEY,
143 name STRING,
144 entity_type STRING,
145 embedding FLOAT[384],
146 aliases STRING[]
147 );",
148 )
149 .context("creating Entity table")?;
150
151 conn.query(
152 "CREATE NODE TABLE IF NOT EXISTS Conversation(
153 id STRING PRIMARY KEY,
154 source STRING,
155 machine_id STRING,
156 started_at STRING,
157 project_path STRING
158 );",
159 )
160 .context("creating Conversation table")?;
161
162 conn.query(
163 "CREATE NODE TABLE IF NOT EXISTS IngestLog(
164 file_path STRING PRIMARY KEY,
165 file_hash STRING,
166 ingested_at STRING,
167 memory_count INT64,
168 source STRING
169 );",
170 )
171 .context("creating IngestLog table")?;
172
173 conn.query(
174 "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
175 FROM Memory TO Memory,
176 strength FLOAT,
177 context STRING
178 );",
179 )
180 .context("creating RELATES_TO rel")?;
181
182 conn.query(
183 "CREATE REL TABLE IF NOT EXISTS MENTIONS(
184 FROM Memory TO Entity,
185 position INT64
186 );",
187 )
188 .context("creating MENTIONS rel")?;
189
190 conn.query(
191 "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
192 FROM Memory TO Conversation,
193 transformation STRING
194 );",
195 )
196 .context("creating DERIVED_FROM rel")?;
197
198 conn.query(
199 "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
200 FROM Memory TO Memory,
201 model STRING
202 );",
203 )
204 .context("creating DISTILLED_FROM rel")?;
205
206 conn.query(
207 "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
208 FROM Memory TO Memory,
209 resolution STRING
210 );",
211 )
212 .context("creating CONTRADICTS rel")?;
213
214 conn.query(
215 "CREATE REL TABLE IF NOT EXISTS REINFORCES(
216 FROM Memory TO Memory
217 );",
218 )
219 .context("creating REINFORCES rel")?;
220
221 conn.query(
222 "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
223 FROM Memory TO Memory,
224 reason STRING
225 );",
226 )
227 .context("creating SUPERSEDES rel")?;
228
229 conn.query(
230 "CREATE NODE TABLE IF NOT EXISTS SyncLog(
231 seq INT64 PRIMARY KEY,
232 op STRING,
233 node_type STRING,
234 node_id STRING,
235 machine_id STRING,
236 timestamp STRING,
237 data STRING
238 );",
239 )
240 .context("creating SyncLog table")?;
241
242 conn.query(
243 "CREATE NODE TABLE IF NOT EXISTS SyncState(
244 peer_id STRING PRIMARY KEY,
245 last_seq INT64,
246 last_sync_at STRING
247 );",
248 )
249 .context("creating SyncState table")?;
250
251 conn.query(
252 "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
253 memory_id STRING PRIMARY KEY,
254 distilled_id STRING,
255 consolidated_at STRING,
256 model STRING
257 );",
258 )
259 .context("creating ConsolidationLog table")?;
260
261 conn.query(
262 "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
263 id STRING PRIMARY KEY,
264 last_sync_seq INT64,
265 exported_at STRING,
266 vault_path STRING,
267 pages_created INT64,
268 pages_updated INT64,
269 memories_processed INT64
270 );",
271 )
272 .context("creating WikiExportLog table")?;
273
274 conn.query(
275 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
276 )
277 .ok();
278
279 Ok(())
280 }
281
282 fn init_sync_seq(&self) -> Result<()> {
283 let conn = self.conn()?;
284 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
285 if let Some(row) = result.next()
286 && let Value::Int64(max_seq) = &row[0]
287 {
288 self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
289 }
290 Ok(())
291 }
292
293 fn log_sync_entry(
294 &self,
295 conn: &Connection<'_>,
296 op: SyncOp,
297 node_type: SyncNodeType,
298 node_id: &str,
299 data: Option<&str>,
300 ) -> Result<()> {
301 let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
302 let timestamp = chrono::Utc::now().to_rfc3339();
303 let op_str = match op {
304 SyncOp::Create => "create",
305 SyncOp::Update => "update",
306 SyncOp::Delete => "delete",
307 };
308 let nt_str = match node_type {
309 SyncNodeType::Memory => "memory",
310 SyncNodeType::Entity => "entity",
311 SyncNodeType::Conversation => "conversation",
312 SyncNodeType::Relation => "relation",
313 };
314 let machine_escaped = escape_cypher(&self.machine_id);
315 let node_id_escaped = escape_cypher(node_id);
316 let data_escaped = data.map(escape_cypher).unwrap_or_default();
317
318 let query = format!(
319 "CREATE (:SyncLog {{
320 seq: {seq},
321 op: '{op_str}',
322 node_type: '{nt_str}',
323 node_id: '{node_id_escaped}',
324 machine_id: '{machine_escaped}',
325 timestamp: '{timestamp}',
326 data: '{data_escaped}'
327 }});"
328 );
329 conn.query(&query)?;
330 Ok(())
331 }
332}
333
334impl Store for KuzuStore {
335 fn store_memory(&self, memory: &Memory) -> Result<()> {
336 let conn = self.conn()?;
337 self.create_memory_node(&conn, memory)?;
338 let data = serde_json::to_string(memory).ok();
339 self.log_sync_entry(
340 &conn,
341 SyncOp::Create,
342 SyncNodeType::Memory,
343 &memory.id.to_string(),
344 data.as_deref(),
345 )?;
346 Ok(())
347 }
348
349 fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
350 let conn = self.conn()?;
351 let query = format!(
352 "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
353 id, memory_return_cols("m")
354 );
355
356 let mut result = conn.query(&query)?;
357 match result.next() {
358 Some(row) => Ok(Some(row_to_memory(&row)?)),
359 None => Ok(None),
360 }
361 }
362
363 fn delete_memory(&self, id: Uuid) -> Result<()> {
364 let conn = self.conn()?;
365 conn.query(&format!(
366 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
367 id
368 ))?;
369 self.log_sync_entry(
370 &conn,
371 SyncOp::Delete,
372 SyncNodeType::Memory,
373 &id.to_string(),
374 None,
375 )?;
376 Ok(())
377 }
378
379 fn store_entity(&self, entity: &Entity) -> Result<()> {
380 let conn = self.conn()?;
381 self.create_entity_node(&conn, entity)?;
382 let data = serde_json::to_string(entity).ok();
383 self.log_sync_entry(
384 &conn,
385 SyncOp::Create,
386 SyncNodeType::Entity,
387 &entity.id.to_string(),
388 data.as_deref(),
389 )?;
390 Ok(())
391 }
392
393 fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
394 let conn = self.conn()?;
395 let mut result = conn.query(&format!(
396 "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
397 id
398 ))?;
399
400 match result.next() {
401 Some(row) => Ok(Some(row_to_entity(&row)?)),
402 None => Ok(None),
403 }
404 }
405
406 fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
407 let conn = self.conn()?;
408 let name_escaped = escape_cypher(name);
409 let query = format!(
410 "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
411 name_escaped
412 );
413 let mut result = conn.query(&query)?;
414
415 match result.next() {
416 Some(row) => Ok(Some(row_to_entity(&row)?)),
417 None => Ok(None),
418 }
419 }
420
421 fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
422 let conn = self.conn()?;
423 self.create_conversation_node(&conn, conversation)?;
424 let data = serde_json::to_string(conversation).ok();
425 self.log_sync_entry(
426 &conn,
427 SyncOp::Create,
428 SyncNodeType::Conversation,
429 &conversation.id.to_string(),
430 data.as_deref(),
431 )?;
432 Ok(())
433 }
434
435 fn store_relation(&self, relation: &Relation) -> Result<()> {
436 let conn = self.conn()?;
437 self.create_relation_edge(&conn, relation)?;
438 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
439 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
440 let data = serde_json::to_string(relation).ok();
441 self.log_sync_entry(
442 &conn,
443 SyncOp::Create,
444 SyncNodeType::Relation,
445 &node_id,
446 data.as_deref(),
447 )?;
448 Ok(())
449 }
450
451 fn get_relations(
452 &self,
453 node_id: Uuid,
454 relation_type: Option<RelationType>,
455 ) -> Result<Vec<Relation>> {
456 let conn = self.conn()?;
457 let id = node_id.to_string();
458
459 let query = match relation_type {
460 Some(RelationType::RelatesTo) => format!(
461 "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
462 id
463 ),
464 Some(RelationType::Contradicts) => format!(
465 "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
466 id
467 ),
468 Some(RelationType::Reinforces) => format!(
469 "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
470 id
471 ),
472 Some(RelationType::Supersedes) => format!(
473 "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
474 id
475 ),
476 Some(RelationType::Mentions) => format!(
477 "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
478 id
479 ),
480 Some(RelationType::DerivedFrom) => format!(
481 "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
482 id
483 ),
484 Some(RelationType::DistilledFrom) => format!(
485 "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
486 id
487 ),
488 None => {
489 let mut relations = Vec::new();
490 let mem_to_mem = [
491 ("RELATES_TO", RelationType::RelatesTo),
492 ("REINFORCES", RelationType::Reinforces),
493 ("CONTRADICTS", RelationType::Contradicts),
494 ("SUPERSEDES", RelationType::Supersedes),
495 ("DISTILLED_FROM", RelationType::DistilledFrom),
496 ];
497 for (label, rt) in &mem_to_mem {
498 let q = format!(
499 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
500 id, label
501 );
502 if let Ok(mut result) = conn.query(&q) {
503 for row in &mut result {
504 let to_id_str = value_to_string(&row[0]);
505 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
506 relations.push(Relation {
507 from_id: node_id,
508 to_id,
509 relation_type: *rt,
510 strength: 1.0,
511 context: None,
512 });
513 }
514 }
515 }
516 for (label, target, rt) in [
517 ("MENTIONS", "Entity", RelationType::Mentions),
518 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
519 ] {
520 let q = format!(
521 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
522 id, label, target
523 );
524 if let Ok(mut result) = conn.query(&q) {
525 for row in &mut result {
526 let to_id_str = value_to_string(&row[0]);
527 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
528 relations.push(Relation {
529 from_id: node_id,
530 to_id,
531 relation_type: rt,
532 strength: 1.0,
533 context: None,
534 });
535 }
536 }
537 }
538 return Ok(relations);
539 }
540 };
541
542 let mut result = conn.query(&query)?;
543 let mut relations = Vec::new();
544
545 for row in &mut result {
546 let to_id_str = value_to_string(&row[0]);
547 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
548
549 relations.push(Relation {
550 from_id: node_id,
551 to_id,
552 relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
553 strength: 1.0,
554 context: None,
555 });
556 }
557
558 Ok(relations)
559 }
560
561 fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
562 let conn = self.conn()?;
563 let embedding_str = format_embedding(embedding);
564
565 let query = format!(
566 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
567 RETURN {}, distance;",
568 embedding_str, limit, memory_return_cols("node")
569 );
570
571 let mut result = conn.query(&query)?;
572 let mut results = Vec::new();
573
574 for row in &mut result {
575 let memory = row_to_memory(&row[..11])?;
576 let distance = value_to_f32(&row[11]);
577 let similarity = 1.0 - distance;
578 results.push((memory, similarity));
579 }
580
581 Ok(results)
582 }
583
584 fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
585 let conn = self.conn()?;
586 let query = format!(
587 "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
588 RETURN DISTINCT {};",
589 start_id, depth, memory_return_cols("m")
590 );
591
592 let mut result = conn.query(&query)?;
593 let mut results = Vec::new();
594
595 for row in &mut result {
596 let memory = row_to_memory(&row)?;
597 results.push((memory, Vec::new()));
598 }
599
600 Ok(results)
601 }
602
603 fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
604 let conn = self.conn()?;
605 let source_escaped = escape_cypher(source);
606 let query = format!(
607 "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
608 source_escaped, memory_return_cols("m")
609 );
610 let mut result = conn.query(&query)?;
611
612 let mut memories = Vec::new();
613 for row in &mut result {
614 memories.push(row_to_memory(&row)?);
615 }
616 Ok(memories)
617 }
618
619 fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
620 let conn = self.conn()?;
621 let type_str = format!("{:?}", memory_type).to_lowercase();
622 let query = format!(
623 "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
624 type_str, memory_return_cols("m")
625 );
626 let mut result = conn.query(&query)?;
627
628 let mut memories = Vec::new();
629 for row in &mut result {
630 memories.push(row_to_memory(&row)?);
631 }
632 Ok(memories)
633 }
634
635 fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
636 let conn = self.conn()?;
637 let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
638 let cutoff_str = cutoff.to_rfc3339();
639
640 let query = format!(
641 "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
642 cutoff_str, memory_return_cols("m")
643 );
644
645 let mut result = conn.query(&query)?;
646 let mut memories = Vec::new();
647 for row in &mut result {
648 memories.push(row_to_memory(&row)?);
649 }
650 Ok(memories)
651 }
652
653 fn update_memory(&self, memory: &Memory) -> Result<()> {
654 let conn = self.conn()?;
655 let id = memory.id.to_string();
656 let last_accessed = memory.last_accessed.to_rfc3339();
657 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
658
659 let query = format!(
660 "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}';",
661 id, memory.confidence, last_accessed, memory.access_count, project_path_escaped
662 );
663
664 conn.query(&query)?;
665 let data = serde_json::to_string(memory).ok();
666 self.log_sync_entry(
667 &conn,
668 SyncOp::Update,
669 SyncNodeType::Memory,
670 &id,
671 data.as_deref(),
672 )?;
673 Ok(())
674 }
675
676 fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
677 let conn = self.conn()?;
678 let query_escaped = escape_cypher(&query.to_lowercase());
679 let cypher = format!(
680 "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
681 query_escaped, memory_return_cols("m"), limit
682 );
683
684 let mut result = conn.query(&cypher)?;
685 let mut memories = Vec::new();
686 for row in &mut result {
687 memories.push(row_to_memory(&row)?);
688 }
689 Ok(memories)
690 }
691
692 fn memory_count(&self) -> Result<usize> {
693 let conn = self.conn()?;
694 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
695 match result.next() {
696 Some(row) => match &row[0] {
697 Value::Int64(n) => Ok(*n as usize),
698 _ => Ok(0),
699 },
700 None => Ok(0),
701 }
702 }
703}
704
705impl KuzuStore {
706 pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
707 if let Some(existing) = self.find_entity_by_name(name)? {
708 return Ok(existing);
709 }
710 let entity = Entity::new(name.to_string(), entity_type.to_string());
711 self.store_entity(&entity)?;
712 Ok(entity)
713 }
714
715 pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
716 let conn = self.conn()?;
717 let escaped = escape_cypher(content_prefix);
718 let query = format!(
719 "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
720 escaped
721 );
722 let mut result = conn.query(&query)?;
723 Ok(result.next().is_some())
724 }
725
726 pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
727 let conn = self.conn()?;
728 let query = format!(
729 "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
730 memory_return_cols("m"), limit
731 );
732 let mut result = conn.query(&query)?;
733 let mut memories = Vec::new();
734 for row in &mut result {
735 memories.push(row_to_memory(&row)?);
736 }
737 Ok(memories)
738 }
739
740 pub fn memories_created_between(
741 &self,
742 start: &DateTime<Utc>,
743 end: &DateTime<Utc>,
744 ) -> Result<Vec<Memory>> {
745 let conn = self.conn()?;
746 let query = format!(
747 "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
748 start.to_rfc3339(),
749 end.to_rfc3339(),
750 memory_return_cols("m")
751 );
752 let mut result = conn.query(&query)?;
753 let mut memories = Vec::new();
754 for row in &mut result {
755 memories.push(row_to_memory(&row)?);
756 }
757 Ok(memories)
758 }
759
760 pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
761 let conn = self.conn()?;
762 let now = Utc::now().to_rfc3339();
763 let query = format!(
764 "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
765 raw_id,
766 distilled_id,
767 now,
768 escape_cypher(model)
769 );
770 conn.query(&query)?;
771 Ok(())
772 }
773
774 pub fn consolidation_count(&self) -> Result<usize> {
775 let conn = self.conn()?;
776 let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
777 match result.next() {
778 Some(row) => match &row[0] {
779 Value::Int64(n) => Ok(*n as usize),
780 _ => Ok(0),
781 },
782 None => Ok(0),
783 }
784 }
785
786 pub fn delete_consolidated_raw(&self) -> Result<usize> {
787 let conn = self.conn()?;
788 let result = conn.query(
789 "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
790 )?;
791 let mut ids = Vec::new();
792 for row in result {
793 if let Value::String(id) = &row[0]
794 && let Ok(uuid) = Uuid::parse_str(id)
795 {
796 ids.push(uuid);
797 }
798 }
799 let count = ids.len();
800 for id in &ids {
801 let escaped = escape_cypher(&id.to_string());
802 conn.query(&format!(
803 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
804 escaped
805 ))
806 .ok();
807 }
808 Ok(count)
809 }
810
811 pub fn rebuild_vector_index(&self) -> Result<()> {
812 let conn = self.conn()?;
813 conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
814 .ok();
815 conn.query(
816 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
817 )?;
818 Ok(())
819 }
820
821 pub fn clear_ingest_log(&self) -> Result<usize> {
822 let conn = self.conn()?;
823 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
824 let count = match result.next() {
825 Some(row) => match &row[0] {
826 Value::Int64(n) => *n as usize,
827 _ => 0,
828 },
829 None => 0,
830 };
831 conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
832 Ok(count)
833 }
834
835 pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
836 let conn = self.conn()?;
837 let escaped = escape_cypher(source);
838 let mut result = conn.query(&format!(
839 "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
840 escaped
841 ))?;
842 let count = match result.next() {
843 Some(row) => match &row[0] {
844 Value::Int64(n) => *n as usize,
845 _ => 0,
846 },
847 None => 0,
848 };
849 conn.query(&format!(
850 "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
851 escaped
852 ))?;
853 Ok(count)
854 }
855
856 pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
857 let conn = self.conn()?;
858 let escaped = escape_cypher(file_path);
859 let mut result = conn.query(&format!(
860 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
861 escaped
862 ))?;
863 Ok(result.next().is_some())
864 }
865
866 pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
867 let conn = self.conn()?;
868 let escaped = escape_cypher(file_path);
869 let mut result = conn.query(&format!(
870 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
871 escaped
872 ))?;
873 match result.next() {
874 Some(row) => {
875 let stored_hash = value_to_string(&row[0]);
876 Ok(stored_hash != file_hash)
877 }
878 None => Ok(true),
879 }
880 }
881
882 pub fn mark_ingested(
883 &self,
884 file_path: &str,
885 file_hash: &str,
886 memory_count: usize,
887 source: &str,
888 ) -> Result<()> {
889 let conn = self.conn()?;
890 let path_escaped = escape_cypher(file_path);
891 let hash_escaped = escape_cypher(file_hash);
892 let source_escaped = escape_cypher(source);
893 let now = chrono::Utc::now().to_rfc3339();
894
895 conn.query(&format!(
896 "MERGE (l:IngestLog {{file_path: '{}'}})
897 SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
898 path_escaped, hash_escaped, now, memory_count as i64, source_escaped
899 ))?;
900 Ok(())
901 }
902
903 pub fn ingested_file_count(&self) -> Result<usize> {
904 let conn = self.conn()?;
905 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
906 match result.next() {
907 Some(row) => match &row[0] {
908 Value::Int64(n) => Ok(*n as usize),
909 _ => Ok(0),
910 },
911 None => Ok(0),
912 }
913 }
914}
915
916impl KuzuStore {
917 fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
918 let id = memory.id.to_string();
919 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
920 let created_at = memory.created_at.to_rfc3339();
921 let last_accessed = memory.last_accessed.to_rfc3339();
922 let embedding_str = format_embedding(&memory.embedding);
923 let content_escaped = escape_cypher(&memory.content);
924 let source_escaped = escape_cypher(&memory.source);
925 let source_id_escaped = escape_cypher(&memory.source_id);
926 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
927 let machine_id_escaped = escape_cypher(&memory.machine_id);
928
929 let query = format!(
930 "CREATE (:Memory {{
931 id: '{id}',
932 content: '{content_escaped}',
933 embedding: {embedding_str},
934 memory_type: '{memory_type}',
935 confidence: {confidence},
936 created_at: '{created_at}',
937 last_accessed: '{last_accessed}',
938 access_count: {access_count},
939 source: '{source_escaped}',
940 source_id: '{source_id_escaped}',
941 project_path: '{project_path_escaped}',
942 machine_id: '{machine_id_escaped}'
943 }});",
944 confidence = memory.confidence,
945 access_count = memory.access_count,
946 );
947
948 conn.query(&query)?;
949 Ok(())
950 }
951
952 fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
953 let id = entity.id.to_string();
954 let embedding_str = format_embedding(&entity.embedding);
955 let aliases_str = format_string_array(&entity.aliases);
956 let name_escaped = escape_cypher(&entity.name);
957 let etype_escaped = escape_cypher(&entity.entity_type);
958
959 let query = format!(
960 "CREATE (:Entity {{
961 id: '{id}',
962 name: '{name_escaped}',
963 entity_type: '{etype_escaped}',
964 embedding: {embedding_str},
965 aliases: {aliases_str}
966 }});"
967 );
968
969 conn.query(&query)?;
970 Ok(())
971 }
972
973 fn create_conversation_node(
974 &self,
975 conn: &Connection<'_>,
976 conversation: &Conversation,
977 ) -> Result<()> {
978 let id = conversation.id.to_string();
979 let started_at = conversation.started_at.to_rfc3339();
980 let source_escaped = escape_cypher(&conversation.source);
981 let machine_escaped = escape_cypher(&conversation.machine_id);
982 let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
983
984 let query = format!(
985 "CREATE (:Conversation {{
986 id: '{id}',
987 source: '{source_escaped}',
988 machine_id: '{machine_escaped}',
989 started_at: '{started_at}',
990 project_path: '{project_escaped}'
991 }});"
992 );
993
994 conn.query(&query)?;
995 Ok(())
996 }
997
998 fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
999 let from_id = relation.from_id.to_string();
1000 let to_id = relation.to_id.to_string();
1001
1002 let query = match relation.relation_type {
1003 RelationType::RelatesTo => format!(
1004 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1005 from_id,
1006 to_id,
1007 relation.strength,
1008 relation.context.as_deref().unwrap_or("")
1009 ),
1010 RelationType::Mentions => format!(
1011 "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1012 from_id, to_id
1013 ),
1014 RelationType::DerivedFrom => format!(
1015 "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1016 from_id,
1017 to_id,
1018 relation.context.as_deref().unwrap_or("direct")
1019 ),
1020 RelationType::Contradicts => format!(
1021 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1022 from_id,
1023 to_id,
1024 relation.context.as_deref().unwrap_or("")
1025 ),
1026 RelationType::Reinforces => format!(
1027 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1028 from_id, to_id
1029 ),
1030 RelationType::DistilledFrom => format!(
1031 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1032 from_id,
1033 to_id,
1034 relation.context.as_deref().unwrap_or("")
1035 ),
1036 RelationType::Supersedes => format!(
1037 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1038 from_id,
1039 to_id,
1040 relation.context.as_deref().unwrap_or("")
1041 ),
1042 };
1043
1044 conn.query(&query)?;
1045 Ok(())
1046 }
1047}
1048
1049impl KuzuStore {
1050 pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1051 let conn = self.conn()?;
1052 let mut result = conn.query(&format!(
1053 "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1054 id
1055 ))?;
1056
1057 match result.next() {
1058 Some(row) => {
1059 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1060 let source = value_to_string(&row[1]);
1061 let machine_id = value_to_string(&row[2]);
1062 let started_at_str = value_to_string(&row[3]);
1063 let project_path = {
1064 let s = value_to_string(&row[4]);
1065 if s.is_empty() { None } else { Some(s) }
1066 };
1067 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1068 .map(|dt| dt.with_timezone(&chrono::Utc))
1069 .unwrap_or_else(|_| chrono::Utc::now());
1070
1071 Ok(Some(Conversation {
1072 id,
1073 source,
1074 machine_id,
1075 started_at,
1076 project_path,
1077 }))
1078 }
1079 None => Ok(None),
1080 }
1081 }
1082
1083 pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1084 let conn = self.conn()?;
1085 let query = format!(
1086 "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1087 id, memory_return_cols("m")
1088 );
1089
1090 let mut result = conn.query(&query)?;
1091 match result.next() {
1092 Some(row) => {
1093 let mut memory = row_to_memory(&row[..11])?;
1094 memory.embedding = value_to_f32_vec(&row[11]);
1095 Ok(Some(memory))
1096 }
1097 None => Ok(None),
1098 }
1099 }
1100
1101 pub fn import_memory(&self, memory: &Memory) -> Result<()> {
1102 let conn = self.conn()?;
1103 self.create_memory_node(&conn, memory)
1104 }
1105
1106 pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1107 let conn = self.conn()?;
1108 self.create_conversation_node(&conn, conversation)
1109 }
1110
1111 pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1112 let conn = self.conn()?;
1113 self.create_entity_node(&conn, entity)
1114 }
1115
1116 pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1117 let conn = self.conn()?;
1118 self.create_relation_edge(&conn, relation)
1119 }
1120
1121 pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1122 let conn = self.conn()?;
1123 conn.query(&format!(
1124 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1125 id
1126 ))?;
1127 Ok(())
1128 }
1129
1130 pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1131 let conn = self.conn()?;
1132 let id = memory.id.to_string();
1133 let last_accessed = memory.last_accessed.to_rfc3339();
1134 let content_escaped = escape_cypher(&memory.content);
1135 let embedding_str = format_embedding(&memory.embedding);
1136 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1137 let source_escaped = escape_cypher(&memory.source);
1138 let source_id_escaped = escape_cypher(&memory.source_id);
1139 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1140 let created_at = memory.created_at.to_rfc3339();
1141
1142 let query = format!(
1143 "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}', m.project_path = '{project_path_escaped}';",
1144 confidence = memory.confidence,
1145 access_count = memory.access_count,
1146 );
1147
1148 conn.query(&query)?;
1149 Ok(())
1150 }
1151
1152 pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1153 self.sync_log_page(after_seq, None)
1154 }
1155
1156 pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1157 let conn = self.conn()?;
1158 let machine_escaped = escape_cypher(&self.machine_id);
1159 let limit_clause = match limit {
1160 Some(n) => format!(" LIMIT {}", n),
1161 None => String::new(),
1162 };
1163 let query = format!(
1164 "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1165 after_seq, machine_escaped, limit_clause
1166 );
1167
1168 let mut result = conn.query(&query)?;
1169 let mut entries = Vec::new();
1170
1171 for row in &mut result {
1172 let seq = match &row[0] {
1173 Value::Int64(n) => *n as u64,
1174 _ => 0,
1175 };
1176 let op_str = value_to_string(&row[1]);
1177 let node_type_str = value_to_string(&row[2]);
1178 let node_id = value_to_string(&row[3]);
1179 let machine_id = value_to_string(&row[4]);
1180 let timestamp_str = value_to_string(&row[5]);
1181 let data_str = value_to_string(&row[6]);
1182
1183 let op = match op_str.as_str() {
1184 "create" => SyncOp::Create,
1185 "update" => SyncOp::Update,
1186 "delete" => SyncOp::Delete,
1187 _ => continue,
1188 };
1189 let node_type = match node_type_str.as_str() {
1190 "memory" => SyncNodeType::Memory,
1191 "entity" => SyncNodeType::Entity,
1192 "conversation" => SyncNodeType::Conversation,
1193 "relation" => SyncNodeType::Relation,
1194 _ => continue,
1195 };
1196 let timestamp = chrono::DateTime::parse_from_rfc3339(×tamp_str)
1197 .map(|dt| dt.with_timezone(&chrono::Utc))
1198 .unwrap_or_else(|_| chrono::Utc::now());
1199
1200 let data = if data_str.is_empty() {
1201 None
1202 } else {
1203 Some(data_str)
1204 };
1205
1206 entries.push(SyncEntry {
1207 seq,
1208 op,
1209 node_type,
1210 node_id,
1211 machine_id,
1212 timestamp,
1213 data,
1214 });
1215 }
1216
1217 Ok(entries)
1218 }
1219
1220 pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1221 let conn = self.conn()?;
1222 let escaped = escape_cypher(peer_id);
1223 let mut result = conn.query(&format!(
1224 "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1225 escaped
1226 ))?;
1227
1228 match result.next() {
1229 Some(row) => {
1230 let peer_id = value_to_string(&row[0]);
1231 let last_seq = match &row[1] {
1232 Value::Int64(n) => *n as u64,
1233 _ => 0,
1234 };
1235 let last_sync_at_str = value_to_string(&row[2]);
1236 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1237 .map(|dt| dt.with_timezone(&chrono::Utc))
1238 .unwrap_or_else(|_| chrono::Utc::now());
1239
1240 Ok(Some(SyncState {
1241 peer_id,
1242 last_seq,
1243 last_sync_at,
1244 }))
1245 }
1246 None => Ok(None),
1247 }
1248 }
1249
1250 pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1251 let conn = self.conn()?;
1252 let peer_escaped = escape_cypher(&state.peer_id);
1253 let now = state.last_sync_at.to_rfc3339();
1254
1255 conn.query(&format!(
1256 "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1257 peer_escaped, state.last_seq, now
1258 ))?;
1259 Ok(())
1260 }
1261
1262 pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1263 let conn = self.conn()?;
1264 let mut result =
1265 conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1266 let mut states = Vec::new();
1267
1268 for row in &mut result {
1269 let peer_id = value_to_string(&row[0]);
1270 let last_seq = match &row[1] {
1271 Value::Int64(n) => *n as u64,
1272 _ => 0,
1273 };
1274 let last_sync_at_str = value_to_string(&row[2]);
1275 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1276 .map(|dt| dt.with_timezone(&chrono::Utc))
1277 .unwrap_or_else(|_| chrono::Utc::now());
1278
1279 states.push(SyncState {
1280 peer_id,
1281 last_seq,
1282 last_sync_at,
1283 });
1284 }
1285
1286 Ok(states)
1287 }
1288
1289 pub fn all_entities(&self) -> Result<Vec<Entity>> {
1290 let conn = self.conn()?;
1291 let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1292 let mut entities = Vec::new();
1293 for row in &mut result {
1294 entities.push(row_to_entity(&row)?);
1295 }
1296 Ok(entities)
1297 }
1298
1299 pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1300 let conn = self.conn()?;
1301 let query = format!(
1302 "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1303 entity_id, memory_return_cols("m")
1304 );
1305 let mut result = conn.query(&query)?;
1306 let mut memories = Vec::new();
1307 for row in &mut result {
1308 memories.push(row_to_memory(&row)?);
1309 }
1310 Ok(memories)
1311 }
1312
1313 pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1314 let conn = self.conn()?;
1315 let mut result = conn.query(
1316 &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1317 )?;
1318 let mut memories = Vec::new();
1319 for row in &mut result {
1320 memories.push(row_to_memory(&row)?);
1321 }
1322 Ok(memories)
1323 }
1324
1325 pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1326 let conn = self.conn()?;
1327 let query = format!(
1328 "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1329 entity_id, entity_id
1330 );
1331 let mut result = conn.query(&query)?;
1332 let mut names = Vec::new();
1333 for row in &mut result {
1334 names.push(value_to_string(&row[0]));
1335 }
1336 Ok(names)
1337 }
1338
1339 pub fn max_sync_seq(&self) -> Result<u64> {
1340 let conn = self.conn()?;
1341 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
1342 match result.next() {
1343 Some(row) => match &row[0] {
1344 Value::Int64(n) => Ok(*n as u64),
1345 _ => Ok(0),
1346 },
1347 None => Ok(0),
1348 }
1349 }
1350
1351 pub fn backfill_project_paths(&self) -> Result<u64> {
1352 let conn = self.conn()?;
1353 let mut result = conn.query(
1354 "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1355 )?;
1356 match result.next() {
1357 Some(row) => match &row[0] {
1358 Value::Int64(n) => Ok(*n as u64),
1359 _ => Ok(0),
1360 },
1361 None => Ok(0),
1362 }
1363 }
1364
1365 pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1366 let conn = self.conn()?;
1367 let escaped = escape_cypher(project_path);
1368 let cols = memory_return_cols("m");
1369 let query = format!(
1370 "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1371 escaped, cols
1372 );
1373 let mut result = conn.query(&query)?;
1374 let mut memories = Vec::new();
1375 for row in &mut result {
1376 memories.push(row_to_memory(&row)?);
1377 }
1378 Ok(memories)
1379 }
1380
1381 pub fn backfill_sync_log(&self) -> Result<u64> {
1382 let conn = self.conn()?;
1383 let mut count = 0u64;
1384
1385 let mut result = conn.query(
1386 &format!("MATCH (m:Memory) RETURN {}, m.embedding;", memory_return_cols("m")),
1387 )?;
1388 for row in &mut result {
1389 let mut memory = row_to_memory(&row[..11])?;
1390 memory.embedding = value_to_f32_vec(&row[11]);
1391 let data = serde_json::to_string(&memory).ok();
1392 self.log_sync_entry(
1393 &conn,
1394 SyncOp::Create,
1395 SyncNodeType::Memory,
1396 &memory.id.to_string(),
1397 data.as_deref(),
1398 )?;
1399 count += 1;
1400 }
1401
1402 let mut result = conn.query(
1403 "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1404 )?;
1405 for row in &mut result {
1406 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1407 let source = value_to_string(&row[1]);
1408 let machine_id = value_to_string(&row[2]);
1409 let started_at_str = value_to_string(&row[3]);
1410 let project_path = {
1411 let s = value_to_string(&row[4]);
1412 if s.is_empty() { None } else { Some(s) }
1413 };
1414 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1415 .map(|dt| dt.with_timezone(&chrono::Utc))
1416 .unwrap_or_else(|_| chrono::Utc::now());
1417
1418 let conv = Conversation {
1419 id,
1420 source,
1421 machine_id,
1422 started_at,
1423 project_path,
1424 };
1425 let data = serde_json::to_string(&conv).ok();
1426 self.log_sync_entry(
1427 &conn,
1428 SyncOp::Create,
1429 SyncNodeType::Conversation,
1430 &conv.id.to_string(),
1431 data.as_deref(),
1432 )?;
1433 count += 1;
1434 }
1435
1436 Ok(count)
1437 }
1438}
1439
1440impl KuzuStore {
1441 pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
1442 let conn = self.conn()?;
1443 let id_escaped = escape_cypher(id);
1444 let name_escaped = escape_cypher(name);
1445 conn.query(&format!(
1446 "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
1447 ))?;
1448 Ok(())
1449 }
1450
1451 pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
1452 let conn = self.conn()?;
1453 let id_escaped = escape_cypher(id);
1454 let mut result = conn.query(&format!(
1455 "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
1456 ))?;
1457 match result.next() {
1458 Some(row) => Ok(Some(value_to_string(&row[0]))),
1459 None => Ok(None),
1460 }
1461 }
1462
1463 pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
1464 let conn = self.conn()?;
1465 let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
1466 let mut map = std::collections::HashMap::new();
1467 for row in &mut result {
1468 map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
1469 }
1470 Ok(map)
1471 }
1472
1473 pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
1474 let conn = self.conn()?;
1475 let escaped = escape_cypher(machine_id);
1476 let mut result = conn.query(&format!(
1477 "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
1478 ))?;
1479 match result.next() {
1480 Some(row) => match &row[0] {
1481 Value::Int64(n) => Ok(*n as u64),
1482 _ => Ok(0),
1483 },
1484 None => Ok(0),
1485 }
1486 }
1487
1488 pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
1489 self.upsert_machine(&identity.id, &identity.name)?;
1490 let count = self.backfill_machine_id(&identity.id)?;
1491 if count > 0 {
1492 tracing::info!("backfilled machine_id on {count} existing memories");
1493 }
1494 Ok(())
1495 }
1496}
1497
/// Renders an embedding as a Cypher list literal, e.g. `[0.25,1]`.
///
/// Each value uses `f32`'s `Display` form. An empty embedding is replaced by 384 zeros
/// (`0.0`), so the literal is never an empty list.
/// NOTE(review): 384 presumably mirrors the fixed size of the `embedding` column
/// declared in the schema; confirm against the schema setup.
fn format_embedding(embedding: &[f32]) -> String {
    const PLACEHOLDER_DIM: usize = 384;

    let body = if embedding.is_empty() {
        vec!["0.0"; PLACEHOLDER_DIM].join(",")
    } else {
        embedding
            .iter()
            .map(f32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{body}]")
}
1506
/// Escapes `s` so it can sit inside a single-quoted Cypher string literal.
///
/// Backslashes are doubled (`\` → `\\`) and single quotes get a backslash prefix
/// (`'` → `\'`). Every other character is copied through unchanged.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\'' => escaped.push_str("\\'"),
            other => escaped.push(other),
        }
    }
    escaped
}
1510
1511fn format_string_array(items: &[String]) -> String {
1512 let parts: Vec<String> = items
1513 .iter()
1514 .map(|s| format!("'{}'", s.replace('\'', "''")))
1515 .collect();
1516 format!("[{}]", parts.join(","))
1517}
1518
1519fn value_to_string(val: &Value) -> String {
1520 match val {
1521 Value::String(s) => s.clone(),
1522 other => format!("{:?}", other),
1523 }
1524}
1525
1526fn value_to_f32(val: &Value) -> f32 {
1527 match val {
1528 Value::Float(f) => *f,
1529 Value::Double(d) => *d as f32,
1530 Value::Int64(i) => *i as f32,
1531 _ => 0.0,
1532 }
1533}
1534
1535fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1536 match val {
1537 Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
1538 _ => Vec::new(),
1539 }
1540}
1541
1542fn row_to_memory(row: &[Value]) -> Result<Memory> {
1543 let id_str = value_to_string(&row[0]);
1544 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1545 let content = value_to_string(&row[1]);
1546 let memory_type_str = value_to_string(&row[2]);
1547 let confidence = value_to_f32(&row[3]);
1548 let created_at_str = value_to_string(&row[4]);
1549 let last_accessed_str = value_to_string(&row[5]);
1550 let access_count = match &row[6] {
1551 Value::Int64(i) => *i as u32,
1552 _ => 0,
1553 };
1554 let source = value_to_string(&row[7]);
1555 let source_id = value_to_string(&row[8]);
1556 let project_path = project_path_from_db(&value_to_string(&row[9]));
1557 let machine_id = value_to_string(&row[10]);
1558
1559 let memory_type = match memory_type_str.as_str() {
1560 "episodic" => MemoryType::Episodic,
1561 "procedural" => MemoryType::Procedural,
1562 "decision" => MemoryType::Decision,
1563 "architecture" => MemoryType::Architecture,
1564 "debugging" => MemoryType::Debugging,
1565 "task" => MemoryType::Task,
1566 "question" => MemoryType::Question,
1567 _ => MemoryType::Semantic,
1568 };
1569
1570 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1571 .map(|dt| dt.with_timezone(&chrono::Utc))
1572 .unwrap_or_else(|_| chrono::Utc::now());
1573
1574 let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1575 .map(|dt| dt.with_timezone(&chrono::Utc))
1576 .unwrap_or_else(|_| chrono::Utc::now());
1577
1578 Ok(Memory {
1579 id,
1580 content,
1581 embedding: Vec::new(),
1582 memory_type,
1583 confidence,
1584 created_at,
1585 last_accessed,
1586 access_count,
1587 source,
1588 source_id,
1589 project_path,
1590 machine_id,
1591 })
1592}
1593
1594fn row_to_entity(row: &[Value]) -> Result<Entity> {
1595 let id_str = value_to_string(&row[0]);
1596 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1597 let name = value_to_string(&row[1]);
1598 let entity_type = value_to_string(&row[2]);
1599
1600 Ok(Entity {
1601 id,
1602 name,
1603 entity_type,
1604 embedding: Vec::new(),
1605 aliases: Vec::new(),
1606 })
1607}
1608
// SAFETY — NOTE(review): justification still to be confirmed.
// `KuzuStore` holds a `kuzu::Database`, a `String` and an `AtomicU64`. The latter two
// are already `Send + Sync`, so these impls only add that guarantee for the database
// handle. No `Connection` is stored in the struct: `conn()` opens a fresh one per call,
// so this type never shares a connection between threads. Before relying on this,
// confirm that `kuzu::Database` may be moved across threads and that `Connection::new`
// may be called on it concurrently.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}