Skip to main content

ainl_memory/
store.rs

1//! Graph storage backends for AINL memory.
2//!
3//! Defines the [`GraphStore`] trait and the SQLite implementation.
4//!
5//! ## Referential integrity (SQLite)
6//!
7//! `ainl_graph_edges` uses real `FOREIGN KEY (from_id)` / `FOREIGN KEY (to_id)` references to
8//! `ainl_graph_nodes(id)` with `ON DELETE CASCADE`. [`SqliteGraphStore::open`] and
9//! [`SqliteGraphStore::from_connection`] run `PRAGMA foreign_keys = ON` on the handle.
10//!
11//! Databases created before these constraints used a plain edges table; [`SqliteGraphStore::ensure_schema`]
12//! runs a one-time `migrate_edges_add_foreign_keys` rebuild. Edge rows whose endpoints
13//! are missing from `ainl_graph_nodes` **cannot** be kept under FK rules and are **omitted** from
14//! the migrated copy.
15//!
16//! ## Above the database (still recommended)
17//!
18//! - **Eager checks**: [`SqliteGraphStore::write_node_with_edges`], [`SqliteGraphStore::insert_graph_edge_checked`]
19//!   give clear errors without relying on SQLite error text alone.
20//! - **Repair / forensic import**: [`SqliteGraphStore::import_graph`] with `allow_dangling_edges: true`
21//!   is the **supported** way to load snapshots that violate referential integrity: FK enforcement is
22//!   disabled only for the duration of that import, then turned back on. Follow with
23//!   [`SqliteGraphStore::validate_graph`] before resuming normal writes on the same connection.
24//! - **Semantic graph checks**: [`SqliteGraphStore::validate_graph`] (agent-scoped edges, dangling
25//!   diagnostics, cross-agent boundary counts, etc.) — orthogonal to FK row existence.
26//!
27//! SQLite tables integrate with existing armaraos-memory schema where applicable.
28
29use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31    AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32    SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
/// Errors that can be reported while importing a graph snapshot.
#[derive(Clone, Debug)]
pub enum SnapshotImportError {
    /// The snapshot carries schema version `got`, but only `expected` is accepted.
    UnsupportedSchemaVersion { got: String, expected: &'static str },
    /// A failure carried as a plain message string (named after SQLite).
    Sqlite(String),
}

impl std::fmt::Display for SnapshotImportError {
    /// Renders a human-readable message; the `Sqlite` message is emitted verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sqlite(message) => f.write_str(message),
            Self::UnsupportedSchemaVersion { got, expected } => {
                write!(
                    f,
                    "unsupported snapshot schema_version '{got}'; expected '{expected}'"
                )
            }
        }
    }
}
59
/// Errors that can be reported while validating a graph.
#[derive(Clone, Debug)]
pub enum GraphValidationError {
    /// A failure carried as a plain message string (named after SQLite).
    Sqlite(String),
}

