Skip to main content

second_brain_core/
kuzu_store.rs

1use std::path::Path;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use kuzu::{Connection, Database, SystemConfig, Value};
6use uuid::Uuid;
7
8use crate::schema::{
9    Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
10    SyncOp, SyncState,
11};
12use crate::store::Store;
13
/// Outcome of applying a change to the store.
///
/// Judging by the variant names, the change either created, updated or deleted a node, or
/// was skipped. NOTE(review): the code that produces this value lies outside this view;
/// confirm the exact semantics of each variant there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The change created a new node.
    Created,
    /// The change modified an existing node.
    Updated,
    /// The change removed a node.
    Deleted,
    /// The change was not applied.
    Skipped,
}
20
21pub struct KuzuStore {
22    db: Database,
23    machine_id: String,
24}
25
/// Builds the comma-separated `RETURN` projection of every `Memory` column for the node
/// variable `alias` (e.g. `"m"` yields `"m.id, m.content, ..."`).
///
/// Rows produced by this projection are decoded positionally by `row_to_memory`, so the
/// column order below must stay in step with that function.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 12] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
        "updated_at",
    ];

    let mut projection = String::new();
    for (index, column) in COLUMNS.iter().enumerate() {
        if index > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
33
/// Converts a stored `project_path` column value into an optional path.
///
/// The schema defaults this column to `''`, so an empty string means "no project". Any
/// other value, including whitespace-only text, is returned unchanged.
fn project_path_from_db(s: &str) -> Option<String> {
    (!s.is_empty()).then(|| s.to_owned())
}
37
38impl KuzuStore {
39    pub fn open(path: &Path, machine_id: String) -> Result<Self> {
40        let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
41        let store = Self { db, machine_id };
42        store.init_schema()?;
43        Ok(store)
44    }
45
46    pub fn in_memory(machine_id: String) -> Result<Self> {
47        let db =
48            Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
49        let store = Self { db, machine_id };
50        store.init_schema()?;
51        Ok(store)
52    }
53
54    pub fn machine_id(&self) -> &str {
55        &self.machine_id
56    }
57
58    fn conn(&self) -> Result<Connection<'_>> {
59        Connection::new(&self.db).context("creating connection")
60    }
61
62    pub fn diagnostic(&self) -> Result<String> {
63        let conn = self.conn()?;
64
65        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
66        let total: i64 = result
67            .next()
68            .map(|r| match &r[0] {
69                Value::Int64(v) => *v,
70                _ => -1,
71            })
72            .unwrap_or(-1);
73
74        let mut result = conn.query(
75            "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
76        )?;
77        let with_emb: i64 = result
78            .next()
79            .map(|r| match &r[0] {
80                Value::Int64(v) => *v,
81                _ => -1,
82            })
83            .unwrap_or(-1);
84
85        let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
86        let zeros_str = format!("[{}]", zeros.join(","));
87        let idx_query = format!(
88            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
89            zeros_str
90        );
91        let idx_status = match conn.query(&idx_query) {
92            Ok(mut r) => match r.next() {
93                Some(row) => format!("ok (distance: {:?})", &row[0]),
94                None => "ok (0 results)".to_string(),
95            },
96            Err(e) => format!("error: {e}"),
97        };
98
99        Ok(format!(
100            "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
101        ))
102    }
103
104    fn init_schema(&self) -> Result<()> {
105        let conn = self.conn()?;
106
107        conn.query(
108            "CREATE NODE TABLE IF NOT EXISTS Memory(
109                id STRING PRIMARY KEY,
110                content STRING,
111                embedding FLOAT[384],
112                memory_type STRING,
113                confidence FLOAT,
114                created_at STRING,
115                last_accessed STRING,
116                access_count INT64,
117                source STRING,
118                source_id STRING,
119                project_path STRING
120            );",
121        )
122        .context("creating Memory table")?;
123
124        conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
125
126        conn.query(
127            "CREATE NODE TABLE IF NOT EXISTS Machine(
128                id STRING PRIMARY KEY,
129                name STRING
130            );",
131        )
132        .context("creating Machine table")?;
133
134        conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
135
136        conn.query("ALTER TABLE Memory ADD updated_at STRING DEFAULT '';").ok();
137        conn.query("MATCH (m:Memory) WHERE m.updated_at = '' SET m.updated_at = m.created_at;").ok();
138
139        conn.query(
140            "CREATE NODE TABLE IF NOT EXISTS Entity(
141                id STRING PRIMARY KEY,
142                name STRING,
143                entity_type STRING,
144                embedding FLOAT[384],
145                aliases STRING[]
146            );",
147        )
148        .context("creating Entity table")?;
149
150        conn.query(
151            "CREATE NODE TABLE IF NOT EXISTS Conversation(
152                id STRING PRIMARY KEY,
153                source STRING,
154                machine_id STRING,
155                started_at STRING,
156                project_path STRING
157            );",
158        )
159        .context("creating Conversation table")?;
160
161        conn.query(
162            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
163                file_path STRING PRIMARY KEY,
164                file_hash STRING,
165                ingested_at STRING,
166                memory_count INT64,
167                source STRING
168            );",
169        )
170        .context("creating IngestLog table")?;
171
172        conn.query(
173            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
174                FROM Memory TO Memory,
175                strength FLOAT,
176                context STRING
177            );",
178        )
179        .context("creating RELATES_TO rel")?;
180
181        conn.query(
182            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
183                FROM Memory TO Entity,
184                position INT64
185            );",
186        )
187        .context("creating MENTIONS rel")?;
188
189        conn.query(
190            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
191                FROM Memory TO Conversation,
192                transformation STRING
193            );",
194        )
195        .context("creating DERIVED_FROM rel")?;
196
197        conn.query(
198            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
199                FROM Memory TO Memory,
200                model STRING
201            );",
202        )
203        .context("creating DISTILLED_FROM rel")?;
204
205        conn.query(
206            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
207                FROM Memory TO Memory,
208                resolution STRING
209            );",
210        )
211        .context("creating CONTRADICTS rel")?;
212
213        conn.query(
214            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
215                FROM Memory TO Memory
216            );",
217        )
218        .context("creating REINFORCES rel")?;
219
220        conn.query(
221            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
222                FROM Memory TO Memory,
223                reason STRING
224            );",
225        )
226        .context("creating SUPERSEDES rel")?;
227
228        self.migrate_sync_log(&conn).context("migrating SyncLog table")?;
229
230        conn.query(
231            "CREATE NODE TABLE IF NOT EXISTS SyncState(
232                peer_id STRING PRIMARY KEY,
233                last_seq INT64,
234                last_sync_at STRING
235            );",
236        )
237        .context("creating SyncState table")?;
238
239        conn.query(
240            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
241                memory_id STRING PRIMARY KEY,
242                distilled_id STRING,
243                consolidated_at STRING,
244                model STRING
245            );",
246        )
247        .context("creating ConsolidationLog table")?;
248
249        conn.query(
250            "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
251                id STRING PRIMARY KEY,
252                last_sync_seq INT64,
253                exported_at STRING,
254                vault_path STRING,
255                pages_created INT64,
256                pages_updated INT64,
257                memories_processed INT64
258            );",
259        )
260        .context("creating WikiExportLog table")?;
261
262        conn.query(
263            "CREATE NODE TABLE IF NOT EXISTS Tombstone(
264                node_id STRING PRIMARY KEY,
265                node_type STRING,
266                deleted_at STRING,
267                machine_id STRING);",
268        )
269        .context("creating Tombstone table")?;
270
271        conn.query(
272            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
273        )
274        .ok();
275
276        Ok(())
277    }
278
279    fn migrate_sync_log(&self, conn: &Connection<'_>) -> Result<()> {
280        // Probe by column: a missing column or a missing table makes the query
281        // fail, so on a fresh DB both probes fail and control falls through to
282        // the CREATE below. We drop only when the pre-redesign `seq` schema is
283        // the one actually present.
284        let new_schema_present = conn
285            .query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;")
286            .is_ok();
287        if !new_schema_present {
288            let old_schema_present = conn
289                .query("MATCH (s:SyncLog) RETURN s.seq LIMIT 1;")
290                .is_ok();
291            if old_schema_present {
292                conn.query("DROP TABLE SyncLog;")?;
293                conn.query("MATCH (s:SyncState) SET s.last_seq = 0;").ok();
294            }
295        }
296        conn.query(
297            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
298                id STRING PRIMARY KEY,
299                local_seq INT64,
300                origin_machine_id STRING,
301                origin_seq INT64,
302                op STRING,
303                node_type STRING,
304                node_id STRING,
305                timestamp STRING,
306                data STRING
307            );",
308        )?;
309        Ok(())
310    }
311
312    fn with_transaction<T>(&self, f: impl FnOnce(&Connection<'_>) -> Result<T>) -> Result<T> {
313        let conn = self.conn()?;
314        conn.query("BEGIN TRANSACTION;")?;
315        match f(&conn) {
316            Ok(v) => {
317                conn.query("COMMIT;")?;
318                Ok(v)
319            }
320            Err(e) => {
321                let _ = conn.query("ROLLBACK;");
322                Err(e)
323            }
324        }
325    }
326
327    fn append_sync_log(
328        &self,
329        conn: &Connection<'_>,
330        op: SyncOp,
331        node_type: SyncNodeType,
332        node_id: &str,
333        data: Option<&str>,
334        origin_machine_id: &str,
335        origin_seq: Option<i64>,
336    ) -> Result<()> {
337        let mut r = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
338        let local_seq: i64 = match r.next() {
339            Some(row) => match &row[0] {
340                Value::Int64(n) => n + 1,
341                _ => 1,
342            },
343            None => 1,
344        };
345        let origin_seq = origin_seq.unwrap_or(local_seq);
346        let id = format!("{}:{}", origin_machine_id, origin_seq);
347        let id_esc = escape_cypher(&id);
348        let mut dup = conn.query(&format!(
349            "MATCH (s:SyncLog {{id: '{id_esc}'}}) RETURN s.id LIMIT 1;"
350        ))?;
351        if dup.next().is_some() {
352            return Ok(());
353        }
354        let timestamp = chrono::Utc::now().to_rfc3339();
355        let op_str = match op {
356            SyncOp::Create => "create",
357            SyncOp::Update => "update",
358            SyncOp::Delete => "delete",
359        };
360        let nt_str = match node_type {
361            SyncNodeType::Memory => "memory",
362            SyncNodeType::Entity => "entity",
363            SyncNodeType::Conversation => "conversation",
364            SyncNodeType::Relation => "relation",
365        };
366        let origin_machine_esc = escape_cypher(origin_machine_id);
367        let node_id_esc = escape_cypher(node_id);
368        let data_esc = data.map(escape_cypher).unwrap_or_default();
369        conn.query(&format!(
370            "CREATE (:SyncLog {{id:'{id_esc}', local_seq:{local_seq}, \
371             origin_machine_id:'{origin_machine_esc}', origin_seq:{origin_seq}, \
372             op:'{op_str}', node_type:'{nt_str}', node_id:'{node_id_esc}', \
373             timestamp:'{timestamp}', data:'{data_esc}'}});"
374        ))?;
375        Ok(())
376    }
377
378    pub fn tombstone_exists(&self, id: Uuid) -> Result<bool> {
379        let conn = self.conn()?;
380        let id_esc = escape_cypher(&id.to_string());
381        let mut result = conn.query(&format!(
382            "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
383        ))?;
384        Ok(result.next().is_some())
385    }
386
387    fn upsert_tombstone(
388        &self,
389        conn: &Connection<'_>,
390        node_id: &str,
391        node_type: &str,
392        machine_id: &str,
393    ) -> Result<()> {
394        let node_id_esc = escape_cypher(node_id);
395        let node_type_esc = escape_cypher(node_type);
396        let machine_id_esc = escape_cypher(machine_id);
397        let deleted_at = chrono::Utc::now().to_rfc3339();
398        conn.query(&format!(
399            "MERGE (t:Tombstone {{node_id: '{node_id_esc}'}}) \
400             SET t.node_type = '{node_type_esc}', \
401                 t.deleted_at = '{deleted_at}', \
402                 t.machine_id = '{machine_id_esc}';"
403        ))?;
404        Ok(())
405    }
406}
407
408impl Store for KuzuStore {
409    fn store_memory(&self, memory: &Memory) -> Result<()> {
410        let data = serde_json::to_string(memory).ok();
411        self.with_transaction(|conn| {
412            self.create_memory_node(conn, memory)?;
413            self.append_sync_log(
414                conn,
415                SyncOp::Create,
416                SyncNodeType::Memory,
417                &memory.id.to_string(),
418                data.as_deref(),
419                &self.machine_id,
420                None,
421            )
422        })
423    }
424
425    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
426        let conn = self.conn()?;
427        let query = format!(
428            "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
429            id, memory_return_cols("m")
430        );
431
432        let mut result = conn.query(&query)?;
433        match result.next() {
434            Some(row) => Ok(Some(row_to_memory(&row)?)),
435            None => Ok(None),
436        }
437    }
438
439    fn delete_memory(&self, id: Uuid) -> Result<()> {
440        let id_string = id.to_string();
441        self.with_transaction(|conn| {
442            conn.query(&format!(
443                "MATCH (m:Memory {{id: '{id_string}'}}) DETACH DELETE m;"
444            ))?;
445            self.upsert_tombstone(conn, &id_string, "memory", &self.machine_id)?;
446            self.append_sync_log(
447                conn,
448                SyncOp::Delete,
449                SyncNodeType::Memory,
450                &id_string,
451                None,
452                &self.machine_id,
453                None,
454            )
455        })
456    }
457
458    fn store_entity(&self, entity: &Entity) -> Result<()> {
459        let data = serde_json::to_string(entity).ok();
460        self.with_transaction(|conn| {
461            self.create_entity_node(conn, entity)?;
462            self.append_sync_log(
463                conn,
464                SyncOp::Create,
465                SyncNodeType::Entity,
466                &entity.id.to_string(),
467                data.as_deref(),
468                &self.machine_id,
469                None,
470            )
471        })
472    }
473
474    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
475        let conn = self.conn()?;
476        let mut result = conn.query(&format!(
477            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
478            id
479        ))?;
480
481        match result.next() {
482            Some(row) => Ok(Some(row_to_entity(&row)?)),
483            None => Ok(None),
484        }
485    }
486
487    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
488        let conn = self.conn()?;
489        let name_escaped = escape_cypher(name);
490        let query = format!(
491            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
492            name_escaped
493        );
494        let mut result = conn.query(&query)?;
495
496        match result.next() {
497            Some(row) => Ok(Some(row_to_entity(&row)?)),
498            None => Ok(None),
499        }
500    }
501
502    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
503        let data = serde_json::to_string(conversation).ok();
504        self.with_transaction(|conn| {
505            self.create_conversation_node(conn, conversation)?;
506            self.append_sync_log(
507                conn,
508                SyncOp::Create,
509                SyncNodeType::Conversation,
510                &conversation.id.to_string(),
511                data.as_deref(),
512                &self.machine_id,
513                None,
514            )
515        })
516    }
517
518    fn store_relation(&self, relation: &Relation) -> Result<()> {
519        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
520        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
521        let data = serde_json::to_string(relation).ok();
522        self.with_transaction(|conn| {
523            self.create_relation_edge(conn, relation)?;
524            self.append_sync_log(
525                conn,
526                SyncOp::Create,
527                SyncNodeType::Relation,
528                &node_id,
529                data.as_deref(),
530                &self.machine_id,
531                None,
532            )
533        })
534    }
535
536    fn get_relations(
537        &self,
538        node_id: Uuid,
539        relation_type: Option<RelationType>,
540    ) -> Result<Vec<Relation>> {
541        let conn = self.conn()?;
542        let id = node_id.to_string();
543
544        let query = match relation_type {
545            Some(RelationType::RelatesTo) => format!(
546                "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
547                id
548            ),
549            Some(RelationType::Contradicts) => format!(
550                "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
551                id
552            ),
553            Some(RelationType::Reinforces) => format!(
554                "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
555                id
556            ),
557            Some(RelationType::Supersedes) => format!(
558                "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
559                id
560            ),
561            Some(RelationType::Mentions) => format!(
562                "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
563                id
564            ),
565            Some(RelationType::DerivedFrom) => format!(
566                "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
567                id
568            ),
569            Some(RelationType::DistilledFrom) => format!(
570                "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
571                id
572            ),
573            None => {
574                let mut relations = Vec::new();
575                let mem_to_mem = [
576                    ("RELATES_TO", RelationType::RelatesTo),
577                    ("REINFORCES", RelationType::Reinforces),
578                    ("CONTRADICTS", RelationType::Contradicts),
579                    ("SUPERSEDES", RelationType::Supersedes),
580                    ("DISTILLED_FROM", RelationType::DistilledFrom),
581                ];
582                for (label, rt) in &mem_to_mem {
583                    let q = format!(
584                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
585                        id, label
586                    );
587                    if let Ok(mut result) = conn.query(&q) {
588                        for row in &mut result {
589                            let to_id_str = value_to_string(&row[0]);
590                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
591                            relations.push(Relation {
592                                from_id: node_id,
593                                to_id,
594                                relation_type: *rt,
595                                strength: 1.0,
596                                context: None,
597                            });
598                        }
599                    }
600                }
601                for (label, target, rt) in [
602                    ("MENTIONS", "Entity", RelationType::Mentions),
603                    ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
604                ] {
605                    let q = format!(
606                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
607                        id, label, target
608                    );
609                    if let Ok(mut result) = conn.query(&q) {
610                        for row in &mut result {
611                            let to_id_str = value_to_string(&row[0]);
612                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
613                            relations.push(Relation {
614                                from_id: node_id,
615                                to_id,
616                                relation_type: rt,
617                                strength: 1.0,
618                                context: None,
619                            });
620                        }
621                    }
622                }
623                return Ok(relations);
624            }
625        };
626
627        let mut result = conn.query(&query)?;
628        let mut relations = Vec::new();
629
630        for row in &mut result {
631            let to_id_str = value_to_string(&row[0]);
632            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
633
634            relations.push(Relation {
635                from_id: node_id,
636                to_id,
637                relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
638                strength: 1.0,
639                context: None,
640            });
641        }
642
643        Ok(relations)
644    }
645
646    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
647        let conn = self.conn()?;
648        let embedding_str = format_embedding(embedding);
649
650        let query = format!(
651            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
652             RETURN {}, distance;",
653            embedding_str, limit, memory_return_cols("node")
654        );
655
656        let mut result = conn.query(&query)?;
657        let mut results = Vec::new();
658
659        for row in &mut result {
660            let memory = row_to_memory(&row[..12])?;
661            let distance = value_to_f32(&row[12]);
662            let similarity = 1.0 - distance;
663            results.push((memory, similarity));
664        }
665
666        Ok(results)
667    }
668
669    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
670        let conn = self.conn()?;
671        let query = format!(
672            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
673             RETURN DISTINCT {};",
674            start_id, depth, memory_return_cols("m")
675        );
676
677        let mut result = conn.query(&query)?;
678        let mut results = Vec::new();
679
680        for row in &mut result {
681            let memory = row_to_memory(&row)?;
682            results.push((memory, Vec::new()));
683        }
684
685        Ok(results)
686    }
687
688    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
689        let conn = self.conn()?;
690        let source_escaped = escape_cypher(source);
691        let query = format!(
692            "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
693            source_escaped, memory_return_cols("m")
694        );
695        let mut result = conn.query(&query)?;
696
697        let mut memories = Vec::new();
698        for row in &mut result {
699            memories.push(row_to_memory(&row)?);
700        }
701        Ok(memories)
702    }
703
704    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
705        let conn = self.conn()?;
706        let type_str = format!("{:?}", memory_type).to_lowercase();
707        let query = format!(
708            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
709            type_str, memory_return_cols("m")
710        );
711        let mut result = conn.query(&query)?;
712
713        let mut memories = Vec::new();
714        for row in &mut result {
715            memories.push(row_to_memory(&row)?);
716        }
717        Ok(memories)
718    }
719
720    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
721        let conn = self.conn()?;
722        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
723        let cutoff_str = cutoff.to_rfc3339();
724
725        let query = format!(
726            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
727            cutoff_str, memory_return_cols("m")
728        );
729
730        let mut result = conn.query(&query)?;
731        let mut memories = Vec::new();
732        for row in &mut result {
733            memories.push(row_to_memory(&row)?);
734        }
735        Ok(memories)
736    }
737
738    fn update_memory(&self, memory: &Memory) -> Result<()> {
739        let id = memory.id.to_string();
740        let last_accessed = memory.last_accessed.to_rfc3339();
741        let updated_at = memory.updated_at.to_rfc3339();
742        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
743        let machine_id_escaped = escape_cypher(&memory.machine_id);
744        let data = serde_json::to_string(memory).ok();
745        self.with_transaction(|conn| {
746            let query = format!(
747                "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}', m.updated_at = '{}', m.machine_id = '{}';",
748                id, memory.confidence, last_accessed, memory.access_count, project_path_escaped, updated_at, machine_id_escaped
749            );
750            conn.query(&query)?;
751            self.append_sync_log(
752                conn,
753                SyncOp::Update,
754                SyncNodeType::Memory,
755                &id,
756                data.as_deref(),
757                &self.machine_id,
758                None,
759            )
760        })
761    }
762
763    fn record_access(&self, memory: &Memory) -> Result<()> {
764        let id_esc = escape_cypher(&memory.id.to_string());
765        let last_accessed = memory.last_accessed.to_rfc3339();
766        let last_accessed_esc = escape_cypher(&last_accessed);
767        let conn = self.conn()?;
768        conn.query(&format!(
769            "MATCH (m:Memory {{id: '{id_esc}'}}) SET \
770             m.last_accessed = '{last_accessed_esc}', \
771             m.access_count = {}, \
772             m.confidence = {};",
773            memory.access_count, memory.confidence
774        ))?;
775        Ok(())
776    }
777
778    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
779        let conn = self.conn()?;
780        let query_escaped = escape_cypher(&query.to_lowercase());
781        let cypher = format!(
782            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
783            query_escaped, memory_return_cols("m"), limit
784        );
785
786        let mut result = conn.query(&cypher)?;
787        let mut memories = Vec::new();
788        for row in &mut result {
789            memories.push(row_to_memory(&row)?);
790        }
791        Ok(memories)
792    }
793
794    fn memory_count(&self) -> Result<usize> {
795        let conn = self.conn()?;
796        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
797        match result.next() {
798            Some(row) => match &row[0] {
799                Value::Int64(n) => Ok(*n as usize),
800                _ => Ok(0),
801            },
802            None => Ok(0),
803        }
804    }
805}
806
807impl KuzuStore {
808    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
809        if let Some(existing) = self.find_entity_by_name(name)? {
810            return Ok(existing);
811        }
812        let entity = Entity::new(name.to_string(), entity_type.to_string());
813        self.store_entity(&entity)?;
814        Ok(entity)
815    }
816
817    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
818        let conn = self.conn()?;
819        let escaped = escape_cypher(content_prefix);
820        let query = format!(
821            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
822            escaped
823        );
824        let mut result = conn.query(&query)?;
825        Ok(result.next().is_some())
826    }
827
828    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
829        let conn = self.conn()?;
830        let query = format!(
831            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
832            memory_return_cols("m"), limit
833        );
834        let mut result = conn.query(&query)?;
835        let mut memories = Vec::new();
836        for row in &mut result {
837            memories.push(row_to_memory(&row)?);
838        }
839        Ok(memories)
840    }
841
842    pub fn memories_created_between(
843        &self,
844        start: &DateTime<Utc>,
845        end: &DateTime<Utc>,
846    ) -> Result<Vec<Memory>> {
847        let conn = self.conn()?;
848        let query = format!(
849            "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
850            start.to_rfc3339(),
851            end.to_rfc3339(),
852            memory_return_cols("m")
853        );
854        let mut result = conn.query(&query)?;
855        let mut memories = Vec::new();
856        for row in &mut result {
857            memories.push(row_to_memory(&row)?);
858        }
859        Ok(memories)
860    }
861
862    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
863        let conn = self.conn()?;
864        let now = Utc::now().to_rfc3339();
865        let query = format!(
866            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
867            raw_id,
868            distilled_id,
869            now,
870            escape_cypher(model)
871        );
872        conn.query(&query)?;
873        Ok(())
874    }
875
876    pub fn consolidation_count(&self) -> Result<usize> {
877        let conn = self.conn()?;
878        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
879        match result.next() {
880            Some(row) => match &row[0] {
881                Value::Int64(n) => Ok(*n as usize),
882                _ => Ok(0),
883            },
884            None => Ok(0),
885        }
886    }
887
888    pub fn delete_consolidated_raw(&self) -> Result<usize> {
889        let ids: Vec<Uuid> = {
890            let conn = self.conn()?;
891            let result = conn.query(
892                "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
893            )?;
894            let mut acc = Vec::new();
895            for row in result {
896                if let Value::String(id) = &row[0]
897                    && let Ok(uuid) = Uuid::parse_str(id)
898                {
899                    acc.push(uuid);
900                }
901            }
902            acc
903        };
904        let count = ids.len();
905        let machine_id = self.machine_id.clone();
906        for id in &ids {
907            let id_str = id.to_string();
908            let mid = machine_id.clone();
909            self.with_transaction(|conn| {
910                conn.query(&format!(
911                    "MATCH (m:Memory {{id: '{id_str}'}}) DETACH DELETE m;"
912                ))?;
913                self.upsert_tombstone(conn, &id_str, "memory", &mid)?;
914                self.append_sync_log(
915                    conn,
916                    SyncOp::Delete,
917                    SyncNodeType::Memory,
918                    &id_str,
919                    None,
920                    &mid,
921                    None,
922                )
923            })?;
924        }
925        Ok(count)
926    }
927
928    pub fn rebuild_vector_index(&self) -> Result<()> {
929        let conn = self.conn()?;
930        conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
931            .ok();
932        conn.query(
933            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
934        )?;
935        Ok(())
936    }
937
938    pub fn clear_ingest_log(&self) -> Result<usize> {
939        let conn = self.conn()?;
940        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
941        let count = match result.next() {
942            Some(row) => match &row[0] {
943                Value::Int64(n) => *n as usize,
944                _ => 0,
945            },
946            None => 0,
947        };
948        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
949        Ok(count)
950    }
951
952    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
953        let conn = self.conn()?;
954        let escaped = escape_cypher(source);
955        let mut result = conn.query(&format!(
956            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
957            escaped
958        ))?;
959        let count = match result.next() {
960            Some(row) => match &row[0] {
961                Value::Int64(n) => *n as usize,
962                _ => 0,
963            },
964            None => 0,
965        };
966        conn.query(&format!(
967            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
968            escaped
969        ))?;
970        Ok(count)
971    }
972
973    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
974        let conn = self.conn()?;
975        let escaped = escape_cypher(file_path);
976        let mut result = conn.query(&format!(
977            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
978            escaped
979        ))?;
980        Ok(result.next().is_some())
981    }
982
983    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
984        let conn = self.conn()?;
985        let escaped = escape_cypher(file_path);
986        let mut result = conn.query(&format!(
987            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
988            escaped
989        ))?;
990        match result.next() {
991            Some(row) => {
992                let stored_hash = value_to_string(&row[0]);
993                Ok(stored_hash != file_hash)
994            }
995            None => Ok(true),
996        }
997    }
998
999    pub fn mark_ingested(
1000        &self,
1001        file_path: &str,
1002        file_hash: &str,
1003        memory_count: usize,
1004        source: &str,
1005    ) -> Result<()> {
1006        let conn = self.conn()?;
1007        let path_escaped = escape_cypher(file_path);
1008        let hash_escaped = escape_cypher(file_hash);
1009        let source_escaped = escape_cypher(source);
1010        let now = chrono::Utc::now().to_rfc3339();
1011
1012        conn.query(&format!(
1013            "MERGE (l:IngestLog {{file_path: '{}'}})
1014             SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
1015            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
1016        ))?;
1017        Ok(())
1018    }
1019
1020    pub fn ingested_file_count(&self) -> Result<usize> {
1021        let conn = self.conn()?;
1022        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1023        match result.next() {
1024            Some(row) => match &row[0] {
1025                Value::Int64(n) => Ok(*n as usize),
1026                _ => Ok(0),
1027            },
1028            None => Ok(0),
1029        }
1030    }
1031}
1032
1033impl KuzuStore {
1034    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
1035        let id = memory.id.to_string();
1036        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1037        let created_at = memory.created_at.to_rfc3339();
1038        let last_accessed = memory.last_accessed.to_rfc3339();
1039        let embedding_str = format_embedding(&memory.embedding);
1040        let content_escaped = escape_cypher(&memory.content);
1041        let source_escaped = escape_cypher(&memory.source);
1042        let source_id_escaped = escape_cypher(&memory.source_id);
1043        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1044        let machine_id_escaped = escape_cypher(&memory.machine_id);
1045        let updated_at = memory.updated_at.to_rfc3339();
1046
1047        let query = format!(
1048            "CREATE (:Memory {{
1049                id: '{id}',
1050                content: '{content_escaped}',
1051                embedding: {embedding_str},
1052                memory_type: '{memory_type}',
1053                confidence: {confidence},
1054                created_at: '{created_at}',
1055                last_accessed: '{last_accessed}',
1056                access_count: {access_count},
1057                source: '{source_escaped}',
1058                source_id: '{source_id_escaped}',
1059                project_path: '{project_path_escaped}',
1060                machine_id: '{machine_id_escaped}',
1061                updated_at: '{updated_at}'
1062            }});",
1063            confidence = memory.confidence,
1064            access_count = memory.access_count,
1065        );
1066
1067        conn.query(&query)?;
1068        Ok(())
1069    }
1070
1071    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
1072        let id = entity.id.to_string();
1073        let embedding_str = format_embedding(&entity.embedding);
1074        let aliases_str = format_string_array(&entity.aliases);
1075        let name_escaped = escape_cypher(&entity.name);
1076        let etype_escaped = escape_cypher(&entity.entity_type);
1077
1078        let query = format!(
1079            "CREATE (:Entity {{
1080                id: '{id}',
1081                name: '{name_escaped}',
1082                entity_type: '{etype_escaped}',
1083                embedding: {embedding_str},
1084                aliases: {aliases_str}
1085            }});"
1086        );
1087
1088        conn.query(&query)?;
1089        Ok(())
1090    }
1091
1092    fn create_conversation_node(
1093        &self,
1094        conn: &Connection<'_>,
1095        conversation: &Conversation,
1096    ) -> Result<()> {
1097        let id = conversation.id.to_string();
1098        let started_at = conversation.started_at.to_rfc3339();
1099        let source_escaped = escape_cypher(&conversation.source);
1100        let machine_escaped = escape_cypher(&conversation.machine_id);
1101        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
1102
1103        let query = format!(
1104            "CREATE (:Conversation {{
1105                id: '{id}',
1106                source: '{source_escaped}',
1107                machine_id: '{machine_escaped}',
1108                started_at: '{started_at}',
1109                project_path: '{project_escaped}'
1110            }});"
1111        );
1112
1113        conn.query(&query)?;
1114        Ok(())
1115    }
1116
1117    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
1118        let from_id = relation.from_id.to_string();
1119        let to_id = relation.to_id.to_string();
1120
1121        let query = match relation.relation_type {
1122            RelationType::RelatesTo => format!(
1123                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1124                from_id,
1125                to_id,
1126                relation.strength,
1127                relation.context.as_deref().unwrap_or("")
1128            ),
1129            RelationType::Mentions => format!(
1130                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1131                from_id, to_id
1132            ),
1133            RelationType::DerivedFrom => format!(
1134                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1135                from_id,
1136                to_id,
1137                relation.context.as_deref().unwrap_or("direct")
1138            ),
1139            RelationType::Contradicts => format!(
1140                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1141                from_id,
1142                to_id,
1143                relation.context.as_deref().unwrap_or("")
1144            ),
1145            RelationType::Reinforces => format!(
1146                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1147                from_id, to_id
1148            ),
1149            RelationType::DistilledFrom => format!(
1150                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1151                from_id,
1152                to_id,
1153                relation.context.as_deref().unwrap_or("")
1154            ),
1155            RelationType::Supersedes => format!(
1156                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1157                from_id,
1158                to_id,
1159                relation.context.as_deref().unwrap_or("")
1160            ),
1161        };
1162
1163        conn.query(&query)?;
1164        Ok(())
1165    }
1166}
1167
1168impl KuzuStore {
1169    pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1170        let conn = self.conn()?;
1171        let mut result = conn.query(&format!(
1172            "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1173            id
1174        ))?;
1175
1176        match result.next() {
1177            Some(row) => {
1178                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1179                let source = value_to_string(&row[1]);
1180                let machine_id = value_to_string(&row[2]);
1181                let started_at_str = value_to_string(&row[3]);
1182                let project_path = {
1183                    let s = value_to_string(&row[4]);
1184                    if s.is_empty() { None } else { Some(s) }
1185                };
1186                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1187                    .map(|dt| dt.with_timezone(&chrono::Utc))
1188                    .unwrap_or_else(|_| chrono::Utc::now());
1189
1190                Ok(Some(Conversation {
1191                    id,
1192                    source,
1193                    machine_id,
1194                    started_at,
1195                    project_path,
1196                }))
1197            }
1198            None => Ok(None),
1199        }
1200    }
1201
1202    pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1203        let conn = self.conn()?;
1204        let query = format!(
1205            "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1206            id, memory_return_cols("m")
1207        );
1208
1209        let mut result = conn.query(&query)?;
1210        match result.next() {
1211            Some(row) => {
1212                let mut memory = row_to_memory(&row[..12])?;
1213                memory.embedding = value_to_f32_vec(&row[12]);
1214                Ok(Some(memory))
1215            }
1216            None => Ok(None),
1217        }
1218    }
1219
1220    pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1221        self.sync_log_page(after_seq, None)
1222    }
1223
1224    fn tombstone_exists_on(&self, conn: &Connection<'_>, id: Uuid) -> Result<bool> {
1225        let id_esc = escape_cypher(&id.to_string());
1226        let mut result = conn.query(&format!(
1227            "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
1228        ))?;
1229        Ok(result.next().is_some())
1230    }
1231
1232    fn normalize_incoming(mem: &mut Memory) {
1233        if mem.updated_at == DateTime::UNIX_EPOCH {
1234            mem.updated_at = mem.created_at;
1235        }
1236    }
1237
1238    pub fn apply_create_memory(
1239        &self,
1240        mem: &Memory,
1241        origin_machine_id: &str,
1242        origin_seq: i64,
1243    ) -> Result<ApplyOutcome> {
1244        let mut mem = mem.clone();
1245        Self::normalize_incoming(&mut mem);
1246        let data = serde_json::to_string(&mem).ok();
1247        let node_id = mem.id.to_string();
1248        self.with_transaction(|conn| {
1249            if self.tombstone_exists_on(conn, mem.id)? {
1250                return Ok(ApplyOutcome::Skipped);
1251            }
1252            if self.get_memory(mem.id)?.is_some() {
1253                return Ok(ApplyOutcome::Skipped);
1254            }
1255            self.create_memory_node(conn, &mem)?;
1256            self.append_sync_log(
1257                conn,
1258                SyncOp::Create,
1259                SyncNodeType::Memory,
1260                &node_id,
1261                data.as_deref(),
1262                origin_machine_id,
1263                Some(origin_seq),
1264            )?;
1265            Ok(ApplyOutcome::Created)
1266        })
1267    }
1268
1269    pub fn apply_update_memory(
1270        &self,
1271        incoming: &Memory,
1272        origin_machine_id: &str,
1273        origin_seq: i64,
1274    ) -> Result<(ApplyOutcome, Option<Memory>)> {
1275        let mut incoming = incoming.clone();
1276        Self::normalize_incoming(&mut incoming);
1277        let data = serde_json::to_string(&incoming).ok();
1278        let node_id = incoming.id.to_string();
1279        self.with_transaction(|conn| {
1280            match self.get_memory(incoming.id)? {
1281                None => {
1282                    if self.tombstone_exists_on(conn, incoming.id)? {
1283                        return Ok((ApplyOutcome::Skipped, None));
1284                    }
1285                    self.create_memory_node(conn, &incoming)?;
1286                    self.append_sync_log(
1287                        conn,
1288                        SyncOp::Create,
1289                        SyncNodeType::Memory,
1290                        &node_id,
1291                        data.as_deref(),
1292                        origin_machine_id,
1293                        Some(origin_seq),
1294                    )?;
1295                    Ok((ApplyOutcome::Created, Some(incoming)))
1296                }
1297                Some(existing) => {
1298                    let incoming_wins = if incoming.updated_at != existing.updated_at {
1299                        incoming.updated_at > existing.updated_at
1300                    } else if incoming.access_count != existing.access_count {
1301                        incoming.access_count > existing.access_count
1302                    } else {
1303                        incoming.machine_id > existing.machine_id
1304                    };
1305
1306                    if !incoming_wins {
1307                        return Ok((ApplyOutcome::Skipped, None));
1308                    }
1309
1310                    let id = incoming.id.to_string();
1311                    let content_escaped = escape_cypher(&incoming.content);
1312                    let memory_type = format!("{:?}", incoming.memory_type).to_lowercase();
1313                    let created_at = incoming.created_at.to_rfc3339();
1314                    let last_accessed = incoming.last_accessed.to_rfc3339();
1315                    let updated_at = incoming.updated_at.to_rfc3339();
1316                    let source_escaped = escape_cypher(&incoming.source);
1317                    let source_id_escaped = escape_cypher(&incoming.source_id);
1318                    let project_path_escaped =
1319                        escape_cypher(incoming.project_path.as_deref().unwrap_or(""));
1320                    let machine_id_escaped = escape_cypher(&incoming.machine_id);
1321                    // embedding is excluded because KuzuDB forbids SET on HNSW-indexed columns;
1322                    // the embedding is recomputed at ingest time so skipping it here is safe
1323                    conn.query(&format!(
1324                        "MATCH (m:Memory {{id: '{id}'}}) SET \
1325                         m.content = '{content_escaped}', \
1326                         m.memory_type = '{memory_type}', \
1327                         m.confidence = {confidence}, \
1328                         m.created_at = '{created_at}', \
1329                         m.last_accessed = '{last_accessed}', \
1330                         m.access_count = {access_count}, \
1331                         m.source = '{source_escaped}', \
1332                         m.source_id = '{source_id_escaped}', \
1333                         m.project_path = '{project_path_escaped}', \
1334                         m.machine_id = '{machine_id_escaped}', \
1335                         m.updated_at = '{updated_at}';",
1336                        confidence = incoming.confidence,
1337                        access_count = incoming.access_count,
1338                    ))?;
1339                    self.append_sync_log(
1340                        conn,
1341                        SyncOp::Update,
1342                        SyncNodeType::Memory,
1343                        &node_id,
1344                        data.as_deref(),
1345                        origin_machine_id,
1346                        Some(origin_seq),
1347                    )?;
1348                    Ok((ApplyOutcome::Updated, Some(incoming)))
1349                }
1350            }
1351        })
1352    }
1353
1354    pub fn apply_delete_memory(
1355        &self,
1356        node_id: &str,
1357        origin_machine_id: &str,
1358        origin_seq: i64,
1359    ) -> Result<ApplyOutcome> {
1360        self.with_transaction(|conn| {
1361            conn.query(&format!(
1362                "MATCH (m:Memory {{id: '{node_id}'}}) DETACH DELETE m;"
1363            ))?;
1364            self.upsert_tombstone(conn, node_id, "memory", origin_machine_id)?;
1365            self.append_sync_log(
1366                conn,
1367                SyncOp::Delete,
1368                SyncNodeType::Memory,
1369                node_id,
1370                None,
1371                origin_machine_id,
1372                Some(origin_seq),
1373            )?;
1374            Ok(ApplyOutcome::Deleted)
1375        })
1376    }
1377
1378    pub fn apply_create_conversation(
1379        &self,
1380        conv: &Conversation,
1381        origin_machine_id: &str,
1382        origin_seq: i64,
1383    ) -> Result<ApplyOutcome> {
1384        let data = serde_json::to_string(conv).ok();
1385        let node_id = conv.id.to_string();
1386        self.with_transaction(|conn| {
1387            if self.get_conversation(conv.id)?.is_some() {
1388                return Ok(ApplyOutcome::Skipped);
1389            }
1390            self.create_conversation_node(conn, conv)?;
1391            self.append_sync_log(
1392                conn,
1393                SyncOp::Create,
1394                SyncNodeType::Conversation,
1395                &node_id,
1396                data.as_deref(),
1397                origin_machine_id,
1398                Some(origin_seq),
1399            )?;
1400            Ok(ApplyOutcome::Created)
1401        })
1402    }
1403
1404    pub fn apply_upsert_conversation(
1405        &self,
1406        conv: &Conversation,
1407        origin_machine_id: &str,
1408        origin_seq: i64,
1409    ) -> Result<ApplyOutcome> {
1410        let data = serde_json::to_string(conv).ok();
1411        let node_id = conv.id.to_string();
1412        let source_escaped = escape_cypher(&conv.source);
1413        let machine_escaped = escape_cypher(&conv.machine_id);
1414        let project_escaped = escape_cypher(conv.project_path.as_deref().unwrap_or(""));
1415        let started_at = conv.started_at.to_rfc3339();
1416        self.with_transaction(|conn| {
1417            conn.query(&format!(
1418                "MERGE (c:Conversation {{id: '{node_id}'}}) SET \
1419                 c.source = '{source_escaped}', \
1420                 c.machine_id = '{machine_escaped}', \
1421                 c.started_at = '{started_at}', \
1422                 c.project_path = '{project_escaped}';"
1423            ))?;
1424            self.append_sync_log(
1425                conn,
1426                SyncOp::Update,
1427                SyncNodeType::Conversation,
1428                &node_id,
1429                data.as_deref(),
1430                origin_machine_id,
1431                Some(origin_seq),
1432            )?;
1433            Ok(ApplyOutcome::Updated)
1434        })
1435    }
1436
1437    pub fn apply_delete_conversation(
1438        &self,
1439        node_id: &str,
1440        origin_machine_id: &str,
1441        origin_seq: i64,
1442    ) -> Result<ApplyOutcome> {
1443        self.with_transaction(|conn| {
1444            conn.query(&format!(
1445                "MATCH (c:Conversation {{id: '{node_id}'}}) DETACH DELETE c;"
1446            ))?;
1447            self.append_sync_log(
1448                conn,
1449                SyncOp::Delete,
1450                SyncNodeType::Conversation,
1451                node_id,
1452                None,
1453                origin_machine_id,
1454                Some(origin_seq),
1455            )?;
1456            Ok(ApplyOutcome::Deleted)
1457        })
1458    }
1459
1460    pub fn apply_create_entity(
1461        &self,
1462        entity: &Entity,
1463        origin_machine_id: &str,
1464        origin_seq: i64,
1465    ) -> Result<ApplyOutcome> {
1466        let data = serde_json::to_string(entity).ok();
1467        let node_id = entity.id.to_string();
1468        self.with_transaction(|conn| {
1469            if self.get_entity(entity.id)?.is_some() {
1470                return Ok(ApplyOutcome::Skipped);
1471            }
1472            self.create_entity_node(conn, entity)?;
1473            self.append_sync_log(
1474                conn,
1475                SyncOp::Create,
1476                SyncNodeType::Entity,
1477                &node_id,
1478                data.as_deref(),
1479                origin_machine_id,
1480                Some(origin_seq),
1481            )?;
1482            Ok(ApplyOutcome::Created)
1483        })
1484    }
1485
1486    pub fn apply_upsert_entity(
1487        &self,
1488        entity: &Entity,
1489        origin_machine_id: &str,
1490        origin_seq: i64,
1491    ) -> Result<ApplyOutcome> {
1492        let data = serde_json::to_string(entity).ok();
1493        let node_id = entity.id.to_string();
1494        let name_escaped = escape_cypher(&entity.name);
1495        let etype_escaped = escape_cypher(&entity.entity_type);
1496        let embedding_str = format_embedding(&entity.embedding);
1497        let aliases_str = format_string_array(&entity.aliases);
1498        self.with_transaction(|conn| {
1499            conn.query(&format!(
1500                "MERGE (e:Entity {{id: '{node_id}'}}) SET \
1501                 e.name = '{name_escaped}', \
1502                 e.entity_type = '{etype_escaped}', \
1503                 e.embedding = {embedding_str}, \
1504                 e.aliases = {aliases_str};"
1505            ))?;
1506            self.append_sync_log(
1507                conn,
1508                SyncOp::Update,
1509                SyncNodeType::Entity,
1510                &node_id,
1511                data.as_deref(),
1512                origin_machine_id,
1513                Some(origin_seq),
1514            )?;
1515            Ok(ApplyOutcome::Updated)
1516        })
1517    }
1518
1519    pub fn apply_delete_entity(
1520        &self,
1521        node_id: &str,
1522        origin_machine_id: &str,
1523        origin_seq: i64,
1524    ) -> Result<ApplyOutcome> {
1525        self.with_transaction(|conn| {
1526            conn.query(&format!(
1527                "MATCH (e:Entity {{id: '{node_id}'}}) DETACH DELETE e;"
1528            ))?;
1529            self.append_sync_log(
1530                conn,
1531                SyncOp::Delete,
1532                SyncNodeType::Entity,
1533                node_id,
1534                None,
1535                origin_machine_id,
1536                Some(origin_seq),
1537            )?;
1538            Ok(ApplyOutcome::Deleted)
1539        })
1540    }
1541
1542    pub fn apply_create_relation(
1543        &self,
1544        relation: &Relation,
1545        origin_machine_id: &str,
1546        origin_seq: i64,
1547    ) -> Result<ApplyOutcome> {
1548        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1549        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1550        let data = serde_json::to_string(relation).ok();
1551        self.with_transaction(|conn| {
1552            match self.create_relation_edge(conn, relation) {
1553                Ok(()) => {
1554                    self.append_sync_log(
1555                        conn,
1556                        SyncOp::Create,
1557                        SyncNodeType::Relation,
1558                        &node_id,
1559                        data.as_deref(),
1560                        origin_machine_id,
1561                        Some(origin_seq),
1562                    )?;
1563                    Ok(ApplyOutcome::Created)
1564                }
1565                Err(_) => Ok(ApplyOutcome::Skipped),
1566            }
1567        })
1568    }
1569
1570    pub fn apply_delete_relation(
1571        &self,
1572        node_id: &str,
1573        relation: Option<&Relation>,
1574        origin_machine_id: &str,
1575        origin_seq: i64,
1576    ) -> Result<ApplyOutcome> {
1577        let Some(rel) = relation else {
1578            // Nothing to delete or relay because there is no deserializable relation data,
1579            // so skip rather than inflate the delete counter with a hollow log entry.
1580            return Ok(ApplyOutcome::Skipped);
1581        };
1582        self.with_transaction(|conn| {
1583            let from_id = rel.from_id.to_string();
1584            let to_id = rel.to_id.to_string();
1585            let rel_label = match rel.relation_type {
1586                crate::schema::RelationType::RelatesTo => "RELATES_TO",
1587                crate::schema::RelationType::Mentions => "MENTIONS",
1588                crate::schema::RelationType::DerivedFrom => "DERIVED_FROM",
1589                crate::schema::RelationType::Contradicts => "CONTRADICTS",
1590                crate::schema::RelationType::Reinforces => "REINFORCES",
1591                crate::schema::RelationType::Supersedes => "SUPERSEDES",
1592                crate::schema::RelationType::DistilledFrom => "DISTILLED_FROM",
1593            };
1594            conn.query(&format!(
1595                "MATCH (a {{id: '{from_id}'}})-[r:{rel_label}]->(b {{id: '{to_id}'}}) DELETE r;"
1596            ))
1597            .ok();
1598            self.append_sync_log(
1599                conn,
1600                SyncOp::Delete,
1601                SyncNodeType::Relation,
1602                node_id,
1603                None,
1604                origin_machine_id,
1605                Some(origin_seq),
1606            )?;
1607            Ok(ApplyOutcome::Deleted)
1608        })
1609    }
1610
1611    pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1612        let conn = self.conn()?;
1613        let limit_clause = match limit {
1614            Some(n) => format!(" LIMIT {}", n),
1615            None => String::new(),
1616        };
1617        let query = format!(
1618            "MATCH (s:SyncLog) WHERE s.local_seq > {} \
1619             RETURN s.id, s.local_seq, s.origin_machine_id, s.origin_seq, s.op, s.node_type, s.node_id, s.timestamp, s.data \
1620             ORDER BY s.local_seq{};",
1621            after_seq, limit_clause
1622        );
1623
1624        let mut result = conn.query(&query)?;
1625        let mut entries = Vec::new();
1626
1627        for row in &mut result {
1628            let id = value_to_string(&row[0]);
1629            let local_seq = match &row[1] {
1630                Value::Int64(n) => *n,
1631                _ => 0,
1632            };
1633            let origin_machine_id = value_to_string(&row[2]);
1634            let origin_seq = match &row[3] {
1635                Value::Int64(n) => *n,
1636                _ => 0,
1637            };
1638            let op_str = value_to_string(&row[4]);
1639            let node_type_str = value_to_string(&row[5]);
1640            let node_id = value_to_string(&row[6]);
1641            let timestamp_str = value_to_string(&row[7]);
1642            let data_str = value_to_string(&row[8]);
1643
1644            let op = match op_str.as_str() {
1645                "create" => SyncOp::Create,
1646                "update" => SyncOp::Update,
1647                "delete" => SyncOp::Delete,
1648                _ => continue,
1649            };
1650            let node_type = match node_type_str.as_str() {
1651                "memory" => SyncNodeType::Memory,
1652                "entity" => SyncNodeType::Entity,
1653                "conversation" => SyncNodeType::Conversation,
1654                "relation" => SyncNodeType::Relation,
1655                _ => continue,
1656            };
1657            let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
1658                .map(|dt| dt.with_timezone(&chrono::Utc))
1659                .unwrap_or_else(|_| chrono::Utc::now());
1660
1661            let data = if data_str.is_empty() {
1662                None
1663            } else {
1664                Some(data_str)
1665            };
1666
1667            entries.push(SyncEntry {
1668                id,
1669                local_seq,
1670                origin_machine_id,
1671                origin_seq,
1672                op,
1673                node_type,
1674                node_id,
1675                timestamp,
1676                data,
1677            });
1678        }
1679
1680        Ok(entries)
1681    }
1682
1683    pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1684        let conn = self.conn()?;
1685        let escaped = escape_cypher(peer_id);
1686        let mut result = conn.query(&format!(
1687            "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1688            escaped
1689        ))?;
1690
1691        match result.next() {
1692            Some(row) => {
1693                let peer_id = value_to_string(&row[0]);
1694                let last_seq = match &row[1] {
1695                    Value::Int64(n) => *n as u64,
1696                    _ => 0,
1697                };
1698                let last_sync_at_str = value_to_string(&row[2]);
1699                let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1700                    .map(|dt| dt.with_timezone(&chrono::Utc))
1701                    .unwrap_or_else(|_| chrono::Utc::now());
1702
1703                Ok(Some(SyncState {
1704                    peer_id,
1705                    last_seq,
1706                    last_sync_at,
1707                }))
1708            }
1709            None => Ok(None),
1710        }
1711    }
1712
1713    pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1714        let conn = self.conn()?;
1715        let peer_escaped = escape_cypher(&state.peer_id);
1716        let now = state.last_sync_at.to_rfc3339();
1717
1718        conn.query(&format!(
1719            "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1720            peer_escaped, state.last_seq, now
1721        ))?;
1722        Ok(())
1723    }
1724
1725    pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1726        let conn = self.conn()?;
1727        let mut result =
1728            conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
1729        let mut states = Vec::new();
1730
1731        for row in &mut result {
1732            let peer_id = value_to_string(&row[0]);
1733            let last_seq = match &row[1] {
1734                Value::Int64(n) => *n as u64,
1735                _ => 0,
1736            };
1737            let last_sync_at_str = value_to_string(&row[2]);
1738            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1739                .map(|dt| dt.with_timezone(&chrono::Utc))
1740                .unwrap_or_else(|_| chrono::Utc::now());
1741
1742            states.push(SyncState {
1743                peer_id,
1744                last_seq,
1745                last_sync_at,
1746            });
1747        }
1748
1749        Ok(states)
1750    }
1751
1752    pub fn all_entities(&self) -> Result<Vec<Entity>> {
1753        let conn = self.conn()?;
1754        let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
1755        let mut entities = Vec::new();
1756        for row in &mut result {
1757            entities.push(row_to_entity(&row)?);
1758        }
1759        Ok(entities)
1760    }
1761
1762    pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
1763        let conn = self.conn()?;
1764        let query = format!(
1765            "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
1766            entity_id, memory_return_cols("m")
1767        );
1768        let mut result = conn.query(&query)?;
1769        let mut memories = Vec::new();
1770        for row in &mut result {
1771            memories.push(row_to_memory(&row)?);
1772        }
1773        Ok(memories)
1774    }
1775
1776    pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
1777        let conn = self.conn()?;
1778        let mut result = conn.query(
1779            &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
1780        )?;
1781        let mut memories = Vec::new();
1782        for row in &mut result {
1783            memories.push(row_to_memory(&row)?);
1784        }
1785        Ok(memories)
1786    }
1787
1788    pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
1789        let conn = self.conn()?;
1790        let query = format!(
1791            "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
1792            entity_id, entity_id
1793        );
1794        let mut result = conn.query(&query)?;
1795        let mut names = Vec::new();
1796        for row in &mut result {
1797            names.push(value_to_string(&row[0]));
1798        }
1799        Ok(names)
1800    }
1801
1802    pub fn max_sync_seq(&self) -> Result<u64> {
1803        let conn = self.conn()?;
1804        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
1805        match result.next() {
1806            Some(row) => match &row[0] {
1807                Value::Int64(n) => Ok(*n as u64),
1808                _ => Ok(0),
1809            },
1810            None => Ok(0),
1811        }
1812    }
1813
1814    pub fn backfill_project_paths(&self) -> Result<u64> {
1815        let conn = self.conn()?;
1816        let mut result = conn.query(
1817            "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
1818        )?;
1819        match result.next() {
1820            Some(row) => match &row[0] {
1821                Value::Int64(n) => Ok(*n as u64),
1822                _ => Ok(0),
1823            },
1824            None => Ok(0),
1825        }
1826    }
1827
1828    pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
1829        let conn = self.conn()?;
1830        let escaped = escape_cypher(project_path);
1831        let cols = memory_return_cols("m");
1832        let query = format!(
1833            "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
1834            escaped, cols
1835        );
1836        let mut result = conn.query(&query)?;
1837        let mut memories = Vec::new();
1838        for row in &mut result {
1839            memories.push(row_to_memory(&row)?);
1840        }
1841        Ok(memories)
1842    }
1843
1844    pub fn backfill_sync_log(&self) -> Result<u64> {
1845        let mut count = 0u64;
1846
1847        let memories: Vec<Memory> = {
1848            let conn = self.conn()?;
1849            let mut result = conn.query(&format!(
1850                "MATCH (m:Memory) RETURN {}, m.embedding;",
1851                memory_return_cols("m")
1852            ))?;
1853            let mut acc = Vec::new();
1854            for row in &mut result {
1855                let mut memory = row_to_memory(&row[..12])?;
1856                memory.embedding = value_to_f32_vec(&row[12]);
1857                acc.push(memory);
1858            }
1859            acc
1860        };
1861
1862        for memory in &memories {
1863            let origin_mid = memory.machine_id.clone();
1864            let data = serde_json::to_string(memory).ok();
1865            let node_id = memory.id.to_string();
1866            self.with_transaction(|conn| {
1867                self.append_sync_log(
1868                    conn,
1869                    SyncOp::Create,
1870                    SyncNodeType::Memory,
1871                    &node_id,
1872                    data.as_deref(),
1873                    &origin_mid,
1874                    None,
1875                )
1876            })?;
1877            count += 1;
1878        }
1879
1880        let conversations: Vec<Conversation> = {
1881            let conn = self.conn()?;
1882            let mut result = conn.query(
1883                "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1884            )?;
1885            let mut acc = Vec::new();
1886            for row in &mut result {
1887                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1888                let source = value_to_string(&row[1]);
1889                let machine_id = value_to_string(&row[2]);
1890                let started_at_str = value_to_string(&row[3]);
1891                let project_path = {
1892                    let s = value_to_string(&row[4]);
1893                    if s.is_empty() { None } else { Some(s) }
1894                };
1895                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1896                    .map(|dt| dt.with_timezone(&chrono::Utc))
1897                    .unwrap_or_else(|_| chrono::Utc::now());
1898                acc.push(Conversation { id, source, machine_id, started_at, project_path });
1899            }
1900            acc
1901        };
1902
1903        for conv in &conversations {
1904            let origin_mid = conv.machine_id.clone();
1905            let data = serde_json::to_string(conv).ok();
1906            let node_id = conv.id.to_string();
1907            self.with_transaction(|conn| {
1908                self.append_sync_log(
1909                    conn,
1910                    SyncOp::Create,
1911                    SyncNodeType::Conversation,
1912                    &node_id,
1913                    data.as_deref(),
1914                    &origin_mid,
1915                    None,
1916                )
1917            })?;
1918            count += 1;
1919        }
1920
1921        Ok(count)
1922    }
1923}
1924
1925impl KuzuStore {
1926    pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
1927        let conn = self.conn()?;
1928        let id_escaped = escape_cypher(id);
1929        let name_escaped = escape_cypher(name);
1930        conn.query(&format!(
1931            "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
1932        ))?;
1933        Ok(())
1934    }
1935
1936    pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
1937        let conn = self.conn()?;
1938        let id_escaped = escape_cypher(id);
1939        let mut result = conn.query(&format!(
1940            "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
1941        ))?;
1942        match result.next() {
1943            Some(row) => Ok(Some(value_to_string(&row[0]))),
1944            None => Ok(None),
1945        }
1946    }
1947
1948    pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
1949        let conn = self.conn()?;
1950        let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
1951        let mut map = std::collections::HashMap::new();
1952        for row in &mut result {
1953            map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
1954        }
1955        Ok(map)
1956    }
1957
1958    pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
1959        let conn = self.conn()?;
1960        let escaped = escape_cypher(machine_id);
1961        let mut result = conn.query(&format!(
1962            "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
1963        ))?;
1964        match result.next() {
1965            Some(row) => match &row[0] {
1966                Value::Int64(n) => Ok(*n as u64),
1967                _ => Ok(0),
1968            },
1969            None => Ok(0),
1970        }
1971    }
1972
1973    pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
1974        self.upsert_machine(&identity.id, &identity.name)?;
1975        let count = self.backfill_machine_id(&identity.id)?;
1976        if count > 0 {
1977            tracing::info!("backfilled machine_id on {count} existing memories");
1978        }
1979        Ok(())
1980    }
1981}
1982
/// Renders an embedding as a Cypher list literal, e.g. `[0.5,1,-2.25]`.
///
/// Values use `f32`'s `Display` form. An empty embedding is rendered as 384 `0.0` entries.
/// NOTE(review): 384 presumably matches the fixed dimension of the `embedding` column —
/// confirm against the schema.
fn format_embedding(embedding: &[f32]) -> String {
    const PLACEHOLDER_DIM: usize = 384;

    let body = if embedding.is_empty() {
        vec!["0.0"; PLACEHOLDER_DIM].join(",")
    } else {
        embedding
            .iter()
            .map(f32::to_string)
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{body}]")
}
1991
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Every backslash becomes `\\` and every single quote becomes `\'`; all other characters
/// are copied through unchanged.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\\' || ch == '\'' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1995
1996fn format_string_array(items: &[String]) -> String {
1997    let parts: Vec<String> = items
1998        .iter()
1999        .map(|s| format!("'{}'", s.replace('\'', "''")))
2000        .collect();
2001    format!("[{}]", parts.join(","))
2002}
2003
2004fn value_to_string(val: &Value) -> String {
2005    match val {
2006        Value::String(s) => s.clone(),
2007        other => format!("{:?}", other),
2008    }
2009}
2010
2011fn value_to_f32(val: &Value) -> f32 {
2012    match val {
2013        Value::Float(f) => *f,
2014        Value::Double(d) => *d as f32,
2015        Value::Int64(i) => *i as f32,
2016        _ => 0.0,
2017    }
2018}
2019
2020fn value_to_f32_vec(val: &Value) -> Vec<f32> {
2021    match val {
2022        Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
2023        _ => Vec::new(),
2024    }
2025}
2026
2027fn row_to_memory(row: &[Value]) -> Result<Memory> {
2028    let id_str = value_to_string(&row[0]);
2029    let id = Uuid::parse_str(&id_str).unwrap_or_default();
2030    let content = value_to_string(&row[1]);
2031    let memory_type_str = value_to_string(&row[2]);
2032    let confidence = value_to_f32(&row[3]);
2033    let created_at_str = value_to_string(&row[4]);
2034    let last_accessed_str = value_to_string(&row[5]);
2035    let access_count = match &row[6] {
2036        Value::Int64(i) => *i as u32,
2037        _ => 0,
2038    };
2039    let source = value_to_string(&row[7]);
2040    let source_id = value_to_string(&row[8]);
2041    let project_path = project_path_from_db(&value_to_string(&row[9]));
2042    let machine_id = value_to_string(&row[10]);
2043    let updated_at_str = value_to_string(&row[11]);
2044
2045    let memory_type = match memory_type_str.as_str() {
2046        "episodic" => MemoryType::Episodic,
2047        "procedural" => MemoryType::Procedural,
2048        "decision" => MemoryType::Decision,
2049        "architecture" => MemoryType::Architecture,
2050        "debugging" => MemoryType::Debugging,
2051        "task" => MemoryType::Task,
2052        "question" => MemoryType::Question,
2053        _ => MemoryType::Semantic,
2054    };
2055
2056    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
2057        .map(|dt| dt.with_timezone(&chrono::Utc))
2058        .unwrap_or_else(|_| chrono::Utc::now());
2059
2060    let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
2061        .map(|dt| dt.with_timezone(&chrono::Utc))
2062        .unwrap_or_else(|_| chrono::Utc::now());
2063
2064    let updated_at = if updated_at_str.is_empty() {
2065        created_at
2066    } else {
2067        chrono::DateTime::parse_from_rfc3339(&updated_at_str)
2068            .map(|dt| dt.with_timezone(&chrono::Utc))
2069            .unwrap_or(created_at)
2070    };
2071
2072    Ok(Memory {
2073        id,
2074        content,
2075        embedding: Vec::new(),
2076        memory_type,
2077        confidence,
2078        created_at,
2079        last_accessed,
2080        access_count,
2081        source,
2082        source_id,
2083        project_path,
2084        machine_id,
2085        updated_at,
2086    })
2087}
2088
2089fn row_to_entity(row: &[Value]) -> Result<Entity> {
2090    let id_str = value_to_string(&row[0]);
2091    let id = Uuid::parse_str(&id_str).unwrap_or_default();
2092    let name = value_to_string(&row[1]);
2093    let entity_type = value_to_string(&row[2]);
2094
2095    Ok(Entity {
2096        id,
2097        name,
2098        entity_type,
2099        embedding: Vec::new(),
2100        aliases: Vec::new(),
2101    })
2102}
2103
// SAFETY: `KuzuStore` holds only a `kuzu::Database` and a `String` (`machine_id`). It stores
// no `Connection`; the methods visible in this file open a fresh one per call via `conn()`,
// so no connection object is shared between threads through the store.
// NOTE(review): soundness still rests on `kuzu::Database` being safe to move and to use
// concurrently (creating connections) from multiple threads. These manual impls presumably
// exist because the `kuzu` crate does not mark `Database` as `Send`/`Sync`. Confirm against
// the Kùzu concurrency documentation before relying on this.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}
2106
#[cfg(test)]
mod tests {
    use super::*;

