Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the [`GraphStore`] trait and the SQLite implementation.
4//!
5//! ## Referential integrity (SQLite)
6//!
7//! `ainl_graph_edges` uses real `FOREIGN KEY (from_id)` / `FOREIGN KEY (to_id)` references to
8//! `ainl_graph_nodes(id)` with `ON DELETE CASCADE`. [`SqliteGraphStore::open`] and
9//! [`SqliteGraphStore::from_connection`] run `PRAGMA foreign_keys = ON` on the handle.
10//!
11//! Databases created before these constraints used a plain edges table; [`SqliteGraphStore::ensure_schema`]
12//! runs a one-time `migrate_edges_add_foreign_keys` rebuild. Edge rows whose endpoints
13//! are missing from `ainl_graph_nodes` **cannot** be kept under FK rules and are **omitted** from
14//! the migrated copy.
15//!
16//! ## Above the database (still recommended)
17//!
18//! - **Eager checks**: [`SqliteGraphStore::write_node_with_edges`], [`SqliteGraphStore::insert_graph_edge_checked`]
19//!   give clear errors without relying on SQLite error text alone.
20//! - **Repair / forensic import**: [`SqliteGraphStore::import_graph`] with `allow_dangling_edges: true`
21//!   is the **supported** way to load snapshots that violate referential integrity: FK enforcement is
22//!   disabled only for the duration of that import, then turned back on. Follow with
23//!   [`SqliteGraphStore::validate_graph`] before resuming normal writes on the same connection.
24//! - **Semantic graph checks**: [`SqliteGraphStore::validate_graph`] (agent-scoped edges, dangling
25//!   diagnostics, cross-agent boundary counts, etc.) — orthogonal to FK row existence.
26//!
27//! SQLite tables integrate with existing openfang-memory schema where applicable.
28
29use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31    AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32    SNAPSHOT_SCHEMA_VERSION,
33};
34use chrono::Utc;
35use rusqlite::OptionalExtension;
36use std::collections::HashSet;
37use uuid::Uuid;
38
39/// Graph memory storage trait - swappable backends
40pub trait GraphStore {
41    /// Write a node to storage
42    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
43
44    /// Read a node by ID
45    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
46
47    /// Query episodes since a given timestamp
48    fn query_episodes_since(
49        &self,
50        since_timestamp: i64,
51        limit: usize,
52    ) -> Result<Vec<AinlMemoryNode>, String>;
53
54    /// Find nodes by type
55    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
56
57    /// Walk edges from a node with a given label
58    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
59}
60
61/// SQLite implementation of GraphStore
62///
63/// Integrates with existing openfang-memory schema by adding two tables:
64/// - `ainl_graph_nodes`: stores node payloads
65/// - `ainl_graph_edges`: stores graph edges
66pub struct SqliteGraphStore {
67    conn: rusqlite::Connection,
68}
69
70fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
71    conn.execute_batch("PRAGMA foreign_keys = ON;")
72}
73
74fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
75    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
76    let cols = stmt
77        .query_map([], |row| row.get::<_, String>(1))?
78        .collect::<Result<Vec<_>, _>>()?;
79    if !cols.iter().any(|c| c == "weight") {
80        conn.execute(
81            "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
82            [],
83        )?;
84    }
85    if !cols.iter().any(|c| c == "metadata") {
86        conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
87    }
88    Ok(())
89}
90
91/// True when `ainl_graph_edges` declares at least one foreign-key reference (new schema).
92fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
93    let exists: i64 = conn.query_row(
94        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
95        [],
96        |r| r.get(0),
97    )?;
98    if exists == 0 {
99        return Ok(false);
100    }
101    let n: i64 = conn.query_row(
102        "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
103        [],
104        |r| r.get(0),
105    )?;
106    Ok(n > 0)
107}
108
109/// Rebuild `ainl_graph_edges` with `FOREIGN KEY` constraints. Rows whose endpoints are missing
110/// from `ainl_graph_nodes` are **dropped** (they cannot be represented under FK rules).
111fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
112    if edges_table_has_foreign_keys(conn)? {
113        return Ok(());
114    }
115
116    let exists: i64 = conn.query_row(
117        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
118        [],
119        |r| r.get(0),
120    )?;
121    if exists == 0 {
122        return Ok(());
123    }
124
125    conn.execute("BEGIN IMMEDIATE", [])?;
126    let res: Result<(), rusqlite::Error> = (|| {
127        conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
128        conn.execute(
129            "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
130            [],
131        )?;
132        conn.execute(
133            r#"CREATE TABLE ainl_graph_edges (
134                from_id TEXT NOT NULL,
135                to_id TEXT NOT NULL,
136                label TEXT NOT NULL,
137                weight REAL NOT NULL DEFAULT 1.0,
138                metadata TEXT,
139                PRIMARY KEY (from_id, to_id, label),
140                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
141                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
142            )"#,
143            [],
144        )?;
145        conn.execute(
146            r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
147               SELECT o.from_id, o.to_id, o.label,
148                      COALESCE(o.weight, 1.0),
149                      o.metadata
150               FROM ainl_graph_edges__old o
151               WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
152                 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
153            [],
154        )?;
155        conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
156        conn.execute(
157            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
158            [],
159        )?;
160        Ok(())
161    })();
162
163    match res {
164        Ok(()) => {
165            conn.execute("COMMIT", [])?;
166        }
167        Err(e) => {
168            let _ = conn.execute("ROLLBACK", []);
169            return Err(e);
170        }
171    }
172    Ok(())
173}
174
175fn node_type_name(node: &AinlMemoryNode) -> &'static str {
176    match &node.node_type {
177        AinlNodeType::Episode { .. } => "episode",
178        AinlNodeType::Semantic { .. } => "semantic",
179        AinlNodeType::Procedural { .. } => "procedural",
180        AinlNodeType::Persona { .. } => "persona",
181        AinlNodeType::RuntimeState { .. } => "runtime_state",
182    }
183}
184
185fn node_timestamp(node: &AinlMemoryNode) -> i64 {
186    match &node.node_type {
187        AinlNodeType::Episode { episodic } => episodic.timestamp,
188        AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
189        _ => chrono::Utc::now().timestamp(),
190    }
191}
192
193fn persist_edge(
194    conn: &rusqlite::Connection,
195    from_id: Uuid,
196    to_id: Uuid,
197    label: &str,
198    weight: f32,
199    metadata: Option<&str>,
200) -> Result<(), String> {
201    conn.execute(
202        "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
203         VALUES (?1, ?2, ?3, ?4, ?5)",
204        rusqlite::params![
205            from_id.to_string(),
206            to_id.to_string(),
207            label,
208            weight,
209            metadata
210        ],
211    )
212    .map_err(|e| e.to_string())?;
213    Ok(())
214}
215
216/// All `ainl_graph_edges` rows whose endpoints are both present in `id_set`, as [`SnapshotEdge`] values.
217fn collect_snapshot_edges_for_id_set(
218    conn: &rusqlite::Connection,
219    id_set: &HashSet<String>,
220) -> Result<Vec<SnapshotEdge>, String> {
221    let mut edge_stmt = conn
222        .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
223        .map_err(|e| e.to_string())?;
224    let edge_rows = edge_stmt
225        .query_map([], |row| {
226            Ok((
227                row.get::<_, String>(0)?,
228                row.get::<_, String>(1)?,
229                row.get::<_, String>(2)?,
230                row.get::<_, f64>(3)?,
231                row.get::<_, Option<String>>(4)?,
232            ))
233        })
234        .map_err(|e| e.to_string())?
235        .collect::<Result<Vec<_>, _>>()
236        .map_err(|e| e.to_string())?;
237
238    let mut edges = Vec::new();
239    for (from_id, to_id, label, weight, meta) in edge_rows {
240        if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
241            continue;
242        }
243        let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
244        let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
245        let metadata = match meta {
246            Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
247            _ => None,
248        };
249        edges.push(SnapshotEdge {
250            source_id,
251            target_id,
252            edge_type: label,
253            weight: weight as f32,
254            metadata,
255        });
256    }
257    Ok(edges)
258}
259
260fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
261    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
262    let type_name = node_type_name(node);
263    let timestamp = node_timestamp(node);
264
265    conn.execute(
266        "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
267         VALUES (?1, ?2, ?3, ?4)",
268        rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
269    )
270    .map_err(|e| e.to_string())?;
271
272    for edge in &node.edges {
273        persist_edge(
274            conn,
275            node.id,
276            edge.target_id,
277            &edge.label,
278            1.0,
279            None::<&str>,
280        )?;
281    }
282
283    Ok(())
284}
285
286fn try_insert_node_ignore(
287    conn: &rusqlite::Connection,
288    node: &AinlMemoryNode,
289) -> Result<(), String> {
290    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
291    let type_name = node_type_name(node);
292    let timestamp = node_timestamp(node);
293    conn.execute(
294        "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
295         VALUES (?1, ?2, ?3, ?4)",
296        rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
297    )
298    .map_err(|e| e.to_string())?;
299    Ok(())
300}
301
302fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
303    let meta = match &edge.metadata {
304        Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
305        None => None,
306    };
307    conn.execute(
308        "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
309         VALUES (?1, ?2, ?3, ?4, ?5)",
310        rusqlite::params![
311            edge.source_id.to_string(),
312            edge.target_id.to_string(),
313            edge.edge_type,
314            edge.weight,
315            meta.as_deref(),
316        ],
317    )
318    .map_err(|e| e.to_string())?;
319    Ok(())
320}
321
322impl SqliteGraphStore {
323    /// Ensure the AINL graph schema exists in the database
324    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
325        conn.execute(
326            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
327                id TEXT PRIMARY KEY,
328                node_type TEXT NOT NULL,
329                payload TEXT NOT NULL,
330                timestamp INTEGER NOT NULL
331            )",
332            [],
333        )?;
334
335        conn.execute(
336            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
337             ON ainl_graph_nodes(timestamp DESC)",
338            [],
339        )?;
340
341        conn.execute(
342            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
343             ON ainl_graph_nodes(node_type)",
344            [],
345        )?;
346
347        conn.execute(
348            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
349                from_id TEXT NOT NULL,
350                to_id TEXT NOT NULL,
351                label TEXT NOT NULL,
352                weight REAL NOT NULL DEFAULT 1.0,
353                metadata TEXT,
354                PRIMARY KEY (from_id, to_id, label),
355                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
356                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
357            )",
358            [],
359        )?;
360
361        conn.execute(
362            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
363             ON ainl_graph_edges(from_id, label)",
364            [],
365        )?;
366
367        migrate_edge_columns(conn)?;
368        migrate_edges_add_foreign_keys(conn)?;
369        Ok(())
370    }
371
372    /// Open/create a graph store at the given path
373    pub fn open(path: &std::path::Path) -> Result<Self, String> {
374        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
375        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
376        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
377        Ok(Self { conn })
378    }
379
380    /// Create from an existing connection (for integration with openfang-memory pool)
381    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
382        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
383        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
384        Ok(Self { conn })
385    }
386
387    /// Low-level access for query builders in this crate.
388    pub(crate) fn conn(&self) -> &rusqlite::Connection {
389        &self.conn
390    }
391
392    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
393    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
394        persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
395    }
396
397    /// Like [`Self::insert_graph_edge`], but verifies both endpoints exist first (clear errors for strict runtime wiring).
398    pub fn insert_graph_edge_checked(
399        &self,
400        from_id: Uuid,
401        to_id: Uuid,
402        label: &str,
403    ) -> Result<(), String> {
404        if !self.node_row_exists(&from_id.to_string())? {
405            return Err(format!(
406                "insert_graph_edge_checked: missing source node row {}",
407                from_id
408            ));
409        }
410        if !self.node_row_exists(&to_id.to_string())? {
411            return Err(format!(
412                "insert_graph_edge_checked: missing target node row {}",
413                to_id
414            ));
415        }
416        self.insert_graph_edge(from_id, to_id, label)
417    }
418
419    /// Same as [`Self::insert_graph_edge`], with optional edge weight and JSON metadata.
420    pub fn insert_graph_edge_with_meta(
421        &self,
422        from_id: Uuid,
423        to_id: Uuid,
424        label: &str,
425        weight: f32,
426        metadata: Option<&serde_json::Value>,
427    ) -> Result<(), String> {
428        let meta = metadata
429            .map(serde_json::to_string)
430            .transpose()
431            .map_err(|e| e.to_string())?;
432        persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
433    }
434
435    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
436    pub fn query_nodes_by_type_since(
437        &self,
438        node_type: &str,
439        since_timestamp: i64,
440        limit: usize,
441    ) -> Result<Vec<AinlMemoryNode>, String> {
442        let mut stmt = self
443            .conn
444            .prepare(
445                "SELECT payload FROM ainl_graph_nodes
446                 WHERE node_type = ?1 AND timestamp >= ?2
447                 ORDER BY timestamp DESC
448                 LIMIT ?3",
449            )
450            .map_err(|e| e.to_string())?;
451
452        let rows = stmt
453            .query_map(
454                rusqlite::params![node_type, since_timestamp, limit as i64],
455                |row| {
456                    let payload: String = row.get(0)?;
457                    Ok(payload)
458                },
459            )
460            .map_err(|e| e.to_string())?;
461
462        let mut nodes = Vec::new();
463        for row in rows {
464            let payload = row.map_err(|e| e.to_string())?;
465            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
466            nodes.push(node);
467        }
468
469        Ok(nodes)
470    }
471
472    /// Read the most recent persisted [`RuntimeStateNode`] for `agent_id`, if any.
473    ///
474    /// Rows are stored with `node_type = 'runtime_state'` and JSON `$.node_type.runtime_state.agent_id` matching the agent.
475    pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
476        if agent_id.is_empty() {
477            return Ok(None);
478        }
479        let mut stmt = self
480            .conn
481            .prepare(
482                "SELECT payload FROM ainl_graph_nodes
483                 WHERE node_type = 'runtime_state'
484                   AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
485                 ORDER BY timestamp DESC
486                 LIMIT 1",
487            )
488            .map_err(|e| e.to_string())?;
489
490        let payload_opt: Option<String> = stmt
491            .query_row([agent_id], |row| row.get(0))
492            .optional()
493            .map_err(|e| e.to_string())?;
494
495        let Some(payload) = payload_opt else {
496            return Ok(None);
497        };
498
499        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
500        match node.node_type {
501            AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
502            _ => Err("runtime_state row had unexpected node_type payload".to_string()),
503        }
504    }
505
506    /// Upsert one [`RuntimeStateNode`] row per agent (stable id via [`Uuid::new_v5`]).
507    pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
508        let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
509        let node = AinlMemoryNode {
510            id,
511            memory_category: MemoryCategory::RuntimeState,
512            importance_score: 0.5,
513            agent_id: state.agent_id.clone(),
514            node_type: AinlNodeType::RuntimeState {
515                runtime_state: state.clone(),
516            },
517            edges: Vec::new(),
518        };
519        self.write_node(&node)
520    }
521
522    /// Write a node and its embedded edges in one transaction; fails if any edge target is missing.
523    pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
524        let tx = self.conn.transaction().map_err(|e| e.to_string())?;
525        for edge in &node.edges {
526            let exists: Option<i32> = tx
527                .query_row(
528                    "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
529                    [edge.target_id.to_string()],
530                    |_| Ok(1),
531                )
532                .optional()
533                .map_err(|e| e.to_string())?;
534            if exists.is_none() {
535                return Err(format!(
536                    "write_node_with_edges: missing target node {}",
537                    edge.target_id
538                ));
539            }
540        }
541        persist_node(&tx, node)?;
542        tx.commit().map_err(|e| e.to_string())?;
543        Ok(())
544    }
545
546    /// Validate structural integrity for one agent's induced subgraph.
547    pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
548        use std::collections::HashSet;
549
550        let agent_nodes = self.agent_node_ids(agent_id)?;
551        let node_count = agent_nodes.len();
552
553        let mut stmt = self
554            .conn
555            .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
556            .map_err(|e| e.to_string())?;
557        let all_edges: Vec<(String, String, String)> = stmt
558            .query_map([], |row| {
559                Ok((
560                    row.get::<_, String>(0)?,
561                    row.get::<_, String>(1)?,
562                    row.get::<_, String>(2)?,
563                ))
564            })
565            .map_err(|e| e.to_string())?
566            .collect::<Result<Vec<_>, _>>()
567            .map_err(|e| e.to_string())?;
568
569        let mut edge_pairs = Vec::new();
570        for (from_id, to_id, label) in all_edges {
571            let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
572            if touches_agent {
573                edge_pairs.push((from_id, to_id, label));
574            }
575        }
576
577        let edge_count = edge_pairs.len();
578        let mut dangling_edges = Vec::new();
579        let mut dangling_edge_details = Vec::new();
580        let mut cross_agent_boundary_edges = 0usize;
581
582        for (from_id, to_id, label) in &edge_pairs {
583            let from_ok = self.node_row_exists(from_id)?;
584            let to_ok = self.node_row_exists(to_id)?;
585            if !from_ok || !to_ok {
586                dangling_edges.push((from_id.clone(), to_id.clone()));
587                dangling_edge_details.push(DanglingEdgeDetail {
588                    source_id: from_id.clone(),
589                    target_id: to_id.clone(),
590                    edge_type: label.clone(),
591                });
592                continue;
593            }
594            let fa = agent_nodes.contains(from_id);
595            let ta = agent_nodes.contains(to_id);
596            if fa ^ ta {
597                cross_agent_boundary_edges += 1;
598            }
599        }
600
601        let mut touched: HashSet<String> =
602            HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
603        for (a, b, _) in &edge_pairs {
604            if agent_nodes.contains(a) {
605                touched.insert(a.clone());
606            }
607            if agent_nodes.contains(b) {
608                touched.insert(b.clone());
609            }
610        }
611
612        let mut orphan_nodes = Vec::new();
613        for id in &agent_nodes {
614            if !touched.contains(id) {
615                orphan_nodes.push(id.clone());
616            }
617        }
618
619        let is_valid = dangling_edges.is_empty();
620        Ok(GraphValidationReport {
621            agent_id: agent_id.to_string(),
622            node_count,
623            edge_count,
624            dangling_edges,
625            dangling_edge_details,
626            cross_agent_boundary_edges,
627            orphan_nodes,
628            is_valid,
629        })
630    }
631
632    fn node_row_exists(&self, id: &str) -> Result<bool, String> {
633        let v: Option<i32> = self
634            .conn
635            .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
636                Ok(1)
637            })
638            .optional()
639            .map_err(|e| e.to_string())?;
640        Ok(v.is_some())
641    }
642
643    fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
644        let mut stmt = self
645            .conn
646            .prepare(
647                "SELECT id FROM ainl_graph_nodes
648                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
649            )
650            .map_err(|e| e.to_string())?;
651        let ids = stmt
652            .query_map([agent_id], |row| row.get::<_, String>(0))
653            .map_err(|e| e.to_string())?
654            .collect::<Result<HashSet<_>, _>>()
655            .map_err(|e| e.to_string())?;
656        Ok(ids)
657    }
658
659    /// Directed edges where **both** endpoints are nodes owned by `agent_id` (aligned with [`Self::export_graph`] edge set).
660    pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
661        let id_set = self.agent_node_ids(agent_id)?;
662        collect_snapshot_edges_for_id_set(&self.conn, &id_set)
663    }
664
665    /// Export all nodes and interconnecting edges for `agent_id`.
666    pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
667        let mut stmt = self
668            .conn
669            .prepare(
670                "SELECT payload FROM ainl_graph_nodes
671                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
672            )
673            .map_err(|e| e.to_string())?;
674        let nodes: Vec<AinlMemoryNode> = stmt
675            .query_map([agent_id], |row| {
676                let payload: String = row.get(0)?;
677                Ok(payload)
678            })
679            .map_err(|e| e.to_string())?
680            .map(|r| {
681                let payload = r.map_err(|e| e.to_string())?;
682                serde_json::from_str(&payload).map_err(|e| e.to_string())
683            })
684            .collect::<Result<Vec<_>, _>>()?;
685
686        let id_set: std::collections::HashSet<String> =
687            nodes.iter().map(|n| n.id.to_string()).collect();
688
689        let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
690
691        Ok(AgentGraphSnapshot {
692            agent_id: agent_id.to_string(),
693            exported_at: Utc::now(),
694            schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
695            nodes,
696            edges,
697        })
698    }
699
700    /// Import a snapshot in one transaction (`INSERT OR IGNORE` per row).
701    ///
702    /// * `allow_dangling_edges == false` (**default / production**): `PRAGMA foreign_keys` stays
703    ///   enabled; every edge must reference existing node rows after inserts (same invariants as
704    ///   [`Self::write_node_with_edges`]).
705    /// * `allow_dangling_edges == true` (**repair / forensic**): FK checks are disabled only for
706    ///   this import so partially invalid snapshots can be loaded; run [`Self::validate_graph`]
707    ///   afterward and repair before returning to normal writes.
708    pub fn import_graph(
709        &mut self,
710        snapshot: &AgentGraphSnapshot,
711        allow_dangling_edges: bool,
712    ) -> Result<(), String> {
713        if allow_dangling_edges {
714            self.conn
715                .execute_batch("PRAGMA foreign_keys = OFF;")
716                .map_err(|e| e.to_string())?;
717        }
718
719        let result: Result<(), String> = (|| {
720            let tx = self.conn.transaction().map_err(|e| e.to_string())?;
721            for node in &snapshot.nodes {
722                try_insert_node_ignore(&tx, node)?;
723            }
724            for edge in &snapshot.edges {
725                try_insert_edge_ignore(&tx, edge)?;
726            }
727            tx.commit().map_err(|e| e.to_string())?;
728            Ok(())
729        })();
730
731        if allow_dangling_edges {
732            self.conn
733                .execute_batch("PRAGMA foreign_keys = ON;")
734                .map_err(|e| e.to_string())?;
735        }
736
737        result
738    }
739}
740
741impl GraphStore for SqliteGraphStore {
742    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
743    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
744    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
745        persist_node(&self.conn, node)
746    }
747
748    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
749        let payload: Option<String> = self
750            .conn
751            .query_row(
752                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
753                [id.to_string()],
754                |row| row.get::<_, String>(0),
755            )
756            .optional()
757            .map_err(|e: rusqlite::Error| e.to_string())?;
758
759        match payload {
760            Some(p) => {
761                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
762                Ok(Some(node))
763            }
764            None => Ok(None),
765        }
766    }
767
768    fn query_episodes_since(
769        &self,
770        since_timestamp: i64,
771        limit: usize,
772    ) -> Result<Vec<AinlMemoryNode>, String> {
773        let mut stmt = self
774            .conn
775            .prepare(
776                "SELECT payload FROM ainl_graph_nodes
777                 WHERE node_type = 'episode' AND timestamp >= ?1
778                 ORDER BY timestamp DESC
779                 LIMIT ?2",
780            )
781            .map_err(|e| e.to_string())?;
782
783        let rows = stmt
784            .query_map([since_timestamp, limit as i64], |row| {
785                let payload: String = row.get(0)?;
786                Ok(payload)
787            })
788            .map_err(|e| e.to_string())?;
789
790        let mut nodes = Vec::new();
791        for row in rows {
792            let payload = row.map_err(|e| e.to_string())?;
793            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
794            nodes.push(node);
795        }
796
797        Ok(nodes)
798    }
799
800    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
801        let mut stmt = self
802            .conn
803            .prepare(
804                "SELECT payload FROM ainl_graph_nodes
805                 WHERE node_type = ?1
806                 ORDER BY timestamp DESC",
807            )
808            .map_err(|e| e.to_string())?;
809
810        let rows = stmt
811            .query_map([type_name], |row| {
812                let payload: String = row.get(0)?;
813                Ok(payload)
814            })
815            .map_err(|e| e.to_string())?;
816
817        let mut nodes = Vec::new();
818        for row in rows {
819            let payload = row.map_err(|e| e.to_string())?;
820            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
821            nodes.push(node);
822        }
823
824        Ok(nodes)
825    }
826
827    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
828        let mut stmt = self
829            .conn
830            .prepare(
831                "SELECT to_id FROM ainl_graph_edges
832                 WHERE from_id = ?1 AND label = ?2",
833            )
834            .map_err(|e| e.to_string())?;
835
836        let target_ids: Vec<String> = stmt
837            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
838            .map_err(|e| e.to_string())?
839            .collect::<Result<Vec<_>, _>>()
840            .map_err(|e| e.to_string())?;
841
842        let mut nodes = Vec::new();
843        for target_id in target_ids {
844            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
845            if let Some(node) = self.read_node(id)? {
846                nodes.push(node);
847            }
848        }
849
850        Ok(nodes)
851    }
852}