Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the [`GraphStore`] trait and the SQLite implementation.
4//!
5//! ## Referential integrity (SQLite)
6//!
7//! `ainl_graph_edges` uses real `FOREIGN KEY (from_id)` / `FOREIGN KEY (to_id)` references to
8//! `ainl_graph_nodes(id)` with `ON DELETE CASCADE`. [`SqliteGraphStore::open`] and
9//! [`SqliteGraphStore::from_connection`] run `PRAGMA foreign_keys = ON` on the handle.
10//!
11//! Databases created before these constraints used a plain edges table; [`SqliteGraphStore::ensure_schema`]
12//! runs a one-time `migrate_edges_add_foreign_keys` rebuild. Edge rows whose endpoints
13//! are missing from `ainl_graph_nodes` **cannot** be kept under FK rules and are **omitted** from
14//! the migrated copy.
15//!
16//! ## Above the database (still recommended)
17//!
18//! - **Eager checks**: [`SqliteGraphStore::write_node_with_edges`], [`SqliteGraphStore::insert_graph_edge_checked`]
19//!   give clear errors without relying on SQLite error text alone.
20//! - **Repair / forensic import**: [`SqliteGraphStore::import_graph`] with `allow_dangling_edges: true`
21//!   is the **supported** way to load snapshots that violate referential integrity: FK enforcement is
22//!   disabled only for the duration of that import, then turned back on. Follow with
23//!   [`SqliteGraphStore::validate_graph`] before resuming normal writes on the same connection.
24//! - **Semantic graph checks**: [`SqliteGraphStore::validate_graph`] (agent-scoped edges, dangling
25//!   diagnostics, cross-agent boundary counts, etc.) — orthogonal to FK row existence.
26//!
27//! SQLite tables integrate with existing openfang-memory schema where applicable.
28
29use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31    AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32    SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
/// Errors that can be reported while importing a graph snapshot.
#[derive(Debug, Clone)]
pub enum SnapshotImportError {
    /// The snapshot declared a `schema_version` (`got`) other than the supported one (`expected`).
    UnsupportedSchemaVersion { got: String, expected: &'static str },
    /// A failure carried as plain text (presumably a stringified SQLite error — confirm at call sites).
    Sqlite(String),
}

impl std::fmt::Display for SnapshotImportError {
    /// Renders the human-readable message; the `Sqlite` payload is emitted verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sqlite(message) => out.write_str(message),
            Self::UnsupportedSchemaVersion { got, expected } => out.write_fmt(format_args!(
                "unsupported snapshot schema_version '{got}'; expected '{expected}'"
            )),
        }
    }
}
59
/// Errors that can be reported by graph validation.
#[derive(Debug, Clone)]
pub enum GraphValidationError {
    /// A failure carried as plain text (presumably a stringified SQLite error — confirm at call sites).
    Sqlite(String),
}