impl std::fmt::Display for GraphValidationError {
    /// Emits the stored message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: destructure directly instead of matching.
        let Self::Sqlite(message) = self;
        f.write_str(message)
    }
}
73
74/// Graph memory storage trait - swappable backends
75pub trait GraphStore {
76    /// Write a node to storage
77    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79    /// Read a node by ID
80    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82    /// Query episodes since a given timestamp
83    fn query_episodes_since(
84        &self,
85        since_timestamp: i64,
86        limit: usize,
87    ) -> Result<Vec<AinlMemoryNode>, String>;
88
89    /// Find nodes by type
90    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92    /// Walk edges from a node with a given label
93    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94
95    /// Walk edges TO a node — reverse traversal by to_id
96    fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
97}
98
99/// SQLite implementation of GraphStore
100///
101/// Integrates with existing armaraos-memory schema by adding two tables:
102/// - `ainl_graph_nodes`: stores node payloads
103/// - `ainl_graph_edges`: stores graph edges
104pub struct SqliteGraphStore {
105    conn: rusqlite::Connection,
106}
107
108fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
109    conn.execute_batch("PRAGMA foreign_keys = ON;")
110}
111
112fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
113    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
114    let cols = stmt
115        .query_map([], |row| row.get::<_, String>(1))?
116        .collect::<Result<Vec<_>, _>>()?;
117    if !cols.iter().any(|c| c == "weight") {
118        conn.execute(
119            "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
120            [],
121        )?;
122    }
123    if !cols.iter().any(|c| c == "metadata") {
124        conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
125    }
126    Ok(())
127}
128
129/// True when `ainl_graph_edges` declares at least one foreign-key reference (new schema).
130fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
131    let exists: i64 = conn.query_row(
132        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
133        [],
134        |r| r.get(0),
135    )?;
136    if exists == 0 {
137        return Ok(false);
138    }
139    let n: i64 = conn.query_row(
140        "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
141        [],
142        |r| r.get(0),
143    )?;
144    Ok(n > 0)
145}
146
147/// Rebuild `ainl_graph_edges` with `FOREIGN KEY` constraints. Rows whose endpoints are missing
148/// from `ainl_graph_nodes` are **dropped** (they cannot be represented under FK rules).
149fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
150    if edges_table_has_foreign_keys(conn)? {
151        return Ok(());
152    }
153
154    let exists: i64 = conn.query_row(
155        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
156        [],
157        |r| r.get(0),
158    )?;
159    if exists == 0 {
160        return Ok(());
161    }
162
163    conn.execute("BEGIN IMMEDIATE", [])?;
164    let res: Result<(), rusqlite::Error> = (|| {
165        conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
166        conn.execute(
167            "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
168            [],
169        )?;
170        conn.execute(
171            r#"CREATE TABLE ainl_graph_edges (
172                from_id TEXT NOT NULL,
173                to_id TEXT NOT NULL,
174                label TEXT NOT NULL,
175                weight REAL NOT NULL DEFAULT 1.0,
176                metadata TEXT,
177                PRIMARY KEY (from_id, to_id, label),
178                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
179                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
180            )"#,
181            [],
182        )?;
183        conn.execute(
184            r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
185               SELECT o.from_id, o.to_id, o.label,
186                      COALESCE(o.weight, 1.0),
187                      o.metadata
188               FROM ainl_graph_edges__old o
189               WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
190                 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
191            [],
192        )?;
193        conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
194        conn.execute(
195            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
196            [],
197        )?;
198        Ok(())
199    })();
200
201    match res {
202        Ok(()) => {
203            conn.execute("COMMIT", [])?;
204        }
205        Err(e) => {
206            let _ = conn.execute("ROLLBACK", []);
207            return Err(e);
208        }
209    }
210    Ok(())
211}
212
213fn node_type_name(node: &AinlMemoryNode) -> &'static str {
214    match &node.node_type {
215        AinlNodeType::Episode { .. } => "episode",
216        AinlNodeType::Semantic { .. } => "semantic",
217        AinlNodeType::Procedural { .. } => "procedural",
218        AinlNodeType::Persona { .. } => "persona",
219        AinlNodeType::RuntimeState { .. } => "runtime_state",
220        AinlNodeType::Trajectory { .. } => "trajectory",
221        AinlNodeType::Failure { .. } => "failure",
222        AinlNodeType::Mission { .. } => "mission",
223        AinlNodeType::Feature { .. } => "feature",
224        AinlNodeType::Assertion { .. } => "assertion",
225        AinlNodeType::Handoff { .. } => "handoff",
226        AinlNodeType::Extension { .. } => "extension",
227    }
228}
229
230fn node_timestamp(node: &AinlMemoryNode) -> i64 {
231    match &node.node_type {
232        AinlNodeType::Episode { episodic } => episodic.timestamp,
233        AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
234        AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
235        AinlNodeType::Failure { failure } => failure.recorded_at,
236        AinlNodeType::Mission { mission } => mission.created_at.timestamp(),
237        _ => chrono::Utc::now().timestamp(),
238    }
239}
240
241fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
242    match &node.node_type {
243        AinlNodeType::Failure { failure } => Some(format!(
244            "{} {} {} {} {}",
245            failure.source,
246            failure.tool_name.as_deref().unwrap_or(""),
247            failure.source_namespace.as_deref().unwrap_or(""),
248            failure.source_tool.as_deref().unwrap_or(""),
249            failure.message
250        )),
251        _ => None,
252    }
253}
254
255/// Mission substrate nodes: index human-readable fields instead of full JSON blobs.
256fn mission_substrate_fts_body(node: &AinlMemoryNode) -> Option<String> {
257    match &node.node_type {
258        AinlNodeType::Mission { mission } => Some(format!(
259            "{} {} {:?} {:?}",
260            mission.mission_id.as_str(),
261            mission.objective_md,
262            mission.state,
263            mission.milestone_ids,
264        )),
265        AinlNodeType::Feature { feature } => Some(format!(
266            "{} {} {:?} {} {:?} {:?}",
267            feature.feature_id.as_str(),
268            feature.description,
269            feature.status,
270            feature.skill_name.as_deref().unwrap_or(""),
271            feature.touches_files,
272            feature.preconditions.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
273        )),
274        AinlNodeType::Handoff { handoff } => Some(format!(
275            "{} {} {} {} {} {:?}",
276            handoff.feature_id.as_str(),
277            handoff.agent_id,
278            handoff.salient_summary,
279            handoff.what_was_implemented_md,
280            handoff.what_was_left_undone_md,
281            handoff.discovered_issues
282                .iter()
283                .map(|i| i.description.as_str())
284                .collect::<Vec<_>>(),
285        )),
286        AinlNodeType::Assertion { assertion } => Some(format!(
287            "{} {} {:?} {:?}",
288            assertion.assertion_id.as_str(),
289            assertion.description,
290            assertion.state,
291            assertion.verification_steps,
292        )),
293        _ => None,
294    }
295}
296
/// Build an FTS5 `body MATCH` expression that requires every whitespace-separated token of
/// `raw` to occur as a token prefix (tokens are joined with `AND`).
///
/// Each token has `"` and control characters stripped, is wrapped in double quotes so FTS5
/// reads it as a string rather than query syntax, and is followed by `*`. The star has to sit
/// outside the closing quote: placed inside, it is passed to the tokenizer instead of being
/// recognised as the prefix operator, and the term stops being a prefix match.
///
/// Returns an empty string when no usable token remains; callers should skip the search then.
fn fts5_prefix_match_query(raw: &str) -> String {
    raw.split_whitespace()
        .filter_map(|token| {
            let cleaned: String = token
                .chars()
                .filter(|c| !c.is_control() && *c != '"')
                .collect();
            if cleaned.is_empty() {
                None
            } else {
                Some(format!("\"{cleaned}\"*"))
            }
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
311
312fn sync_failure_fts_insert(
313    conn: &rusqlite::Connection,
314    node_id: &str,
315    body: &str,
316) -> Result<(), String> {
317    conn.execute(
318        "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
319        [node_id],
320    )
321    .map_err(|e| e.to_string())?;
322    conn.execute(
323        "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
324        rusqlite::params![node_id, body],
325    )
326    .map_err(|e| e.to_string())?;
327    Ok(())
328}
329
/// JSON payload text (any node kind) for `ainl_nodes_fts` — generic graph search.
///
/// The payload is indexed as-is unless it is longer than 400,000 characters, in which case
/// only the first 400,000 characters are kept. The cut always falls on a character boundary.
fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
    /// Upper bound, in Unicode scalar values, on the text handed to the FTS index.
    const MAX_BODY_CHARS: usize = 400_000;

    // A string never has more chars than bytes, so short payloads need no scan at all.
    if payload.len() <= MAX_BODY_CHARS {
        return payload.to_string();
    }
    // Byte offset of the first character past the limit, if there is one.
    match payload.char_indices().nth(MAX_BODY_CHARS) {
        Some((cut, _)) => payload[..cut].to_string(),
        None => payload.to_string(),
    }
}
338
339fn sync_all_nodes_fts_insert(
340    conn: &rusqlite::Connection,
341    node_id: &str,
342    agent_id: &str,
343    project_id: Option<&str>,
344    body: &str,
345) -> Result<(), String> {
346    let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
347    conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
348        .map_err(|e| e.to_string())?;
349    conn.execute(
350        "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
351        rusqlite::params![node_id, agent_id, proj, body],
352    )
353    .map_err(|e| e.to_string())?;
354    Ok(())
355}
356
357fn persist_edge(
358    conn: &rusqlite::Connection,
359    from_id: Uuid,
360    to_id: Uuid,
361    label: &str,
362    weight: f32,
363    metadata: Option<&str>,
364) -> Result<(), String> {
365    conn.execute(
366        "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
367         VALUES (?1, ?2, ?3, ?4, ?5)",
368        rusqlite::params![
369            from_id.to_string(),
370            to_id.to_string(),
371            label,
372            weight,
373            metadata
374        ],
375    )
376    .map_err(|e| e.to_string())?;
377    Ok(())
378}
379
380/// All `ainl_graph_edges` rows whose endpoints are both present in `id_set`, as [`SnapshotEdge`] values.
381fn collect_snapshot_edges_for_id_set(
382    conn: &rusqlite::Connection,
383    id_set: &HashSet<String>,
384) -> Result<Vec<SnapshotEdge>, String> {
385    let mut edge_stmt = conn
386        .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
387        .map_err(|e| e.to_string())?;
388    let edge_rows = edge_stmt
389        .query_map([], |row| {
390            Ok((
391                row.get::<_, String>(0)?,
392                row.get::<_, String>(1)?,
393                row.get::<_, String>(2)?,
394                row.get::<_, f64>(3)?,
395                row.get::<_, Option<String>>(4)?,
396            ))
397        })
398        .map_err(|e| e.to_string())?
399        .collect::<Result<Vec<_>, _>>()
400        .map_err(|e| e.to_string())?;
401
402    let mut edges = Vec::new();
403    for (from_id, to_id, label, weight, meta) in edge_rows {
404        if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
405            continue;
406        }
407        let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
408        let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
409        let metadata = match meta {
410            Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
411            _ => None,
412        };
413        edges.push(SnapshotEdge {
414            source_id,
415            target_id,
416            edge_type: label,
417            weight: weight as f32,
418            metadata,
419        });
420    }
421    Ok(edges)
422}
423
424fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
425    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
426    let type_name = node_type_name(node);
427    let timestamp = node_timestamp(node);
428    let proj = node
429        .project_id
430        .as_deref()
431        .map(str::trim)
432        .filter(|s| !s.is_empty());
433    let mission_id = node.mission_id_key();
434    let feature_id = node.feature_id_key();
435
436    conn.execute(
437        "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id, mission_id, feature_id)
438         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
439        rusqlite::params![
440            node.id.to_string(),
441            type_name,
442            payload,
443            timestamp,
444            proj,
445            mission_id,
446            feature_id,
447        ],
448    )
449    .map_err(|e| e.to_string())?;
450
451    for edge in &node.edges {
452        persist_edge(
453            conn,
454            node.id,
455            edge.target_id,
456            &edge.label,
457            1.0,
458            None::<&str>,
459        )?;
460    }
461
462    let body_all = mission_substrate_fts_body(node)
463        .unwrap_or_else(|| graph_node_fts_body_from_payload_json(&payload));
464    if !node.agent_id.trim().is_empty() {
465        let _ = sync_all_nodes_fts_insert(
466            conn,
467            &node.id.to_string(),
468            node.agent_id.as_str(),
469            proj,
470            &body_all,
471        );
472    }
473
474    if let Some(body) = failure_fts_body(node) {
475        // Best-effort: graph row is authoritative; FTS is auxiliary for search.
476        let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
477    }
478
479    Ok(())
480}
481
482fn try_insert_node_ignore(
483    conn: &rusqlite::Connection,
484    node: &AinlMemoryNode,
485) -> Result<(), String> {
486    let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
487    let type_name = node_type_name(node);
488    let timestamp = node_timestamp(node);
489    let proj = node
490        .project_id
491        .as_deref()
492        .map(str::trim)
493        .filter(|s| !s.is_empty());
494    let mission_id = node.mission_id_key();
495    let feature_id = node.feature_id_key();
496    let n = conn
497        .execute(
498            "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id, mission_id, feature_id)
499         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
500            rusqlite::params![
501                node.id.to_string(),
502                type_name,
503                payload,
504                timestamp,
505                proj,
506                mission_id,
507                feature_id,
508            ],
509        )
510        .map_err(|e| e.to_string())?;
511    if n > 0 {
512        if !node.agent_id.trim().is_empty() {
513            let body_all = mission_substrate_fts_body(node)
514                .unwrap_or_else(|| graph_node_fts_body_from_payload_json(&payload));
515            let _ = sync_all_nodes_fts_insert(
516                conn,
517                &node.id.to_string(),
518                node.agent_id.as_str(),
519                proj,
520                &body_all,
521            );
522        }
523        if let Some(body) = failure_fts_body(node) {
524            let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
525        }
526    }
527    Ok(())
528}
529
530fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
531    let meta = match &edge.metadata {
532        Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
533        None => None,
534    };
535    conn.execute(
536        "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
537         VALUES (?1, ?2, ?3, ?4, ?5)",
538        rusqlite::params![
539            edge.source_id.to_string(),
540            edge.target_id.to_string(),
541            edge.edge_type,
542            edge.weight,
543            meta.as_deref(),
544        ],
545    )
546    .map_err(|e| e.to_string())?;
547    Ok(())
548}
549
550fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
551    conn.execute(
552        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
553            node_id UNINDEXED,
554            body,
555            tokenize = 'unicode61 remove_diacritics 1'
556        )",
557        [],
558    )?;
559    Ok(())
560}
561
562fn migrate_ainl_graph_nodes_add_mission_columns(
563    conn: &rusqlite::Connection,
564) -> Result<(), rusqlite::Error> {
565    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
566    let cols = stmt
567        .query_map([], |row| row.get::<_, String>(1))?
568        .collect::<Result<Vec<_>, _>>()?;
569    if !cols.iter().any(|c| c == "mission_id") {
570        conn.execute(
571            "ALTER TABLE ainl_graph_nodes ADD COLUMN mission_id TEXT",
572            [],
573        )?;
574    }
575    if !cols.iter().any(|c| c == "feature_id") {
576        conn.execute(
577            "ALTER TABLE ainl_graph_nodes ADD COLUMN feature_id TEXT",
578            [],
579        )?;
580    }
581    conn.execute(
582        "CREATE INDEX IF NOT EXISTS idx_nodes_mission_feature
583         ON ainl_graph_nodes(mission_id, feature_id, node_type, timestamp DESC)",
584        [],
585    )?;
586    Ok(())
587}
588
589fn migrate_ainl_graph_nodes_add_project_id(
590    conn: &rusqlite::Connection,
591) -> Result<(), rusqlite::Error> {
592    let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
593    let cols = stmt
594        .query_map([], |row| row.get::<_, String>(1))?
595        .collect::<Result<Vec<_>, _>>()?;
596    if !cols.iter().any(|c| c == "project_id") {
597        conn.execute(
598            "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
599            [],
600        )?;
601    }
602    Ok(())
603}
604
605fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
606    conn.execute(
607        "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
608            node_id UNINDEXED,
609            agent_id UNINDEXED,
610            project_id UNINDEXED,
611            body,
612            tokenize = 'unicode61 remove_diacritics 1'
613        )",
614        [],
615    )?;
616    Ok(())
617}
618
619/// Remove FTS shadow rows when the primary graph row is deleted (`DELETE` from `ainl_graph_nodes`).
620fn install_ainl_graph_node_delete_fts_triggers(
621    conn: &rusqlite::Connection,
622) -> Result<(), rusqlite::Error> {
623    conn.execute_batch(
624        "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
625         CREATE TRIGGER ainl_graph_nodes_after_delete_fts
626         AFTER DELETE ON ainl_graph_nodes
627         FOR EACH ROW
628         BEGIN
629           DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
630           DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
631         END;",
632    )?;
633    Ok(())
634}
635
636/// One-time: populate `ainl_nodes_fts` from existing `ainl_graph_nodes` (legacy DBs).
637fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
638    let fts_n: i64 = conn
639        .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
640        .unwrap_or(0);
641    if fts_n > 0 {
642        return Ok(());
643    }
644    let mut stmt = conn
645        .prepare(
646            "SELECT id, payload, project_id FROM ainl_graph_nodes
647             WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
648        )
649        .map_err(|e| e.to_string())?;
650    let rows = stmt
651        .query_map([], |row| {
652            Ok((
653                row.get::<_, String>(0)?,
654                row.get::<_, String>(1)?,
655                row.get::<_, Option<String>>(2)?,
656            ))
657        })
658        .map_err(|e| e.to_string())?
659        .collect::<Result<Vec<_>, _>>()
660        .map_err(|e| e.to_string())?;
661    for (id, payload, col_proj) in rows {
662        let v: serde_json::Value = match serde_json::from_str(&payload) {
663            Ok(x) => x,
664            Err(_) => continue,
665        };
666        let ag = v
667            .get("agent_id")
668            .and_then(|x| x.as_str())
669            .map(str::trim)
670            .unwrap_or("");
671        if ag.is_empty() {
672            continue;
673        }
674        let json_proj = v
675            .get("project_id")
676            .and_then(|x| x.as_str())
677            .map(str::trim)
678            .filter(|s| !s.is_empty());
679        let proj = col_proj
680            .as_deref()
681            .map(str::trim)
682            .filter(|s| !s.is_empty())
683            .or(json_proj);
684        let body = graph_node_fts_body_from_payload_json(&payload);
685        if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
686            // Best-effort backfill: continue with remaining rows.
687        }
688    }
689    Ok(())
690}
691
692fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
693    conn.execute(
694        "CREATE TABLE IF NOT EXISTS ainl_trajectories (
695            id TEXT PRIMARY KEY,
696            episode_id TEXT NOT NULL,
697            graph_trajectory_node_id TEXT,
698            agent_id TEXT NOT NULL,
699            session_id TEXT NOT NULL,
700            project_id TEXT,
701            recorded_at INTEGER NOT NULL,
702            outcome_json TEXT NOT NULL,
703            ainl_source_hash TEXT,
704            duration_ms INTEGER NOT NULL DEFAULT 0,
705            steps_json TEXT NOT NULL,
706            FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
707        )",
708        [],
709    )?;
710    conn.execute(
711        "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
712         ON ainl_trajectories(agent_id, recorded_at DESC)",
713        [],
714    )?;
715    Ok(())
716}
717
718fn migrate_ainl_trajectories_add_depth_v1(
719    conn: &rusqlite::Connection,
720) -> Result<(), rusqlite::Error> {
721    let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
722    let cols: Vec<String> = stmt
723        .query_map([], |row| row.get::<_, String>(1))?
724        .collect::<Result<Vec<_>, _>>()?;
725    if !cols.iter().any(|c| c == "frame_vars_json") {
726        conn.execute(
727            "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
728            [],
729        )?;
730    }
731    if !cols.iter().any(|c| c == "fitness_delta") {
732        conn.execute(
733            "ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL",
734            [],
735        )?;
736    }
737    Ok(())
738}
739
740impl SqliteGraphStore {
741    /// Ensure the AINL graph schema exists in the database
742    pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
743        conn.execute(
744            "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
745                id TEXT PRIMARY KEY,
746                node_type TEXT NOT NULL,
747                payload TEXT NOT NULL,
748                timestamp INTEGER NOT NULL
749            )",
750            [],
751        )?;
752
753        conn.execute(
754            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
755             ON ainl_graph_nodes(timestamp DESC)",
756            [],
757        )?;
758
759        conn.execute(
760            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
761             ON ainl_graph_nodes(node_type)",
762            [],
763        )?;
764
765        migrate_ainl_graph_nodes_add_project_id(conn)?;
766        migrate_ainl_graph_nodes_add_mission_columns(conn)?;
767        conn.execute(
768            "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
769             ON ainl_graph_nodes(project_id, node_type, timestamp)",
770            [],
771        )?;
772
773        conn.execute(
774            "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
775                from_id TEXT NOT NULL,
776                to_id TEXT NOT NULL,
777                label TEXT NOT NULL,
778                weight REAL NOT NULL DEFAULT 1.0,
779                metadata TEXT,
780                PRIMARY KEY (from_id, to_id, label),
781                FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
782                FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
783            )",
784            [],
785        )?;
786
787        conn.execute(
788            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
789             ON ainl_graph_edges(from_id, label)",
790            [],
791        )?;
792
793        conn.execute(
794            "CREATE INDEX IF NOT EXISTS idx_ainl_edges_to
795             ON ainl_graph_edges(to_id, label)",
796            [],
797        )?;
798
799        migrate_edge_columns(conn)?;
800        migrate_edges_add_foreign_keys(conn)?;
801        migrate_trajectories_v1(conn)?;
802        migrate_ainl_trajectories_add_depth_v1(conn)?;
803        migrate_failures_fts_v1(conn)?;
804        migrate_ainl_nodes_fts_v1(conn)?;
805        if backfill_ainl_nodes_fts_if_empty(conn).is_err() {
806            // Non-fatal: new DBs may have empty graph; legacy rows can be re-synced on next write.
807        }
808        let _ = install_ainl_graph_node_delete_fts_triggers(conn);
809        Ok(())
810    }
811
812    /// Open/create a graph store at the given path
813    pub fn open(path: &std::path::Path) -> Result<Self, String> {
814        let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
815        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
816        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
817        Ok(Self { conn })
818    }
819
820    /// Create from an existing connection (for integration with armaraos-memory pool)
821    pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
822        enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
823        Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
824        Ok(Self { conn })
825    }
826
827    /// Low-level access for query builders in this crate.
828    pub(crate) fn conn(&self) -> &rusqlite::Connection {
829        &self.conn
830    }
831
832    /// Insert a directed edge between two node IDs (separate from per-node edge payloads).
833    pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
834        persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
835    }
836
837    /// Like [`Self::insert_graph_edge`], but verifies both endpoints exist first (clear errors for strict runtime wiring).
838    pub fn insert_graph_edge_checked(
839        &self,
840        from_id: Uuid,
841        to_id: Uuid,
842        label: &str,
843    ) -> Result<(), String> {
844        if !self.node_row_exists(&from_id.to_string())? {
845            return Err(format!(
846                "insert_graph_edge_checked: missing source node row {}",
847                from_id
848            ));
849        }
850        if !self.node_row_exists(&to_id.to_string())? {
851            return Err(format!(
852                "insert_graph_edge_checked: missing target node row {}",
853                to_id
854            ));
855        }
856        self.insert_graph_edge(from_id, to_id, label)
857    }
858
859    /// Same as [`Self::insert_graph_edge`], with optional edge weight and JSON metadata.
860    pub fn insert_graph_edge_with_meta(
861        &self,
862        from_id: Uuid,
863        to_id: Uuid,
864        label: &str,
865        weight: f32,
866        metadata: Option<&serde_json::Value>,
867    ) -> Result<(), String> {
868        let meta = metadata
869            .map(serde_json::to_string)
870            .transpose()
871            .map_err(|e| e.to_string())?;
872        persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
873    }
874
875    /// Nodes of a given `node_type` with `timestamp >= since_timestamp`, most recent first.
876    pub fn query_nodes_by_type_since(
877        &self,
878        node_type: &str,
879        since_timestamp: i64,
880        limit: usize,
881    ) -> Result<Vec<AinlMemoryNode>, String> {
882        let mut stmt = self
883            .conn
884            .prepare(
885                "SELECT payload FROM ainl_graph_nodes
886                 WHERE node_type = ?1 AND timestamp >= ?2
887                 ORDER BY timestamp DESC
888                 LIMIT ?3",
889            )
890            .map_err(|e| e.to_string())?;
891
892        let rows = stmt
893            .query_map(
894                rusqlite::params![node_type, since_timestamp, limit as i64],
895                |row| {
896                    let payload: String = row.get(0)?;
897                    Ok(payload)
898                },
899            )
900            .map_err(|e| e.to_string())?;
901
902        let mut nodes = Vec::new();
903        for row in rows {
904            let payload = row.map_err(|e| e.to_string())?;
905            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
906            nodes.push(node);
907        }
908
909        Ok(nodes)
910    }
911
912    /// Read the most recent persisted [`RuntimeStateNode`] for `agent_id`, if any.
913    ///
914    /// Rows are stored with `node_type = 'runtime_state'` and JSON `$.node_type.runtime_state.agent_id` matching the agent.
915    pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
916        if agent_id.is_empty() {
917            return Ok(None);
918        }
919        let mut stmt = self
920            .conn
921            .prepare(
922                "SELECT payload FROM ainl_graph_nodes
923                 WHERE node_type = 'runtime_state'
924                   AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
925                 ORDER BY timestamp DESC
926                 LIMIT 1",
927            )
928            .map_err(|e| e.to_string())?;
929
930        let payload_opt: Option<String> = stmt
931            .query_row([agent_id], |row| row.get(0))
932            .optional()
933            .map_err(|e| e.to_string())?;
934
935        let Some(payload) = payload_opt else {
936            return Ok(None);
937        };
938
939        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
940        match node.node_type {
941            AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
942            _ => Err("runtime_state row had unexpected node_type payload".to_string()),
943        }
944    }
945
946    /// Upsert one [`RuntimeStateNode`] row per agent (stable id via [`Uuid::new_v5`]).
947    pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
948        let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
949        let node = AinlMemoryNode {
950            id,
951            memory_category: MemoryCategory::RuntimeState,
952            importance_score: 0.5,
953            agent_id: state.agent_id.clone(),
954            project_id: None,
955            node_type: AinlNodeType::RuntimeState {
956                runtime_state: state.clone(),
957            },
958            edges: Vec::new(),
959            plugin_data: None,
960        };
961        self.write_node(&node)
962    }
963
964    /// Write a node and its embedded edges in one transaction; fails if any edge target is missing.
965    pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
966        let tx = self.conn.transaction().map_err(|e| e.to_string())?;
967        for edge in &node.edges {
968            let exists: Option<i32> = tx
969                .query_row(
970                    "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
971                    [edge.target_id.to_string()],
972                    |_| Ok(1),
973                )
974                .optional()
975                .map_err(|e| e.to_string())?;
976            if exists.is_none() {
977                return Err(format!(
978                    "write_node_with_edges: missing target node {}",
979                    edge.target_id
980                ));
981            }
982        }
983        persist_node(&tx, node)?;
984        tx.commit().map_err(|e| e.to_string())?;
985        Ok(())
986    }
987
988    /// Validate structural integrity for one agent's induced subgraph.
989    pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
990        self.validate_graph_checked(agent_id)
991            .map_err(|e| e.to_string())
992    }
993
994    /// Typed validation variant for callers that need structured error handling.
995    pub fn validate_graph_checked(
996        &self,
997        agent_id: &str,
998    ) -> Result<GraphValidationReport, GraphValidationError> {
999        use std::collections::HashSet;
1000
1001        let agent_nodes = self
1002            .agent_node_ids(agent_id)
1003            .map_err(GraphValidationError::Sqlite)?;
1004        let node_count = agent_nodes.len();
1005
1006        let mut stmt = self
1007            .conn
1008            .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
1009            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
1010        let all_edges: Vec<(String, String, String)> = stmt
1011            .query_map([], |row| {
1012                Ok((
1013                    row.get::<_, String>(0)?,
1014                    row.get::<_, String>(1)?,
1015                    row.get::<_, String>(2)?,
1016                ))
1017            })
1018            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
1019            .collect::<Result<Vec<_>, _>>()
1020            .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
1021
1022        let mut edge_pairs = Vec::new();
1023        for (from_id, to_id, label) in all_edges {
1024            let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
1025            if touches_agent {
1026                edge_pairs.push((from_id, to_id, label));
1027            }
1028        }
1029
1030        let edge_count = edge_pairs.len();
1031        let mut dangling_edges = Vec::new();
1032        let mut dangling_edge_details = Vec::new();
1033        let mut cross_agent_boundary_edges = 0usize;
1034
1035        for (from_id, to_id, label) in &edge_pairs {
1036            let from_ok = self
1037                .node_row_exists(from_id)
1038                .map_err(GraphValidationError::Sqlite)?;
1039            let to_ok = self
1040                .node_row_exists(to_id)
1041                .map_err(GraphValidationError::Sqlite)?;
1042            if !from_ok || !to_ok {
1043                dangling_edges.push((from_id.clone(), to_id.clone()));
1044                dangling_edge_details.push(DanglingEdgeDetail {
1045                    source_id: from_id.clone(),
1046                    target_id: to_id.clone(),
1047                    edge_type: label.clone(),
1048                });
1049                continue;
1050            }
1051            let fa = agent_nodes.contains(from_id);
1052            let ta = agent_nodes.contains(to_id);
1053            if fa ^ ta {
1054                cross_agent_boundary_edges += 1;
1055            }
1056        }
1057
1058        let mut touched: HashSet<String> =
1059            HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
1060        for (a, b, _) in &edge_pairs {
1061            if agent_nodes.contains(a) {
1062                touched.insert(a.clone());
1063            }
1064            if agent_nodes.contains(b) {
1065                touched.insert(b.clone());
1066            }
1067        }
1068
1069        let mut orphan_nodes = Vec::new();
1070        for id in &agent_nodes {
1071            if !touched.contains(id) {
1072                orphan_nodes.push(id.clone());
1073            }
1074        }
1075
1076        let is_valid = dangling_edges.is_empty();
1077        Ok(GraphValidationReport {
1078            agent_id: agent_id.to_string(),
1079            node_count,
1080            edge_count,
1081            dangling_edges,
1082            dangling_edge_details,
1083            cross_agent_boundary_edges,
1084            orphan_nodes,
1085            is_valid,
1086        })
1087    }
1088
1089    fn node_row_exists(&self, id: &str) -> Result<bool, String> {
1090        let v: Option<i32> = self
1091            .conn
1092            .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
1093                Ok(1)
1094            })
1095            .optional()
1096            .map_err(|e| e.to_string())?;
1097        Ok(v.is_some())
1098    }
1099
1100    fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
1101        let mut stmt = self
1102            .conn
1103            .prepare(
1104                "SELECT id FROM ainl_graph_nodes
1105                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1106            )
1107            .map_err(|e| e.to_string())?;
1108        let ids = stmt
1109            .query_map([agent_id], |row| row.get::<_, String>(0))
1110            .map_err(|e| e.to_string())?
1111            .collect::<Result<HashSet<_>, _>>()
1112            .map_err(|e| e.to_string())?;
1113        Ok(ids)
1114    }
1115
1116    /// Directed edges where **both** endpoints are nodes owned by `agent_id` (aligned with [`Self::export_graph`] edge set).
1117    pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
1118        let id_set = self.agent_node_ids(agent_id)?;
1119        collect_snapshot_edges_for_id_set(&self.conn, &id_set)
1120    }
1121
1122    /// Export all nodes and interconnecting edges for `agent_id`.
1123    pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1124        let mut stmt = self
1125            .conn
1126            .prepare(
1127                "SELECT payload FROM ainl_graph_nodes
1128                 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1129            )
1130            .map_err(|e| e.to_string())?;
1131        let nodes: Vec<AinlMemoryNode> = stmt
1132            .query_map([agent_id], |row| {
1133                let payload: String = row.get(0)?;
1134                Ok(payload)
1135            })
1136            .map_err(|e| e.to_string())?
1137            .map(|r| {
1138                let payload = r.map_err(|e| e.to_string())?;
1139                serde_json::from_str(&payload).map_err(|e| e.to_string())
1140            })
1141            .collect::<Result<Vec<_>, _>>()?;
1142
1143        let id_set: std::collections::HashSet<String> =
1144            nodes.iter().map(|n| n.id.to_string()).collect();
1145
1146        let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1147
1148        Ok(AgentGraphSnapshot {
1149            agent_id: agent_id.to_string(),
1150            exported_at: Utc::now(),
1151            schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1152            nodes,
1153            edges,
1154        })
1155    }
1156
1157    /// Import a snapshot in one transaction (`INSERT OR IGNORE` per row).
1158    ///
1159    /// * `allow_dangling_edges == false` (**default / production**): `PRAGMA foreign_keys` stays
1160    ///   enabled; every edge must reference existing node rows after inserts (same invariants as
1161    ///   [`Self::write_node_with_edges`]).
1162    /// * `allow_dangling_edges == true` (**repair / forensic**): FK checks are disabled only for
1163    ///   this import so partially invalid snapshots can be loaded; run [`Self::validate_graph`]
1164    ///   afterward and repair before returning to normal writes.
1165    pub fn import_graph(
1166        &mut self,
1167        snapshot: &AgentGraphSnapshot,
1168        allow_dangling_edges: bool,
1169    ) -> Result<(), String> {
1170        self.import_graph_checked(snapshot, allow_dangling_edges)
1171            .map_err(|e| e.to_string())
1172    }
1173
1174    /// Typed import variant for callers that want structured error handling.
1175    pub fn import_graph_checked(
1176        &mut self,
1177        snapshot: &AgentGraphSnapshot,
1178        allow_dangling_edges: bool,
1179    ) -> Result<(), SnapshotImportError> {
1180        if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1181            return Err(SnapshotImportError::UnsupportedSchemaVersion {
1182                got: snapshot.schema_version.to_string(),
1183                expected: SNAPSHOT_SCHEMA_VERSION,
1184            });
1185        }
1186
1187        if allow_dangling_edges {
1188            self.conn
1189                .execute_batch("PRAGMA foreign_keys = OFF;")
1190                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1191        }
1192
1193        let result: Result<(), SnapshotImportError> = (|| {
1194            let tx = self
1195                .conn
1196                .transaction()
1197                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1198            for node in &snapshot.nodes {
1199                try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1200            }
1201            for edge in &snapshot.edges {
1202                try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1203            }
1204            tx.commit()
1205                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1206            Ok(())
1207        })();
1208
1209        if allow_dangling_edges {
1210            self.conn
1211                .execute_batch("PRAGMA foreign_keys = ON;")
1212                .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1213        }
1214
1215        result
1216    }
1217
1218    /// Large-step trajectory row (sibling table); episode row must exist first.
1219    pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1220        let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1221        let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1222        let frame_s = match &row.frame_vars {
1223            None => None,
1224            Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1225        };
1226        self.conn
1227            .execute(
1228                "INSERT OR REPLACE INTO ainl_trajectories (
1229                    id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1230                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1231                    frame_vars_json, fitness_delta
1232                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1233                rusqlite::params![
1234                    row.id.to_string(),
1235                    row.episode_id.to_string(),
1236                    row.graph_trajectory_node_id.map(|u| u.to_string()),
1237                    row.agent_id,
1238                    row.session_id,
1239                    row.project_id,
1240                    row.recorded_at,
1241                    outcome_json,
1242                    row.ainl_source_hash,
1243                    row.duration_ms as i64,
1244                    steps_json,
1245                    frame_s,
1246                    row.fitness_delta,
1247                ],
1248            )
1249            .map_err(|e| e.to_string())?;
1250        Ok(())
1251    }
1252
1253    /// Recent trajectory detail rows for an agent (newest first).
1254    pub fn list_trajectories_for_agent(
1255        &self,
1256        agent_id: &str,
1257        limit: usize,
1258        since_timestamp: Option<i64>,
1259    ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1260        let cap = limit.clamp(1, 500) as i64;
1261        let sql = if since_timestamp.is_some() {
1262            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1263                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1264                    frame_vars_json, fitness_delta
1265             FROM ainl_trajectories
1266             WHERE agent_id = ?1 AND recorded_at >= ?2
1267             ORDER BY recorded_at DESC
1268             LIMIT ?3"
1269        } else {
1270            "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1271                    recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1272                    frame_vars_json, fitness_delta
1273             FROM ainl_trajectories
1274             WHERE agent_id = ?1
1275             ORDER BY recorded_at DESC
1276             LIMIT ?2"
1277        };
1278
1279        let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1280        let rows = if let Some(since) = since_timestamp {
1281            stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1282        } else {
1283            stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1284        }
1285        .map_err(|e| e.to_string())?;
1286
1287        let mut out = Vec::new();
1288        for row in rows {
1289            out.push(row.map_err(|e| e.to_string())?);
1290        }
1291        Ok(out)
1292    }
1293
1294    /// Count `ainl_trajectories` rows for `agent_id` with `recorded_at` **strictly before** `before_unix` (seconds).
1295    pub fn count_trajectory_details_before(
1296        &self,
1297        agent_id: &str,
1298        before_unix: i64,
1299    ) -> Result<usize, String> {
1300        if agent_id.trim().is_empty() {
1301            return Err("agent_id is empty".into());
1302        }
1303        let n: i64 = self
1304            .conn
1305            .query_row(
1306                "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1307                rusqlite::params![agent_id, before_unix],
1308                |r| r.get(0),
1309            )
1310            .map_err(|e| e.to_string())?;
1311        Ok(n as usize)
1312    }
1313
1314    /// Delete `ainl_trajectories` detail rows for `agent_id` with `recorded_at` **strictly before**
1315    /// `before_unix` (seconds). Returns the number of rows removed.
1316    ///
1317    /// This does **not** delete `Trajectory` nodes from `ainl_graph_nodes` or any edges; use graph
1318    /// export / repair paths if you need a fully consistent graph after bulk pruning.
1319    pub fn delete_trajectory_details_before(
1320        &self,
1321        agent_id: &str,
1322        before_unix: i64,
1323    ) -> Result<usize, String> {
1324        if agent_id.trim().is_empty() {
1325            return Err("agent_id is empty".into());
1326        }
1327        let n = self
1328            .conn
1329            .execute(
1330                "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1331                rusqlite::params![agent_id, before_unix],
1332            )
1333            .map_err(|e| e.to_string())?;
1334        Ok(n)
1335    }
1336
1337    /// Full-text search over persisted failure nodes for one agent (`node_type = failure`).
1338    ///
1339    /// Returns matching [`AinlMemoryNode`] rows (newest first). Invalid FTS syntax yields an empty list.
1340    pub fn search_failures_fts_for_agent(
1341        &self,
1342        agent_id: &str,
1343        query: &str,
1344        limit: usize,
1345    ) -> Result<Vec<AinlMemoryNode>, String> {
1346        let fts_q = fts5_prefix_match_query(query);
1347        if fts_q.is_empty() || agent_id.trim().is_empty() {
1348            return Ok(Vec::new());
1349        }
1350        let cap = limit.clamp(1, 200) as i64;
1351        let mut stmt = self
1352            .conn
1353            .prepare(
1354                "SELECT n.payload
1355                 FROM ainl_failures_fts AS f
1356                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1357                 WHERE n.node_type = 'failure'
1358                   AND json_extract(n.payload, '$.agent_id') = ?1
1359                   AND f.body MATCH ?2
1360                 ORDER BY n.timestamp DESC
1361                 LIMIT ?3",
1362            )
1363            .map_err(|e| e.to_string())?;
1364
1365        let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1366            let payload: String = row.get(0)?;
1367            Ok(payload)
1368        });
1369
1370        let mut out = Vec::new();
1371        let rows = match rows {
1372            Ok(r) => r,
1373            Err(e) => {
1374                let msg = e.to_string();
1375                if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1376                    return Ok(Vec::new());
1377                }
1378                return Err(msg);
1379            }
1380        };
1381        for row in rows {
1382            match row {
1383                Ok(payload) => {
1384                    if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1385                        out.push(node);
1386                    }
1387                }
1388                Err(e) => {
1389                    let msg = e.to_string();
1390                    if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1391                        return Ok(Vec::new());
1392                    }
1393                    return Err(msg);
1394                }
1395            }
1396        }
1397        Ok(out)
1398    }
1399
1400    /// Full-text search over all graph node JSON payloads (see `ainl_nodes_fts`), scoped by agent.
1401    ///
1402    /// `project_id`: when `Some` (non-empty), return matches whose stored project is empty/NULL
1403    /// **or** equal to that id (so legacy unscoped rows remain visible in a project workspace).
1404    /// When `None`, do not filter by project.
1405    pub fn search_all_nodes_fts_for_agent(
1406        &self,
1407        agent_id: &str,
1408        query: &str,
1409        project_id: Option<&str>,
1410        limit: usize,
1411    ) -> Result<Vec<AinlMemoryNode>, String> {
1412        let fts_q = fts5_prefix_match_query(query);
1413        if fts_q.is_empty() || agent_id.trim().is_empty() {
1414            return Ok(Vec::new());
1415        }
1416        let cap = limit.clamp(1, 200) as i64;
1417        let project_filter = project_id.map(str::trim).filter(|s| !s.is_empty());
1418        let mut out = Vec::new();
1419        if let Some(p) = project_filter {
1420            let mut stmt = self
1421                .conn
1422                .prepare(
1423                    "SELECT n.payload
1424                 FROM ainl_nodes_fts AS f
1425                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1426                 WHERE f.agent_id = ?1
1427                   AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1428                   AND f.body MATCH ?2
1429                 ORDER BY n.timestamp DESC
1430                 LIMIT ?4",
1431                )
1432                .map_err(|e| e.to_string())?;
1433            let mut rows = stmt
1434                .query(rusqlite::params![agent_id, fts_q, p, cap])
1435                .map_err(|e| e.to_string())?;
1436            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1437                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1438                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1439                    out.push(node);
1440                }
1441            }
1442        } else {
1443            let mut stmt = self
1444                .conn
1445                .prepare(
1446                    "SELECT n.payload
1447                 FROM ainl_nodes_fts AS f
1448                 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1449                 WHERE f.agent_id = ?1
1450                   AND f.body MATCH ?2
1451                 ORDER BY n.timestamp DESC
1452                 LIMIT ?3",
1453                )
1454                .map_err(|e| e.to_string())?;
1455            let mut rows = stmt
1456                .query(rusqlite::params![agent_id, fts_q, cap])
1457                .map_err(|e| e.to_string())?;
1458            while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1459                let payload: String = row.get(0).map_err(|e| e.to_string())?;
1460                if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1461                    out.push(node);
1462                }
1463            }
1464        }
1465        Ok(out)
1466    }
1467}
1468
1469fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1470    let id_s: String = row.get(0)?;
1471    let episode_s: String = row.get(1)?;
1472    let graph_traj: Option<String> = row.get(2)?;
1473    let agent_id: String = row.get(3)?;
1474    let session_id: String = row.get(4)?;
1475    let project_id: Option<String> = row.get(5)?;
1476    let recorded_at: i64 = row.get(6)?;
1477    let outcome_json: String = row.get(7)?;
1478    let hash: Option<String> = row.get(8)?;
1479    let duration_ms: i64 = row.get(9)?;
1480    let steps_json: String = row.get(10)?;
1481    let frame_vars_json: Option<String> = row.get(11)?;
1482    let fitness_sql: Option<f64> = row.get(12)?;
1483    let id = Uuid::parse_str(&id_s).map_err(|_| {
1484        rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1485    })?;
1486    let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1487        rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1488    })?;
1489    let graph_trajectory_node_id = graph_traj
1490        .filter(|s| !s.is_empty())
1491        .map(|s| Uuid::parse_str(&s))
1492        .transpose()
1493        .map_err(|_| {
1494            rusqlite::Error::InvalidColumnType(
1495                2,
1496                "graph_trajectory_node_id".into(),
1497                rusqlite::types::Type::Text,
1498            )
1499        })?;
1500    let outcome: TrajectoryOutcome = serde_json::from_str(&outcome_json).map_err(|_| {
1501        rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1502    })?;
1503    let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json).map_err(|_| {
1504        rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1505    })?;
1506    let frame_vars = frame_vars_json
1507        .filter(|s| !s.trim().is_empty())
1508        .and_then(|s| serde_json::from_str(&s).ok());
1509    let fitness_delta = fitness_sql.map(|f| f as f32);
1510    Ok(TrajectoryDetailRecord {
1511        id,
1512        episode_id,
1513        graph_trajectory_node_id,
1514        agent_id,
1515        session_id,
1516        project_id,
1517        recorded_at,
1518        outcome,
1519        ainl_source_hash: hash,
1520        duration_ms: duration_ms.max(0) as u64,
1521        steps,
1522        frame_vars,
1523        fitness_delta,
1524    })
1525}
1526
1527impl GraphStore for SqliteGraphStore {
1528    /// Persists the full node JSON under `id` via `INSERT OR REPLACE` (upsert).
1529    /// Backfill pattern: `read_node` → patch fields (e.g. episodic signals) → `write_node`, preserving loaded `edges`.
1530    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1531        persist_node(&self.conn, node)
1532    }
1533
1534    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1535        let payload: Option<String> = self
1536            .conn
1537            .query_row(
1538                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1539                [id.to_string()],
1540                |row| row.get::<_, String>(0),
1541            )
1542            .optional()
1543            .map_err(|e: rusqlite::Error| e.to_string())?;
1544
1545        match payload {
1546            Some(p) => {
1547                let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1548                Ok(Some(node))
1549            }
1550            None => Ok(None),
1551        }
1552    }
1553
1554    fn query_episodes_since(
1555        &self,
1556        since_timestamp: i64,
1557        limit: usize,
1558    ) -> Result<Vec<AinlMemoryNode>, String> {
1559        let mut stmt = self
1560            .conn
1561            .prepare(
1562                "SELECT payload FROM ainl_graph_nodes
1563                 WHERE node_type = 'episode' AND timestamp >= ?1
1564                 ORDER BY timestamp DESC
1565                 LIMIT ?2",
1566            )
1567            .map_err(|e| e.to_string())?;
1568
1569        let rows = stmt
1570            .query_map([since_timestamp, limit as i64], |row| {
1571                let payload: String = row.get(0)?;
1572                Ok(payload)
1573            })
1574            .map_err(|e| e.to_string())?;
1575
1576        let mut nodes = Vec::new();
1577        for row in rows {
1578            let payload = row.map_err(|e| e.to_string())?;
1579            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1580            nodes.push(node);
1581        }
1582
1583        Ok(nodes)
1584    }
1585
1586    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1587        let mut stmt = self
1588            .conn
1589            .prepare(
1590                "SELECT payload FROM ainl_graph_nodes
1591                 WHERE node_type = ?1
1592                 ORDER BY timestamp DESC",
1593            )
1594            .map_err(|e| e.to_string())?;
1595
1596        let rows = stmt
1597            .query_map([type_name], |row| {
1598                let payload: String = row.get(0)?;
1599                Ok(payload)
1600            })
1601            .map_err(|e| e.to_string())?;
1602
1603        let mut nodes = Vec::new();
1604        for row in rows {
1605            let payload = row.map_err(|e| e.to_string())?;
1606            let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1607            nodes.push(node);
1608        }
1609
1610        Ok(nodes)
1611    }
1612
1613    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1614        let mut stmt = self
1615            .conn
1616            .prepare(
1617                "SELECT to_id FROM ainl_graph_edges
1618                 WHERE from_id = ?1 AND label = ?2",
1619            )
1620            .map_err(|e| e.to_string())?;
1621
1622        let target_ids: Vec<String> = stmt
1623            .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1624            .map_err(|e| e.to_string())?
1625            .collect::<Result<Vec<_>, _>>()
1626            .map_err(|e| e.to_string())?;
1627
1628        let mut nodes = Vec::new();
1629        for target_id in target_ids {
1630            let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1631            if let Some(node) = self.read_node(id)? {
1632                nodes.push(node);
1633            }
1634        }
1635
1636        Ok(nodes)
1637    }
1638
1639    fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1640        let mut stmt = self
1641            .conn
1642            .prepare(
1643                "SELECT from_id FROM ainl_graph_edges
1644                 WHERE to_id = ?1 AND label = ?2",
1645            )
1646            .map_err(|e| e.to_string())?;
1647
1648        let source_ids: Vec<String> = stmt
1649            .query_map([to_id.to_string(), label.to_string()], |row| row.get(0))
1650            .map_err(|e| e.to_string())?
1651            .collect::<Result<Vec<_>, _>>()
1652            .map_err(|e| e.to_string())?;
1653
1654        let mut nodes = Vec::new();
1655        for source_id in source_ids {
1656            let id = Uuid::parse_str(&source_id).map_err(|e| e.to_string())?;
1657            if let Some(node) = self.read_node(id)? {
1658                nodes.push(node);
1659            }
1660        }
1661
1662        Ok(nodes)
1663    }
1664}
1665
#[cfg(test)]
mod schema_tests {
    use super::SqliteGraphStore;

    /// Opening the same database file twice must succeed: the second open runs schema
    /// setup against a database that was already initialised by the first.
    #[test]
    fn ensure_schema_idempotent() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let db_file = scratch.path().join("ainl_schema_idempotent.db");
        for attempt in ["open 1", "open 2"] {
            let store = SqliteGraphStore::open(&db_file).expect(attempt);
            drop(store);
        }
    }
}