Skip to main content

second_brain_core/
kuzu_store.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::RwLock;
4
5use anyhow::{Context, Result};
6use chrono::{DateTime, Utc};
7use kuzu::{Connection, Database, SystemConfig, Value};
8use uuid::Uuid;
9
10use crate::schema::{
11    Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncEntry, SyncNodeType,
12    SyncOp, SyncState,
13};
14use crate::store::Store;
15
/// Memoized `get_relations` results, keyed by the source node id together with
/// the optional relation-type filter that was passed to the lookup.
type RelationsCache = HashMap<(Uuid, Option<RelationType>), Vec<Relation>>;
17
/// Outcome of applying a single change to the store.
///
/// NOTE(review): presumably returned when replaying sync entries received from
/// a peer; the producer is not visible in this part of the file — confirm.
///
/// The derives make the value printable in logs and comparable in tests, and
/// `Copy` is free because every variant is a unit variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyOutcome {
    Created,
    Updated,
    Deleted,
    Skipped,
}
24
/// `Store` implementation backed by a KùzuDB database.
pub struct KuzuStore {
    /// Database handle; each operation creates its own `Connection` from it.
    db: Database,
    /// Identifier of the local machine, recorded as the origin of the sync-log
    /// entries and tombstones this store writes.
    machine_id: String,
    // get_relations(id, rel_type) is deterministic and query-independent: its result
    // only changes when an edge rooted at `id` is created or removed. Caching between
    // such writes is correct, so every write path that touches a memory's outgoing
    // edges must evict the affected key (invariant enforced below).
    relations_cache: RwLock<RelationsCache>,
}
34
/// Builds the `RETURN` projection for a `Memory` node bound to `alias`.
///
/// The result is the twelve persisted columns, each prefixed with `alias.` and
/// separated by `", "`. Callers index result rows positionally, so the column
/// order here is part of the contract.
fn memory_return_cols(alias: &str) -> String {
    const COLUMNS: [&str; 12] = [
        "id",
        "content",
        "memory_type",
        "confidence",
        "created_at",
        "last_accessed",
        "access_count",
        "source",
        "source_id",
        "project_path",
        "machine_id",
        "updated_at",
    ];

    let mut projection = String::new();
    for (position, column) in COLUMNS.iter().enumerate() {
        if position > 0 {
            projection.push_str(", ");
        }
        projection.push_str(alias);
        projection.push('.');
        projection.push_str(column);
    }
    projection
}
42
/// Converts the stored `project_path` column back into an `Option`.
///
/// The database stores "no project path" as the empty string, so `""` maps to
/// `None` and any other value is returned as an owned `Some`.
fn project_path_from_db(s: &str) -> Option<String> {
    match s {
        "" => None,
        path => Some(path.to_owned()),
    }
}
46
47impl KuzuStore {
48    pub fn open(path: &Path, machine_id: String) -> Result<Self> {
49        let db = Database::new(path, SystemConfig::default()).context("opening KùzuDB database")?;
50        let store = Self {
51            db,
52            machine_id,
53            relations_cache: RwLock::new(HashMap::new()),
54        };
55        store.init_schema()?;
56        Ok(store)
57    }
58
59    pub fn in_memory(machine_id: String) -> Result<Self> {
60        let db =
61            Database::in_memory(SystemConfig::default()).context("creating in-memory KùzuDB")?;
62        let store = Self {
63            db,
64            machine_id,
65            relations_cache: RwLock::new(HashMap::new()),
66        };
67        store.init_schema()?;
68        Ok(store)
69    }
70
71    fn evict_relations_for(&self, id: Uuid) {
72        let mut cache = self.relations_cache.write().unwrap();
73        cache.retain(|(key_id, _), _| *key_id != id);
74    }
75
76    // A full clear is acceptable on bulk maintenance paths (consolidation purge,
77    // delete-by-source, entity/conversation detach) because they are bursty and rare,
78    // not per-recall, and enumerating every memory whose outgoing edge count changed
79    // (e.g. memories that MENTIONS a deleted entity) is not cheap. Clearing is always
80    // sound; it only discards warmth, never correctness.
81    fn clear_relations_cache(&self) {
82        self.relations_cache.write().unwrap().clear();
83    }
84
85    pub fn machine_id(&self) -> &str {
86        &self.machine_id
87    }
88
89    fn conn(&self) -> Result<Connection<'_>> {
90        Connection::new(&self.db).context("creating connection")
91    }
92
93    pub fn diagnostic(&self) -> Result<String> {
94        let conn = self.conn()?;
95
96        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
97        let total: i64 = result
98            .next()
99            .map(|r| match &r[0] {
100                Value::Int64(v) => *v,
101                _ => -1,
102            })
103            .unwrap_or(-1);
104
105        let mut result = conn.query(
106            "MATCH (m:Memory) WHERE size(m.embedding) > 0 AND m.embedding[1] <> 0.0 RETURN count(m);",
107        )?;
108        let with_emb: i64 = result
109            .next()
110            .map(|r| match &r[0] {
111                Value::Int64(v) => *v,
112                _ => -1,
113            })
114            .unwrap_or(-1);
115
116        let zeros: Vec<String> = (0..384).map(|_| "0.0".to_string()).collect();
117        let zeros_str = format!("[{}]", zeros.join(","));
118        let idx_query = format!(
119            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, 1) YIELD node, distance RETURN distance;",
120            zeros_str
121        );
122        let idx_status = match conn.query(&idx_query) {
123            Ok(mut r) => match r.next() {
124                Some(row) => format!("ok (distance: {:?})", &row[0]),
125                None => "ok (0 results)".to_string(),
126            },
127            Err(e) => format!("error: {e}"),
128        };
129
130        Ok(format!(
131            "total memories: {total}\nwith embeddings: {with_emb}\nvector index: {idx_status}"
132        ))
133    }
134
135    fn init_schema(&self) -> Result<()> {
136        let conn = self.conn()?;
137
138        conn.query(
139            "CREATE NODE TABLE IF NOT EXISTS Memory(
140                id STRING PRIMARY KEY,
141                content STRING,
142                embedding FLOAT[384],
143                memory_type STRING,
144                confidence FLOAT,
145                created_at STRING,
146                last_accessed STRING,
147                access_count INT64,
148                source STRING,
149                source_id STRING,
150                project_path STRING
151            );",
152        )
153        .context("creating Memory table")?;
154
155        conn.query("ALTER TABLE Memory ADD project_path STRING DEFAULT '';").ok();
156
157        conn.query(
158            "CREATE NODE TABLE IF NOT EXISTS Machine(
159                id STRING PRIMARY KEY,
160                name STRING
161            );",
162        )
163        .context("creating Machine table")?;
164
165        conn.query("ALTER TABLE Memory ADD machine_id STRING DEFAULT '';").ok();
166
167        conn.query("ALTER TABLE Memory ADD updated_at STRING DEFAULT '';").ok();
168        conn.query("MATCH (m:Memory) WHERE m.updated_at = '' SET m.updated_at = m.created_at;").ok();
169
170        conn.query(
171            "CREATE NODE TABLE IF NOT EXISTS Entity(
172                id STRING PRIMARY KEY,
173                name STRING,
174                entity_type STRING,
175                embedding FLOAT[384],
176                aliases STRING[]
177            );",
178        )
179        .context("creating Entity table")?;
180
181        conn.query(
182            "CREATE NODE TABLE IF NOT EXISTS Conversation(
183                id STRING PRIMARY KEY,
184                source STRING,
185                machine_id STRING,
186                started_at STRING,
187                project_path STRING
188            );",
189        )
190        .context("creating Conversation table")?;
191
192        conn.query(
193            "CREATE NODE TABLE IF NOT EXISTS IngestLog(
194                file_path STRING PRIMARY KEY,
195                file_hash STRING,
196                ingested_at STRING,
197                memory_count INT64,
198                source STRING
199            );",
200        )
201        .context("creating IngestLog table")?;
202
203        conn.query(
204            "CREATE REL TABLE IF NOT EXISTS RELATES_TO(
205                FROM Memory TO Memory,
206                strength FLOAT,
207                context STRING
208            );",
209        )
210        .context("creating RELATES_TO rel")?;
211
212        conn.query(
213            "CREATE REL TABLE IF NOT EXISTS MENTIONS(
214                FROM Memory TO Entity,
215                position INT64
216            );",
217        )
218        .context("creating MENTIONS rel")?;
219
220        conn.query(
221            "CREATE REL TABLE IF NOT EXISTS DERIVED_FROM(
222                FROM Memory TO Conversation,
223                transformation STRING
224            );",
225        )
226        .context("creating DERIVED_FROM rel")?;
227
228        conn.query(
229            "CREATE REL TABLE IF NOT EXISTS DISTILLED_FROM(
230                FROM Memory TO Memory,
231                model STRING
232            );",
233        )
234        .context("creating DISTILLED_FROM rel")?;
235
236        conn.query(
237            "CREATE REL TABLE IF NOT EXISTS CONTRADICTS(
238                FROM Memory TO Memory,
239                resolution STRING
240            );",
241        )
242        .context("creating CONTRADICTS rel")?;
243
244        conn.query(
245            "CREATE REL TABLE IF NOT EXISTS REINFORCES(
246                FROM Memory TO Memory
247            );",
248        )
249        .context("creating REINFORCES rel")?;
250
251        conn.query(
252            "CREATE REL TABLE IF NOT EXISTS SUPERSEDES(
253                FROM Memory TO Memory,
254                reason STRING
255            );",
256        )
257        .context("creating SUPERSEDES rel")?;
258
259        self.migrate_sync_log(&conn).context("migrating SyncLog table")?;
260
261        conn.query(
262            "CREATE NODE TABLE IF NOT EXISTS SyncState(
263                peer_id STRING PRIMARY KEY,
264                last_seq INT64,
265                last_sync_at STRING
266            );",
267        )
268        .context("creating SyncState table")?;
269
270        conn.query(
271            "CREATE NODE TABLE IF NOT EXISTS ConsolidationLog(
272                memory_id STRING PRIMARY KEY,
273                distilled_id STRING,
274                consolidated_at STRING,
275                model STRING
276            );",
277        )
278        .context("creating ConsolidationLog table")?;
279
280        conn.query(
281            "CREATE NODE TABLE IF NOT EXISTS WikiExportLog(
282                id STRING PRIMARY KEY,
283                last_sync_seq INT64,
284                exported_at STRING,
285                vault_path STRING,
286                pages_created INT64,
287                pages_updated INT64,
288                memories_processed INT64
289            );",
290        )
291        .context("creating WikiExportLog table")?;
292
293        conn.query(
294            "CREATE NODE TABLE IF NOT EXISTS Tombstone(
295                node_id STRING PRIMARY KEY,
296                node_type STRING,
297                deleted_at STRING,
298                machine_id STRING);",
299        )
300        .context("creating Tombstone table")?;
301
302        conn.query(
303            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
304        )
305        .ok();
306
307        Ok(())
308    }
309
310    fn migrate_sync_log(&self, conn: &Connection<'_>) -> Result<()> {
311        // Probe by column: a missing column or a missing table makes the query
312        // fail, so on a fresh DB both probes fail and control falls through to
313        // the CREATE below. We drop only when the pre-redesign `seq` schema is
314        // the one actually present.
315        let new_schema_present = conn
316            .query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;")
317            .is_ok();
318        if !new_schema_present {
319            let old_schema_present = conn
320                .query("MATCH (s:SyncLog) RETURN s.seq LIMIT 1;")
321                .is_ok();
322            if old_schema_present {
323                conn.query("DROP TABLE SyncLog;")?;
324                conn.query("MATCH (s:SyncState) SET s.last_seq = 0;").ok();
325            }
326        }
327        conn.query(
328            "CREATE NODE TABLE IF NOT EXISTS SyncLog(
329                id STRING PRIMARY KEY,
330                local_seq INT64,
331                origin_machine_id STRING,
332                origin_seq INT64,
333                op STRING,
334                node_type STRING,
335                node_id STRING,
336                timestamp STRING,
337                data STRING
338            );",
339        )?;
340        Ok(())
341    }
342
343    fn with_transaction<T>(&self, f: impl FnOnce(&Connection<'_>) -> Result<T>) -> Result<T> {
344        let conn = self.conn()?;
345        conn.query("BEGIN TRANSACTION;")?;
346        match f(&conn) {
347            Ok(v) => {
348                conn.query("COMMIT;")?;
349                Ok(v)
350            }
351            Err(e) => {
352                let _ = conn.query("ROLLBACK;");
353                Err(e)
354            }
355        }
356    }
357
358    fn append_sync_log(
359        &self,
360        conn: &Connection<'_>,
361        op: SyncOp,
362        node_type: SyncNodeType,
363        node_id: &str,
364        data: Option<&str>,
365        origin_machine_id: &str,
366        origin_seq: Option<i64>,
367    ) -> Result<()> {
368        let mut r = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
369        let local_seq: i64 = match r.next() {
370            Some(row) => match &row[0] {
371                Value::Int64(n) => n + 1,
372                _ => 1,
373            },
374            None => 1,
375        };
376        let origin_seq = origin_seq.unwrap_or(local_seq);
377        let id = format!("{}:{}", origin_machine_id, origin_seq);
378        let id_esc = escape_cypher(&id);
379        let mut dup = conn.query(&format!(
380            "MATCH (s:SyncLog {{id: '{id_esc}'}}) RETURN s.id LIMIT 1;"
381        ))?;
382        if dup.next().is_some() {
383            return Ok(());
384        }
385        let timestamp = chrono::Utc::now().to_rfc3339();
386        let op_str = match op {
387            SyncOp::Create => "create",
388            SyncOp::Update => "update",
389            SyncOp::Delete => "delete",
390        };
391        let nt_str = match node_type {
392            SyncNodeType::Memory => "memory",
393            SyncNodeType::Entity => "entity",
394            SyncNodeType::Conversation => "conversation",
395            SyncNodeType::Relation => "relation",
396        };
397        let origin_machine_esc = escape_cypher(origin_machine_id);
398        let node_id_esc = escape_cypher(node_id);
399        let data_esc = data.map(escape_cypher).unwrap_or_default();
400        conn.query(&format!(
401            "CREATE (:SyncLog {{id:'{id_esc}', local_seq:{local_seq}, \
402             origin_machine_id:'{origin_machine_esc}', origin_seq:{origin_seq}, \
403             op:'{op_str}', node_type:'{nt_str}', node_id:'{node_id_esc}', \
404             timestamp:'{timestamp}', data:'{data_esc}'}});"
405        ))?;
406        Ok(())
407    }
408
409    pub fn tombstone_exists(&self, id: Uuid) -> Result<bool> {
410        let conn = self.conn()?;
411        let id_esc = escape_cypher(&id.to_string());
412        let mut result = conn.query(&format!(
413            "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
414        ))?;
415        Ok(result.next().is_some())
416    }
417
418    fn upsert_tombstone(
419        &self,
420        conn: &Connection<'_>,
421        node_id: &str,
422        node_type: &str,
423        machine_id: &str,
424    ) -> Result<()> {
425        let node_id_esc = escape_cypher(node_id);
426        let node_type_esc = escape_cypher(node_type);
427        let machine_id_esc = escape_cypher(machine_id);
428        let deleted_at = chrono::Utc::now().to_rfc3339();
429        conn.query(&format!(
430            "MERGE (t:Tombstone {{node_id: '{node_id_esc}'}}) \
431             SET t.node_type = '{node_type_esc}', \
432                 t.deleted_at = '{deleted_at}', \
433                 t.machine_id = '{machine_id_esc}';"
434        ))?;
435        Ok(())
436    }
437}
438
439impl Store for KuzuStore {
440    fn store_memory(&self, memory: &Memory) -> Result<()> {
441        let data = serde_json::to_string(memory).ok();
442        self.with_transaction(|conn| {
443            self.create_memory_node(conn, memory)?;
444            self.append_sync_log(
445                conn,
446                SyncOp::Create,
447                SyncNodeType::Memory,
448                &memory.id.to_string(),
449                data.as_deref(),
450                &self.machine_id,
451                None,
452            )
453        })
454    }
455
456    fn get_memory(&self, id: Uuid) -> Result<Option<Memory>> {
457        let conn = self.conn()?;
458        let query = format!(
459            "MATCH (m:Memory {{id: '{}'}}) RETURN {};",
460            id, memory_return_cols("m")
461        );
462
463        let mut result = conn.query(&query)?;
464        match result.next() {
465            Some(row) => Ok(Some(row_to_memory(&row)?)),
466            None => Ok(None),
467        }
468    }
469
470    fn delete_memory(&self, id: Uuid) -> Result<()> {
471        let id_string = id.to_string();
472        let result = self.with_transaction(|conn| {
473            conn.query(&format!(
474                "MATCH (m:Memory {{id: '{id_string}'}}) DETACH DELETE m;"
475            ))?;
476            self.upsert_tombstone(conn, &id_string, "memory", &self.machine_id)?;
477            self.append_sync_log(
478                conn,
479                SyncOp::Delete,
480                SyncNodeType::Memory,
481                &id_string,
482                None,
483                &self.machine_id,
484                None,
485            )
486        });
487        // DETACH DELETE drops this memory's outgoing edges; its cached relations are
488        // now stale. Edges where this memory was the target also vanish, but those
489        // keys belong to other source ids; a delete is rare enough that a full clear
490        // is the safe choice to avoid serving a stale neighbor's relation list.
491        self.clear_relations_cache();
492        result
493    }
494
495    fn store_entity(&self, entity: &Entity) -> Result<()> {
496        let data = serde_json::to_string(entity).ok();
497        self.with_transaction(|conn| {
498            self.create_entity_node(conn, entity)?;
499            self.append_sync_log(
500                conn,
501                SyncOp::Create,
502                SyncNodeType::Entity,
503                &entity.id.to_string(),
504                data.as_deref(),
505                &self.machine_id,
506                None,
507            )
508        })
509    }
510
511    fn get_entity(&self, id: Uuid) -> Result<Option<Entity>> {
512        let conn = self.conn()?;
513        let mut result = conn.query(&format!(
514            "MATCH (e:Entity {{id: '{}'}}) RETURN e.id, e.name, e.entity_type;",
515            id
516        ))?;
517
518        match result.next() {
519            Some(row) => Ok(Some(row_to_entity(&row)?)),
520            None => Ok(None),
521        }
522    }
523
524    fn find_entity_by_name(&self, name: &str) -> Result<Option<Entity>> {
525        let conn = self.conn()?;
526        let name_escaped = escape_cypher(name);
527        let query = format!(
528            "MATCH (e:Entity) WHERE e.name = '{}' RETURN e.id, e.name, e.entity_type;",
529            name_escaped
530        );
531        let mut result = conn.query(&query)?;
532
533        match result.next() {
534            Some(row) => Ok(Some(row_to_entity(&row)?)),
535            None => Ok(None),
536        }
537    }
538
539    fn store_conversation(&self, conversation: &Conversation) -> Result<()> {
540        let data = serde_json::to_string(conversation).ok();
541        self.with_transaction(|conn| {
542            self.create_conversation_node(conn, conversation)?;
543            self.append_sync_log(
544                conn,
545                SyncOp::Create,
546                SyncNodeType::Conversation,
547                &conversation.id.to_string(),
548                data.as_deref(),
549                &self.machine_id,
550                None,
551            )
552        })
553    }
554
555    fn store_relation(&self, relation: &Relation) -> Result<()> {
556        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
557        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
558        let data = serde_json::to_string(relation).ok();
559        let result = self.with_transaction(|conn| {
560            self.create_relation_edge(conn, relation)?;
561            self.append_sync_log(
562                conn,
563                SyncOp::Create,
564                SyncNodeType::Relation,
565                &node_id,
566                data.as_deref(),
567                &self.machine_id,
568                None,
569            )
570        });
571        // get_relations keys on the source (from_id) only, so evicting that id is the
572        // exact and minimal invalidation for a new outgoing edge.
573        self.evict_relations_for(relation.from_id);
574        result
575    }
576
577    fn get_relations(
578        &self,
579        node_id: Uuid,
580        relation_type: Option<RelationType>,
581    ) -> Result<Vec<Relation>> {
582        let cache_key = (node_id, relation_type);
583        if let Some(hit) = self.relations_cache.read().unwrap().get(&cache_key) {
584            return Ok(hit.clone());
585        }
586
587        let conn = self.conn()?;
588        let id = node_id.to_string();
589
590        let query = match relation_type {
591            Some(RelationType::RelatesTo) => format!(
592                "MATCH (a:Memory {{id: '{}'}})-[r:RELATES_TO]->(b:Memory) RETURN b.id, r.strength, r.context;",
593                id
594            ),
595            Some(RelationType::Contradicts) => format!(
596                "MATCH (a:Memory {{id: '{}'}})-[r:CONTRADICTS]->(b:Memory) RETURN b.id, r.resolution;",
597                id
598            ),
599            Some(RelationType::Reinforces) => format!(
600                "MATCH (a:Memory {{id: '{}'}})-[:REINFORCES]->(b:Memory) RETURN b.id;",
601                id
602            ),
603            Some(RelationType::Supersedes) => format!(
604                "MATCH (a:Memory {{id: '{}'}})-[r:SUPERSEDES]->(b:Memory) RETURN b.id, r.reason;",
605                id
606            ),
607            Some(RelationType::Mentions) => format!(
608                "MATCH (a:Memory {{id: '{}'}})-[:MENTIONS]->(b:Entity) RETURN b.id;",
609                id
610            ),
611            Some(RelationType::DerivedFrom) => format!(
612                "MATCH (a:Memory {{id: '{}'}})-[:DERIVED_FROM]->(b:Conversation) RETURN b.id;",
613                id
614            ),
615            Some(RelationType::DistilledFrom) => format!(
616                "MATCH (a:Memory {{id: '{}'}})-[:DISTILLED_FROM]->(b:Memory) RETURN b.id;",
617                id
618            ),
619            None => {
620                let mut relations = Vec::new();
621                let mem_to_mem = [
622                    ("RELATES_TO", RelationType::RelatesTo),
623                    ("REINFORCES", RelationType::Reinforces),
624                    ("CONTRADICTS", RelationType::Contradicts),
625                    ("SUPERSEDES", RelationType::Supersedes),
626                    ("DISTILLED_FROM", RelationType::DistilledFrom),
627                ];
628                for (label, rt) in &mem_to_mem {
629                    let q = format!(
630                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:Memory) RETURN b.id;",
631                        id, label
632                    );
633                    if let Ok(mut result) = conn.query(&q) {
634                        for row in &mut result {
635                            let to_id_str = value_to_string(&row[0]);
636                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
637                            relations.push(Relation {
638                                from_id: node_id,
639                                to_id,
640                                relation_type: *rt,
641                                strength: 1.0,
642                                context: None,
643                            });
644                        }
645                    }
646                }
647                for (label, target, rt) in [
648                    ("MENTIONS", "Entity", RelationType::Mentions),
649                    ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
650                ] {
651                    let q = format!(
652                        "MATCH (a:Memory {{id: '{}'}})-[:{}]->(b:{}) RETURN b.id;",
653                        id, label, target
654                    );
655                    if let Ok(mut result) = conn.query(&q) {
656                        for row in &mut result {
657                            let to_id_str = value_to_string(&row[0]);
658                            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
659                            relations.push(Relation {
660                                from_id: node_id,
661                                to_id,
662                                relation_type: rt,
663                                strength: 1.0,
664                                context: None,
665                            });
666                        }
667                    }
668                }
669                self.relations_cache
670                    .write()
671                    .unwrap()
672                    .insert(cache_key, relations.clone());
673                return Ok(relations);
674            }
675        };
676
677        let mut result = conn.query(&query)?;
678        let mut relations = Vec::new();
679
680        for row in &mut result {
681            let to_id_str = value_to_string(&row[0]);
682            let to_id = Uuid::parse_str(&to_id_str).unwrap_or_default();
683
684            relations.push(Relation {
685                from_id: node_id,
686                to_id,
687                relation_type: relation_type.unwrap_or(RelationType::RelatesTo),
688                strength: 1.0,
689                context: None,
690            });
691        }
692
693        self.relations_cache
694            .write()
695            .unwrap()
696            .insert(cache_key, relations.clone());
697        Ok(relations)
698    }
699
700    fn vector_search(&self, embedding: &[f32], limit: usize) -> Result<Vec<(Memory, f32)>> {
701        let conn = self.conn()?;
702        let embedding_str = format_embedding(embedding);
703
704        let query = format!(
705            "CALL QUERY_VECTOR_INDEX('Memory', 'memory_emb_idx', {}, {}) YIELD node, distance
706             RETURN {}, distance;",
707            embedding_str, limit, memory_return_cols("node")
708        );
709
710        let mut result = conn.query(&query)?;
711        let mut results = Vec::new();
712
713        for row in &mut result {
714            let memory = row_to_memory(&row[..12])?;
715            let distance = value_to_f32(&row[12]);
716            let similarity = 1.0 - distance;
717            results.push((memory, similarity));
718        }
719
720        Ok(results)
721    }
722
723    fn traverse(&self, start_id: Uuid, depth: u32) -> Result<Vec<(Memory, Vec<Relation>)>> {
724        let conn = self.conn()?;
725        let query = format!(
726            "MATCH (start:Memory {{id: '{}'}})-[r:RELATES_TO|REINFORCES*1..{}]->(m:Memory)
727             RETURN DISTINCT {};",
728            start_id, depth, memory_return_cols("m")
729        );
730
731        let mut result = conn.query(&query)?;
732        let mut results = Vec::new();
733
734        for row in &mut result {
735            let memory = row_to_memory(&row)?;
736            results.push((memory, Vec::new()));
737        }
738
739        Ok(results)
740    }
741
742    fn memories_by_source(&self, source: &str) -> Result<Vec<Memory>> {
743        let conn = self.conn()?;
744        let source_escaped = escape_cypher(source);
745        let query = format!(
746            "MATCH (m:Memory) WHERE m.source = '{}' RETURN {};",
747            source_escaped, memory_return_cols("m")
748        );
749        let mut result = conn.query(&query)?;
750
751        let mut memories = Vec::new();
752        for row in &mut result {
753            memories.push(row_to_memory(&row)?);
754        }
755        Ok(memories)
756    }
757
758    fn memories_by_type(&self, memory_type: MemoryType) -> Result<Vec<Memory>> {
759        let conn = self.conn()?;
760        let type_str = format!("{:?}", memory_type).to_lowercase();
761        let query = format!(
762            "MATCH (m:Memory) WHERE m.memory_type = '{}' RETURN {};",
763            type_str, memory_return_cols("m")
764        );
765        let mut result = conn.query(&query)?;
766
767        let mut memories = Vec::new();
768        for row in &mut result {
769            memories.push(row_to_memory(&row)?);
770        }
771        Ok(memories)
772    }
773
774    fn memories_needing_decay(&self, threshold_days: u32) -> Result<Vec<Memory>> {
775        let conn = self.conn()?;
776        let cutoff = chrono::Utc::now() - chrono::Duration::days(threshold_days as i64);
777        let cutoff_str = cutoff.to_rfc3339();
778
779        let query = format!(
780            "MATCH (m:Memory) WHERE m.last_accessed < '{}' AND m.confidence > 0.05 RETURN {};",
781            cutoff_str, memory_return_cols("m")
782        );
783
784        let mut result = conn.query(&query)?;
785        let mut memories = Vec::new();
786        for row in &mut result {
787            memories.push(row_to_memory(&row)?);
788        }
789        Ok(memories)
790    }
791
792    fn update_memory(&self, memory: &Memory) -> Result<()> {
793        let id = memory.id.to_string();
794        let last_accessed = memory.last_accessed.to_rfc3339();
795        let updated_at = memory.updated_at.to_rfc3339();
796        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
797        let machine_id_escaped = escape_cypher(&memory.machine_id);
798        let data = serde_json::to_string(memory).ok();
799        self.with_transaction(|conn| {
800            let query = format!(
801                "MATCH (m:Memory {{id: '{}'}}) SET m.confidence = {}, m.last_accessed = '{}', m.access_count = {}, m.project_path = '{}', m.updated_at = '{}', m.machine_id = '{}';",
802                id, memory.confidence, last_accessed, memory.access_count, project_path_escaped, updated_at, machine_id_escaped
803            );
804            conn.query(&query)?;
805            self.append_sync_log(
806                conn,
807                SyncOp::Update,
808                SyncNodeType::Memory,
809                &id,
810                data.as_deref(),
811                &self.machine_id,
812                None,
813            )
814        })
815    }
816
817    fn record_access(&self, memory: &Memory) -> Result<()> {
818        let id_esc = escape_cypher(&memory.id.to_string());
819        let last_accessed = memory.last_accessed.to_rfc3339();
820        let last_accessed_esc = escape_cypher(&last_accessed);
821        let conn = self.conn()?;
822        conn.query(&format!(
823            "MATCH (m:Memory {{id: '{id_esc}'}}) SET \
824             m.last_accessed = '{last_accessed_esc}', \
825             m.access_count = {}, \
826             m.confidence = {};",
827            memory.access_count, memory.confidence
828        ))?;
829        Ok(())
830    }
831
832    fn text_search(&self, query: &str, limit: usize) -> Result<Vec<Memory>> {
833        let conn = self.conn()?;
834        let query_escaped = escape_cypher(&query.to_lowercase());
835        let cypher = format!(
836            "MATCH (m:Memory) WHERE lower(m.content) CONTAINS '{}' RETURN {} LIMIT {};",
837            query_escaped, memory_return_cols("m"), limit
838        );
839
840        let mut result = conn.query(&cypher)?;
841        let mut memories = Vec::new();
842        for row in &mut result {
843            memories.push(row_to_memory(&row)?);
844        }
845        Ok(memories)
846    }
847
848    fn memory_count(&self) -> Result<usize> {
849        let conn = self.conn()?;
850        let mut result = conn.query("MATCH (m:Memory) RETURN count(m);")?;
851        match result.next() {
852            Some(row) => match &row[0] {
853                Value::Int64(n) => Ok(*n as usize),
854                _ => Ok(0),
855            },
856            None => Ok(0),
857        }
858    }
859
860    fn all_memory_ids(&self) -> Result<Vec<Uuid>> {
861        let conn = self.conn()?;
862        let mut result = conn.query("MATCH (m:Memory) RETURN m.id;")?;
863        let mut ids = Vec::new();
864        for row in &mut result {
865            let id_str = value_to_string(&row[0]);
866            ids.push(Uuid::parse_str(&id_str).unwrap_or_default());
867        }
868        Ok(ids)
869    }
870
871    fn all_relations(&self) -> Result<Vec<Relation>> {
872        let conn = self.conn()?;
873        let mut relations = Vec::new();
874
875        let mem_to_mem = [
876            ("RELATES_TO", RelationType::RelatesTo),
877            ("REINFORCES", RelationType::Reinforces),
878            ("CONTRADICTS", RelationType::Contradicts),
879            ("SUPERSEDES", RelationType::Supersedes),
880            ("DISTILLED_FROM", RelationType::DistilledFrom),
881        ];
882        for (label, rt) in &mem_to_mem {
883            let q = format!(
884                "MATCH (a:Memory)-[:{}]->(b:Memory) RETURN a.id, b.id;",
885                label
886            );
887            let mut result = conn.query(&q)?;
888            for row in &mut result {
889                let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
890                let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
891                relations.push(Relation {
892                    from_id,
893                    to_id,
894                    relation_type: *rt,
895                    strength: 1.0,
896                    context: None,
897                });
898            }
899        }
900
901        for (label, target, rt) in [
902            ("MENTIONS", "Entity", RelationType::Mentions),
903            ("DERIVED_FROM", "Conversation", RelationType::DerivedFrom),
904        ] {
905            let q = format!(
906                "MATCH (a:Memory)-[:{}]->(b:{}) RETURN a.id, b.id;",
907                label, target
908            );
909            let mut result = conn.query(&q)?;
910            for row in &mut result {
911                let from_id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
912                let to_id = Uuid::parse_str(&value_to_string(&row[1])).unwrap_or_default();
913                relations.push(Relation {
914                    from_id,
915                    to_id,
916                    relation_type: rt,
917                    strength: 1.0,
918                    context: None,
919                });
920            }
921        }
922
923        Ok(relations)
924    }
925}
926
927impl KuzuStore {
928    pub fn find_or_create_entity(&self, name: &str, entity_type: &str) -> Result<Entity> {
929        if let Some(existing) = self.find_entity_by_name(name)? {
930            return Ok(existing);
931        }
932        let entity = Entity::new(name.to_string(), entity_type.to_string());
933        self.store_entity(&entity)?;
934        Ok(entity)
935    }
936
937    pub fn memory_content_exists(&self, content_prefix: &str) -> Result<bool> {
938        let conn = self.conn()?;
939        let escaped = escape_cypher(content_prefix);
940        let query = format!(
941            "MATCH (m:Memory) WHERE starts_with(m.content, '{}') RETURN m.id LIMIT 1;",
942            escaped
943        );
944        let mut result = conn.query(&query)?;
945        Ok(result.next().is_some())
946    }
947
948    pub fn unconsolidated_memories(&self, limit: usize) -> Result<Vec<Memory>> {
949        let conn = self.conn()?;
950        let query = format!(
951            "MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (c:ConsolidationLog) WHERE c.memory_id = m.id}} AND m.source <> 'consolidation' RETURN {} LIMIT {};",
952            memory_return_cols("m"), limit
953        );
954        let mut result = conn.query(&query)?;
955        let mut memories = Vec::new();
956        for row in &mut result {
957            memories.push(row_to_memory(&row)?);
958        }
959        Ok(memories)
960    }
961
962    pub fn memories_created_between(
963        &self,
964        start: &DateTime<Utc>,
965        end: &DateTime<Utc>,
966    ) -> Result<Vec<Memory>> {
967        let conn = self.conn()?;
968        let query = format!(
969            "MATCH (m:Memory) WHERE m.created_at >= '{}' AND m.created_at < '{}' RETURN {} ORDER BY m.created_at ASC;",
970            start.to_rfc3339(),
971            end.to_rfc3339(),
972            memory_return_cols("m")
973        );
974        let mut result = conn.query(&query)?;
975        let mut memories = Vec::new();
976        for row in &mut result {
977            memories.push(row_to_memory(&row)?);
978        }
979        Ok(memories)
980    }
981
982    pub fn mark_consolidated(&self, raw_id: Uuid, distilled_id: Uuid, model: &str) -> Result<()> {
983        let conn = self.conn()?;
984        let now = Utc::now().to_rfc3339();
985        let query = format!(
986            "CREATE (:ConsolidationLog {{memory_id: '{}', distilled_id: '{}', consolidated_at: '{}', model: '{}'}});",
987            raw_id,
988            distilled_id,
989            now,
990            escape_cypher(model)
991        );
992        conn.query(&query)?;
993        Ok(())
994    }
995
996    pub fn consolidation_count(&self) -> Result<usize> {
997        let conn = self.conn()?;
998        let mut result = conn.query("MATCH (c:ConsolidationLog) RETURN count(c);")?;
999        match result.next() {
1000            Some(row) => match &row[0] {
1001                Value::Int64(n) => Ok(*n as usize),
1002                _ => Ok(0),
1003            },
1004            None => Ok(0),
1005        }
1006    }
1007
1008    pub fn delete_consolidated_raw(&self) -> Result<usize> {
1009        let ids: Vec<Uuid> = {
1010            let conn = self.conn()?;
1011            let result = conn.query(
1012                "MATCH (c:ConsolidationLog) WHERE c.memory_id <> c.distilled_id RETURN c.memory_id;",
1013            )?;
1014            let mut acc = Vec::new();
1015            for row in result {
1016                if let Value::String(id) = &row[0]
1017                    && let Ok(uuid) = Uuid::parse_str(id)
1018                {
1019                    acc.push(uuid);
1020                }
1021            }
1022            acc
1023        };
1024        let count = ids.len();
1025        let machine_id = self.machine_id.clone();
1026        for id in &ids {
1027            let id_str = id.to_string();
1028            let mid = machine_id.clone();
1029            self.with_transaction(|conn| {
1030                conn.query(&format!(
1031                    "MATCH (m:Memory {{id: '{id_str}'}}) DETACH DELETE m;"
1032                ))?;
1033                self.upsert_tombstone(conn, &id_str, "memory", &mid)?;
1034                self.append_sync_log(
1035                    conn,
1036                    SyncOp::Delete,
1037                    SyncNodeType::Memory,
1038                    &id_str,
1039                    None,
1040                    &mid,
1041                    None,
1042                )
1043            })?;
1044        }
1045        if count > 0 {
1046            self.clear_relations_cache();
1047        }
1048        Ok(count)
1049    }
1050
1051    pub fn rebuild_vector_index(&self) -> Result<()> {
1052        let conn = self.conn()?;
1053        conn.query("CALL DROP_VECTOR_INDEX('Memory', 'memory_emb_idx');")
1054            .ok();
1055        conn.query(
1056            "CALL CREATE_VECTOR_INDEX('Memory', 'memory_emb_idx', 'embedding', metric := 'cosine');",
1057        )?;
1058        Ok(())
1059    }
1060
1061    pub fn clear_ingest_log(&self) -> Result<usize> {
1062        let conn = self.conn()?;
1063        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1064        let count = match result.next() {
1065            Some(row) => match &row[0] {
1066                Value::Int64(n) => *n as usize,
1067                _ => 0,
1068            },
1069            None => 0,
1070        };
1071        conn.query("MATCH (l:IngestLog) DETACH DELETE l;")?;
1072        Ok(count)
1073    }
1074
1075    pub fn delete_memories_by_source(&self, source: &str) -> Result<usize> {
1076        let conn = self.conn()?;
1077        let escaped = escape_cypher(source);
1078        let mut result = conn.query(&format!(
1079            "MATCH (m:Memory) WHERE m.source = '{}' RETURN count(m);",
1080            escaped
1081        ))?;
1082        let count = match result.next() {
1083            Some(row) => match &row[0] {
1084                Value::Int64(n) => *n as usize,
1085                _ => 0,
1086            },
1087            None => 0,
1088        };
1089        conn.query(&format!(
1090            "MATCH (m:Memory) WHERE m.source = '{}' DETACH DELETE m;",
1091            escaped
1092        ))?;
1093        if count > 0 {
1094            self.clear_relations_cache();
1095        }
1096        Ok(count)
1097    }
1098
1099    pub fn is_file_ingested(&self, file_path: &str) -> Result<bool> {
1100        let conn = self.conn()?;
1101        let escaped = escape_cypher(file_path);
1102        let mut result = conn.query(&format!(
1103            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_path;",
1104            escaped
1105        ))?;
1106        Ok(result.next().is_some())
1107    }
1108
1109    pub fn is_file_changed(&self, file_path: &str, file_hash: &str) -> Result<bool> {
1110        let conn = self.conn()?;
1111        let escaped = escape_cypher(file_path);
1112        let mut result = conn.query(&format!(
1113            "MATCH (l:IngestLog {{file_path: '{}'}}) RETURN l.file_hash;",
1114            escaped
1115        ))?;
1116        match result.next() {
1117            Some(row) => {
1118                let stored_hash = value_to_string(&row[0]);
1119                Ok(stored_hash != file_hash)
1120            }
1121            None => Ok(true),
1122        }
1123    }
1124
1125    pub fn mark_ingested(
1126        &self,
1127        file_path: &str,
1128        file_hash: &str,
1129        memory_count: usize,
1130        source: &str,
1131    ) -> Result<()> {
1132        let conn = self.conn()?;
1133        let path_escaped = escape_cypher(file_path);
1134        let hash_escaped = escape_cypher(file_hash);
1135        let source_escaped = escape_cypher(source);
1136        let now = chrono::Utc::now().to_rfc3339();
1137
1138        conn.query(&format!(
1139            "MERGE (l:IngestLog {{file_path: '{}'}})
1140             SET l.file_hash = '{}', l.ingested_at = '{}', l.memory_count = {}, l.source = '{}';",
1141            path_escaped, hash_escaped, now, memory_count as i64, source_escaped
1142        ))?;
1143        Ok(())
1144    }
1145
1146    pub fn ingested_file_count(&self) -> Result<usize> {
1147        let conn = self.conn()?;
1148        let mut result = conn.query("MATCH (l:IngestLog) RETURN count(l);")?;
1149        match result.next() {
1150            Some(row) => match &row[0] {
1151                Value::Int64(n) => Ok(*n as usize),
1152                _ => Ok(0),
1153            },
1154            None => Ok(0),
1155        }
1156    }
1157}
1158
1159impl KuzuStore {
1160    fn create_memory_node(&self, conn: &Connection<'_>, memory: &Memory) -> Result<()> {
1161        let id = memory.id.to_string();
1162        let memory_type = format!("{:?}", memory.memory_type).to_lowercase();
1163        let created_at = memory.created_at.to_rfc3339();
1164        let last_accessed = memory.last_accessed.to_rfc3339();
1165        let embedding_str = format_embedding(&memory.embedding);
1166        let content_escaped = escape_cypher(&memory.content);
1167        let source_escaped = escape_cypher(&memory.source);
1168        let source_id_escaped = escape_cypher(&memory.source_id);
1169        let project_path_escaped = escape_cypher(memory.project_path.as_deref().unwrap_or(""));
1170        let machine_id_escaped = escape_cypher(&memory.machine_id);
1171        let updated_at = memory.updated_at.to_rfc3339();
1172
1173        let query = format!(
1174            "CREATE (:Memory {{
1175                id: '{id}',
1176                content: '{content_escaped}',
1177                embedding: {embedding_str},
1178                memory_type: '{memory_type}',
1179                confidence: {confidence},
1180                created_at: '{created_at}',
1181                last_accessed: '{last_accessed}',
1182                access_count: {access_count},
1183                source: '{source_escaped}',
1184                source_id: '{source_id_escaped}',
1185                project_path: '{project_path_escaped}',
1186                machine_id: '{machine_id_escaped}',
1187                updated_at: '{updated_at}'
1188            }});",
1189            confidence = memory.confidence,
1190            access_count = memory.access_count,
1191        );
1192
1193        conn.query(&query)?;
1194        Ok(())
1195    }
1196
1197    fn create_entity_node(&self, conn: &Connection<'_>, entity: &Entity) -> Result<()> {
1198        let id = entity.id.to_string();
1199        let embedding_str = format_embedding(&entity.embedding);
1200        let aliases_str = format_string_array(&entity.aliases);
1201        let name_escaped = escape_cypher(&entity.name);
1202        let etype_escaped = escape_cypher(&entity.entity_type);
1203
1204        let query = format!(
1205            "CREATE (:Entity {{
1206                id: '{id}',
1207                name: '{name_escaped}',
1208                entity_type: '{etype_escaped}',
1209                embedding: {embedding_str},
1210                aliases: {aliases_str}
1211            }});"
1212        );
1213
1214        conn.query(&query)?;
1215        Ok(())
1216    }
1217
1218    fn create_conversation_node(
1219        &self,
1220        conn: &Connection<'_>,
1221        conversation: &Conversation,
1222    ) -> Result<()> {
1223        let id = conversation.id.to_string();
1224        let started_at = conversation.started_at.to_rfc3339();
1225        let source_escaped = escape_cypher(&conversation.source);
1226        let machine_escaped = escape_cypher(&conversation.machine_id);
1227        let project_escaped = escape_cypher(conversation.project_path.as_deref().unwrap_or(""));
1228
1229        let query = format!(
1230            "CREATE (:Conversation {{
1231                id: '{id}',
1232                source: '{source_escaped}',
1233                machine_id: '{machine_escaped}',
1234                started_at: '{started_at}',
1235                project_path: '{project_escaped}'
1236            }});"
1237        );
1238
1239        conn.query(&query)?;
1240        Ok(())
1241    }
1242
1243    fn create_relation_edge(&self, conn: &Connection<'_>, relation: &Relation) -> Result<()> {
1244        let from_id = relation.from_id.to_string();
1245        let to_id = relation.to_id.to_string();
1246
1247        let query = match relation.relation_type {
1248            RelationType::RelatesTo => format!(
1249                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:RELATES_TO {{strength: {}, context: '{}'}}]->(b);",
1250                from_id,
1251                to_id,
1252                relation.strength,
1253                relation.context.as_deref().unwrap_or("")
1254            ),
1255            RelationType::Mentions => format!(
1256                "MATCH (a:Memory {{id: '{}'}}), (b:Entity {{id: '{}'}}) CREATE (a)-[:MENTIONS {{position: 0}}]->(b);",
1257                from_id, to_id
1258            ),
1259            RelationType::DerivedFrom => format!(
1260                "MATCH (a:Memory {{id: '{}'}}), (b:Conversation {{id: '{}'}}) CREATE (a)-[:DERIVED_FROM {{transformation: '{}'}}]->(b);",
1261                from_id,
1262                to_id,
1263                relation.context.as_deref().unwrap_or("direct")
1264            ),
1265            RelationType::Contradicts => format!(
1266                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:CONTRADICTS {{resolution: '{}'}}]->(b);",
1267                from_id,
1268                to_id,
1269                relation.context.as_deref().unwrap_or("")
1270            ),
1271            RelationType::Reinforces => format!(
1272                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:REINFORCES]->(b);",
1273                from_id, to_id
1274            ),
1275            RelationType::DistilledFrom => format!(
1276                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:DISTILLED_FROM {{model: '{}'}}]->(b);",
1277                from_id,
1278                to_id,
1279                relation.context.as_deref().unwrap_or("")
1280            ),
1281            RelationType::Supersedes => format!(
1282                "MATCH (a:Memory {{id: '{}'}}), (b:Memory {{id: '{}'}}) CREATE (a)-[:SUPERSEDES {{reason: '{}'}}]->(b);",
1283                from_id,
1284                to_id,
1285                relation.context.as_deref().unwrap_or("")
1286            ),
1287        };
1288
1289        conn.query(&query)?;
1290        Ok(())
1291    }
1292}
1293
1294impl KuzuStore {
1295    pub fn get_conversation(&self, id: Uuid) -> Result<Option<Conversation>> {
1296        let conn = self.conn()?;
1297        let mut result = conn.query(&format!(
1298            "MATCH (c:Conversation {{id: '{}'}}) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
1299            id
1300        ))?;
1301
1302        match result.next() {
1303            Some(row) => {
1304                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
1305                let source = value_to_string(&row[1]);
1306                let machine_id = value_to_string(&row[2]);
1307                let started_at_str = value_to_string(&row[3]);
1308                let project_path = {
1309                    let s = value_to_string(&row[4]);
1310                    if s.is_empty() { None } else { Some(s) }
1311                };
1312                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
1313                    .map(|dt| dt.with_timezone(&chrono::Utc))
1314                    .unwrap_or_else(|_| chrono::Utc::now());
1315
1316                Ok(Some(Conversation {
1317                    id,
1318                    source,
1319                    machine_id,
1320                    started_at,
1321                    project_path,
1322                }))
1323            }
1324            None => Ok(None),
1325        }
1326    }
1327
1328    pub fn get_memory_with_embedding(&self, id: Uuid) -> Result<Option<Memory>> {
1329        let conn = self.conn()?;
1330        let query = format!(
1331            "MATCH (m:Memory {{id: '{}'}}) RETURN {}, m.embedding;",
1332            id, memory_return_cols("m")
1333        );
1334
1335        let mut result = conn.query(&query)?;
1336        match result.next() {
1337            Some(row) => {
1338                let mut memory = row_to_memory(&row[..12])?;
1339                memory.embedding = value_to_f32_vec(&row[12]);
1340                Ok(Some(memory))
1341            }
1342            None => Ok(None),
1343        }
1344    }
1345
1346    pub fn sync_log_since(&self, after_seq: u64) -> Result<Vec<SyncEntry>> {
1347        self.sync_log_page(after_seq, None)
1348    }
1349
1350    fn tombstone_exists_on(&self, conn: &Connection<'_>, id: Uuid) -> Result<bool> {
1351        let id_esc = escape_cypher(&id.to_string());
1352        let mut result = conn.query(&format!(
1353            "MATCH (t:Tombstone {{node_id: '{id_esc}'}}) RETURN t.node_id LIMIT 1;"
1354        ))?;
1355        Ok(result.next().is_some())
1356    }
1357
1358    fn normalize_incoming(mem: &mut Memory) {
1359        if mem.updated_at == DateTime::UNIX_EPOCH {
1360            mem.updated_at = mem.created_at;
1361        }
1362    }
1363
1364    pub fn apply_create_memory(
1365        &self,
1366        mem: &Memory,
1367        origin_machine_id: &str,
1368        origin_seq: i64,
1369    ) -> Result<ApplyOutcome> {
1370        let mut mem = mem.clone();
1371        Self::normalize_incoming(&mut mem);
1372        let data = serde_json::to_string(&mem).ok();
1373        let node_id = mem.id.to_string();
1374        self.with_transaction(|conn| {
1375            if self.tombstone_exists_on(conn, mem.id)? {
1376                return Ok(ApplyOutcome::Skipped);
1377            }
1378            if self.get_memory(mem.id)?.is_some() {
1379                return Ok(ApplyOutcome::Skipped);
1380            }
1381            self.create_memory_node(conn, &mem)?;
1382            self.append_sync_log(
1383                conn,
1384                SyncOp::Create,
1385                SyncNodeType::Memory,
1386                &node_id,
1387                data.as_deref(),
1388                origin_machine_id,
1389                Some(origin_seq),
1390            )?;
1391            Ok(ApplyOutcome::Created)
1392        })
1393    }
1394
1395    // Holds a single write transaction open for the whole closure so a batch of
1396    // creates commits with one fsync instead of one per record. KuzuDB is single
1397    // writer and forbids nested transactions, so the closure must not call any
1398    // method that opens its own transaction; it may only use the `*_on_tx` apply
1399    // helpers and read-only lookups (which run on separate read connections and
1400    // see committed state, matching the per-record skip semantics).
1401    pub fn with_write_transaction<T>(
1402        &self,
1403        f: impl FnOnce(&Connection<'_>) -> Result<T>,
1404    ) -> Result<T> {
1405        self.with_transaction(f)
1406    }
1407
1408    pub fn apply_create_memory_on_tx(
1409        &self,
1410        conn: &Connection<'_>,
1411        mem: &Memory,
1412        origin_machine_id: &str,
1413        origin_seq: i64,
1414    ) -> Result<ApplyOutcome> {
1415        let mut mem = mem.clone();
1416        Self::normalize_incoming(&mut mem);
1417        let data = serde_json::to_string(&mem).ok();
1418        let node_id = mem.id.to_string();
1419        if self.tombstone_exists_on(conn, mem.id)? {
1420            return Ok(ApplyOutcome::Skipped);
1421        }
1422        if self.get_memory(mem.id)?.is_some() {
1423            return Ok(ApplyOutcome::Skipped);
1424        }
1425        self.create_memory_node(conn, &mem)?;
1426        self.append_sync_log(
1427            conn,
1428            SyncOp::Create,
1429            SyncNodeType::Memory,
1430            &node_id,
1431            data.as_deref(),
1432            origin_machine_id,
1433            Some(origin_seq),
1434        )?;
1435        Ok(ApplyOutcome::Created)
1436    }
1437
1438    pub fn apply_create_conversation_on_tx(
1439        &self,
1440        conn: &Connection<'_>,
1441        conv: &Conversation,
1442        origin_machine_id: &str,
1443        origin_seq: i64,
1444    ) -> Result<ApplyOutcome> {
1445        let data = serde_json::to_string(conv).ok();
1446        let node_id = conv.id.to_string();
1447        if self.get_conversation(conv.id)?.is_some() {
1448            return Ok(ApplyOutcome::Skipped);
1449        }
1450        self.create_conversation_node(conn, conv)?;
1451        self.append_sync_log(
1452            conn,
1453            SyncOp::Create,
1454            SyncNodeType::Conversation,
1455            &node_id,
1456            data.as_deref(),
1457            origin_machine_id,
1458            Some(origin_seq),
1459        )?;
1460        Ok(ApplyOutcome::Created)
1461    }
1462
1463    pub fn apply_create_entity_on_tx(
1464        &self,
1465        conn: &Connection<'_>,
1466        entity: &Entity,
1467        origin_machine_id: &str,
1468        origin_seq: i64,
1469    ) -> Result<ApplyOutcome> {
1470        let data = serde_json::to_string(entity).ok();
1471        let node_id = entity.id.to_string();
1472        if self.get_entity(entity.id)?.is_some() {
1473            return Ok(ApplyOutcome::Skipped);
1474        }
1475        self.create_entity_node(conn, entity)?;
1476        self.append_sync_log(
1477            conn,
1478            SyncOp::Create,
1479            SyncNodeType::Entity,
1480            &node_id,
1481            data.as_deref(),
1482            origin_machine_id,
1483            Some(origin_seq),
1484        )?;
1485        Ok(ApplyOutcome::Created)
1486    }
1487
1488    pub fn apply_create_relation_on_tx(
1489        &self,
1490        conn: &Connection<'_>,
1491        relation: &Relation,
1492        origin_machine_id: &str,
1493        origin_seq: i64,
1494    ) -> Result<ApplyOutcome> {
1495        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1496        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1497        let data = serde_json::to_string(relation).ok();
1498        match self.create_relation_edge(conn, relation) {
1499            Ok(()) => {
1500                self.append_sync_log(
1501                    conn,
1502                    SyncOp::Create,
1503                    SyncNodeType::Relation,
1504                    &node_id,
1505                    data.as_deref(),
1506                    origin_machine_id,
1507                    Some(origin_seq),
1508                )?;
1509                Ok(ApplyOutcome::Created)
1510            }
1511            // create_relation_edge fails when an endpoint node is absent; a relation
1512            // whose endpoints are not present yet is skipped, matching the per-record path.
1513            Err(_) => Ok(ApplyOutcome::Skipped),
1514        }
1515    }
1516
1517    pub fn note_relation_created(&self, from_id: Uuid) {
1518        self.evict_relations_for(from_id);
1519    }
1520
1521    pub fn apply_update_memory(
1522        &self,
1523        incoming: &Memory,
1524        origin_machine_id: &str,
1525        origin_seq: i64,
1526    ) -> Result<(ApplyOutcome, Option<Memory>)> {
1527        let mut incoming = incoming.clone();
1528        Self::normalize_incoming(&mut incoming);
1529        let data = serde_json::to_string(&incoming).ok();
1530        let node_id = incoming.id.to_string();
1531        self.with_transaction(|conn| {
1532            match self.get_memory(incoming.id)? {
1533                None => {
1534                    if self.tombstone_exists_on(conn, incoming.id)? {
1535                        return Ok((ApplyOutcome::Skipped, None));
1536                    }
1537                    self.create_memory_node(conn, &incoming)?;
1538                    self.append_sync_log(
1539                        conn,
1540                        SyncOp::Create,
1541                        SyncNodeType::Memory,
1542                        &node_id,
1543                        data.as_deref(),
1544                        origin_machine_id,
1545                        Some(origin_seq),
1546                    )?;
1547                    Ok((ApplyOutcome::Created, Some(incoming)))
1548                }
1549                Some(existing) => {
1550                    let incoming_wins = if incoming.updated_at != existing.updated_at {
1551                        incoming.updated_at > existing.updated_at
1552                    } else if incoming.access_count != existing.access_count {
1553                        incoming.access_count > existing.access_count
1554                    } else {
1555                        incoming.machine_id > existing.machine_id
1556                    };
1557
1558                    if !incoming_wins {
1559                        return Ok((ApplyOutcome::Skipped, None));
1560                    }
1561
1562                    let id = incoming.id.to_string();
1563                    let content_escaped = escape_cypher(&incoming.content);
1564                    let memory_type = format!("{:?}", incoming.memory_type).to_lowercase();
1565                    let created_at = incoming.created_at.to_rfc3339();
1566                    let last_accessed = incoming.last_accessed.to_rfc3339();
1567                    let updated_at = incoming.updated_at.to_rfc3339();
1568                    let source_escaped = escape_cypher(&incoming.source);
1569                    let source_id_escaped = escape_cypher(&incoming.source_id);
1570                    let project_path_escaped =
1571                        escape_cypher(incoming.project_path.as_deref().unwrap_or(""));
1572                    let machine_id_escaped = escape_cypher(&incoming.machine_id);
1573                    // embedding is excluded because KuzuDB forbids SET on HNSW-indexed columns;
1574                    // the embedding is recomputed at ingest time so skipping it here is safe
1575                    conn.query(&format!(
1576                        "MATCH (m:Memory {{id: '{id}'}}) SET \
1577                         m.content = '{content_escaped}', \
1578                         m.memory_type = '{memory_type}', \
1579                         m.confidence = {confidence}, \
1580                         m.created_at = '{created_at}', \
1581                         m.last_accessed = '{last_accessed}', \
1582                         m.access_count = {access_count}, \
1583                         m.source = '{source_escaped}', \
1584                         m.source_id = '{source_id_escaped}', \
1585                         m.project_path = '{project_path_escaped}', \
1586                         m.machine_id = '{machine_id_escaped}', \
1587                         m.updated_at = '{updated_at}';",
1588                        confidence = incoming.confidence,
1589                        access_count = incoming.access_count,
1590                    ))?;
1591                    self.append_sync_log(
1592                        conn,
1593                        SyncOp::Update,
1594                        SyncNodeType::Memory,
1595                        &node_id,
1596                        data.as_deref(),
1597                        origin_machine_id,
1598                        Some(origin_seq),
1599                    )?;
1600                    Ok((ApplyOutcome::Updated, Some(incoming)))
1601                }
1602            }
1603        })
1604    }
1605
1606    pub fn apply_delete_memory(
1607        &self,
1608        node_id: &str,
1609        origin_machine_id: &str,
1610        origin_seq: i64,
1611    ) -> Result<ApplyOutcome> {
1612        let result = self.with_transaction(|conn| {
1613            conn.query(&format!(
1614                "MATCH (m:Memory {{id: '{node_id}'}}) DETACH DELETE m;"
1615            ))?;
1616            self.upsert_tombstone(conn, node_id, "memory", origin_machine_id)?;
1617            self.append_sync_log(
1618                conn,
1619                SyncOp::Delete,
1620                SyncNodeType::Memory,
1621                node_id,
1622                None,
1623                origin_machine_id,
1624                Some(origin_seq),
1625            )?;
1626            Ok(ApplyOutcome::Deleted)
1627        });
1628        // DETACH DELETE removes both this memory's outgoing edges and edges where it
1629        // was the target (changing neighbors' relation lists), so clear the whole cache.
1630        self.clear_relations_cache();
1631        result
1632    }
1633
1634    pub fn apply_create_conversation(
1635        &self,
1636        conv: &Conversation,
1637        origin_machine_id: &str,
1638        origin_seq: i64,
1639    ) -> Result<ApplyOutcome> {
1640        let data = serde_json::to_string(conv).ok();
1641        let node_id = conv.id.to_string();
1642        self.with_transaction(|conn| {
1643            if self.get_conversation(conv.id)?.is_some() {
1644                return Ok(ApplyOutcome::Skipped);
1645            }
1646            self.create_conversation_node(conn, conv)?;
1647            self.append_sync_log(
1648                conn,
1649                SyncOp::Create,
1650                SyncNodeType::Conversation,
1651                &node_id,
1652                data.as_deref(),
1653                origin_machine_id,
1654                Some(origin_seq),
1655            )?;
1656            Ok(ApplyOutcome::Created)
1657        })
1658    }
1659
1660    pub fn apply_upsert_conversation(
1661        &self,
1662        conv: &Conversation,
1663        origin_machine_id: &str,
1664        origin_seq: i64,
1665    ) -> Result<ApplyOutcome> {
1666        let data = serde_json::to_string(conv).ok();
1667        let node_id = conv.id.to_string();
1668        let source_escaped = escape_cypher(&conv.source);
1669        let machine_escaped = escape_cypher(&conv.machine_id);
1670        let project_escaped = escape_cypher(conv.project_path.as_deref().unwrap_or(""));
1671        let started_at = conv.started_at.to_rfc3339();
1672        self.with_transaction(|conn| {
1673            conn.query(&format!(
1674                "MERGE (c:Conversation {{id: '{node_id}'}}) SET \
1675                 c.source = '{source_escaped}', \
1676                 c.machine_id = '{machine_escaped}', \
1677                 c.started_at = '{started_at}', \
1678                 c.project_path = '{project_escaped}';"
1679            ))?;
1680            self.append_sync_log(
1681                conn,
1682                SyncOp::Update,
1683                SyncNodeType::Conversation,
1684                &node_id,
1685                data.as_deref(),
1686                origin_machine_id,
1687                Some(origin_seq),
1688            )?;
1689            Ok(ApplyOutcome::Updated)
1690        })
1691    }
1692
1693    pub fn apply_delete_conversation(
1694        &self,
1695        node_id: &str,
1696        origin_machine_id: &str,
1697        origin_seq: i64,
1698    ) -> Result<ApplyOutcome> {
1699        let result = self.with_transaction(|conn| {
1700            conn.query(&format!(
1701                "MATCH (c:Conversation {{id: '{node_id}'}}) DETACH DELETE c;"
1702            ))?;
1703            self.append_sync_log(
1704                conn,
1705                SyncOp::Delete,
1706                SyncNodeType::Conversation,
1707                node_id,
1708                None,
1709                origin_machine_id,
1710                Some(origin_seq),
1711            )?;
1712            Ok(ApplyOutcome::Deleted)
1713        });
1714        // DETACH DELETE drops DERIVED_FROM edges from memories pointing at this
1715        // conversation, changing those memories' relation lists. The source ids are not
1716        // enumerated here, so clear the whole cache.
1717        self.clear_relations_cache();
1718        result
1719    }
1720
1721    pub fn apply_create_entity(
1722        &self,
1723        entity: &Entity,
1724        origin_machine_id: &str,
1725        origin_seq: i64,
1726    ) -> Result<ApplyOutcome> {
1727        let data = serde_json::to_string(entity).ok();
1728        let node_id = entity.id.to_string();
1729        self.with_transaction(|conn| {
1730            if self.get_entity(entity.id)?.is_some() {
1731                return Ok(ApplyOutcome::Skipped);
1732            }
1733            self.create_entity_node(conn, entity)?;
1734            self.append_sync_log(
1735                conn,
1736                SyncOp::Create,
1737                SyncNodeType::Entity,
1738                &node_id,
1739                data.as_deref(),
1740                origin_machine_id,
1741                Some(origin_seq),
1742            )?;
1743            Ok(ApplyOutcome::Created)
1744        })
1745    }
1746
1747    pub fn apply_upsert_entity(
1748        &self,
1749        entity: &Entity,
1750        origin_machine_id: &str,
1751        origin_seq: i64,
1752    ) -> Result<ApplyOutcome> {
1753        let data = serde_json::to_string(entity).ok();
1754        let node_id = entity.id.to_string();
1755        let name_escaped = escape_cypher(&entity.name);
1756        let etype_escaped = escape_cypher(&entity.entity_type);
1757        let embedding_str = format_embedding(&entity.embedding);
1758        let aliases_str = format_string_array(&entity.aliases);
1759        self.with_transaction(|conn| {
1760            conn.query(&format!(
1761                "MERGE (e:Entity {{id: '{node_id}'}}) SET \
1762                 e.name = '{name_escaped}', \
1763                 e.entity_type = '{etype_escaped}', \
1764                 e.embedding = {embedding_str}, \
1765                 e.aliases = {aliases_str};"
1766            ))?;
1767            self.append_sync_log(
1768                conn,
1769                SyncOp::Update,
1770                SyncNodeType::Entity,
1771                &node_id,
1772                data.as_deref(),
1773                origin_machine_id,
1774                Some(origin_seq),
1775            )?;
1776            Ok(ApplyOutcome::Updated)
1777        })
1778    }
1779
1780    pub fn apply_delete_entity(
1781        &self,
1782        node_id: &str,
1783        origin_machine_id: &str,
1784        origin_seq: i64,
1785    ) -> Result<ApplyOutcome> {
1786        let result = self.with_transaction(|conn| {
1787            conn.query(&format!(
1788                "MATCH (e:Entity {{id: '{node_id}'}}) DETACH DELETE e;"
1789            ))?;
1790            self.append_sync_log(
1791                conn,
1792                SyncOp::Delete,
1793                SyncNodeType::Entity,
1794                node_id,
1795                None,
1796                origin_machine_id,
1797                Some(origin_seq),
1798            )?;
1799            Ok(ApplyOutcome::Deleted)
1800        });
1801        // DETACH DELETE drops MENTIONS edges from memories pointing at this entity,
1802        // changing those memories' relation lists; the source ids are not enumerated
1803        // here, so clear the whole cache.
1804        self.clear_relations_cache();
1805        result
1806    }
1807
1808    pub fn apply_create_relation(
1809        &self,
1810        relation: &Relation,
1811        origin_machine_id: &str,
1812        origin_seq: i64,
1813    ) -> Result<ApplyOutcome> {
1814        let rel_type = format!("{:?}", relation.relation_type).to_lowercase();
1815        let node_id = format!("{}:{}:{}", relation.from_id, relation.to_id, rel_type);
1816        let data = serde_json::to_string(relation).ok();
1817        let result = self.with_transaction(|conn| {
1818            match self.create_relation_edge(conn, relation) {
1819                Ok(()) => {
1820                    self.append_sync_log(
1821                        conn,
1822                        SyncOp::Create,
1823                        SyncNodeType::Relation,
1824                        &node_id,
1825                        data.as_deref(),
1826                        origin_machine_id,
1827                        Some(origin_seq),
1828                    )?;
1829                    Ok(ApplyOutcome::Created)
1830                }
1831                Err(_) => Ok(ApplyOutcome::Skipped),
1832            }
1833        });
1834        self.evict_relations_for(relation.from_id);
1835        result
1836    }
1837
1838    pub fn apply_delete_relation(
1839        &self,
1840        node_id: &str,
1841        relation: Option<&Relation>,
1842        origin_machine_id: &str,
1843        origin_seq: i64,
1844    ) -> Result<ApplyOutcome> {
1845        let Some(rel) = relation else {
1846            // Nothing to delete or relay because there is no deserializable relation data,
1847            // so skip rather than inflate the delete counter with a hollow log entry.
1848            return Ok(ApplyOutcome::Skipped);
1849        };
1850        let evict_id = rel.from_id;
1851        let result = self.with_transaction(|conn| {
1852            let from_id = rel.from_id.to_string();
1853            let to_id = rel.to_id.to_string();
1854            let rel_label = match rel.relation_type {
1855                crate::schema::RelationType::RelatesTo => "RELATES_TO",
1856                crate::schema::RelationType::Mentions => "MENTIONS",
1857                crate::schema::RelationType::DerivedFrom => "DERIVED_FROM",
1858                crate::schema::RelationType::Contradicts => "CONTRADICTS",
1859                crate::schema::RelationType::Reinforces => "REINFORCES",
1860                crate::schema::RelationType::Supersedes => "SUPERSEDES",
1861                crate::schema::RelationType::DistilledFrom => "DISTILLED_FROM",
1862            };
1863            conn.query(&format!(
1864                "MATCH (a {{id: '{from_id}'}})-[r:{rel_label}]->(b {{id: '{to_id}'}}) DELETE r;"
1865            ))
1866            .ok();
1867            self.append_sync_log(
1868                conn,
1869                SyncOp::Delete,
1870                SyncNodeType::Relation,
1871                node_id,
1872                None,
1873                origin_machine_id,
1874                Some(origin_seq),
1875            )?;
1876            Ok(ApplyOutcome::Deleted)
1877        });
1878        self.evict_relations_for(evict_id);
1879        result
1880    }
1881
1882    pub fn sync_log_page(&self, after_seq: u64, limit: Option<usize>) -> Result<Vec<SyncEntry>> {
1883        let conn = self.conn()?;
1884        let limit_clause = match limit {
1885            Some(n) => format!(" LIMIT {}", n),
1886            None => String::new(),
1887        };
1888        let query = format!(
1889            "MATCH (s:SyncLog) WHERE s.local_seq > {} \
1890             RETURN s.id, s.local_seq, s.origin_machine_id, s.origin_seq, s.op, s.node_type, s.node_id, s.timestamp, s.data \
1891             ORDER BY s.local_seq{};",
1892            after_seq, limit_clause
1893        );
1894
1895        let mut result = conn.query(&query)?;
1896        let mut entries = Vec::new();
1897
1898        for row in &mut result {
1899            let id = value_to_string(&row[0]);
1900            let local_seq = match &row[1] {
1901                Value::Int64(n) => *n,
1902                _ => 0,
1903            };
1904            let origin_machine_id = value_to_string(&row[2]);
1905            let origin_seq = match &row[3] {
1906                Value::Int64(n) => *n,
1907                _ => 0,
1908            };
1909            let op_str = value_to_string(&row[4]);
1910            let node_type_str = value_to_string(&row[5]);
1911            let node_id = value_to_string(&row[6]);
1912            let timestamp_str = value_to_string(&row[7]);
1913            let data_str = value_to_string(&row[8]);
1914
1915            let op = match op_str.as_str() {
1916                "create" => SyncOp::Create,
1917                "update" => SyncOp::Update,
1918                "delete" => SyncOp::Delete,
1919                _ => continue,
1920            };
1921            let node_type = match node_type_str.as_str() {
1922                "memory" => SyncNodeType::Memory,
1923                "entity" => SyncNodeType::Entity,
1924                "conversation" => SyncNodeType::Conversation,
1925                "relation" => SyncNodeType::Relation,
1926                _ => continue,
1927            };
1928            let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
1929                .map(|dt| dt.with_timezone(&chrono::Utc))
1930                .unwrap_or_else(|_| chrono::Utc::now());
1931
1932            let data = if data_str.is_empty() {
1933                None
1934            } else {
1935                Some(data_str)
1936            };
1937
1938            entries.push(SyncEntry {
1939                id,
1940                local_seq,
1941                origin_machine_id,
1942                origin_seq,
1943                op,
1944                node_type,
1945                node_id,
1946                timestamp,
1947                data,
1948            });
1949        }
1950
1951        Ok(entries)
1952    }
1953
1954    pub fn get_sync_state(&self, peer_id: &str) -> Result<Option<SyncState>> {
1955        let conn = self.conn()?;
1956        let escaped = escape_cypher(peer_id);
1957        let mut result = conn.query(&format!(
1958            "MATCH (s:SyncState {{peer_id: '{}'}}) RETURN s.peer_id, s.last_seq, s.last_sync_at;",
1959            escaped
1960        ))?;
1961
1962        match result.next() {
1963            Some(row) => {
1964                let peer_id = value_to_string(&row[0]);
1965                let last_seq = match &row[1] {
1966                    Value::Int64(n) => *n as u64,
1967                    _ => 0,
1968                };
1969                let last_sync_at_str = value_to_string(&row[2]);
1970                let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
1971                    .map(|dt| dt.with_timezone(&chrono::Utc))
1972                    .unwrap_or_else(|_| chrono::Utc::now());
1973
1974                Ok(Some(SyncState {
1975                    peer_id,
1976                    last_seq,
1977                    last_sync_at,
1978                }))
1979            }
1980            None => Ok(None),
1981        }
1982    }
1983
1984    pub fn set_sync_state(&self, state: &SyncState) -> Result<()> {
1985        let conn = self.conn()?;
1986        let peer_escaped = escape_cypher(&state.peer_id);
1987        let now = state.last_sync_at.to_rfc3339();
1988
1989        conn.query(&format!(
1990            "MERGE (s:SyncState {{peer_id: '{}'}}) SET s.last_seq = {}, s.last_sync_at = '{}';",
1991            peer_escaped, state.last_seq, now
1992        ))?;
1993        Ok(())
1994    }
1995
1996    pub fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
1997        let conn = self.conn()?;
1998        let mut result =
1999            conn.query("MATCH (s:SyncState) RETURN s.peer_id, s.last_seq, s.last_sync_at;")?;
2000        let mut states = Vec::new();
2001
2002        for row in &mut result {
2003            let peer_id = value_to_string(&row[0]);
2004            let last_seq = match &row[1] {
2005                Value::Int64(n) => *n as u64,
2006                _ => 0,
2007            };
2008            let last_sync_at_str = value_to_string(&row[2]);
2009            let last_sync_at = chrono::DateTime::parse_from_rfc3339(&last_sync_at_str)
2010                .map(|dt| dt.with_timezone(&chrono::Utc))
2011                .unwrap_or_else(|_| chrono::Utc::now());
2012
2013            states.push(SyncState {
2014                peer_id,
2015                last_seq,
2016                last_sync_at,
2017            });
2018        }
2019
2020        Ok(states)
2021    }
2022
2023    pub fn all_entities(&self) -> Result<Vec<Entity>> {
2024        let conn = self.conn()?;
2025        let mut result = conn.query("MATCH (e:Entity) RETURN e.id, e.name, e.entity_type;")?;
2026        let mut entities = Vec::new();
2027        for row in &mut result {
2028            entities.push(row_to_entity(&row)?);
2029        }
2030        Ok(entities)
2031    }
2032
2033    pub fn memories_for_entity(&self, entity_id: Uuid) -> Result<Vec<Memory>> {
2034        let conn = self.conn()?;
2035        let query = format!(
2036            "MATCH (m:Memory)-[:MENTIONS]->(e:Entity {{id: '{}'}}) RETURN {};",
2037            entity_id, memory_return_cols("m")
2038        );
2039        let mut result = conn.query(&query)?;
2040        let mut memories = Vec::new();
2041        for row in &mut result {
2042            memories.push(row_to_memory(&row)?);
2043        }
2044        Ok(memories)
2045    }
2046
2047    pub fn unassociated_memories(&self) -> Result<Vec<Memory>> {
2048        let conn = self.conn()?;
2049        let mut result = conn.query(
2050            &format!("MATCH (m:Memory) WHERE NOT EXISTS {{MATCH (m)-[:MENTIONS]->(:Entity)}} RETURN {};", memory_return_cols("m"))
2051        )?;
2052        let mut memories = Vec::new();
2053        for row in &mut result {
2054            memories.push(row_to_memory(&row)?);
2055        }
2056        Ok(memories)
2057    }
2058
2059    pub fn related_entity_names(&self, entity_id: Uuid) -> Result<Vec<String>> {
2060        let conn = self.conn()?;
2061        let query = format!(
2062            "MATCH (m:Memory)-[:MENTIONS]->(e1:Entity {{id: '{}'}}), (m)-[:MENTIONS]->(e2:Entity) WHERE e2.id <> '{}' RETURN DISTINCT e2.name;",
2063            entity_id, entity_id
2064        );
2065        let mut result = conn.query(&query)?;
2066        let mut names = Vec::new();
2067        for row in &mut result {
2068            names.push(value_to_string(&row[0]));
2069        }
2070        Ok(names)
2071    }
2072
2073    pub fn max_sync_seq(&self) -> Result<u64> {
2074        let conn = self.conn()?;
2075        let mut result = conn.query("MATCH (s:SyncLog) RETURN max(s.local_seq);")?;
2076        match result.next() {
2077            Some(row) => match &row[0] {
2078                Value::Int64(n) => Ok(*n as u64),
2079                _ => Ok(0),
2080            },
2081            None => Ok(0),
2082        }
2083    }
2084
2085    pub fn backfill_project_paths(&self) -> Result<u64> {
2086        let conn = self.conn()?;
2087        let mut result = conn.query(
2088            "MATCH (m:Memory)-[:DERIVED_FROM]->(c:Conversation) WHERE m.project_path = '' AND c.project_path IS NOT NULL AND c.project_path <> '' SET m.project_path = c.project_path RETURN count(m);"
2089        )?;
2090        match result.next() {
2091            Some(row) => match &row[0] {
2092                Value::Int64(n) => Ok(*n as u64),
2093                _ => Ok(0),
2094            },
2095            None => Ok(0),
2096        }
2097    }
2098
2099    pub fn memories_by_project_path(&self, project_path: &str) -> Result<Vec<Memory>> {
2100        let conn = self.conn()?;
2101        let escaped = escape_cypher(project_path);
2102        let cols = memory_return_cols("m");
2103        let query = format!(
2104            "MATCH (m:Memory) WHERE m.project_path = '{}' RETURN {} LIMIT 20;",
2105            escaped, cols
2106        );
2107        let mut result = conn.query(&query)?;
2108        let mut memories = Vec::new();
2109        for row in &mut result {
2110            memories.push(row_to_memory(&row)?);
2111        }
2112        Ok(memories)
2113    }
2114
2115    pub fn all_memories_with_embeddings(&self) -> Result<Vec<Memory>> {
2116        let conn = self.conn()?;
2117        let mut result = conn.query(&format!(
2118            "MATCH (m:Memory) RETURN {}, m.embedding;",
2119            memory_return_cols("m")
2120        ))?;
2121        let mut acc = Vec::new();
2122        for row in &mut result {
2123            // row_to_memory nulls the embedding; repopulate it from the trailing
2124            // m.embedding column so callers get the real vector, not Vec::new().
2125            let mut memory = row_to_memory(&row[..12])?;
2126            memory.embedding = value_to_f32_vec(&row[12]);
2127            acc.push(memory);
2128        }
2129        Ok(acc)
2130    }
2131
2132    pub fn backfill_sync_log(&self) -> Result<u64> {
2133        let mut count = 0u64;
2134
2135        let memories = self.all_memories_with_embeddings()?;
2136
2137        for memory in &memories {
2138            let origin_mid = memory.machine_id.clone();
2139            let data = serde_json::to_string(memory).ok();
2140            let node_id = memory.id.to_string();
2141            self.with_transaction(|conn| {
2142                self.append_sync_log(
2143                    conn,
2144                    SyncOp::Create,
2145                    SyncNodeType::Memory,
2146                    &node_id,
2147                    data.as_deref(),
2148                    &origin_mid,
2149                    None,
2150                )
2151            })?;
2152            count += 1;
2153        }
2154
2155        let conversations: Vec<Conversation> = {
2156            let conn = self.conn()?;
2157            let mut result = conn.query(
2158                "MATCH (c:Conversation) RETURN c.id, c.source, c.machine_id, c.started_at, c.project_path;",
2159            )?;
2160            let mut acc = Vec::new();
2161            for row in &mut result {
2162                let id = Uuid::parse_str(&value_to_string(&row[0])).unwrap_or_default();
2163                let source = value_to_string(&row[1]);
2164                let machine_id = value_to_string(&row[2]);
2165                let started_at_str = value_to_string(&row[3]);
2166                let project_path = {
2167                    let s = value_to_string(&row[4]);
2168                    if s.is_empty() { None } else { Some(s) }
2169                };
2170                let started_at = chrono::DateTime::parse_from_rfc3339(&started_at_str)
2171                    .map(|dt| dt.with_timezone(&chrono::Utc))
2172                    .unwrap_or_else(|_| chrono::Utc::now());
2173                acc.push(Conversation { id, source, machine_id, started_at, project_path });
2174            }
2175            acc
2176        };
2177
2178        for conv in &conversations {
2179            let origin_mid = conv.machine_id.clone();
2180            let data = serde_json::to_string(conv).ok();
2181            let node_id = conv.id.to_string();
2182            self.with_transaction(|conn| {
2183                self.append_sync_log(
2184                    conn,
2185                    SyncOp::Create,
2186                    SyncNodeType::Conversation,
2187                    &node_id,
2188                    data.as_deref(),
2189                    &origin_mid,
2190                    None,
2191                )
2192            })?;
2193            count += 1;
2194        }
2195
2196        Ok(count)
2197    }
2198}
2199
2200impl KuzuStore {
2201    pub fn upsert_machine(&self, id: &str, name: &str) -> Result<()> {
2202        let conn = self.conn()?;
2203        let id_escaped = escape_cypher(id);
2204        let name_escaped = escape_cypher(name);
2205        conn.query(&format!(
2206            "MERGE (m:Machine {{id: '{id_escaped}'}}) SET m.name = '{name_escaped}';"
2207        ))?;
2208        Ok(())
2209    }
2210
2211    pub fn get_machine_name(&self, id: &str) -> Result<Option<String>> {
2212        let conn = self.conn()?;
2213        let id_escaped = escape_cypher(id);
2214        let mut result = conn.query(&format!(
2215            "MATCH (m:Machine {{id: '{id_escaped}'}}) RETURN m.name;"
2216        ))?;
2217        match result.next() {
2218            Some(row) => Ok(Some(value_to_string(&row[0]))),
2219            None => Ok(None),
2220        }
2221    }
2222
2223    pub fn get_all_machines(&self) -> Result<std::collections::HashMap<String, String>> {
2224        let conn = self.conn()?;
2225        let mut result = conn.query("MATCH (m:Machine) RETURN m.id, m.name;")?;
2226        let mut map = std::collections::HashMap::new();
2227        for row in &mut result {
2228            map.insert(value_to_string(&row[0]), value_to_string(&row[1]));
2229        }
2230        Ok(map)
2231    }
2232
2233    pub fn backfill_machine_id(&self, machine_id: &str) -> Result<u64> {
2234        let conn = self.conn()?;
2235        let escaped = escape_cypher(machine_id);
2236        let mut result = conn.query(&format!(
2237            "MATCH (m:Memory) WHERE m.machine_id = '' SET m.machine_id = '{escaped}' RETURN count(m);"
2238        ))?;
2239        match result.next() {
2240            Some(row) => match &row[0] {
2241                Value::Int64(n) => Ok(*n as u64),
2242                _ => Ok(0),
2243            },
2244            None => Ok(0),
2245        }
2246    }
2247
2248    pub fn register_machine(&self, identity: &crate::machine::MachineIdentity) -> Result<()> {
2249        self.upsert_machine(&identity.id, &identity.name)?;
2250        let count = self.backfill_machine_id(&identity.id)?;
2251        if count > 0 {
2252            tracing::info!("backfilled machine_id on {count} existing memories");
2253        }
2254        Ok(())
2255    }
2256}
2257
/// Renders an embedding as a bracketed, comma-separated list literal.
///
/// An empty embedding is rendered as 384 `0.0` entries instead of `[]`.
fn format_embedding(embedding: &[f32]) -> String {
    let body = if embedding.is_empty() {
        vec!["0.0"; 384].join(",")
    } else {
        embedding
            .iter()
            .map(|component| component.to_string())
            .collect::<Vec<_>>()
            .join(",")
    };
    format!("[{body}]")
}
2266
/// Escapes `s` for use inside a single-quoted Cypher string literal by
/// prefixing every backslash and single quote with a backslash.
fn escape_cypher(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2270
/// Renders `items` as a Cypher list of single-quoted string literals, e.g.
/// `['a','b']`; an empty slice renders as `[]`.
///
/// Each item is backslash-escaped (backslash and single quote), the same rule
/// `escape_cypher` applies to every other string literal in this file. The
/// previous SQL-style quote doubling (`''`) left backslashes untouched and did
/// not match that rule, so an alias containing `'` or `\` could break the query.
fn format_string_array(items: &[String]) -> String {
    let quoted: Vec<String> = items
        .iter()
        .map(|item| {
            // Keep in sync with `escape_cypher`: backslashes first, then quotes.
            let escaped = item.replace('\\', "\\\\").replace('\'', "\\'");
            format!("'{escaped}'")
        })
        .collect();
    format!("[{}]", quoted.join(","))
}
2278
2279fn value_to_string(val: &Value) -> String {
2280    match val {
2281        Value::String(s) => s.clone(),
2282        other => format!("{:?}", other),
2283    }
2284}
2285
2286fn value_to_f32(val: &Value) -> f32 {
2287    match val {
2288        Value::Float(f) => *f,
2289        Value::Double(d) => *d as f32,
2290        Value::Int64(i) => *i as f32,
2291        _ => 0.0,
2292    }
2293}
2294
2295fn value_to_f32_vec(val: &Value) -> Vec<f32> {
2296    match val {
2297        Value::Array(_, items) | Value::List(_, items) => items.iter().map(value_to_f32).collect(),
2298        _ => Vec::new(),
2299    }
2300}
2301
2302fn row_to_memory(row: &[Value]) -> Result<Memory> {
2303    let id_str = value_to_string(&row[0]);
2304    let id = Uuid::parse_str(&id_str).unwrap_or_default();
2305    let content = value_to_string(&row[1]);
2306    let memory_type_str = value_to_string(&row[2]);
2307    let confidence = value_to_f32(&row[3]);
2308    let created_at_str = value_to_string(&row[4]);
2309    let last_accessed_str = value_to_string(&row[5]);
2310    let access_count = match &row[6] {
2311        Value::Int64(i) => *i as u32,
2312        _ => 0,
2313    };
2314    let source = value_to_string(&row[7]);
2315    let source_id = value_to_string(&row[8]);
2316    let project_path = project_path_from_db(&value_to_string(&row[9]));
2317    let machine_id = value_to_string(&row[10]);
2318    let updated_at_str = value_to_string(&row[11]);
2319
2320    let memory_type = match memory_type_str.as_str() {
2321        "episodic" => MemoryType::Episodic,
2322        "procedural" => MemoryType::Procedural,
2323        "decision" => MemoryType::Decision,
2324        "architecture" => MemoryType::Architecture,
2325        "debugging" => MemoryType::Debugging,
2326        "task" => MemoryType::Task,
2327        "question" => MemoryType::Question,
2328        _ => MemoryType::Semantic,
2329    };
2330
2331    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
2332        .map(|dt| dt.with_timezone(&chrono::Utc))
2333        .unwrap_or_else(|_| chrono::Utc::now());
2334
2335    let last_accessed = chrono::DateTime::parse_from_rfc3339(&last_accessed_str)
2336        .map(|dt| dt.with_timezone(&chrono::Utc))
2337        .unwrap_or_else(|_| chrono::Utc::now());
2338
2339    let updated_at = if updated_at_str.is_empty() {
2340        created_at
2341    } else {
2342        chrono::DateTime::parse_from_rfc3339(&updated_at_str)
2343            .map(|dt| dt.with_timezone(&chrono::Utc))
2344            .unwrap_or(created_at)
2345    };
2346
2347    Ok(Memory {
2348        id,
2349        content,
2350        embedding: Vec::new(),
2351        memory_type,
2352        confidence,
2353        created_at,
2354        last_accessed,
2355        access_count,
2356        source,
2357        source_id,
2358        project_path,
2359        machine_id,
2360        updated_at,
2361    })
2362}
2363
2364fn row_to_entity(row: &[Value]) -> Result<Entity> {
2365    let id_str = value_to_string(&row[0]);
2366    let id = Uuid::parse_str(&id_str).unwrap_or_default();
2367    let name = value_to_string(&row[1]);
2368    let entity_type = value_to_string(&row[2]);
2369
2370    Ok(Entity {
2371        id,
2372        name,
2373        entity_type,
2374        embedding: Vec::new(),
2375        aliases: Vec::new(),
2376    })
2377}
2378
// SAFETY: NOTE(review) — not verifiable from this file. `KuzuStore` holds a
// kuzu `Database`, a `String`, and an `RwLock`-guarded relations cache, so the
// soundness of these impls rests on `Database` being safe to move to and share
// between threads. TODO confirm against the kuzu crate; if `Database` is
// already `Send + Sync`, both impls are redundant and can be removed.
unsafe impl Send for KuzuStore {}
unsafe impl Sync for KuzuStore {}
2381
2382#[cfg(test)]
2383mod tests {
2384    use super::*;
2385
2386    #[test]
2387    fn migrate_sync_log_recreates_old_schema_and_resets_cursors() {
2388        let store = KuzuStore::in_memory("test-machine-migrate".to_string()).unwrap();
2389        let conn = store.conn().unwrap();
2390
2391        // Tear down the new-schema table that init_schema already created and
2392        // replace it with the pre-redesign schema (seq INT64 PRIMARY KEY).
2393        conn.query("DROP TABLE SyncLog;").unwrap();
2394        conn.query(
2395            "CREATE NODE TABLE SyncLog(\
2396                seq INT64 PRIMARY KEY, \
2397                op STRING, \
2398                node_type STRING, \
2399                node_id STRING, \
2400                machine_id STRING, \
2401                timestamp STRING, \
2402                data STRING\
2403            );",
2404        )
2405        .unwrap();
2406        conn.query(
2407            "CREATE (:SyncLog {seq: 1, op: 'create', node_type: 'memory', \
2408             node_id: 'abc', machine_id: 'old-machine', \
2409             timestamp: '2024-01-01T00:00:00Z', data: ''});",
2410        )
2411        .unwrap();
2412
2413        // Seed a SyncState cursor at a non-zero position so we can verify it gets reset.
2414        conn.query(
2415            "MERGE (s:SyncState {peer_id: 'peer-x'}) \
2416             SET s.last_seq = 5, s.last_sync_at = '2024-01-01T00:00:00Z';",
2417        )
2418        .unwrap();
2419
2420        store.migrate_sync_log(&conn).unwrap();
2421
2422        // New-schema column must exist (table was recreated with id STRING PRIMARY KEY).
2423        assert!(
2424            conn.query("MATCH (s:SyncLog) RETURN s.id LIMIT 1;").is_ok(),
2425            "new-schema column s.id must be present after migration"
2426        );
2427
2428        // SyncState cursor must have been reset to 0.
2429        let mut r = conn
2430            .query(
2431                "MATCH (s:SyncState {peer_id: 'peer-x'}) RETURN s.last_seq;",
2432            )
2433            .unwrap();
2434        let last_seq = match r.next() {
2435            Some(row) => match &row[0] {
2436                Value::Int64(n) => *n,
2437                _ => panic!("unexpected value type for last_seq"),
2438            },
2439            None => panic!("SyncState row not found after migration"),
2440        };
2441        assert_eq!(last_seq, 0, "last_seq must be reset to 0 after migration");
2442    }
2443
2444    #[test]
2445    fn rollback_leaves_no_partial_state() {
2446        let store = KuzuStore::in_memory("test-machine-rollback".to_string()).unwrap();
2447        let embedding = std::iter::repeat("0.0")
2448            .take(384)
2449            .collect::<Vec<_>>()
2450            .join(", ");
2451
2452        let result: Result<()> = store.with_transaction(|conn| {
2453            conn.query(&format!(
2454                "CREATE (:Memory {{id: 'rollback-test-id', content: 'x', \
2455                 embedding: [{embedding}], memory_type: 'semantic', confidence: 1.0, \
2456                 created_at: '2024-01-01T00:00:00Z', last_accessed: '2024-01-01T00:00:00Z', \
2457                 access_count: 0, source: 'test', source_id: '', project_path: '', \
2458                 machine_id: 'x', updated_at: '2024-01-01T00:00:00Z'}});"
2459            ))?;
2460            Err(anyhow::anyhow!("deliberate rollback"))
2461        });
2462
2463        assert!(
2464            result.is_err(),
2465            "with_transaction must propagate the closure error"
2466        );
2467
2468        let entries = store.sync_log_since(0).unwrap();
2469        assert_eq!(entries.len(), 0, "no SyncLog rows may survive a rollback");
2470        assert_eq!(
2471            store.memory_count().unwrap(),
2472            0,
2473            "the Memory node created inside the transaction must be rolled back"
2474        );
2475    }
2476
2477    fn memory_with_embedding(content: &str, embedding: Vec<f32>) -> Memory {
2478        let mut m = Memory::new(
2479            content.to_string(),
2480            MemoryType::Semantic,
2481            "test".to_string(),
2482            String::new(),
2483        );
2484        m.embedding = embedding;
2485        m
2486    }
2487
2488    #[test]
2489    fn recall_reflects_new_relation_after_cache_invalidation() {
2490        use crate::query::{QueryEngine, QueryFilters, QueryRequest};
2491
2492        let store = KuzuStore::in_memory("test-machine-invalidate".to_string()).unwrap();
2493
2494        let mut emb_a = vec![0.0_f32; 384];
2495        emb_a[0] = 1.0;
2496        let mut emb_b = vec![0.0_f32; 384];
2497        emb_b[1] = 1.0;
2498        let target = memory_with_embedding("kuzu is the embedded graph store", emb_a.clone());
2499        let neighbor = memory_with_embedding("an unrelated note about sync", emb_b);
2500        store.store_memory(&target).unwrap();
2501        store.store_memory(&neighbor).unwrap();
2502
2503        let request = QueryRequest {
2504            text: "graph store".to_string(),
2505            embedding: emb_a,
2506            limit: 10,
2507            filters: QueryFilters::default(),
2508        };
2509
2510        let before = QueryEngine::new(&store).recall(&request).unwrap();
2511        let before_score = before
2512            .iter()
2513            .find(|r| r.memory.id == target.id)
2514            .expect("target must be recalled")
2515            .score;
2516
2517        // Adding a Reinforces edge raises the graph-relevance term, so a stale cache
2518        // (which returned the empty pre-edge relation list) would leave the score
2519        // unchanged. Asserting the score rises proves store_relation evicted the key.
2520        let rel = Relation {
2521            from_id: target.id,
2522            to_id: neighbor.id,
2523            relation_type: RelationType::Reinforces,
2524            strength: 1.0,
2525            context: None,
2526        };
2527        store.store_relation(&rel).unwrap();
2528
2529        let after = QueryEngine::new(&store).recall(&request).unwrap();
2530        let after_score = after
2531            .iter()
2532            .find(|r| r.memory.id == target.id)
2533            .expect("target must still be recalled")
2534            .score;
2535
2536        assert!(
2537            after_score > before_score,
2538            "recall must reflect the new relation: before={before_score} after={after_score}"
2539        );
2540    }
2541
2542    #[test]
2543    fn get_relations_cached_matches_fresh_uncached_store() {
2544        let machine = "test-machine-equivalence".to_string();
2545        let warm = KuzuStore::in_memory(machine.clone()).unwrap();
2546
2547        let a = memory_with_embedding("memory a", {
2548            let mut e = vec![0.0_f32; 384];
2549            e[0] = 1.0;
2550            e
2551        });
2552        let b = memory_with_embedding("memory b", {
2553            let mut e = vec![0.0_f32; 384];
2554            e[1] = 1.0;
2555            e
2556        });
2557        let c = memory_with_embedding("memory c", {
2558            let mut e = vec![0.0_f32; 384];
2559            e[2] = 1.0;
2560            e
2561        });
2562        for m in [&a, &b, &c] {
2563            warm.store_memory(m).unwrap();
2564        }
2565        let rels = [
2566            Relation {
2567                from_id: a.id,
2568                to_id: b.id,
2569                relation_type: RelationType::RelatesTo,
2570                strength: 0.7,
2571                context: None,
2572            },
2573            Relation {
2574                from_id: a.id,
2575                to_id: c.id,
2576                relation_type: RelationType::Reinforces,
2577                strength: 1.0,
2578                context: None,
2579            },
2580            Relation {
2581                from_id: b.id,
2582                to_id: c.id,
2583                relation_type: RelationType::Supersedes,
2584                strength: 1.0,
2585                context: None,
2586            },
2587        ];
2588        for r in &rels {
2589            warm.store_relation(r).unwrap();
2590        }
2591
2592        // Warm the cache by reading every (id, type) pair twice; the second read is a
2593        // cache hit. A freshly opened store sees the same persisted data with an empty
2594        // cache, so the two must agree edge-for-edge.
2595        let types = [
2596            None,
2597            Some(RelationType::RelatesTo),
2598            Some(RelationType::Reinforces),
2599            Some(RelationType::Supersedes),
2600            Some(RelationType::Mentions),
2601        ];
2602        for id in [a.id, b.id, c.id] {
2603            for rt in types {
2604                let first = warm.get_relations(id, rt).unwrap();
2605                let second = warm.get_relations(id, rt).unwrap();
2606                assert_eq!(
2607                    first.len(),
2608                    second.len(),
2609                    "cache hit must match the live read for ({id}, {rt:?})"
2610                );
2611                let mut a_ids: Vec<Uuid> = first.iter().map(|r| r.to_id).collect();
2612                let mut b_ids: Vec<Uuid> = second.iter().map(|r| r.to_id).collect();
2613                a_ids.sort();
2614                b_ids.sort();
2615                assert_eq!(a_ids, b_ids);
2616            }
2617        }
2618    }
2619}