Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the [`GraphStore`] trait and the SQLite implementation.
4//!
5//! ## Referential integrity (SQLite)
6//!
7//! `ainl_graph_edges` uses real `FOREIGN KEY (from_id)` / `FOREIGN KEY (to_id)` references to
8//! `ainl_graph_nodes(id)` with `ON DELETE CASCADE`. [`SqliteGraphStore::open`] and
9//! [`SqliteGraphStore::from_connection`] run `PRAGMA foreign_keys = ON` on the handle.
10//!
11//! Databases created before these constraints used a plain edges table; [`SqliteGraphStore::ensure_schema`]
12//! runs a one-time `migrate_edges_add_foreign_keys` rebuild. Edge rows whose endpoints
13//! are missing from `ainl_graph_nodes` **cannot** be kept under FK rules and are **omitted** from
14//! the migrated copy.
15//!
16//! ## Above the database (still recommended)
17//!
18//! - **Eager checks**: [`SqliteGraphStore::write_node_with_edges`], [`SqliteGraphStore::insert_graph_edge_checked`]
19//!   give clear errors without relying on SQLite error text alone.
20//! - **Repair / forensic import**: [`SqliteGraphStore::import_graph`] with `allow_dangling_edges: true`
21//!   is the **supported** way to load snapshots that violate referential integrity: FK enforcement is
22//!   disabled only for the duration of that import, then turned back on. Follow with
23//!   [`SqliteGraphStore::validate_graph`] before resuming normal writes on the same connection.
24//! - **Semantic graph checks**: [`SqliteGraphStore::validate_graph`] (agent-scoped edges, dangling
25//!   diagnostics, cross-agent boundary counts, etc.) — orthogonal to FK row existence.
26//!
27//! SQLite tables integrate with existing openfang-memory schema where applicable.
28
29use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31    AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32    SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
/// Errors that can surface while importing a graph snapshot.
#[derive(Debug, Clone)]
pub enum SnapshotImportError {
    /// The snapshot declares schema version `got`, while `expected` is the one named as supported.
    UnsupportedSchemaVersion { got: String, expected: &'static str },
    /// A failure carried as an already-rendered message string.
    Sqlite(String),
}

impl std::fmt::Display for SnapshotImportError {
    /// Render the error for humans; the `Sqlite` variant prints its message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sqlite(message) => f.write_str(message),
            Self::UnsupportedSchemaVersion { got, expected } => f.write_fmt(format_args!(
                "unsupported snapshot schema_version '{got}'; expected '{expected}'"
            )),
        }
    }
}
59
/// Errors that can surface while validating a graph.
#[derive(Debug, Clone)]
pub enum GraphValidationError {
    /// A failure carried as an already-rendered message string.
    Sqlite(String),
}

