Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the [`GraphStore`] trait and the SQLite implementation.
4//!
5//! ## Referential integrity (SQLite)
6//!
7//! `ainl_graph_edges` uses real `FOREIGN KEY (from_id)` / `FOREIGN KEY (to_id)` references to
8//! `ainl_graph_nodes(id)` with `ON DELETE CASCADE`. [`SqliteGraphStore::open`] and
9//! [`SqliteGraphStore::from_connection`] run `PRAGMA foreign_keys = ON` on the handle.
10//!
11//! Databases created before these constraints used a plain edges table; [`SqliteGraphStore::ensure_schema`]
12//! runs a one-time `migrate_edges_add_foreign_keys` rebuild. Edge rows whose endpoints
13//! are missing from `ainl_graph_nodes` **cannot** be kept under FK rules and are **omitted** from
14//! the migrated copy.
15//!
16//! ## Above the database (still recommended)
17//!
18//! - **Eager checks**: [`SqliteGraphStore::write_node_with_edges`], [`SqliteGraphStore::insert_graph_edge_checked`]
19//!   give clear errors without relying on SQLite error text alone.
20//! - **Repair / forensic import**: [`SqliteGraphStore::import_graph`] with `allow_dangling_edges: true`
21//!   is the **supported** way to load snapshots that violate referential integrity: FK enforcement is
22//!   disabled only for the duration of that import, then turned back on. Follow with
23//!   [`SqliteGraphStore::validate_graph`] before resuming normal writes on the same connection.
24//! - **Semantic graph checks**: [`SqliteGraphStore::validate_graph`] (agent-scoped edges, dangling
25//!   diagnostics, cross-agent boundary counts, etc.) — orthogonal to FK row existence.
26//!
27//! SQLite tables integrate with existing openfang-memory schema where applicable.
28
29use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31    AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge, SNAPSHOT_SCHEMA_VERSION,
32};
33use chrono::Utc;
34use rusqlite::OptionalExtension;
35use std::collections::HashSet;
36use uuid::Uuid;
37
38/// Graph memory storage trait - swappable backends
39pub trait GraphStore {
40    /// Write a node to storage
41    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
42
43    /// Read a node by ID
44    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
45
46    /// Query episodes since a given timestamp
47    fn query_episodes_since(
48        &self,
49        since_timestamp: i64,
50        limit: usize,
51    ) -> Result<Vec<AinlMemoryNode>, String>;
52
53    /// Find nodes by type
54    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
55
56    /// Walk edges from a node with a given label
57    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
58}
59
60/// SQLite implementation of GraphStore
61///
62/// Integrates with existing openfang-memory schema by adding two tables:
63/// - `ainl_graph_nodes`: stores node payloads
64/// - `ainl_graph_edges`: stores graph edges
65pub struct SqliteGraphStore {
66    conn: rusqlite::Connection,
67}
68
69fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
70    conn.execute_batch("PRAGMA foreign_keys = ON;")
71}
72
73fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
74    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
75    let cols = stmt
76        .query_map([], |row| row.get::<_, String>(1))?
77        .collect::<Result<Vec<_>, _>>()?;
78    if !cols.iter().any(|c| c == "weight") {
79        conn.execute(
80            "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
81            [],
82        )?;
83    }
84    if !cols.iter().any(|c| c == "metadata") {
85        conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
86    }
87    Ok(())
88}
89
90/// True when `ainl_graph_edges` declares at least one foreign-key reference (new schema).
91fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
92    let exists: i64 = conn.query_row(
93        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
94        [],
95        |r| r.get(0),
96    )?;
97    if exists == 0 {
98        return Ok(false);
99    }
100    let n: i64 = conn.query_row(
101        "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
102        [],
103        |r| r.get(0),
104    )?;
105    Ok(n > 0)
106}
107
108/// Rebuild `ainl_graph_edges` with `FOREIGN KEY` constraints. Rows whose endpoints are missing
109/// from `ainl_graph_nodes` are **dropped** (they cannot be represented under FK rules).
110fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
111    if edges_table_has_foreign_keys(conn)? {
112        return Ok(());
113    }
114
115    let exists: i64 = conn.query_row(
116        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
117        [],
118        |r| r.get(0),
119    )?;
120    if exists == 0 {
121        return Ok(());
122    }
123
124    conn.execute("BEGIN IMMEDIATE", [])?;
125    let res: Result<(), rusqlite::Error> = (|| {
126        conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
127        conn.execute(
128            "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
129            [],
130        )?;
131        conn.execute(
132            r#"CREATE TABLE ainl_graph_edges (
133                from_id TEXT NOT NULL,
134                to_id TEXT NOT NULL,
135                label TEXT NOT NULL,
136                weight REAL NOT NULL DEFAULT 1.0,
137                metadata TEXT,
138                PRIMARY KEY (from_id, to_id, label),
139                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
140                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
141            )"#,
142            [],
143        )?;
144        conn.execute(
145            r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
146               SELECT o.from_id, o.to_id, o.label,
147                      COALESCE(o.weight, 1.0),
148                      o.metadata
149               FROM ainl_graph_edges__old o
150               WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
151                 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
152            [],
153        )?;
154        conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
155        conn.execute(
156            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
157            [],
158        )?;
159        Ok(())
160    })();
161
162    match res {
163        Ok(()) => {
164            conn.execute("COMMIT", [])?;
165        }
166        Err(e) => {
167            let _ = conn.execute("ROLLBACK", []);
168            return Err(e);
169        }
170    }
171    Ok(())
172}
173
174fn node_type_name(node: &AinlMemoryNode) -> &'static str {
175    match &node.node_type {
176        AinlNodeType::Episode { .. } => "episode",
177        AinlNodeType::Semantic { .. } => "semantic",
178        AinlNodeType::Procedural { .. } => "procedural",
179        AinlNodeType::Persona { .. } => "persona",
180        AinlNodeType::RuntimeState { .. } => "runtime_state",
181    }
182}
183
184fn runtime_state_timestamp(rs: &RuntimeStateNode) -> i64 {
185    chrono::DateTime::parse_from_rfc3339(&rs.updated_at)
186        .map(|d| d.timestamp())
187        .unwrap_or_else(|_| Utc::now().timestamp())
188}
189
190fn node_timestamp(node: &AinlMemoryNode) -> i64 {
191    match &node.node_type {
192        AinlNodeType::Episode { episodic } => episodic.timestamp,
193        AinlNodeType::RuntimeState { runtime_state } => runtime_state_timestamp(runtime_state),
194        _ => chrono::Utc::now().timestamp(),
195    }
196}
197
198fn persist_edge(
199    conn: &rusqlite::Connection,
200    from_id: Uuid,
201    to_id: Uuid,
202    label: &str,
203    weight: f32,
204    metadata: Option<&str>,
205) -> Result<(), String> {
206    conn.execute(
207        "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
208         VALUES (?1, ?2, ?3, ?4, ?5)",
209        rusqlite::params![from_id.to_string(), to_id.to_string(), label, weight, metadata],
210    )
211    .map_err(|e| e.to_string())?;
212    Ok(())
213}
214
215/// All `ainl_graph_edges` rows whose endpoints are both present in `id_set`, as [`SnapshotEdge`] values.
216fn collect_snapshot_edges_for_id_set(
217    conn: &rusqlite::Connection,
218    id_set: &HashSet<String>,
219) -> Result<Vec<SnapshotEdge>, String> {
220    let mut edge_stmt = conn
221        .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
222        .map_err(|e| e.to_string())?;
223    let edge_rows = edge_stmt
224        .query_map([], |row| {
225            Ok((
226                row.get::<_, String>(0)?,
227                row.get::<_, String>(1)?,
228                row.get::<_, String>(2)?,
229                row.get::<_, f64>(3)?,
230                row.get::<_, Option<String>>(4)?,
231            ))
232        })
233        .map_err(|e| e.to_string())?
234        .collect::<Result<Vec<_>, _>>()
235        .map_err(|e| e.to_string())?;
236
237    let mut edges = Vec::new();
238    for (from_id, to_id, label, weight, meta) in edge_rows {
239        if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
240            continue;
241        }
242        let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
243        let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
244        let metadata = match meta {
245            Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
246            _ => None,
247        };
248        edges.push(SnapshotEdge {
249            source_id,
250            target_id,
251            edge_type: label,
252            weight: weight as f32,
253            metadata,
254        });
255    }
256    Ok(edges)
257}
258
259fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
260    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
261    let type_name = node_type_name(node);
262    let timestamp = node_timestamp(node);
263
264    conn.execute(
265        "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
266         VALUES (?1, ?2, ?3, ?4)",
267        rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
268    )
269    .map_err(|e| e.to_string())?;
270
271    for edge in &node.edges {
272        persist_edge(conn, node.id, edge.target_id, &edge.label, 1.0, None::<&str>)?;
273    }
274
275    Ok(())
276}
277
278fn try_insert_node_ignore(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
279    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
280    let type_name = node_type_name(node);
281    let timestamp = node_timestamp(node);
282    conn.execute(
283        "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
284         VALUES (?1, ?2, ?3, ?4)",
285        rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
286    )
287    .map_err(|e| e.to_string())?;
288    Ok(())
289}
290
291fn try_insert_edge_ignore(
292    conn: &rusqlite::Connection,
293    edge: &SnapshotEdge,
294) -> Result<(), String> {
295    let meta = match &edge.metadata {
296        Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
297        None => None,
298    };
299    conn.execute(
300        "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
301         VALUES (?1, ?2, ?3, ?4, ?5)",
302        rusqlite::params![
303            edge.source_id.to_string(),
304            edge.target_id.to_string(),
305            edge.edge_type,
306            edge.weight,
307            meta.as_deref(),
308        ],
309    )
310    .map_err(|e| e.to_string())?;
311    Ok(())
312}
313
314impl SqliteGraphStore {
315    /// Ensure the AINL graph schema exists in the database
316    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
317        conn.execute(
318            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
319                id TEXT PRIMARY KEY,
320                node_type TEXT NOT NULL,
321                payload TEXT NOT NULL,
322                timestamp INTEGER NOT NULL
323            )",
324            [],
325        )?;
326
327        conn.execute(
328            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
329             ON ainl_graph_nodes(timestamp DESC)",
330            [],
331        )?;
332
333        conn.execute(
334            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
335             ON ainl_graph_nodes(node_type)",
336            [],
337        )?;
338
339        conn.execute(
340            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
341                from_id TEXT NOT NULL,
342                to_id TEXT NOT NULL,
343                label TEXT NOT NULL,
344                weight REAL NOT NULL DEFAULT 1.0,
345                metadata TEXT,
346                PRIMARY KEY (from_id, to_id, label),
347                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
348                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
349            )",
350            [],
351        )?;
352
353        conn.execute(
354            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
355             ON ainl_graph_edges(from_id, label)",
356            [],
357        )?;
358
359        migrate_edge_columns(conn)?;
360        migrate_edges_add_foreign_keys(conn)?;
361        Ok(())
362    }
363
364    /// Open/create a graph store at the given path
365    pub fn open(path: &std::path::Path) -> Result<Self, String> {
366        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
367        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
368        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
369        Ok(Self { conn })
370    }
371
372    /// Create from an existing connection (for integration with openfang-memory pool)
373    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
374        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
375        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
376        Ok(Self { conn })
377    }
378
379    /// Low-level access for query builders in this crate.
380    pub(crate) fn conn(&self) -> &rusqlite::Connection {
381        &self.conn
382    }
383
384    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
385    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
386        persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
387    }
388
389    /// Like [`Self::insert_graph_edge`], but verifies both endpoints exist first (clear errors for strict runtime wiring).
390    pub fn insert_graph_edge_checked(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
391        if !self.node_row_exists(&from_id.to_string())? {
392            return Err(format!(
393                "insert_graph_edge_checked: missing source node row {}",
394                from_id
395            ));
396        }
397        if !self.node_row_exists(&to_id.to_string())? {
398            return Err(format!(
399                "insert_graph_edge_checked: missing target node row {}",
400                to_id
401            ));
402        }
403        self.insert_graph_edge(from_id, to_id, label)
404    }
405
406    /// Same as [`Self::insert_graph_edge`], with optional edge weight and JSON metadata.
407    pub fn insert_graph_edge_with_meta(
408        &self,
409        from_id: Uuid,
410        to_id: Uuid,
411        label: &str,
412        weight: f32,
413        metadata: Option<&serde_json::Value>,
414    ) -> Result<(), String> {
415        let meta = metadata.map(serde_json::to_string).transpose().map_err(|e| e.to_string())?;
416        persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
417    }
418
419    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
420    pub fn query_nodes_by_type_since(
421        &self,
422        node_type: &str,
423        since_timestamp: i64,
424        limit: usize,
425    ) -> Result<Vec<AinlMemoryNode>, String> {
426        let mut stmt = self
427            .conn
428            .prepare(
429                "SELECT payload FROM ainl_graph_nodes
430                 WHERE node_type = ?1 AND timestamp >= ?2
431                 ORDER BY timestamp DESC
432                 LIMIT ?3",
433            )
434            .map_err(|e| e.to_string())?;
435
436        let rows = stmt
437            .query_map(
438                rusqlite::params![node_type, since_timestamp, limit as i64],
439                |row| {
440                    let payload: String = row.get(0)?;
441                    Ok(payload)
442                },
443            )
444            .map_err(|e| e.to_string())?;
445
446        let mut nodes = Vec::new();
447        for row in rows {
448            let payload = row.map_err(|e| e.to_string())?;
449            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
450            nodes.push(node);
451        }
452
453        Ok(nodes)
454    }
455
456    /// Load the latest persisted [`RuntimeStateNode`] for `agent_id`, if any.
457    ///
458    /// Rows are stored with `node_type = 'runtime_state'` and JSON `$.agent_id` matching the agent.
459    pub fn load_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
460        if agent_id.is_empty() {
461            return Ok(None);
462        }
463        let mut stmt = self
464            .conn
465            .prepare(
466                "SELECT payload FROM ainl_graph_nodes
467                 WHERE node_type = 'runtime_state'
468                   AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
469                 ORDER BY timestamp DESC
470                 LIMIT 1",
471            )
472            .map_err(|e| e.to_string())?;
473
474        let payload_opt: Option<String> = stmt
475            .query_row([agent_id], |row| row.get(0))
476            .optional()
477            .map_err(|e| e.to_string())?;
478
479        let Some(payload) = payload_opt else {
480            return Ok(None);
481        };
482
483        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
484        match node.node_type {
485            AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
486            _ => Err("runtime_state row had unexpected node_type payload".to_string()),
487        }
488    }
489
490    /// Upsert one [`RuntimeStateNode`] row per agent (stable id via [`Uuid::new_v5`]).
491    pub fn save_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
492        let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
493        let node = AinlMemoryNode {
494            id,
495            memory_category: MemoryCategory::RuntimeState,
496            importance_score: 0.5,
497            agent_id: state.agent_id.clone(),
498            node_type: AinlNodeType::RuntimeState {
499                runtime_state: state.clone(),
500            },
501            edges: Vec::new(),
502        };
503        self.write_node(&node)
504    }
505
506    /// Write a node and its embedded edges in one transaction; fails if any edge target is missing.
507    pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
508        let tx = self.conn.transaction().map_err(|e| e.to_string())?;
509        for edge in &node.edges {
510            let exists: Option<i32> = tx
511                .query_row(
512                    "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
513                    [edge.target_id.to_string()],
514                    |_| Ok(1),
515                )
516                .optional()
517                .map_err(|e| e.to_string())?;
518            if exists.is_none() {
519                return Err(format!(
520                    "write_node_with_edges: missing target node {}",
521                    edge.target_id
522                ));
523            }
524        }
525        persist_node(&tx, node)?;
526        tx.commit().map_err(|e| e.to_string())?;
527        Ok(())
528    }
529
530    /// Validate structural integrity for one agent's induced subgraph.
531    pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
532        use std::collections::HashSet;
533
534        let agent_nodes = self.agent_node_ids(agent_id)?;
535        let node_count = agent_nodes.len();
536
537        let mut stmt = self
538            .conn
539            .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
540            .map_err(|e| e.to_string())?;
541        let all_edges: Vec<(String, String, String)> = stmt
542            .query_map([], |row| {
543                Ok((
544                    row.get::<_, String>(0)?,
545                    row.get::<_, String>(1)?,
546                    row.get::<_, String>(2)?,
547                ))
548            })
549            .map_err(|e| e.to_string())?
550            .collect::<Result<Vec<_>, _>>()
551            .map_err(|e| e.to_string())?;
552
553        let mut edge_pairs = Vec::new();
554        for (from_id, to_id, label) in all_edges {
555            let touches_agent =
556                agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
557            if touches_agent {
558                edge_pairs.push((from_id, to_id, label));
559            }
560        }
561
562        let edge_count = edge_pairs.len();
563        let mut dangling_edges = Vec::new();
564        let mut dangling_edge_details = Vec::new();
565        let mut cross_agent_boundary_edges = 0usize;
566
567        for (from_id, to_id, label) in &edge_pairs {
568            let from_ok = self.node_row_exists(from_id)?;
569            let to_ok = self.node_row_exists(to_id)?;
570            if !from_ok || !to_ok {
571                dangling_edges.push((from_id.clone(), to_id.clone()));
572                dangling_edge_details.push(DanglingEdgeDetail {
573                    source_id: from_id.clone(),
574                    target_id: to_id.clone(),
575                    edge_type: label.clone(),
576                });
577                continue;
578            }
579            let fa = agent_nodes.contains(from_id);
580            let ta = agent_nodes.contains(to_id);
581            if fa ^ ta {
582                cross_agent_boundary_edges += 1;
583            }
584        }
585
586        let mut touched: HashSet<String> = HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
587        for (a, b, _) in &edge_pairs {
588            if agent_nodes.contains(a) {
589                touched.insert(a.clone());
590            }
591            if agent_nodes.contains(b) {
592                touched.insert(b.clone());
593            }
594        }
595
596        let mut orphan_nodes = Vec::new();
597        for id in &agent_nodes {
598            if !touched.contains(id) {
599                orphan_nodes.push(id.clone());
600            }
601        }
602
603        let is_valid = dangling_edges.is_empty();
604        Ok(GraphValidationReport {
605            agent_id: agent_id.to_string(),
606            node_count,
607            edge_count,
608            dangling_edges,
609            dangling_edge_details,
610            cross_agent_boundary_edges,
611            orphan_nodes,
612            is_valid,
613        })
614    }
615
616    fn node_row_exists(&self, id: &str) -> Result<bool, String> {
617        let v: Option<i32> = self
618            .conn
619            .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| Ok(1))
620            .optional()
621            .map_err(|e| e.to_string())?;
622        Ok(v.is_some())
623    }
624
625    fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
626        let mut stmt = self
627            .conn
628            .prepare(
629                "SELECT id FROM ainl_graph_nodes
630                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
631            )
632            .map_err(|e| e.to_string())?;
633        let ids = stmt
634            .query_map([agent_id], |row| row.get::<_, String>(0))
635            .map_err(|e| e.to_string())?
636            .collect::<Result<HashSet<_>, _>>()
637            .map_err(|e| e.to_string())?;
638        Ok(ids)
639    }
640
641    /// Directed edges where **both** endpoints are nodes owned by `agent_id` (aligned with [`Self::export_graph`] edge set).
642    pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
643        let id_set = self.agent_node_ids(agent_id)?;
644        collect_snapshot_edges_for_id_set(&self.conn, &id_set)
645    }
646
647    /// Export all nodes and interconnecting edges for `agent_id`.
648    pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
649        let mut stmt = self
650            .conn
651            .prepare(
652                "SELECT payload FROM ainl_graph_nodes
653                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
654            )
655            .map_err(|e| e.to_string())?;
656        let nodes: Vec<AinlMemoryNode> = stmt
657            .query_map([agent_id], |row| {
658                let payload: String = row.get(0)?;
659                Ok(payload)
660            })
661            .map_err(|e| e.to_string())?
662            .map(|r| {
663                let payload = r.map_err(|e| e.to_string())?;
664                serde_json::from_str(&payload).map_err(|e| e.to_string())
665            })
666            .collect::<Result<Vec<_>, _>>()?;
667
668        let id_set: std::collections::HashSet<String> =
669            nodes.iter().map(|n| n.id.to_string()).collect();
670
671        let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
672
673        Ok(AgentGraphSnapshot {
674            agent_id: agent_id.to_string(),
675            exported_at: Utc::now(),
676            schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
677            nodes,
678            edges,
679        })
680    }
681
682    /// Import a snapshot in one transaction (`INSERT OR IGNORE` per row).
683    ///
684    /// * `allow_dangling_edges == false` (**default / production**): `PRAGMA foreign_keys` stays
685    ///   enabled; every edge must reference existing node rows after inserts (same invariants as
686    ///   [`Self::write_node_with_edges`]).
687    /// * `allow_dangling_edges == true` (**repair / forensic**): FK checks are disabled only for
688    ///   this import so partially invalid snapshots can be loaded; run [`Self::validate_graph`]
689    ///   afterward and repair before returning to normal writes.
690    pub fn import_graph(
691        &mut self,
692        snapshot: &AgentGraphSnapshot,
693        allow_dangling_edges: bool,
694    ) -> Result<(), String> {
695        if allow_dangling_edges {
696            self.conn
697                .execute_batch("PRAGMA foreign_keys = OFF;")
698                .map_err(|e| e.to_string())?;
699        }
700
701        let result: Result<(), String> = (|| {
702            let tx = self.conn.transaction().map_err(|e| e.to_string())?;
703            for node in &snapshot.nodes {
704                try_insert_node_ignore(&tx, node)?;
705            }
706            for edge in &snapshot.edges {
707                try_insert_edge_ignore(&tx, edge)?;
708            }
709            tx.commit().map_err(|e| e.to_string())?;
710            Ok(())
711        })();
712
713        if allow_dangling_edges {
714            self.conn
715                .execute_batch("PRAGMA foreign_keys = ON;")
716                .map_err(|e| e.to_string())?;
717        }
718
719        result
720    }
721}
722
723impl GraphStore for SqliteGraphStore {
724    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
725    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
726    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
727        persist_node(&self.conn, node)
728    }
729
730    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
731        let payload: Option<String> = self
732            .conn
733            .query_row(
734                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
735                [id.to_string()],
736                |row| row.get::<_, String>(0),
737            )
738            .optional()
739            .map_err(|e: rusqlite::Error| e.to_string())?;
740
741        match payload {
742            Some(p) => {
743                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
744                Ok(Some(node))
745            }
746            None => Ok(None),
747        }
748    }
749
750    fn query_episodes_since(
751        &self,
752        since_timestamp: i64,
753        limit: usize,
754    ) -> Result<Vec<AinlMemoryNode>, String> {
755        let mut stmt = self
756            .conn
757            .prepare(
758                "SELECT payload FROM ainl_graph_nodes
759                 WHERE node_type = 'episode' AND timestamp >= ?1
760                 ORDER BY timestamp DESC
761                 LIMIT ?2",
762            )
763            .map_err(|e| e.to_string())?;
764
765        let rows = stmt
766            .query_map([since_timestamp, limit as i64], |row| {
767                let payload: String = row.get(0)?;
768                Ok(payload)
769            })
770            .map_err(|e| e.to_string())?;
771
772        let mut nodes = Vec::new();
773        for row in rows {
774            let payload = row.map_err(|e| e.to_string())?;
775            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
776            nodes.push(node);
777        }
778
779        Ok(nodes)
780    }
781
782    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
783        let mut stmt = self
784            .conn
785            .prepare(
786                "SELECT payload FROM ainl_graph_nodes
787                 WHERE node_type = ?1
788                 ORDER BY timestamp DESC",
789            )
790            .map_err(|e| e.to_string())?;
791
792        let rows = stmt
793            .query_map([type_name], |row| {
794                let payload: String = row.get(0)?;
795                Ok(payload)
796            })
797            .map_err(|e| e.to_string())?;
798
799        let mut nodes = Vec::new();
800        for row in rows {
801            let payload = row.map_err(|e| e.to_string())?;
802            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
803            nodes.push(node);
804        }
805
806        Ok(nodes)
807    }
808
809    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
810        let mut stmt = self
811            .conn
812            .prepare(
813                "SELECT to_id FROM ainl_graph_edges
814                 WHERE from_id = ?1 AND label = ?2",
815            )
816            .map_err(|e| e.to_string())?;
817
818        let target_ids: Vec<String> = stmt
819            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
820            .map_err(|e| e.to_string())?
821            .collect::<Result<Vec<_>, _>>()
822            .map_err(|e| e.to_string())?;
823
824        let mut nodes = Vec::new();
825        for target_id in target_ids {
826            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
827            if let Some(node) = self.read_node(id)? {
828                nodes.push(node);
829            }
830        }
831
832        Ok(nodes)
833    }
834}