1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10 Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11 SyncOp, SyncState,
12};
13use crate::store::Store;
14
15pub struct KuzuStore {
16 db: Database,
17 machine_id: String,
18 sync_seq: AtomicU64,
19}
20
21fn memory_return_cols(alias: &str) -> String {
22 ["id", "content", "memory_type", "confidence", "created_at", "last_accessed", "access_count", "source", "source_id", "project_path"]
23 .iter()
24 .map(|col| format!("{alias}.{col}"))
25 .collect::<Vec<_>>()
26 .join(", ")
27}
28
29fn project_path_from_db(s: &str) -> Option<String> {
30 if s.is_empty() { None } else { Some(s.to_string()) }
31}
32
33impl KuzuStore {
34 pub fn open(path: &Path, machine_id: String) -> Result<Self> {
35 let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
36 let store = Self {
37 db,
38 machine_id,
39 sync_seq: AtomicU64::new(1),
40 };
41 store.init_schema()?;
42 store.init_sync_seq()?;
43 Ok(store)
44 }
45
46 pub fn in_memory(machine_id: String) -> Result<Self> {
47 let db =
48 Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49 let store = Self {
50 db,
51 machine_id,
52 sync_seq: AtomicU64::new(1),
53 };
54 store.init_schema()?;
55 Ok(store)
56 }
57
58 pub fn machine_id(&self) -> &str {
59 &self.machine_id
60 }
61
62 fn conn(&self) -> Result<Connection<'_>> {
63 Connection::new(&self.db).context("creating connection")
64 }
65
66 pub fn diagnostic(&self) -> Result<String> {
67 let conn = self.conn()?;
68
69 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
70 let total: i64 = result
71 .next()
72 .map(|r| match &r[0] {
73 Value::Int64(v) => *v,
74 _ => -1,
75 })
76 .unwrap_or(-1);
77
78 let mut result = conn.query(
79 "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
80 )?;
81 let with_emb: i64 = result
82 .next()
83 .map(|r| match &r[0] {
84 Value::Int64(v) => *v,
85 _ => -1,
86 })
87 .unwrap_or(-1);
88
89 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
90 let zeros_str = format!("[{}]", zeros.join(","));
91 let idx_query = format!(
92 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
93 zeros_str
94 );
95 let idx_status = match conn.query(&idx_query) {
96 Ok(mut r) => match r.next() {
97 Some(row) => format!("ok (distance: {:?})", &row[0]),
98 None => "ok (0 results)".to_string(),
99 },
100 Err(e) => format!("error: {e}"),
101 };
102
103 Ok(format!(
104 "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
105 ))
106 }
107
108 fn init_schema(&self) -> Result<()> {
109 let conn = self.conn()?;
110
111 conn.query(
112 "CREATE NODE TABLE IF NOT EXISTS Memory(
113 id STRING PRIMARY KEY,
114 content STRING,
115 embedding FLOAT[384],
116 memory_type STRING,
117 confidence FLOAT,
118 created_at STRING,
119 last_accessed STRING,
120 access_count INT64,
121 source STRING,
122 source_id STRING,
123 project_path STRING
124 );",
125 )
126 .context("creating Memory table")?;
127
128 conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
129
130 conn.query(
131 "CREATE NODE TABLE IF NOT EXISTS Entity(
132 id STRING PRIMARY KEY,
133 name STRING,
134 entity_type STRING,
135 embedding FLOAT[384],
136 aliases STRING[]
137 );",
138 )
139 .context("creating Entity table")?;
140
141 conn.query(
142 "CREATE NODE TABLE IF NOT EXISTS Conversation(
143 id STRING PRIMARY KEY,
144 source STRING,
145 machine_id STRING,
146 started_at STRING,
147 project_path STRING
148 );",
149 )
150 .context("creating Conversation table")?;
151
152 conn.query(
153 "CREATE NODE TABLE IF NOT EXISTS IngestLog(
154 file_path STRING PRIMARY KEY,
155 file_hash STRING,
156 ingested_at STRING,
157 memory_count INT64,
158 source STRING
159 );",
160 )
161 .context("creating IngestLog table")?;
162
163 conn.query(
164 "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
165 FROM Memory TO Memory,
166 strength FLOAT,
167 context STRING
168 );",
169 )
170 .context("creating RELATES_TO rel")?;
171
172 conn.query(
173 "CREATE REL TABLE IF NOT EXISTS MENTIONS(
174 FROM Memory TO Entity,
175 position INT64
176 );",
177 )
178 .context("creating MENTIONS rel")?;
179
180 conn.query(
181 "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
182 FROM Memory TO Conversation,
183 transformation STRING
184 );",
185 )
186 .context("creating DERIVED_FROM rel")?;
187
188 conn.query(
189 "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
190 FROM Memory TO Memory,
191 model STRING
192 );",
193 )
194 .context("creating DISTILLED_FROM rel")?;
195
196 conn.query(
197 "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
198 FROM Memory TO Memory,
199 resolution STRING
200 );",
201 )
202 .context("creating CONTRADICTS rel")?;
203
204 conn.query(
205 "CREATE REL TABLE IF NOT EXISTS REINFORCES(
206 FROM Memory TO Memory
207 );",
208 )
209 .context("creating REINFORCES rel")?;
210
211 conn.query(
212 "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
213 FROM Memory TO Memory,
214 reason STRING
215 );",
216 )
217 .context("creating SUPERSEDES rel")?;
218
219 conn.query(
220 "CREATE NODE TABLE IF NOT EXISTS SyncLog(
221 seq INT64 PRIMARY KEY,
222 op STRING,
223 node_type STRING,
224 node_id STRING,
225 machine_id STRING,
226 timestamp STRING,
227 data STRING
228 );",
229 )
230 .context("creating SyncLog table")?;
231
232 conn.query(
233 "CREATE NODE TABLE IF NOT EXISTS SyncState(
234 peer_id STRING PRIMARY KEY,
235 last_seq INT64,
236 last_sync_at STRING
237 );",
238 )
239 .context("creating SyncState table")?;
240
241 conn.query(
242 "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
243 memory_id STRING PRIMARY KEY,
244 distilled_id STRING,
245 consolidated_at STRING,
246 model STRING
247 );",
248 )
249 .context("creating ConsolidationLog table")?;
250
251 conn.query(
252 "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
253 id STRING PRIMARY KEY,
254 last_sync_seq INT64,
255 exported_at STRING,
256 vault_path STRING,
257 pages_created INT64,
258 pages_updated INT64,
259 memories_processed INT64
260 );",
261 )
262 .context("creating WikiExportLog table")?;
263
264 conn.query(
265 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
266 )
267 .ok();
268
269 Ok(())
270 }
271
272 fn init_sync_seq(&self) -> Result<()> {
273 let conn = self.conn()?;
274 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
275 if let Some(row) = result.next()
276 && let Value::Int64(max_seq) = &row[0]
277 {
278 self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
279 }
280 Ok(())
281 }
282
283 fn log_sync_entry(
284 &self,
285 conn: &Connection<'_>,
286 op: SyncOp,
287 node_type: SyncNodeType,
288 node_id: &str,
289 data: Option<&str>,
290 ) -> Result<()> {
291 let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
292 let timestamp = chrono::Utc::now().to_rfc3339();
293 let op_str = match op {
294 SyncOp::Create => "create",
295 SyncOp::Update => "update",
296 SyncOp::Delete => "delete",
297 };
298 let nt_str = match node_type {
299 SyncNodeType::Memory => "memory",
300 SyncNodeType::Entity => "entity",
301 SyncNodeType::Conversation => "conversation",
302 SyncNodeType::Relation => "relation",
303 };
304 let machine_escaped = escape_cypher(&self.machine_id);
305 let node_id_escaped = escape_cypher(node_id);
306 let data_escaped = data.map(escape_cypher).unwrap_or_default();
307
308 let query = format!(
309 "CREATE (:SyncLog {{
310 seq: {seq},
311 op: '{op_str}',
312 node_type: '{nt_str}',
313 node_id: '{node_id_escaped}',
314 machine_id: '{machine_escaped}',
315 timestamp: '{timestamp}',
316 data: '{data_escaped}'
317 }});"
318 );
319 conn.query(&query)?;
320 Ok(())
321 }
322}
323
324impl Store for KuzuStore {
325 fn store_memory(&self, memory: &Memory) -> Result<()> {
326 let conn = self.conn()?;
327 self.create_memory_node(&conn, memory)?;
328 let data = serde_json::to_string(memory).ok();
329 self.log_sync_entry(
330 &conn,
331 SyncOp::Create,
332 SyncNodeType::Memory,
333 &memory.id.to_string(),
334 data.as_deref(),
335 )?;
336 Ok(())
337 }
338
339 fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
340 let conn = self.conn()?;
341 let query = format!(
342 "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
343 id, memory_return_cols("m")
344 );
345
346 let mut result = conn.query(&query)?;
347 match result.next() {
348 Some(row) => Ok(Some(row_to_memory(&row)?)),
349 None => Ok(None),
350 }
351 }
352
353 fn delete_memory(&self, id: Uuid) -> Result<()> {
354 let conn = self.conn()?;
355 conn.query(&format!(
356 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
357 id
358 ))?;
359 self.log_sync_entry(
360 &conn,
361 SyncOp::Delete,
362 SyncNodeType::Memory,
363 &id.to_string(),
364 None,
365 )?;
366 Ok(())
367 }
368
369 fn store_entity(&self, entity: &Entity) -> Result<()> {
370 let conn = self.conn()?;
371 self.create_entity_node(&conn, entity)?;
372 let data = serde_json::to_string(entity).ok();
373 self.log_sync_entry(
374 &conn,
375 SyncOp::Create,
376 SyncNodeType::Entity,
377 &entity.id.to_string(),
378 data.as_deref(),
379 )?;
380 Ok(())
381 }
382
383 fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
384 let conn = self.conn()?;
385 let mut result = conn.query(&format!(
386 "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
387 id
388 ))?;
389
390 match result.next() {
391 Some(row) => Ok(Some(row_to_entity(&row)?)),
392 None => Ok(None),
393 }
394 }
395
396 fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
397 let conn = self.conn()?;
398 let name_escaped = escape_cypher(name);
399 let query = format!(
400 "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
401 name_escaped
402 );
403 let mut result = conn.query(&query)?;
404
405 match result.next() {
406 Some(row) => Ok(Some(row_to_entity(&row)?)),
407 None => Ok(None),
408 }
409 }
410
411 fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
412 let conn = self.conn()?;
413 self.create_conversation_node(&conn, conversation)?;
414 let data = serde_json::to_string(conversation).ok();
415 self.log_sync_entry(
416 &conn,
417 SyncOp::Create,
418 SyncNodeType::Conversation,
419 &conversation.id.to_string(),
420 data.as_deref(),
421 )?;
422 Ok(())
423 }
424
425 fn store_relation(&self, relation: &Relation) -> Result<()> {
426 let conn = self.conn()?;
427 self.create_relation_edge(&conn, relation)?;
428 let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
429 let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
430 let data = serde_json::to_string(relation).ok();
431 self.log_sync_entry(
432 &conn,
433 SyncOp::Create,
434 SyncNodeType::Relation,
435 &node_id,
436 data.as_deref(),
437 )?;
438 Ok(())
439 }
440
441 fn get_relations(
442 &self,
443 node_id: Uuid,
444 relation_type: Option<RelationType>,
445 ) -> Result<Vec<Relation>> {
446 let conn = self.conn()?;
447 let id = node_id.to_string();
448
449 let query = match relation_type {
450 Some(RelationType::RelatesTo) => format!(
451 "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
452 id
453 ),
454 Some(RelationType::Contradicts) => format!(
455 "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
456 id
457 ),
458 Some(RelationType::Reinforces) => format!(
459 "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
460 id
461 ),
462 Some(RelationType::Supersedes) => format!(
463 "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
464 id
465 ),
466 Some(RelationType::Mentions) => format!(
467 "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
468 id
469 ),
470 Some(RelationType::DerivedFrom) => format!(
471 "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
472 id
473 ),
474 Some(RelationType::DistilledFrom) => format!(
475 "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
476 id
477 ),
478 None => {
479 let mut relations = Vec::new();
480 let mem_to_mem = [
481 ("RELATES_TO", RelationType::RelatesTo),
482 ("REINFORCES", RelationType::Reinforces),
483 ("CONTRADICTS", RelationType::Contradicts),
484 ("SUPERSEDES", RelationType::Supersedes),
485 ("DISTILLED_FROM", RelationType::DistilledFrom),
486 ];
487 for (label, rt) in &mem_to_mem {
488 let q = format!(
489 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
490 id, label
491 );
492 if let Ok(mut result) = conn.query(&q) {
493 for row in &mut result {
494 let to_id_str = value_to_string(&row[0]);
495 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
496 relations.push(Relation {
497 from_id: node_id,
498 to_id,
499 relation_type: *rt,
500 strength: 1.0,
501 context: None,
502 });
503 }
504 }
505 }
506 for (label, target, rt) in [
507 ("MENTIONS", "Entity", RelationType::Mentions),
508 ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
509 ] {
510 let q = format!(
511 "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
512 id, label, target
513 );
514 if let Ok(mut result) = conn.query(&q) {
515 for row in &mut result {
516 let to_id_str = value_to_string(&row[0]);
517 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
518 relations.push(Relation {
519 from_id: node_id,
520 to_id,
521 relation_type: rt,
522 strength: 1.0,
523 context: None,
524 });
525 }
526 }
527 }
528 return Ok(relations);
529 }
530 };
531
532 let mut result = conn.query(&query)?;
533 let mut relations = Vec::new();
534
535 for row in &mut result {
536 let to_id_str = value_to_string(&row[0]);
537 let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
538
539 relations.push(Relation {
540 from_id: node_id,
541 to_id,
542 relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
543 strength: 1.0,
544 context: None,
545 });
546 }
547
548 Ok(relations)
549 }
550
551 fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
552 let conn = self.conn()?;
553 let embedding_str = format_embedding(embedding);
554
555 let query = format!(
556 "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
557 RETURN {}, distance;",
558 embedding_str, limit, memory_return_cols("node")
559 );
560
561 let mut result = conn.query(&query)?;
562 let mut results = Vec::new();
563
564 for row in &mut result {
565 let memory = row_to_memory(&row[..10])?;
566 let distance = value_to_f32(&row[10]);
567 let similarity = 1.0 - distance;
568 results.push((memory, similarity));
569 }
570
571 Ok(results)
572 }
573
574 fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
575 let conn = self.conn()?;
576 let query = format!(
577 "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
578 RETURN DISTINCT {};",
579 start_id, depth, memory_return_cols("m")
580 );
581
582 let mut result = conn.query(&query)?;
583 let mut results = Vec::new();
584
585 for row in &mut result {
586 let memory = row_to_memory(&row)?;
587 results.push((memory, Vec::new()));
588 }
589
590 Ok(results)
591 }
592
593 fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
594 let conn = self.conn()?;
595 let source_escaped = escape_cypher(source);
596 let query = format!(
597 "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
598 source_escaped, memory_return_cols("m")
599 );
600 let mut result = conn.query(&query)?;
601
602 let mut memories = Vec::new();
603 for row in &mut result {
604 memories.push(row_to_memory(&row)?);
605 }
606 Ok(memories)
607 }
608
609 fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
610 let conn = self.conn()?;
611 let type_str = format!("{:?}", memory_type).to_lowercase();
612 let query = format!(
613 "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
614 type_str, memory_return_cols("m")
615 );
616 let mut result = conn.query(&query)?;
617
618 let mut memories = Vec::new();
619 for row in &mut result {
620 memories.push(row_to_memory(&row)?);
621 }
622 Ok(memories)
623 }
624
625 fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
626 let conn = self.conn()?;
627 let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
628 let cutoff_str = cutoff.to_rfc3339();
629
630 let query = format!(
631 "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
632 cutoff_str, memory_return_cols("m")
633 );
634
635 let mut result = conn.query(&query)?;
636 let mut memories = Vec::new();
637 for row in &mut result {
638 memories.push(row_to_memory(&row)?);
639 }
640 Ok(memories)
641 }
642
643 fn update_memory(&self, memory: &Memory) -> Result<()> {
644 let conn = self.conn()?;
645 let id = memory.id.to_string();
646 let last_accessed = memory.last_accessed.to_rfc3339();
647 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
648
649 let query = format!(
650 "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}';",
651 id, memory.confidence, last_accessed, memory.access_count, project_path_escaped
652 );
653
654 conn.query(&query)?;
655 let data = serde_json::to_string(memory).ok();
656 self.log_sync_entry(
657 &conn,
658 SyncOp::Update,
659 SyncNodeType::Memory,
660 &id,
661 data.as_deref(),
662 )?;
663 Ok(())
664 }
665
666 fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
667 let conn = self.conn()?;
668 let query_escaped = escape_cypher(&query.to_lowercase());
669 let cypher = format!(
670 "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
671 query_escaped, memory_return_cols("m"), limit
672 );
673
674 let mut result = conn.query(&cypher)?;
675 let mut memories = Vec::new();
676 for row in &mut result {
677 memories.push(row_to_memory(&row)?);
678 }
679 Ok(memories)
680 }
681
682 fn memory_count(&self) -> Result<usize> {
683 let conn = self.conn()?;
684 let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
685 match result.next() {
686 Some(row) => match &row[0] {
687 Value::Int64(n) => Ok(*n as usize),
688 _ => Ok(0),
689 },
690 None => Ok(0),
691 }
692 }
693}
694
695impl KuzuStore {
696 pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
697 if let Some(existing) = self.find_entity_by_name(name)? {
698 return Ok(existing);
699 }
700 let entity = Entity::new(name.to_string(), entity_type.to_string());
701 self.store_entity(&entity)?;
702 Ok(entity)
703 }
704
705 pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
706 let conn = self.conn()?;
707 let escaped = escape_cypher(content_prefix);
708 let query = format!(
709 "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
710 escaped
711 );
712 let mut result = conn.query(&query)?;
713 Ok(result.next().is_some())
714 }
715
716 pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
717 let conn = self.conn()?;
718 let query = format!(
719 "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
720 memory_return_cols("m"), limit
721 );
722 let mut result = conn.query(&query)?;
723 let mut memories = Vec::new();
724 for row in &mut result {
725 memories.push(row_to_memory(&row)?);
726 }
727 Ok(memories)
728 }
729
730 pub fn memories_created_between(
731 &self,
732 start: &DateTime<Utc>,
733 end: &DateTime<Utc>,
734 ) -> Result<Vec<Memory>> {
735 let conn = self.conn()?;
736 let query = format!(
737 "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
738 start.to_rfc3339(),
739 end.to_rfc3339(),
740 memory_return_cols("m")
741 );
742 let mut result = conn.query(&query)?;
743 let mut memories = Vec::new();
744 for row in &mut result {
745 memories.push(row_to_memory(&row)?);
746 }
747 Ok(memories)
748 }
749
750 pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
751 let conn = self.conn()?;
752 let now = Utc::now().to_rfc3339();
753 let query = format!(
754 "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
755 raw_id,
756 distilled_id,
757 now,
758 escape_cypher(model)
759 );
760 conn.query(&query)?;
761 Ok(())
762 }
763
764 pub fn consolidation_count(&self) -> Result<usize> {
765 let conn = self.conn()?;
766 let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
767 match result.next() {
768 Some(row) => match &row[0] {
769 Value::Int64(n) => Ok(*n as usize),
770 _ => Ok(0),
771 },
772 None => Ok(0),
773 }
774 }
775
776 pub fn delete_consolidated_raw(&self) -> Result<usize> {
777 let conn = self.conn()?;
778 let result = conn.query(
779 "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
780 )?;
781 let mut ids = Vec::new();
782 for row in result {
783 if let Value::String(id) = &row[0]
784 && let Ok(uuid) = Uuid::parse_str(id)
785 {
786 ids.push(uuid);
787 }
788 }
789 let count = ids.len();
790 for id in &ids {
791 let escaped = escape_cypher(&id.to_string());
792 conn.query(&format!(
793 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
794 escaped
795 ))
796 .ok();
797 }
798 Ok(count)
799 }
800
801 pub fn rebuild_vector_index(&self) -> Result<()> {
802 let conn = self.conn()?;
803 conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
804 .ok();
805 conn.query(
806 "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
807 )?;
808 Ok(())
809 }
810
811 pub fn clear_ingest_log(&self) -> Result<usize> {
812 let conn = self.conn()?;
813 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
814 let count = match result.next() {
815 Some(row) => match &row[0] {
816 Value::Int64(n) => *n as usize,
817 _ => 0,
818 },
819 None => 0,
820 };
821 conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
822 Ok(count)
823 }
824
825 pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
826 let conn = self.conn()?;
827 let escaped = escape_cypher(source);
828 let mut result = conn.query(&format!(
829 "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
830 escaped
831 ))?;
832 let count = match result.next() {
833 Some(row) => match &row[0] {
834 Value::Int64(n) => *n as usize,
835 _ => 0,
836 },
837 None => 0,
838 };
839 conn.query(&format!(
840 "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
841 escaped
842 ))?;
843 Ok(count)
844 }
845
846 pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
847 let conn = self.conn()?;
848 let escaped = escape_cypher(file_path);
849 let mut result = conn.query(&format!(
850 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
851 escaped
852 ))?;
853 Ok(result.next().is_some())
854 }
855
856 pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
857 let conn = self.conn()?;
858 let escaped = escape_cypher(file_path);
859 let mut result = conn.query(&format!(
860 "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
861 escaped
862 ))?;
863 match result.next() {
864 Some(row) => {
865 let stored_hash = value_to_string(&row[0]);
866 Ok(stored_hash != file_hash)
867 }
868 None => Ok(true),
869 }
870 }
871
872 pub fn mark_ingested(
873 &self,
874 file_path: &str,
875 file_hash: &str,
876 memory_count: usize,
877 source: &str,
878 ) -> Result<()> {
879 let conn = self.conn()?;
880 let path_escaped = escape_cypher(file_path);
881 let hash_escaped = escape_cypher(file_hash);
882 let source_escaped = escape_cypher(source);
883 let now = chrono::Utc::now().to_rfc3339();
884
885 conn.query(&format!(
886 "MERGE (l:IngestLog {{file_path: '{}'}})
887 SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
888 path_escaped, hash_escaped, now, memory_count as i64, source_escaped
889 ))?;
890 Ok(())
891 }
892
893 pub fn ingested_file_count(&self) -> Result<usize> {
894 let conn = self.conn()?;
895 let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
896 match result.next() {
897 Some(row) => match &row[0] {
898 Value::Int64(n) => Ok(*n as usize),
899 _ => Ok(0),
900 },
901 None => Ok(0),
902 }
903 }
904}
905
906impl KuzuStore {
907 fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
908 let id = memory.id.to_string();
909 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
910 let created_at = memory.created_at.to_rfc3339();
911 let last_accessed = memory.last_accessed.to_rfc3339();
912 let embedding_str = format_embedding(&memory.embedding);
913 let content_escaped = escape_cypher(&memory.content);
914 let source_escaped = escape_cypher(&memory.source);
915 let source_id_escaped = escape_cypher(&memory.source_id);
916 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
917
918 let query = format!(
919 "CREATE (:Memory {{
920 id: '{id}',
921 content: '{content_escaped}',
922 embedding: {embedding_str},
923 memory_type: '{memory_type}',
924 confidence: {confidence},
925 created_at: '{created_at}',
926 last_accessed: '{last_accessed}',
927 access_count: {access_count},
928 source: '{source_escaped}',
929 source_id: '{source_id_escaped}',
930 project_path: '{project_path_escaped}'
931 }});",
932 confidence = memory.confidence,
933 access_count = memory.access_count,
934 );
935
936 conn.query(&query)?;
937 Ok(())
938 }
939
940 fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
941 let id = entity.id.to_string();
942 let embedding_str = format_embedding(&entity.embedding);
943 let aliases_str = format_string_array(&entity.aliases);
944 let name_escaped = escape_cypher(&entity.name);
945 let etype_escaped = escape_cypher(&entity.entity_type);
946
947 let query = format!(
948 "CREATE (:Entity {{
949 id: '{id}',
950 name: '{name_escaped}',
951 entity_type: '{etype_escaped}',
952 embedding: {embedding_str},
953 aliases: {aliases_str}
954 }});"
955 );
956
957 conn.query(&query)?;
958 Ok(())
959 }
960
961 fn create_conversation_node(
962 &self,
963 conn: &Connection<'_>,
964 conversation: &Conversation,
965 ) -> Result<()> {
966 let id = conversation.id.to_string();
967 let started_at = conversation.started_at.to_rfc3339();
968 let source_escaped = escape_cypher(&conversation.source);
969 let machine_escaped = escape_cypher(&conversation.machine_id);
970 let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
971
972 let query = format!(
973 "CREATE (:Conversation {{
974 id: '{id}',
975 source: '{source_escaped}',
976 machine_id: '{machine_escaped}',
977 started_at: '{started_at}',
978 project_path: '{project_escaped}'
979 }});"
980 );
981
982 conn.query(&query)?;
983 Ok(())
984 }
985
986 fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
987 let from_id = relation.from_id.to_string();
988 let to_id = relation.to_id.to_string();
989
990 let query = match relation.relation_type {
991 RelationType::RelatesTo => format!(
992 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
993 from_id,
994 to_id,
995 relation.strength,
996 relation.context.as_deref().unwrap_or("")
997 ),
998 RelationType::Mentions => format!(
999 "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1000 from_id, to_id
1001 ),
1002 RelationType::DerivedFrom => format!(
1003 "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1004 from_id,
1005 to_id,
1006 relation.context.as_deref().unwrap_or("direct")
1007 ),
1008 RelationType::Contradicts => format!(
1009 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1010 from_id,
1011 to_id,
1012 relation.context.as_deref().unwrap_or("")
1013 ),
1014 RelationType::Reinforces => format!(
1015 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1016 from_id, to_id
1017 ),
1018 RelationType::DistilledFrom => format!(
1019 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1020 from_id,
1021 to_id,
1022 relation.context.as_deref().unwrap_or("")
1023 ),
1024 RelationType::Supersedes => format!(
1025 "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1026 from_id,
1027 to_id,
1028 relation.context.as_deref().unwrap_or("")
1029 ),
1030 };
1031
1032 conn.query(&query)?;
1033 Ok(())
1034 }
1035}
1036
1037impl KuzuStore {
1038 pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1039 let conn = self.conn()?;
1040 let mut result = conn.query(&format!(
1041 "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1042 id
1043 ))?;
1044
1045 match result.next() {
1046 Some(row) => {
1047 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1048 let source = value_to_string(&row[1]);
1049 let machine_id = value_to_string(&row[2]);
1050 let started_at_str = value_to_string(&row[3]);
1051 let project_path = {
1052 let s = value_to_string(&row[4]);
1053 if s.is_empty() { None } else { Some(s) }
1054 };
1055 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1056 .map(|dt| dt.with_timezone(&chrono::Utc))
1057 .unwrap_or_else(|_| chrono::Utc::now());
1058
1059 Ok(Some(Conversation {
1060 id,
1061 source,
1062 machine_id,
1063 started_at,
1064 project_path,
1065 }))
1066 }
1067 None => Ok(None),
1068 }
1069 }
1070
1071 pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1072 let conn = self.conn()?;
1073 let query = format!(
1074 "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1075 id, memory_return_cols("m")
1076 );
1077
1078 let mut result = conn.query(&query)?;
1079 match result.next() {
1080 Some(row) => {
1081 let mut memory = row_to_memory(&row[..10])?;
1082 memory.embedding = value_to_f32_vec(&row[10]);
1083 Ok(Some(memory))
1084 }
1085 None => Ok(None),
1086 }
1087 }
1088
1089 pub fn import_memory(&self, memory: &Memory) -> Result<()> {
1090 let conn = self.conn()?;
1091 self.create_memory_node(&conn, memory)
1092 }
1093
1094 pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1095 let conn = self.conn()?;
1096 self.create_conversation_node(&conn, conversation)
1097 }
1098
1099 pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1100 let conn = self.conn()?;
1101 self.create_entity_node(&conn, entity)
1102 }
1103
1104 pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1105 let conn = self.conn()?;
1106 self.create_relation_edge(&conn, relation)
1107 }
1108
1109 pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1110 let conn = self.conn()?;
1111 conn.query(&format!(
1112 "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1113 id
1114 ))?;
1115 Ok(())
1116 }
1117
1118 pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1119 let conn = self.conn()?;
1120 let id = memory.id.to_string();
1121 let last_accessed = memory.last_accessed.to_rfc3339();
1122 let content_escaped = escape_cypher(&memory.content);
1123 let embedding_str = format_embedding(&memory.embedding);
1124 let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1125 let source_escaped = escape_cypher(&memory.source);
1126 let source_id_escaped = escape_cypher(&memory.source_id);
1127 let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1128 let created_at = memory.created_at.to_rfc3339();
1129
1130 let query = format!(
1131 "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}', m.project_path = '{project_path_escaped}';",
1132 confidence = memory.confidence,
1133 access_count = memory.access_count,
1134 );
1135
1136 conn.query(&query)?;
1137 Ok(())
1138 }
1139
1140 pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1141 self.sync_log_page(after_seq, None)
1142 }
1143
1144 pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1145 let conn = self.conn()?;
1146 let machine_escaped = escape_cypher(&self.machine_id);
1147 let limit_clause = match limit {
1148 Some(n) => format!(" LIMIT {}", n),
1149 None => String::new(),
1150 };
1151 let query = format!(
1152 "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1153 after_seq, machine_escaped, limit_clause
1154 );
1155
1156 let mut result = conn.query(&query)?;
1157 let mut entries = Vec::new();
1158
1159 for row in &mut result {
1160 let seq = match &row[0] {
1161 Value::Int64(n) => *n as u64,
1162 _ => 0,
1163 };
1164 let op_str = value_to_string(&row[1]);
1165 let node_type_str = value_to_string(&row[2]);
1166 let node_id = value_to_string(&row[3]);
1167 let machine_id = value_to_string(&row[4]);
1168 let timestamp_str = value_to_string(&row[5]);
1169 let data_str = value_to_string(&row[6]);
1170
1171 let op = match op_str.as_str() {
1172 "create" => SyncOp::Create,
1173 "update" => SyncOp::Update,
1174 "delete" => SyncOp::Delete,
1175 _ => continue,
1176 };
1177 let node_type = match node_type_str.as_str() {
1178 "memory" => SyncNodeType::Memory,
1179 "entity" => SyncNodeType::Entity,
1180 "conversation" => SyncNodeType::Conversation,
1181 "relation" => SyncNodeType::Relation,
1182 _ => continue,
1183 };
1184 let timestamp = chrono::DateTime::parse_from_rfc3339(×tamp_str)
1185 .map(|dt| dt.with_timezone(&chrono::Utc))
1186 .unwrap_or_else(|_| chrono::Utc::now());
1187
1188 let data = if data_str.is_empty() {
1189 None
1190 } else {
1191 Some(data_str)
1192 };
1193
1194 entries.push(SyncEntry {
1195 seq,
1196 op,
1197 node_type,
1198 node_id,
1199 machine_id,
1200 timestamp,
1201 data,
1202 });
1203 }
1204
1205 Ok(entries)
1206 }
1207
1208 pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1209 let conn = self.conn()?;
1210 let escaped = escape_cypher(peer_id);
1211 let mut result = conn.query(&format!(
1212 "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1213 escaped
1214 ))?;
1215
1216 match result.next() {
1217 Some(row) => {
1218 let peer_id = value_to_string(&row[0]);
1219 let last_seq = match &row[1] {
1220 Value::Int64(n) => *n as u64,
1221 _ => 0,
1222 };
1223 let last_sync_at_str = value_to_string(&row[2]);
1224 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1225 .map(|dt| dt.with_timezone(&chrono::Utc))
1226 .unwrap_or_else(|_| chrono::Utc::now());
1227
1228 Ok(Some(SyncState {
1229 peer_id,
1230 last_seq,
1231 last_sync_at,
1232 }))
1233 }
1234 None => Ok(None),
1235 }
1236 }
1237
1238 pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1239 let conn = self.conn()?;
1240 let peer_escaped = escape_cypher(&state.peer_id);
1241 let now = state.last_sync_at.to_rfc3339();
1242
1243 conn.query(&format!(
1244 "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1245 peer_escaped, state.last_seq, now
1246 ))?;
1247 Ok(())
1248 }
1249
1250 pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1251 let conn = self.conn()?;
1252 let mut result =
1253 conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1254 let mut states = Vec::new();
1255
1256 for row in &mut result {
1257 let peer_id = value_to_string(&row[0]);
1258 let last_seq = match &row[1] {
1259 Value::Int64(n) => *n as u64,
1260 _ => 0,
1261 };
1262 let last_sync_at_str = value_to_string(&row[2]);
1263 let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1264 .map(|dt| dt.with_timezone(&chrono::Utc))
1265 .unwrap_or_else(|_| chrono::Utc::now());
1266
1267 states.push(SyncState {
1268 peer_id,
1269 last_seq,
1270 last_sync_at,
1271 });
1272 }
1273
1274 Ok(states)
1275 }
1276
1277 pub fn all_entities(&self) -> Result<Vec<Entity>> {
1278 let conn = self.conn()?;
1279 let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1280 let mut entities = Vec::new();
1281 for row in &mut result {
1282 entities.push(row_to_entity(&row)?);
1283 }
1284 Ok(entities)
1285 }
1286
1287 pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1288 let conn = self.conn()?;
1289 let query = format!(
1290 "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1291 entity_id, memory_return_cols("m")
1292 );
1293 let mut result = conn.query(&query)?;
1294 let mut memories = Vec::new();
1295 for row in &mut result {
1296 memories.push(row_to_memory(&row)?);
1297 }
1298 Ok(memories)
1299 }
1300
1301 pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1302 let conn = self.conn()?;
1303 let mut result = conn.query(
1304 &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1305 )?;
1306 let mut memories = Vec::new();
1307 for row in &mut result {
1308 memories.push(row_to_memory(&row)?);
1309 }
1310 Ok(memories)
1311 }
1312
1313 pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1314 let conn = self.conn()?;
1315 let query = format!(
1316 "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1317 entity_id, entity_id
1318 );
1319 let mut result = conn.query(&query)?;
1320 let mut names = Vec::new();
1321 for row in &mut result {
1322 names.push(value_to_string(&row[0]));
1323 }
1324 Ok(names)
1325 }
1326
1327 pub fn max_sync_seq(&self) -> Result<u64> {
1328 let conn = self.conn()?;
1329 let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
1330 match result.next() {
1331 Some(row) => match &row[0] {
1332 Value::Int64(n) => Ok(*n as u64),
1333 _ => Ok(0),
1334 },
1335 None => Ok(0),
1336 }
1337 }
1338
1339 pub fn backfill_project_paths(&self) -> Result<u64> {
1340 let conn = self.conn()?;
1341 let mut result = conn.query(
1342 "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1343 )?;
1344 match result.next() {
1345 Some(row) => match &row[0] {
1346 Value::Int64(n) => Ok(*n as u64),
1347 _ => Ok(0),
1348 },
1349 None => Ok(0),
1350 }
1351 }
1352
1353 pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1354 let conn = self.conn()?;
1355 let escaped = escape_cypher(project_path);
1356 let cols = memory_return_cols("m");
1357 let query = format!(
1358 "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1359 escaped, cols
1360 );
1361 let mut result = conn.query(&query)?;
1362 let mut memories = Vec::new();
1363 for row in &mut result {
1364 memories.push(row_to_memory(&row)?);
1365 }
1366 Ok(memories)
1367 }
1368
1369 pub fn backfill_sync_log(&self) -> Result<u64> {
1370 let conn = self.conn()?;
1371 let mut count = 0u64;
1372
1373 let mut result = conn.query(
1374 &format!("MATCH (m:Memory) RETURN {}, m.embedding;", memory_return_cols("m")),
1375 )?;
1376 for row in &mut result {
1377 let mut memory = row_to_memory(&row[..10])?;
1378 memory.embedding = value_to_f32_vec(&row[10]);
1379 let data = serde_json::to_string(&memory).ok();
1380 self.log_sync_entry(
1381 &conn,
1382 SyncOp::Create,
1383 SyncNodeType::Memory,
1384 &memory.id.to_string(),
1385 data.as_deref(),
1386 )?;
1387 count += 1;
1388 }
1389
1390 let mut result = conn.query(
1391 "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1392 )?;
1393 for row in &mut result {
1394 let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1395 let source = value_to_string(&row[1]);
1396 let machine_id = value_to_string(&row[2]);
1397 let started_at_str = value_to_string(&row[3]);
1398 let project_path = {
1399 let s = value_to_string(&row[4]);
1400 if s.is_empty() { None } else { Some(s) }
1401 };
1402 let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1403 .map(|dt| dt.with_timezone(&chrono::Utc))
1404 .unwrap_or_else(|_| chrono::Utc::now());
1405
1406 let conv = Conversation {
1407 id,
1408 source,
1409 machine_id,
1410 started_at,
1411 project_path,
1412 };
1413 let data = serde_json::to_string(&conv).ok();
1414 self.log_sync_entry(
1415 &conn,
1416 SyncOp::Create,
1417 SyncNodeType::Conversation,
1418 &conv.id.to_string(),
1419 data.as_deref(),
1420 )?;
1421 count += 1;
1422 }
1423
1424 Ok(count)
1425 }
1426}
1427
1428fn format_embedding(embedding: &[f32]) -> String {
1429 if embedding.is_empty() {
1430 let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
1431 return format!("[{}]", zeros.join(","));
1432 }
1433 let parts: Vec<String> = embedding.iter().map(|v| format!("{}", v)).collect();
1434 format!("[{}]", parts.join(","))
1435}
1436
1437fn escape_cypher(s: &str) -> String {
1438 s.replace('\\', "\\\\").replace('\'', "\\'")
1439}
1440
1441fn format_string_array(items: &[String]) -> String {
1442 let parts: Vec<String> = items
1443 .iter()
1444 .map(|s| format!("'{}'", s.replace('\'', "''")))
1445 .collect();
1446 format!("[{}]", parts.join(","))
1447}
1448
1449fn value_to_string(val: &Value) -> String {
1450 match val {
1451 Value::String(s) => s.clone(),
1452 other => format!("{:?}", other),
1453 }
1454}
1455
1456fn value_to_f32(val: &Value) -> f32 {
1457 match val {
1458 Value::Float(f) => *f,
1459 Value::Double(d) => *d as f32,
1460 Value::Int64(i) => *i as f32,
1461 _ => 0.0,
1462 }
1463}
1464
1465fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1466 match val {
1467 Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
1468 _ => Vec::new(),
1469 }
1470}
1471
1472fn row_to_memory(row: &[Value]) -> Result<Memory> {
1473 let id_str = value_to_string(&row[0]);
1474 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1475 let content = value_to_string(&row[1]);
1476 let memory_type_str = value_to_string(&row[2]);
1477 let confidence = value_to_f32(&row[3]);
1478 let created_at_str = value_to_string(&row[4]);
1479 let last_accessed_str = value_to_string(&row[5]);
1480 let access_count = match &row[6] {
1481 Value::Int64(i) => *i as u32,
1482 _ => 0,
1483 };
1484 let source = value_to_string(&row[7]);
1485 let source_id = value_to_string(&row[8]);
1486 let project_path = project_path_from_db(&value_to_string(&row[9]));
1487
1488 let memory_type = match memory_type_str.as_str() {
1489 "episodic" => MemoryType::Episodic,
1490 "procedural" => MemoryType::Procedural,
1491 "decision" => MemoryType::Decision,
1492 "architecture" => MemoryType::Architecture,
1493 "debugging" => MemoryType::Debugging,
1494 "task" => MemoryType::Task,
1495 "question" => MemoryType::Question,
1496 _ => MemoryType::Semantic,
1497 };
1498
1499 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1500 .map(|dt| dt.with_timezone(&chrono::Utc))
1501 .unwrap_or_else(|_| chrono::Utc::now());
1502
1503 let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1504 .map(|dt| dt.with_timezone(&chrono::Utc))
1505 .unwrap_or_else(|_| chrono::Utc::now());
1506
1507 Ok(Memory {
1508 id,
1509 content,
1510 embedding: Vec::new(),
1511 memory_type,
1512 confidence,
1513 created_at,
1514 last_accessed,
1515 access_count,
1516 source,
1517 source_id,
1518 project_path,
1519 })
1520}
1521
1522fn row_to_entity(row: &[Value]) -> Result<Entity> {
1523 let id_str = value_to_string(&row[0]);
1524 let id = Uuid::parse_str(&id_str).unwrap_or_default();
1525 let name = value_to_string(&row[1]);
1526 let entity_type = value_to_string(&row[2]);
1527
1528 Ok(Entity {
1529 id,
1530 name,
1531 entity_type,
1532 embedding: Vec::new(),
1533 aliases: Vec::new(),
1534 })
1535}
1536
1537unsafe impl Send for KuzuStore {}
1538unsafe impl Sync for KuzuStore {}