1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10 Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11 SyncOp, SyncState,
12};
13use crate::store::Store;
14
/// Store handle wrapping a KùzuDB [`Database`] together with the local machine
/// identifier and the counter used to number `SyncLog` entries.
pub struct KuzuStore {
    /// Underlying database; connections are created from it on demand.
    db: Database,
    /// Identifier of this machine, written into every sync-log entry.
    machine_id: String,
    /// Next sequence number to hand out for a `SyncLog` row.
    sync_seq: AtomicU64,
}
20
21impl KuzuStore {
22 pub fn open(path: &Path, machine_id: String) -> Result<Self> {
23 let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
24 let store = Self {
25 db,
26 machine_id,
27 sync_seq: AtomicU64::new(1),
28 };
29 store.init_schema()?;
30 store.init_sync_seq()?;
31 Ok(store)
32 }
33
34 pub fn in_memory(machine_id: String) -> Result<Self> {
35 let db =
36 Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
37 let store = Self {
38 db,
39 machine_id,
40 sync_seq: AtomicU64::new(1),
41 };
42 store.init_schema()?;
43 Ok(store)
44 }
45
46 pub fn machine_id(&self) -> &str {
47 &self.machine_id
48 }
49
50 fn conn(&self) -> Result<Connection<'_>> {
51 Connection::new(&self.db).context("creating connection")
52 }
53
54 pub fn diagnostic(&self) -> Result<String> {
55 let conn = self.conn()?;
56
57 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
58 let total: i64 = result
59 .next()
60 .map(|r| match &r[0] {
61 Value::Int64(v) => *v,
62 _ => -1,
63 })
64 .unwrap_or(-1);
65
66 let mut result = conn.query(
67 "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
68 )?;
69 let with_emb: i64 = result
70 .next()
71 .map(|r| match &r[0] {
72 Value::Int64(v) => *v,
73 _ => -1,
74 })
75 .unwrap_or(-1);
76
77 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
78 let zeros_str = format!("[{}]", zeros.join(","));
79 let idx_query = format!(
80 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
81 zeros_str
82 );
83 let idx_status = match conn.query(&idx_query) {
84 Ok(mut r) => match r.next() {
85 Some(row) => format!("ok (distance: {:?})", &row[0]),
86 None => "ok (0 results)".to_string(),
87 },
88 Err(e) => format!("error: {e}"),
89 };
90
91 Ok(format!(
92 "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
93 ))
94 }
95
    /// Creates every node table, relationship table and the `Memory` vector index
    /// used by the store.
    ///
    /// All table statements use `IF NOT EXISTS`. The vector-index call has no such
    /// guard, so its result is deliberately discarded.
    ///
    /// # Errors
    /// Fails if a connection cannot be created or any `CREATE ... TABLE` statement
    /// fails; the error context names the table involved.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;

        // --- Node tables referenced by the relationship tables below ---
        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Memory(
                id STRING PRIMARY KEY,
                content STRING,
                embedding FLOAT[384],
                memory_type STRING,
                confidence FLOAT,
                created_at STRING,
                last_accessed STRING,
                access_count INT64,
                source STRING,
                source_id STRING
            );",
        )
        .context("creating Memory table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Entity(
                id STRING PRIMARY KEY,
                name STRING,
                entity_type STRING,
                embedding FLOAT[384],
                aliases STRING[]
            );",
        )
        .context("creating Entity table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS Conversation(
                id STRING PRIMARY KEY,
                source STRING,
                machine_id STRING,
                started_at STRING,
                project_path STRING
            );",
        )
        .context("creating Conversation table")?;

        // Per-file ingestion bookkeeping, keyed by file path.
        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
                file_path STRING PRIMARY KEY,
                file_hash STRING,
                ingested_at STRING,
                memory_count INT64,
                source STRING
            );",
        )
        .context("creating IngestLog table")?;

        // --- Relationship tables; every edge starts at a Memory node ---
        conn.query(
            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
                FROM Memory TO Memory,
                strength FLOAT,
                context STRING
            );",
        )
        .context("creating RELATES_TO rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
                FROM Memory TO Entity,
                position INT64
            );",
        )
        .context("creating MENTIONS rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
                FROM Memory TO Conversation,
                transformation STRING
            );",
        )
        .context("creating DERIVED_FROM rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
                FROM Memory TO Memory,
                model STRING
            );",
        )
        .context("creating DISTILLED_FROM rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
                FROM Memory TO Memory,
                resolution STRING
            );",
        )
        .context("creating CONTRADICTS rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
                FROM Memory TO Memory
            );",
        )
        .context("creating REINFORCES rel")?;

        conn.query(
            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
                FROM Memory TO Memory,
                reason STRING
            );",
        )
        .context("creating SUPERSEDES rel")?;

        // --- Sync and consolidation bookkeeping tables ---
        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
                seq INT64 PRIMARY KEY,
                op STRING,
                node_type STRING,
                node_id STRING,
                machine_id STRING,
                timestamp STRING,
                data STRING
            );",
        )
        .context("creating SyncLog table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS SyncState(
                peer_id STRING PRIMARY KEY,
                last_seq INT64,
                last_sync_at STRING
            );",
        )
        .context("creating SyncState table")?;

        conn.query(
            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
                memory_id STRING PRIMARY KEY,
                distilled_id STRING,
                consolidated_at STRING,
                model STRING
            );",
        )
        .context("creating ConsolidationLog table")?;

        // Best effort: any failure here is ignored.
        // NOTE(review): presumably this tolerates an index that already exists;
        // other failures are swallowed as well — confirm that is intended.
        conn.query(
            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
        )
        .ok();

        Ok(())
    }