impl std::fmt::Display for GraphValidationError {
    /// Print the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the pattern is irrefutable.
        let Self::Sqlite(message) = self;
        f.write_str(message)
    }
}
73
74/// Graph memory storage trait - swappable backends
75pub trait GraphStore {
76    /// Write a node to storage
77    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79    /// Read a node by ID
80    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82    /// Query episodes since a given timestamp
83    fn query_episodes_since(
84        &self,
85        since_timestamp: i64,
86        limit: usize,
87    ) -> Result<Vec<AinlMemoryNode>, String>;
88
89    /// Find nodes by type
90    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92    /// Walk edges from a node with a given label
93    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94
95    /// Walk edges TO a node — reverse traversal by to_id
96    fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
97}
98
99/// SQLite implementation of GraphStore
100///
101/// Integrates with existing openfang-memory schema by adding two tables:
102/// - `ainl_graph_nodes`: stores node payloads
103/// - `ainl_graph_edges`: stores graph edges
104pub struct SqliteGraphStore {
105    conn: rusqlite::Connection,
106}
107
108fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
109    conn.execute_batch("PRAGMA foreign_keys = ON;")
110}
111
112fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
113    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
114    let cols = stmt
115        .query_map([], |row| row.get::<_, String>(1))?
116        .collect::<Result<Vec<_>, _>>()?;
117    if !cols.iter().any(|c| c == "weight") {
118        conn.execute(
119            "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
120            [],
121        )?;
122    }
123    if !cols.iter().any(|c| c == "metadata") {
124        conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
125    }
126    Ok(())
127}
128
129/// True when `ainl_graph_edges` declares at least one foreign-key reference (new schema).
130fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
131    let exists: i64 = conn.query_row(
132        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
133        [],
134        |r| r.get(0),
135    )?;
136    if exists == 0 {
137        return Ok(false);
138    }
139    let n: i64 = conn.query_row(
140        "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
141        [],
142        |r| r.get(0),
143    )?;
144    Ok(n > 0)
145}
146
147/// Rebuild `ainl_graph_edges` with `FOREIGN KEY` constraints. Rows whose endpoints are missing
148/// from `ainl_graph_nodes` are **dropped** (they cannot be represented under FK rules).
149fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
150    if edges_table_has_foreign_keys(conn)? {
151        return Ok(());
152    }
153
154    let exists: i64 = conn.query_row(
155        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
156        [],
157        |r| r.get(0),
158    )?;
159    if exists == 0 {
160        return Ok(());
161    }
162
163    conn.execute("BEGIN IMMEDIATE", [])?;
164    let res: Result<(), rusqlite::Error> = (|| {
165        conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
166        conn.execute(
167            "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
168            [],
169        )?;
170        conn.execute(
171            r#"CREATE TABLE ainl_graph_edges (
172                from_id TEXT NOT NULL,
173                to_id TEXT NOT NULL,
174                label TEXT NOT NULL,
175                weight REAL NOT NULL DEFAULT 1.0,
176                metadata TEXT,
177                PRIMARY KEY (from_id, to_id, label),
178                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
179                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
180            )"#,
181            [],
182        )?;
183        conn.execute(
184            r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
185               SELECT o.from_id, o.to_id, o.label,
186                      COALESCE(o.weight, 1.0),
187                      o.metadata
188               FROM ainl_graph_edges__old o
189               WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
190                 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
191            [],
192        )?;
193        conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
194        conn.execute(
195            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
196            [],
197        )?;
198        Ok(())
199    })();
200
201    match res {
202        Ok(()) => {
203            conn.execute("COMMIT", [])?;
204        }
205        Err(e) => {
206            let _ = conn.execute("ROLLBACK", []);
207            return Err(e);
208        }
209    }
210    Ok(())
211}
212
213fn node_type_name(node: &AinlMemoryNode) -> &'static str {
214    match &node.node_type {
215        AinlNodeType::Episode { .. } => "episode",
216        AinlNodeType::Semantic { .. } => "semantic",
217        AinlNodeType::Procedural { .. } => "procedural",
218        AinlNodeType::Persona { .. } => "persona",
219        AinlNodeType::RuntimeState { .. } => "runtime_state",
220        AinlNodeType::Trajectory { .. } => "trajectory",
221        AinlNodeType::Failure { .. } => "failure",
222    }
223}
224
225fn node_timestamp(node: &AinlMemoryNode) -> i64 {
226    match &node.node_type {
227        AinlNodeType::Episode { episodic } => episodic.timestamp,
228        AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
229        AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
230        AinlNodeType::Failure { failure } => failure.recorded_at,
231        _ => chrono::Utc::now().timestamp(),
232    }
233}
234
235fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
236    match &node.node_type {
237        AinlNodeType::Failure { failure } => Some(format!(
238            "{} {} {} {} {}",
239            failure.source,
240            failure.tool_name.as_deref().unwrap_or(""),
241            failure.source_namespace.as_deref().unwrap_or(""),
242            failure.source_tool.as_deref().unwrap_or(""),
243            failure.message
244        )),
245        _ => None,
246    }
247}
248
/// Token-prefix AND query for FTS5 `body MATCH` (returns empty → skip search).
///
/// `raw` is split on whitespace; control characters and double quotes are stripped from each
/// token, tokens left empty are dropped, and every survivor becomes the quoted prefix phrase
/// `"token"*`. Phrases are joined with ` AND `.
///
/// The `*` is placed **after** the closing quote on purpose: FTS5 only treats `*` as the
/// prefix marker when it follows the string. Inside the quotes it is handed to the tokenizer
/// as ordinary text, which turns the term into an exact-token match.
fn fts5_prefix_match_query(raw: &str) -> String {
    raw.split_whitespace()
        .filter_map(|token| {
            let cleaned: String = token
                .chars()
                .filter(|c| !c.is_control() && *c != '"')
                .collect();
            if cleaned.is_empty() {
                None
            } else {
                Some(format!("\"{cleaned}\"*"))
            }
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
263
264fn sync_failure_fts_insert(
265    conn: &rusqlite::Connection,
266    node_id: &str,
267    body: &str,
268) -> Result<(), String> {
269    conn.execute(
270        "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
271        [node_id],
272    )
273    .map_err(|e| e.to_string())?;
274    conn.execute(
275        "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
276        rusqlite::params![node_id, body],
277    )
278    .map_err(|e| e.to_string())?;
279    Ok(())
280}
281
/// Full JSON payload (all node kinds) for `ainl_nodes_fts` — generic graph search.
///
/// Returns `payload` unchanged when it holds at most 400 000 characters, otherwise its first
/// 400 000 characters. The cut always lands on a character boundary.
fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
    /// Upper bound, in Unicode scalar values, on the indexed body.
    const MAX_FTS_BODY_CHARS: usize = 400_000;

    // A string never has more chars than bytes, so short payloads skip the scan entirely.
    if payload.len() <= MAX_FTS_BODY_CHARS {
        return payload.to_owned();
    }
    // The byte offset of char number MAX (0-based) is exactly where the kept prefix ends;
    // `None` means the payload has no such char and fits as-is.
    match payload.char_indices().nth(MAX_FTS_BODY_CHARS) {
        Some((cut, _)) => payload[..cut].to_owned(),
        None => payload.to_owned(),
    }
}
290
291fn sync_all_nodes_fts_insert(
292    conn: &rusqlite::Connection,
293    node_id: &str,
294    agent_id: &str,
295    project_id: Option<&str>,
296    body: &str,
297) -> Result<(), String> {
298    let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
299    conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
300        .map_err(|e| e.to_string())?;
301    conn.execute(
302        "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
303        rusqlite::params![node_id, agent_id, proj, body],
304    )
305    .map_err(|e| e.to_string())?;
306    Ok(())
307}
308
309fn persist_edge(
310    conn: &rusqlite::Connection,
311    from_id: Uuid,
312    to_id: Uuid,
313    label: &str,
314    weight: f32,
315    metadata: Option<&str>,
316) -> Result<(), String> {
317    conn.execute(
318        "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
319         VALUES (?1, ?2, ?3, ?4, ?5)",
320        rusqlite::params![
321            from_id.to_string(),
322            to_id.to_string(),
323            label,
324            weight,
325            metadata
326        ],
327    )
328    .map_err(|e| e.to_string())?;
329    Ok(())
330}
331
332/// All `ainl_graph_edges` rows whose endpoints are both present in `id_set`, as [`SnapshotEdge`] values.
333fn collect_snapshot_edges_for_id_set(
334    conn: &rusqlite::Connection,
335    id_set: &HashSet<String>,
336) -> Result<Vec<SnapshotEdge>, String> {
337    let mut edge_stmt = conn
338        .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
339        .map_err(|e| e.to_string())?;
340    let edge_rows = edge_stmt
341        .query_map([], |row| {
342            Ok((
343                row.get::<_, String>(0)?,
344                row.get::<_, String>(1)?,
345                row.get::<_, String>(2)?,
346                row.get::<_, f64>(3)?,
347                row.get::<_, Option<String>>(4)?,
348            ))
349        })
350        .map_err(|e| e.to_string())?
351        .collect::<Result<Vec<_>, _>>()
352        .map_err(|e| e.to_string())?;
353
354    let mut edges = Vec::new();
355    for (from_id, to_id, label, weight, meta) in edge_rows {
356        if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
357            continue;
358        }
359        let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
360        let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
361        let metadata = match meta {
362            Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
363            _ => None,
364        };
365        edges.push(SnapshotEdge {
366            source_id,
367            target_id,
368            edge_type: label,
369            weight: weight as f32,
370            metadata,
371        });
372    }
373    Ok(edges)
374}
375
376fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
377    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
378    let type_name = node_type_name(node);
379    let timestamp = node_timestamp(node);
380    let proj = node
381        .project_id
382        .as_deref()
383        .map(str::trim)
384        .filter(|s| !s.is_empty());
385
386    conn.execute(
387        "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
388         VALUES (?1, ?2, ?3, ?4, ?5)",
389        rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
390    )
391    .map_err(|e| e.to_string())?;
392
393    for edge in &node.edges {
394        persist_edge(
395            conn,
396            node.id,
397            edge.target_id,
398            &edge.label,
399            1.0,
400            None::<&str>,
401        )?;
402    }
403
404    let body_all = graph_node_fts_body_from_payload_json(&payload);
405    if !node.agent_id.trim().is_empty() {
406        let _ = sync_all_nodes_fts_insert(
407            conn,
408            &node.id.to_string(),
409            node.agent_id.as_str(),
410            proj,
411            &body_all,
412        );
413    }
414
415    if let Some(body) = failure_fts_body(node) {
416        // Best-effort: graph row is authoritative; FTS is auxiliary for search.
417        let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
418    }
419
420    Ok(())
421}
422
423fn try_insert_node_ignore(
424    conn: &rusqlite::Connection,
425    node: &AinlMemoryNode,
426) -> Result<(), String> {
427    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
428    let type_name = node_type_name(node);
429    let timestamp = node_timestamp(node);
430    let proj = node
431        .project_id
432        .as_deref()
433        .map(str::trim)
434        .filter(|s| !s.is_empty());
435    let n = conn
436        .execute(
437            "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
438         VALUES (?1, ?2, ?3, ?4, ?5)",
439            rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
440        )
441        .map_err(|e| e.to_string())?;
442    if n > 0 {
443        if !node.agent_id.trim().is_empty() {
444            let body_all = graph_node_fts_body_from_payload_json(&payload);
445            let _ = sync_all_nodes_fts_insert(
446                conn,
447                &node.id.to_string(),
448                node.agent_id.as_str(),
449                proj,
450                &body_all,
451            );
452        }
453        if let Some(body) = failure_fts_body(node) {
454            let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
455        }
456    }
457    Ok(())
458}
459
460fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
461    let meta = match &edge.metadata {
462        Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
463        None => None,
464    };
465    conn.execute(
466        "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
467         VALUES (?1, ?2, ?3, ?4, ?5)",
468        rusqlite::params![
469            edge.source_id.to_string(),
470            edge.target_id.to_string(),
471            edge.edge_type,
472            edge.weight,
473            meta.as_deref(),
474        ],
475    )
476    .map_err(|e| e.to_string())?;
477    Ok(())
478}
479
480fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
481    conn.execute(
482        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
483            node_id UNINDEXED,
484            body,
485            tokenize = 'unicode61 remove_diacritics 1'
486        )",
487        [],
488    )?;
489    Ok(())
490}
491
492fn migrate_ainl_graph_nodes_add_project_id(
493    conn: &rusqlite::Connection,
494) -> Result<(), rusqlite::Error> {
495    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
496    let cols = stmt
497        .query_map([], |row| row.get::<_, String>(1))?
498        .collect::<Result<Vec<_>, _>>()?;
499    if !cols.iter().any(|c| c == "project_id") {
500        conn.execute(
501            "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
502            [],
503        )?;
504    }
505    Ok(())
506}
507
508fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
509    conn.execute(
510        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
511            node_id UNINDEXED,
512            agent_id UNINDEXED,
513            project_id UNINDEXED,
514            body,
515            tokenize = 'unicode61 remove_diacritics 1'
516        )",
517        [],
518    )?;
519    Ok(())
520}
521
522/// Remove FTS shadow rows when the primary graph row is deleted (`DELETE` from `ainl_graph_nodes`).
523fn install_ainl_graph_node_delete_fts_triggers(
524    conn: &rusqlite::Connection,
525) -> Result<(), rusqlite::Error> {
526    conn.execute_batch(
527        "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
528         CREATE TRIGGER ainl_graph_nodes_after_delete_fts
529         AFTER DELETE ON ainl_graph_nodes
530         FOR EACH ROW
531         BEGIN
532           DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
533           DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
534         END;",
535    )?;
536    Ok(())
537}
538
539/// One-time: populate `ainl_nodes_fts` from existing `ainl_graph_nodes` (legacy DBs).
540fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
541    let fts_n: i64 = conn
542        .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
543        .unwrap_or(0);
544    if fts_n > 0 {
545        return Ok(());
546    }
547    let mut stmt = conn
548        .prepare(
549            "SELECT id, payload, project_id FROM ainl_graph_nodes
550             WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
551        )
552        .map_err(|e| e.to_string())?;
553    let rows = stmt
554        .query_map([], |row| {
555            Ok((
556                row.get::<_, String>(0)?,
557                row.get::<_, String>(1)?,
558                row.get::<_, Option<String>>(2)?,
559            ))
560        })
561        .map_err(|e| e.to_string())?
562        .collect::<Result<Vec<_>, _>>()
563        .map_err(|e| e.to_string())?;
564    for (id, payload, col_proj) in rows {
565        let v: serde_json::Value = match serde_json::from_str(&payload) {
566            Ok(x) => x,
567            Err(_) => continue,
568        };
569        let ag = v
570            .get("agent_id")
571            .and_then(|x| x.as_str())
572            .map(str::trim)
573            .unwrap_or("");
574        if ag.is_empty() {
575            continue;
576        }
577        let json_proj = v
578            .get("project_id")
579            .and_then(|x| x.as_str())
580            .map(str::trim)
581            .filter(|s| !s.is_empty());
582        let proj = col_proj
583            .as_deref()
584            .map(str::trim)
585            .filter(|s| !s.is_empty())
586            .or(json_proj);
587        let body = graph_node_fts_body_from_payload_json(&payload);
588        if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
589            // Best-effort backfill: continue with remaining rows.
590        }
591    }
592    Ok(())
593}
594
595fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
596    conn.execute(
597        "CREATE TABLE IF NOT EXISTS ainl_trajectories (
598            id TEXT PRIMARY KEY,
599            episode_id TEXT NOT NULL,
600            graph_trajectory_node_id TEXT,
601            agent_id TEXT NOT NULL,
602            session_id TEXT NOT NULL,
603            project_id TEXT,
604            recorded_at INTEGER NOT NULL,
605            outcome_json TEXT NOT NULL,
606            ainl_source_hash TEXT,
607            duration_ms INTEGER NOT NULL DEFAULT 0,
608            steps_json TEXT NOT NULL,
609            FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
610        )",
611        [],
612    )?;
613    conn.execute(
614        "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
615         ON ainl_trajectories(agent_id, recorded_at DESC)",
616        [],
617    )?;
618    Ok(())
619}
620
621fn migrate_ainl_trajectories_add_depth_v1(
622    conn: &rusqlite::Connection,
623) -> Result<(), rusqlite::Error> {
624    let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
625    let cols: Vec<String> = stmt
626        .query_map([], |row| row.get::<_, String>(1))?
627        .collect::<Result<Vec<_>, _>>()?;
628    if !cols.iter().any(|c| c == "frame_vars_json") {
629        conn.execute(
630            "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
631            [],
632        )?;
633    }
634    if !cols.iter().any(|c| c == "fitness_delta") {
635        conn.execute(
636            "ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL",
637            [],
638        )?;
639    }
640    Ok(())
641}
642
643impl SqliteGraphStore {
644    /// Ensure the AINL graph schema exists in the database
645    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
646        conn.execute(
647            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
648                id TEXT PRIMARY KEY,
649                node_type TEXT NOT NULL,
650                payload TEXT NOT NULL,
651                timestamp INTEGER NOT NULL
652            )",
653            [],
654        )?;
655
656        conn.execute(
657            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
658             ON ainl_graph_nodes(timestamp DESC)",
659            [],
660        )?;
661
662        conn.execute(
663            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
664             ON ainl_graph_nodes(node_type)",
665            [],
666        )?;
667
668        migrate_ainl_graph_nodes_add_project_id(conn)?;
669        conn.execute(
670            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
671             ON ainl_graph_nodes(project_id, node_type, timestamp)",
672            [],
673        )?;
674
675        conn.execute(
676            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
677                from_id TEXT NOT NULL,
678                to_id TEXT NOT NULL,
679                label TEXT NOT NULL,
680                weight REAL NOT NULL DEFAULT 1.0,
681                metadata TEXT,
682                PRIMARY KEY (from_id, to_id, label),
683                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
684                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
685            )",
686            [],
687        )?;
688
689        conn.execute(
690            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
691             ON ainl_graph_edges(from_id, label)",
692            [],
693        )?;
694
695        conn.execute(
696            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_to
697             ON ainl_graph_edges(to_id, label)",
698            [],
699        )?;
700
701        migrate_edge_columns(conn)?;
702        migrate_edges_add_foreign_keys(conn)?;
703        migrate_trajectories_v1(conn)?;
704        migrate_ainl_trajectories_add_depth_v1(conn)?;
705        migrate_failures_fts_v1(conn)?;
706        migrate_ainl_nodes_fts_v1(conn)?;
707        if backfill_ainl_nodes_fts_if_empty(conn).is_err() {
708            // Non-fatal: new DBs may have empty graph; legacy rows can be re-synced on next write.
709        }
710        let _ = install_ainl_graph_node_delete_fts_triggers(conn);
711        Ok(())
712    }
713
714    /// Open/create a graph store at the given path
715    pub fn open(path: &std::path::Path) -> Result<Self, String> {
716        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
717        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
718        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
719        Ok(Self { conn })
720    }
721
722    /// Create from an existing connection (for integration with openfang-memory pool)
723    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
724        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
725        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
726        Ok(Self { conn })
727    }
728
729    /// Low-level access for query builders in this crate.
730    pub(crate) fn conn(&self) -> &rusqlite::Connection {
731        &self.conn
732    }
733
734    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
735    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
736        persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
737    }
738
739    /// Like [`Self::insert_graph_edge`], but verifies both endpoints exist first (clear errors for strict runtime wiring).
740    pub fn insert_graph_edge_checked(
741        &self,
742        from_id: Uuid,
743        to_id: Uuid,
744        label: &str,
745    ) -> Result<(), String> {
746        if !self.node_row_exists(&from_id.to_string())? {
747            return Err(format!(
748                "insert_graph_edge_checked: missing source node row {}",
749                from_id
750            ));
751        }
752        if !self.node_row_exists(&to_id.to_string())? {
753            return Err(format!(
754                "insert_graph_edge_checked: missing target node row {}",
755                to_id
756            ));
757        }
758        self.insert_graph_edge(from_id, to_id, label)
759    }
760
761    /// Same as [`Self::insert_graph_edge`], with optional edge weight and JSON metadata.
762    pub fn insert_graph_edge_with_meta(
763        &self,
764        from_id: Uuid,
765        to_id: Uuid,
766        label: &str,
767        weight: f32,
768        metadata: Option<&serde_json::Value>,
769    ) -> Result<(), String> {
770        let meta = metadata
771            .map(serde_json::to_string)
772            .transpose()
773            .map_err(|e| e.to_string())?;
774        persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
775    }
776
777    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
778    pub fn query_nodes_by_type_since(
779        &self,
780        node_type: &str,
781        since_timestamp: i64,
782        limit: usize,
783    ) -> Result<Vec<AinlMemoryNode>, String> {
784        let mut stmt = self
785            .conn
786            .prepare(
787                "SELECT payload FROM ainl_graph_nodes
788                 WHERE node_type = ?1 AND timestamp >= ?2
789                 ORDER BY timestamp DESC
790                 LIMIT ?3",
791            )
792            .map_err(|e| e.to_string())?;
793
794        let rows = stmt
795            .query_map(
796                rusqlite::params![node_type, since_timestamp, limit as i64],
797                |row| {
798                    let payload: String = row.get(0)?;
799                    Ok(payload)
800                },
801            )
802            .map_err(|e| e.to_string())?;
803
804        let mut nodes = Vec::new();
805        for row in rows {
806            let payload = row.map_err(|e| e.to_string())?;
807            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
808            nodes.push(node);
809        }
810
811        Ok(nodes)
812    }
813
814    /// Read the most recent persisted [`RuntimeStateNode`] for `agent_id`, if any.
815    ///
816    /// Rows are stored with `node_type = 'runtime_state'` and JSON `$.node_type.runtime_state.agent_id` matching the agent.
817    pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
818        if agent_id.is_empty() {
819            return Ok(None);
820        }
821        let mut stmt = self
822            .conn
823            .prepare(
824                "SELECT payload FROM ainl_graph_nodes
825                 WHERE node_type = 'runtime_state'
826                   AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
827                 ORDER BY timestamp DESC
828                 LIMIT 1",
829            )
830            .map_err(|e| e.to_string())?;
831
832        let payload_opt: Option<String> = stmt
833            .query_row([agent_id], |row| row.get(0))
834            .optional()
835            .map_err(|e| e.to_string())?;
836
837        let Some(payload) = payload_opt else {
838            return Ok(None);
839        };
840
841        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
842        match node.node_type {
843            AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
844            _ => Err("runtime_state row had unexpected node_type payload".to_string()),
845        }
846    }
847
848    /// Upsert one [`RuntimeStateNode`] row per agent (stable id via [`Uuid::new_v5`]).
849    pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
850        let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
851        let node = AinlMemoryNode {
852            id,
853            memory_category: MemoryCategory::RuntimeState,
854            importance_score: 0.5,
855            agent_id: state.agent_id.clone(),
856            project_id: None,
857            node_type: AinlNodeType::RuntimeState {
858                runtime_state: state.clone(),
859            },
860            edges: Vec::new(),
861            plugin_data: None,
862        };
863        self.write_node(&node)
864    }
865
866    /// Write a node and its embedded edges in one transaction; fails if any edge target is missing.
867    pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
868        let tx = self.conn.transaction().map_err(|e| e.to_string())?;
869        for edge in &node.edges {
870            let exists: Option<i32> = tx
871                .query_row(
872                    "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
873                    [edge.target_id.to_string()],
874                    |_| Ok(1),
875                )
876                .optional()
877                .map_err(|e| e.to_string())?;
878            if exists.is_none() {
879                return Err(format!(
880                    "write_node_with_edges: missing target node {}",
881                    edge.target_id
882                ));
883            }
884        }
885        persist_node(&tx, node)?;
886        tx.commit().map_err(|e| e.to_string())?;
887        Ok(())
888    }
889
890    /// Validate structural integrity for one agent's induced subgraph.
891    pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
892        self.validate_graph_checked(agent_id)
893            .map_err(|e| e.to_string())
894    }
895
896    /// Typed validation variant for callers that need structured error handling.
897    pub fn validate_graph_checked(
898        &self,
899        agent_id: &str,
900    ) -> Result<GraphValidationReport, GraphValidationError> {
901        use std::collections::HashSet;
902
903        let agent_nodes = self
904            .agent_node_ids(agent_id)
905            .map_err(GraphValidationError::Sqlite)?;
906        let node_count = agent_nodes.len();
907
908        let mut stmt = self
909            .conn
910            .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
911            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
912        let all_edges: Vec<(String, String, String)> = stmt
913            .query_map([], |row| {
914                Ok((
915                    row.get::<_, String>(0)?,
916                    row.get::<_, String>(1)?,
917                    row.get::<_, String>(2)?,
918                ))
919            })
920            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
921            .collect::<Result<Vec<_>, _>>()
922            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
923
924        let mut edge_pairs = Vec::new();
925        for (from_id, to_id, label) in all_edges {
926            let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
927            if touches_agent {
928                edge_pairs.push((from_id, to_id, label));
929            }
930        }
931
932        let edge_count = edge_pairs.len();
933        let mut dangling_edges = Vec::new();
934        let mut dangling_edge_details = Vec::new();
935        let mut cross_agent_boundary_edges = 0usize;
936
937        for (from_id, to_id, label) in &edge_pairs {
938            let from_ok = self
939                .node_row_exists(from_id)
940                .map_err(GraphValidationError::Sqlite)?;
941            let to_ok = self
942                .node_row_exists(to_id)
943                .map_err(GraphValidationError::Sqlite)?;
944            if !from_ok || !to_ok {
945                dangling_edges.push((from_id.clone(), to_id.clone()));
946                dangling_edge_details.push(DanglingEdgeDetail {
947                    source_id: from_id.clone(),
948                    target_id: to_id.clone(),
949                    edge_type: label.clone(),
950                });
951                continue;
952            }
953            let fa = agent_nodes.contains(from_id);
954            let ta = agent_nodes.contains(to_id);
955            if fa ^ ta {
956                cross_agent_boundary_edges += 1;
957            }
958        }
959
960        let mut touched: HashSet<String> =
961            HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
962        for (a, b, _) in &edge_pairs {
963            if agent_nodes.contains(a) {
964                touched.insert(a.clone());
965            }
966            if agent_nodes.contains(b) {
967                touched.insert(b.clone());
968            }
969        }
970
971        let mut orphan_nodes = Vec::new();
972        for id in &agent_nodes {
973            if !touched.contains(id) {
974                orphan_nodes.push(id.clone());
975            }
976        }
977
978        let is_valid = dangling_edges.is_empty();
979        Ok(GraphValidationReport {
980            agent_id: agent_id.to_string(),
981            node_count,
982            edge_count,
983            dangling_edges,
984            dangling_edge_details,
985            cross_agent_boundary_edges,
986            orphan_nodes,
987            is_valid,
988        })
989    }
990
991    fn node_row_exists(&self, id: &str) -> Result<bool, String> {
992        let v: Option<i32> = self
993            .conn
994            .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
995                Ok(1)
996            })
997            .optional()
998            .map_err(|e| e.to_string())?;
999        Ok(v.is_some())
1000    }
1001
1002    fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
1003        let mut stmt = self
1004            .conn
1005            .prepare(
1006                "SELECT id FROM ainl_graph_nodes
1007                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1008            )
1009            .map_err(|e| e.to_string())?;
1010        let ids = stmt
1011            .query_map([agent_id], |row| row.get::<_, String>(0))
1012            .map_err(|e| e.to_string())?
1013            .collect::<Result<HashSet<_>, _>>()
1014            .map_err(|e| e.to_string())?;
1015        Ok(ids)
1016    }
1017
1018    /// Directed edges where **both** endpoints are nodes owned by `agent_id` (aligned with [`Self::export_graph`] edge set).
1019    pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
1020        let id_set = self.agent_node_ids(agent_id)?;
1021        collect_snapshot_edges_for_id_set(&self.conn, &id_set)
1022    }
1023
1024    /// Export all nodes and interconnecting edges for `agent_id`.
1025    pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1026        let mut stmt = self
1027            .conn
1028            .prepare(
1029                "SELECT payload FROM ainl_graph_nodes
1030                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1031            )
1032            .map_err(|e| e.to_string())?;
1033        let nodes: Vec<AinlMemoryNode> = stmt
1034            .query_map([agent_id], |row| {
1035                let payload: String = row.get(0)?;
1036                Ok(payload)
1037            })
1038            .map_err(|e| e.to_string())?
1039            .map(|r| {
1040                let payload = r.map_err(|e| e.to_string())?;
1041                serde_json::from_str(&payload).map_err(|e| e.to_string())
1042            })
1043            .collect::<Result<Vec<_>, _>>()?;
1044
1045        let id_set: std::collections::HashSet<String> =
1046            nodes.iter().map(|n| n.id.to_string()).collect();
1047
1048        let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1049
1050        Ok(AgentGraphSnapshot {
1051            agent_id: agent_id.to_string(),
1052            exported_at: Utc::now(),
1053            schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1054            nodes,
1055            edges,
1056        })
1057    }
1058
1059    /// Import a snapshot in one transaction (`INSERT OR IGNORE` per row).
1060    ///
1061    /// * `allow_dangling_edges == false` (**default / production**): `PRAGMA foreign_keys` stays
1062    ///   enabled; every edge must reference existing node rows after inserts (same invariants as
1063    ///   [`Self::write_node_with_edges`]).
1064    /// * `allow_dangling_edges == true` (**repair / forensic**): FK checks are disabled only for
1065    ///   this import so partially invalid snapshots can be loaded; run [`Self::validate_graph`]
1066    ///   afterward and repair before returning to normal writes.
1067    pub fn import_graph(
1068        &mut self,
1069        snapshot: &AgentGraphSnapshot,
1070        allow_dangling_edges: bool,
1071    ) -> Result<(), String> {
1072        self.import_graph_checked(snapshot, allow_dangling_edges)
1073            .map_err(|e| e.to_string())
1074    }
1075
1076    /// Typed import variant for callers that want structured error handling.
1077    pub fn import_graph_checked(
1078        &mut self,
1079        snapshot: &AgentGraphSnapshot,
1080        allow_dangling_edges: bool,
1081    ) -> Result<(), SnapshotImportError> {
1082        if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1083            return Err(SnapshotImportError::UnsupportedSchemaVersion {
1084                got: snapshot.schema_version.to_string(),
1085                expected: SNAPSHOT_SCHEMA_VERSION,
1086            });
1087        }
1088
1089        if allow_dangling_edges {
1090            self.conn
1091                .execute_batch("PRAGMA foreign_keys = OFF;")
1092                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1093        }
1094
1095        let result: Result<(), SnapshotImportError> = (|| {
1096            let tx = self
1097                .conn
1098                .transaction()
1099                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1100            for node in &snapshot.nodes {
1101                try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1102            }
1103            for edge in &snapshot.edges {
1104                try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1105            }
1106            tx.commit()
1107                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1108            Ok(())
1109        })();
1110
1111        if allow_dangling_edges {
1112            self.conn
1113                .execute_batch("PRAGMA foreign_keys = ON;")
1114                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1115        }
1116
1117        result
1118    }
1119
1120    /// Large-step trajectory row (sibling table); episode row must exist first.
1121    pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1122        let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1123        let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1124        let frame_s = match &row.frame_vars {
1125            None => None,
1126            Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1127        };
1128        self.conn
1129            .execute(
1130                "INSERT OR REPLACE INTO ainl_trajectories (
1131                    id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1132                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1133                    frame_vars_json, fitness_delta
1134                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1135                rusqlite::params![
1136                    row.id.to_string(),
1137                    row.episode_id.to_string(),
1138                    row.graph_trajectory_node_id.map(|u| u.to_string()),
1139                    row.agent_id,
1140                    row.session_id,
1141                    row.project_id,
1142                    row.recorded_at,
1143                    outcome_json,
1144                    row.ainl_source_hash,
1145                    row.duration_ms as i64,
1146                    steps_json,
1147                    frame_s,
1148                    row.fitness_delta,
1149                ],
1150            )
1151            .map_err(|e| e.to_string())?;
1152        Ok(())
1153    }
1154
1155    /// Recent trajectory detail rows for an agent (newest first).
1156    pub fn list_trajectories_for_agent(
1157        &self,
1158        agent_id: &str,
1159        limit: usize,
1160        since_timestamp: Option<i64>,
1161    ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1162        let cap = limit.clamp(1, 500) as i64;
1163        let sql = if since_timestamp.is_some() {
1164            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1165                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1166                    frame_vars_json, fitness_delta
1167             FROM ainl_trajectories
1168             WHERE agent_id = ?1 AND recorded_at >= ?2
1169             ORDER BY recorded_at DESC
1170             LIMIT ?3"
1171        } else {
1172            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1173                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1174                    frame_vars_json, fitness_delta
1175             FROM ainl_trajectories
1176             WHERE agent_id = ?1
1177             ORDER BY recorded_at DESC
1178             LIMIT ?2"
1179        };
1180
1181        let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1182        let rows = if let Some(since) = since_timestamp {
1183            stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1184        } else {
1185            stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1186        }
1187        .map_err(|e| e.to_string())?;
1188
1189        let mut out = Vec::new();
1190        for row in rows {
1191            out.push(row.map_err(|e| e.to_string())?);
1192        }
1193        Ok(out)
1194    }
1195
1196    /// Count `ainl_trajectories` rows for `agent_id` with `recorded_at` **strictly before** `before_unix` (seconds).
1197    pub fn count_trajectory_details_before(
1198        &self,
1199        agent_id: &str,
1200        before_unix: i64,
1201    ) -> Result<usize, String> {
1202        if agent_id.trim().is_empty() {
1203            return Err("agent_id is empty".into());
1204        }
1205        let n: i64 = self
1206            .conn
1207            .query_row(
1208                "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1209                rusqlite::params![agent_id, before_unix],
1210                |r| r.get(0),
1211            )
1212            .map_err(|e| e.to_string())?;
1213        Ok(n as usize)
1214    }
1215
1216    /// Delete `ainl_trajectories` detail rows for `agent_id` with `recorded_at` **strictly before**
1217    /// `before_unix` (seconds). Returns the number of rows removed.
1218    ///
1219    /// This does **not** delete `Trajectory` nodes from `ainl_graph_nodes` or any edges; use graph
1220    /// export / repair paths if you need a fully consistent graph after bulk pruning.
1221    pub fn delete_trajectory_details_before(
1222        &self,
1223        agent_id: &str,
1224        before_unix: i64,
1225    ) -> Result<usize, String> {
1226        if agent_id.trim().is_empty() {
1227            return Err("agent_id is empty".into());
1228        }
1229        let n = self
1230            .conn
1231            .execute(
1232                "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1233                rusqlite::params![agent_id, before_unix],
1234            )
1235            .map_err(|e| e.to_string())?;
1236        Ok(n)
1237    }
1238
1239    /// Full-text search over persisted failure nodes for one agent (`node_type = failure`).
1240    ///
1241    /// Returns matching [`AinlMemoryNode`] rows (newest first). Invalid FTS syntax yields an empty list.
1242    pub fn search_failures_fts_for_agent(
1243        &self,
1244        agent_id: &str,
1245        query: &str,
1246        limit: usize,
1247    ) -> Result<Vec<AinlMemoryNode>, String> {
1248        let fts_q = fts5_prefix_match_query(query);
1249        if fts_q.is_empty() || agent_id.trim().is_empty() {
1250            return Ok(Vec::new());
1251        }
1252        let cap = limit.clamp(1, 200) as i64;
1253        let mut stmt = self
1254            .conn
1255            .prepare(
1256                "SELECT n.payload
1257                 FROM ainl_failures_fts AS f
1258                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1259                 WHERE n.node_type = 'failure'
1260                   AND json_extract(n.payload, '$.agent_id') = ?1
1261                   AND f.body MATCH ?2
1262                 ORDER BY n.timestamp DESC
1263                 LIMIT ?3",
1264            )
1265            .map_err(|e| e.to_string())?;
1266
1267        let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1268            let payload: String = row.get(0)?;
1269            Ok(payload)
1270        });
1271
1272        let mut out = Vec::new();
1273        let rows = match rows {
1274            Ok(r) => r,
1275            Err(e) => {
1276                let msg = e.to_string();
1277                if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1278                    return Ok(Vec::new());
1279                }
1280                return Err(msg);
1281            }
1282        };
1283        for row in rows {
1284            match row {
1285                Ok(payload) => {
1286                    if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1287                        out.push(node);
1288                    }
1289                }
1290                Err(e) => {
1291                    let msg = e.to_string();
1292                    if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1293                        return Ok(Vec::new());
1294                    }
1295                    return Err(msg);
1296                }
1297            }
1298        }
1299        Ok(out)
1300    }
1301
1302    /// Full-text search over all graph node JSON payloads (see `ainl_nodes_fts`), scoped by agent.
1303    ///
1304    /// `project_id`: when `Some` (non-empty), return matches whose stored project is empty/NULL
1305    /// **or** equal to that id (so legacy unscoped rows remain visible in a project workspace).
1306    /// When `None`, do not filter by project.
1307    pub fn search_all_nodes_fts_for_agent(
1308        &self,
1309        agent_id: &str,
1310        query: &str,
1311        project_id: Option<&str>,
1312        limit: usize,
1313    ) -> Result<Vec<AinlMemoryNode>, String> {
1314        let fts_q = fts5_prefix_match_query(query);
1315        if fts_q.is_empty() || agent_id.trim().is_empty() {
1316            return Ok(Vec::new());
1317        }
1318        let cap = limit.clamp(1, 200) as i64;
1319        let project_filter = project_id.map(str::trim).filter(|s| !s.is_empty());
1320        let mut out = Vec::new();
1321        if let Some(p) = project_filter {
1322            let mut stmt = self
1323                .conn
1324                .prepare(
1325                    "SELECT n.payload
1326                 FROM ainl_nodes_fts AS f
1327                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1328                 WHERE f.agent_id = ?1
1329                   AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1330                   AND f.body MATCH ?2
1331                 ORDER BY n.timestamp DESC
1332                 LIMIT ?4",
1333                )
1334                .map_err(|e| e.to_string())?;
1335            let mut rows = stmt
1336                .query(rusqlite::params![agent_id, fts_q, p, cap])
1337                .map_err(|e| e.to_string())?;
1338            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1339                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1340                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1341                    out.push(node);
1342                }
1343            }
1344        } else {
1345            let mut stmt = self
1346                .conn
1347                .prepare(
1348                    "SELECT n.payload
1349                 FROM ainl_nodes_fts AS f
1350                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1351                 WHERE f.agent_id = ?1
1352                   AND f.body MATCH ?2
1353                 ORDER BY n.timestamp DESC
1354                 LIMIT ?3",
1355                )
1356                .map_err(|e| e.to_string())?;
1357            let mut rows = stmt
1358                .query(rusqlite::params![agent_id, fts_q, cap])
1359                .map_err(|e| e.to_string())?;
1360            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1361                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1362                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1363                    out.push(node);
1364                }
1365            }
1366        }
1367        Ok(out)
1368    }
1369}
1370
1371fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1372    let id_s: String = row.get(0)?;
1373    let episode_s: String = row.get(1)?;
1374    let graph_traj: Option<String> = row.get(2)?;
1375    let agent_id: String = row.get(3)?;
1376    let session_id: String = row.get(4)?;
1377    let project_id: Option<String> = row.get(5)?;
1378    let recorded_at: i64 = row.get(6)?;
1379    let outcome_json: String = row.get(7)?;
1380    let hash: Option<String> = row.get(8)?;
1381    let duration_ms: i64 = row.get(9)?;
1382    let steps_json: String = row.get(10)?;
1383    let frame_vars_json: Option<String> = row.get(11)?;
1384    let fitness_sql: Option<f64> = row.get(12)?;
1385    let id = Uuid::parse_str(&id_s).map_err(|_| {
1386        rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1387    })?;
1388    let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1389        rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1390    })?;
1391    let graph_trajectory_node_id = graph_traj
1392        .filter(|s| !s.is_empty())
1393        .map(|s| Uuid::parse_str(&s))
1394        .transpose()
1395        .map_err(|_| {
1396            rusqlite::Error::InvalidColumnType(
1397                2,
1398                "graph_trajectory_node_id".into(),
1399                rusqlite::types::Type::Text,
1400            )
1401        })?;
1402    let outcome: TrajectoryOutcome = serde_json::from_str(&outcome_json).map_err(|_| {
1403        rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1404    })?;
1405    let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json).map_err(|_| {
1406        rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1407    })?;
1408    let frame_vars = frame_vars_json
1409        .filter(|s| !s.trim().is_empty())
1410        .and_then(|s| serde_json::from_str(&s).ok());
1411    let fitness_delta = fitness_sql.map(|f| f as f32);
1412    Ok(TrajectoryDetailRecord {
1413        id,
1414        episode_id,
1415        graph_trajectory_node_id,
1416        agent_id,
1417        session_id,
1418        project_id,
1419        recorded_at,
1420        outcome,
1421        ainl_source_hash: hash,
1422        duration_ms: duration_ms.max(0) as u64,
1423        steps,
1424        frame_vars,
1425        fitness_delta,
1426    })
1427}
1428
1429impl GraphStore for SqliteGraphStore {
1430    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
1431    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
1432    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1433        persist_node(&self.conn, node)
1434    }
1435
1436    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1437        let payload: Option<String> = self
1438            .conn
1439            .query_row(
1440                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1441                [id.to_string()],
1442                |row| row.get::<_, String>(0),
1443            )
1444            .optional()
1445            .map_err(|e: rusqlite::Error| e.to_string())?;
1446
1447        match payload {
1448            Some(p) => {
1449                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1450                Ok(Some(node))
1451            }
1452            None => Ok(None),
1453        }
1454    }
1455
1456    fn query_episodes_since(
1457        &self,
1458        since_timestamp: i64,
1459        limit: usize,
1460    ) -> Result<Vec<AinlMemoryNode>, String> {
1461        let mut stmt = self
1462            .conn
1463            .prepare(
1464                "SELECT payload FROM ainl_graph_nodes
1465                 WHERE node_type = 'episode' AND timestamp >= ?1
1466                 ORDER BY timestamp DESC
1467                 LIMIT ?2",
1468            )
1469            .map_err(|e| e.to_string())?;
1470
1471        let rows = stmt
1472            .query_map([since_timestamp, limit as i64], |row| {
1473                let payload: String = row.get(0)?;
1474                Ok(payload)
1475            })
1476            .map_err(|e| e.to_string())?;
1477
1478        let mut nodes = Vec::new();
1479        for row in rows {
1480            let payload = row.map_err(|e| e.to_string())?;
1481            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1482            nodes.push(node);
1483        }
1484
1485        Ok(nodes)
1486    }
1487
1488    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1489        let mut stmt = self
1490            .conn
1491            .prepare(
1492                "SELECT payload FROM ainl_graph_nodes
1493                 WHERE node_type = ?1
1494                 ORDER BY timestamp DESC",
1495            )
1496            .map_err(|e| e.to_string())?;
1497
1498        let rows = stmt
1499            .query_map([type_name], |row| {
1500                let payload: String = row.get(0)?;
1501                Ok(payload)
1502            })
1503            .map_err(|e| e.to_string())?;
1504
1505        let mut nodes = Vec::new();
1506        for row in rows {
1507            let payload = row.map_err(|e| e.to_string())?;
1508            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1509            nodes.push(node);
1510        }
1511
1512        Ok(nodes)
1513    }
1514
1515    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1516        let mut stmt = self
1517            .conn
1518            .prepare(
1519                "SELECT to_id FROM ainl_graph_edges
1520                 WHERE from_id = ?1 AND label = ?2",
1521            )
1522            .map_err(|e| e.to_string())?;
1523
1524        let target_ids: Vec<String> = stmt
1525            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1526            .map_err(|e| e.to_string())?
1527            .collect::<Result<Vec<_>, _>>()
1528            .map_err(|e| e.to_string())?;
1529
1530        let mut nodes = Vec::new();
1531        for target_id in target_ids {
1532            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1533            if let Some(node) = self.read_node(id)? {
1534                nodes.push(node);
1535            }
1536        }
1537
1538        Ok(nodes)
1539    }
1540
1541    fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1542        let mut stmt = self
1543            .conn
1544            .prepare(
1545                "SELECT from_id FROM ainl_graph_edges
1546                 WHERE to_id = ?1 AND label = ?2",
1547            )
1548            .map_err(|e| e.to_string())?;
1549
1550        let source_ids: Vec<String> = stmt
1551            .query_map([to_id.to_string(), label.to_string()], |row| row.get(0))
1552            .map_err(|e| e.to_string())?
1553            .collect::<Result<Vec<_>, _>>()
1554            .map_err(|e| e.to_string())?;
1555
1556        let mut nodes = Vec::new();
1557        for source_id in source_ids {
1558            let id = Uuid::parse_str(&source_id).map_err(|e| e.to_string())?;
1559            if let Some(node) = self.read_node(id)? {
1560                nodes.push(node);
1561            }
1562        }
1563
1564        Ok(nodes)
1565    }
1566}