1use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31 AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32 SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
41#[derive(Debug, Clone)]
43pub enum SnapshotImportError {
44 UnsupportedSchemaVersion { got: String, expected: &'static str },
45 Sqlite(String),
46}
47
48impl std::fmt::Display for SnapshotImportError {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::UnsupportedSchemaVersion { got, expected } => write!(
52 f,
53 "unsupported snapshot schema_version '{got}'; expected '{expected}'"
54 ),
55 Self::Sqlite(e) => write!(f, "{e}"),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub enum GraphValidationError {
63 Sqlite(String),
64}
65
66impl std::fmt::Display for GraphValidationError {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::Sqlite(e) => write!(f, "{e}"),
70 }
71 }
72}
73
74pub trait GraphStore {
76 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82 fn query_episodes_since(
84 &self,
85 since_timestamp: i64,
86 limit: usize,
87 ) -> Result<Vec<AinlMemoryNode>, String>;
88
89 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94
95 fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
97}
98
99pub struct SqliteGraphStore {
105 conn: rusqlite::Connection,
106}
107
108fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
109 conn.execute_batch("PRAGMA foreign_keys = ON;")
110}
111
112fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
113 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
114 let cols = stmt
115 .query_map([], |row| row.get::<_, String>(1))?
116 .collect::<Result<Vec<_>, _>>()?;
117 if !cols.iter().any(|c| c == "weight") {
118 conn.execute(
119 "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
120 [],
121 )?;
122 }
123 if !cols.iter().any(|c| c == "metadata") {
124 conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
125 }
126 Ok(())
127}
128
129fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
131 let exists: i64 = conn.query_row(
132 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
133 [],
134 |r| r.get(0),
135 )?;
136 if exists == 0 {
137 return Ok(false);
138 }
139 let n: i64 = conn.query_row(
140 "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
141 [],
142 |r| r.get(0),
143 )?;
144 Ok(n > 0)
145}
146
147fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
150 if edges_table_has_foreign_keys(conn)? {
151 return Ok(());
152 }
153
154 let exists: i64 = conn.query_row(
155 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
156 [],
157 |r| r.get(0),
158 )?;
159 if exists == 0 {
160 return Ok(());
161 }
162
163 conn.execute("BEGIN IMMEDIATE", [])?;
164 let res: Result<(), rusqlite::Error> = (|| {
165 conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
166 conn.execute(
167 "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
168 [],
169 )?;
170 conn.execute(
171 r#"CREATE TABLE ainl_graph_edges (
172 from_id TEXT NOT NULL,
173 to_id TEXT NOT NULL,
174 label TEXT NOT NULL,
175 weight REAL NOT NULL DEFAULT 1.0,
176 metadata TEXT,
177 PRIMARY KEY (from_id, to_id, label),
178 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
179 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
180 )"#,
181 [],
182 )?;
183 conn.execute(
184 r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
185 SELECT o.from_id, o.to_id, o.label,
186 COALESCE(o.weight, 1.0),
187 o.metadata
188 FROM ainl_graph_edges__old o
189 WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
190 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
191 [],
192 )?;
193 conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
194 conn.execute(
195 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
196 [],
197 )?;
198 Ok(())
199 })();
200
201 match res {
202 Ok(()) => {
203 conn.execute("COMMIT", [])?;
204 }
205 Err(e) => {
206 let _ = conn.execute("ROLLBACK", []);
207 return Err(e);
208 }
209 }
210 Ok(())
211}
212
213fn node_type_name(node: &AinlMemoryNode) -> &'static str {
214 match &node.node_type {
215 AinlNodeType::Episode { .. } => "episode",
216 AinlNodeType::Semantic { .. } => "semantic",
217 AinlNodeType::Procedural { .. } => "procedural",
218 AinlNodeType::Persona { .. } => "persona",
219 AinlNodeType::RuntimeState { .. } => "runtime_state",
220 AinlNodeType::Trajectory { .. } => "trajectory",
221 AinlNodeType::Failure { .. } => "failure",
222 }
223}
224
225fn node_timestamp(node: &AinlMemoryNode) -> i64 {
226 match &node.node_type {
227 AinlNodeType::Episode { episodic } => episodic.timestamp,
228 AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
229 AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
230 AinlNodeType::Failure { failure } => failure.recorded_at,
231 _ => chrono::Utc::now().timestamp(),
232 }
233}
234
235fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
236 match &node.node_type {
237 AinlNodeType::Failure { failure } => Some(format!(
238 "{} {} {} {} {}",
239 failure.source,
240 failure.tool_name.as_deref().unwrap_or(""),
241 failure.source_namespace.as_deref().unwrap_or(""),
242 failure.source_tool.as_deref().unwrap_or(""),
243 failure.message
244 )),
245 _ => None,
246 }
247}
248
249fn fts5_prefix_match_query(raw: &str) -> String {
251 raw.split_whitespace()
252 .filter(|t| !t.is_empty())
253 .filter_map(|t| {
254 let esc: String = t.chars().filter(|c| !c.is_control() && *c != '"').collect();
255 if esc.is_empty() {
256 return None;
257 }
258 Some(format!("\"{esc}*\""))
259 })
260 .collect::<Vec<_>>()
261 .join(" AND ")
262}
263
264fn sync_failure_fts_insert(
265 conn: &rusqlite::Connection,
266 node_id: &str,
267 body: &str,
268) -> Result<(), String> {
269 conn.execute(
270 "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
271 [node_id],
272 )
273 .map_err(|e| e.to_string())?;
274 conn.execute(
275 "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
276 rusqlite::params![node_id, body],
277 )
278 .map_err(|e| e.to_string())?;
279 Ok(())
280}
281
282fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
284 if payload.chars().count() > 400_000 {
285 payload.chars().take(400_000).collect()
286 } else {
287 payload.to_string()
288 }
289}
290
291fn sync_all_nodes_fts_insert(
292 conn: &rusqlite::Connection,
293 node_id: &str,
294 agent_id: &str,
295 project_id: Option<&str>,
296 body: &str,
297) -> Result<(), String> {
298 let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
299 conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
300 .map_err(|e| e.to_string())?;
301 conn.execute(
302 "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
303 rusqlite::params![node_id, agent_id, proj, body],
304 )
305 .map_err(|e| e.to_string())?;
306 Ok(())
307}
308
309fn persist_edge(
310 conn: &rusqlite::Connection,
311 from_id: Uuid,
312 to_id: Uuid,
313 label: &str,
314 weight: f32,
315 metadata: Option<&str>,
316) -> Result<(), String> {
317 conn.execute(
318 "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
319 VALUES (?1, ?2, ?3, ?4, ?5)",
320 rusqlite::params![
321 from_id.to_string(),
322 to_id.to_string(),
323 label,
324 weight,
325 metadata
326 ],
327 )
328 .map_err(|e| e.to_string())?;
329 Ok(())
330}
331
332fn collect_snapshot_edges_for_id_set(
334 conn: &rusqlite::Connection,
335 id_set: &HashSet<String>,
336) -> Result<Vec<SnapshotEdge>, String> {
337 let mut edge_stmt = conn
338 .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
339 .map_err(|e| e.to_string())?;
340 let edge_rows = edge_stmt
341 .query_map([], |row| {
342 Ok((
343 row.get::<_, String>(0)?,
344 row.get::<_, String>(1)?,
345 row.get::<_, String>(2)?,
346 row.get::<_, f64>(3)?,
347 row.get::<_, Option<String>>(4)?,
348 ))
349 })
350 .map_err(|e| e.to_string())?
351 .collect::<Result<Vec<_>, _>>()
352 .map_err(|e| e.to_string())?;
353
354 let mut edges = Vec::new();
355 for (from_id, to_id, label, weight, meta) in edge_rows {
356 if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
357 continue;
358 }
359 let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
360 let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
361 let metadata = match meta {
362 Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
363 _ => None,
364 };
365 edges.push(SnapshotEdge {
366 source_id,
367 target_id,
368 edge_type: label,
369 weight: weight as f32,
370 metadata,
371 });
372 }
373 Ok(edges)
374}
375
376fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
377 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
378 let type_name = node_type_name(node);
379 let timestamp = node_timestamp(node);
380 let proj = node
381 .project_id
382 .as_deref()
383 .map(str::trim)
384 .filter(|s| !s.is_empty());
385
386 conn.execute(
387 "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
388 VALUES (?1, ?2, ?3, ?4, ?5)",
389 rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
390 )
391 .map_err(|e| e.to_string())?;
392
393 for edge in &node.edges {
394 persist_edge(
395 conn,
396 node.id,
397 edge.target_id,
398 &edge.label,
399 1.0,
400 None::<&str>,
401 )?;
402 }
403
404 let body_all = graph_node_fts_body_from_payload_json(&payload);
405 if !node.agent_id.trim().is_empty() {
406 let _ = sync_all_nodes_fts_insert(
407 conn,
408 &node.id.to_string(),
409 node.agent_id.as_str(),
410 proj,
411 &body_all,
412 );
413 }
414
415 if let Some(body) = failure_fts_body(node) {
416 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
418 }
419
420 Ok(())
421}
422
423fn try_insert_node_ignore(
424 conn: &rusqlite::Connection,
425 node: &AinlMemoryNode,
426) -> Result<(), String> {
427 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
428 let type_name = node_type_name(node);
429 let timestamp = node_timestamp(node);
430 let proj = node
431 .project_id
432 .as_deref()
433 .map(str::trim)
434 .filter(|s| !s.is_empty());
435 let n = conn
436 .execute(
437 "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
438 VALUES (?1, ?2, ?3, ?4, ?5)",
439 rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
440 )
441 .map_err(|e| e.to_string())?;
442 if n > 0 {
443 if !node.agent_id.trim().is_empty() {
444 let body_all = graph_node_fts_body_from_payload_json(&payload);
445 let _ = sync_all_nodes_fts_insert(
446 conn,
447 &node.id.to_string(),
448 node.agent_id.as_str(),
449 proj,
450 &body_all,
451 );
452 }
453 if let Some(body) = failure_fts_body(node) {
454 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
455 }
456 }
457 Ok(())
458}
459
460fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
461 let meta = match &edge.metadata {
462 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
463 None => None,
464 };
465 conn.execute(
466 "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
467 VALUES (?1, ?2, ?3, ?4, ?5)",
468 rusqlite::params![
469 edge.source_id.to_string(),
470 edge.target_id.to_string(),
471 edge.edge_type,
472 edge.weight,
473 meta.as_deref(),
474 ],
475 )
476 .map_err(|e| e.to_string())?;
477 Ok(())
478}
479
480fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
481 conn.execute(
482 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
483 node_id UNINDEXED,
484 body,
485 tokenize = 'unicode61 remove_diacritics 1'
486 )",
487 [],
488 )?;
489 Ok(())
490}
491
492fn migrate_ainl_graph_nodes_add_project_id(
493 conn: &rusqlite::Connection,
494) -> Result<(), rusqlite::Error> {
495 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
496 let cols = stmt
497 .query_map([], |row| row.get::<_, String>(1))?
498 .collect::<Result<Vec<_>, _>>()?;
499 if !cols.iter().any(|c| c == "project_id") {
500 conn.execute(
501 "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
502 [],
503 )?;
504 }
505 Ok(())
506}
507
508fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
509 conn.execute(
510 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
511 node_id UNINDEXED,
512 agent_id UNINDEXED,
513 project_id UNINDEXED,
514 body,
515 tokenize = 'unicode61 remove_diacritics 1'
516 )",
517 [],
518 )?;
519 Ok(())
520}
521
522fn install_ainl_graph_node_delete_fts_triggers(
524 conn: &rusqlite::Connection,
525) -> Result<(), rusqlite::Error> {
526 conn.execute_batch(
527 "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
528 CREATE TRIGGER ainl_graph_nodes_after_delete_fts
529 AFTER DELETE ON ainl_graph_nodes
530 FOR EACH ROW
531 BEGIN
532 DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
533 DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
534 END;",
535 )?;
536 Ok(())
537}
538
539fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
541 let fts_n: i64 = conn
542 .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
543 .unwrap_or(0);
544 if fts_n > 0 {
545 return Ok(());
546 }
547 let mut stmt = conn
548 .prepare(
549 "SELECT id, payload, project_id FROM ainl_graph_nodes
550 WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
551 )
552 .map_err(|e| e.to_string())?;
553 let rows = stmt
554 .query_map([], |row| {
555 Ok((
556 row.get::<_, String>(0)?,
557 row.get::<_, String>(1)?,
558 row.get::<_, Option<String>>(2)?,
559 ))
560 })
561 .map_err(|e| e.to_string())?
562 .collect::<Result<Vec<_>, _>>()
563 .map_err(|e| e.to_string())?;
564 for (id, payload, col_proj) in rows {
565 let v: serde_json::Value = match serde_json::from_str(&payload) {
566 Ok(x) => x,
567 Err(_) => continue,
568 };
569 let ag = v
570 .get("agent_id")
571 .and_then(|x| x.as_str())
572 .map(str::trim)
573 .unwrap_or("");
574 if ag.is_empty() {
575 continue;
576 }
577 let json_proj = v
578 .get("project_id")
579 .and_then(|x| x.as_str())
580 .map(str::trim)
581 .filter(|s| !s.is_empty());
582 let proj = col_proj
583 .as_deref()
584 .map(str::trim)
585 .filter(|s| !s.is_empty())
586 .or(json_proj);
587 let body = graph_node_fts_body_from_payload_json(&payload);
588 if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
589 }
591 }
592 Ok(())
593}
594
595fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
596 conn.execute(
597 "CREATE TABLE IF NOT EXISTS ainl_trajectories (
598 id TEXT PRIMARY KEY,
599 episode_id TEXT NOT NULL,
600 graph_trajectory_node_id TEXT,
601 agent_id TEXT NOT NULL,
602 session_id TEXT NOT NULL,
603 project_id TEXT,
604 recorded_at INTEGER NOT NULL,
605 outcome_json TEXT NOT NULL,
606 ainl_source_hash TEXT,
607 duration_ms INTEGER NOT NULL DEFAULT 0,
608 steps_json TEXT NOT NULL,
609 FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
610 )",
611 [],
612 )?;
613 conn.execute(
614 "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
615 ON ainl_trajectories(agent_id, recorded_at DESC)",
616 [],
617 )?;
618 Ok(())
619}
620
621fn migrate_ainl_trajectories_add_depth_v1(
622 conn: &rusqlite::Connection,
623) -> Result<(), rusqlite::Error> {
624 let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
625 let cols: Vec<String> = stmt
626 .query_map([], |row| row.get::<_, String>(1))?
627 .collect::<Result<Vec<_>, _>>()?;
628 if !cols.iter().any(|c| c == "frame_vars_json") {
629 conn.execute(
630 "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
631 [],
632 )?;
633 }
634 if !cols.iter().any(|c| c == "fitness_delta") {
635 conn.execute(
636 "ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL",
637 [],
638 )?;
639 }
640 Ok(())
641}
642
643impl SqliteGraphStore {
644 pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
646 conn.execute(
647 "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
648 id TEXT PRIMARY KEY,
649 node_type TEXT NOT NULL,
650 payload TEXT NOT NULL,
651 timestamp INTEGER NOT NULL
652 )",
653 [],
654 )?;
655
656 conn.execute(
657 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
658 ON ainl_graph_nodes(timestamp DESC)",
659 [],
660 )?;
661
662 conn.execute(
663 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
664 ON ainl_graph_nodes(node_type)",
665 [],
666 )?;
667
668 migrate_ainl_graph_nodes_add_project_id(conn)?;
669 conn.execute(
670 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
671 ON ainl_graph_nodes(project_id, node_type, timestamp)",
672 [],
673 )?;
674
675 conn.execute(
676 "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
677 from_id TEXT NOT NULL,
678 to_id TEXT NOT NULL,
679 label TEXT NOT NULL,
680 weight REAL NOT NULL DEFAULT 1.0,
681 metadata TEXT,
682 PRIMARY KEY (from_id, to_id, label),
683 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
684 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
685 )",
686 [],
687 )?;
688
689 conn.execute(
690 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
691 ON ainl_graph_edges(from_id, label)",
692 [],
693 )?;
694
695 conn.execute(
696 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_to
697 ON ainl_graph_edges(to_id, label)",
698 [],
699 )?;
700
701 migrate_edge_columns(conn)?;
702 migrate_edges_add_foreign_keys(conn)?;
703 migrate_trajectories_v1(conn)?;
704 migrate_ainl_trajectories_add_depth_v1(conn)?;
705 migrate_failures_fts_v1(conn)?;
706 migrate_ainl_nodes_fts_v1(conn)?;
707 if backfill_ainl_nodes_fts_if_empty(conn).is_err() {
708 }
710 let _ = install_ainl_graph_node_delete_fts_triggers(conn);
711 Ok(())
712 }
713
714 pub fn open(path: &std::path::Path) -> Result<Self, String> {
716 let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
717 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
718 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
719 Ok(Self { conn })
720 }
721
722 pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
724 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
725 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
726 Ok(Self { conn })
727 }
728
729 pub(crate) fn conn(&self) -> &rusqlite::Connection {
731 &self.conn
732 }
733
734 pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
736 persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
737 }
738
739 pub fn insert_graph_edge_checked(
741 &self,
742 from_id: Uuid,
743 to_id: Uuid,
744 label: &str,
745 ) -> Result<(), String> {
746 if !self.node_row_exists(&from_id.to_string())? {
747 return Err(format!(
748 "insert_graph_edge_checked: missing source node row {}",
749 from_id
750 ));
751 }
752 if !self.node_row_exists(&to_id.to_string())? {
753 return Err(format!(
754 "insert_graph_edge_checked: missing target node row {}",
755 to_id
756 ));
757 }
758 self.insert_graph_edge(from_id, to_id, label)
759 }
760
761 pub fn insert_graph_edge_with_meta(
763 &self,
764 from_id: Uuid,
765 to_id: Uuid,
766 label: &str,
767 weight: f32,
768 metadata: Option<&serde_json::Value>,
769 ) -> Result<(), String> {
770 let meta = metadata
771 .map(serde_json::to_string)
772 .transpose()
773 .map_err(|e| e.to_string())?;
774 persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
775 }
776
777 pub fn query_nodes_by_type_since(
779 &self,
780 node_type: &str,
781 since_timestamp: i64,
782 limit: usize,
783 ) -> Result<Vec<AinlMemoryNode>, String> {
784 let mut stmt = self
785 .conn
786 .prepare(
787 "SELECT payload FROM ainl_graph_nodes
788 WHERE node_type = ?1 AND timestamp >= ?2
789 ORDER BY timestamp DESC
790 LIMIT ?3",
791 )
792 .map_err(|e| e.to_string())?;
793
794 let rows = stmt
795 .query_map(
796 rusqlite::params![node_type, since_timestamp, limit as i64],
797 |row| {
798 let payload: String = row.get(0)?;
799 Ok(payload)
800 },
801 )
802 .map_err(|e| e.to_string())?;
803
804 let mut nodes = Vec::new();
805 for row in rows {
806 let payload = row.map_err(|e| e.to_string())?;
807 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
808 nodes.push(node);
809 }
810
811 Ok(nodes)
812 }
813
814 pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
818 if agent_id.is_empty() {
819 return Ok(None);
820 }
821 let mut stmt = self
822 .conn
823 .prepare(
824 "SELECT payload FROM ainl_graph_nodes
825 WHERE node_type = 'runtime_state'
826 AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
827 ORDER BY timestamp DESC
828 LIMIT 1",
829 )
830 .map_err(|e| e.to_string())?;
831
832 let payload_opt: Option<String> = stmt
833 .query_row([agent_id], |row| row.get(0))
834 .optional()
835 .map_err(|e| e.to_string())?;
836
837 let Some(payload) = payload_opt else {
838 return Ok(None);
839 };
840
841 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
842 match node.node_type {
843 AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
844 _ => Err("runtime_state row had unexpected node_type payload".to_string()),
845 }
846 }
847
848 pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
850 let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
851 let node = AinlMemoryNode {
852 id,
853 memory_category: MemoryCategory::RuntimeState,
854 importance_score: 0.5,
855 agent_id: state.agent_id.clone(),
856 project_id: None,
857 node_type: AinlNodeType::RuntimeState {
858 runtime_state: state.clone(),
859 },
860 edges: Vec::new(),
861 plugin_data: None,
862 };
863 self.write_node(&node)
864 }
865
866 pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
868 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
869 for edge in &node.edges {
870 let exists: Option<i32> = tx
871 .query_row(
872 "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
873 [edge.target_id.to_string()],
874 |_| Ok(1),
875 )
876 .optional()
877 .map_err(|e| e.to_string())?;
878 if exists.is_none() {
879 return Err(format!(
880 "write_node_with_edges: missing target node {}",
881 edge.target_id
882 ));
883 }
884 }
885 persist_node(&tx, node)?;
886 tx.commit().map_err(|e| e.to_string())?;
887 Ok(())
888 }
889
890 pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
892 self.validate_graph_checked(agent_id)
893 .map_err(|e| e.to_string())
894 }
895
896 pub fn validate_graph_checked(
898 &self,
899 agent_id: &str,
900 ) -> Result<GraphValidationReport, GraphValidationError> {
901 use std::collections::HashSet;
902
903 let agent_nodes = self
904 .agent_node_ids(agent_id)
905 .map_err(GraphValidationError::Sqlite)?;
906 let node_count = agent_nodes.len();
907
908 let mut stmt = self
909 .conn
910 .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
911 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
912 let all_edges: Vec<(String, String, String)> = stmt
913 .query_map([], |row| {
914 Ok((
915 row.get::<_, String>(0)?,
916 row.get::<_, String>(1)?,
917 row.get::<_, String>(2)?,
918 ))
919 })
920 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
921 .collect::<Result<Vec<_>, _>>()
922 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
923
924 let mut edge_pairs = Vec::new();
925 for (from_id, to_id, label) in all_edges {
926 let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
927 if touches_agent {
928 edge_pairs.push((from_id, to_id, label));
929 }
930 }
931
932 let edge_count = edge_pairs.len();
933 let mut dangling_edges = Vec::new();
934 let mut dangling_edge_details = Vec::new();
935 let mut cross_agent_boundary_edges = 0usize;
936
937 for (from_id, to_id, label) in &edge_pairs {
938 let from_ok = self
939 .node_row_exists(from_id)
940 .map_err(GraphValidationError::Sqlite)?;
941 let to_ok = self
942 .node_row_exists(to_id)
943 .map_err(GraphValidationError::Sqlite)?;
944 if !from_ok || !to_ok {
945 dangling_edges.push((from_id.clone(), to_id.clone()));
946 dangling_edge_details.push(DanglingEdgeDetail {
947 source_id: from_id.clone(),
948 target_id: to_id.clone(),
949 edge_type: label.clone(),
950 });
951 continue;
952 }
953 let fa = agent_nodes.contains(from_id);
954 let ta = agent_nodes.contains(to_id);
955 if fa ^ ta {
956 cross_agent_boundary_edges += 1;
957 }
958 }
959
960 let mut touched: HashSet<String> =
961 HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
962 for (a, b, _) in &edge_pairs {
963 if agent_nodes.contains(a) {
964 touched.insert(a.clone());
965 }
966 if agent_nodes.contains(b) {
967 touched.insert(b.clone());
968 }
969 }
970
971 let mut orphan_nodes = Vec::new();
972 for id in &agent_nodes {
973 if !touched.contains(id) {
974 orphan_nodes.push(id.clone());
975 }
976 }
977
978 let is_valid = dangling_edges.is_empty();
979 Ok(GraphValidationReport {
980 agent_id: agent_id.to_string(),
981 node_count,
982 edge_count,
983 dangling_edges,
984 dangling_edge_details,
985 cross_agent_boundary_edges,
986 orphan_nodes,
987 is_valid,
988 })
989 }
990
991 fn node_row_exists(&self, id: &str) -> Result<bool, String> {
992 let v: Option<i32> = self
993 .conn
994 .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
995 Ok(1)
996 })
997 .optional()
998 .map_err(|e| e.to_string())?;
999 Ok(v.is_some())
1000 }
1001
1002 fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
1003 let mut stmt = self
1004 .conn
1005 .prepare(
1006 "SELECT id FROM ainl_graph_nodes
1007 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1008 )
1009 .map_err(|e| e.to_string())?;
1010 let ids = stmt
1011 .query_map([agent_id], |row| row.get::<_, String>(0))
1012 .map_err(|e| e.to_string())?
1013 .collect::<Result<HashSet<_>, _>>()
1014 .map_err(|e| e.to_string())?;
1015 Ok(ids)
1016 }
1017
1018 pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
1020 let id_set = self.agent_node_ids(agent_id)?;
1021 collect_snapshot_edges_for_id_set(&self.conn, &id_set)
1022 }
1023
1024 pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1026 let mut stmt = self
1027 .conn
1028 .prepare(
1029 "SELECT payload FROM ainl_graph_nodes
1030 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1031 )
1032 .map_err(|e| e.to_string())?;
1033 let nodes: Vec<AinlMemoryNode> = stmt
1034 .query_map([agent_id], |row| {
1035 let payload: String = row.get(0)?;
1036 Ok(payload)
1037 })
1038 .map_err(|e| e.to_string())?
1039 .map(|r| {
1040 let payload = r.map_err(|e| e.to_string())?;
1041 serde_json::from_str(&payload).map_err(|e| e.to_string())
1042 })
1043 .collect::<Result<Vec<_>, _>>()?;
1044
1045 let id_set: std::collections::HashSet<String> =
1046 nodes.iter().map(|n| n.id.to_string()).collect();
1047
1048 let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1049
1050 Ok(AgentGraphSnapshot {
1051 agent_id: agent_id.to_string(),
1052 exported_at: Utc::now(),
1053 schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1054 nodes,
1055 edges,
1056 })
1057 }
1058
1059 pub fn import_graph(
1068 &mut self,
1069 snapshot: &AgentGraphSnapshot,
1070 allow_dangling_edges: bool,
1071 ) -> Result<(), String> {
1072 self.import_graph_checked(snapshot, allow_dangling_edges)
1073 .map_err(|e| e.to_string())
1074 }
1075
1076 pub fn import_graph_checked(
1078 &mut self,
1079 snapshot: &AgentGraphSnapshot,
1080 allow_dangling_edges: bool,
1081 ) -> Result<(), SnapshotImportError> {
1082 if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1083 return Err(SnapshotImportError::UnsupportedSchemaVersion {
1084 got: snapshot.schema_version.to_string(),
1085 expected: SNAPSHOT_SCHEMA_VERSION,
1086 });
1087 }
1088
1089 if allow_dangling_edges {
1090 self.conn
1091 .execute_batch("PRAGMA foreign_keys = OFF;")
1092 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1093 }
1094
1095 let result: Result<(), SnapshotImportError> = (|| {
1096 let tx = self
1097 .conn
1098 .transaction()
1099 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1100 for node in &snapshot.nodes {
1101 try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1102 }
1103 for edge in &snapshot.edges {
1104 try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1105 }
1106 tx.commit()
1107 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1108 Ok(())
1109 })();
1110
1111 if allow_dangling_edges {
1112 self.conn
1113 .execute_batch("PRAGMA foreign_keys = ON;")
1114 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1115 }
1116
1117 result
1118 }
1119
1120 pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1122 let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1123 let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1124 let frame_s = match &row.frame_vars {
1125 None => None,
1126 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1127 };
1128 self.conn
1129 .execute(
1130 "INSERT OR REPLACE INTO ainl_trajectories (
1131 id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1132 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1133 frame_vars_json, fitness_delta
1134 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1135 rusqlite::params![
1136 row.id.to_string(),
1137 row.episode_id.to_string(),
1138 row.graph_trajectory_node_id.map(|u| u.to_string()),
1139 row.agent_id,
1140 row.session_id,
1141 row.project_id,
1142 row.recorded_at,
1143 outcome_json,
1144 row.ainl_source_hash,
1145 row.duration_ms as i64,
1146 steps_json,
1147 frame_s,
1148 row.fitness_delta,
1149 ],
1150 )
1151 .map_err(|e| e.to_string())?;
1152 Ok(())
1153 }
1154
1155 pub fn list_trajectories_for_agent(
1157 &self,
1158 agent_id: &str,
1159 limit: usize,
1160 since_timestamp: Option<i64>,
1161 ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1162 let cap = limit.clamp(1, 500) as i64;
1163 let sql = if since_timestamp.is_some() {
1164 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1165 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1166 frame_vars_json, fitness_delta
1167 FROM ainl_trajectories
1168 WHERE agent_id = ?1 AND recorded_at >= ?2
1169 ORDER BY recorded_at DESC
1170 LIMIT ?3"
1171 } else {
1172 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1173 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1174 frame_vars_json, fitness_delta
1175 FROM ainl_trajectories
1176 WHERE agent_id = ?1
1177 ORDER BY recorded_at DESC
1178 LIMIT ?2"
1179 };
1180
1181 let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1182 let rows = if let Some(since) = since_timestamp {
1183 stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1184 } else {
1185 stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1186 }
1187 .map_err(|e| e.to_string())?;
1188
1189 let mut out = Vec::new();
1190 for row in rows {
1191 out.push(row.map_err(|e| e.to_string())?);
1192 }
1193 Ok(out)
1194 }
1195
1196 pub fn count_trajectory_details_before(
1198 &self,
1199 agent_id: &str,
1200 before_unix: i64,
1201 ) -> Result<usize, String> {
1202 if agent_id.trim().is_empty() {
1203 return Err("agent_id is empty".into());
1204 }
1205 let n: i64 = self
1206 .conn
1207 .query_row(
1208 "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1209 rusqlite::params![agent_id, before_unix],
1210 |r| r.get(0),
1211 )
1212 .map_err(|e| e.to_string())?;
1213 Ok(n as usize)
1214 }
1215
1216 pub fn delete_trajectory_details_before(
1222 &self,
1223 agent_id: &str,
1224 before_unix: i64,
1225 ) -> Result<usize, String> {
1226 if agent_id.trim().is_empty() {
1227 return Err("agent_id is empty".into());
1228 }
1229 let n = self
1230 .conn
1231 .execute(
1232 "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1233 rusqlite::params![agent_id, before_unix],
1234 )
1235 .map_err(|e| e.to_string())?;
1236 Ok(n)
1237 }
1238
1239 pub fn search_failures_fts_for_agent(
1243 &self,
1244 agent_id: &str,
1245 query: &str,
1246 limit: usize,
1247 ) -> Result<Vec<AinlMemoryNode>, String> {
1248 let fts_q = fts5_prefix_match_query(query);
1249 if fts_q.is_empty() || agent_id.trim().is_empty() {
1250 return Ok(Vec::new());
1251 }
1252 let cap = limit.clamp(1, 200) as i64;
1253 let mut stmt = self
1254 .conn
1255 .prepare(
1256 "SELECT n.payload
1257 FROM ainl_failures_fts AS f
1258 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1259 WHERE n.node_type = 'failure'
1260 AND json_extract(n.payload, '$.agent_id') = ?1
1261 AND f.body MATCH ?2
1262 ORDER BY n.timestamp DESC
1263 LIMIT ?3",
1264 )
1265 .map_err(|e| e.to_string())?;
1266
1267 let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1268 let payload: String = row.get(0)?;
1269 Ok(payload)
1270 });
1271
1272 let mut out = Vec::new();
1273 let rows = match rows {
1274 Ok(r) => r,
1275 Err(e) => {
1276 let msg = e.to_string();
1277 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1278 return Ok(Vec::new());
1279 }
1280 return Err(msg);
1281 }
1282 };
1283 for row in rows {
1284 match row {
1285 Ok(payload) => {
1286 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1287 out.push(node);
1288 }
1289 }
1290 Err(e) => {
1291 let msg = e.to_string();
1292 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1293 return Ok(Vec::new());
1294 }
1295 return Err(msg);
1296 }
1297 }
1298 }
1299 Ok(out)
1300 }
1301
1302 pub fn search_all_nodes_fts_for_agent(
1308 &self,
1309 agent_id: &str,
1310 query: &str,
1311 project_id: Option<&str>,
1312 limit: usize,
1313 ) -> Result<Vec<AinlMemoryNode>, String> {
1314 let fts_q = fts5_prefix_match_query(query);
1315 if fts_q.is_empty() || agent_id.trim().is_empty() {
1316 return Ok(Vec::new());
1317 }
1318 let cap = limit.clamp(1, 200) as i64;
1319 let project_filter = project_id.map(str::trim).filter(|s| !s.is_empty());
1320 let mut out = Vec::new();
1321 if let Some(p) = project_filter {
1322 let mut stmt = self
1323 .conn
1324 .prepare(
1325 "SELECT n.payload
1326 FROM ainl_nodes_fts AS f
1327 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1328 WHERE f.agent_id = ?1
1329 AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1330 AND f.body MATCH ?2
1331 ORDER BY n.timestamp DESC
1332 LIMIT ?4",
1333 )
1334 .map_err(|e| e.to_string())?;
1335 let mut rows = stmt
1336 .query(rusqlite::params![agent_id, fts_q, p, cap])
1337 .map_err(|e| e.to_string())?;
1338 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1339 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1340 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1341 out.push(node);
1342 }
1343 }
1344 } else {
1345 let mut stmt = self
1346 .conn
1347 .prepare(
1348 "SELECT n.payload
1349 FROM ainl_nodes_fts AS f
1350 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1351 WHERE f.agent_id = ?1
1352 AND f.body MATCH ?2
1353 ORDER BY n.timestamp DESC
1354 LIMIT ?3",
1355 )
1356 .map_err(|e| e.to_string())?;
1357 let mut rows = stmt
1358 .query(rusqlite::params![agent_id, fts_q, cap])
1359 .map_err(|e| e.to_string())?;
1360 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1361 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1362 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1363 out.push(node);
1364 }
1365 }
1366 }
1367 Ok(out)
1368 }
1369}
1370
1371fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1372 let id_s: String = row.get(0)?;
1373 let episode_s: String = row.get(1)?;
1374 let graph_traj: Option<String> = row.get(2)?;
1375 let agent_id: String = row.get(3)?;
1376 let session_id: String = row.get(4)?;
1377 let project_id: Option<String> = row.get(5)?;
1378 let recorded_at: i64 = row.get(6)?;
1379 let outcome_json: String = row.get(7)?;
1380 let hash: Option<String> = row.get(8)?;
1381 let duration_ms: i64 = row.get(9)?;
1382 let steps_json: String = row.get(10)?;
1383 let frame_vars_json: Option<String> = row.get(11)?;
1384 let fitness_sql: Option<f64> = row.get(12)?;
1385 let id = Uuid::parse_str(&id_s).map_err(|_| {
1386 rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1387 })?;
1388 let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1389 rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1390 })?;
1391 let graph_trajectory_node_id = graph_traj
1392 .filter(|s| !s.is_empty())
1393 .map(|s| Uuid::parse_str(&s))
1394 .transpose()
1395 .map_err(|_| {
1396 rusqlite::Error::InvalidColumnType(
1397 2,
1398 "graph_trajectory_node_id".into(),
1399 rusqlite::types::Type::Text,
1400 )
1401 })?;
1402 let outcome: TrajectoryOutcome = serde_json::from_str(&outcome_json).map_err(|_| {
1403 rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1404 })?;
1405 let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json).map_err(|_| {
1406 rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1407 })?;
1408 let frame_vars = frame_vars_json
1409 .filter(|s| !s.trim().is_empty())
1410 .and_then(|s| serde_json::from_str(&s).ok());
1411 let fitness_delta = fitness_sql.map(|f| f as f32);
1412 Ok(TrajectoryDetailRecord {
1413 id,
1414 episode_id,
1415 graph_trajectory_node_id,
1416 agent_id,
1417 session_id,
1418 project_id,
1419 recorded_at,
1420 outcome,
1421 ainl_source_hash: hash,
1422 duration_ms: duration_ms.max(0) as u64,
1423 steps,
1424 frame_vars,
1425 fitness_delta,
1426 })
1427}
1428
1429impl GraphStore for SqliteGraphStore {
1430 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1433 persist_node(&self.conn, node)
1434 }
1435
1436 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1437 let payload: Option<String> = self
1438 .conn
1439 .query_row(
1440 "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1441 [id.to_string()],
1442 |row| row.get::<_, String>(0),
1443 )
1444 .optional()
1445 .map_err(|e: rusqlite::Error| e.to_string())?;
1446
1447 match payload {
1448 Some(p) => {
1449 let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1450 Ok(Some(node))
1451 }
1452 None => Ok(None),
1453 }
1454 }
1455
1456 fn query_episodes_since(
1457 &self,
1458 since_timestamp: i64,
1459 limit: usize,
1460 ) -> Result<Vec<AinlMemoryNode>, String> {
1461 let mut stmt = self
1462 .conn
1463 .prepare(
1464 "SELECT payload FROM ainl_graph_nodes
1465 WHERE node_type = 'episode' AND timestamp >= ?1
1466 ORDER BY timestamp DESC
1467 LIMIT ?2",
1468 )
1469 .map_err(|e| e.to_string())?;
1470
1471 let rows = stmt
1472 .query_map([since_timestamp, limit as i64], |row| {
1473 let payload: String = row.get(0)?;
1474 Ok(payload)
1475 })
1476 .map_err(|e| e.to_string())?;
1477
1478 let mut nodes = Vec::new();
1479 for row in rows {
1480 let payload = row.map_err(|e| e.to_string())?;
1481 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1482 nodes.push(node);
1483 }
1484
1485 Ok(nodes)
1486 }
1487
1488 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1489 let mut stmt = self
1490 .conn
1491 .prepare(
1492 "SELECT payload FROM ainl_graph_nodes
1493 WHERE node_type = ?1
1494 ORDER BY timestamp DESC",
1495 )
1496 .map_err(|e| e.to_string())?;
1497
1498 let rows = stmt
1499 .query_map([type_name], |row| {
1500 let payload: String = row.get(0)?;
1501 Ok(payload)
1502 })
1503 .map_err(|e| e.to_string())?;
1504
1505 let mut nodes = Vec::new();
1506 for row in rows {
1507 let payload = row.map_err(|e| e.to_string())?;
1508 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1509 nodes.push(node);
1510 }
1511
1512 Ok(nodes)
1513 }
1514
1515 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1516 let mut stmt = self
1517 .conn
1518 .prepare(
1519 "SELECT to_id FROM ainl_graph_edges
1520 WHERE from_id = ?1 AND label = ?2",
1521 )
1522 .map_err(|e| e.to_string())?;
1523
1524 let target_ids: Vec<String> = stmt
1525 .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1526 .map_err(|e| e.to_string())?
1527 .collect::<Result<Vec<_>, _>>()
1528 .map_err(|e| e.to_string())?;
1529
1530 let mut nodes = Vec::new();
1531 for target_id in target_ids {
1532 let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1533 if let Some(node) = self.read_node(id)? {
1534 nodes.push(node);
1535 }
1536 }
1537
1538 Ok(nodes)
1539 }
1540
1541 fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1542 let mut stmt = self
1543 .conn
1544 .prepare(
1545 "SELECT from_id FROM ainl_graph_edges
1546 WHERE to_id = ?1 AND label = ?2",
1547 )
1548 .map_err(|e| e.to_string())?;
1549
1550 let source_ids: Vec<String> = stmt
1551 .query_map([to_id.to_string(), label.to_string()], |row| row.get(0))
1552 .map_err(|e| e.to_string())?
1553 .collect::<Result<Vec<_>, _>>()
1554 .map_err(|e| e.to_string())?;
1555
1556 let mut nodes = Vec::new();
1557 for source_id in source_ids {
1558 let id = Uuid::parse_str(&source_id).map_err(|e| e.to_string())?;
1559 if let Some(node) = self.read_node(id)? {
1560 nodes.push(node);
1561 }
1562 }
1563
1564 Ok(nodes)
1565 }
1566}