243
244 fn init_sync_seq(&self) -> Result<()> {
245 let conn = self.conn()?;
246 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
247 if let Some(row) = result.next() {
248 if let Value::Int64(max_seq) = &row[0] {
249 self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
250 }
251 }
252 Ok(())
253 }
254
255 fn log_sync_entry(
256 &self,
257 conn: &Connection<'_>,
258 op: SyncOp,
259 node_type: SyncNodeType,
260 node_id: &str,
261 data: Option<&str>,
262 ) -> Result<()> {
263 let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
264 let timestamp = chrono::Utc::now().to_rfc3339();
265 let op_str = match op {
266 SyncOp::Create => "create",
267 SyncOp::Update => "update",
268 SyncOp::Delete => "delete",
269 };
270 let nt_str = match node_type {
271 SyncNodeType::Memory => "memory",
272 SyncNodeType::Entity => "entity",
273 SyncNodeType::Conversation => "conversation",
274 SyncNodeType::Relation => "relation",
275 };
276 let machine_escaped = escape_cypher(&self.machine_id);
277 let node_id_escaped = escape_cypher(node_id);
278 let data_escaped = data.map(|d| escape_cypher(d)).unwrap_or_default();
279
280 let query = format!(
281 "CREATE (:SyncLog {{
282 seq: {seq},
283 op: '{op_str}',
284 node_type: '{nt_str}',
285 node_id: '{node_id_escaped}',
286 machine_id: '{machine_escaped}',
287 timestamp: '{timestamp}',
288 data: '{data_escaped}'
289 }});"
290 );
291 conn.query(&query)?;
292 Ok(())
293 }
294}
295
296impl Store for KuzuStore {
297 fn store_memory(&self, memory: &Memory) -> Result<()> {
298 let conn = self.conn()?;
299 self.create_memory_node(&conn, memory)?;
300 let data = serde_json::to_string(memory).ok();
301 self.log_sync_entry(
302 &conn,
303 SyncOp::Create,
304 SyncNodeType::Memory,
305 &memory.id.to_string(),
306 data.as_deref(),
307 )?;
308 Ok(())
309 }
310
311 fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
312 let conn = self.conn()?;
313 let query = format!(
314 "MATCH (m:Memory {{id: '{}'}}) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
315 id
316 );
317
318 let mut result = conn.query(&query)?;
319 match result.next() {
320 Some(row) => Ok(Some(row_to_memory(&row)?)),
321 None => Ok(None),
322 }
323 }
324
325 fn delete_memory(&self, id: Uuid) -> Result<()> {
326 let conn = self.conn()?;
327 conn.query(&format!(
328 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
329 id
330 ))?;
331 self.log_sync_entry(
332 &conn,
333 SyncOp::Delete,
334 SyncNodeType::Memory,
335 &id.to_string(),
336 None,
337 )?;
338 Ok(())
339 }
340
341 fn store_entity(&self, entity: &Entity) -> Result<()> {
342 let conn = self.conn()?;
343 self.create_entity_node(&conn, entity)?;
344 let data = serde_json::to_string(entity).ok();
345 self.log_sync_entry(
346 &conn,
347 SyncOp::Create,
348 SyncNodeType::Entity,
349 &entity.id.to_string(),
350 data.as_deref(),
351 )?;
352 Ok(())
353 }
354
355 fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
356 let conn = self.conn()?;
357 let mut result = conn.query(&format!(
358 "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
359 id
360 ))?;
361
362 match result.next() {
363 Some(row) => Ok(Some(row_to_entity(&row)?)),
364 None => Ok(None),
365 }
366 }
367
368 fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
369 let conn = self.conn()?;
370 let name_escaped = escape_cypher(name);
371 let query = format!(
372 "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
373 name_escaped
374 );
375 let mut result = conn.query(&query)?;
376
377 match result.next() {
378 Some(row) => Ok(Some(row_to_entity(&row)?)),
379 None => Ok(None),
380 }
381 }
382
383 fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
384 let conn = self.conn()?;
385 self.create_conversation_node(&conn, conversation)?;
386 let data = serde_json::to_string(conversation).ok();
387 self.log_sync_entry(
388 &conn,
389 SyncOp::Create,
390 SyncNodeType::Conversation,
391 &conversation.id.to_string(),
392 data.as_deref(),
393 )?;
394 Ok(())
395 }
396
397 fn store_relation(&self, relation: &Relation) -> Result<()> {
398 let conn = self.conn()?;
399 self.create_relation_edge(&conn, relation)?;
400 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
401 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
402 let data = serde_json::to_string(relation).ok();
403 self.log_sync_entry(
404 &conn,
405 SyncOp::Create,
406 SyncNodeType::Relation,
407 &node_id,
408 data.as_deref(),
409 )?;
410 Ok(())
411 }
412
413 fn get_relations(
414 &self,
415 node_id: Uuid,
416 relation_type: Option<RelationType>,
417 ) -> Result<Vec<Relation>> {
418 let conn = self.conn()?;
419 let id = node_id.to_string();
420
421 let query = match relation_type {
422 Some(RelationType::RelatesTo) => format!(
423 "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
424 id
425 ),
426 Some(RelationType::Contradicts) => format!(
427 "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
428 id
429 ),
430 Some(RelationType::Reinforces) => format!(
431 "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
432 id
433 ),
434 Some(RelationType::Supersedes) => format!(
435 "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
436 id
437 ),
438 Some(RelationType::Mentions) => format!(
439 "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
440 id
441 ),
442 Some(RelationType::DerivedFrom) => format!(
443 "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
444 id
445 ),
446 Some(RelationType::DistilledFrom) => format!(
447 "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
448 id
449 ),
450 None => {
451 let mut relations = Vec::new();
452 let mem_to_mem = [
453 ("RELATES_TO", RelationType::RelatesTo),
454 ("REINFORCES", RelationType::Reinforces),
455 ("CONTRADICTS", RelationType::Contradicts),
456 ("SUPERSEDES", RelationType::Supersedes),
457 ("DISTILLED_FROM", RelationType::DistilledFrom),
458 ];
459 for (label, rt) in &mem_to_mem {
460 let q = format!(
461 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
462 id, label
463 );
464 if let Ok(mut result) = conn.query(&q) {
465 for row in &mut result {
466 let to_id_str = value_to_string(&row[0]);
467 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
468 relations.push(Relation {
469 from_id: node_id,
470 to_id,
471 relation_type: *rt,
472 strength: 1.0,
473 context: None,
474 });
475 }
476 }
477 }
478 for (label, target, rt) in [
479 ("MENTIONS", "Entity", RelationType::Mentions),
480 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
481 ] {
482 let q = format!(
483 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
484 id, label, target
485 );
486 if let Ok(mut result) = conn.query(&q) {
487 for row in &mut result {
488 let to_id_str = value_to_string(&row[0]);
489 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
490 relations.push(Relation {
491 from_id: node_id,
492 to_id,
493 relation_type: rt,
494 strength: 1.0,
495 context: None,
496 });
497 }
498 }
499 }
500 return Ok(relations);
501 },
502 };
503
504 let mut result = conn.query(&query)?;
505 let mut relations = Vec::new();
506
507 for row in &mut result {
508 let to_id_str = value_to_string(&row[0]);
509 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
510
511 relations.push(Relation {
512 from_id: node_id,
513 to_id,
514 relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
515 strength: 1.0,
516 context: None,
517 });
518 }
519
520 Ok(relations)
521 }
522
523 fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
524 let conn = self.conn()?;
525 let embedding_str = format_embedding(embedding);
526
527 let query = format!(
528 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
529 RETURN node.id, node.content, node.memory_type, node.confidence,
530 node.created_at, node.last_accessed, node.access_count,
531 node.source, node.source_id, distance;",
532 embedding_str, limit
533 );
534
535 let mut result = conn.query(&query)?;
536 let mut results = Vec::new();
537
538 for row in &mut result {
539 let memory = row_to_memory(&row[..9])?;
540 let distance = value_to_f32(&row[9]);
541 let similarity = 1.0 - distance;
542 results.push((memory, similarity));
543 }
544
545 Ok(results)
546 }
547
548 fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
549 let conn = self.conn()?;
550 let query = format!(
551 "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
552 RETURN DISTINCT m.id, m.content, m.memory_type, m.confidence,
553 m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
554 start_id, depth
555 );
556
557 let mut result = conn.query(&query)?;
558 let mut results = Vec::new();
559
560 for row in &mut result {
561 let memory = row_to_memory(&row)?;
562 results.push((memory, Vec::new()));
563 }
564
565 Ok(results)
566 }
567
568 fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
569 let conn = self.conn()?;
570 let source_escaped = escape_cypher(source);
571 let query = format!(
572 "MATCH (m:Memory) WHERE m.source = '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
573 source_escaped
574 );
575 let mut result = conn.query(&query)?;
576
577 let mut memories = Vec::new();
578 for row in &mut result {
579 memories.push(row_to_memory(&row)?);
580 }
581 Ok(memories)
582 }
583
584 fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
585 let conn = self.conn()?;
586 let type_str = format!("{:?}", memory_type).to_lowercase();
587 let query = format!(
588 "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
589 type_str
590 );
591 let mut result = conn.query(&query)?;
592
593 let mut memories = Vec::new();
594 for row in &mut result {
595 memories.push(row_to_memory(&row)?);
596 }
597 Ok(memories)
598 }
599
600 fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
601 let conn = self.conn()?;
602 let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
603 let cutoff_str = cutoff.to_rfc3339();
604
605 let query = format!(
606 "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
607 cutoff_str
608 );
609
610 let mut result = conn.query(&query)?;
611 let mut memories = Vec::new();
612 for row in &mut result {
613 memories.push(row_to_memory(&row)?);
614 }
615 Ok(memories)
616 }
617
618 fn update_memory(&self, memory: &Memory) -> Result<()> {
619 let conn = self.conn()?;
620 let id = memory.id.to_string();
621 let last_accessed = memory.last_accessed.to_rfc3339();
622
623 let query = format!(
624 "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {};",
625 id, memory.confidence, last_accessed, memory.access_count
626 );
627
628 conn.query(&query)?;
629 let data = serde_json::to_string(memory).ok();
630 self.log_sync_entry(
631 &conn,
632 SyncOp::Update,
633 SyncNodeType::Memory,
634 &id,
635 data.as_deref(),
636 )?;
637 Ok(())
638 }
639
640 fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
641 let conn = self.conn()?;
642 let query_escaped = escape_cypher(&query.to_lowercase());
643 let cypher = format!(
644 "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id LIMIT {};",
645 query_escaped, limit
646 );
647
648 let mut result = conn.query(&cypher)?;
649 let mut memories = Vec::new();
650 for row in &mut result {
651 memories.push(row_to_memory(&row)?);
652 }
653 Ok(memories)
654 }
655
656 fn memory_count(&self) -> Result<usize> {
657 let conn = self.conn()?;
658 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
659 match result.next() {
660 Some(row) => match &row[0] {
661 Value::Int64(n) => Ok(*n as usize),
662 _ => Ok(0),
663 },
664 None => Ok(0),
665 }
666 }
667}
668
669impl KuzuStore {
670 pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
671 if let Some(existing) = self.find_entity_by_name(name)? {
672 return Ok(existing);
673 }
674 let entity = Entity::new(name.to_string(), entity_type.to_string());
675 self.store_entity(&entity)?;
676 Ok(entity)
677 }
678
679 pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
680 let conn = self.conn()?;
681 let escaped = escape_cypher(content_prefix);
682 let query = format!(
683 "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
684 escaped
685 );
686 let mut result = conn.query(&query)?;
687 Ok(result.next().is_some())
688 }
689
690 pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
691 let conn = self.conn()?;
692 let query = format!(
693 "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id LIMIT {};",
694 limit
695 );
696 let mut result = conn.query(&query)?;
697 let mut memories = Vec::new();
698 for row in &mut result {
699 memories.push(row_to_memory(&row)?);
700 }
701 Ok(memories)
702 }
703
704 pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
705 let conn = self.conn()?;
706 let now = Utc::now().to_rfc3339();
707 let query = format!(
708 "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
709 raw_id, distilled_id, now, escape_cypher(model)
710 );
711 conn.query(&query)?;
712 Ok(())
713 }
714
715 pub fn consolidation_count(&self) -> Result<usize> {
716 let conn = self.conn()?;
717 let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
718 match result.next() {
719 Some(row) => match &row[0] {
720 Value::Int64(n) => Ok(*n as usize),
721 _ => Ok(0),
722 },
723 None => Ok(0),
724 }
725 }
726
727 pub fn clear_ingest_log(&self) -> Result<usize> {
728 let conn = self.conn()?;
729 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
730 let count = match result.next() {
731 Some(row) => match &row[0] {
732 Value::Int64(n) => *n as usize,
733 _ => 0,
734 },
735 None => 0,
736 };
737 conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
738 Ok(count)
739 }
740
741 pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
742 let conn = self.conn()?;
743 let escaped = escape_cypher(source);
744 let mut result = conn.query(&format!(
745 "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
746 escaped
747 ))?;
748 let count = match result.next() {
749 Some(row) => match &row[0] {
750 Value::Int64(n) => *n as usize,
751 _ => 0,
752 },
753 None => 0,
754 };
755 conn.query(&format!(
756 "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
757 escaped
758 ))?;
759 Ok(count)
760 }
761
762 pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
763 let conn = self.conn()?;
764 let escaped = escape_cypher(file_path);
765 let mut result = conn.query(&format!(
766 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
767 escaped
768 ))?;
769 Ok(result.next().is_some())
770 }
771
772 pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
773 let conn = self.conn()?;
774 let escaped = escape_cypher(file_path);
775 let mut result = conn.query(&format!(
776 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
777 escaped
778 ))?;
779 match result.next() {
780 Some(row) => {
781 let stored_hash = value_to_string(&row[0]);
782 Ok(stored_hash != file_hash)
783 }
784 None => Ok(true),
785 }
786 }
787
788 pub fn mark_ingested(
789 &self,
790 file_path: &str,
791 file_hash: &str,
792 memory_count: usize,
793 source: &str,
794 ) -> Result<()> {
795 let conn = self.conn()?;
796 let path_escaped = escape_cypher(file_path);
797 let hash_escaped = escape_cypher(file_hash);
798 let source_escaped = escape_cypher(source);
799 let now = chrono::Utc::now().to_rfc3339();
800
801 conn.query(&format!(
802 "MERGE (l:IngestLog {{file_path: '{}'}})
803 SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
804 path_escaped, hash_escaped, now, memory_count as i64, source_escaped
805 ))?;
806 Ok(())
807 }
808
809 pub fn ingested_file_count(&self) -> Result<usize> {
810 let conn = self.conn()?;
811 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
812 match result.next() {
813 Some(row) => match &row[0] {
814 Value::Int64(n) => Ok(*n as usize),
815 _ => Ok(0),
816 },
817 None => Ok(0),
818 }
819 }
820}
821
822impl KuzuStore {
823 fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
824 let id = memory.id.to_string();
825 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
826 let created_at = memory.created_at.to_rfc3339();
827 let last_accessed = memory.last_accessed.to_rfc3339();
828 let embedding_str = format_embedding(&memory.embedding);
829 let content_escaped = escape_cypher(&memory.content);
830 let source_escaped = escape_cypher(&memory.source);
831 let source_id_escaped = escape_cypher(&memory.source_id);
832
833 let query = format!(
834 "CREATE (:Memory {{
835 id: '{id}',
836 content: '{content_escaped}',
837 embedding: {embedding_str},
838 memory_type: '{memory_type}',
839 confidence: {confidence},
840 created_at: '{created_at}',
841 last_accessed: '{last_accessed}',
842 access_count: {access_count},
843 source: '{source_escaped}',
844 source_id: '{source_id_escaped}'
845 }});",
846 confidence = memory.confidence,
847 access_count = memory.access_count,
848 );
849
850 conn.query(&query)?;
851 Ok(())
852 }
853
854 fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
855 let id = entity.id.to_string();
856 let embedding_str = format_embedding(&entity.embedding);
857 let aliases_str = format_string_array(&entity.aliases);
858 let name_escaped = escape_cypher(&entity.name);
859 let etype_escaped = escape_cypher(&entity.entity_type);
860
861 let query = format!(
862 "CREATE (:Entity {{
863 id: '{id}',
864 name: '{name_escaped}',
865 entity_type: '{etype_escaped}',
866 embedding: {embedding_str},
867 aliases: {aliases_str}
868 }});"
869 );
870
871 conn.query(&query)?;
872 Ok(())
873 }
874
875 fn create_conversation_node(
876 &self,
877 conn: &Connection<'_>,
878 conversation: &Conversation,
879 ) -> Result<()> {
880 let id = conversation.id.to_string();
881 let started_at = conversation.started_at.to_rfc3339();
882 let source_escaped = escape_cypher(&conversation.source);
883 let machine_escaped = escape_cypher(&conversation.machine_id);
884 let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
885
886 let query = format!(
887 "CREATE (:Conversation {{
888 id: '{id}',
889 source: '{source_escaped}',
890 machine_id: '{machine_escaped}',
891 started_at: '{started_at}',
892 project_path: '{project_escaped}'
893 }});"
894 );
895
896 conn.query(&query)?;
897 Ok(())
898 }
899
900 fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
901 let from_id = relation.from_id.to_string();
902 let to_id = relation.to_id.to_string();
903
904 let query = match relation.relation_type {
905 RelationType::RelatesTo => format!(
906 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
907 from_id, to_id, relation.strength, relation.context.as_deref().unwrap_or("")
908 ),
909 RelationType::Mentions => format!(
910 "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
911 from_id, to_id
912 ),
913 RelationType::DerivedFrom => format!(
914 "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
915 from_id, to_id, relation.context.as_deref().unwrap_or("direct")
916 ),
917 RelationType::Contradicts => format!(
918 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
919 from_id, to_id, relation.context.as_deref().unwrap_or("")
920 ),
921 RelationType::Reinforces => format!(
922 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
923 from_id, to_id
924 ),
925 RelationType::DistilledFrom => format!(
926 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
927 from_id, to_id, relation.context.as_deref().unwrap_or("")
928 ),
929 RelationType::Supersedes => format!(
930 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
931 from_id, to_id, relation.context.as_deref().unwrap_or("")
932 ),
933 };
934
935 conn.query(&query)?;
936 Ok(())
937 }
938}
939
940impl KuzuStore {
941 pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
942 let conn = self.conn()?;
943 let mut result = conn.query(&format!(
944 "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
945 id
946 ))?;
947
948 match result.next() {
949 Some(row) => {
950 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
951 let source = value_to_string(&row[1]);
952 let machine_id = value_to_string(&row[2]);
953 let started_at_str = value_to_string(&row[3]);
954 let project_path = {
955 let s = value_to_string(&row[4]);
956 if s.is_empty() {
957 None
958 } else {
959 Some(s)
960 }
961 };
962 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
963 .map(|dt| dt.with_timezone(&chrono::Utc))
964 .unwrap_or_else(|_| chrono::Utc::now());
965
966 Ok(Some(Conversation {
967 id,
968 source,
969 machine_id,
970 started_at,
971 project_path,
972 }))
973 }
974 None => Ok(None),
975 }
976 }
977
978 pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
979 let conn = self.conn()?;
980 let query = format!(
981 "MATCH (m:Memory {{id: '{}'}}) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id, m.embedding;",
982 id
983 );
984
985 let mut result = conn.query(&query)?;
986 match result.next() {
987 Some(row) => {
988 let mut memory = row_to_memory(&row[..9])?;
989 memory.embedding = value_to_f32_vec(&row[9]);
990 Ok(Some(memory))
991 }
992 None => Ok(None),
993 }
994 }
995
996 pub fn import_memory(&self, memory: &Memory) -> Result<()> {
997 let conn = self.conn()?;
998 self.create_memory_node(&conn, memory)
999 }
1000
1001 pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1002 let conn = self.conn()?;
1003 self.create_conversation_node(&conn, conversation)
1004 }
1005
1006 pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1007 let conn = self.conn()?;
1008 self.create_entity_node(&conn, entity)
1009 }
1010
1011 pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1012 let conn = self.conn()?;
1013 self.create_relation_edge(&conn, relation)
1014 }
1015
1016 pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1017 let conn = self.conn()?;
1018 conn.query(&format!(
1019 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1020 id
1021 ))?;
1022 Ok(())
1023 }
1024
1025 pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1026 let conn = self.conn()?;
1027 let id = memory.id.to_string();
1028 let last_accessed = memory.last_accessed.to_rfc3339();
1029 let content_escaped = escape_cypher(&memory.content);
1030 let embedding_str = format_embedding(&memory.embedding);
1031 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1032 let source_escaped = escape_cypher(&memory.source);
1033 let source_id_escaped = escape_cypher(&memory.source_id);
1034 let created_at = memory.created_at.to_rfc3339();
1035
1036 let query = format!(
1037 "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}';",
1038 confidence = memory.confidence,
1039 access_count = memory.access_count,
1040 );
1041
1042 conn.query(&query)?;
1043 Ok(())
1044 }
1045
1046 pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1047 self.sync_log_page(after_seq, None)
1048 }
1049
1050 pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1051 let conn = self.conn()?;
1052 let machine_escaped = escape_cypher(&self.machine_id);
1053 let limit_clause = match limit {
1054 Some(n) => format!(" LIMIT {}", n),
1055 None => String::new(),
1056 };
1057 let query = format!(
1058 "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1059 after_seq, machine_escaped, limit_clause
1060 );
1061
1062 let mut result = conn.query(&query)?;
1063 let mut entries = Vec::new();
1064
1065 for row in &mut result {
1066 let seq = match &row[0] {
1067 Value::Int64(n) => *n as u64,
1068 _ => 0,
1069 };
1070 let op_str = value_to_string(&row[1]);
1071 let node_type_str = value_to_string(&row[2]);
1072 let node_id = value_to_string(&row[3]);
1073 let machine_id = value_to_string(&row[4]);
1074 let timestamp_str = value_to_string(&row[5]);
1075 let data_str = value_to_string(&row[6]);
1076
1077 let op = match op_str.as_str() {
1078 "create" => SyncOp::Create,
1079 "update" => SyncOp::Update,
1080 "delete" => SyncOp::Delete,
1081 _ => continue,
1082 };
1083 let node_type = match node_type_str.as_str() {
1084 "memory" => SyncNodeType::Memory,
1085 "entity" => SyncNodeType::Entity,
1086 "conversation" => SyncNodeType::Conversation,
1087 "relation" => SyncNodeType::Relation,
1088 _ => continue,
1089 };
1090 let timestamp = chrono::DateTime::parse_from_rfc3339(×tamp_str)
1091 .map(|dt| dt.with_timezone(&chrono::Utc))
1092 .unwrap_or_else(|_| chrono::Utc::now());
1093
1094 let data = if data_str.is_empty() {
1095 None
1096 } else {
1097 Some(data_str)
1098 };
1099
1100 entries.push(SyncEntry {
1101 seq,
1102 op,
1103 node_type,
1104 node_id,
1105 machine_id,
1106 timestamp,
1107 data,
1108 });
1109 }
1110
1111 Ok(entries)
1112 }
1113
1114 pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1115 let conn = self.conn()?;
1116 let escaped = escape_cypher(peer_id);
1117 let mut result = conn.query(&format!(
1118 "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1119 escaped
1120 ))?;
1121
1122 match result.next() {
1123 Some(row) => {
1124 let peer_id = value_to_string(&row[0]);
1125 let last_seq = match &row[1] {
1126 Value::Int64(n) => *n as u64,
1127 _ => 0,
1128 };
1129 let last_sync_at_str = value_to_string(&row[2]);
1130 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1131 .map(|dt| dt.with_timezone(&chrono::Utc))
1132 .unwrap_or_else(|_| chrono::Utc::now());
1133
1134 Ok(Some(SyncState {
1135 peer_id,
1136 last_seq,
1137 last_sync_at,
1138 }))
1139 }
1140 None => Ok(None),
1141 }
1142 }
1143
1144 pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1145 let conn = self.conn()?;
1146 let peer_escaped = escape_cypher(&state.peer_id);
1147 let now = state.last_sync_at.to_rfc3339();
1148
1149 conn.query(&format!(
1150 "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1151 peer_escaped, state.last_seq, now
1152 ))?;
1153 Ok(())
1154 }
1155
1156 pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1157 let conn = self.conn()?;
1158 let mut result =
1159 conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1160 let mut states = Vec::new();
1161
1162 for row in &mut result {
1163 let peer_id = value_to_string(&row[0]);
1164 let last_seq = match &row[1] {
1165 Value::Int64(n) => *n as u64,
1166 _ => 0,
1167 };
1168 let last_sync_at_str = value_to_string(&row[2]);
1169 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1170 .map(|dt| dt.with_timezone(&chrono::Utc))
1171 .unwrap_or_else(|_| chrono::Utc::now());
1172
1173 states.push(SyncState {
1174 peer_id,
1175 last_seq,
1176 last_sync_at,
1177 });
1178 }
1179
1180 Ok(states)
1181 }
1182
1183 pub fn backfill_sync_log(&self) -> Result<u64> {
1184 let conn = self.conn()?;
1185 let mut count = 0u64;
1186
1187 let mut result = conn.query(
1188 "MATCH (m:Memory) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id, m.embedding;",
1189 )?;
1190 for row in &mut result {
1191 let mut memory = row_to_memory(&row[..9])?;
1192 memory.embedding = value_to_f32_vec(&row[9]);
1193 let data = serde_json::to_string(&memory).ok();
1194 self.log_sync_entry(
1195 &conn,
1196 SyncOp::Create,
1197 SyncNodeType::Memory,
1198 &memory.id.to_string(),
1199 data.as_deref(),
1200 )?;
1201 count += 1;
1202 }
1203
1204 let mut result = conn.query(
1205 "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1206 )?;
1207 for row in &mut result {
1208 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1209 let source = value_to_string(&row[1]);
1210 let machine_id = value_to_string(&row[2]);
1211 let started_at_str = value_to_string(&row[3]);
1212 let project_path = {
1213 let s = value_to_string(&row[4]);
1214 if s.is_empty() {
1215 None
1216 } else {
1217 Some(s)
1218 }
1219 };
1220 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1221 .map(|dt| dt.with_timezone(&chrono::Utc))
1222 .unwrap_or_else(|_| chrono::Utc::now());
1223
1224 let conv = Conversation {
1225 id,
1226 source,
1227 machine_id,
1228 started_at,
1229 project_path,
1230 };
1231 let data = serde_json::to_string(&conv).ok();
1232 self.log_sync_entry(
1233 &conn,
1234 SyncOp::Create,
1235 SyncNodeType::Conversation,
1236 &conv.id.to_string(),
1237 data.as_deref(),
1238 )?;
1239 count += 1;
1240 }
1241
1242 Ok(count)
1243 }
1244}
1245
/// Renders an embedding as a bracketed, comma-separated list literal,
/// e.g. `[0.5,2]`.
///
/// An empty slice is rendered as a list of 384 `0.0` entries instead of `[]`.
fn format_embedding(embedding: &[f32]) -> String {
    let rendered: Vec<String> = match embedding {
        [] => vec!["0.0".to_string(); 384],
        values => values.iter().map(|v| v.to_string()).collect(),
    };
    format!("[{}]", rendered.join(","))
}
1254
/// Escapes `s` for use inside a single-quoted query string literal.
///
/// Every backslash and every single quote is prefixed with a backslash;
/// all other characters are copied through unchanged.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1258
/// Renders `items` as a list literal of single-quoted strings,
/// e.g. `['a','b']`. An empty slice yields `[]`.
///
/// Each item is escaped with the same backslash convention `escape_cypher`
/// applies elsewhere in this file: backslashes are doubled first, then single
/// quotes are prefixed with a backslash. Escaping backslashes matters — an
/// item ending in `\` would otherwise swallow the closing quote and corrupt
/// the surrounding query.
fn format_string_array(items: &[String]) -> String {
    let parts: Vec<String> = items
        .iter()
        .map(|item| {
            let escaped = item.replace('\\', "\\\\").replace('\'', "\\'");
            format!("'{escaped}'")
        })
        .collect();
    format!("[{}]", parts.join(","))
}
1266
1267fn value_to_string(val: &Value) -> String {
1268 match val {
1269 Value::String(s) => s.clone(),
1270 other => format!("{:?}", other),
1271 }
1272}
1273
1274fn value_to_f32(val: &Value) -> f32 {
1275 match val {
1276 Value::Float(f) => *f,
1277 Value::Double(d) => *d as f32,
1278 Value::Int64(i) => *i as f32,
1279 _ => 0.0,
1280 }
1281}
1282
1283fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1284 match val {
1285 Value::Array(_, items) | Value::List(_, items) => {
1286 items.iter().map(|v| value_to_f32(v)).collect()
1287 }
1288 _ => Vec::new(),
1289 }
1290}
1291
1292fn row_to_memory(row: &[Value]) -> Result<Memory> {
1293 let id_str = value_to_string(&row[0]);
1294 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1295 let content = value_to_string(&row[1]);
1296 let memory_type_str = value_to_string(&row[2]);
1297 let confidence = value_to_f32(&row[3]);
1298 let created_at_str = value_to_string(&row[4]);
1299 let last_accessed_str = value_to_string(&row[5]);
1300 let access_count = match &row[6] {
1301 Value::Int64(i) => *i as u32,
1302 _ => 0,
1303 };
1304 let source = value_to_string(&row[7]);
1305 let source_id = value_to_string(&row[8]);
1306
1307 let memory_type = match memory_type_str.as_str() {
1308 "episodic" => MemoryType::Episodic,
1309 "procedural" => MemoryType::Procedural,
1310 _ => MemoryType::Semantic,
1311 };
1312
1313 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1314 .map(|dt| dt.with_timezone(&chrono::Utc))
1315 .unwrap_or_else(|_| chrono::Utc::now());
1316
1317 let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1318 .map(|dt| dt.with_timezone(&chrono::Utc))
1319 .unwrap_or_else(|_| chrono::Utc::now());
1320
1321 Ok(Memory {
1322 id,
1323 content,
1324 embedding: Vec::new(),
1325 memory_type,
1326 confidence,
1327 created_at,
1328 last_accessed,
1329 access_count,
1330 source,
1331 source_id,
1332 })
1333}
1334
1335fn row_to_entity(row: &[Value]) -> Result<Entity> {
1336 let id_str = value_to_string(&row[0]);
1337 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1338 let name = value_to_string(&row[1]);
1339 let entity_type = value_to_string(&row[2]);
1340
1341 Ok(Entity {
1342 id,
1343 name,
1344 entity_type,
1345 embedding: Vec::new(),
1346 aliases: Vec::new(),
1347 })
1348}
1349
// SAFETY: NOTE(review): no justification is recorded for these impls.
// `KuzuStore` holds a `kuzu::Database`, a `String`, and an `AtomicU64`; the
// latter two are already `Send + Sync`, so these impls are sound only if
// `Database` may be moved to and shared between threads — confirm against the
// kuzu crate. If `Database` is itself `Send + Sync`, the auto traits already
// apply and these manual impls are redundant.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}