impl std::fmt::Display for GraphValidationError {
    /// Emits the wrapped message verbatim.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable today; adding a variant turns this into a compile error, forcing an update.
        let Self::Sqlite(message) = self;
        out.write_str(message)
    }
}
73
74/// Graph memory storage trait - swappable backends
75pub trait GraphStore {
76    /// Write a node to storage
77    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79    /// Read a node by ID
80    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82    /// Query episodes since a given timestamp
83    fn query_episodes_since(
84        &self,
85        since_timestamp: i64,
86        limit: usize,
87    ) -> Result<Vec<AinlMemoryNode>, String>;
88
89    /// Find nodes by type
90    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92    /// Walk edges from a node with a given label
93    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94}
95
96/// SQLite implementation of GraphStore
97///
98/// Integrates with existing openfang-memory schema by adding two tables:
99/// - `ainl_graph_nodes`: stores node payloads
100/// - `ainl_graph_edges`: stores graph edges
101pub struct SqliteGraphStore {
102    conn: rusqlite::Connection,
103}
104
105fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
106    conn.execute_batch("PRAGMA foreign_keys = ON;")
107}
108
109fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
110    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
111    let cols = stmt
112        .query_map([], |row| row.get::<_, String>(1))?
113        .collect::<Result<Vec<_>, _>>()?;
114    if !cols.iter().any(|c| c == "weight") {
115        conn.execute(
116            "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
117            [],
118        )?;
119    }
120    if !cols.iter().any(|c| c == "metadata") {
121        conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
122    }
123    Ok(())
124}
125
126/// True when `ainl_graph_edges` declares at least one foreign-key reference (new schema).
127fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
128    let exists: i64 = conn.query_row(
129        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
130        [],
131        |r| r.get(0),
132    )?;
133    if exists == 0 {
134        return Ok(false);
135    }
136    let n: i64 = conn.query_row(
137        "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
138        [],
139        |r| r.get(0),
140    )?;
141    Ok(n > 0)
142}
143
144/// Rebuild `ainl_graph_edges` with `FOREIGN KEY` constraints. Rows whose endpoints are missing
145/// from `ainl_graph_nodes` are **dropped** (they cannot be represented under FK rules).
146fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
147    if edges_table_has_foreign_keys(conn)? {
148        return Ok(());
149    }
150
151    let exists: i64 = conn.query_row(
152        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
153        [],
154        |r| r.get(0),
155    )?;
156    if exists == 0 {
157        return Ok(());
158    }
159
160    conn.execute("BEGIN IMMEDIATE", [])?;
161    let res: Result<(), rusqlite::Error> = (|| {
162        conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
163        conn.execute(
164            "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
165            [],
166        )?;
167        conn.execute(
168            r#"CREATE TABLE ainl_graph_edges (
169                from_id TEXT NOT NULL,
170                to_id TEXT NOT NULL,
171                label TEXT NOT NULL,
172                weight REAL NOT NULL DEFAULT 1.0,
173                metadata TEXT,
174                PRIMARY KEY (from_id, to_id, label),
175                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
176                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
177            )"#,
178            [],
179        )?;
180        conn.execute(
181            r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
182               SELECT o.from_id, o.to_id, o.label,
183                      COALESCE(o.weight, 1.0),
184                      o.metadata
185               FROM ainl_graph_edges__old o
186               WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
187                 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
188            [],
189        )?;
190        conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
191        conn.execute(
192            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
193            [],
194        )?;
195        Ok(())
196    })();
197
198    match res {
199        Ok(()) => {
200            conn.execute("COMMIT", [])?;
201        }
202        Err(e) => {
203            let _ = conn.execute("ROLLBACK", []);
204            return Err(e);
205        }
206    }
207    Ok(())
208}
209
210fn node_type_name(node: &AinlMemoryNode) -> &'static str {
211    match &node.node_type {
212        AinlNodeType::Episode { .. } => "episode",
213        AinlNodeType::Semantic { .. } => "semantic",
214        AinlNodeType::Procedural { .. } => "procedural",
215        AinlNodeType::Persona { .. } => "persona",
216        AinlNodeType::RuntimeState { .. } => "runtime_state",
217        AinlNodeType::Trajectory { .. } => "trajectory",
218        AinlNodeType::Failure { .. } => "failure",
219    }
220}
221
222fn node_timestamp(node: &AinlMemoryNode) -> i64 {
223    match &node.node_type {
224        AinlNodeType::Episode { episodic } => episodic.timestamp,
225        AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
226        AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
227        AinlNodeType::Failure { failure } => failure.recorded_at,
228        _ => chrono::Utc::now().timestamp(),
229    }
230}
231
232fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
233    match &node.node_type {
234        AinlNodeType::Failure { failure } => Some(format!(
235            "{} {} {}",
236            failure.source,
237            failure.tool_name.as_deref().unwrap_or(""),
238            failure.message
239        )),
240        _ => None,
241    }
242}
243
/// Build an FTS5 `body MATCH` expression requiring every whitespace-separated token of `raw`
/// to occur as a token *prefix*; the terms are joined with `AND`.
///
/// Each token has control characters and `"` removed, is wrapped in double quotes so FTS5
/// reads it as a string literal rather than query syntax, and is followed by `*`, which marks
/// the term as a prefix. The star must sit *outside* the quotes: inside the string it is handed
/// to the tokenizer as ordinary text and the term silently degrades to an exact-token match.
///
/// Returns an empty string when no usable token remains; callers should skip the search then.
fn fts5_prefix_match_query(raw: &str) -> String {
    raw.split_whitespace()
        .filter_map(|token| {
            let cleaned: String = token
                .chars()
                .filter(|c| !c.is_control() && *c != '"')
                .collect();
            // A token made only of stripped characters contributes no term.
            (!cleaned.is_empty()).then(|| format!("\"{cleaned}\"*"))
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
258
259fn sync_failure_fts_insert(conn: &rusqlite::Connection, node_id: &str, body: &str) -> Result<(), String> {
260    conn.execute(
261        "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
262        [node_id],
263    )
264    .map_err(|e| e.to_string())?;
265    conn.execute(
266        "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
267        rusqlite::params![node_id, body],
268    )
269    .map_err(|e| e.to_string())?;
270    Ok(())
271}
272
/// Text indexed in `ainl_nodes_fts` for any node kind: the JSON payload itself, cut to at most
/// 400 000 characters (counted as Unicode scalar values, never splitting one).
fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
    const MAX_CHARS: usize = 400_000;

    // The byte offset of character number MAX_CHARS (0-based) is exactly where the cut goes;
    // if no such character exists the payload already fits.
    match payload.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => payload[..cut].to_string(),
        None => payload.to_string(),
    }
}
281
282fn sync_all_nodes_fts_insert(
283    conn: &rusqlite::Connection,
284    node_id: &str,
285    agent_id: &str,
286    project_id: Option<&str>,
287    body: &str,
288) -> Result<(), String> {
289    let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
290    conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
291        .map_err(|e| e.to_string())?;
292    conn.execute(
293        "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
294        rusqlite::params![node_id, agent_id, proj, body],
295    )
296    .map_err(|e| e.to_string())?;
297    Ok(())
298}
299
300fn persist_edge(
301    conn: &rusqlite::Connection,
302    from_id: Uuid,
303    to_id: Uuid,
304    label: &str,
305    weight: f32,
306    metadata: Option<&str>,
307) -> Result<(), String> {
308    conn.execute(
309        "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
310         VALUES (?1, ?2, ?3, ?4, ?5)",
311        rusqlite::params![
312            from_id.to_string(),
313            to_id.to_string(),
314            label,
315            weight,
316            metadata
317        ],
318    )
319    .map_err(|e| e.to_string())?;
320    Ok(())
321}
322
323/// All `ainl_graph_edges` rows whose endpoints are both present in `id_set`, as [`SnapshotEdge`] values.
324fn collect_snapshot_edges_for_id_set(
325    conn: &rusqlite::Connection,
326    id_set: &HashSet<String>,
327) -> Result<Vec<SnapshotEdge>, String> {
328    let mut edge_stmt = conn
329        .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
330        .map_err(|e| e.to_string())?;
331    let edge_rows = edge_stmt
332        .query_map([], |row| {
333            Ok((
334                row.get::<_, String>(0)?,
335                row.get::<_, String>(1)?,
336                row.get::<_, String>(2)?,
337                row.get::<_, f64>(3)?,
338                row.get::<_, Option<String>>(4)?,
339            ))
340        })
341        .map_err(|e| e.to_string())?
342        .collect::<Result<Vec<_>, _>>()
343        .map_err(|e| e.to_string())?;
344
345    let mut edges = Vec::new();
346    for (from_id, to_id, label, weight, meta) in edge_rows {
347        if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
348            continue;
349        }
350        let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
351        let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
352        let metadata = match meta {
353            Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
354            _ => None,
355        };
356        edges.push(SnapshotEdge {
357            source_id,
358            target_id,
359            edge_type: label,
360            weight: weight as f32,
361            metadata,
362        });
363    }
364    Ok(edges)
365}
366
367fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
368    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
369    let type_name = node_type_name(node);
370    let timestamp = node_timestamp(node);
371    let proj = node
372        .project_id
373        .as_deref()
374        .map(str::trim)
375        .filter(|s| !s.is_empty());
376
377    conn.execute(
378        "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
379         VALUES (?1, ?2, ?3, ?4, ?5)",
380        rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
381    )
382    .map_err(|e| e.to_string())?;
383
384    for edge in &node.edges {
385        persist_edge(
386            conn,
387            node.id,
388            edge.target_id,
389            &edge.label,
390            1.0,
391            None::<&str>,
392        )?;
393    }
394
395    let body_all = graph_node_fts_body_from_payload_json(&payload);
396    if !node.agent_id.trim().is_empty() {
397        let _ = sync_all_nodes_fts_insert(
398            conn,
399            &node.id.to_string(),
400            node.agent_id.as_str(),
401            proj,
402            &body_all,
403        );
404    }
405
406    if let Some(body) = failure_fts_body(node) {
407        // Best-effort: graph row is authoritative; FTS is auxiliary for search.
408        let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
409    }
410
411    Ok(())
412}
413
414fn try_insert_node_ignore(
415    conn: &rusqlite::Connection,
416    node: &AinlMemoryNode,
417) -> Result<(), String> {
418    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
419    let type_name = node_type_name(node);
420    let timestamp = node_timestamp(node);
421    let proj = node
422        .project_id
423        .as_deref()
424        .map(str::trim)
425        .filter(|s| !s.is_empty());
426    let n = conn
427        .execute(
428        "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
429         VALUES (?1, ?2, ?3, ?4, ?5)",
430        rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
431    )
432    .map_err(|e| e.to_string())?;
433    if n > 0 {
434        if !node.agent_id.trim().is_empty() {
435            let body_all = graph_node_fts_body_from_payload_json(&payload);
436            let _ = sync_all_nodes_fts_insert(
437                conn,
438                &node.id.to_string(),
439                node.agent_id.as_str(),
440                proj,
441                &body_all,
442            );
443        }
444        if let Some(body) = failure_fts_body(node) {
445            let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
446        }
447    }
448    Ok(())
449}
450
451fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
452    let meta = match &edge.metadata {
453        Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
454        None => None,
455    };
456    conn.execute(
457        "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
458         VALUES (?1, ?2, ?3, ?4, ?5)",
459        rusqlite::params![
460            edge.source_id.to_string(),
461            edge.target_id.to_string(),
462            edge.edge_type,
463            edge.weight,
464            meta.as_deref(),
465        ],
466    )
467    .map_err(|e| e.to_string())?;
468    Ok(())
469}
470
471fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
472    conn.execute(
473        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
474            node_id UNINDEXED,
475            body,
476            tokenize = 'unicode61 remove_diacritics 1'
477        )",
478        [],
479    )?;
480    Ok(())
481}
482
483fn migrate_ainl_graph_nodes_add_project_id(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
484    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
485    let cols = stmt
486        .query_map([], |row| row.get::<_, String>(1))?
487        .collect::<Result<Vec<_>, _>>()?;
488    if !cols.iter().any(|c| c == "project_id") {
489        conn.execute(
490            "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
491            [],
492        )?;
493    }
494    Ok(())
495}
496
497fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
498    conn.execute(
499        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
500            node_id UNINDEXED,
501            agent_id UNINDEXED,
502            project_id UNINDEXED,
503            body,
504            tokenize = 'unicode61 remove_diacritics 1'
505        )",
506        [],
507    )?;
508    Ok(())
509}
510
511/// Remove FTS shadow rows when the primary graph row is deleted (`DELETE` from `ainl_graph_nodes`).
512fn install_ainl_graph_node_delete_fts_triggers(
513    conn: &rusqlite::Connection,
514) -> Result<(), rusqlite::Error> {
515    conn.execute_batch(
516        "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
517         CREATE TRIGGER ainl_graph_nodes_after_delete_fts
518         AFTER DELETE ON ainl_graph_nodes
519         FOR EACH ROW
520         BEGIN
521           DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
522           DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
523         END;",
524    )?;
525    Ok(())
526}
527
528/// One-time: populate `ainl_nodes_fts` from existing `ainl_graph_nodes` (legacy DBs).
529fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
530    let fts_n: i64 = conn
531        .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
532        .unwrap_or(0);
533    if fts_n > 0 {
534        return Ok(());
535    }
536    let mut stmt = conn
537        .prepare(
538            "SELECT id, payload, project_id FROM ainl_graph_nodes
539             WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
540        )
541        .map_err(|e| e.to_string())?;
542    let rows = stmt
543        .query_map([], |row| {
544            Ok((
545                row.get::<_, String>(0)?,
546                row.get::<_, String>(1)?,
547                row.get::<_, Option<String>>(2)?,
548            ))
549        })
550        .map_err(|e| e.to_string())?
551        .collect::<Result<Vec<_>, _>>()
552        .map_err(|e| e.to_string())?;
553    for (id, payload, col_proj) in rows {
554        let v: serde_json::Value = match serde_json::from_str(&payload) {
555            Ok(x) => x,
556            Err(_) => continue,
557        };
558        let ag = v
559            .get("agent_id")
560            .and_then(|x| x.as_str())
561            .map(str::trim)
562            .unwrap_or("");
563        if ag.is_empty() {
564            continue;
565        }
566        let json_proj = v
567            .get("project_id")
568            .and_then(|x| x.as_str())
569            .map(str::trim)
570            .filter(|s| !s.is_empty());
571        let proj = col_proj
572            .as_deref()
573            .map(str::trim)
574            .filter(|s| !s.is_empty())
575            .or(json_proj);
576        let body = graph_node_fts_body_from_payload_json(&payload);
577        if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
578            // Best-effort backfill: continue with remaining rows.
579        }
580    }
581    Ok(())
582}
583
584fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
585    conn.execute(
586        "CREATE TABLE IF NOT EXISTS ainl_trajectories (
587            id TEXT PRIMARY KEY,
588            episode_id TEXT NOT NULL,
589            graph_trajectory_node_id TEXT,
590            agent_id TEXT NOT NULL,
591            session_id TEXT NOT NULL,
592            project_id TEXT,
593            recorded_at INTEGER NOT NULL,
594            outcome_json TEXT NOT NULL,
595            ainl_source_hash TEXT,
596            duration_ms INTEGER NOT NULL DEFAULT 0,
597            steps_json TEXT NOT NULL,
598            FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
599        )",
600        [],
601    )?;
602    conn.execute(
603        "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
604         ON ainl_trajectories(agent_id, recorded_at DESC)",
605        [],
606    )?;
607    Ok(())
608}
609
610fn migrate_ainl_trajectories_add_depth_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
611    let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
612    let cols: Vec<String> = stmt
613        .query_map([], |row| row.get::<_, String>(1))?
614        .collect::<Result<Vec<_>, _>>()?;
615    if !cols.iter().any(|c| c == "frame_vars_json") {
616        conn.execute(
617            "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
618            [],
619        )?;
620    }
621    if !cols.iter().any(|c| c == "fitness_delta") {
622        conn.execute("ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL", [])?;
623    }
624    Ok(())
625}
626
627impl SqliteGraphStore {
628    /// Ensure the AINL graph schema exists in the database
629    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
630        conn.execute(
631            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
632                id TEXT PRIMARY KEY,
633                node_type TEXT NOT NULL,
634                payload TEXT NOT NULL,
635                timestamp INTEGER NOT NULL
636            )",
637            [],
638        )?;
639
640        conn.execute(
641            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
642             ON ainl_graph_nodes(timestamp DESC)",
643            [],
644        )?;
645
646        conn.execute(
647            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
648             ON ainl_graph_nodes(node_type)",
649            [],
650        )?;
651
652        migrate_ainl_graph_nodes_add_project_id(conn)?;
653        conn.execute(
654            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
655             ON ainl_graph_nodes(project_id, node_type, timestamp)",
656            [],
657        )?;
658
659        conn.execute(
660            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
661                from_id TEXT NOT NULL,
662                to_id TEXT NOT NULL,
663                label TEXT NOT NULL,
664                weight REAL NOT NULL DEFAULT 1.0,
665                metadata TEXT,
666                PRIMARY KEY (from_id, to_id, label),
667                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
668                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
669            )",
670            [],
671        )?;
672
673        conn.execute(
674            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
675             ON ainl_graph_edges(from_id, label)",
676            [],
677        )?;
678
679        migrate_edge_columns(conn)?;
680        migrate_edges_add_foreign_keys(conn)?;
681        migrate_trajectories_v1(conn)?;
682        migrate_ainl_trajectories_add_depth_v1(conn)?;
683        migrate_failures_fts_v1(conn)?;
684        migrate_ainl_nodes_fts_v1(conn)?;
685        if let Err(_) = backfill_ainl_nodes_fts_if_empty(conn) {
686            // Non-fatal: new DBs may have empty graph; legacy rows can be re-synced on next write.
687        }
688        let _ = install_ainl_graph_node_delete_fts_triggers(conn);
689        Ok(())
690    }
691
692    /// Open/create a graph store at the given path
693    pub fn open(path: &std::path::Path) -> Result<Self, String> {
694        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
695        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
696        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
697        Ok(Self { conn })
698    }
699
700    /// Create from an existing connection (for integration with openfang-memory pool)
701    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
702        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
703        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
704        Ok(Self { conn })
705    }
706
707    /// Low-level access for query builders in this crate.
708    pub(crate) fn conn(&self) -> &rusqlite::Connection {
709        &self.conn
710    }
711
712    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
713    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
714        persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
715    }
716
717    /// Like [`Self::insert_graph_edge`], but verifies both endpoints exist first (clear errors for strict runtime wiring).
718    pub fn insert_graph_edge_checked(
719        &self,
720        from_id: Uuid,
721        to_id: Uuid,
722        label: &str,
723    ) -> Result<(), String> {
724        if !self.node_row_exists(&from_id.to_string())? {
725            return Err(format!(
726                "insert_graph_edge_checked: missing source node row {}",
727                from_id
728            ));
729        }
730        if !self.node_row_exists(&to_id.to_string())? {
731            return Err(format!(
732                "insert_graph_edge_checked: missing target node row {}",
733                to_id
734            ));
735        }
736        self.insert_graph_edge(from_id, to_id, label)
737    }
738
739    /// Same as [`Self::insert_graph_edge`], with optional edge weight and JSON metadata.
740    pub fn insert_graph_edge_with_meta(
741        &self,
742        from_id: Uuid,
743        to_id: Uuid,
744        label: &str,
745        weight: f32,
746        metadata: Option<&serde_json::Value>,
747    ) -> Result<(), String> {
748        let meta = metadata
749            .map(serde_json::to_string)
750            .transpose()
751            .map_err(|e| e.to_string())?;
752        persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
753    }
754
755    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
756    pub fn query_nodes_by_type_since(
757        &self,
758        node_type: &str,
759        since_timestamp: i64,
760        limit: usize,
761    ) -> Result<Vec<AinlMemoryNode>, String> {
762        let mut stmt = self
763            .conn
764            .prepare(
765                "SELECT payload FROM ainl_graph_nodes
766                 WHERE node_type = ?1 AND timestamp >= ?2
767                 ORDER BY timestamp DESC
768                 LIMIT ?3",
769            )
770            .map_err(|e| e.to_string())?;
771
772        let rows = stmt
773            .query_map(
774                rusqlite::params![node_type, since_timestamp, limit as i64],
775                |row| {
776                    let payload: String = row.get(0)?;
777                    Ok(payload)
778                },
779            )
780            .map_err(|e| e.to_string())?;
781
782        let mut nodes = Vec::new();
783        for row in rows {
784            let payload = row.map_err(|e| e.to_string())?;
785            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
786            nodes.push(node);
787        }
788
789        Ok(nodes)
790    }
791
792    /// Read the most recent persisted [`RuntimeStateNode`] for `agent_id`, if any.
793    ///
794    /// Rows are stored with `node_type = 'runtime_state'` and JSON `$.node_type.runtime_state.agent_id` matching the agent.
795    pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
796        if agent_id.is_empty() {
797            return Ok(None);
798        }
799        let mut stmt = self
800            .conn
801            .prepare(
802                "SELECT payload FROM ainl_graph_nodes
803                 WHERE node_type = 'runtime_state'
804                   AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
805                 ORDER BY timestamp DESC
806                 LIMIT 1",
807            )
808            .map_err(|e| e.to_string())?;
809
810        let payload_opt: Option<String> = stmt
811            .query_row([agent_id], |row| row.get(0))
812            .optional()
813            .map_err(|e| e.to_string())?;
814
815        let Some(payload) = payload_opt else {
816            return Ok(None);
817        };
818
819        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
820        match node.node_type {
821            AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
822            _ => Err("runtime_state row had unexpected node_type payload".to_string()),
823        }
824    }
825
826    /// Upsert one [`RuntimeStateNode`] row per agent (stable id via [`Uuid::new_v5`]).
827    pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
828        let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
829        let node = AinlMemoryNode {
830            id,
831            memory_category: MemoryCategory::RuntimeState,
832            importance_score: 0.5,
833            agent_id: state.agent_id.clone(),
834            project_id: None,
835            node_type: AinlNodeType::RuntimeState {
836                runtime_state: state.clone(),
837            },
838            edges: Vec::new(),
839        };
840        self.write_node(&node)
841    }
842
843    /// Write a node and its embedded edges in one transaction; fails if any edge target is missing.
844    pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
845        let tx = self.conn.transaction().map_err(|e| e.to_string())?;
846        for edge in &node.edges {
847            let exists: Option<i32> = tx
848                .query_row(
849                    "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
850                    [edge.target_id.to_string()],
851                    |_| Ok(1),
852                )
853                .optional()
854                .map_err(|e| e.to_string())?;
855            if exists.is_none() {
856                return Err(format!(
857                    "write_node_with_edges: missing target node {}",
858                    edge.target_id
859                ));
860            }
861        }
862        persist_node(&tx, node)?;
863        tx.commit().map_err(|e| e.to_string())?;
864        Ok(())
865    }
866
867    /// Validate structural integrity for one agent's induced subgraph.
868    pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
869        self.validate_graph_checked(agent_id)
870            .map_err(|e| e.to_string())
871    }
872
873    /// Typed validation variant for callers that need structured error handling.
874    pub fn validate_graph_checked(
875        &self,
876        agent_id: &str,
877    ) -> Result<GraphValidationReport, GraphValidationError> {
878        use std::collections::HashSet;
879
880        let agent_nodes = self
881            .agent_node_ids(agent_id)
882            .map_err(GraphValidationError::Sqlite)?;
883        let node_count = agent_nodes.len();
884
885        let mut stmt = self
886            .conn
887            .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
888            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
889        let all_edges: Vec<(String, String, String)> = stmt
890            .query_map([], |row| {
891                Ok((
892                    row.get::<_, String>(0)?,
893                    row.get::<_, String>(1)?,
894                    row.get::<_, String>(2)?,
895                ))
896            })
897            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
898            .collect::<Result<Vec<_>, _>>()
899            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
900
901        let mut edge_pairs = Vec::new();
902        for (from_id, to_id, label) in all_edges {
903            let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
904            if touches_agent {
905                edge_pairs.push((from_id, to_id, label));
906            }
907        }
908
909        let edge_count = edge_pairs.len();
910        let mut dangling_edges = Vec::new();
911        let mut dangling_edge_details = Vec::new();
912        let mut cross_agent_boundary_edges = 0usize;
913
914        for (from_id, to_id, label) in &edge_pairs {
915            let from_ok = self
916                .node_row_exists(from_id)
917                .map_err(GraphValidationError::Sqlite)?;
918            let to_ok = self
919                .node_row_exists(to_id)
920                .map_err(GraphValidationError::Sqlite)?;
921            if !from_ok || !to_ok {
922                dangling_edges.push((from_id.clone(), to_id.clone()));
923                dangling_edge_details.push(DanglingEdgeDetail {
924                    source_id: from_id.clone(),
925                    target_id: to_id.clone(),
926                    edge_type: label.clone(),
927                });
928                continue;
929            }
930            let fa = agent_nodes.contains(from_id);
931            let ta = agent_nodes.contains(to_id);
932            if fa ^ ta {
933                cross_agent_boundary_edges += 1;
934            }
935        }
936
937        let mut touched: HashSet<String> =
938            HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
939        for (a, b, _) in &edge_pairs {
940            if agent_nodes.contains(a) {
941                touched.insert(a.clone());
942            }
943            if agent_nodes.contains(b) {
944                touched.insert(b.clone());
945            }
946        }
947
948        let mut orphan_nodes = Vec::new();
949        for id in &agent_nodes {
950            if !touched.contains(id) {
951                orphan_nodes.push(id.clone());
952            }
953        }
954
955        let is_valid = dangling_edges.is_empty();
956        Ok(GraphValidationReport {
957            agent_id: agent_id.to_string(),
958            node_count,
959            edge_count,
960            dangling_edges,
961            dangling_edge_details,
962            cross_agent_boundary_edges,
963            orphan_nodes,
964            is_valid,
965        })
966    }
967
968    fn node_row_exists(&self, id: &str) -> Result<bool, String> {
969        let v: Option<i32> = self
970            .conn
971            .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
972                Ok(1)
973            })
974            .optional()
975            .map_err(|e| e.to_string())?;
976        Ok(v.is_some())
977    }
978
979    fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
980        let mut stmt = self
981            .conn
982            .prepare(
983                "SELECT id FROM ainl_graph_nodes
984                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
985            )
986            .map_err(|e| e.to_string())?;
987        let ids = stmt
988            .query_map([agent_id], |row| row.get::<_, String>(0))
989            .map_err(|e| e.to_string())?
990            .collect::<Result<HashSet<_>, _>>()
991            .map_err(|e| e.to_string())?;
992        Ok(ids)
993    }
994
995    /// Directed edges where **both** endpoints are nodes owned by `agent_id` (aligned with [`Self::export_graph`] edge set).
996    pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
997        let id_set = self.agent_node_ids(agent_id)?;
998        collect_snapshot_edges_for_id_set(&self.conn, &id_set)
999    }
1000
1001    /// Export all nodes and interconnecting edges for `agent_id`.
1002    pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1003        let mut stmt = self
1004            .conn
1005            .prepare(
1006                "SELECT payload FROM ainl_graph_nodes
1007                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1008            )
1009            .map_err(|e| e.to_string())?;
1010        let nodes: Vec<AinlMemoryNode> = stmt
1011            .query_map([agent_id], |row| {
1012                let payload: String = row.get(0)?;
1013                Ok(payload)
1014            })
1015            .map_err(|e| e.to_string())?
1016            .map(|r| {
1017                let payload = r.map_err(|e| e.to_string())?;
1018                serde_json::from_str(&payload).map_err(|e| e.to_string())
1019            })
1020            .collect::<Result<Vec<_>, _>>()?;
1021
1022        let id_set: std::collections::HashSet<String> =
1023            nodes.iter().map(|n| n.id.to_string()).collect();
1024
1025        let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1026
1027        Ok(AgentGraphSnapshot {
1028            agent_id: agent_id.to_string(),
1029            exported_at: Utc::now(),
1030            schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1031            nodes,
1032            edges,
1033        })
1034    }
1035
1036    /// Import a snapshot in one transaction (`INSERT OR IGNORE` per row).
1037    ///
1038    /// * `allow_dangling_edges == false` (**default / production**): `PRAGMA foreign_keys` stays
1039    ///   enabled; every edge must reference existing node rows after inserts (same invariants as
1040    ///   [`Self::write_node_with_edges`]).
1041    /// * `allow_dangling_edges == true` (**repair / forensic**): FK checks are disabled only for
1042    ///   this import so partially invalid snapshots can be loaded; run [`Self::validate_graph`]
1043    ///   afterward and repair before returning to normal writes.
1044    pub fn import_graph(
1045        &mut self,
1046        snapshot: &AgentGraphSnapshot,
1047        allow_dangling_edges: bool,
1048    ) -> Result<(), String> {
1049        self.import_graph_checked(snapshot, allow_dangling_edges)
1050            .map_err(|e| e.to_string())
1051    }
1052
1053    /// Typed import variant for callers that want structured error handling.
1054    pub fn import_graph_checked(
1055        &mut self,
1056        snapshot: &AgentGraphSnapshot,
1057        allow_dangling_edges: bool,
1058    ) -> Result<(), SnapshotImportError> {
1059        if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1060            return Err(SnapshotImportError::UnsupportedSchemaVersion {
1061                got: snapshot.schema_version.to_string(),
1062                expected: SNAPSHOT_SCHEMA_VERSION,
1063            });
1064        }
1065
1066        if allow_dangling_edges {
1067            self.conn
1068                .execute_batch("PRAGMA foreign_keys = OFF;")
1069                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1070        }
1071
1072        let result: Result<(), SnapshotImportError> = (|| {
1073            let tx = self
1074                .conn
1075                .transaction()
1076                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1077            for node in &snapshot.nodes {
1078                try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1079            }
1080            for edge in &snapshot.edges {
1081                try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1082            }
1083            tx.commit()
1084                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1085            Ok(())
1086        })();
1087
1088        if allow_dangling_edges {
1089            self.conn
1090                .execute_batch("PRAGMA foreign_keys = ON;")
1091                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1092        }
1093
1094        result
1095    }
1096
1097    /// Large-step trajectory row (sibling table); episode row must exist first.
1098    pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1099        let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1100        let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1101        let frame_s = match &row.frame_vars {
1102            None => None,
1103            Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1104        };
1105        self.conn
1106            .execute(
1107                "INSERT OR REPLACE INTO ainl_trajectories (
1108                    id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1109                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1110                    frame_vars_json, fitness_delta
1111                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1112                rusqlite::params![
1113                    row.id.to_string(),
1114                    row.episode_id.to_string(),
1115                    row.graph_trajectory_node_id.map(|u| u.to_string()),
1116                    row.agent_id,
1117                    row.session_id,
1118                    row.project_id,
1119                    row.recorded_at,
1120                    outcome_json,
1121                    row.ainl_source_hash,
1122                    row.duration_ms as i64,
1123                    steps_json,
1124                    frame_s,
1125                    row.fitness_delta,
1126                ],
1127            )
1128            .map_err(|e| e.to_string())?;
1129        Ok(())
1130    }
1131
1132    /// Recent trajectory detail rows for an agent (newest first).
1133    pub fn list_trajectories_for_agent(
1134        &self,
1135        agent_id: &str,
1136        limit: usize,
1137        since_timestamp: Option<i64>,
1138    ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1139        let cap = limit.clamp(1, 500) as i64;
1140        let sql = if since_timestamp.is_some() {
1141            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1142                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1143                    frame_vars_json, fitness_delta
1144             FROM ainl_trajectories
1145             WHERE agent_id = ?1 AND recorded_at >= ?2
1146             ORDER BY recorded_at DESC
1147             LIMIT ?3"
1148        } else {
1149            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1150                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1151                    frame_vars_json, fitness_delta
1152             FROM ainl_trajectories
1153             WHERE agent_id = ?1
1154             ORDER BY recorded_at DESC
1155             LIMIT ?2"
1156        };
1157
1158        let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1159        let rows = if let Some(since) = since_timestamp {
1160            stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1161        } else {
1162            stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1163        }
1164        .map_err(|e| e.to_string())?;
1165
1166        let mut out = Vec::new();
1167        for row in rows {
1168            out.push(row.map_err(|e| e.to_string())?);
1169        }
1170        Ok(out)
1171    }
1172
1173    /// Count `ainl_trajectories` rows for `agent_id` with `recorded_at` **strictly before** `before_unix` (seconds).
1174    pub fn count_trajectory_details_before(
1175        &self,
1176        agent_id: &str,
1177        before_unix: i64,
1178    ) -> Result<usize, String> {
1179        if agent_id.trim().is_empty() {
1180            return Err("agent_id is empty".into());
1181        }
1182        let n: i64 = self
1183            .conn
1184            .query_row(
1185                "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1186                rusqlite::params![agent_id, before_unix],
1187                |r| r.get(0),
1188            )
1189            .map_err(|e| e.to_string())?;
1190        Ok(n as usize)
1191    }
1192
1193    /// Delete `ainl_trajectories` detail rows for `agent_id` with `recorded_at` **strictly before**
1194    /// `before_unix` (seconds). Returns the number of rows removed.
1195    ///
1196    /// This does **not** delete `Trajectory` nodes from `ainl_graph_nodes` or any edges; use graph
1197    /// export / repair paths if you need a fully consistent graph after bulk pruning.
1198    pub fn delete_trajectory_details_before(
1199        &self,
1200        agent_id: &str,
1201        before_unix: i64,
1202    ) -> Result<usize, String> {
1203        if agent_id.trim().is_empty() {
1204            return Err("agent_id is empty".into());
1205        }
1206        let n = self
1207            .conn
1208            .execute(
1209                "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1210                rusqlite::params![agent_id, before_unix],
1211            )
1212            .map_err(|e| e.to_string())?;
1213        Ok(n)
1214    }
1215
1216    /// Full-text search over persisted failure nodes for one agent (`node_type = failure`).
1217    ///
1218    /// Returns matching [`AinlMemoryNode`] rows (newest first). Invalid FTS syntax yields an empty list.
1219    pub fn search_failures_fts_for_agent(
1220        &self,
1221        agent_id: &str,
1222        query: &str,
1223        limit: usize,
1224    ) -> Result<Vec<AinlMemoryNode>, String> {
1225        let fts_q = fts5_prefix_match_query(query);
1226        if fts_q.is_empty() || agent_id.trim().is_empty() {
1227            return Ok(Vec::new());
1228        }
1229        let cap = limit.clamp(1, 200) as i64;
1230        let mut stmt = self
1231            .conn
1232            .prepare(
1233                "SELECT n.payload
1234                 FROM ainl_failures_fts AS f
1235                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1236                 WHERE n.node_type = 'failure'
1237                   AND json_extract(n.payload, '$.agent_id') = ?1
1238                   AND f.body MATCH ?2
1239                 ORDER BY n.timestamp DESC
1240                 LIMIT ?3",
1241            )
1242            .map_err(|e| e.to_string())?;
1243
1244        let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1245            let payload: String = row.get(0)?;
1246            Ok(payload)
1247        });
1248
1249        let mut out = Vec::new();
1250        let rows = match rows {
1251            Ok(r) => r,
1252            Err(e) => {
1253                let msg = e.to_string();
1254                if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1255                    return Ok(Vec::new());
1256                }
1257                return Err(msg);
1258            }
1259        };
1260        for row in rows {
1261            match row {
1262                Ok(payload) => {
1263                    if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1264                        out.push(node);
1265                    }
1266                }
1267                Err(e) => {
1268                    let msg = e.to_string();
1269                    if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1270                        return Ok(Vec::new());
1271                    }
1272                    return Err(msg);
1273                }
1274            }
1275        }
1276        Ok(out)
1277    }
1278
1279    /// Full-text search over all graph node JSON payloads (see `ainl_nodes_fts`), scoped by agent.
1280    ///
1281    /// `project_id`: when `Some` (non-empty), return matches whose stored project is empty/NULL
1282    /// **or** equal to that id (so legacy unscoped rows remain visible in a project workspace).
1283    /// When `None`, do not filter by project.
1284    pub fn search_all_nodes_fts_for_agent(
1285        &self,
1286        agent_id: &str,
1287        query: &str,
1288        project_id: Option<&str>,
1289        limit: usize,
1290    ) -> Result<Vec<AinlMemoryNode>, String> {
1291        let fts_q = fts5_prefix_match_query(query);
1292        if fts_q.is_empty() || agent_id.trim().is_empty() {
1293            return Ok(Vec::new());
1294        }
1295        let cap = limit.clamp(1, 200) as i64;
1296        let project_filter = project_id
1297            .map(str::trim)
1298            .filter(|s| !s.is_empty());
1299        let mut out = Vec::new();
1300        if let Some(p) = project_filter {
1301            let mut stmt = self
1302                .conn
1303                .prepare(
1304                    "SELECT n.payload
1305                 FROM ainl_nodes_fts AS f
1306                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1307                 WHERE f.agent_id = ?1
1308                   AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1309                   AND f.body MATCH ?2
1310                 ORDER BY n.timestamp DESC
1311                 LIMIT ?4",
1312                )
1313                .map_err(|e| e.to_string())?;
1314            let mut rows = stmt
1315                .query(rusqlite::params![agent_id, fts_q, p, cap])
1316                .map_err(|e| e.to_string())?;
1317            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1318                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1319                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1320                    out.push(node);
1321                }
1322            }
1323        } else {
1324            let mut stmt = self
1325                .conn
1326                .prepare(
1327                    "SELECT n.payload
1328                 FROM ainl_nodes_fts AS f
1329                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1330                 WHERE f.agent_id = ?1
1331                   AND f.body MATCH ?2
1332                 ORDER BY n.timestamp DESC
1333                 LIMIT ?3",
1334                )
1335                .map_err(|e| e.to_string())?;
1336            let mut rows = stmt
1337                .query(rusqlite::params![agent_id, fts_q, cap])
1338                .map_err(|e| e.to_string())?;
1339            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1340                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1341                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1342                    out.push(node);
1343                }
1344            }
1345        }
1346        Ok(out)
1347    }
1348}
1349
1350fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1351    let id_s: String = row.get(0)?;
1352    let episode_s: String = row.get(1)?;
1353    let graph_traj: Option<String> = row.get(2)?;
1354    let agent_id: String = row.get(3)?;
1355    let session_id: String = row.get(4)?;
1356    let project_id: Option<String> = row.get(5)?;
1357    let recorded_at: i64 = row.get(6)?;
1358    let outcome_json: String = row.get(7)?;
1359    let hash: Option<String> = row.get(8)?;
1360    let duration_ms: i64 = row.get(9)?;
1361    let steps_json: String = row.get(10)?;
1362    let frame_vars_json: Option<String> = row.get(11)?;
1363    let fitness_sql: Option<f64> = row.get(12)?;
1364    let id = Uuid::parse_str(&id_s).map_err(|_| {
1365        rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1366    })?;
1367    let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1368        rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1369    })?;
1370    let graph_trajectory_node_id = graph_traj
1371        .filter(|s| !s.is_empty())
1372        .map(|s| Uuid::parse_str(&s))
1373        .transpose()
1374        .map_err(|_| {
1375            rusqlite::Error::InvalidColumnType(2, "graph_trajectory_node_id".into(), rusqlite::types::Type::Text)
1376        })?;
1377    let outcome: TrajectoryOutcome =
1378        serde_json::from_str(&outcome_json).map_err(|_| {
1379            rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1380        })?;
1381    let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json)
1382        .map_err(|_| {
1383            rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1384        })?;
1385    let frame_vars = frame_vars_json
1386        .filter(|s| !s.trim().is_empty())
1387        .and_then(|s| serde_json::from_str(&s).ok());
1388    let fitness_delta = fitness_sql.map(|f| f as f32);
1389    Ok(TrajectoryDetailRecord {
1390        id,
1391        episode_id,
1392        graph_trajectory_node_id,
1393        agent_id,
1394        session_id,
1395        project_id,
1396        recorded_at,
1397        outcome,
1398        ainl_source_hash: hash,
1399        duration_ms: duration_ms.max(0) as u64,
1400        steps,
1401        frame_vars,
1402        fitness_delta,
1403    })
1404}
1405
1406impl GraphStore for SqliteGraphStore {
1407    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
1408    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
1409    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1410        persist_node(&self.conn, node)
1411    }
1412
1413    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1414        let payload: Option<String> = self
1415            .conn
1416            .query_row(
1417                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1418                [id.to_string()],
1419                |row| row.get::<_, String>(0),
1420            )
1421            .optional()
1422            .map_err(|e: rusqlite::Error| e.to_string())?;
1423
1424        match payload {
1425            Some(p) => {
1426                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1427                Ok(Some(node))
1428            }
1429            None => Ok(None),
1430        }
1431    }
1432
1433    fn query_episodes_since(
1434        &self,
1435        since_timestamp: i64,
1436        limit: usize,
1437    ) -> Result<Vec<AinlMemoryNode>, String> {
1438        let mut stmt = self
1439            .conn
1440            .prepare(
1441                "SELECT payload FROM ainl_graph_nodes
1442                 WHERE node_type = 'episode' AND timestamp >= ?1
1443                 ORDER BY timestamp DESC
1444                 LIMIT ?2",
1445            )
1446            .map_err(|e| e.to_string())?;
1447
1448        let rows = stmt
1449            .query_map([since_timestamp, limit as i64], |row| {
1450                let payload: String = row.get(0)?;
1451                Ok(payload)
1452            })
1453            .map_err(|e| e.to_string())?;
1454
1455        let mut nodes = Vec::new();
1456        for row in rows {
1457            let payload = row.map_err(|e| e.to_string())?;
1458            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1459            nodes.push(node);
1460        }
1461
1462        Ok(nodes)
1463    }
1464
1465    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1466        let mut stmt = self
1467            .conn
1468            .prepare(
1469                "SELECT payload FROM ainl_graph_nodes
1470                 WHERE node_type = ?1
1471                 ORDER BY timestamp DESC",
1472            )
1473            .map_err(|e| e.to_string())?;
1474
1475        let rows = stmt
1476            .query_map([type_name], |row| {
1477                let payload: String = row.get(0)?;
1478                Ok(payload)
1479            })
1480            .map_err(|e| e.to_string())?;
1481
1482        let mut nodes = Vec::new();
1483        for row in rows {
1484            let payload = row.map_err(|e| e.to_string())?;
1485            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1486            nodes.push(node);
1487        }
1488
1489        Ok(nodes)
1490    }
1491
1492    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1493        let mut stmt = self
1494            .conn
1495            .prepare(
1496                "SELECT to_id FROM ainl_graph_edges
1497                 WHERE from_id = ?1 AND label = ?2",
1498            )
1499            .map_err(|e| e.to_string())?;
1500
1501        let target_ids: Vec<String> = stmt
1502            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1503            .map_err(|e| e.to_string())?
1504            .collect::<Result<Vec<_>, _>>()
1505            .map_err(|e| e.to_string())?;
1506
1507        let mut nodes = Vec::new();
1508        for target_id in target_ids {
1509            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1510            if let Some(node) = self.read_node(id)? {
1511                nodes.push(node);
1512            }
1513        }
1514
1515        Ok(nodes)
1516    }
1517}