//! `second_brain_core/kuzu_store.rs`: KùzuDB-backed persistence for memories,
//! entities, conversations, their relations, and the sync log.

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10    Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11    SyncOp, SyncState,
12};
13use crate::store::Store;
14
15pub struct KuzuStore {
16    db: Database,
17    machine_id: String,
18    sync_seq: AtomicU64,
19}
20
/// Builds the comma-separated `RETURN` projection of every `Memory` column for
/// the node bound to `alias` (e.g. `"m.id, m.content, ..."`).
///
/// Rows projected with this list are handed to `row_to_memory`, so the column
/// order here is part of that contract.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 11] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
    ];

    let mut projection = String::new();
    for (index, column) in COLUMNS.iter().enumerate() {
        if index > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
28
/// Converts the stored `project_path` column back into an `Option`: the empty
/// string (what is written when a memory has no project path) becomes `None`.
fn project_path_from_db(s: &str) -> Option<String> {
    (!s.is_empty()).then(|| s.to_owned())
}
32
33impl KuzuStore {
34    pub fn open(path: &Path, machine_id: String) -> Result<Self> {
35        let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
36        let store = Self {
37            db,
38            machine_id,
39            sync_seq: AtomicU64::new(1),
40        };
41        store.init_schema()?;
42        store.init_sync_seq()?;
43        Ok(store)
44    }
45
46    pub fn in_memory(machine_id: String) -> Result<Self> {
47        let db =
48            Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49        let store = Self {
50            db,
51            machine_id,
52            sync_seq: AtomicU64::new(1),
53        };
54        store.init_schema()?;
55        Ok(store)
56    }
57
58    pub fn machine_id(&self) -> &str {
59        &self.machine_id
60    }
61
62    fn conn(&self) -> Result<Connection<'_>> {
63        Connection::new(&self.db).context("creating connection")
64    }
65
66    pub fn diagnostic(&self) -> Result<String> {
67        let conn = self.conn()?;
68
69        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
70        let total: i64 = result
71            .next()
72            .map(|r| match &r[0] {
73                Value::Int64(v) => *v,
74                _ => -1,
75            })
76            .unwrap_or(-1);
77
78        let mut result = conn.query(
79            "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
80        )?;
81        let with_emb: i64 = result
82            .next()
83            .map(|r| match &r[0] {
84                Value::Int64(v) => *v,
85                _ => -1,
86            })
87            .unwrap_or(-1);
88
89        let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
90        let zeros_str = format!("[{}]", zeros.join(","));
91        let idx_query = format!(
92            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
93            zeros_str
94        );
95        let idx_status = match conn.query(&idx_query) {
96            Ok(mut r) => match r.next() {
97                Some(row) => format!("ok (distance: {:?})", &row[0]),
98                None => "ok (0 results)".to_string(),
99            },
100            Err(e) => format!("error: {e}"),
101        };
102
103        Ok(format!(
104            "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
105        ))
106    }
107
108    fn init_schema(&self) -> Result<()> {
109        let conn = self.conn()?;
110
111        conn.query(
112            "CREATE NODE TABLE IF NOT EXISTS Memory(
113                id STRING PRIMARY KEY,
114                content STRING,
115                embedding FLOAT[384],
116                memory_type STRING,
117                confidence FLOAT,
118                created_at STRING,
119                last_accessed STRING,
120                access_count INT64,
121                source STRING,
122                source_id STRING,
123                project_path STRING
124            );",
125        )
126        .context("creating Memory table")?;
127
128        conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
129
130        conn.query(
131            "CREATE NODE TABLE IF NOT EXISTS Machine(
132                id STRING PRIMARY KEY,
133                name STRING
134            );",
135        )
136        .context("creating Machine table")?;
137
138        conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
139
140        conn.query(
141            "CREATE NODE TABLE IF NOT EXISTS Entity(
142                id STRING PRIMARY KEY,
143                name STRING,
144                entity_type STRING,
145                embedding FLOAT[384],
146                aliases STRING[]
147            );",
148        )
149        .context("creating Entity table")?;
150
151        conn.query(
152            "CREATE NODE TABLE IF NOT EXISTS Conversation(
153                id STRING PRIMARY KEY,
154                source STRING,
155                machine_id STRING,
156                started_at STRING,
157                project_path STRING
158            );",
159        )
160        .context("creating Conversation table")?;
161
162        conn.query(
163            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
164                file_path STRING PRIMARY KEY,
165                file_hash STRING,
166                ingested_at STRING,
167                memory_count INT64,
168                source STRING
169            );",
170        )
171        .context("creating IngestLog table")?;
172
173        conn.query(
174            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
175                FROM Memory TO Memory,
176                strength FLOAT,
177                context STRING
178            );",
179        )
180        .context("creating RELATES_TO rel")?;
181
182        conn.query(
183            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
184                FROM Memory TO Entity,
185                position INT64
186            );",
187        )
188        .context("creating MENTIONS rel")?;
189
190        conn.query(
191            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
192                FROM Memory TO Conversation,
193                transformation STRING
194            );",
195        )
196        .context("creating DERIVED_FROM rel")?;
197
198        conn.query(
199            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
200                FROM Memory TO Memory,
201                model STRING
202            );",
203        )
204        .context("creating DISTILLED_FROM rel")?;
205
206        conn.query(
207            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
208                FROM Memory TO Memory,
209                resolution STRING
210            );",
211        )
212        .context("creating CONTRADICTS rel")?;
213
214        conn.query(
215            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
216                FROM Memory TO Memory
217            );",
218        )
219        .context("creating REINFORCES rel")?;
220
221        conn.query(
222            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
223                FROM Memory TO Memory,
224                reason STRING
225            );",
226        )
227        .context("creating SUPERSEDES rel")?;
228
229        conn.query(
230            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
231                seq INT64 PRIMARY KEY,
232                op STRING,
233                node_type STRING,
234                node_id STRING,
235                machine_id STRING,
236                timestamp STRING,
237                data STRING
238            );",
239        )
240        .context("creating SyncLog table")?;
241
242        conn.query(
243            "CREATE NODE TABLE IF NOT EXISTS SyncState(
244                peer_id STRING PRIMARY KEY,
245                last_seq INT64,
246                last_sync_at STRING
247            );",
248        )
249        .context("creating SyncState table")?;
250
251        conn.query(
252            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
253                memory_id STRING PRIMARY KEY,
254                distilled_id STRING,
255                consolidated_at STRING,
256                model STRING
257            );",
258        )
259        .context("creating ConsolidationLog table")?;
260
261        conn.query(
262            "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
263                id STRING PRIMARY KEY,
264                last_sync_seq INT64,
265                exported_at STRING,
266                vault_path STRING,
267                pages_created INT64,
268                pages_updated INT64,
269                memories_processed INT64
270            );",
271        )
272        .context("creating WikiExportLog table")?;
273
274        conn.query(
275            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
276        )
277        .ok();
278
279        Ok(())
280    }
281
282    fn init_sync_seq(&self) -> Result<()> {
283        let conn = self.conn()?;
284        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
285        if let Some(row) = result.next()
286            && let Value::Int64(max_seq) = &row[0]
287        {
288            self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
289        }
290        Ok(())
291    }
292
293    fn log_sync_entry(
294        &self,
295        conn: &Connection<'_>,
296        op: SyncOp,
297        node_type: SyncNodeType,
298        node_id: &str,
299        data: Option<&str>,
300    ) -> Result<()> {
301        let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
302        let timestamp = chrono::Utc::now().to_rfc3339();
303        let op_str = match op {
304            SyncOp::Create => "create",
305            SyncOp::Update => "update",
306            SyncOp::Delete => "delete",
307        };
308        let nt_str = match node_type {
309            SyncNodeType::Memory => "memory",
310            SyncNodeType::Entity => "entity",
311            SyncNodeType::Conversation => "conversation",
312            SyncNodeType::Relation => "relation",
313        };
314        let machine_escaped = escape_cypher(&self.machine_id);
315        let node_id_escaped = escape_cypher(node_id);
316        let data_escaped = data.map(escape_cypher).unwrap_or_default();
317
318        let query = format!(
319            "CREATE (:SyncLog {{
320                seq: {seq},
321                op: '{op_str}',
322                node_type: '{nt_str}',
323                node_id: '{node_id_escaped}',
324                machine_id: '{machine_escaped}',
325                timestamp: '{timestamp}',
326                data: '{data_escaped}'
327            }});"
328        );
329        conn.query(&query)?;
330        Ok(())
331    }
332}
333
334impl Store for KuzuStore {
335    fn store_memory(&self, memory: &Memory) -> Result<()> {
336        let conn = self.conn()?;
337        self.create_memory_node(&conn, memory)?;
338        let data = serde_json::to_string(memory).ok();
339        self.log_sync_entry(
340            &conn,
341            SyncOp::Create,
342            SyncNodeType::Memory,
343            &memory.id.to_string(),
344            data.as_deref(),
345        )?;
346        Ok(())
347    }
348
349    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
350        let conn = self.conn()?;
351        let query = format!(
352            "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
353            id, memory_return_cols("m")
354        );
355
356        let mut result = conn.query(&query)?;
357        match result.next() {
358            Some(row) => Ok(Some(row_to_memory(&row)?)),
359            None => Ok(None),
360        }
361    }
362
363    fn delete_memory(&self, id: Uuid) -> Result<()> {
364        let conn = self.conn()?;
365        conn.query(&format!(
366            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
367            id
368        ))?;
369        self.log_sync_entry(
370            &conn,
371            SyncOp::Delete,
372            SyncNodeType::Memory,
373            &id.to_string(),
374            None,
375        )?;
376        Ok(())
377    }
378
379    fn store_entity(&self, entity: &Entity) -> Result<()> {
380        let conn = self.conn()?;
381        self.create_entity_node(&conn, entity)?;
382        let data = serde_json::to_string(entity).ok();
383        self.log_sync_entry(
384            &conn,
385            SyncOp::Create,
386            SyncNodeType::Entity,
387            &entity.id.to_string(),
388            data.as_deref(),
389        )?;
390        Ok(())
391    }
392
393    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
394        let conn = self.conn()?;
395        let mut result = conn.query(&format!(
396            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
397            id
398        ))?;
399
400        match result.next() {
401            Some(row) => Ok(Some(row_to_entity(&row)?)),
402            None => Ok(None),
403        }
404    }
405
406    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
407        let conn = self.conn()?;
408        let name_escaped = escape_cypher(name);
409        let query = format!(
410            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
411            name_escaped
412        );
413        let mut result = conn.query(&query)?;
414
415        match result.next() {
416            Some(row) => Ok(Some(row_to_entity(&row)?)),
417            None => Ok(None),
418        }
419    }
420
421    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
422        let conn = self.conn()?;
423        self.create_conversation_node(&conn, conversation)?;
424        let data = serde_json::to_string(conversation).ok();
425        self.log_sync_entry(
426            &conn,
427            SyncOp::Create,
428            SyncNodeType::Conversation,
429            &conversation.id.to_string(),
430            data.as_deref(),
431        )?;
432        Ok(())
433    }
434
435    fn store_relation(&self, relation: &Relation) -> Result<()> {
436        let conn = self.conn()?;
437        self.create_relation_edge(&conn, relation)?;
438        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
439        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
440        let data = serde_json::to_string(relation).ok();
441        self.log_sync_entry(
442            &conn,
443            SyncOp::Create,
444            SyncNodeType::Relation,
445            &node_id,
446            data.as_deref(),
447        )?;
448        Ok(())
449    }
450
451    fn get_relations(
452        &self,
453        node_id: Uuid,
454        relation_type: Option<RelationType>,
455    ) -> Result<Vec<Relation>> {
456        let conn = self.conn()?;
457        let id = node_id.to_string();
458
459        let query = match relation_type {
460            Some(RelationType::RelatesTo) => format!(
461                "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
462                id
463            ),
464            Some(RelationType::Contradicts) => format!(
465                "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
466                id
467            ),
468            Some(RelationType::Reinforces) => format!(
469                "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
470                id
471            ),
472            Some(RelationType::Supersedes) => format!(
473                "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
474                id
475            ),
476            Some(RelationType::Mentions) => format!(
477                "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
478                id
479            ),
480            Some(RelationType::DerivedFrom) => format!(
481                "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
482                id
483            ),
484            Some(RelationType::DistilledFrom) => format!(
485                "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
486                id
487            ),
488            None => {
489                let mut relations = Vec::new();
490                let mem_to_mem = [
491                    ("RELATES_TO", RelationType::RelatesTo),
492                    ("REINFORCES", RelationType::Reinforces),
493                    ("CONTRADICTS", RelationType::Contradicts),
494                    ("SUPERSEDES", RelationType::Supersedes),
495                    ("DISTILLED_FROM", RelationType::DistilledFrom),
496                ];
497                for (label, rt) in &mem_to_mem {
498                    let q = format!(
499                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
500                        id, label
501                    );
502                    if let Ok(mut result) = conn.query(&q) {
503                        for row in &mut result {
504                            let to_id_str = value_to_string(&row[0]);
505                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
506                            relations.push(Relation {
507                                from_id: node_id,
508                                to_id,
509                                relation_type: *rt,
510                                strength: 1.0,
511                                context: None,
512                            });
513                        }
514                    }
515                }
516                for (label, target, rt) in [
517                    ("MENTIONS", "Entity", RelationType::Mentions),
518                    ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
519                ] {
520                    let q = format!(
521                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
522                        id, label, target
523                    );
524                    if let Ok(mut result) = conn.query(&q) {
525                        for row in &mut result {
526                            let to_id_str = value_to_string(&row[0]);
527                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
528                            relations.push(Relation {
529                                from_id: node_id,
530                                to_id,
531                                relation_type: rt,
532                                strength: 1.0,
533                                context: None,
534                            });
535                        }
536                    }
537                }
538                return Ok(relations);
539            }
540        };
541
542        let mut result = conn.query(&query)?;
543        let mut relations = Vec::new();
544
545        for row in &mut result {
546            let to_id_str = value_to_string(&row[0]);
547            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
548
549            relations.push(Relation {
550                from_id: node_id,
551                to_id,
552                relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
553                strength: 1.0,
554                context: None,
555            });
556        }
557
558        Ok(relations)
559    }
560
561    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
562        let conn = self.conn()?;
563        let embedding_str = format_embedding(embedding);
564
565        let query = format!(
566            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
567             RETURN {}, distance;",
568            embedding_str, limit, memory_return_cols("node")
569        );
570
571        let mut result = conn.query(&query)?;
572        let mut results = Vec::new();
573
574        for row in &mut result {
575            let memory = row_to_memory(&row[..11])?;
576            let distance = value_to_f32(&row[11]);
577            let similarity = 1.0 - distance;
578            results.push((memory, similarity));
579        }
580
581        Ok(results)
582    }
583
584    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
585        let conn = self.conn()?;
586        let query = format!(
587            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
588             RETURN DISTINCT {};",
589            start_id, depth, memory_return_cols("m")
590        );
591
592        let mut result = conn.query(&query)?;
593        let mut results = Vec::new();
594
595        for row in &mut result {
596            let memory = row_to_memory(&row)?;
597            results.push((memory, Vec::new()));
598        }
599
600        Ok(results)
601    }
602
603    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
604        let conn = self.conn()?;
605        let source_escaped = escape_cypher(source);
606        let query = format!(
607            "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
608            source_escaped, memory_return_cols("m")
609        );
610        let mut result = conn.query(&query)?;
611
612        let mut memories = Vec::new();
613        for row in &mut result {
614            memories.push(row_to_memory(&row)?);
615        }
616        Ok(memories)
617    }
618
619    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
620        let conn = self.conn()?;
621        let type_str = format!("{:?}", memory_type).to_lowercase();
622        let query = format!(
623            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
624            type_str, memory_return_cols("m")
625        );
626        let mut result = conn.query(&query)?;
627
628        let mut memories = Vec::new();
629        for row in &mut result {
630            memories.push(row_to_memory(&row)?);
631        }
632        Ok(memories)
633    }
634
635    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
636        let conn = self.conn()?;
637        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
638        let cutoff_str = cutoff.to_rfc3339();
639
640        let query = format!(
641            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
642            cutoff_str, memory_return_cols("m")
643        );
644
645        let mut result = conn.query(&query)?;
646        let mut memories = Vec::new();
647        for row in &mut result {
648            memories.push(row_to_memory(&row)?);
649        }
650        Ok(memories)
651    }
652
653    fn update_memory(&self, memory: &Memory) -> Result<()> {
654        let conn = self.conn()?;
655        let id = memory.id.to_string();
656        let last_accessed = memory.last_accessed.to_rfc3339();
657        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
658
659        let query = format!(
660            "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}';",
661            id, memory.confidence, last_accessed, memory.access_count, project_path_escaped
662        );
663
664        conn.query(&query)?;
665        let data = serde_json::to_string(memory).ok();
666        self.log_sync_entry(
667            &conn,
668            SyncOp::Update,
669            SyncNodeType::Memory,
670            &id,
671            data.as_deref(),
672        )?;
673        Ok(())
674    }
675
676    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
677        let conn = self.conn()?;
678        let query_escaped = escape_cypher(&query.to_lowercase());
679        let cypher = format!(
680            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
681            query_escaped, memory_return_cols("m"), limit
682        );
683
684        let mut result = conn.query(&cypher)?;
685        let mut memories = Vec::new();
686        for row in &mut result {
687            memories.push(row_to_memory(&row)?);
688        }
689        Ok(memories)
690    }
691
692    fn memory_count(&self) -> Result<usize> {
693        let conn = self.conn()?;
694        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
695        match result.next() {
696            Some(row) => match &row[0] {
697                Value::Int64(n) => Ok(*n as usize),
698                _ => Ok(0),
699            },
700            None => Ok(0),
701        }
702    }
703}
704
705impl KuzuStore {
706    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
707        if let Some(existing) = self.find_entity_by_name(name)? {
708            return Ok(existing);
709        }
710        let entity = Entity::new(name.to_string(), entity_type.to_string());
711        self.store_entity(&entity)?;
712        Ok(entity)
713    }
714
715    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
716        let conn = self.conn()?;
717        let escaped = escape_cypher(content_prefix);
718        let query = format!(
719            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
720            escaped
721        );
722        let mut result = conn.query(&query)?;
723        Ok(result.next().is_some())
724    }
725
726    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
727        let conn = self.conn()?;
728        let query = format!(
729            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
730            memory_return_cols("m"), limit
731        );
732        let mut result = conn.query(&query)?;
733        let mut memories = Vec::new();
734        for row in &mut result {
735            memories.push(row_to_memory(&row)?);
736        }
737        Ok(memories)
738    }
739
740    pub fn memories_created_between(
741        &self,
742        start: &DateTime<Utc>,
743        end: &DateTime<Utc>,
744    ) -> Result<Vec<Memory>> {
745        let conn = self.conn()?;
746        let query = format!(
747            "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
748            start.to_rfc3339(),
749            end.to_rfc3339(),
750            memory_return_cols("m")
751        );
752        let mut result = conn.query(&query)?;
753        let mut memories = Vec::new();
754        for row in &mut result {
755            memories.push(row_to_memory(&row)?);
756        }
757        Ok(memories)
758    }
759
760    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
761        let conn = self.conn()?;
762        let now = Utc::now().to_rfc3339();
763        let query = format!(
764            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
765            raw_id,
766            distilled_id,
767            now,
768            escape_cypher(model)
769        );
770        conn.query(&query)?;
771        Ok(())
772    }
773
774    pub fn consolidation_count(&self) -> Result<usize> {
775        let conn = self.conn()?;
776        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
777        match result.next() {
778            Some(row) => match &row[0] {
779                Value::Int64(n) => Ok(*n as usize),
780                _ => Ok(0),
781            },
782            None => Ok(0),
783        }
784    }
785
786    pub fn delete_consolidated_raw(&self) -> Result<usize> {
787        let conn = self.conn()?;
788        let result = conn.query(
789            "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
790        )?;
791        let mut ids = Vec::new();
792        for row in result {
793            if let Value::String(id) = &row[0]
794                && let Ok(uuid) = Uuid::parse_str(id)
795            {
796                ids.push(uuid);
797            }
798        }
799        let count = ids.len();
800        for id in &ids {
801            let escaped = escape_cypher(&id.to_string());
802            conn.query(&format!(
803                "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
804                escaped
805            ))
806            .ok();
807        }
808        Ok(count)
809    }
810
811    pub fn rebuild_vector_index(&self) -> Result<()> {
812        let conn = self.conn()?;
813        conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
814            .ok();
815        conn.query(
816            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
817        )?;
818        Ok(())
819    }
820
821    pub fn clear_ingest_log(&self) -> Result<usize> {
822        let conn = self.conn()?;
823        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
824        let count = match result.next() {
825            Some(row) => match &row[0] {
826                Value::Int64(n) => *n as usize,
827                _ => 0,
828            },
829            None => 0,
830        };
831        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
832        Ok(count)
833    }
834
835    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
836        let conn = self.conn()?;
837        let escaped = escape_cypher(source);
838        let mut result = conn.query(&format!(
839            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
840            escaped
841        ))?;
842        let count = match result.next() {
843            Some(row) => match &row[0] {
844                Value::Int64(n) => *n as usize,
845                _ => 0,
846            },
847            None => 0,
848        };
849        conn.query(&format!(
850            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
851            escaped
852        ))?;
853        Ok(count)
854    }
855
856    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
857        let conn = self.conn()?;
858        let escaped = escape_cypher(file_path);
859        let mut result = conn.query(&format!(
860            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
861            escaped
862        ))?;
863        Ok(result.next().is_some())
864    }
865
866    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
867        let conn = self.conn()?;
868        let escaped = escape_cypher(file_path);
869        let mut result = conn.query(&format!(
870            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
871            escaped
872        ))?;
873        match result.next() {
874            Some(row) => {
875                let stored_hash = value_to_string(&row[0]);
876                Ok(stored_hash != file_hash)
877            }
878            None => Ok(true),
879        }
880    }
881
882    pub fn mark_ingested(
883        &self,
884        file_path: &str,
885        file_hash: &str,
886        memory_count: usize,
887        source: &str,
888    ) -> Result<()> {
889        let conn = self.conn()?;
890        let path_escaped = escape_cypher(file_path);
891        let hash_escaped = escape_cypher(file_hash);
892        let source_escaped = escape_cypher(source);
893        let now = chrono::Utc::now().to_rfc3339();
894
895        conn.query(&format!(
896            "MERGE (l:IngestLog {{file_path: '{}'}})
897             SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
898            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
899        ))?;
900        Ok(())
901    }
902
903    pub fn ingested_file_count(&self) -> Result<usize> {
904        let conn = self.conn()?;
905        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
906        match result.next() {
907            Some(row) => match &row[0] {
908                Value::Int64(n) => Ok(*n as usize),
909                _ => Ok(0),
910            },
911            None => Ok(0),
912        }
913    }
914}
915
916impl KuzuStore {
917    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
918        let id = memory.id.to_string();
919        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
920        let created_at = memory.created_at.to_rfc3339();
921        let last_accessed = memory.last_accessed.to_rfc3339();
922        let embedding_str = format_embedding(&memory.embedding);
923        let content_escaped = escape_cypher(&memory.content);
924        let source_escaped = escape_cypher(&memory.source);
925        let source_id_escaped = escape_cypher(&memory.source_id);
926        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
927        let machine_id_escaped = escape_cypher(&memory.machine_id);
928
929        let query = format!(
930            "CREATE (:Memory {{
931                id: '{id}',
932                content: '{content_escaped}',
933                embedding: {embedding_str},
934                memory_type: '{memory_type}',
935                confidence: {confidence},
936                created_at: '{created_at}',
937                last_accessed: '{last_accessed}',
938                access_count: {access_count},
939                source: '{source_escaped}',
940                source_id: '{source_id_escaped}',
941                project_path: '{project_path_escaped}',
942                machine_id: '{machine_id_escaped}'
943            }});",
944            confidence = memory.confidence,
945            access_count = memory.access_count,
946        );
947
948        conn.query(&query)?;
949        Ok(())
950    }
951
952    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
953        let id = entity.id.to_string();
954        let embedding_str = format_embedding(&entity.embedding);
955        let aliases_str = format_string_array(&entity.aliases);
956        let name_escaped = escape_cypher(&entity.name);
957        let etype_escaped = escape_cypher(&entity.entity_type);
958
959        let query = format!(
960            "CREATE (:Entity {{
961                id: '{id}',
962                name: '{name_escaped}',
963                entity_type: '{etype_escaped}',
964                embedding: {embedding_str},
965                aliases: {aliases_str}
966            }});"
967        );
968
969        conn.query(&query)?;
970        Ok(())
971    }
972
973    fn create_conversation_node(
974        &self,
975        conn: &Connection<'_>,
976        conversation: &Conversation,
977    ) -> Result<()> {
978        let id = conversation.id.to_string();
979        let started_at = conversation.started_at.to_rfc3339();
980        let source_escaped = escape_cypher(&conversation.source);
981        let machine_escaped = escape_cypher(&conversation.machine_id);
982        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
983
984        let query = format!(
985            "CREATE (:Conversation {{
986                id: '{id}',
987                source: '{source_escaped}',
988                machine_id: '{machine_escaped}',
989                started_at: '{started_at}',
990                project_path: '{project_escaped}'
991            }});"
992        );
993
994        conn.query(&query)?;
995        Ok(())
996    }
997
998    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
999        let from_id = relation.from_id.to_string();
1000        let to_id = relation.to_id.to_string();
1001
1002        let query = match relation.relation_type {
1003            RelationType::RelatesTo => format!(
1004                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1005                from_id,
1006                to_id,
1007                relation.strength,
1008                relation.context.as_deref().unwrap_or("")
1009            ),
1010            RelationType::Mentions => format!(
1011                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1012                from_id, to_id
1013            ),
1014            RelationType::DerivedFrom => format!(
1015                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1016                from_id,
1017                to_id,
1018                relation.context.as_deref().unwrap_or("direct")
1019            ),
1020            RelationType::Contradicts => format!(
1021                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1022                from_id,
1023                to_id,
1024                relation.context.as_deref().unwrap_or("")
1025            ),
1026            RelationType::Reinforces => format!(
1027                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1028                from_id, to_id
1029            ),
1030            RelationType::DistilledFrom => format!(
1031                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1032                from_id,
1033                to_id,
1034                relation.context.as_deref().unwrap_or("")
1035            ),
1036            RelationType::Supersedes => format!(
1037                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1038                from_id,
1039                to_id,
1040                relation.context.as_deref().unwrap_or("")
1041            ),
1042        };
1043
1044        conn.query(&query)?;
1045        Ok(())
1046    }
1047}
1048
1049impl KuzuStore {
1050    pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1051        let conn = self.conn()?;
1052        let mut result = conn.query(&format!(
1053            "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1054            id
1055        ))?;
1056
1057        match result.next() {
1058            Some(row) => {
1059                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1060                let source = value_to_string(&row[1]);
1061                let machine_id = value_to_string(&row[2]);
1062                let started_at_str = value_to_string(&row[3]);
1063                let project_path = {
1064                    let s = value_to_string(&row[4]);
1065                    if s.is_empty() { None } else { Some(s) }
1066                };
1067                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1068                    .map(|dt| dt.with_timezone(&chrono::Utc))
1069                    .unwrap_or_else(|_| chrono::Utc::now());
1070
1071                Ok(Some(Conversation {
1072                    id,
1073                    source,
1074                    machine_id,
1075                    started_at,
1076                    project_path,
1077                }))
1078            }
1079            None => Ok(None),
1080        }
1081    }
1082
1083    pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1084        let conn = self.conn()?;
1085        let query = format!(
1086            "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1087            id, memory_return_cols("m")
1088        );
1089
1090        let mut result = conn.query(&query)?;
1091        match result.next() {
1092            Some(row) => {
1093                let mut memory = row_to_memory(&row[..11])?;
1094                memory.embedding = value_to_f32_vec(&row[11]);
1095                Ok(Some(memory))
1096            }
1097            None => Ok(None),
1098        }
1099    }
1100
1101    pub fn import_memory(&self, memory: &Memory) -> Result<()> {
1102        let conn = self.conn()?;
1103        self.create_memory_node(&conn, memory)
1104    }
1105
1106    pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1107        let conn = self.conn()?;
1108        self.create_conversation_node(&conn, conversation)
1109    }
1110
1111    pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1112        let conn = self.conn()?;
1113        self.create_entity_node(&conn, entity)
1114    }
1115
1116    pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1117        let conn = self.conn()?;
1118        self.create_relation_edge(&conn, relation)
1119    }
1120
1121    pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1122        let conn = self.conn()?;
1123        conn.query(&format!(
1124            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1125            id
1126        ))?;
1127        Ok(())
1128    }
1129
1130    pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1131        let conn = self.conn()?;
1132        let id = memory.id.to_string();
1133        let last_accessed = memory.last_accessed.to_rfc3339();
1134        let content_escaped = escape_cypher(&memory.content);
1135        let embedding_str = format_embedding(&memory.embedding);
1136        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1137        let source_escaped = escape_cypher(&memory.source);
1138        let source_id_escaped = escape_cypher(&memory.source_id);
1139        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1140        let created_at = memory.created_at.to_rfc3339();
1141
1142        let query = format!(
1143            "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}', m.project_path = '{project_path_escaped}';",
1144            confidence = memory.confidence,
1145            access_count = memory.access_count,
1146        );
1147
1148        conn.query(&query)?;
1149        Ok(())
1150    }
1151
1152    pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1153        self.sync_log_page(after_seq, None)
1154    }
1155
1156    pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1157        let conn = self.conn()?;
1158        let machine_escaped = escape_cypher(&self.machine_id);
1159        let limit_clause = match limit {
1160            Some(n) => format!(" LIMIT {}", n),
1161            None => String::new(),
1162        };
1163        let query = format!(
1164            "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1165            after_seq, machine_escaped, limit_clause
1166        );
1167
1168        let mut result = conn.query(&query)?;
1169        let mut entries = Vec::new();
1170
1171        for row in &mut result {
1172            let seq = match &row[0] {
1173                Value::Int64(n) => *n as u64,
1174                _ => 0,
1175            };
1176            let op_str = value_to_string(&row[1]);
1177            let node_type_str = value_to_string(&row[2]);
1178            let node_id = value_to_string(&row[3]);
1179            let machine_id = value_to_string(&row[4]);
1180            let timestamp_str = value_to_string(&row[5]);
1181            let data_str = value_to_string(&row[6]);
1182
1183            let op = match op_str.as_str() {
1184                "create" => SyncOp::Create,
1185                "update" => SyncOp::Update,
1186                "delete" => SyncOp::Delete,
1187                _ => continue,
1188            };
1189            let node_type = match node_type_str.as_str() {
1190                "memory" => SyncNodeType::Memory,
1191                "entity" => SyncNodeType::Entity,
1192                "conversation" => SyncNodeType::Conversation,
1193                "relation" => SyncNodeType::Relation,
1194                _ => continue,
1195            };
1196            let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
1197                .map(|dt| dt.with_timezone(&chrono::Utc))
1198                .unwrap_or_else(|_| chrono::Utc::now());
1199
1200            let data = if data_str.is_empty() {
1201                None
1202            } else {
1203                Some(data_str)
1204            };
1205
1206            entries.push(SyncEntry {
1207                seq,
1208                op,
1209                node_type,
1210                node_id,
1211                machine_id,
1212                timestamp,
1213                data,
1214            });
1215        }
1216
1217        Ok(entries)
1218    }
1219
1220    pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1221        let conn = self.conn()?;
1222        let escaped = escape_cypher(peer_id);
1223        let mut result = conn.query(&format!(
1224            "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1225            escaped
1226        ))?;
1227
1228        match result.next() {
1229            Some(row) => {
1230                let peer_id = value_to_string(&row[0]);
1231                let last_seq = match &row[1] {
1232                    Value::Int64(n) => *n as u64,
1233                    _ => 0,
1234                };
1235                let last_sync_at_str = value_to_string(&row[2]);
1236                let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1237                    .map(|dt| dt.with_timezone(&chrono::Utc))
1238                    .unwrap_or_else(|_| chrono::Utc::now());
1239
1240                Ok(Some(SyncState {
1241                    peer_id,
1242                    last_seq,
1243                    last_sync_at,
1244                }))
1245            }
1246            None => Ok(None),
1247        }
1248    }
1249
1250    pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1251        let conn = self.conn()?;
1252        let peer_escaped = escape_cypher(&state.peer_id);
1253        let now = state.last_sync_at.to_rfc3339();
1254
1255        conn.query(&format!(
1256            "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1257            peer_escaped, state.last_seq, now
1258        ))?;
1259        Ok(())
1260    }
1261
1262    pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1263        let conn = self.conn()?;
1264        let mut result =
1265            conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1266        let mut states = Vec::new();
1267
1268        for row in &mut result {
1269            let peer_id = value_to_string(&row[0]);
1270            let last_seq = match &row[1] {
1271                Value::Int64(n) => *n as u64,
1272                _ => 0,
1273            };
1274            let last_sync_at_str = value_to_string(&row[2]);
1275            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1276                .map(|dt| dt.with_timezone(&chrono::Utc))
1277                .unwrap_or_else(|_| chrono::Utc::now());
1278
1279            states.push(SyncState {
1280                peer_id,
1281                last_seq,
1282                last_sync_at,
1283            });
1284        }
1285
1286        Ok(states)
1287    }
1288
1289    pub fn all_entities(&self) -> Result<Vec<Entity>> {
1290        let conn = self.conn()?;
1291        let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1292        let mut entities = Vec::new();
1293        for row in &mut result {
1294            entities.push(row_to_entity(&row)?);
1295        }
1296        Ok(entities)
1297    }
1298
1299    pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1300        let conn = self.conn()?;
1301        let query = format!(
1302            "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1303            entity_id, memory_return_cols("m")
1304        );
1305        let mut result = conn.query(&query)?;
1306        let mut memories = Vec::new();
1307        for row in &mut result {
1308            memories.push(row_to_memory(&row)?);
1309        }
1310        Ok(memories)
1311    }
1312
1313    pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1314        let conn = self.conn()?;
1315        let mut result = conn.query(
1316            &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1317        )?;
1318        let mut memories = Vec::new();
1319        for row in &mut result {
1320            memories.push(row_to_memory(&row)?);
1321        }
1322        Ok(memories)
1323    }
1324
1325    pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1326        let conn = self.conn()?;
1327        let query = format!(
1328            "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1329            entity_id, entity_id
1330        );
1331        let mut result = conn.query(&query)?;
1332        let mut names = Vec::new();
1333        for row in &mut result {
1334            names.push(value_to_string(&row[0]));
1335        }
1336        Ok(names)
1337    }
1338
1339    pub fn max_sync_seq(&self) -> Result<u64> {
1340        let conn = self.conn()?;
1341        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
1342        match result.next() {
1343            Some(row) => match &row[0] {
1344                Value::Int64(n) => Ok(*n as u64),
1345                _ => Ok(0),
1346            },
1347            None => Ok(0),
1348        }
1349    }
1350
1351    pub fn backfill_project_paths(&self) -> Result<u64> {
1352        let conn = self.conn()?;
1353        let mut result = conn.query(
1354            "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1355        )?;
1356        match result.next() {
1357            Some(row) => match &row[0] {
1358                Value::Int64(n) => Ok(*n as u64),
1359                _ => Ok(0),
1360            },
1361            None => Ok(0),
1362        }
1363    }
1364
1365    pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1366        let conn = self.conn()?;
1367        let escaped = escape_cypher(project_path);
1368        let cols = memory_return_cols("m");
1369        let query = format!(
1370            "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1371            escaped, cols
1372        );
1373        let mut result = conn.query(&query)?;
1374        let mut memories = Vec::new();
1375        for row in &mut result {
1376            memories.push(row_to_memory(&row)?);
1377        }
1378        Ok(memories)
1379    }
1380
1381    pub fn backfill_sync_log(&self) -> Result<u64> {
1382        let conn = self.conn()?;
1383        let mut count = 0u64;
1384
1385        let mut result = conn.query(
1386            &format!("MATCH (m:Memory) RETURN {}, m.embedding;", memory_return_cols("m")),
1387        )?;
1388        for row in &mut result {
1389            let mut memory = row_to_memory(&row[..11])?;
1390            memory.embedding = value_to_f32_vec(&row[11]);
1391            let data = serde_json::to_string(&memory).ok();
1392            self.log_sync_entry(
1393                &conn,
1394                SyncOp::Create,
1395                SyncNodeType::Memory,
1396                &memory.id.to_string(),
1397                data.as_deref(),
1398            )?;
1399            count += 1;
1400        }
1401
1402        let mut result = conn.query(
1403            "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1404        )?;
1405        for row in &mut result {
1406            let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1407            let source = value_to_string(&row[1]);
1408            let machine_id = value_to_string(&row[2]);
1409            let started_at_str = value_to_string(&row[3]);
1410            let project_path = {
1411                let s = value_to_string(&row[4]);
1412                if s.is_empty() { None } else { Some(s) }
1413            };
1414            let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1415                .map(|dt| dt.with_timezone(&chrono::Utc))
1416                .unwrap_or_else(|_| chrono::Utc::now());
1417
1418            let conv = Conversation {
1419                id,
1420                source,
1421                machine_id,
1422                started_at,
1423                project_path,
1424            };
1425            let data = serde_json::to_string(&conv).ok();
1426            self.log_sync_entry(
1427                &conn,
1428                SyncOp::Create,
1429                SyncNodeType::Conversation,
1430                &conv.id.to_string(),
1431                data.as_deref(),
1432            )?;
1433            count += 1;
1434        }
1435
1436        Ok(count)
1437    }
1438}
1439
1440impl KuzuStore {
1441    pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
1442        let conn = self.conn()?;
1443        let id_escaped = escape_cypher(id);
1444        let name_escaped = escape_cypher(name);
1445        conn.query(&format!(
1446            "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
1447        ))?;
1448        Ok(())
1449    }
1450
1451    pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
1452        let conn = self.conn()?;
1453        let id_escaped = escape_cypher(id);
1454        let mut result = conn.query(&format!(
1455            "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
1456        ))?;
1457        match result.next() {
1458            Some(row) => Ok(Some(value_to_string(&row[0]))),
1459            None => Ok(None),
1460        }
1461    }
1462
1463    pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
1464        let conn = self.conn()?;
1465        let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
1466        let mut map = std::collections::HashMap::new();
1467        for row in &mut result {
1468            map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
1469        }
1470        Ok(map)
1471    }
1472
1473    pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
1474        let conn = self.conn()?;
1475        let escaped = escape_cypher(machine_id);
1476        let mut result = conn.query(&format!(
1477            "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
1478        ))?;
1479        match result.next() {
1480            Some(row) => match &row[0] {
1481                Value::Int64(n) => Ok(*n as u64),
1482                _ => Ok(0),
1483            },
1484            None => Ok(0),
1485        }
1486    }
1487
1488    pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
1489        self.upsert_machine(&identity.id, &identity.name)?;
1490        let count = self.backfill_machine_id(&identity.id)?;
1491        if count > 0 {
1492            tracing::info!("backfilled machine_id on {count} existing memories");
1493        }
1494        Ok(())
1495    }
1496}
1497
/// Renders `embedding` as a Cypher list literal such as `[0.5,-1]`.
///
/// Each component uses `f32`'s `Display` form. An empty slice is rendered as
/// a list of 384 `0.0` entries instead of `[]`. NOTE(review): 384 presumably
/// matches the embedding column's fixed size in the schema; confirm.
fn format_embedding(embedding: &[f32]) -> String {
    const PLACEHOLDER_DIM: usize = 384;

    let body = if embedding.is_empty() {
        vec!["0.0"; PLACEHOLDER_DIM].join(",")
    } else {
        embedding
            .iter()
            .map(f32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{body}]")
}
1506
/// Escapes `s` so it can be placed inside a single-quoted Cypher string
/// literal.
///
/// Backslashes are doubled first, so the `\'` emitted for each quote is not
/// escaped a second time.
fn escape_cypher(s: &str) -> String {
    s.replace('\\', "\\\\").replace('\'', "\\'")
}

/// Renders `items` as a Cypher list of single-quoted string literals, e.g.
/// `['a','it\'s']`.
///
/// Each element is escaped with [`escape_cypher`], the same backslash escaping
/// used for every other string literal in this file. Cypher string literals
/// do not treat a doubled quote (`''`) as an escape, so SQL-style quoting would
/// end the literal early. Unescaped backslashes would also be interpreted as
/// escape sequences.
fn format_string_array(items: &[String]) -> String {
    let parts: Vec<String> = items
        .iter()
        .map(|s| format!("'{}'", escape_cypher(s)))
        .collect();
    format!("[{}]", parts.join(","))
}
1518
1519fn value_to_string(val: &Value) -> String {
1520    match val {
1521        Value::String(s) => s.clone(),
1522        other => format!("{:?}", other),
1523    }
1524}
1525
1526fn value_to_f32(val: &Value) -> f32 {
1527    match val {
1528        Value::Float(f) => *f,
1529        Value::Double(d) => *d as f32,
1530        Value::Int64(i) => *i as f32,
1531        _ => 0.0,
1532    }
1533}
1534
1535fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1536    match val {
1537        Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
1538        _ => Vec::new(),
1539    }
1540}
1541
1542fn row_to_memory(row: &[Value]) -> Result<Memory> {
1543    let id_str = value_to_string(&row[0]);
1544    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1545    let content = value_to_string(&row[1]);
1546    let memory_type_str = value_to_string(&row[2]);
1547    let confidence = value_to_f32(&row[3]);
1548    let created_at_str = value_to_string(&row[4]);
1549    let last_accessed_str = value_to_string(&row[5]);
1550    let access_count = match &row[6] {
1551        Value::Int64(i) => *i as u32,
1552        _ => 0,
1553    };
1554    let source = value_to_string(&row[7]);
1555    let source_id = value_to_string(&row[8]);
1556    let project_path = project_path_from_db(&value_to_string(&row[9]));
1557    let machine_id = value_to_string(&row[10]);
1558
1559    let memory_type = match memory_type_str.as_str() {
1560        "episodic" => MemoryType::Episodic,
1561        "procedural" => MemoryType::Procedural,
1562        "decision" => MemoryType::Decision,
1563        "architecture" => MemoryType::Architecture,
1564        "debugging" => MemoryType::Debugging,
1565        "task" => MemoryType::Task,
1566        "question" => MemoryType::Question,
1567        _ => MemoryType::Semantic,
1568    };
1569
1570    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1571        .map(|dt| dt.with_timezone(&chrono::Utc))
1572        .unwrap_or_else(|_| chrono::Utc::now());
1573
1574    let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1575        .map(|dt| dt.with_timezone(&chrono::Utc))
1576        .unwrap_or_else(|_| chrono::Utc::now());
1577
1578    Ok(Memory {
1579        id,
1580        content,
1581        embedding: Vec::new(),
1582        memory_type,
1583        confidence,
1584        created_at,
1585        last_accessed,
1586        access_count,
1587        source,
1588        source_id,
1589        project_path,
1590        machine_id,
1591    })
1592}
1593
1594fn row_to_entity(row: &[Value]) -> Result<Entity> {
1595    let id_str = value_to_string(&row[0]);
1596    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1597    let name = value_to_string(&row[1]);
1598    let entity_type = value_to_string(&row[2]);
1599
1600    Ok(Entity {
1601        id,
1602        name,
1603        entity_type,
1604        embedding: Vec::new(),
1605        aliases: Vec::new(),
1606    })
1607}
1608
// SAFETY: NOTE(review): these impls assert that a `KuzuStore` may be moved to,
// and shared between, threads. Apart from `db`, the struct's fields are a
// `String` and an `AtomicU64`, which are already `Send + Sync`. The claim
// therefore rests entirely on `kuzu::Database` being safe to move and share
// across threads; every method opens its own `Connection` from `&self.db`.
// That guarantee comes from the kuzu crate/engine, not from this file. Confirm
// it, and drop these impls if `Database` is already `Send + Sync`.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}