1use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31 AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32 SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
/// Failure modes of a graph snapshot import.
#[derive(Debug, Clone)]
pub enum SnapshotImportError {
    /// The snapshot's `schema_version` differs from the version this build accepts.
    UnsupportedSchemaVersion {
        /// Version string found in the snapshot.
        got: String,
        /// Version string this build expects.
        expected: &'static str,
    },
    /// A storage-layer failure, carried as its already-rendered message.
    Sqlite(String),
}

impl std::fmt::Display for SnapshotImportError {
    /// Renders the error as a human-readable message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The wrapped message is emitted verbatim.
            Self::Sqlite(message) => f.write_str(message),
            Self::UnsupportedSchemaVersion { got, expected } => write!(
                f,
                "unsupported snapshot schema_version '{got}'; expected '{expected}'"
            ),
        }
    }
}
59
/// Failure modes of graph validation.
#[derive(Debug, Clone)]
pub enum GraphValidationError {
    /// A storage-layer failure, carried as its already-rendered message.
    Sqlite(String),
}

impl std::fmt::Display for GraphValidationError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum has a single variant, so the pattern is irrefutable.
        let Self::Sqlite(message) = self;
        f.write_str(message)
    }
}
73
74pub trait GraphStore {
76 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82 fn query_episodes_since(
84 &self,
85 since_timestamp: i64,
86 limit: usize,
87 ) -> Result<Vec<AinlMemoryNode>, String>;
88
89 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94}
95
96pub struct SqliteGraphStore {
102 conn: rusqlite::Connection,
103}
104
105fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
106 conn.execute_batch("PRAGMA foreign_keys = ON;")
107}
108
109fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
110 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
111 let cols = stmt
112 .query_map([], |row| row.get::<_, String>(1))?
113 .collect::<Result<Vec<_>, _>>()?;
114 if !cols.iter().any(|c| c == "weight") {
115 conn.execute(
116 "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
117 [],
118 )?;
119 }
120 if !cols.iter().any(|c| c == "metadata") {
121 conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
122 }
123 Ok(())
124}
125
126fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
128 let exists: i64 = conn.query_row(
129 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
130 [],
131 |r| r.get(0),
132 )?;
133 if exists == 0 {
134 return Ok(false);
135 }
136 let n: i64 = conn.query_row(
137 "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
138 [],
139 |r| r.get(0),
140 )?;
141 Ok(n > 0)
142}
143
144fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
147 if edges_table_has_foreign_keys(conn)? {
148 return Ok(());
149 }
150
151 let exists: i64 = conn.query_row(
152 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
153 [],
154 |r| r.get(0),
155 )?;
156 if exists == 0 {
157 return Ok(());
158 }
159
160 conn.execute("BEGIN IMMEDIATE", [])?;
161 let res: Result<(), rusqlite::Error> = (|| {
162 conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
163 conn.execute(
164 "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
165 [],
166 )?;
167 conn.execute(
168 r#"CREATE TABLE ainl_graph_edges (
169 from_id TEXT NOT NULL,
170 to_id TEXT NOT NULL,
171 label TEXT NOT NULL,
172 weight REAL NOT NULL DEFAULT 1.0,
173 metadata TEXT,
174 PRIMARY KEY (from_id, to_id, label),
175 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
176 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
177 )"#,
178 [],
179 )?;
180 conn.execute(
181 r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
182 SELECT o.from_id, o.to_id, o.label,
183 COALESCE(o.weight, 1.0),
184 o.metadata
185 FROM ainl_graph_edges__old o
186 WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
187 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
188 [],
189 )?;
190 conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
191 conn.execute(
192 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
193 [],
194 )?;
195 Ok(())
196 })();
197
198 match res {
199 Ok(()) => {
200 conn.execute("COMMIT", [])?;
201 }
202 Err(e) => {
203 let _ = conn.execute("ROLLBACK", []);
204 return Err(e);
205 }
206 }
207 Ok(())
208}
209
210fn node_type_name(node: &AinlMemoryNode) -> &'static str {
211 match &node.node_type {
212 AinlNodeType::Episode { .. } => "episode",
213 AinlNodeType::Semantic { .. } => "semantic",
214 AinlNodeType::Procedural { .. } => "procedural",
215 AinlNodeType::Persona { .. } => "persona",
216 AinlNodeType::RuntimeState { .. } => "runtime_state",
217 AinlNodeType::Trajectory { .. } => "trajectory",
218 AinlNodeType::Failure { .. } => "failure",
219 }
220}
221
222fn node_timestamp(node: &AinlMemoryNode) -> i64 {
223 match &node.node_type {
224 AinlNodeType::Episode { episodic } => episodic.timestamp,
225 AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
226 AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
227 AinlNodeType::Failure { failure } => failure.recorded_at,
228 _ => chrono::Utc::now().timestamp(),
229 }
230}
231
232fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
233 match &node.node_type {
234 AinlNodeType::Failure { failure } => Some(format!(
235 "{} {} {}",
236 failure.source,
237 failure.tool_name.as_deref().unwrap_or(""),
238 failure.message
239 )),
240 _ => None,
241 }
242}
243
/// Turns free-form user text into an FTS5 `MATCH` expression that prefix-matches every
/// whitespace-separated token.
///
/// Control characters and double quotes are removed from each token, and tokens that end
/// up empty are dropped. Each remaining token becomes a quoted FTS5 string followed by `*`,
/// and the terms are joined with ` AND `. An input with no usable tokens yields an empty
/// string.
///
/// The `*` is placed after the closing quote on purpose: FTS5 only treats `*` as the
/// prefix marker when it follows the string. Inside the quotes it is handed to the
/// tokenizer like any other character and no prefix match takes place.
fn fts5_prefix_match_query(raw: &str) -> String {
    raw.split_whitespace()
        .filter_map(|token| {
            // Dropping `"` keeps the token from terminating the quoted string early.
            let cleaned: String = token
                .chars()
                .filter(|c| !c.is_control() && *c != '"')
                .collect();
            (!cleaned.is_empty()).then(|| format!("\"{cleaned}\"*"))
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
258
259fn sync_failure_fts_insert(conn: &rusqlite::Connection, node_id: &str, body: &str) -> Result<(), String> {
260 conn.execute(
261 "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
262 [node_id],
263 )
264 .map_err(|e| e.to_string())?;
265 conn.execute(
266 "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
267 rusqlite::params![node_id, body],
268 )
269 .map_err(|e| e.to_string())?;
270 Ok(())
271}
272
/// Produces the full-text body for a node from its JSON payload, keeping at most the first
/// 400,000 characters (counted as `char`s, so the cut always lands on a character boundary).
fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
    const MAX_CHARS: usize = 400_000;

    // The byte offset of the character at index MAX_CHARS is exactly where the first
    // MAX_CHARS characters end; its absence means the payload already fits.
    match payload.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => payload[..cut].to_owned(),
        None => payload.to_owned(),
    }
}
281
282fn sync_all_nodes_fts_insert(
283 conn: &rusqlite::Connection,
284 node_id: &str,
285 agent_id: &str,
286 project_id: Option<&str>,
287 body: &str,
288) -> Result<(), String> {
289 let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
290 conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
291 .map_err(|e| e.to_string())?;
292 conn.execute(
293 "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
294 rusqlite::params![node_id, agent_id, proj, body],
295 )
296 .map_err(|e| e.to_string())?;
297 Ok(())
298}
299
300fn persist_edge(
301 conn: &rusqlite::Connection,
302 from_id: Uuid,
303 to_id: Uuid,
304 label: &str,
305 weight: f32,
306 metadata: Option<&str>,
307) -> Result<(), String> {
308 conn.execute(
309 "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
310 VALUES (?1, ?2, ?3, ?4, ?5)",
311 rusqlite::params![
312 from_id.to_string(),
313 to_id.to_string(),
314 label,
315 weight,
316 metadata
317 ],
318 )
319 .map_err(|e| e.to_string())?;
320 Ok(())
321}
322
323fn collect_snapshot_edges_for_id_set(
325 conn: &rusqlite::Connection,
326 id_set: &HashSet<String>,
327) -> Result<Vec<SnapshotEdge>, String> {
328 let mut edge_stmt = conn
329 .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
330 .map_err(|e| e.to_string())?;
331 let edge_rows = edge_stmt
332 .query_map([], |row| {
333 Ok((
334 row.get::<_, String>(0)?,
335 row.get::<_, String>(1)?,
336 row.get::<_, String>(2)?,
337 row.get::<_, f64>(3)?,
338 row.get::<_, Option<String>>(4)?,
339 ))
340 })
341 .map_err(|e| e.to_string())?
342 .collect::<Result<Vec<_>, _>>()
343 .map_err(|e| e.to_string())?;
344
345 let mut edges = Vec::new();
346 for (from_id, to_id, label, weight, meta) in edge_rows {
347 if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
348 continue;
349 }
350 let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
351 let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
352 let metadata = match meta {
353 Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
354 _ => None,
355 };
356 edges.push(SnapshotEdge {
357 source_id,
358 target_id,
359 edge_type: label,
360 weight: weight as f32,
361 metadata,
362 });
363 }
364 Ok(edges)
365}
366
367fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
368 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
369 let type_name = node_type_name(node);
370 let timestamp = node_timestamp(node);
371 let proj = node
372 .project_id
373 .as_deref()
374 .map(str::trim)
375 .filter(|s| !s.is_empty());
376
377 conn.execute(
378 "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
379 VALUES (?1, ?2, ?3, ?4, ?5)",
380 rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
381 )
382 .map_err(|e| e.to_string())?;
383
384 for edge in &node.edges {
385 persist_edge(
386 conn,
387 node.id,
388 edge.target_id,
389 &edge.label,
390 1.0,
391 None::<&str>,
392 )?;
393 }
394
395 let body_all = graph_node_fts_body_from_payload_json(&payload);
396 if !node.agent_id.trim().is_empty() {
397 let _ = sync_all_nodes_fts_insert(
398 conn,
399 &node.id.to_string(),
400 node.agent_id.as_str(),
401 proj,
402 &body_all,
403 );
404 }
405
406 if let Some(body) = failure_fts_body(node) {
407 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
409 }
410
411 Ok(())
412}
413
414fn try_insert_node_ignore(
415 conn: &rusqlite::Connection,
416 node: &AinlMemoryNode,
417) -> Result<(), String> {
418 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
419 let type_name = node_type_name(node);
420 let timestamp = node_timestamp(node);
421 let proj = node
422 .project_id
423 .as_deref()
424 .map(str::trim)
425 .filter(|s| !s.is_empty());
426 let n = conn
427 .execute(
428 "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id)
429 VALUES (?1, ?2, ?3, ?4, ?5)",
430 rusqlite::params![node.id.to_string(), type_name, payload, timestamp, proj,],
431 )
432 .map_err(|e| e.to_string())?;
433 if n > 0 {
434 if !node.agent_id.trim().is_empty() {
435 let body_all = graph_node_fts_body_from_payload_json(&payload);
436 let _ = sync_all_nodes_fts_insert(
437 conn,
438 &node.id.to_string(),
439 node.agent_id.as_str(),
440 proj,
441 &body_all,
442 );
443 }
444 if let Some(body) = failure_fts_body(node) {
445 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
446 }
447 }
448 Ok(())
449}
450
451fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
452 let meta = match &edge.metadata {
453 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
454 None => None,
455 };
456 conn.execute(
457 "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
458 VALUES (?1, ?2, ?3, ?4, ?5)",
459 rusqlite::params![
460 edge.source_id.to_string(),
461 edge.target_id.to_string(),
462 edge.edge_type,
463 edge.weight,
464 meta.as_deref(),
465 ],
466 )
467 .map_err(|e| e.to_string())?;
468 Ok(())
469}
470
471fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
472 conn.execute(
473 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
474 node_id UNINDEXED,
475 body,
476 tokenize = 'unicode61 remove_diacritics 1'
477 )",
478 [],
479 )?;
480 Ok(())
481}
482
483fn migrate_ainl_graph_nodes_add_project_id(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
484 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
485 let cols = stmt
486 .query_map([], |row| row.get::<_, String>(1))?
487 .collect::<Result<Vec<_>, _>>()?;
488 if !cols.iter().any(|c| c == "project_id") {
489 conn.execute(
490 "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
491 [],
492 )?;
493 }
494 Ok(())
495}
496
497fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
498 conn.execute(
499 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
500 node_id UNINDEXED,
501 agent_id UNINDEXED,
502 project_id UNINDEXED,
503 body,
504 tokenize = 'unicode61 remove_diacritics 1'
505 )",
506 [],
507 )?;
508 Ok(())
509}
510
511fn install_ainl_graph_node_delete_fts_triggers(
513 conn: &rusqlite::Connection,
514) -> Result<(), rusqlite::Error> {
515 conn.execute_batch(
516 "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
517 CREATE TRIGGER ainl_graph_nodes_after_delete_fts
518 AFTER DELETE ON ainl_graph_nodes
519 FOR EACH ROW
520 BEGIN
521 DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
522 DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
523 END;",
524 )?;
525 Ok(())
526}
527
528fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
530 let fts_n: i64 = conn
531 .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
532 .unwrap_or(0);
533 if fts_n > 0 {
534 return Ok(());
535 }
536 let mut stmt = conn
537 .prepare(
538 "SELECT id, payload, project_id FROM ainl_graph_nodes
539 WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
540 )
541 .map_err(|e| e.to_string())?;
542 let rows = stmt
543 .query_map([], |row| {
544 Ok((
545 row.get::<_, String>(0)?,
546 row.get::<_, String>(1)?,
547 row.get::<_, Option<String>>(2)?,
548 ))
549 })
550 .map_err(|e| e.to_string())?
551 .collect::<Result<Vec<_>, _>>()
552 .map_err(|e| e.to_string())?;
553 for (id, payload, col_proj) in rows {
554 let v: serde_json::Value = match serde_json::from_str(&payload) {
555 Ok(x) => x,
556 Err(_) => continue,
557 };
558 let ag = v
559 .get("agent_id")
560 .and_then(|x| x.as_str())
561 .map(str::trim)
562 .unwrap_or("");
563 if ag.is_empty() {
564 continue;
565 }
566 let json_proj = v
567 .get("project_id")
568 .and_then(|x| x.as_str())
569 .map(str::trim)
570 .filter(|s| !s.is_empty());
571 let proj = col_proj
572 .as_deref()
573 .map(str::trim)
574 .filter(|s| !s.is_empty())
575 .or(json_proj);
576 let body = graph_node_fts_body_from_payload_json(&payload);
577 if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
578 }
580 }
581 Ok(())
582}
583
584fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
585 conn.execute(
586 "CREATE TABLE IF NOT EXISTS ainl_trajectories (
587 id TEXT PRIMARY KEY,
588 episode_id TEXT NOT NULL,
589 graph_trajectory_node_id TEXT,
590 agent_id TEXT NOT NULL,
591 session_id TEXT NOT NULL,
592 project_id TEXT,
593 recorded_at INTEGER NOT NULL,
594 outcome_json TEXT NOT NULL,
595 ainl_source_hash TEXT,
596 duration_ms INTEGER NOT NULL DEFAULT 0,
597 steps_json TEXT NOT NULL,
598 FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
599 )",
600 [],
601 )?;
602 conn.execute(
603 "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
604 ON ainl_trajectories(agent_id, recorded_at DESC)",
605 [],
606 )?;
607 Ok(())
608}
609
610fn migrate_ainl_trajectories_add_depth_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
611 let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
612 let cols: Vec<String> = stmt
613 .query_map([], |row| row.get::<_, String>(1))?
614 .collect::<Result<Vec<_>, _>>()?;
615 if !cols.iter().any(|c| c == "frame_vars_json") {
616 conn.execute(
617 "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
618 [],
619 )?;
620 }
621 if !cols.iter().any(|c| c == "fitness_delta") {
622 conn.execute("ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL", [])?;
623 }
624 Ok(())
625}
626
627impl SqliteGraphStore {
628 pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
630 conn.execute(
631 "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
632 id TEXT PRIMARY KEY,
633 node_type TEXT NOT NULL,
634 payload TEXT NOT NULL,
635 timestamp INTEGER NOT NULL
636 )",
637 [],
638 )?;
639
640 conn.execute(
641 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
642 ON ainl_graph_nodes(timestamp DESC)",
643 [],
644 )?;
645
646 conn.execute(
647 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
648 ON ainl_graph_nodes(node_type)",
649 [],
650 )?;
651
652 migrate_ainl_graph_nodes_add_project_id(conn)?;
653 conn.execute(
654 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
655 ON ainl_graph_nodes(project_id, node_type, timestamp)",
656 [],
657 )?;
658
659 conn.execute(
660 "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
661 from_id TEXT NOT NULL,
662 to_id TEXT NOT NULL,
663 label TEXT NOT NULL,
664 weight REAL NOT NULL DEFAULT 1.0,
665 metadata TEXT,
666 PRIMARY KEY (from_id, to_id, label),
667 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
668 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
669 )",
670 [],
671 )?;
672
673 conn.execute(
674 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
675 ON ainl_graph_edges(from_id, label)",
676 [],
677 )?;
678
679 migrate_edge_columns(conn)?;
680 migrate_edges_add_foreign_keys(conn)?;
681 migrate_trajectories_v1(conn)?;
682 migrate_ainl_trajectories_add_depth_v1(conn)?;
683 migrate_failures_fts_v1(conn)?;
684 migrate_ainl_nodes_fts_v1(conn)?;
685 if let Err(_) = backfill_ainl_nodes_fts_if_empty(conn) {
686 }
688 let _ = install_ainl_graph_node_delete_fts_triggers(conn);
689 Ok(())
690 }
691
692 pub fn open(path: &std::path::Path) -> Result<Self, String> {
694 let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
695 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
696 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
697 Ok(Self { conn })
698 }
699
700 pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
702 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
703 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
704 Ok(Self { conn })
705 }
706
707 pub(crate) fn conn(&self) -> &rusqlite::Connection {
709 &self.conn
710 }
711
712 pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
714 persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
715 }
716
717 pub fn insert_graph_edge_checked(
719 &self,
720 from_id: Uuid,
721 to_id: Uuid,
722 label: &str,
723 ) -> Result<(), String> {
724 if !self.node_row_exists(&from_id.to_string())? {
725 return Err(format!(
726 "insert_graph_edge_checked: missing source node row {}",
727 from_id
728 ));
729 }
730 if !self.node_row_exists(&to_id.to_string())? {
731 return Err(format!(
732 "insert_graph_edge_checked: missing target node row {}",
733 to_id
734 ));
735 }
736 self.insert_graph_edge(from_id, to_id, label)
737 }
738
739 pub fn insert_graph_edge_with_meta(
741 &self,
742 from_id: Uuid,
743 to_id: Uuid,
744 label: &str,
745 weight: f32,
746 metadata: Option<&serde_json::Value>,
747 ) -> Result<(), String> {
748 let meta = metadata
749 .map(serde_json::to_string)
750 .transpose()
751 .map_err(|e| e.to_string())?;
752 persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
753 }
754
755 pub fn query_nodes_by_type_since(
757 &self,
758 node_type: &str,
759 since_timestamp: i64,
760 limit: usize,
761 ) -> Result<Vec<AinlMemoryNode>, String> {
762 let mut stmt = self
763 .conn
764 .prepare(
765 "SELECT payload FROM ainl_graph_nodes
766 WHERE node_type = ?1 AND timestamp >= ?2
767 ORDER BY timestamp DESC
768 LIMIT ?3",
769 )
770 .map_err(|e| e.to_string())?;
771
772 let rows = stmt
773 .query_map(
774 rusqlite::params![node_type, since_timestamp, limit as i64],
775 |row| {
776 let payload: String = row.get(0)?;
777 Ok(payload)
778 },
779 )
780 .map_err(|e| e.to_string())?;
781
782 let mut nodes = Vec::new();
783 for row in rows {
784 let payload = row.map_err(|e| e.to_string())?;
785 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
786 nodes.push(node);
787 }
788
789 Ok(nodes)
790 }
791
792 pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
796 if agent_id.is_empty() {
797 return Ok(None);
798 }
799 let mut stmt = self
800 .conn
801 .prepare(
802 "SELECT payload FROM ainl_graph_nodes
803 WHERE node_type = 'runtime_state'
804 AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
805 ORDER BY timestamp DESC
806 LIMIT 1",
807 )
808 .map_err(|e| e.to_string())?;
809
810 let payload_opt: Option<String> = stmt
811 .query_row([agent_id], |row| row.get(0))
812 .optional()
813 .map_err(|e| e.to_string())?;
814
815 let Some(payload) = payload_opt else {
816 return Ok(None);
817 };
818
819 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
820 match node.node_type {
821 AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
822 _ => Err("runtime_state row had unexpected node_type payload".to_string()),
823 }
824 }
825
826 pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
828 let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
829 let node = AinlMemoryNode {
830 id,
831 memory_category: MemoryCategory::RuntimeState,
832 importance_score: 0.5,
833 agent_id: state.agent_id.clone(),
834 project_id: None,
835 node_type: AinlNodeType::RuntimeState {
836 runtime_state: state.clone(),
837 },
838 edges: Vec::new(),
839 };
840 self.write_node(&node)
841 }
842
843 pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
845 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
846 for edge in &node.edges {
847 let exists: Option<i32> = tx
848 .query_row(
849 "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
850 [edge.target_id.to_string()],
851 |_| Ok(1),
852 )
853 .optional()
854 .map_err(|e| e.to_string())?;
855 if exists.is_none() {
856 return Err(format!(
857 "write_node_with_edges: missing target node {}",
858 edge.target_id
859 ));
860 }
861 }
862 persist_node(&tx, node)?;
863 tx.commit().map_err(|e| e.to_string())?;
864 Ok(())
865 }
866
867 pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
869 self.validate_graph_checked(agent_id)
870 .map_err(|e| e.to_string())
871 }
872
873 pub fn validate_graph_checked(
875 &self,
876 agent_id: &str,
877 ) -> Result<GraphValidationReport, GraphValidationError> {
878 use std::collections::HashSet;
879
880 let agent_nodes = self
881 .agent_node_ids(agent_id)
882 .map_err(GraphValidationError::Sqlite)?;
883 let node_count = agent_nodes.len();
884
885 let mut stmt = self
886 .conn
887 .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
888 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
889 let all_edges: Vec<(String, String, String)> = stmt
890 .query_map([], |row| {
891 Ok((
892 row.get::<_, String>(0)?,
893 row.get::<_, String>(1)?,
894 row.get::<_, String>(2)?,
895 ))
896 })
897 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
898 .collect::<Result<Vec<_>, _>>()
899 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
900
901 let mut edge_pairs = Vec::new();
902 for (from_id, to_id, label) in all_edges {
903 let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
904 if touches_agent {
905 edge_pairs.push((from_id, to_id, label));
906 }
907 }
908
909 let edge_count = edge_pairs.len();
910 let mut dangling_edges = Vec::new();
911 let mut dangling_edge_details = Vec::new();
912 let mut cross_agent_boundary_edges = 0usize;
913
914 for (from_id, to_id, label) in &edge_pairs {
915 let from_ok = self
916 .node_row_exists(from_id)
917 .map_err(GraphValidationError::Sqlite)?;
918 let to_ok = self
919 .node_row_exists(to_id)
920 .map_err(GraphValidationError::Sqlite)?;
921 if !from_ok || !to_ok {
922 dangling_edges.push((from_id.clone(), to_id.clone()));
923 dangling_edge_details.push(DanglingEdgeDetail {
924 source_id: from_id.clone(),
925 target_id: to_id.clone(),
926 edge_type: label.clone(),
927 });
928 continue;
929 }
930 let fa = agent_nodes.contains(from_id);
931 let ta = agent_nodes.contains(to_id);
932 if fa ^ ta {
933 cross_agent_boundary_edges += 1;
934 }
935 }
936
937 let mut touched: HashSet<String> =
938 HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
939 for (a, b, _) in &edge_pairs {
940 if agent_nodes.contains(a) {
941 touched.insert(a.clone());
942 }
943 if agent_nodes.contains(b) {
944 touched.insert(b.clone());
945 }
946 }
947
948 let mut orphan_nodes = Vec::new();
949 for id in &agent_nodes {
950 if !touched.contains(id) {
951 orphan_nodes.push(id.clone());
952 }
953 }
954
955 let is_valid = dangling_edges.is_empty();
956 Ok(GraphValidationReport {
957 agent_id: agent_id.to_string(),
958 node_count,
959 edge_count,
960 dangling_edges,
961 dangling_edge_details,
962 cross_agent_boundary_edges,
963 orphan_nodes,
964 is_valid,
965 })
966 }
967
968 fn node_row_exists(&self, id: &str) -> Result<bool, String> {
969 let v: Option<i32> = self
970 .conn
971 .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
972 Ok(1)
973 })
974 .optional()
975 .map_err(|e| e.to_string())?;
976 Ok(v.is_some())
977 }
978
979 fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
980 let mut stmt = self
981 .conn
982 .prepare(
983 "SELECT id FROM ainl_graph_nodes
984 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
985 )
986 .map_err(|e| e.to_string())?;
987 let ids = stmt
988 .query_map([agent_id], |row| row.get::<_, String>(0))
989 .map_err(|e| e.to_string())?
990 .collect::<Result<HashSet<_>, _>>()
991 .map_err(|e| e.to_string())?;
992 Ok(ids)
993 }
994
995 pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
997 let id_set = self.agent_node_ids(agent_id)?;
998 collect_snapshot_edges_for_id_set(&self.conn, &id_set)
999 }
1000
1001 pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1003 let mut stmt = self
1004 .conn
1005 .prepare(
1006 "SELECT payload FROM ainl_graph_nodes
1007 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1008 )
1009 .map_err(|e| e.to_string())?;
1010 let nodes: Vec<AinlMemoryNode> = stmt
1011 .query_map([agent_id], |row| {
1012 let payload: String = row.get(0)?;
1013 Ok(payload)
1014 })
1015 .map_err(|e| e.to_string())?
1016 .map(|r| {
1017 let payload = r.map_err(|e| e.to_string())?;
1018 serde_json::from_str(&payload).map_err(|e| e.to_string())
1019 })
1020 .collect::<Result<Vec<_>, _>>()?;
1021
1022 let id_set: std::collections::HashSet<String> =
1023 nodes.iter().map(|n| n.id.to_string()).collect();
1024
1025 let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1026
1027 Ok(AgentGraphSnapshot {
1028 agent_id: agent_id.to_string(),
1029 exported_at: Utc::now(),
1030 schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1031 nodes,
1032 edges,
1033 })
1034 }
1035
1036 pub fn import_graph(
1045 &mut self,
1046 snapshot: &AgentGraphSnapshot,
1047 allow_dangling_edges: bool,
1048 ) -> Result<(), String> {
1049 self.import_graph_checked(snapshot, allow_dangling_edges)
1050 .map_err(|e| e.to_string())
1051 }
1052
1053 pub fn import_graph_checked(
1055 &mut self,
1056 snapshot: &AgentGraphSnapshot,
1057 allow_dangling_edges: bool,
1058 ) -> Result<(), SnapshotImportError> {
1059 if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1060 return Err(SnapshotImportError::UnsupportedSchemaVersion {
1061 got: snapshot.schema_version.to_string(),
1062 expected: SNAPSHOT_SCHEMA_VERSION,
1063 });
1064 }
1065
1066 if allow_dangling_edges {
1067 self.conn
1068 .execute_batch("PRAGMA foreign_keys = OFF;")
1069 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1070 }
1071
1072 let result: Result<(), SnapshotImportError> = (|| {
1073 let tx = self
1074 .conn
1075 .transaction()
1076 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1077 for node in &snapshot.nodes {
1078 try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1079 }
1080 for edge in &snapshot.edges {
1081 try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1082 }
1083 tx.commit()
1084 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1085 Ok(())
1086 })();
1087
1088 if allow_dangling_edges {
1089 self.conn
1090 .execute_batch("PRAGMA foreign_keys = ON;")
1091 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1092 }
1093
1094 result
1095 }
1096
1097 pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1099 let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1100 let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1101 let frame_s = match &row.frame_vars {
1102 None => None,
1103 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1104 };
1105 self.conn
1106 .execute(
1107 "INSERT OR REPLACE INTO ainl_trajectories (
1108 id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1109 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1110 frame_vars_json, fitness_delta
1111 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1112 rusqlite::params![
1113 row.id.to_string(),
1114 row.episode_id.to_string(),
1115 row.graph_trajectory_node_id.map(|u| u.to_string()),
1116 row.agent_id,
1117 row.session_id,
1118 row.project_id,
1119 row.recorded_at,
1120 outcome_json,
1121 row.ainl_source_hash,
1122 row.duration_ms as i64,
1123 steps_json,
1124 frame_s,
1125 row.fitness_delta,
1126 ],
1127 )
1128 .map_err(|e| e.to_string())?;
1129 Ok(())
1130 }
1131
1132 pub fn list_trajectories_for_agent(
1134 &self,
1135 agent_id: &str,
1136 limit: usize,
1137 since_timestamp: Option<i64>,
1138 ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1139 let cap = limit.clamp(1, 500) as i64;
1140 let sql = if since_timestamp.is_some() {
1141 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1142 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1143 frame_vars_json, fitness_delta
1144 FROM ainl_trajectories
1145 WHERE agent_id = ?1 AND recorded_at >= ?2
1146 ORDER BY recorded_at DESC
1147 LIMIT ?3"
1148 } else {
1149 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1150 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1151 frame_vars_json, fitness_delta
1152 FROM ainl_trajectories
1153 WHERE agent_id = ?1
1154 ORDER BY recorded_at DESC
1155 LIMIT ?2"
1156 };
1157
1158 let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1159 let rows = if let Some(since) = since_timestamp {
1160 stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1161 } else {
1162 stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1163 }
1164 .map_err(|e| e.to_string())?;
1165
1166 let mut out = Vec::new();
1167 for row in rows {
1168 out.push(row.map_err(|e| e.to_string())?);
1169 }
1170 Ok(out)
1171 }
1172
1173 pub fn count_trajectory_details_before(
1175 &self,
1176 agent_id: &str,
1177 before_unix: i64,
1178 ) -> Result<usize, String> {
1179 if agent_id.trim().is_empty() {
1180 return Err("agent_id is empty".into());
1181 }
1182 let n: i64 = self
1183 .conn
1184 .query_row(
1185 "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1186 rusqlite::params![agent_id, before_unix],
1187 |r| r.get(0),
1188 )
1189 .map_err(|e| e.to_string())?;
1190 Ok(n as usize)
1191 }
1192
1193 pub fn delete_trajectory_details_before(
1199 &self,
1200 agent_id: &str,
1201 before_unix: i64,
1202 ) -> Result<usize, String> {
1203 if agent_id.trim().is_empty() {
1204 return Err("agent_id is empty".into());
1205 }
1206 let n = self
1207 .conn
1208 .execute(
1209 "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1210 rusqlite::params![agent_id, before_unix],
1211 )
1212 .map_err(|e| e.to_string())?;
1213 Ok(n)
1214 }
1215
1216 pub fn search_failures_fts_for_agent(
1220 &self,
1221 agent_id: &str,
1222 query: &str,
1223 limit: usize,
1224 ) -> Result<Vec<AinlMemoryNode>, String> {
1225 let fts_q = fts5_prefix_match_query(query);
1226 if fts_q.is_empty() || agent_id.trim().is_empty() {
1227 return Ok(Vec::new());
1228 }
1229 let cap = limit.clamp(1, 200) as i64;
1230 let mut stmt = self
1231 .conn
1232 .prepare(
1233 "SELECT n.payload
1234 FROM ainl_failures_fts AS f
1235 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1236 WHERE n.node_type = 'failure'
1237 AND json_extract(n.payload, '$.agent_id') = ?1
1238 AND f.body MATCH ?2
1239 ORDER BY n.timestamp DESC
1240 LIMIT ?3",
1241 )
1242 .map_err(|e| e.to_string())?;
1243
1244 let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1245 let payload: String = row.get(0)?;
1246 Ok(payload)
1247 });
1248
1249 let mut out = Vec::new();
1250 let rows = match rows {
1251 Ok(r) => r,
1252 Err(e) => {
1253 let msg = e.to_string();
1254 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1255 return Ok(Vec::new());
1256 }
1257 return Err(msg);
1258 }
1259 };
1260 for row in rows {
1261 match row {
1262 Ok(payload) => {
1263 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1264 out.push(node);
1265 }
1266 }
1267 Err(e) => {
1268 let msg = e.to_string();
1269 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1270 return Ok(Vec::new());
1271 }
1272 return Err(msg);
1273 }
1274 }
1275 }
1276 Ok(out)
1277 }
1278
1279 pub fn search_all_nodes_fts_for_agent(
1285 &self,
1286 agent_id: &str,
1287 query: &str,
1288 project_id: Option<&str>,
1289 limit: usize,
1290 ) -> Result<Vec<AinlMemoryNode>, String> {
1291 let fts_q = fts5_prefix_match_query(query);
1292 if fts_q.is_empty() || agent_id.trim().is_empty() {
1293 return Ok(Vec::new());
1294 }
1295 let cap = limit.clamp(1, 200) as i64;
1296 let project_filter = project_id
1297 .map(str::trim)
1298 .filter(|s| !s.is_empty());
1299 let mut out = Vec::new();
1300 if let Some(p) = project_filter {
1301 let mut stmt = self
1302 .conn
1303 .prepare(
1304 "SELECT n.payload
1305 FROM ainl_nodes_fts AS f
1306 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1307 WHERE f.agent_id = ?1
1308 AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1309 AND f.body MATCH ?2
1310 ORDER BY n.timestamp DESC
1311 LIMIT ?4",
1312 )
1313 .map_err(|e| e.to_string())?;
1314 let mut rows = stmt
1315 .query(rusqlite::params![agent_id, fts_q, p, cap])
1316 .map_err(|e| e.to_string())?;
1317 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1318 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1319 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1320 out.push(node);
1321 }
1322 }
1323 } else {
1324 let mut stmt = self
1325 .conn
1326 .prepare(
1327 "SELECT n.payload
1328 FROM ainl_nodes_fts AS f
1329 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1330 WHERE f.agent_id = ?1
1331 AND f.body MATCH ?2
1332 ORDER BY n.timestamp DESC
1333 LIMIT ?3",
1334 )
1335 .map_err(|e| e.to_string())?;
1336 let mut rows = stmt
1337 .query(rusqlite::params![agent_id, fts_q, cap])
1338 .map_err(|e| e.to_string())?;
1339 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1340 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1341 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1342 out.push(node);
1343 }
1344 }
1345 }
1346 Ok(out)
1347 }
1348}
1349
1350fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1351 let id_s: String = row.get(0)?;
1352 let episode_s: String = row.get(1)?;
1353 let graph_traj: Option<String> = row.get(2)?;
1354 let agent_id: String = row.get(3)?;
1355 let session_id: String = row.get(4)?;
1356 let project_id: Option<String> = row.get(5)?;
1357 let recorded_at: i64 = row.get(6)?;
1358 let outcome_json: String = row.get(7)?;
1359 let hash: Option<String> = row.get(8)?;
1360 let duration_ms: i64 = row.get(9)?;
1361 let steps_json: String = row.get(10)?;
1362 let frame_vars_json: Option<String> = row.get(11)?;
1363 let fitness_sql: Option<f64> = row.get(12)?;
1364 let id = Uuid::parse_str(&id_s).map_err(|_| {
1365 rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1366 })?;
1367 let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1368 rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1369 })?;
1370 let graph_trajectory_node_id = graph_traj
1371 .filter(|s| !s.is_empty())
1372 .map(|s| Uuid::parse_str(&s))
1373 .transpose()
1374 .map_err(|_| {
1375 rusqlite::Error::InvalidColumnType(2, "graph_trajectory_node_id".into(), rusqlite::types::Type::Text)
1376 })?;
1377 let outcome: TrajectoryOutcome =
1378 serde_json::from_str(&outcome_json).map_err(|_| {
1379 rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1380 })?;
1381 let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json)
1382 .map_err(|_| {
1383 rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1384 })?;
1385 let frame_vars = frame_vars_json
1386 .filter(|s| !s.trim().is_empty())
1387 .and_then(|s| serde_json::from_str(&s).ok());
1388 let fitness_delta = fitness_sql.map(|f| f as f32);
1389 Ok(TrajectoryDetailRecord {
1390 id,
1391 episode_id,
1392 graph_trajectory_node_id,
1393 agent_id,
1394 session_id,
1395 project_id,
1396 recorded_at,
1397 outcome,
1398 ainl_source_hash: hash,
1399 duration_ms: duration_ms.max(0) as u64,
1400 steps,
1401 frame_vars,
1402 fitness_delta,
1403 })
1404}
1405
1406impl GraphStore for SqliteGraphStore {
1407 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1410 persist_node(&self.conn, node)
1411 }
1412
1413 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1414 let payload: Option<String> = self
1415 .conn
1416 .query_row(
1417 "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1418 [id.to_string()],
1419 |row| row.get::<_, String>(0),
1420 )
1421 .optional()
1422 .map_err(|e: rusqlite::Error| e.to_string())?;
1423
1424 match payload {
1425 Some(p) => {
1426 let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1427 Ok(Some(node))
1428 }
1429 None => Ok(None),
1430 }
1431 }
1432
1433 fn query_episodes_since(
1434 &self,
1435 since_timestamp: i64,
1436 limit: usize,
1437 ) -> Result<Vec<AinlMemoryNode>, String> {
1438 let mut stmt = self
1439 .conn
1440 .prepare(
1441 "SELECT payload FROM ainl_graph_nodes
1442 WHERE node_type = 'episode' AND timestamp >= ?1
1443 ORDER BY timestamp DESC
1444 LIMIT ?2",
1445 )
1446 .map_err(|e| e.to_string())?;
1447
1448 let rows = stmt
1449 .query_map([since_timestamp, limit as i64], |row| {
1450 let payload: String = row.get(0)?;
1451 Ok(payload)
1452 })
1453 .map_err(|e| e.to_string())?;
1454
1455 let mut nodes = Vec::new();
1456 for row in rows {
1457 let payload = row.map_err(|e| e.to_string())?;
1458 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1459 nodes.push(node);
1460 }
1461
1462 Ok(nodes)
1463 }
1464
1465 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1466 let mut stmt = self
1467 .conn
1468 .prepare(
1469 "SELECT payload FROM ainl_graph_nodes
1470 WHERE node_type = ?1
1471 ORDER BY timestamp DESC",
1472 )
1473 .map_err(|e| e.to_string())?;
1474
1475 let rows = stmt
1476 .query_map([type_name], |row| {
1477 let payload: String = row.get(0)?;
1478 Ok(payload)
1479 })
1480 .map_err(|e| e.to_string())?;
1481
1482 let mut nodes = Vec::new();
1483 for row in rows {
1484 let payload = row.map_err(|e| e.to_string())?;
1485 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1486 nodes.push(node);
1487 }
1488
1489 Ok(nodes)
1490 }
1491
1492 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1493 let mut stmt = self
1494 .conn
1495 .prepare(
1496 "SELECT to_id FROM ainl_graph_edges
1497 WHERE from_id = ?1 AND label = ?2",
1498 )
1499 .map_err(|e| e.to_string())?;
1500
1501 let target_ids: Vec<String> = stmt
1502 .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1503 .map_err(|e| e.to_string())?
1504 .collect::<Result<Vec<_>, _>>()
1505 .map_err(|e| e.to_string())?;
1506
1507 let mut nodes = Vec::new();
1508 for target_id in target_ids {
1509 let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1510 if let Some(node) = self.read_node(id)? {
1511 nodes.push(node);
1512 }
1513 }
1514
1515 Ok(nodes)
1516 }
1517}