Skip to main content

second_brain_core/
kuzu_store.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use anyhow::{Context, Result};
5use chrono::{DateTime, Utc};
6use kuzu::{Connection, Database, SystemConfig, Value};
7use uuid::Uuid;
8
9use crate::schema::{
10    Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
11    SyncOp, SyncState,
12};
13use crate::store::Store;
14
/// A store backed by a KùzuDB [`Database`].
///
/// Every operation obtains its own [`Connection`] through `conn()`.
pub struct KuzuStore {
    /// The underlying KùzuDB database handle.
    db: Database,
    /// Identifier written into the `machine_id` column of each `SyncLog` row.
    machine_id: String,
    /// Sequence number that the next `SyncLog` row will receive.
    sync_seq: AtomicU64,
}
20
/// Builds the `RETURN` projection for a `Memory` node bound to `alias`.
///
/// The result is the ten memory columns, each prefixed with `alias.` and
/// separated by `", "`, always in the same order.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 10] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
    ];

    let mut projection = String::new();
    for (position, column) in COLUMNS.iter().enumerate() {
        if position > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
28
/// Converts a stored project path back into an `Option`.
///
/// The empty string is the stored representation of "no project path" and
/// becomes `None`; any other value is returned as an owned `String`.
fn project_path_from_db(s: &str) -> Option<String> {
    match s {
        "" => None,
        path => Some(path.to_owned()),
    }
}
32
33impl KuzuStore {
34    pub fn open(path: &Path, machine_id: String) -> Result<Self> {
35        let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
36        let store = Self {
37            db,
38            machine_id,
39            sync_seq: AtomicU64::new(1),
40        };
41        store.init_schema()?;
42        store.init_sync_seq()?;
43        Ok(store)
44    }
45
46    pub fn in_memory(machine_id: String) -> Result<Self> {
47        let db =
48            Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49        let store = Self {
50            db,
51            machine_id,
52            sync_seq: AtomicU64::new(1),
53        };
54        store.init_schema()?;
55        Ok(store)
56    }
57
58    pub fn machine_id(&self) -> &str {
59        &self.machine_id
60    }
61
62    fn conn(&self) -> Result<Connection<'_>> {
63        Connection::new(&self.db).context("creating connection")
64    }
65
66    pub fn diagnostic(&self) -> Result<String> {
67        let conn = self.conn()?;
68
69        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
70        let total: i64 = result
71            .next()
72            .map(|r| match &r[0] {
73                Value::Int64(v) => *v,
74                _ => -1,
75            })
76            .unwrap_or(-1);
77
78        let mut result = conn.query(
79            "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
80        )?;
81        let with_emb: i64 = result
82            .next()
83            .map(|r| match &r[0] {
84                Value::Int64(v) => *v,
85                _ => -1,
86            })
87            .unwrap_or(-1);
88
89        let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
90        let zeros_str = format!("[{}]", zeros.join(","));
91        let idx_query = format!(
92            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
93            zeros_str
94        );
95        let idx_status = match conn.query(&idx_query) {
96            Ok(mut r) => match r.next() {
97                Some(row) => format!("ok (distance: {:?})", &row[0]),
98                None => "ok (0 results)".to_string(),
99            },
100            Err(e) => format!("error: {e}"),
101        };
102
103        Ok(format!(
104            "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
105        ))
106    }
107
108    fn init_schema(&self) -> Result<()> {
109        let conn = self.conn()?;
110
111        conn.query(
112            "CREATE NODE TABLE IF NOT EXISTS Memory(
113                id STRING PRIMARY KEY,
114                content STRING,
115                embedding FLOAT[384],
116                memory_type STRING,
117                confidence FLOAT,
118                created_at STRING,
119                last_accessed STRING,
120                access_count INT64,
121                source STRING,
122                source_id STRING,
123                project_path STRING
124            );",
125        )
126        .context("creating Memory table")?;
127
128        conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
129
130        conn.query(
131            "CREATE NODE TABLE IF NOT EXISTS Entity(
132                id STRING PRIMARY KEY,
133                name STRING,
134                entity_type STRING,
135                embedding FLOAT[384],
136                aliases STRING[]
137            );",
138        )
139        .context("creating Entity table")?;
140
141        conn.query(
142            "CREATE NODE TABLE IF NOT EXISTS Conversation(
143                id STRING PRIMARY KEY,
144                source STRING,
145                machine_id STRING,
146                started_at STRING,
147                project_path STRING
148            );",
149        )
150        .context("creating Conversation table")?;
151
152        conn.query(
153            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
154                file_path STRING PRIMARY KEY,
155                file_hash STRING,
156                ingested_at STRING,
157                memory_count INT64,
158                source STRING
159            );",
160        )
161        .context("creating IngestLog table")?;
162
163        conn.query(
164            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
165                FROM Memory TO Memory,
166                strength FLOAT,
167                context STRING
168            );",
169        )
170        .context("creating RELATES_TO rel")?;
171
172        conn.query(
173            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
174                FROM Memory TO Entity,
175                position INT64
176            );",
177        )
178        .context("creating MENTIONS rel")?;
179
180        conn.query(
181            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
182                FROM Memory TO Conversation,
183                transformation STRING
184            );",
185        )
186        .context("creating DERIVED_FROM rel")?;
187
188        conn.query(
189            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
190                FROM Memory TO Memory,
191                model STRING
192            );",
193        )
194        .context("creating DISTILLED_FROM rel")?;
195
196        conn.query(
197            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
198                FROM Memory TO Memory,
199                resolution STRING
200            );",
201        )
202        .context("creating CONTRADICTS rel")?;
203
204        conn.query(
205            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
206                FROM Memory TO Memory
207            );",
208        )
209        .context("creating REINFORCES rel")?;
210
211        conn.query(
212            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
213                FROM Memory TO Memory,
214                reason STRING
215            );",
216        )
217        .context("creating SUPERSEDES rel")?;
218
219        conn.query(
220            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
221                seq INT64 PRIMARY KEY,
222                op STRING,
223                node_type STRING,
224                node_id STRING,
225                machine_id STRING,
226                timestamp STRING,
227                data STRING
228            );",
229        )
230        .context("creating SyncLog table")?;
231
232        conn.query(
233            "CREATE NODE TABLE IF NOT EXISTS SyncState(
234                peer_id STRING PRIMARY KEY,
235                last_seq INT64,
236                last_sync_at STRING
237            );",
238        )
239        .context("creating SyncState table")?;
240
241        conn.query(
242            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
243                memory_id STRING PRIMARY KEY,
244                distilled_id STRING,
245                consolidated_at STRING,
246                model STRING
247            );",
248        )
249        .context("creating ConsolidationLog table")?;
250
251        conn.query(
252            "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
253                id STRING PRIMARY KEY,
254                last_sync_seq INT64,
255                exported_at STRING,
256                vault_path STRING,
257                pages_created INT64,
258                pages_updated INT64,
259                memories_processed INT64
260            );",
261        )
262        .context("creating WikiExportLog table")?;
263
264        conn.query(
265            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
266        )
267        .ok();
268
269        Ok(())
270    }
271
272    fn init_sync_seq(&self) -> Result<()> {
273        let conn = self.conn()?;
274        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
275        if let Some(row) = result.next()
276            && let Value::Int64(max_seq) = &row[0]
277        {
278            self.sync_seq.store((*max_seq as u64) + 1, Ordering::SeqCst);
279        }
280        Ok(())
281    }
282
283    fn log_sync_entry(
284        &self,
285        conn: &Connection<'_>,
286        op: SyncOp,
287        node_type: SyncNodeType,
288        node_id: &str,
289        data: Option<&str>,
290    ) -> Result<()> {
291        let seq = self.sync_seq.fetch_add(1, Ordering::SeqCst);
292        let timestamp = chrono::Utc::now().to_rfc3339();
293        let op_str = match op {
294            SyncOp::Create => "create",
295            SyncOp::Update => "update",
296            SyncOp::Delete => "delete",
297        };
298        let nt_str = match node_type {
299            SyncNodeType::Memory => "memory",
300            SyncNodeType::Entity => "entity",
301            SyncNodeType::Conversation => "conversation",
302            SyncNodeType::Relation => "relation",
303        };
304        let machine_escaped = escape_cypher(&self.machine_id);
305        let node_id_escaped = escape_cypher(node_id);
306        let data_escaped = data.map(escape_cypher).unwrap_or_default();
307
308        let query = format!(
309            "CREATE (:SyncLog {{
310                seq: {seq},
311                op: '{op_str}',
312                node_type: '{nt_str}',
313                node_id: '{node_id_escaped}',
314                machine_id: '{machine_escaped}',
315                timestamp: '{timestamp}',
316                data: '{data_escaped}'
317            }});"
318        );
319        conn.query(&query)?;
320        Ok(())
321    }
322}
323
324impl Store for KuzuStore {
325    fn store_memory(&self, memory: &Memory) -> Result<()> {
326        let conn = self.conn()?;
327        self.create_memory_node(&conn, memory)?;
328        let data = serde_json::to_string(memory).ok();
329        self.log_sync_entry(
330            &conn,
331            SyncOp::Create,
332            SyncNodeType::Memory,
333            &memory.id.to_string(),
334            data.as_deref(),
335        )?;
336        Ok(())
337    }
338
339    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
340        let conn = self.conn()?;
341        let query = format!(
342            "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
343            id, memory_return_cols("m")
344        );
345
346        let mut result = conn.query(&query)?;
347        match result.next() {
348            Some(row) => Ok(Some(row_to_memory(&row)?)),
349            None => Ok(None),
350        }
351    }
352
353    fn delete_memory(&self, id: Uuid) -> Result<()> {
354        let conn = self.conn()?;
355        conn.query(&format!(
356            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
357            id
358        ))?;
359        self.log_sync_entry(
360            &conn,
361            SyncOp::Delete,
362            SyncNodeType::Memory,
363            &id.to_string(),
364            None,
365        )?;
366        Ok(())
367    }
368
369    fn store_entity(&self, entity: &Entity) -> Result<()> {
370        let conn = self.conn()?;
371        self.create_entity_node(&conn, entity)?;
372        let data = serde_json::to_string(entity).ok();
373        self.log_sync_entry(
374            &conn,
375            SyncOp::Create,
376            SyncNodeType::Entity,
377            &entity.id.to_string(),
378            data.as_deref(),
379        )?;
380        Ok(())
381    }
382
383    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
384        let conn = self.conn()?;
385        let mut result = conn.query(&format!(
386            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
387            id
388        ))?;
389
390        match result.next() {
391            Some(row) => Ok(Some(row_to_entity(&row)?)),
392            None => Ok(None),
393        }
394    }
395
396    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
397        let conn = self.conn()?;
398        let name_escaped = escape_cypher(name);
399        let query = format!(
400            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
401            name_escaped
402        );
403        let mut result = conn.query(&query)?;
404
405        match result.next() {
406            Some(row) => Ok(Some(row_to_entity(&row)?)),
407            None => Ok(None),
408        }
409    }
410
411    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
412        let conn = self.conn()?;
413        self.create_conversation_node(&conn, conversation)?;
414        let data = serde_json::to_string(conversation).ok();
415        self.log_sync_entry(
416            &conn,
417            SyncOp::Create,
418            SyncNodeType::Conversation,
419            &conversation.id.to_string(),
420            data.as_deref(),
421        )?;
422        Ok(())
423    }
424
425    fn store_relation(&self, relation: &Relation) -> Result<()> {
426        let conn = self.conn()?;
427        self.create_relation_edge(&conn, relation)?;
428        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
429        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
430        let data = serde_json::to_string(relation).ok();
431        self.log_sync_entry(
432            &conn,
433            SyncOp::Create,
434            SyncNodeType::Relation,
435            &node_id,
436            data.as_deref(),
437        )?;
438        Ok(())
439    }
440
441    fn get_relations(
442        &self,
443        node_id: Uuid,
444        relation_type: Option<RelationType>,
445    ) -> Result<Vec<Relation>> {
446        let conn = self.conn()?;
447        let id = node_id.to_string();
448
449        let query = match relation_type {
450            Some(RelationType::RelatesTo) => format!(
451                "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
452                id
453            ),
454            Some(RelationType::Contradicts) => format!(
455                "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
456                id
457            ),
458            Some(RelationType::Reinforces) => format!(
459                "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
460                id
461            ),
462            Some(RelationType::Supersedes) => format!(
463                "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
464                id
465            ),
466            Some(RelationType::Mentions) => format!(
467                "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
468                id
469            ),
470            Some(RelationType::DerivedFrom) => format!(
471                "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
472                id
473            ),
474            Some(RelationType::DistilledFrom) => format!(
475                "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
476                id
477            ),
478            None => {
479                let mut relations = Vec::new();
480                let mem_to_mem = [
481                    ("RELATES_TO", RelationType::RelatesTo),
482                    ("REINFORCES", RelationType::Reinforces),
483                    ("CONTRADICTS", RelationType::Contradicts),
484                    ("SUPERSEDES", RelationType::Supersedes),
485                    ("DISTILLED_FROM", RelationType::DistilledFrom),
486                ];
487                for (label, rt) in &mem_to_mem {
488                    let q = format!(
489                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
490                        id, label
491                    );
492                    if let Ok(mut result) = conn.query(&q) {
493                        for row in &mut result {
494                            let to_id_str = value_to_string(&row[0]);
495                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
496                            relations.push(Relation {
497                                from_id: node_id,
498                                to_id,
499                                relation_type: *rt,
500                                strength: 1.0,
501                                context: None,
502                            });
503                        }
504                    }
505                }
506                for (label, target, rt) in [
507                    ("MENTIONS", "Entity", RelationType::Mentions),
508                    ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
509                ] {
510                    let q = format!(
511                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
512                        id, label, target
513                    );
514                    if let Ok(mut result) = conn.query(&q) {
515                        for row in &mut result {
516                            let to_id_str = value_to_string(&row[0]);
517                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
518                            relations.push(Relation {
519                                from_id: node_id,
520                                to_id,
521                                relation_type: rt,
522                                strength: 1.0,
523                                context: None,
524                            });
525                        }
526                    }
527                }
528                return Ok(relations);
529            }
530        };
531
532        let mut result = conn.query(&query)?;
533        let mut relations = Vec::new();
534
535        for row in &mut result {
536            let to_id_str = value_to_string(&row[0]);
537            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
538
539            relations.push(Relation {
540                from_id: node_id,
541                to_id,
542                relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
543                strength: 1.0,
544                context: None,
545            });
546        }
547
548        Ok(relations)
549    }
550
551    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
552        let conn = self.conn()?;
553        let embedding_str = format_embedding(embedding);
554
555        let query = format!(
556            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
557             RETURN {}, distance;",
558            embedding_str, limit, memory_return_cols("node")
559        );
560
561        let mut result = conn.query(&query)?;
562        let mut results = Vec::new();
563
564        for row in &mut result {
565            let memory = row_to_memory(&row[..10])?;
566            let distance = value_to_f32(&row[10]);
567            let similarity = 1.0 - distance;
568            results.push((memory, similarity));
569        }
570
571        Ok(results)
572    }
573
574    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
575        let conn = self.conn()?;
576        let query = format!(
577            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
578             RETURN DISTINCT {};",
579            start_id, depth, memory_return_cols("m")
580        );
581
582        let mut result = conn.query(&query)?;
583        let mut results = Vec::new();
584
585        for row in &mut result {
586            let memory = row_to_memory(&row)?;
587            results.push((memory, Vec::new()));
588        }
589
590        Ok(results)
591    }
592
593    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
594        let conn = self.conn()?;
595        let source_escaped = escape_cypher(source);
596        let query = format!(
597            "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
598            source_escaped, memory_return_cols("m")
599        );
600        let mut result = conn.query(&query)?;
601
602        let mut memories = Vec::new();
603        for row in &mut result {
604            memories.push(row_to_memory(&row)?);
605        }
606        Ok(memories)
607    }
608
609    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
610        let conn = self.conn()?;
611        let type_str = format!("{:?}", memory_type).to_lowercase();
612        let query = format!(
613            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
614            type_str, memory_return_cols("m")
615        );
616        let mut result = conn.query(&query)?;
617
618        let mut memories = Vec::new();
619        for row in &mut result {
620            memories.push(row_to_memory(&row)?);
621        }
622        Ok(memories)
623    }
624
625    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
626        let conn = self.conn()?;
627        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
628        let cutoff_str = cutoff.to_rfc3339();
629
630        let query = format!(
631            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
632            cutoff_str, memory_return_cols("m")
633        );
634
635        let mut result = conn.query(&query)?;
636        let mut memories = Vec::new();
637        for row in &mut result {
638            memories.push(row_to_memory(&row)?);
639        }
640        Ok(memories)
641    }
642
643    fn update_memory(&self, memory: &Memory) -> Result<()> {
644        let conn = self.conn()?;
645        let id = memory.id.to_string();
646        let last_accessed = memory.last_accessed.to_rfc3339();
647        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
648
649        let query = format!(
650            "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}';",
651            id, memory.confidence, last_accessed, memory.access_count, project_path_escaped
652        );
653
654        conn.query(&query)?;
655        let data = serde_json::to_string(memory).ok();
656        self.log_sync_entry(
657            &conn,
658            SyncOp::Update,
659            SyncNodeType::Memory,
660            &id,
661            data.as_deref(),
662        )?;
663        Ok(())
664    }
665
666    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
667        let conn = self.conn()?;
668        let query_escaped = escape_cypher(&query.to_lowercase());
669        let cypher = format!(
670            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
671            query_escaped, memory_return_cols("m"), limit
672        );
673
674        let mut result = conn.query(&cypher)?;
675        let mut memories = Vec::new();
676        for row in &mut result {
677            memories.push(row_to_memory(&row)?);
678        }
679        Ok(memories)
680    }
681
682    fn memory_count(&self) -> Result<usize> {
683        let conn = self.conn()?;
684        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
685        match result.next() {
686            Some(row) => match &row[0] {
687                Value::Int64(n) => Ok(*n as usize),
688                _ => Ok(0),
689            },
690            None => Ok(0),
691        }
692    }
693}
694
695impl KuzuStore {
696    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
697        if let Some(existing) = self.find_entity_by_name(name)? {
698            return Ok(existing);
699        }
700        let entity = Entity::new(name.to_string(), entity_type.to_string());
701        self.store_entity(&entity)?;
702        Ok(entity)
703    }
704
705    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
706        let conn = self.conn()?;
707        let escaped = escape_cypher(content_prefix);
708        let query = format!(
709            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
710            escaped
711        );
712        let mut result = conn.query(&query)?;
713        Ok(result.next().is_some())
714    }
715
716    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
717        let conn = self.conn()?;
718        let query = format!(
719            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
720            memory_return_cols("m"), limit
721        );
722        let mut result = conn.query(&query)?;
723        let mut memories = Vec::new();
724        for row in &mut result {
725            memories.push(row_to_memory(&row)?);
726        }
727        Ok(memories)
728    }
729
730    pub fn memories_created_between(
731        &self,
732        start: &DateTime<Utc>,
733        end: &DateTime<Utc>,
734    ) -> Result<Vec<Memory>> {
735        let conn = self.conn()?;
736        let query = format!(
737            "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
738            start.to_rfc3339(),
739            end.to_rfc3339(),
740            memory_return_cols("m")
741        );
742        let mut result = conn.query(&query)?;
743        let mut memories = Vec::new();
744        for row in &mut result {
745            memories.push(row_to_memory(&row)?);
746        }
747        Ok(memories)
748    }
749
750    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
751        let conn = self.conn()?;
752        let now = Utc::now().to_rfc3339();
753        let query = format!(
754            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
755            raw_id,
756            distilled_id,
757            now,
758            escape_cypher(model)
759        );
760        conn.query(&query)?;
761        Ok(())
762    }
763
764    pub fn consolidation_count(&self) -> Result<usize> {
765        let conn = self.conn()?;
766        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
767        match result.next() {
768            Some(row) => match &row[0] {
769                Value::Int64(n) => Ok(*n as usize),
770                _ => Ok(0),
771            },
772            None => Ok(0),
773        }
774    }
775
776    pub fn delete_consolidated_raw(&self) -> Result<usize> {
777        let conn = self.conn()?;
778        let result = conn.query(
779            "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
780        )?;
781        let mut ids = Vec::new();
782        for row in result {
783            if let Value::String(id) = &row[0]
784                && let Ok(uuid) = Uuid::parse_str(id)
785            {
786                ids.push(uuid);
787            }
788        }
789        let count = ids.len();
790        for id in &ids {
791            let escaped = escape_cypher(&id.to_string());
792            conn.query(&format!(
793                "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
794                escaped
795            ))
796            .ok();
797        }
798        Ok(count)
799    }
800
801    pub fn rebuild_vector_index(&self) -> Result<()> {
802        let conn = self.conn()?;
803        conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
804            .ok();
805        conn.query(
806            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
807        )?;
808        Ok(())
809    }
810
811    pub fn clear_ingest_log(&self) -> Result<usize> {
812        let conn = self.conn()?;
813        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
814        let count = match result.next() {
815            Some(row) => match &row[0] {
816                Value::Int64(n) => *n as usize,
817                _ => 0,
818            },
819            None => 0,
820        };
821        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
822        Ok(count)
823    }
824
825    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
826        let conn = self.conn()?;
827        let escaped = escape_cypher(source);
828        let mut result = conn.query(&format!(
829            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
830            escaped
831        ))?;
832        let count = match result.next() {
833            Some(row) => match &row[0] {
834                Value::Int64(n) => *n as usize,
835                _ => 0,
836            },
837            None => 0,
838        };
839        conn.query(&format!(
840            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
841            escaped
842        ))?;
843        Ok(count)
844    }
845
846    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
847        let conn = self.conn()?;
848        let escaped = escape_cypher(file_path);
849        let mut result = conn.query(&format!(
850            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
851            escaped
852        ))?;
853        Ok(result.next().is_some())
854    }
855
856    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
857        let conn = self.conn()?;
858        let escaped = escape_cypher(file_path);
859        let mut result = conn.query(&format!(
860            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
861            escaped
862        ))?;
863        match result.next() {
864            Some(row) => {
865                let stored_hash = value_to_string(&row[0]);
866                Ok(stored_hash != file_hash)
867            }
868            None => Ok(true),
869        }
870    }
871
872    pub fn mark_ingested(
873        &self,
874        file_path: &str,
875        file_hash: &str,
876        memory_count: usize,
877        source: &str,
878    ) -> Result<()> {
879        let conn = self.conn()?;
880        let path_escaped = escape_cypher(file_path);
881        let hash_escaped = escape_cypher(file_hash);
882        let source_escaped = escape_cypher(source);
883        let now = chrono::Utc::now().to_rfc3339();
884
885        conn.query(&format!(
886            "MERGE (l:IngestLog {{file_path: '{}'}})
887             SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
888            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
889        ))?;
890        Ok(())
891    }
892
893    pub fn ingested_file_count(&self) -> Result<usize> {
894        let conn = self.conn()?;
895        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
896        match result.next() {
897            Some(row) => match &row[0] {
898                Value::Int64(n) => Ok(*n as usize),
899                _ => Ok(0),
900            },
901            None => Ok(0),
902        }
903    }
904}
905
906impl KuzuStore {
907    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
908        let id = memory.id.to_string();
909        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
910        let created_at = memory.created_at.to_rfc3339();
911        let last_accessed = memory.last_accessed.to_rfc3339();
912        let embedding_str = format_embedding(&memory.embedding);
913        let content_escaped = escape_cypher(&memory.content);
914        let source_escaped = escape_cypher(&memory.source);
915        let source_id_escaped = escape_cypher(&memory.source_id);
916        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
917
918        let query = format!(
919            "CREATE (:Memory {{
920                id: '{id}',
921                content: '{content_escaped}',
922                embedding: {embedding_str},
923                memory_type: '{memory_type}',
924                confidence: {confidence},
925                created_at: '{created_at}',
926                last_accessed: '{last_accessed}',
927                access_count: {access_count},
928                source: '{source_escaped}',
929                source_id: '{source_id_escaped}',
930                project_path: '{project_path_escaped}'
931            }});",
932            confidence = memory.confidence,
933            access_count = memory.access_count,
934        );
935
936        conn.query(&query)?;
937        Ok(())
938    }
939
940    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
941        let id = entity.id.to_string();
942        let embedding_str = format_embedding(&entity.embedding);
943        let aliases_str = format_string_array(&entity.aliases);
944        let name_escaped = escape_cypher(&entity.name);
945        let etype_escaped = escape_cypher(&entity.entity_type);
946
947        let query = format!(
948            "CREATE (:Entity {{
949                id: '{id}',
950                name: '{name_escaped}',
951                entity_type: '{etype_escaped}',
952                embedding: {embedding_str},
953                aliases: {aliases_str}
954            }});"
955        );
956
957        conn.query(&query)?;
958        Ok(())
959    }
960
961    fn create_conversation_node(
962        &self,
963        conn: &Connection<'_>,
964        conversation: &Conversation,
965    ) -> Result<()> {
966        let id = conversation.id.to_string();
967        let started_at = conversation.started_at.to_rfc3339();
968        let source_escaped = escape_cypher(&conversation.source);
969        let machine_escaped = escape_cypher(&conversation.machine_id);
970        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
971
972        let query = format!(
973            "CREATE (:Conversation {{
974                id: '{id}',
975                source: '{source_escaped}',
976                machine_id: '{machine_escaped}',
977                started_at: '{started_at}',
978                project_path: '{project_escaped}'
979            }});"
980        );
981
982        conn.query(&query)?;
983        Ok(())
984    }
985
986    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
987        let from_id = relation.from_id.to_string();
988        let to_id = relation.to_id.to_string();
989
990        let query = match relation.relation_type {
991            RelationType::RelatesTo => format!(
992                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
993                from_id,
994                to_id,
995                relation.strength,
996                relation.context.as_deref().unwrap_or("")
997            ),
998            RelationType::Mentions => format!(
999                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1000                from_id, to_id
1001            ),
1002            RelationType::DerivedFrom => format!(
1003                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1004                from_id,
1005                to_id,
1006                relation.context.as_deref().unwrap_or("direct")
1007            ),
1008            RelationType::Contradicts => format!(
1009                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1010                from_id,
1011                to_id,
1012                relation.context.as_deref().unwrap_or("")
1013            ),
1014            RelationType::Reinforces => format!(
1015                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1016                from_id, to_id
1017            ),
1018            RelationType::DistilledFrom => format!(
1019                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1020                from_id,
1021                to_id,
1022                relation.context.as_deref().unwrap_or("")
1023            ),
1024            RelationType::Supersedes => format!(
1025                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1026                from_id,
1027                to_id,
1028                relation.context.as_deref().unwrap_or("")
1029            ),
1030        };
1031
1032        conn.query(&query)?;
1033        Ok(())
1034    }
1035}
1036
1037impl KuzuStore {
1038    pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1039        let conn = self.conn()?;
1040        let mut result = conn.query(&format!(
1041            "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1042            id
1043        ))?;
1044
1045        match result.next() {
1046            Some(row) => {
1047                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1048                let source = value_to_string(&row[1]);
1049                let machine_id = value_to_string(&row[2]);
1050                let started_at_str = value_to_string(&row[3]);
1051                let project_path = {
1052                    let s = value_to_string(&row[4]);
1053                    if s.is_empty() { None } else { Some(s) }
1054                };
1055                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1056                    .map(|dt| dt.with_timezone(&chrono::Utc))
1057                    .unwrap_or_else(|_| chrono::Utc::now());
1058
1059                Ok(Some(Conversation {
1060                    id,
1061                    source,
1062                    machine_id,
1063                    started_at,
1064                    project_path,
1065                }))
1066            }
1067            None => Ok(None),
1068        }
1069    }
1070
1071    pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1072        let conn = self.conn()?;
1073        let query = format!(
1074            "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1075            id, memory_return_cols("m")
1076        );
1077
1078        let mut result = conn.query(&query)?;
1079        match result.next() {
1080            Some(row) => {
1081                let mut memory = row_to_memory(&row[..10])?;
1082                memory.embedding = value_to_f32_vec(&row[10]);
1083                Ok(Some(memory))
1084            }
1085            None => Ok(None),
1086        }
1087    }
1088
1089    pub fn import_memory(&self, memory: &Memory) -> Result<()> {
1090        let conn = self.conn()?;
1091        self.create_memory_node(&conn, memory)
1092    }
1093
1094    pub fn import_conversation(&self, conversation: &Conversation) -> Result<()> {
1095        let conn = self.conn()?;
1096        self.create_conversation_node(&conn, conversation)
1097    }
1098
1099    pub fn import_entity(&self, entity: &Entity) -> Result<()> {
1100        let conn = self.conn()?;
1101        self.create_entity_node(&conn, entity)
1102    }
1103
1104    pub fn import_relation(&self, relation: &Relation) -> Result<()> {
1105        let conn = self.conn()?;
1106        self.create_relation_edge(&conn, relation)
1107    }
1108
1109    pub fn import_delete_memory(&self, id: Uuid) -> Result<()> {
1110        let conn = self.conn()?;
1111        conn.query(&format!(
1112            "MATCH (m:Memory {{id: '{}'}}) DETACH DELETE m;",
1113            id
1114        ))?;
1115        Ok(())
1116    }
1117
1118    pub fn import_or_update_memory(&self, memory: &Memory) -> Result<()> {
1119        let conn = self.conn()?;
1120        let id = memory.id.to_string();
1121        let last_accessed = memory.last_accessed.to_rfc3339();
1122        let content_escaped = escape_cypher(&memory.content);
1123        let embedding_str = format_embedding(&memory.embedding);
1124        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1125        let source_escaped = escape_cypher(&memory.source);
1126        let source_id_escaped = escape_cypher(&memory.source_id);
1127        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1128        let created_at = memory.created_at.to_rfc3339();
1129
1130        let query = format!(
1131            "MATCH (m:Memory {{id: '{id}'}}) SET m.content = '{content_escaped}', m.embedding = {embedding_str}, m.memory_type = '{memory_type}', m.confidence = {confidence}, m.created_at = '{created_at}', m.last_accessed = '{last_accessed}', m.access_count = {access_count}, m.source = '{source_escaped}', m.source_id = '{source_id_escaped}', m.project_path = '{project_path_escaped}';",
1132            confidence = memory.confidence,
1133            access_count = memory.access_count,
1134        );
1135
1136        conn.query(&query)?;
1137        Ok(())
1138    }
1139
1140    pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1141        self.sync_log_page(after_seq, None)
1142    }
1143
1144    pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1145        let conn = self.conn()?;
1146        let machine_escaped = escape_cypher(&self.machine_id);
1147        let limit_clause = match limit {
1148            Some(n) => format!(" LIMIT {}", n),
1149            None => String::new(),
1150        };
1151        let query = format!(
1152            "MATCH (s:SyncLog) WHERE s.seq > {} AND s.machine_id = '{}' RETURN s.seq, s.op, s.node_type, s.node_id, s.machine_id, s.timestamp, s.data ORDER BY s.seq{};",
1153            after_seq, machine_escaped, limit_clause
1154        );
1155
1156        let mut result = conn.query(&query)?;
1157        let mut entries = Vec::new();
1158
1159        for row in &mut result {
1160            let seq = match &row[0] {
1161                Value::Int64(n) => *n as u64,
1162                _ => 0,
1163            };
1164            let op_str = value_to_string(&row[1]);
1165            let node_type_str = value_to_string(&row[2]);
1166            let node_id = value_to_string(&row[3]);
1167            let machine_id = value_to_string(&row[4]);
1168            let timestamp_str = value_to_string(&row[5]);
1169            let data_str = value_to_string(&row[6]);
1170
1171            let op = match op_str.as_str() {
1172                "create" => SyncOp::Create,
1173                "update" => SyncOp::Update,
1174                "delete" => SyncOp::Delete,
1175                _ => continue,
1176            };
1177            let node_type = match node_type_str.as_str() {
1178                "memory" => SyncNodeType::Memory,
1179                "entity" => SyncNodeType::Entity,
1180                "conversation" => SyncNodeType::Conversation,
1181                "relation" => SyncNodeType::Relation,
1182                _ => continue,
1183            };
1184            let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
1185                .map(|dt| dt.with_timezone(&chrono::Utc))
1186                .unwrap_or_else(|_| chrono::Utc::now());
1187
1188            let data = if data_str.is_empty() {
1189                None
1190            } else {
1191                Some(data_str)
1192            };
1193
1194            entries.push(SyncEntry {
1195                seq,
1196                op,
1197                node_type,
1198                node_id,
1199                machine_id,
1200                timestamp,
1201                data,
1202            });
1203        }
1204
1205        Ok(entries)
1206    }
1207
1208    pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1209        let conn = self.conn()?;
1210        let escaped = escape_cypher(peer_id);
1211        let mut result = conn.query(&format!(
1212            "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1213            escaped
1214        ))?;
1215
1216        match result.next() {
1217            Some(row) => {
1218                let peer_id = value_to_string(&row[0]);
1219                let last_seq = match &row[1] {
1220                    Value::Int64(n) => *n as u64,
1221                    _ => 0,
1222                };
1223                let last_sync_at_str = value_to_string(&row[2]);
1224                let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1225                    .map(|dt| dt.with_timezone(&chrono::Utc))
1226                    .unwrap_or_else(|_| chrono::Utc::now());
1227
1228                Ok(Some(SyncState {
1229                    peer_id,
1230                    last_seq,
1231                    last_sync_at,
1232                }))
1233            }
1234            None => Ok(None),
1235        }
1236    }
1237
1238    pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1239        let conn = self.conn()?;
1240        let peer_escaped = escape_cypher(&state.peer_id);
1241        let now = state.last_sync_at.to_rfc3339();
1242
1243        conn.query(&format!(
1244            "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1245            peer_escaped, state.last_seq, now
1246        ))?;
1247        Ok(())
1248    }
1249
1250    pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1251        let conn = self.conn()?;
1252        let mut result =
1253            conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1254        let mut states = Vec::new();
1255
1256        for row in &mut result {
1257            let peer_id = value_to_string(&row[0]);
1258            let last_seq = match &row[1] {
1259                Value::Int64(n) => *n as u64,
1260                _ => 0,
1261            };
1262            let last_sync_at_str = value_to_string(&row[2]);
1263            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1264                .map(|dt| dt.with_timezone(&chrono::Utc))
1265                .unwrap_or_else(|_| chrono::Utc::now());
1266
1267            states.push(SyncState {
1268                peer_id,
1269                last_seq,
1270                last_sync_at,
1271            });
1272        }
1273
1274        Ok(states)
1275    }
1276
1277    pub fn all_entities(&self) -> Result<Vec<Entity>> {
1278        let conn = self.conn()?;
1279        let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1280        let mut entities = Vec::new();
1281        for row in &mut result {
1282            entities.push(row_to_entity(&row)?);
1283        }
1284        Ok(entities)
1285    }
1286
1287    pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1288        let conn = self.conn()?;
1289        let query = format!(
1290            "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1291            entity_id, memory_return_cols("m")
1292        );
1293        let mut result = conn.query(&query)?;
1294        let mut memories = Vec::new();
1295        for row in &mut result {
1296            memories.push(row_to_memory(&row)?);
1297        }
1298        Ok(memories)
1299    }
1300
1301    pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1302        let conn = self.conn()?;
1303        let mut result = conn.query(
1304            &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1305        )?;
1306        let mut memories = Vec::new();
1307        for row in &mut result {
1308            memories.push(row_to_memory(&row)?);
1309        }
1310        Ok(memories)
1311    }
1312
1313    pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1314        let conn = self.conn()?;
1315        let query = format!(
1316            "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1317            entity_id, entity_id
1318        );
1319        let mut result = conn.query(&query)?;
1320        let mut names = Vec::new();
1321        for row in &mut result {
1322            names.push(value_to_string(&row[0]));
1323        }
1324        Ok(names)
1325    }
1326
1327    pub fn max_sync_seq(&self) -> Result<u64> {
1328        let conn = self.conn()?;
1329        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.seq);")?;
1330        match result.next() {
1331            Some(row) => match &row[0] {
1332                Value::Int64(n) => Ok(*n as u64),
1333                _ => Ok(0),
1334            },
1335            None => Ok(0),
1336        }
1337    }
1338
1339    pub fn backfill_project_paths(&self) -> Result<u64> {
1340        let conn = self.conn()?;
1341        let mut result = conn.query(
1342            "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1343        )?;
1344        match result.next() {
1345            Some(row) => match &row[0] {
1346                Value::Int64(n) => Ok(*n as u64),
1347                _ => Ok(0),
1348            },
1349            None => Ok(0),
1350        }
1351    }
1352
1353    pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1354        let conn = self.conn()?;
1355        let escaped = escape_cypher(project_path);
1356        let cols = memory_return_cols("m");
1357        let query = format!(
1358            "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1359            escaped, cols
1360        );
1361        let mut result = conn.query(&query)?;
1362        let mut memories = Vec::new();
1363        for row in &mut result {
1364            memories.push(row_to_memory(&row)?);
1365        }
1366        Ok(memories)
1367    }
1368
1369    pub fn backfill_sync_log(&self) -> Result<u64> {
1370        let conn = self.conn()?;
1371        let mut count = 0u64;
1372
1373        let mut result = conn.query(
1374            &format!("MATCH (m:Memory) RETURN {}, m.embedding;", memory_return_cols("m")),
1375        )?;
1376        for row in &mut result {
1377            let mut memory = row_to_memory(&row[..10])?;
1378            memory.embedding = value_to_f32_vec(&row[10]);
1379            let data = serde_json::to_string(&memory).ok();
1380            self.log_sync_entry(
1381                &conn,
1382                SyncOp::Create,
1383                SyncNodeType::Memory,
1384                &memory.id.to_string(),
1385                data.as_deref(),
1386            )?;
1387            count += 1;
1388        }
1389
1390        let mut result = conn.query(
1391            "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1392        )?;
1393        for row in &mut result {
1394            let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1395            let source = value_to_string(&row[1]);
1396            let machine_id = value_to_string(&row[2]);
1397            let started_at_str = value_to_string(&row[3]);
1398            let project_path = {
1399                let s = value_to_string(&row[4]);
1400                if s.is_empty() { None } else { Some(s) }
1401            };
1402            let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1403                .map(|dt| dt.with_timezone(&chrono::Utc))
1404                .unwrap_or_else(|_| chrono::Utc::now());
1405
1406            let conv = Conversation {
1407                id,
1408                source,
1409                machine_id,
1410                started_at,
1411                project_path,
1412            };
1413            let data = serde_json::to_string(&conv).ok();
1414            self.log_sync_entry(
1415                &conn,
1416                SyncOp::Create,
1417                SyncNodeType::Conversation,
1418                &conv.id.to_string(),
1419                data.as_deref(),
1420            )?;
1421            count += 1;
1422        }
1423
1424        Ok(count)
1425    }
1426}
1427
/// Renders `embedding` as a bracketed, comma-separated list literal.
///
/// An empty slice is rendered as 384 `0.0` components instead of an empty list.
fn format_embedding(embedding: &[f32]) -> String {
    let components: Vec<String> = if embedding.is_empty() {
        vec!["0.0".to_string(); 384]
    } else {
        embedding
            .iter()
            .map(|component| component.to_string())
            .collect()
    };
    format!("[{}]", components.join(","))
}
1436
/// Escapes `s` for use inside a single-quoted Cypher string literal by prefixing every
/// backslash and single quote with a backslash.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\\' || ch == '\'' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1440
/// Renders `items` as a bracketed, comma-separated list of single-quoted string literals.
///
/// Each item has its backslashes and single quotes backslash-escaped, the same scheme
/// `escape_cypher` applies to every other string in this file. The previous
/// quote-doubling (`''`) left backslashes untouched and disagreed with that scheme, so
/// an item containing `'` or `\` produced a malformed literal.
fn format_string_array(items: &[String]) -> String {
    let parts: Vec<String> = items
        .iter()
        .map(|item| {
            // Backslashes first, so the ones added for quotes are not doubled again.
            let escaped = item.replace('\\', "\\\\").replace('\'', "\\'");
            format!("'{}'", escaped)
        })
        .collect();
    format!("[{}]", parts.join(","))
}
1448
1449fn value_to_string(val: &Value) -> String {
1450    match val {
1451        Value::String(s) => s.clone(),
1452        other => format!("{:?}", other),
1453    }
1454}
1455
1456fn value_to_f32(val: &Value) -> f32 {
1457    match val {
1458        Value::Float(f) => *f,
1459        Value::Double(d) => *d as f32,
1460        Value::Int64(i) => *i as f32,
1461        _ => 0.0,
1462    }
1463}
1464
1465fn value_to_f32_vec(val: &Value) -> Vec<f32> {
1466    match val {
1467        Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
1468        _ => Vec::new(),
1469    }
1470}
1471
1472fn row_to_memory(row: &[Value]) -> Result<Memory> {
1473    let id_str = value_to_string(&row[0]);
1474    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1475    let content = value_to_string(&row[1]);
1476    let memory_type_str = value_to_string(&row[2]);
1477    let confidence = value_to_f32(&row[3]);
1478    let created_at_str = value_to_string(&row[4]);
1479    let last_accessed_str = value_to_string(&row[5]);
1480    let access_count = match &row[6] {
1481        Value::Int64(i) => *i as u32,
1482        _ => 0,
1483    };
1484    let source = value_to_string(&row[7]);
1485    let source_id = value_to_string(&row[8]);
1486    let project_path = project_path_from_db(&value_to_string(&row[9]));
1487
1488    let memory_type = match memory_type_str.as_str() {
1489        "episodic" => MemoryType::Episodic,
1490        "procedural" => MemoryType::Procedural,
1491        "decision" => MemoryType::Decision,
1492        "architecture" => MemoryType::Architecture,
1493        "debugging" => MemoryType::Debugging,
1494        "task" => MemoryType::Task,
1495        "question" => MemoryType::Question,
1496        _ => MemoryType::Semantic,
1497    };
1498
1499    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
1500        .map(|dt| dt.with_timezone(&chrono::Utc))
1501        .unwrap_or_else(|_| chrono::Utc::now());
1502
1503    let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
1504        .map(|dt| dt.with_timezone(&chrono::Utc))
1505        .unwrap_or_else(|_| chrono::Utc::now());
1506
1507    Ok(Memory {
1508        id,
1509        content,
1510        embedding: Vec::new(),
1511        memory_type,
1512        confidence,
1513        created_at,
1514        last_accessed,
1515        access_count,
1516        source,
1517        source_id,
1518        project_path,
1519    })
1520}
1521
1522fn row_to_entity(row: &[Value]) -> Result<Entity> {
1523    let id_str = value_to_string(&row[0]);
1524    let id = Uuid::parse_str(&id_str).unwrap_or_default();
1525    let name = value_to_string(&row[1]);
1526    let entity_type = value_to_string(&row[2]);
1527
1528    Ok(Entity {
1529        id,
1530        name,
1531        entity_type,
1532        embedding: Vec::new(),
1533        aliases: Vec::new(),
1534    })
1535}
1536
// SAFETY: NOTE(review): these impls assert that `KuzuStore` (a `Database`, a `String`
// and an `AtomicU64`) may be moved to and shared between threads. Nothing in this file
// establishes that for the wrapped `kuzu::Database`; confirm it against the kuzu
// crate's documented thread-safety guarantees.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}