Skip to main content

second_brain_core/
kuzu_store.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10    Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11    SyncOp, SyncState,
12};
13use crate::store::Store;
14
15pub struct KuzuStore {
16    db: Database,
17    machine_id: String,
18    sync_seq: AtomicU64,
19}
20
21impl KuzuStore {
22    pub fn open(path: &Path, machine_id: String) -> Result<Self> {
23        let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
24        let store = Self {
25            db,
26            machine_id,
27            sync_seq: AtomicU64::new(1),
28        };
29        store.init_schema()?;
30        store.init_sync_seq()?;
31        Ok(store)
32    }
33
34    pub fn in_memory(machine_id: String) -> Result<Self> {
35        let db =
36            Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
37        let store = Self {
38            db,
39            machine_id,
40            sync_seq: AtomicU64::new(1),
41        };
42        store.init_schema()?;
43        Ok(store)
44    }
45
46    pub fn machine_id(&self) -> &str {
47        &self.machine_id
48    }
49
50    fn conn(&self) -> Result<Connection<'_>> {
51        Connection::new(&self.db).context("creating connection")
52    }
53
54    pub fn diagnostic(&self) -> Result<String> {
55        let conn = self.conn()?;
56
57        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
58        let total: i64 = result
59            .next()
60            .map(|r| match &r[0] {
61                Value::Int64(v) => *v,
62                _ => -1,
63            })
64            .unwrap_or(-1);
65
66        let mut result = conn.query(
67            "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
68        )?;
69        let with_emb: i64 = result
70            .next()
71            .map(|r| match &r[0] {
72                Value::Int64(v) => *v,
73                _ => -1,
74            })
75            .unwrap_or(-1);
76
77        let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
78        let zeros_str = format!("[{}]", zeros.join(","));
79        let idx_query = format!(
80            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
81            zeros_str
82        );
83        let idx_status = match conn.query(&idx_query) {
84            Ok(mut r) => match r.next() {
85                Some(row) => format!("ok (distance: {:?})", &row[0]),
86                None => "ok (0 results)".to_string(),
87            },
88            Err(e) => format!("error: {e}"),
89        };
90
91        Ok(format!(
92            "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
93        ))
94    }
95
96    fn init_schema(&self) -> Result<()> {
97        let conn = self.conn()?;
98
99        conn.query(
100            "CREATE NODE TABLE IF NOT EXISTS Memory(
101                id STRING PRIMARY KEY,
102                content STRING,
103                embedding FLOAT[384],
104                memory_type STRING,
105                confidence FLOAT,
106                created_at STRING,
107                last_accessed STRING,
108                access_count INT64,
109                source STRING,
110                source_id STRING
111            );",
112        )
113        .context("creating Memory table")?;
114
115        conn.query(
116            "CREATE NODE TABLE IF NOT EXISTS Entity(
117                id STRING PRIMARY KEY,
118                name STRING,
119                entity_type STRING,
120                embedding FLOAT[384],
121                aliases STRING[]
122            );",
123        )
124        .context("creating Entity table")?;
125
126        conn.query(
127            "CREATE NODE TABLE IF NOT EXISTS Conversation(
128                id STRING PRIMARY KEY,
129                source STRING,
130                machine_id STRING,
131                started_at STRING,
132                project_path STRING
133            );",
134        )
135        .context("creating Conversation table")?;
136
137        conn.query(
138            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
139                file_path STRING PRIMARY KEY,
140                file_hash STRING,
141                ingested_at STRING,
142                memory_count INT64,
143                source STRING
144            );",
145        )
146        .context("creating IngestLog table")?;
147
148        conn.query(
149            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
150                FROM Memory TO Memory,
151                strength FLOAT,
152                context STRING
153            );",
154        )
155        .context("creating RELATES_TO rel")?;
156
157        conn.query(
158            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
159                FROM Memory TO Entity,
160                position INT64
161            );",
162        )
163        .context("creating MENTIONS rel")?;
164
165        conn.query(
166            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
167                FROM Memory TO Conversation,
168                transformation STRING
169            );",
170        )
171        .context("creating DERIVED_FROM rel")?;
172
173        conn.query(
174            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
175                FROM Memory TO Memory,
176                model STRING
177            );",
178        )
179        .context("creating DISTILLED_FROM rel")?;
180
181        conn.query(
182            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
183                FROM Memory TO Memory,
184                resolution STRING
185            );",
186        )
187        .context("creating CONTRADICTS rel")?;
188
189        conn.query(
190            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
191                FROM Memory TO Memory
192            );",
193        )
194        .context("creating REINFORCES rel")?;
195
196        conn.query(
197            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
198                FROM Memory TO Memory,
199                reason STRING
200            );",
201        )
202        .context("creating SUPERSEDES rel")?;
203
204        conn.query(
205            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
206                seq INT64 PRIMARY KEY,
207                op STRING,
208                node_type STRING,
209                node_id STRING,
210                machine_id STRING,
211                timestamp STRING,
212                data STRING
213            );",
214        )
215        .context("creating SyncLog table")?;
216
217        conn.query(
218            "CREATE NODE TABLE IF NOT EXISTS SyncState(
219                peer_id STRING PRIMARY KEY,
220                last_seq INT64,
221                last_sync_at STRING
222            );",
223        )
224        .context("creating SyncState table")?;
225
226        conn.query(
227            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
228                memory_id STRING PRIMARY KEY,
229                distilled_id STRING,
230                consolidated_at STRING,
231                model STRING
232            );",
233        )
234        .context("creating ConsolidationLog table")?;
235
236        conn.query(
237            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
238        )
239        .ok();
240
241        Ok(())
242    }
243
244    fn init_sync_seq(&self) -> Result<()> {
245        let conn = self.conn()?;
246        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
247        if let Some(row) = result.next() {
248            if let Value::Int64(max_seq) = &row[0] {
249                self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
250            }
251        }
252        Ok(())
253    }
254
255    fn log_sync_entry(
256        &self,
257        conn: &Connection<'_>,
258        op: SyncOp,
259        node_type: SyncNodeType,
260        node_id: &str,
261        data: Option<&str>,
262    ) -> Result<()> {
263        let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
264        let timestamp = chrono::Utc::now().to_rfc3339();
265        let op_str = match op {
266            SyncOp::Create => "create",
267            SyncOp::Update => "update",
268            SyncOp::Delete => "delete",
269        };
270        let nt_str = match node_type {
271            SyncNodeType::Memory => "memory",
272            SyncNodeType::Entity => "entity",
273            SyncNodeType::Conversation => "conversation",
274            SyncNodeType::Relation => "relation",
275        };
276        let machine_escaped = escape_cypher(&self.machine_id);
277        let node_id_escaped = escape_cypher(node_id);
278        let data_escaped = data.map(|d| escape_cypher(d)).unwrap_or_default();
279
280        let query = format!(
281            "CREATE (:SyncLog {{
282                seq: {seq},
283                op: '{op_str}',
284                node_type: '{nt_str}',
285                node_id: '{node_id_escaped}',
286                machine_id: '{machine_escaped}',
287                timestamp: '{timestamp}',
288                data: '{data_escaped}'
289            }});"
290        );
291        conn.query(&query)?;
292        Ok(())
293    }
294}
295
296impl Store for KuzuStore {
297    fn store_memory(&self, memory: &Memory) -> Result<()> {
298        let conn = self.conn()?;
299        self.create_memory_node(&conn, memory)?;
300        let data = serde_json::to_string(memory).ok();
301        self.log_sync_entry(
302            &conn,
303            SyncOp::Create,
304            SyncNodeType::Memory,
305            &memory.id.to_string(),
306            data.as_deref(),
307        )?;
308        Ok(())
309    }
310
311    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
312        let conn = self.conn()?;
313        let query = format!(
314            "MATCH (m:Memory {{id: '{}'}}) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
315            id
316        );
317
318        let mut result = conn.query(&query)?;
319        match result.next() {
320            Some(row) => Ok(Some(row_to_memory(&row)?)),
321            None => Ok(None),
322        }
323    }
324
325    fn delete_memory(&self, id: Uuid) -> Result<()> {
326        let conn = self.conn()?;
327        conn.query(&format!(
328            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
329            id
330        ))?;
331        self.log_sync_entry(
332            &conn,
333            SyncOp::Delete,
334            SyncNodeType::Memory,
335            &id.to_string(),
336            None,
337        )?;
338        Ok(())
339    }
340
341    fn store_entity(&self, entity: &Entity) -> Result<()> {
342        let conn = self.conn()?;
343        self.create_entity_node(&conn, entity)?;
344        let data = serde_json::to_string(entity).ok();
345        self.log_sync_entry(
346            &conn,
347            SyncOp::Create,
348            SyncNodeType::Entity,
349            &entity.id.to_string(),
350            data.as_deref(),
351        )?;
352        Ok(())
353    }
354
355    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
356        let conn = self.conn()?;
357        let mut result = conn.query(&format!(
358            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
359            id
360        ))?;
361
362        match result.next() {
363            Some(row) => Ok(Some(row_to_entity(&row)?)),
364            None => Ok(None),
365        }
366    }
367
368    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
369        let conn = self.conn()?;
370        let name_escaped = escape_cypher(name);
371        let query = format!(
372            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
373            name_escaped
374        );
375        let mut result = conn.query(&query)?;
376
377        match result.next() {
378            Some(row) => Ok(Some(row_to_entity(&row)?)),
379            None => Ok(None),
380        }
381    }
382
383    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
384        let conn = self.conn()?;
385        self.create_conversation_node(&conn, conversation)?;
386        let data = serde_json::to_string(conversation).ok();
387        self.log_sync_entry(
388            &conn,
389            SyncOp::Create,
390            SyncNodeType::Conversation,
391            &conversation.id.to_string(),
392            data.as_deref(),
393        )?;
394        Ok(())
395    }
396
397    fn store_relation(&self, relation: &Relation) -> Result<()> {
398        let conn = self.conn()?;
399        self.create_relation_edge(&conn, relation)?;
400        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
401        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
402        let data = serde_json::to_string(relation).ok();
403        self.log_sync_entry(
404            &conn,
405            SyncOp::Create,
406            SyncNodeType::Relation,
407            &node_id,
408            data.as_deref(),
409        )?;
410        Ok(())
411    }
412
413    fn get_relations(
414        &self,
415        node_id: Uuid,
416        relation_type: Option<RelationType>,
417    ) -> Result<Vec<Relation>> {
418        let conn = self.conn()?;
419        let id = node_id.to_string();
420
421        let query = match relation_type {
422            Some(RelationType::RelatesTo) => format!(
423                "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
424                id
425            ),
426            Some(RelationType::Contradicts) => format!(
427                "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
428                id
429            ),
430            Some(RelationType::Reinforces) => format!(
431                "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
432                id
433            ),
434            Some(RelationType::Supersedes) => format!(
435                "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
436                id
437            ),
438            Some(RelationType::Mentions) => format!(
439                "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
440                id
441            ),
442            Some(RelationType::DerivedFrom) => format!(
443                "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
444                id
445            ),
446            Some(RelationType::DistilledFrom) => format!(
447                "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
448                id
449            ),
450            None => {
451                let mut relations = Vec::new();
452                let mem_to_mem = [
453                    ("RELATES_TO", RelationType::RelatesTo),
454                    ("REINFORCES", RelationType::Reinforces),
455                    ("CONTRADICTS", RelationType::Contradicts),
456                    ("SUPERSEDES", RelationType::Supersedes),
457                    ("DISTILLED_FROM", RelationType::DistilledFrom),
458                ];
459                for (label, rt) in &mem_to_mem {
460                    let q = format!(
461                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
462                        id, label
463                    );
464                    if let Ok(mut result) = conn.query(&q) {
465                        for row in &mut result {
466                            let to_id_str = value_to_string(&row[0]);
467                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
468                            relations.push(Relation {
469                                from_id: node_id,
470                                to_id,
471                                relation_type: *rt,
472                                strength: 1.0,
473                                context: None,
474                            });
475                        }
476                    }
477                }
478                for (label, target, rt) in [
479                    ("MENTIONS", "Entity", RelationType::Mentions),
480                    ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
481                ] {
482                    let q = format!(
483                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
484                        id, label, target
485                    );
486                    if let Ok(mut result) = conn.query(&q) {
487                        for row in &mut result {
488                            let to_id_str = value_to_string(&row[0]);
489                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
490                            relations.push(Relation {
491                                from_id: node_id,
492                                to_id,
493                                relation_type: rt,
494                                strength: 1.0,
495                                context: None,
496                            });
497                        }
498                    }
499                }
500                return Ok(relations);
501            },
502        };
503
504        let mut result = conn.query(&query)?;
505        let mut relations = Vec::new();
506
507        for row in &mut result {
508            let to_id_str = value_to_string(&row[0]);
509            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
510
511            relations.push(Relation {
512                from_id: node_id,
513                to_id,
514                relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
515                strength: 1.0,
516                context: None,
517            });
518        }
519
520        Ok(relations)
521    }
522
523    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
524        let conn = self.conn()?;
525        let embedding_str = format_embedding(embedding);
526
527        let query = format!(
528            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
529             RETURN node.id, node.content, node.memory_type, node.confidence,
530                    node.created_at, node.last_accessed, node.access_count,
531                    node.source, node.source_id, distance;",
532            embedding_str, limit
533        );
534
535        let mut result = conn.query(&query)?;
536        let mut results = Vec::new();
537
538        for row in &mut result {
539            let memory = row_to_memory(&row[..9])?;
540            let distance = value_to_f32(&row[9]);
541            let similarity = 1.0 - distance;
542            results.push((memory, similarity));
543        }
544
545        Ok(results)
546    }
547
548    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
549        let conn = self.conn()?;
550        let query = format!(
551            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
552             RETURN DISTINCT m.id, m.content, m.memory_type, m.confidence,
553                    m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
554            start_id, depth
555        );
556
557        let mut result = conn.query(&query)?;
558        let mut results = Vec::new();
559
560        for row in &mut result {
561            let memory = row_to_memory(&row)?;
562            results.push((memory, Vec::new()));
563        }
564
565        Ok(results)
566    }
567
568    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
569        let conn = self.conn()?;
570        let source_escaped = escape_cypher(source);
571        let query = format!(
572            "MATCH (m:Memory) WHERE m.source = '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
573            source_escaped
574        );
575        let mut result = conn.query(&query)?;
576
577        let mut memories = Vec::new();
578        for row in &mut result {
579            memories.push(row_to_memory(&row)?);
580        }
581        Ok(memories)
582    }
583
584    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
585        let conn = self.conn()?;
586        let type_str = format!("{:?}", memory_type).to_lowercase();
587        let query = format!(
588            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
589            type_str
590        );
591        let mut result = conn.query(&query)?;
592
593        let mut memories = Vec::new();
594        for row in &mut result {
595            memories.push(row_to_memory(&row)?);
596        }
597        Ok(memories)
598    }
599
600    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
601        let conn = self.conn()?;
602        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
603        let cutoff_str = cutoff.to_rfc3339();
604
605        let query = format!(
606            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id;",
607            cutoff_str
608        );
609
610        let mut result = conn.query(&query)?;
611        let mut memories = Vec::new();
612        for row in &mut result {
613            memories.push(row_to_memory(&row)?);
614        }
615        Ok(memories)
616    }
617
618    fn update_memory(&self, memory: &Memory) -> Result<()> {
619        let conn = self.conn()?;
620        let id = memory.id.to_string();
621        let last_accessed = memory.last_accessed.to_rfc3339();
622
623        let query = format!(
624            "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {};",
625            id, memory.confidence, last_accessed, memory.access_count
626        );
627
628        conn.query(&query)?;
629        let data = serde_json::to_string(memory).ok();
630        self.log_sync_entry(
631            &conn,
632            SyncOp::Update,
633            SyncNodeType::Memory,
634            &id,
635            data.as_deref(),
636        )?;
637        Ok(())
638    }
639
640    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
641        let conn = self.conn()?;
642        let query_escaped = escape_cypher(&query.to_lowercase());
643        let cypher = format!(
644            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id LIMIT {};",
645            query_escaped, limit
646        );
647
648        let mut result = conn.query(&cypher)?;
649        let mut memories = Vec::new();
650        for row in &mut result {
651            memories.push(row_to_memory(&row)?);
652        }
653        Ok(memories)
654    }
655
656    fn memory_count(&self) -> Result<usize> {
657        let conn = self.conn()?;
658        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
659        match result.next() {
660            Some(row) => match &row[0] {
661                Value::Int64(n) => Ok(*n as usize),
662                _ => Ok(0),
663            },
664            None => Ok(0),
665        }
666    }
667}
668
669impl KuzuStore {
670    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
671        if let Some(existing) = self.find_entity_by_name(name)? {
672            return Ok(existing);
673        }
674        let entity = Entity::new(name.to_string(), entity_type.to_string());
675        self.store_entity(&entity)?;
676        Ok(entity)
677    }
678
679    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
680        let conn = self.conn()?;
681        let escaped = escape_cypher(content_prefix);
682        let query = format!(
683            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
684            escaped
685        );
686        let mut result = conn.query(&query)?;
687        Ok(result.next().is_some())
688    }
689
690    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
691        let conn = self.conn()?;
692        let query = format!(
693            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id LIMIT {};",
694            limit
695        );
696        let mut result = conn.query(&query)?;
697        let mut memories = Vec::new();
698        for row in &mut result {
699            memories.push(row_to_memory(&row)?);
700        }
701        Ok(memories)
702    }
703
704    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
705        let conn = self.conn()?;
706        let now = Utc::now().to_rfc3339();
707        let query = format!(
708            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
709            raw_id, distilled_id, now, escape_cypher(model)
710        );
711        conn.query(&query)?;
712        Ok(())
713    }
714
715    pub fn consolidation_count(&self) -> Result<usize> {
716        let conn = self.conn()?;
717        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
718        match result.next() {
719            Some(row) => match &row[0] {
720                Value::Int64(n) => Ok(*n as usize),
721                _ => Ok(0),
722            },
723            None => Ok(0),
724        }
725    }
726
727    pub fn clear_ingest_log(&self) -> Result<usize> {
728        let conn = self.conn()?;
729        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
730        let count = match result.next() {
731            Some(row) => match &row[0] {
732                Value::Int64(n) => *n as usize,
733                _ => 0,
734            },
735            None => 0,
736        };
737        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
738        Ok(count)
739    }
740
741    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
742        let conn = self.conn()?;
743        let escaped = escape_cypher(source);
744        let mut result = conn.query(&format!(
745            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
746            escaped
747        ))?;
748        let count = match result.next() {
749            Some(row) => match &row[0] {
750                Value::Int64(n) => *n as usize,
751                _ => 0,
752            },
753            None => 0,
754        };
755        conn.query(&format!(
756            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
757            escaped
758        ))?;
759        Ok(count)
760    }
761
762    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
763        let conn = self.conn()?;
764        let escaped = escape_cypher(file_path);
765        let mut result = conn.query(&format!(
766            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
767            escaped
768        ))?;
769        Ok(result.next().is_some())
770    }
771
772    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
773        let conn = self.conn()?;
774        let escaped = escape_cypher(file_path);
775        let mut result = conn.query(&format!(
776            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
777            escaped
778        ))?;
779        match result.next() {
780            Some(row) => {
781                let stored_hash = value_to_string(&row[0]);
782                Ok(stored_hash != file_hash)
783            }
784            None => Ok(true),
785        }
786    }
787
788    pub fn mark_ingested(
789        &self,
790        file_path: &str,
791        file_hash: &str,
792        memory_count: usize,
793        source: &str,
794    ) -> Result<()> {
795        let conn = self.conn()?;
796        let path_escaped = escape_cypher(file_path);
797        let hash_escaped = escape_cypher(file_hash);
798        let source_escaped = escape_cypher(source);
799        let now = chrono::Utc::now().to_rfc3339();
800
801        conn.query(&format!(
802            "MERGE (l:IngestLog {{file_path: '{}'}})
803             SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
804            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
805        ))?;
806        Ok(())
807    }
808
809    pub fn ingested_file_count(&self) -> Result<usize> {
810        let conn = self.conn()?;
811        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
812        match result.next() {
813            Some(row) => match &row[0] {
814                Value::Int64(n) => Ok(*n as usize),
815                _ => Ok(0),
816            },
817            None => Ok(0),
818        }
819    }
820}
821
822impl KuzuStore {
823    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
824        let id = memory.id.to_string();
825        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
826        let created_at = memory.created_at.to_rfc3339();
827        let last_accessed = memory.last_accessed.to_rfc3339();
828        let embedding_str = format_embedding(&memory.embedding);
829        let content_escaped = escape_cypher(&memory.content);
830        let source_escaped = escape_cypher(&memory.source);
831        let source_id_escaped = escape_cypher(&memory.source_id);
832
833        let query = format!(
834            "CREATE (:Memory {{
835                id: '{id}',
836                content: '{content_escaped}',
837                embedding: {embedding_str},
838                memory_type: '{memory_type}',
839                confidence: {confidence},
840                created_at: '{created_at}',
841                last_accessed: '{last_accessed}',
842                access_count: {access_count},
843                source: '{source_escaped}',
844                source_id: '{source_id_escaped}'
845            }});",
846            confidence = memory.confidence,
847            access_count = memory.access_count,
848        );
849
850        conn.query(&query)?;
851        Ok(())
852    }
853
854    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
855        let id = entity.id.to_string();
856        let embedding_str = format_embedding(&entity.embedding);
857        let aliases_str = format_string_array(&entity.aliases);
858        let name_escaped = escape_cypher(&entity.name);
859        let etype_escaped = escape_cypher(&entity.entity_type);
860
861        let query = format!(
862            "CREATE (:Entity {{
863                id: '{id}',
864                name: '{name_escaped}',
865                entity_type: '{etype_escaped}',
866                embedding: {embedding_str},
867                aliases: {aliases_str}
868            }});"
869        );
870
871        conn.query(&query)?;
872        Ok(())
873    }
874
875    fn create_conversation_node(
876        &self,
877        conn: &Connection<'_>,
878        conversation: &Conversation,
879    ) -> Result<()> {
880        let id = conversation.id.to_string();
881        let started_at = conversation.started_at.to_rfc3339();
882        let source_escaped = escape_cypher(&conversation.source);
883        let machine_escaped = escape_cypher(&conversation.machine_id);
884        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
885
886        let query = format!(
887            "CREATE (:Conversation {{
888                id: '{id}',
889                source: '{source_escaped}',
890                machine_id: '{machine_escaped}',
891                started_at: '{started_at}',
892                project_path: '{project_escaped}'
893            }});"
894        );
895
896        conn.query(&query)?;
897        Ok(())
898    }
899
900    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
901        let from_id = relation.from_id.to_string();
902        let to_id = relation.to_id.to_string();
903
904        let query = match relation.relation_type {
905            RelationType::RelatesTo => format!(
906                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
907                from_id, to_id, relation.strength, relation.context.as_deref().unwrap_or("")
908            ),
909            RelationType::Mentions => format!(
910                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
911                from_id, to_id
912            ),
913            RelationType::DerivedFrom => format!(
914                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
915                from_id, to_id, relation.context.as_deref().unwrap_or("direct")
916            ),
917            RelationType::Contradicts => format!(
918                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
919                from_id, to_id, relation.context.as_deref().unwrap_or("")
920            ),
921            RelationType::Reinforces => format!(
922                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
923                from_id, to_id
924            ),
925            RelationType::DistilledFrom => format!(
926                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
927                from_id, to_id, relation.context.as_deref().unwrap_or("")
928            ),
929            RelationType::Supersedes => format!(
930                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
931                from_id, to_id, relation.context.as_deref().unwrap_or("")
932            ),
933        };
934
935        conn.query(&query)?;
936        Ok(())
937    }
938}
939
940impl KuzuStore {
941    pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
942        let conn = self.conn()?;
943        let mut result = conn.query(&format!(
944            "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
945            id
946        ))?;
947
948        match result.next() {
949            Some(row) => {
950                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
951                let source = value_to_string(&row[1]);
952                let machine_id = value_to_string(&row[2]);
953                let started_at_str = value_to_string(&row[3]);
954                let project_path = {
955                    let s = value_to_string(&row[4]);
956                    if s.is_empty() {
957                        None
958                    } else {
959                        Some(s)
960                    }
961                };
962                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
963                    .map(|dt| dt.with_timezone(&chrono::Utc))
964                    .unwrap_or_else(|_| chrono::Utc::now());
965
966                Ok(Some(Conversation {
967                    id,
968                    source,
969                    machine_id,
970                    started_at,
971                    project_path,
972                }))
973            }
974            None => Ok(None),
975        }
976    }
977
978    pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
979        let conn = self.conn()?;
980        let query = format!(
981            "MATCH (m:Memory {{id: '{}'}}) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id, m.embedding;",
982            id
983        );
984
985        let mut result = conn.query(&query)?;
986        match result.next() {
987            Some(row) => {
988                let mut memory = row_to_memory(&row[..9])?;
989                memory.embedding = value_to_f32_vec(&row[9]);
990                Ok(Some(memory))
991            }
992            None => Ok(None),
993        }
994    }
995
996    pub fn import_memory(&self, memory: &Memory) -> Result<()> {
997        let conn = self.conn()?;
998        self.create_memory_node(&conn, memory)
999    }
1000
1001    pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1002        let conn = self.conn()?;
1003        self.create_conversation_node(&conn, conversation)
1004    }
1005
1006    pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1007        let conn = self.conn()?;
1008        self.create_entity_node(&conn, entity)
1009    }
1010
1011    pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1012        let conn = self.conn()?;
1013        self.create_relation_edge(&conn, relation)
1014    }
1015
1016    pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1017        let conn = self.conn()?;
1018        conn.query(&format!(
1019            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1020            id
1021        ))?;
1022        Ok(())
1023    }
1024
1025    pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1026        let conn = self.conn()?;
1027        let id = memory.id.to_string();
1028        let last_accessed = memory.last_accessed.to_rfc3339();
1029        let content_escaped = escape_cypher(&memory.content);
1030        let embedding_str = format_embedding(&memory.embedding);
1031        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1032        let source_escaped = escape_cypher(&memory.source);
1033        let source_id_escaped = escape_cypher(&memory.source_id);
1034        let created_at = memory.created_at.to_rfc3339();
1035
1036        let query = format!(
1037            "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}';",
1038            confidence = memory.confidence,
1039            access_count = memory.access_count,
1040        );
1041
1042        conn.query(&query)?;
1043        Ok(())
1044    }
1045
1046    pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1047        self.sync_log_page(after_seq, None)
1048    }
1049
1050    pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1051        let conn = self.conn()?;
1052        let machine_escaped = escape_cypher(&self.machine_id);
1053        let limit_clause = match limit {
1054            Some(n) => format!(" LIMIT {}", n),
1055            None => String::new(),
1056        };
1057        let query = format!(
1058            "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1059            after_seq, machine_escaped, limit_clause
1060        );
1061
1062        let mut result = conn.query(&query)?;
1063        let mut entries = Vec::new();
1064
1065        for row in &mut result {
1066            let seq = match &row[0] {
1067                Value::Int64(n) => *n as u64,
1068                _ => 0,
1069            };
1070            let op_str = value_to_string(&row[1]);
1071            let node_type_str = value_to_string(&row[2]);
1072            let node_id = value_to_string(&row[3]);
1073            let machine_id = value_to_string(&row[4]);
1074            let timestamp_str = value_to_string(&row[5]);
1075            let data_str = value_to_string(&row[6]);
1076
1077            let op = match op_str.as_str() {
1078                "create" => SyncOp::Create,
1079                "update" => SyncOp::Update,
1080                "delete" => SyncOp::Delete,
1081                _ => continue,
1082            };
1083            let node_type = match node_type_str.as_str() {
1084                "memory" => SyncNodeType::Memory,
1085                "entity" => SyncNodeType::Entity,
1086                "conversation" => SyncNodeType::Conversation,
1087                "relation" => SyncNodeType::Relation,
1088                _ => continue,
1089            };
1090            let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
1091                .map(|dt| dt.with_timezone(&chrono::Utc))
1092                .unwrap_or_else(|_| chrono::Utc::now());
1093
1094            let data = if data_str.is_empty() {
1095                None
1096            } else {
1097                Some(data_str)
1098            };
1099
1100            entries.push(SyncEntry {
1101                seq,
1102                op,
1103                node_type,
1104                node_id,
1105                machine_id,
1106                timestamp,
1107                data,
1108            });
1109        }
1110
1111        Ok(entries)
1112    }
1113
1114    pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1115        let conn = self.conn()?;
1116        let escaped = escape_cypher(peer_id);
1117        let mut result = conn.query(&format!(
1118            "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1119            escaped
1120        ))?;
1121
1122        match result.next() {
1123            Some(row) => {
1124                let peer_id = value_to_string(&row[0]);
1125                let last_seq = match &row[1] {
1126                    Value::Int64(n) => *n as u64,
1127                    _ => 0,
1128                };
1129                let last_sync_at_str = value_to_string(&row[2]);
1130                let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1131                    .map(|dt| dt.with_timezone(&chrono::Utc))
1132                    .unwrap_or_else(|_| chrono::Utc::now());
1133
1134                Ok(Some(SyncState {
1135                    peer_id,
1136                    last_seq,
1137                    last_sync_at,
1138                }))
1139            }
1140            None => Ok(None),
1141        }
1142    }
1143
1144    pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1145        let conn = self.conn()?;
1146        let peer_escaped = escape_cypher(&state.peer_id);
1147        let now = state.last_sync_at.to_rfc3339();
1148
1149        conn.query(&format!(
1150            "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1151            peer_escaped, state.last_seq, now
1152        ))?;
1153        Ok(())
1154    }
1155
1156    pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1157        let conn = self.conn()?;
1158        let mut result =
1159            conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1160        let mut states = Vec::new();
1161
1162        for row in &mut result {
1163            let peer_id = value_to_string(&row[0]);
1164            let last_seq = match &row[1] {
1165                Value::Int64(n) => *n as u64,
1166                _ => 0,
1167            };
1168            let last_sync_at_str = value_to_string(&row[2]);
1169            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1170                .map(|dt| dt.with_timezone(&chrono::Utc))
1171                .unwrap_or_else(|_| chrono::Utc::now());
1172
1173            states.push(SyncState {
1174                peer_id,
1175                last_seq,
1176                last_sync_at,
1177            });
1178        }
1179
1180        Ok(states)
1181    }
1182
1183    pub fn backfill_sync_log(&self) -> Result<u64> {
1184        let conn = self.conn()?;
1185        let mut count = 0u64;
1186
1187        let mut result = conn.query(
1188            "MATCH (m:Memory) RETURN m.id, m.content, m.memory_type, m.confidence, m.created_at, m.last_accessed, m.access_count, m.source, m.source_id, m.embedding;",
1189        )?;
1190        for row in &mut result {
1191            let mut memory = row_to_memory(&row[..9])?;
1192            memory.embedding = value_to_f32_vec(&row[9]);
1193            let data = serde_json::to_string(&memory).ok();
1194            self.log_sync_entry(
1195                &conn,
1196                SyncOp::Create,
1197                SyncNodeType::Memory,
1198                &memory.id.to_string(),
1199                data.as_deref(),
1200            )?;
1201            count += 1;
1202        }
1203
1204        let mut result = conn.query(
1205            "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1206        )?;
1207        for row in &mut result {
1208            let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1209            let source = value_to_string(&row[1]);
1210            let machine_id = value_to_string(&row[2]);
1211            let started_at_str = value_to_string(&row[3]);
1212            let project_path = {
1213                let s = value_to_string(&row[4]);
1214                if s.is_empty() {
1215                    None
1216                } else {
1217                    Some(s)
1218                }
1219            };
1220            let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1221                .map(|dt| dt.with_timezone(&chrono::Utc))
1222                .unwrap_or_else(|_| chrono::Utc::now());
1223
1224            let conv = Conversation {
1225                id,
1226                source,
1227                machine_id,
1228                started_at,
1229                project_path,
1230            };
1231            let data = serde_json::to_string(&conv).ok();
1232            self.log_sync_entry(
1233                &conn,
1234                SyncOp::Create,
1235                SyncNodeType::Conversation,
1236                &conv.id.to_string(),
1237                data.as_deref(),
1238            )?;
1239            count += 1;
1240        }
1241
1242        Ok(count)
1243    }
1244}
1245
/// Renders an embedding as a bracketed, comma-separated list literal.
///
/// An empty slice is rendered as 384 `0.0` entries instead of `[]`; any other
/// slice is rendered element by element using `f32`'s `Display` output.
fn format_embedding(embedding: &[f32]) -> String {
    let rendered = if embedding.is_empty() {
        vec!["0.0"; 384].join(",")
    } else {
        embedding
            .iter()
            .map(f32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{rendered}]")
}
1254
/// Escapes `s` for use inside a single-quoted Cypher string literal by
/// prefixing every backslash and every single quote with a backslash.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1258
/// Renders `items` as a Cypher list of single-quoted string literals, e.g.
/// `['a','b']`; an empty slice yields `[]`.
///
/// Each element is escaped with the same backslash scheme `escape_cypher`
/// applies to scalar strings (`\` becomes `\\`, `'` becomes `\'`). Doubling the
/// quote instead would disagree with that helper and would leave backslashes
/// unescaped, so an element ending in `\` would swallow its closing quote.
fn format_string_array(items: &[String]) -> String {
    let parts: Vec<String> = items
        .iter()
        .map(|item| {
            // Backslashes first, so the ones added for quotes are not doubled.
            let escaped = item.replace('\\', "\\\\").replace('\'', "\\'");
            format!("'{escaped}'")
        })
        .collect();
    format!("[{}]", parts.join(","))
}
1266
1267fn value_to_string(val: &Value) -> String {
1268    match val {
1269        Value::String(s) => s.clone(),
1270        other => format!("{:?}", other),
1271    }
1272}
1273
1274fn value_to_f32(val: &Value) -> f32 {
1275    match val {
1276        Value::Float(f) => *f,
1277        Value::Double(d) => *d as f32,
1278        Value::Int64(i) => *i as f32,
1279        _ => 0.0,
1280    }
1281}
1282
1283fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1284    match val {
1285        Value::Array(_, items) | Value::List(_, items) => {
1286            items.iter().map(|v| value_to_f32(v)).collect()
1287        }
1288        _ => Vec::new(),
1289    }
1290}
1291
1292fn row_to_memory(row: &[Value]) -> Result<Memory> {
1293    let id_str = value_to_string(&row[0]);
1294    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1295    let content = value_to_string(&row[1]);
1296    let memory_type_str = value_to_string(&row[2]);
1297    let confidence = value_to_f32(&row[3]);
1298    let created_at_str = value_to_string(&row[4]);
1299    let last_accessed_str = value_to_string(&row[5]);
1300    let access_count = match &row[6] {
1301        Value::Int64(i) => *i as u32,
1302        _ => 0,
1303    };
1304    let source = value_to_string(&row[7]);
1305    let source_id = value_to_string(&row[8]);
1306
1307    let memory_type = match memory_type_str.as_str() {
1308        "episodic" => MemoryType::Episodic,
1309        "procedural" => MemoryType::Procedural,
1310        _ => MemoryType::Semantic,
1311    };
1312
1313    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1314        .map(|dt| dt.with_timezone(&chrono::Utc))
1315        .unwrap_or_else(|_| chrono::Utc::now());
1316
1317    let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1318        .map(|dt| dt.with_timezone(&chrono::Utc))
1319        .unwrap_or_else(|_| chrono::Utc::now());
1320
1321    Ok(Memory {
1322        id,
1323        content,
1324        embedding: Vec::new(),
1325        memory_type,
1326        confidence,
1327        created_at,
1328        last_accessed,
1329        access_count,
1330        source,
1331        source_id,
1332    })
1333}
1334
1335fn row_to_entity(row: &[Value]) -> Result<Entity> {
1336    let id_str = value_to_string(&row[0]);
1337    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1338    let name = value_to_string(&row[1]);
1339    let entity_type = value_to_string(&row[2]);
1340
1341    Ok(Entity {
1342        id,
1343        name,
1344        entity_type,
1345        embedding: Vec::new(),
1346        aliases: Vec::new(),
1347    })
1348}
1349
// SAFETY / NOTE(review): these impls assert that a `KuzuStore` may be moved to
// and shared between threads. Its fields are a kuzu `Database`, a `String` and
// an `AtomicU64`; the latter two are already `Send + Sync`, so the claim rests
// entirely on `Database` being safe to use from several threads. Confirm that
// against the kuzu crate documentation. If `Database` is itself `Send + Sync`,
// the compiler derives both traits automatically and these impls can be removed.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}