1use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31 AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32 SNAPSHOT_SCHEMA_VERSION,
33};
34use crate::trajectory_table::TrajectoryDetailRecord;
35use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
36use chrono::Utc;
37use rusqlite::OptionalExtension;
38use std::collections::HashSet;
39use uuid::Uuid;
40
41#[derive(Debug, Clone)]
43pub enum SnapshotImportError {
44 UnsupportedSchemaVersion { got: String, expected: &'static str },
45 Sqlite(String),
46}
47
48impl std::fmt::Display for SnapshotImportError {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::UnsupportedSchemaVersion { got, expected } => write!(
52 f,
53 "unsupported snapshot schema_version '{got}'; expected '{expected}'"
54 ),
55 Self::Sqlite(e) => write!(f, "{e}"),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub enum GraphValidationError {
63 Sqlite(String),
64}
65
66impl std::fmt::Display for GraphValidationError {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::Sqlite(e) => write!(f, "{e}"),
70 }
71 }
72}
73
74pub trait GraphStore {
76 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
78
79 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
81
82 fn query_episodes_since(
84 &self,
85 since_timestamp: i64,
86 limit: usize,
87 ) -> Result<Vec<AinlMemoryNode>, String>;
88
89 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
91
92 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
94
95 fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
97}
98
99pub struct SqliteGraphStore {
105 conn: rusqlite::Connection,
106}
107
108fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
109 conn.execute_batch("PRAGMA foreign_keys = ON;")
110}
111
112fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
113 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
114 let cols = stmt
115 .query_map([], |row| row.get::<_, String>(1))?
116 .collect::<Result<Vec<_>, _>>()?;
117 if !cols.iter().any(|c| c == "weight") {
118 conn.execute(
119 "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
120 [],
121 )?;
122 }
123 if !cols.iter().any(|c| c == "metadata") {
124 conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
125 }
126 Ok(())
127}
128
129fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
131 let exists: i64 = conn.query_row(
132 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
133 [],
134 |r| r.get(0),
135 )?;
136 if exists == 0 {
137 return Ok(false);
138 }
139 let n: i64 = conn.query_row(
140 "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
141 [],
142 |r| r.get(0),
143 )?;
144 Ok(n > 0)
145}
146
147fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
150 if edges_table_has_foreign_keys(conn)? {
151 return Ok(());
152 }
153
154 let exists: i64 = conn.query_row(
155 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
156 [],
157 |r| r.get(0),
158 )?;
159 if exists == 0 {
160 return Ok(());
161 }
162
163 conn.execute("BEGIN IMMEDIATE", [])?;
164 let res: Result<(), rusqlite::Error> = (|| {
165 conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
166 conn.execute(
167 "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
168 [],
169 )?;
170 conn.execute(
171 r#"CREATE TABLE ainl_graph_edges (
172 from_id TEXT NOT NULL,
173 to_id TEXT NOT NULL,
174 label TEXT NOT NULL,
175 weight REAL NOT NULL DEFAULT 1.0,
176 metadata TEXT,
177 PRIMARY KEY (from_id, to_id, label),
178 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
179 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
180 )"#,
181 [],
182 )?;
183 conn.execute(
184 r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
185 SELECT o.from_id, o.to_id, o.label,
186 COALESCE(o.weight, 1.0),
187 o.metadata
188 FROM ainl_graph_edges__old o
189 WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
190 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
191 [],
192 )?;
193 conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
194 conn.execute(
195 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
196 [],
197 )?;
198 Ok(())
199 })();
200
201 match res {
202 Ok(()) => {
203 conn.execute("COMMIT", [])?;
204 }
205 Err(e) => {
206 let _ = conn.execute("ROLLBACK", []);
207 return Err(e);
208 }
209 }
210 Ok(())
211}
212
213fn node_type_name(node: &AinlMemoryNode) -> &'static str {
214 match &node.node_type {
215 AinlNodeType::Episode { .. } => "episode",
216 AinlNodeType::Semantic { .. } => "semantic",
217 AinlNodeType::Procedural { .. } => "procedural",
218 AinlNodeType::Persona { .. } => "persona",
219 AinlNodeType::RuntimeState { .. } => "runtime_state",
220 AinlNodeType::Trajectory { .. } => "trajectory",
221 AinlNodeType::Failure { .. } => "failure",
222 AinlNodeType::Mission { .. } => "mission",
223 AinlNodeType::Feature { .. } => "feature",
224 AinlNodeType::Assertion { .. } => "assertion",
225 AinlNodeType::Handoff { .. } => "handoff",
226 AinlNodeType::Extension { .. } => "extension",
227 }
228}
229
230fn node_timestamp(node: &AinlMemoryNode) -> i64 {
231 match &node.node_type {
232 AinlNodeType::Episode { episodic } => episodic.timestamp,
233 AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
234 AinlNodeType::Trajectory { trajectory } => trajectory.recorded_at,
235 AinlNodeType::Failure { failure } => failure.recorded_at,
236 AinlNodeType::Mission { mission } => mission.created_at.timestamp(),
237 _ => chrono::Utc::now().timestamp(),
238 }
239}
240
241fn failure_fts_body(node: &AinlMemoryNode) -> Option<String> {
242 match &node.node_type {
243 AinlNodeType::Failure { failure } => Some(format!(
244 "{} {} {} {} {}",
245 failure.source,
246 failure.tool_name.as_deref().unwrap_or(""),
247 failure.source_namespace.as_deref().unwrap_or(""),
248 failure.source_tool.as_deref().unwrap_or(""),
249 failure.message
250 )),
251 _ => None,
252 }
253}
254
255fn mission_substrate_fts_body(node: &AinlMemoryNode) -> Option<String> {
257 match &node.node_type {
258 AinlNodeType::Mission { mission } => Some(format!(
259 "{} {} {:?} {:?}",
260 mission.mission_id.as_str(),
261 mission.objective_md,
262 mission.state,
263 mission.milestone_ids,
264 )),
265 AinlNodeType::Feature { feature } => Some(format!(
266 "{} {} {:?} {} {:?} {:?}",
267 feature.feature_id.as_str(),
268 feature.description,
269 feature.status,
270 feature.skill_name.as_deref().unwrap_or(""),
271 feature.touches_files,
272 feature.preconditions.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
273 )),
274 AinlNodeType::Handoff { handoff } => Some(format!(
275 "{} {} {} {} {} {:?}",
276 handoff.feature_id.as_str(),
277 handoff.agent_id,
278 handoff.salient_summary,
279 handoff.what_was_implemented_md,
280 handoff.what_was_left_undone_md,
281 handoff.discovered_issues
282 .iter()
283 .map(|i| i.description.as_str())
284 .collect::<Vec<_>>(),
285 )),
286 AinlNodeType::Assertion { assertion } => Some(format!(
287 "{} {} {:?} {:?}",
288 assertion.assertion_id.as_str(),
289 assertion.description,
290 assertion.state,
291 assertion.verification_steps,
292 )),
293 _ => None,
294 }
295}
296
297fn fts5_prefix_match_query(raw: &str) -> String {
299 raw.split_whitespace()
300 .filter(|t| !t.is_empty())
301 .filter_map(|t| {
302 let esc: String = t.chars().filter(|c| !c.is_control() && *c != '"').collect();
303 if esc.is_empty() {
304 return None;
305 }
306 Some(format!("\"{esc}*\""))
307 })
308 .collect::<Vec<_>>()
309 .join(" AND ")
310}
311
312fn sync_failure_fts_insert(
313 conn: &rusqlite::Connection,
314 node_id: &str,
315 body: &str,
316) -> Result<(), String> {
317 conn.execute(
318 "DELETE FROM ainl_failures_fts WHERE node_id = ?1",
319 [node_id],
320 )
321 .map_err(|e| e.to_string())?;
322 conn.execute(
323 "INSERT INTO ainl_failures_fts(node_id, body) VALUES (?1, ?2)",
324 rusqlite::params![node_id, body],
325 )
326 .map_err(|e| e.to_string())?;
327 Ok(())
328}
329
330fn graph_node_fts_body_from_payload_json(payload: &str) -> String {
332 if payload.chars().count() > 400_000 {
333 payload.chars().take(400_000).collect()
334 } else {
335 payload.to_string()
336 }
337}
338
339fn sync_all_nodes_fts_insert(
340 conn: &rusqlite::Connection,
341 node_id: &str,
342 agent_id: &str,
343 project_id: Option<&str>,
344 body: &str,
345) -> Result<(), String> {
346 let proj = project_id.map(str::trim).filter(|s| !s.is_empty());
347 conn.execute("DELETE FROM ainl_nodes_fts WHERE node_id = ?1", [node_id])
348 .map_err(|e| e.to_string())?;
349 conn.execute(
350 "INSERT INTO ainl_nodes_fts(node_id, agent_id, project_id, body) VALUES (?1, ?2, ?3, ?4)",
351 rusqlite::params![node_id, agent_id, proj, body],
352 )
353 .map_err(|e| e.to_string())?;
354 Ok(())
355}
356
357fn persist_edge(
358 conn: &rusqlite::Connection,
359 from_id: Uuid,
360 to_id: Uuid,
361 label: &str,
362 weight: f32,
363 metadata: Option<&str>,
364) -> Result<(), String> {
365 conn.execute(
366 "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
367 VALUES (?1, ?2, ?3, ?4, ?5)",
368 rusqlite::params![
369 from_id.to_string(),
370 to_id.to_string(),
371 label,
372 weight,
373 metadata
374 ],
375 )
376 .map_err(|e| e.to_string())?;
377 Ok(())
378}
379
380fn collect_snapshot_edges_for_id_set(
382 conn: &rusqlite::Connection,
383 id_set: &HashSet<String>,
384) -> Result<Vec<SnapshotEdge>, String> {
385 let mut edge_stmt = conn
386 .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
387 .map_err(|e| e.to_string())?;
388 let edge_rows = edge_stmt
389 .query_map([], |row| {
390 Ok((
391 row.get::<_, String>(0)?,
392 row.get::<_, String>(1)?,
393 row.get::<_, String>(2)?,
394 row.get::<_, f64>(3)?,
395 row.get::<_, Option<String>>(4)?,
396 ))
397 })
398 .map_err(|e| e.to_string())?
399 .collect::<Result<Vec<_>, _>>()
400 .map_err(|e| e.to_string())?;
401
402 let mut edges = Vec::new();
403 for (from_id, to_id, label, weight, meta) in edge_rows {
404 if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
405 continue;
406 }
407 let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
408 let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
409 let metadata = match meta {
410 Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
411 _ => None,
412 };
413 edges.push(SnapshotEdge {
414 source_id,
415 target_id,
416 edge_type: label,
417 weight: weight as f32,
418 metadata,
419 });
420 }
421 Ok(edges)
422}
423
424fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
425 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
426 let type_name = node_type_name(node);
427 let timestamp = node_timestamp(node);
428 let proj = node
429 .project_id
430 .as_deref()
431 .map(str::trim)
432 .filter(|s| !s.is_empty());
433 let mission_id = node.mission_id_key();
434 let feature_id = node.feature_id_key();
435
436 conn.execute(
437 "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id, mission_id, feature_id)
438 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
439 rusqlite::params![
440 node.id.to_string(),
441 type_name,
442 payload,
443 timestamp,
444 proj,
445 mission_id,
446 feature_id,
447 ],
448 )
449 .map_err(|e| e.to_string())?;
450
451 for edge in &node.edges {
452 persist_edge(
453 conn,
454 node.id,
455 edge.target_id,
456 &edge.label,
457 1.0,
458 None::<&str>,
459 )?;
460 }
461
462 let body_all = mission_substrate_fts_body(node)
463 .unwrap_or_else(|| graph_node_fts_body_from_payload_json(&payload));
464 if !node.agent_id.trim().is_empty() {
465 let _ = sync_all_nodes_fts_insert(
466 conn,
467 &node.id.to_string(),
468 node.agent_id.as_str(),
469 proj,
470 &body_all,
471 );
472 }
473
474 if let Some(body) = failure_fts_body(node) {
475 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
477 }
478
479 Ok(())
480}
481
482fn try_insert_node_ignore(
483 conn: &rusqlite::Connection,
484 node: &AinlMemoryNode,
485) -> Result<(), String> {
486 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
487 let type_name = node_type_name(node);
488 let timestamp = node_timestamp(node);
489 let proj = node
490 .project_id
491 .as_deref()
492 .map(str::trim)
493 .filter(|s| !s.is_empty());
494 let mission_id = node.mission_id_key();
495 let feature_id = node.feature_id_key();
496 let n = conn
497 .execute(
498 "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp, project_id, mission_id, feature_id)
499 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
500 rusqlite::params![
501 node.id.to_string(),
502 type_name,
503 payload,
504 timestamp,
505 proj,
506 mission_id,
507 feature_id,
508 ],
509 )
510 .map_err(|e| e.to_string())?;
511 if n > 0 {
512 if !node.agent_id.trim().is_empty() {
513 let body_all = mission_substrate_fts_body(node)
514 .unwrap_or_else(|| graph_node_fts_body_from_payload_json(&payload));
515 let _ = sync_all_nodes_fts_insert(
516 conn,
517 &node.id.to_string(),
518 node.agent_id.as_str(),
519 proj,
520 &body_all,
521 );
522 }
523 if let Some(body) = failure_fts_body(node) {
524 let _ = sync_failure_fts_insert(conn, &node.id.to_string(), &body);
525 }
526 }
527 Ok(())
528}
529
530fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
531 let meta = match &edge.metadata {
532 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
533 None => None,
534 };
535 conn.execute(
536 "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
537 VALUES (?1, ?2, ?3, ?4, ?5)",
538 rusqlite::params![
539 edge.source_id.to_string(),
540 edge.target_id.to_string(),
541 edge.edge_type,
542 edge.weight,
543 meta.as_deref(),
544 ],
545 )
546 .map_err(|e| e.to_string())?;
547 Ok(())
548}
549
550fn migrate_failures_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
551 conn.execute(
552 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_failures_fts USING fts5(
553 node_id UNINDEXED,
554 body,
555 tokenize = 'unicode61 remove_diacritics 1'
556 )",
557 [],
558 )?;
559 Ok(())
560}
561
562fn migrate_ainl_graph_nodes_add_mission_columns(
563 conn: &rusqlite::Connection,
564) -> Result<(), rusqlite::Error> {
565 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
566 let cols = stmt
567 .query_map([], |row| row.get::<_, String>(1))?
568 .collect::<Result<Vec<_>, _>>()?;
569 if !cols.iter().any(|c| c == "mission_id") {
570 conn.execute(
571 "ALTER TABLE ainl_graph_nodes ADD COLUMN mission_id TEXT",
572 [],
573 )?;
574 }
575 if !cols.iter().any(|c| c == "feature_id") {
576 conn.execute(
577 "ALTER TABLE ainl_graph_nodes ADD COLUMN feature_id TEXT",
578 [],
579 )?;
580 }
581 conn.execute(
582 "CREATE INDEX IF NOT EXISTS idx_nodes_mission_feature
583 ON ainl_graph_nodes(mission_id, feature_id, node_type, timestamp DESC)",
584 [],
585 )?;
586 Ok(())
587}
588
589fn migrate_ainl_graph_nodes_add_project_id(
590 conn: &rusqlite::Connection,
591) -> Result<(), rusqlite::Error> {
592 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_nodes)")?;
593 let cols = stmt
594 .query_map([], |row| row.get::<_, String>(1))?
595 .collect::<Result<Vec<_>, _>>()?;
596 if !cols.iter().any(|c| c == "project_id") {
597 conn.execute(
598 "ALTER TABLE ainl_graph_nodes ADD COLUMN project_id TEXT",
599 [],
600 )?;
601 }
602 Ok(())
603}
604
605fn migrate_ainl_nodes_fts_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
606 conn.execute(
607 "CREATE VIRTUAL TABLE IF NOT EXISTS ainl_nodes_fts USING fts5(
608 node_id UNINDEXED,
609 agent_id UNINDEXED,
610 project_id UNINDEXED,
611 body,
612 tokenize = 'unicode61 remove_diacritics 1'
613 )",
614 [],
615 )?;
616 Ok(())
617}
618
619fn install_ainl_graph_node_delete_fts_triggers(
621 conn: &rusqlite::Connection,
622) -> Result<(), rusqlite::Error> {
623 conn.execute_batch(
624 "DROP TRIGGER IF EXISTS ainl_graph_nodes_after_delete_fts;
625 CREATE TRIGGER ainl_graph_nodes_after_delete_fts
626 AFTER DELETE ON ainl_graph_nodes
627 FOR EACH ROW
628 BEGIN
629 DELETE FROM ainl_nodes_fts WHERE node_id = OLD.id;
630 DELETE FROM ainl_failures_fts WHERE node_id = OLD.id;
631 END;",
632 )?;
633 Ok(())
634}
635
636fn backfill_ainl_nodes_fts_if_empty(conn: &rusqlite::Connection) -> Result<(), String> {
638 let fts_n: i64 = conn
639 .query_row("SELECT COUNT(*) FROM ainl_nodes_fts", [], |r| r.get(0))
640 .unwrap_or(0);
641 if fts_n > 0 {
642 return Ok(());
643 }
644 let mut stmt = conn
645 .prepare(
646 "SELECT id, payload, project_id FROM ainl_graph_nodes
647 WHERE TRIM(COALESCE(json_extract(payload, '$.agent_id'), '')) != ''",
648 )
649 .map_err(|e| e.to_string())?;
650 let rows = stmt
651 .query_map([], |row| {
652 Ok((
653 row.get::<_, String>(0)?,
654 row.get::<_, String>(1)?,
655 row.get::<_, Option<String>>(2)?,
656 ))
657 })
658 .map_err(|e| e.to_string())?
659 .collect::<Result<Vec<_>, _>>()
660 .map_err(|e| e.to_string())?;
661 for (id, payload, col_proj) in rows {
662 let v: serde_json::Value = match serde_json::from_str(&payload) {
663 Ok(x) => x,
664 Err(_) => continue,
665 };
666 let ag = v
667 .get("agent_id")
668 .and_then(|x| x.as_str())
669 .map(str::trim)
670 .unwrap_or("");
671 if ag.is_empty() {
672 continue;
673 }
674 let json_proj = v
675 .get("project_id")
676 .and_then(|x| x.as_str())
677 .map(str::trim)
678 .filter(|s| !s.is_empty());
679 let proj = col_proj
680 .as_deref()
681 .map(str::trim)
682 .filter(|s| !s.is_empty())
683 .or(json_proj);
684 let body = graph_node_fts_body_from_payload_json(&payload);
685 if sync_all_nodes_fts_insert(conn, &id, ag, proj, &body).is_err() {
686 }
688 }
689 Ok(())
690}
691
692fn migrate_trajectories_v1(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
693 conn.execute(
694 "CREATE TABLE IF NOT EXISTS ainl_trajectories (
695 id TEXT PRIMARY KEY,
696 episode_id TEXT NOT NULL,
697 graph_trajectory_node_id TEXT,
698 agent_id TEXT NOT NULL,
699 session_id TEXT NOT NULL,
700 project_id TEXT,
701 recorded_at INTEGER NOT NULL,
702 outcome_json TEXT NOT NULL,
703 ainl_source_hash TEXT,
704 duration_ms INTEGER NOT NULL DEFAULT 0,
705 steps_json TEXT NOT NULL,
706 FOREIGN KEY (episode_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
707 )",
708 [],
709 )?;
710 conn.execute(
711 "CREATE INDEX IF NOT EXISTS idx_ainl_traj_agent_time
712 ON ainl_trajectories(agent_id, recorded_at DESC)",
713 [],
714 )?;
715 Ok(())
716}
717
718fn migrate_ainl_trajectories_add_depth_v1(
719 conn: &rusqlite::Connection,
720) -> Result<(), rusqlite::Error> {
721 let mut stmt = conn.prepare("PRAGMA table_info(ainl_trajectories)")?;
722 let cols: Vec<String> = stmt
723 .query_map([], |row| row.get::<_, String>(1))?
724 .collect::<Result<Vec<_>, _>>()?;
725 if !cols.iter().any(|c| c == "frame_vars_json") {
726 conn.execute(
727 "ALTER TABLE ainl_trajectories ADD COLUMN frame_vars_json TEXT",
728 [],
729 )?;
730 }
731 if !cols.iter().any(|c| c == "fitness_delta") {
732 conn.execute(
733 "ALTER TABLE ainl_trajectories ADD COLUMN fitness_delta REAL",
734 [],
735 )?;
736 }
737 Ok(())
738}
739
740impl SqliteGraphStore {
741 pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
743 conn.execute(
744 "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
745 id TEXT PRIMARY KEY,
746 node_type TEXT NOT NULL,
747 payload TEXT NOT NULL,
748 timestamp INTEGER NOT NULL
749 )",
750 [],
751 )?;
752
753 conn.execute(
754 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
755 ON ainl_graph_nodes(timestamp DESC)",
756 [],
757 )?;
758
759 conn.execute(
760 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
761 ON ainl_graph_nodes(node_type)",
762 [],
763 )?;
764
765 migrate_ainl_graph_nodes_add_project_id(conn)?;
766 migrate_ainl_graph_nodes_add_mission_columns(conn)?;
767 conn.execute(
768 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_project_type_time
769 ON ainl_graph_nodes(project_id, node_type, timestamp)",
770 [],
771 )?;
772
773 conn.execute(
774 "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
775 from_id TEXT NOT NULL,
776 to_id TEXT NOT NULL,
777 label TEXT NOT NULL,
778 weight REAL NOT NULL DEFAULT 1.0,
779 metadata TEXT,
780 PRIMARY KEY (from_id, to_id, label),
781 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
782 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
783 )",
784 [],
785 )?;
786
787 conn.execute(
788 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
789 ON ainl_graph_edges(from_id, label)",
790 [],
791 )?;
792
793 conn.execute(
794 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_to
795 ON ainl_graph_edges(to_id, label)",
796 [],
797 )?;
798
799 migrate_edge_columns(conn)?;
800 migrate_edges_add_foreign_keys(conn)?;
801 migrate_trajectories_v1(conn)?;
802 migrate_ainl_trajectories_add_depth_v1(conn)?;
803 migrate_failures_fts_v1(conn)?;
804 migrate_ainl_nodes_fts_v1(conn)?;
805 if backfill_ainl_nodes_fts_if_empty(conn).is_err() {
806 }
808 let _ = install_ainl_graph_node_delete_fts_triggers(conn);
809 Ok(())
810 }
811
812 pub fn open(path: &std::path::Path) -> Result<Self, String> {
814 let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
815 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
816 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
817 Ok(Self { conn })
818 }
819
820 pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
822 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
823 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
824 Ok(Self { conn })
825 }
826
827 pub(crate) fn conn(&self) -> &rusqlite::Connection {
829 &self.conn
830 }
831
832 pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
834 persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
835 }
836
837 pub fn insert_graph_edge_checked(
839 &self,
840 from_id: Uuid,
841 to_id: Uuid,
842 label: &str,
843 ) -> Result<(), String> {
844 if !self.node_row_exists(&from_id.to_string())? {
845 return Err(format!(
846 "insert_graph_edge_checked: missing source node row {}",
847 from_id
848 ));
849 }
850 if !self.node_row_exists(&to_id.to_string())? {
851 return Err(format!(
852 "insert_graph_edge_checked: missing target node row {}",
853 to_id
854 ));
855 }
856 self.insert_graph_edge(from_id, to_id, label)
857 }
858
859 pub fn insert_graph_edge_with_meta(
861 &self,
862 from_id: Uuid,
863 to_id: Uuid,
864 label: &str,
865 weight: f32,
866 metadata: Option<&serde_json::Value>,
867 ) -> Result<(), String> {
868 let meta = metadata
869 .map(serde_json::to_string)
870 .transpose()
871 .map_err(|e| e.to_string())?;
872 persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
873 }
874
875 pub fn query_nodes_by_type_since(
877 &self,
878 node_type: &str,
879 since_timestamp: i64,
880 limit: usize,
881 ) -> Result<Vec<AinlMemoryNode>, String> {
882 let mut stmt = self
883 .conn
884 .prepare(
885 "SELECT payload FROM ainl_graph_nodes
886 WHERE node_type = ?1 AND timestamp >= ?2
887 ORDER BY timestamp DESC
888 LIMIT ?3",
889 )
890 .map_err(|e| e.to_string())?;
891
892 let rows = stmt
893 .query_map(
894 rusqlite::params![node_type, since_timestamp, limit as i64],
895 |row| {
896 let payload: String = row.get(0)?;
897 Ok(payload)
898 },
899 )
900 .map_err(|e| e.to_string())?;
901
902 let mut nodes = Vec::new();
903 for row in rows {
904 let payload = row.map_err(|e| e.to_string())?;
905 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
906 nodes.push(node);
907 }
908
909 Ok(nodes)
910 }
911
912 pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
916 if agent_id.is_empty() {
917 return Ok(None);
918 }
919 let mut stmt = self
920 .conn
921 .prepare(
922 "SELECT payload FROM ainl_graph_nodes
923 WHERE node_type = 'runtime_state'
924 AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
925 ORDER BY timestamp DESC
926 LIMIT 1",
927 )
928 .map_err(|e| e.to_string())?;
929
930 let payload_opt: Option<String> = stmt
931 .query_row([agent_id], |row| row.get(0))
932 .optional()
933 .map_err(|e| e.to_string())?;
934
935 let Some(payload) = payload_opt else {
936 return Ok(None);
937 };
938
939 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
940 match node.node_type {
941 AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
942 _ => Err("runtime_state row had unexpected node_type payload".to_string()),
943 }
944 }
945
946 pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
948 let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
949 let node = AinlMemoryNode {
950 id,
951 memory_category: MemoryCategory::RuntimeState,
952 importance_score: 0.5,
953 agent_id: state.agent_id.clone(),
954 project_id: None,
955 node_type: AinlNodeType::RuntimeState {
956 runtime_state: state.clone(),
957 },
958 edges: Vec::new(),
959 plugin_data: None,
960 };
961 self.write_node(&node)
962 }
963
964 pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
966 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
967 for edge in &node.edges {
968 let exists: Option<i32> = tx
969 .query_row(
970 "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
971 [edge.target_id.to_string()],
972 |_| Ok(1),
973 )
974 .optional()
975 .map_err(|e| e.to_string())?;
976 if exists.is_none() {
977 return Err(format!(
978 "write_node_with_edges: missing target node {}",
979 edge.target_id
980 ));
981 }
982 }
983 persist_node(&tx, node)?;
984 tx.commit().map_err(|e| e.to_string())?;
985 Ok(())
986 }
987
988 pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
990 self.validate_graph_checked(agent_id)
991 .map_err(|e| e.to_string())
992 }
993
994 pub fn validate_graph_checked(
996 &self,
997 agent_id: &str,
998 ) -> Result<GraphValidationReport, GraphValidationError> {
999 use std::collections::HashSet;
1000
1001 let agent_nodes = self
1002 .agent_node_ids(agent_id)
1003 .map_err(GraphValidationError::Sqlite)?;
1004 let node_count = agent_nodes.len();
1005
1006 let mut stmt = self
1007 .conn
1008 .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
1009 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
1010 let all_edges: Vec<(String, String, String)> = stmt
1011 .query_map([], |row| {
1012 Ok((
1013 row.get::<_, String>(0)?,
1014 row.get::<_, String>(1)?,
1015 row.get::<_, String>(2)?,
1016 ))
1017 })
1018 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?
1019 .collect::<Result<Vec<_>, _>>()
1020 .map_err(|e| GraphValidationError::Sqlite(e.to_string()))?;
1021
1022 let mut edge_pairs = Vec::new();
1023 for (from_id, to_id, label) in all_edges {
1024 let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
1025 if touches_agent {
1026 edge_pairs.push((from_id, to_id, label));
1027 }
1028 }
1029
1030 let edge_count = edge_pairs.len();
1031 let mut dangling_edges = Vec::new();
1032 let mut dangling_edge_details = Vec::new();
1033 let mut cross_agent_boundary_edges = 0usize;
1034
1035 for (from_id, to_id, label) in &edge_pairs {
1036 let from_ok = self
1037 .node_row_exists(from_id)
1038 .map_err(GraphValidationError::Sqlite)?;
1039 let to_ok = self
1040 .node_row_exists(to_id)
1041 .map_err(GraphValidationError::Sqlite)?;
1042 if !from_ok || !to_ok {
1043 dangling_edges.push((from_id.clone(), to_id.clone()));
1044 dangling_edge_details.push(DanglingEdgeDetail {
1045 source_id: from_id.clone(),
1046 target_id: to_id.clone(),
1047 edge_type: label.clone(),
1048 });
1049 continue;
1050 }
1051 let fa = agent_nodes.contains(from_id);
1052 let ta = agent_nodes.contains(to_id);
1053 if fa ^ ta {
1054 cross_agent_boundary_edges += 1;
1055 }
1056 }
1057
1058 let mut touched: HashSet<String> =
1059 HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
1060 for (a, b, _) in &edge_pairs {
1061 if agent_nodes.contains(a) {
1062 touched.insert(a.clone());
1063 }
1064 if agent_nodes.contains(b) {
1065 touched.insert(b.clone());
1066 }
1067 }
1068
1069 let mut orphan_nodes = Vec::new();
1070 for id in &agent_nodes {
1071 if !touched.contains(id) {
1072 orphan_nodes.push(id.clone());
1073 }
1074 }
1075
1076 let is_valid = dangling_edges.is_empty();
1077 Ok(GraphValidationReport {
1078 agent_id: agent_id.to_string(),
1079 node_count,
1080 edge_count,
1081 dangling_edges,
1082 dangling_edge_details,
1083 cross_agent_boundary_edges,
1084 orphan_nodes,
1085 is_valid,
1086 })
1087 }
1088
1089 fn node_row_exists(&self, id: &str) -> Result<bool, String> {
1090 let v: Option<i32> = self
1091 .conn
1092 .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
1093 Ok(1)
1094 })
1095 .optional()
1096 .map_err(|e| e.to_string())?;
1097 Ok(v.is_some())
1098 }
1099
1100 fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
1101 let mut stmt = self
1102 .conn
1103 .prepare(
1104 "SELECT id FROM ainl_graph_nodes
1105 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1106 )
1107 .map_err(|e| e.to_string())?;
1108 let ids = stmt
1109 .query_map([agent_id], |row| row.get::<_, String>(0))
1110 .map_err(|e| e.to_string())?
1111 .collect::<Result<HashSet<_>, _>>()
1112 .map_err(|e| e.to_string())?;
1113 Ok(ids)
1114 }
1115
1116 pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
1118 let id_set = self.agent_node_ids(agent_id)?;
1119 collect_snapshot_edges_for_id_set(&self.conn, &id_set)
1120 }
1121
1122 pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
1124 let mut stmt = self
1125 .conn
1126 .prepare(
1127 "SELECT payload FROM ainl_graph_nodes
1128 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
1129 )
1130 .map_err(|e| e.to_string())?;
1131 let nodes: Vec<AinlMemoryNode> = stmt
1132 .query_map([agent_id], |row| {
1133 let payload: String = row.get(0)?;
1134 Ok(payload)
1135 })
1136 .map_err(|e| e.to_string())?
1137 .map(|r| {
1138 let payload = r.map_err(|e| e.to_string())?;
1139 serde_json::from_str(&payload).map_err(|e| e.to_string())
1140 })
1141 .collect::<Result<Vec<_>, _>>()?;
1142
1143 let id_set: std::collections::HashSet<String> =
1144 nodes.iter().map(|n| n.id.to_string()).collect();
1145
1146 let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
1147
1148 Ok(AgentGraphSnapshot {
1149 agent_id: agent_id.to_string(),
1150 exported_at: Utc::now(),
1151 schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
1152 nodes,
1153 edges,
1154 })
1155 }
1156
1157 pub fn import_graph(
1166 &mut self,
1167 snapshot: &AgentGraphSnapshot,
1168 allow_dangling_edges: bool,
1169 ) -> Result<(), String> {
1170 self.import_graph_checked(snapshot, allow_dangling_edges)
1171 .map_err(|e| e.to_string())
1172 }
1173
1174 pub fn import_graph_checked(
1176 &mut self,
1177 snapshot: &AgentGraphSnapshot,
1178 allow_dangling_edges: bool,
1179 ) -> Result<(), SnapshotImportError> {
1180 if snapshot.schema_version.as_ref() != SNAPSHOT_SCHEMA_VERSION {
1181 return Err(SnapshotImportError::UnsupportedSchemaVersion {
1182 got: snapshot.schema_version.to_string(),
1183 expected: SNAPSHOT_SCHEMA_VERSION,
1184 });
1185 }
1186
1187 if allow_dangling_edges {
1188 self.conn
1189 .execute_batch("PRAGMA foreign_keys = OFF;")
1190 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1191 }
1192
1193 let result: Result<(), SnapshotImportError> = (|| {
1194 let tx = self
1195 .conn
1196 .transaction()
1197 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1198 for node in &snapshot.nodes {
1199 try_insert_node_ignore(&tx, node).map_err(SnapshotImportError::Sqlite)?;
1200 }
1201 for edge in &snapshot.edges {
1202 try_insert_edge_ignore(&tx, edge).map_err(SnapshotImportError::Sqlite)?;
1203 }
1204 tx.commit()
1205 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1206 Ok(())
1207 })();
1208
1209 if allow_dangling_edges {
1210 self.conn
1211 .execute_batch("PRAGMA foreign_keys = ON;")
1212 .map_err(|e| SnapshotImportError::Sqlite(e.to_string()))?;
1213 }
1214
1215 result
1216 }
1217
1218 pub fn insert_trajectory_detail(&self, row: &TrajectoryDetailRecord) -> Result<(), String> {
1220 let steps_json = serde_json::to_string(&row.steps).map_err(|e| e.to_string())?;
1221 let outcome_json = serde_json::to_string(&row.outcome).map_err(|e| e.to_string())?;
1222 let frame_s = match &row.frame_vars {
1223 None => None,
1224 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
1225 };
1226 self.conn
1227 .execute(
1228 "INSERT OR REPLACE INTO ainl_trajectories (
1229 id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1230 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1231 frame_vars_json, fitness_delta
1232 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1233 rusqlite::params![
1234 row.id.to_string(),
1235 row.episode_id.to_string(),
1236 row.graph_trajectory_node_id.map(|u| u.to_string()),
1237 row.agent_id,
1238 row.session_id,
1239 row.project_id,
1240 row.recorded_at,
1241 outcome_json,
1242 row.ainl_source_hash,
1243 row.duration_ms as i64,
1244 steps_json,
1245 frame_s,
1246 row.fitness_delta,
1247 ],
1248 )
1249 .map_err(|e| e.to_string())?;
1250 Ok(())
1251 }
1252
1253 pub fn list_trajectories_for_agent(
1255 &self,
1256 agent_id: &str,
1257 limit: usize,
1258 since_timestamp: Option<i64>,
1259 ) -> Result<Vec<TrajectoryDetailRecord>, String> {
1260 let cap = limit.clamp(1, 500) as i64;
1261 let sql = if since_timestamp.is_some() {
1262 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1263 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1264 frame_vars_json, fitness_delta
1265 FROM ainl_trajectories
1266 WHERE agent_id = ?1 AND recorded_at >= ?2
1267 ORDER BY recorded_at DESC
1268 LIMIT ?3"
1269 } else {
1270 "SELECT id, episode_id, graph_trajectory_node_id, agent_id, session_id, project_id,
1271 recorded_at, outcome_json, ainl_source_hash, duration_ms, steps_json,
1272 frame_vars_json, fitness_delta
1273 FROM ainl_trajectories
1274 WHERE agent_id = ?1
1275 ORDER BY recorded_at DESC
1276 LIMIT ?2"
1277 };
1278
1279 let mut stmt = self.conn.prepare(sql).map_err(|e| e.to_string())?;
1280 let rows = if let Some(since) = since_timestamp {
1281 stmt.query_map(rusqlite::params![agent_id, since, cap], map_trajectory_row)
1282 } else {
1283 stmt.query_map(rusqlite::params![agent_id, cap], map_trajectory_row)
1284 }
1285 .map_err(|e| e.to_string())?;
1286
1287 let mut out = Vec::new();
1288 for row in rows {
1289 out.push(row.map_err(|e| e.to_string())?);
1290 }
1291 Ok(out)
1292 }
1293
1294 pub fn count_trajectory_details_before(
1296 &self,
1297 agent_id: &str,
1298 before_unix: i64,
1299 ) -> Result<usize, String> {
1300 if agent_id.trim().is_empty() {
1301 return Err("agent_id is empty".into());
1302 }
1303 let n: i64 = self
1304 .conn
1305 .query_row(
1306 "SELECT COUNT(*) FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1307 rusqlite::params![agent_id, before_unix],
1308 |r| r.get(0),
1309 )
1310 .map_err(|e| e.to_string())?;
1311 Ok(n as usize)
1312 }
1313
1314 pub fn delete_trajectory_details_before(
1320 &self,
1321 agent_id: &str,
1322 before_unix: i64,
1323 ) -> Result<usize, String> {
1324 if agent_id.trim().is_empty() {
1325 return Err("agent_id is empty".into());
1326 }
1327 let n = self
1328 .conn
1329 .execute(
1330 "DELETE FROM ainl_trajectories WHERE agent_id = ?1 AND recorded_at < ?2",
1331 rusqlite::params![agent_id, before_unix],
1332 )
1333 .map_err(|e| e.to_string())?;
1334 Ok(n)
1335 }
1336
1337 pub fn search_failures_fts_for_agent(
1341 &self,
1342 agent_id: &str,
1343 query: &str,
1344 limit: usize,
1345 ) -> Result<Vec<AinlMemoryNode>, String> {
1346 let fts_q = fts5_prefix_match_query(query);
1347 if fts_q.is_empty() || agent_id.trim().is_empty() {
1348 return Ok(Vec::new());
1349 }
1350 let cap = limit.clamp(1, 200) as i64;
1351 let mut stmt = self
1352 .conn
1353 .prepare(
1354 "SELECT n.payload
1355 FROM ainl_failures_fts AS f
1356 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1357 WHERE n.node_type = 'failure'
1358 AND json_extract(n.payload, '$.agent_id') = ?1
1359 AND f.body MATCH ?2
1360 ORDER BY n.timestamp DESC
1361 LIMIT ?3",
1362 )
1363 .map_err(|e| e.to_string())?;
1364
1365 let rows = stmt.query_map(rusqlite::params![agent_id, fts_q, cap], |row| {
1366 let payload: String = row.get(0)?;
1367 Ok(payload)
1368 });
1369
1370 let mut out = Vec::new();
1371 let rows = match rows {
1372 Ok(r) => r,
1373 Err(e) => {
1374 let msg = e.to_string();
1375 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1376 return Ok(Vec::new());
1377 }
1378 return Err(msg);
1379 }
1380 };
1381 for row in rows {
1382 match row {
1383 Ok(payload) => {
1384 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1385 out.push(node);
1386 }
1387 }
1388 Err(e) => {
1389 let msg = e.to_string();
1390 if msg.contains("fts5") || msg.to_ascii_lowercase().contains("syntax") {
1391 return Ok(Vec::new());
1392 }
1393 return Err(msg);
1394 }
1395 }
1396 }
1397 Ok(out)
1398 }
1399
1400 pub fn search_all_nodes_fts_for_agent(
1406 &self,
1407 agent_id: &str,
1408 query: &str,
1409 project_id: Option<&str>,
1410 limit: usize,
1411 ) -> Result<Vec<AinlMemoryNode>, String> {
1412 let fts_q = fts5_prefix_match_query(query);
1413 if fts_q.is_empty() || agent_id.trim().is_empty() {
1414 return Ok(Vec::new());
1415 }
1416 let cap = limit.clamp(1, 200) as i64;
1417 let project_filter = project_id.map(str::trim).filter(|s| !s.is_empty());
1418 let mut out = Vec::new();
1419 if let Some(p) = project_filter {
1420 let mut stmt = self
1421 .conn
1422 .prepare(
1423 "SELECT n.payload
1424 FROM ainl_nodes_fts AS f
1425 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1426 WHERE f.agent_id = ?1
1427 AND (COALESCE(f.project_id, '') = '' OR f.project_id = ?3)
1428 AND f.body MATCH ?2
1429 ORDER BY n.timestamp DESC
1430 LIMIT ?4",
1431 )
1432 .map_err(|e| e.to_string())?;
1433 let mut rows = stmt
1434 .query(rusqlite::params![agent_id, fts_q, p, cap])
1435 .map_err(|e| e.to_string())?;
1436 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1437 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1438 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1439 out.push(node);
1440 }
1441 }
1442 } else {
1443 let mut stmt = self
1444 .conn
1445 .prepare(
1446 "SELECT n.payload
1447 FROM ainl_nodes_fts AS f
1448 INNER JOIN ainl_graph_nodes AS n ON n.id = f.node_id
1449 WHERE f.agent_id = ?1
1450 AND f.body MATCH ?2
1451 ORDER BY n.timestamp DESC
1452 LIMIT ?3",
1453 )
1454 .map_err(|e| e.to_string())?;
1455 let mut rows = stmt
1456 .query(rusqlite::params![agent_id, fts_q, cap])
1457 .map_err(|e| e.to_string())?;
1458 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
1459 let payload: String = row.get(0).map_err(|e| e.to_string())?;
1460 if let Ok(node) = serde_json::from_str::<AinlMemoryNode>(&payload) {
1461 out.push(node);
1462 }
1463 }
1464 }
1465 Ok(out)
1466 }
1467}
1468
1469fn map_trajectory_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TrajectoryDetailRecord> {
1470 let id_s: String = row.get(0)?;
1471 let episode_s: String = row.get(1)?;
1472 let graph_traj: Option<String> = row.get(2)?;
1473 let agent_id: String = row.get(3)?;
1474 let session_id: String = row.get(4)?;
1475 let project_id: Option<String> = row.get(5)?;
1476 let recorded_at: i64 = row.get(6)?;
1477 let outcome_json: String = row.get(7)?;
1478 let hash: Option<String> = row.get(8)?;
1479 let duration_ms: i64 = row.get(9)?;
1480 let steps_json: String = row.get(10)?;
1481 let frame_vars_json: Option<String> = row.get(11)?;
1482 let fitness_sql: Option<f64> = row.get(12)?;
1483 let id = Uuid::parse_str(&id_s).map_err(|_| {
1484 rusqlite::Error::InvalidColumnType(0, "id".into(), rusqlite::types::Type::Text)
1485 })?;
1486 let episode_id = Uuid::parse_str(&episode_s).map_err(|_| {
1487 rusqlite::Error::InvalidColumnType(1, "episode_id".into(), rusqlite::types::Type::Text)
1488 })?;
1489 let graph_trajectory_node_id = graph_traj
1490 .filter(|s| !s.is_empty())
1491 .map(|s| Uuid::parse_str(&s))
1492 .transpose()
1493 .map_err(|_| {
1494 rusqlite::Error::InvalidColumnType(
1495 2,
1496 "graph_trajectory_node_id".into(),
1497 rusqlite::types::Type::Text,
1498 )
1499 })?;
1500 let outcome: TrajectoryOutcome = serde_json::from_str(&outcome_json).map_err(|_| {
1501 rusqlite::Error::InvalidColumnType(7, "outcome_json".into(), rusqlite::types::Type::Text)
1502 })?;
1503 let steps: Vec<TrajectoryStep> = serde_json::from_str(&steps_json).map_err(|_| {
1504 rusqlite::Error::InvalidColumnType(10, "steps_json".into(), rusqlite::types::Type::Text)
1505 })?;
1506 let frame_vars = frame_vars_json
1507 .filter(|s| !s.trim().is_empty())
1508 .and_then(|s| serde_json::from_str(&s).ok());
1509 let fitness_delta = fitness_sql.map(|f| f as f32);
1510 Ok(TrajectoryDetailRecord {
1511 id,
1512 episode_id,
1513 graph_trajectory_node_id,
1514 agent_id,
1515 session_id,
1516 project_id,
1517 recorded_at,
1518 outcome,
1519 ainl_source_hash: hash,
1520 duration_ms: duration_ms.max(0) as u64,
1521 steps,
1522 frame_vars,
1523 fitness_delta,
1524 })
1525}
1526
1527impl GraphStore for SqliteGraphStore {
1528 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
1531 persist_node(&self.conn, node)
1532 }
1533
1534 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
1535 let payload: Option<String> = self
1536 .conn
1537 .query_row(
1538 "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
1539 [id.to_string()],
1540 |row| row.get::<_, String>(0),
1541 )
1542 .optional()
1543 .map_err(|e: rusqlite::Error| e.to_string())?;
1544
1545 match payload {
1546 Some(p) => {
1547 let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
1548 Ok(Some(node))
1549 }
1550 None => Ok(None),
1551 }
1552 }
1553
1554 fn query_episodes_since(
1555 &self,
1556 since_timestamp: i64,
1557 limit: usize,
1558 ) -> Result<Vec<AinlMemoryNode>, String> {
1559 let mut stmt = self
1560 .conn
1561 .prepare(
1562 "SELECT payload FROM ainl_graph_nodes
1563 WHERE node_type = 'episode' AND timestamp >= ?1
1564 ORDER BY timestamp DESC
1565 LIMIT ?2",
1566 )
1567 .map_err(|e| e.to_string())?;
1568
1569 let rows = stmt
1570 .query_map([since_timestamp, limit as i64], |row| {
1571 let payload: String = row.get(0)?;
1572 Ok(payload)
1573 })
1574 .map_err(|e| e.to_string())?;
1575
1576 let mut nodes = Vec::new();
1577 for row in rows {
1578 let payload = row.map_err(|e| e.to_string())?;
1579 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1580 nodes.push(node);
1581 }
1582
1583 Ok(nodes)
1584 }
1585
1586 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
1587 let mut stmt = self
1588 .conn
1589 .prepare(
1590 "SELECT payload FROM ainl_graph_nodes
1591 WHERE node_type = ?1
1592 ORDER BY timestamp DESC",
1593 )
1594 .map_err(|e| e.to_string())?;
1595
1596 let rows = stmt
1597 .query_map([type_name], |row| {
1598 let payload: String = row.get(0)?;
1599 Ok(payload)
1600 })
1601 .map_err(|e| e.to_string())?;
1602
1603 let mut nodes = Vec::new();
1604 for row in rows {
1605 let payload = row.map_err(|e| e.to_string())?;
1606 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
1607 nodes.push(node);
1608 }
1609
1610 Ok(nodes)
1611 }
1612
1613 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1614 let mut stmt = self
1615 .conn
1616 .prepare(
1617 "SELECT to_id FROM ainl_graph_edges
1618 WHERE from_id = ?1 AND label = ?2",
1619 )
1620 .map_err(|e| e.to_string())?;
1621
1622 let target_ids: Vec<String> = stmt
1623 .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
1624 .map_err(|e| e.to_string())?
1625 .collect::<Result<Vec<_>, _>>()
1626 .map_err(|e| e.to_string())?;
1627
1628 let mut nodes = Vec::new();
1629 for target_id in target_ids {
1630 let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
1631 if let Some(node) = self.read_node(id)? {
1632 nodes.push(node);
1633 }
1634 }
1635
1636 Ok(nodes)
1637 }
1638
1639 fn walk_edges_to(&self, to_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
1640 let mut stmt = self
1641 .conn
1642 .prepare(
1643 "SELECT from_id FROM ainl_graph_edges
1644 WHERE to_id = ?1 AND label = ?2",
1645 )
1646 .map_err(|e| e.to_string())?;
1647
1648 let source_ids: Vec<String> = stmt
1649 .query_map([to_id.to_string(), label.to_string()], |row| row.get(0))
1650 .map_err(|e| e.to_string())?
1651 .collect::<Result<Vec<_>, _>>()
1652 .map_err(|e| e.to_string())?;
1653
1654 let mut nodes = Vec::new();
1655 for source_id in source_ids {
1656 let id = Uuid::parse_str(&source_id).map_err(|e| e.to_string())?;
1657 if let Some(node) = self.read_node(id)? {
1658 nodes.push(node);
1659 }
1660 }
1661
1662 Ok(nodes)
1663 }
1664}
1665
1666#[cfg(test)]
1667mod schema_tests {
1668 use super::SqliteGraphStore;
1669
1670 #[test]
1671 fn ensure_schema_idempotent() {
1672 let dir = tempfile::tempdir().expect("tempdir");
1673 let path = dir.path().join("ainl_schema_idempotent.db");
1674 drop(SqliteGraphStore::open(&path).expect("open 1"));
1675 drop(SqliteGraphStore::open(&path).expect("open 2"));
1676 }
1677}