    /// Statements run in order to recreate the pre-redesign setup:
    /// - drop the current `SyncLog` table and recreate it with the old layout
    ///   (`seq INT64 PRIMARY KEY`);
    /// - seed one legacy row;
    /// - park peer `peer-x`'s cursor at 5, so the test can check that migration resets it.
    const LEGACY_SYNC_SETUP: [&str; 4] = [
        "DROP TABLE SyncLog;",
        "CREATE NODE TABLE SyncLog(\
            seq INT64 PRIMARY KEY, \
            op STRING, \
            node_type STRING, \
            node_id STRING, \
            machine_id STRING, \
            timestamp STRING, \
            data STRING\
        );",
        "CREATE (:SyncLog {seq: 1, op: 'create', node_type: 'memory', \
         node_id: 'abc', machine_id: 'old-machine', \
         timestamp: '2024-01-01T00:00:00Z', data: ''});",
        "MERGE (s:SyncState {peer_id: 'peer-x'}) \
         SET s.last_seq = 5, s.last_sync_at = '2024-01-01T00:00:00Z';",
    ];

    #[test]
    fn migrate_sync_log_recreates_old_schema_and_resets_cursors() {
        let store = KuzuStore::in_memory("test-machine-migrate".to_string()).unwrap();
        let conn = store.conn().unwrap();

        // `init_schema` already built the new-schema table; rewind to the old layout.
        for statement in LEGACY_SYNC_SETUP {
            conn.query(statement).unwrap();
        }

        store.migrate_sync_log(&conn).unwrap();

        // The recreated table is keyed by `id STRING`, so selecting `s.id` must succeed.
        let id_column_present = conn.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;").is_ok();
        assert!(
            id_column_present,
            "new-schema column s.id must be present after migration"
        );

        // The cursor seeded at 5 must have been rewound to 0.
        let cursor_row = conn
            .query("MATCH (s:SyncState {peer_id: 'peer-x'}) RETURN s.last_seq;")
            .unwrap()
            .next();
        let last_seq = match cursor_row.as_ref().map(|row| &row[0]) {
            Some(Value::Int64(n)) => *n,
            Some(_) => panic!("unexpected value type for last_seq"),
            None => panic!("SyncState row not found after migration"),
        };
        assert_eq!(last_seq, 0, "last_seq must be reset to 0 after migration");
    }

    #[test]
    fn rollback_leaves_no_partial_state() {
        let store = KuzuStore::in_memory("test-machine-rollback".to_string()).unwrap();
        let embedding = vec!["0.0"; 384].join(", ");

        // Create a Memory inside the transaction, then fail so the whole thing rolls back.
        let outcome: Result<()> = store.with_transaction(|conn| {
            conn.query(&format!(
                "CREATE (:Memory {{id: 'rollback-test-id', content: 'x', \
                 embedding: [{embedding}], memory_type: 'semantic', confidence: 1.0, \
                 created_at: '2024-01-01T00:00:00Z', last_accessed: '2024-01-01T00:00:00Z', \
                 access_count: 0, source: 'test', source_id: '', project_path: '', \
                 machine_id: 'x', updated_at: '2024-01-01T00:00:00Z'}});"
            ))?;
            Err(anyhow::anyhow!("deliberate rollback"))
        });
        assert!(
            outcome.is_err(),
            "with_transaction must propagate the closure error"
        );

        let surviving_log = store.sync_log_since(0).unwrap();
        assert_eq!(surviving_log.len(), 0, "no SyncLog rows may survive a rollback");
        let surviving_memories = store.memory_count().unwrap();
        assert_eq!(
            surviving_memories,
            0,
            "the Memory node created inside the transaction must be rolled back"
        );
    }
}