1use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31 AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge,
32 SNAPSHOT_SCHEMA_VERSION,
33};
34use chrono::Utc;
35use rusqlite::OptionalExtension;
36use std::collections::HashSet;
37use uuid::Uuid;
38
39pub trait GraphStore {
41 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;
43
44 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;
46
47 fn query_episodes_since(
49 &self,
50 since_timestamp: i64,
51 limit: usize,
52 ) -> Result<Vec<AinlMemoryNode>, String>;
53
54 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;
56
57 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
59}
60
61pub struct SqliteGraphStore {
67 conn: rusqlite::Connection,
68}
69
70fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
71 conn.execute_batch("PRAGMA foreign_keys = ON;")
72}
73
74fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
75 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
76 let cols = stmt
77 .query_map([], |row| row.get::<_, String>(1))?
78 .collect::<Result<Vec<_>, _>>()?;
79 if !cols.iter().any(|c| c == "weight") {
80 conn.execute(
81 "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
82 [],
83 )?;
84 }
85 if !cols.iter().any(|c| c == "metadata") {
86 conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
87 }
88 Ok(())
89}
90
91fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
93 let exists: i64 = conn.query_row(
94 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
95 [],
96 |r| r.get(0),
97 )?;
98 if exists == 0 {
99 return Ok(false);
100 }
101 let n: i64 = conn.query_row(
102 "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
103 [],
104 |r| r.get(0),
105 )?;
106 Ok(n > 0)
107}
108
109fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
112 if edges_table_has_foreign_keys(conn)? {
113 return Ok(());
114 }
115
116 let exists: i64 = conn.query_row(
117 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
118 [],
119 |r| r.get(0),
120 )?;
121 if exists == 0 {
122 return Ok(());
123 }
124
125 conn.execute("BEGIN IMMEDIATE", [])?;
126 let res: Result<(), rusqlite::Error> = (|| {
127 conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
128 conn.execute(
129 "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
130 [],
131 )?;
132 conn.execute(
133 r#"CREATE TABLE ainl_graph_edges (
134 from_id TEXT NOT NULL,
135 to_id TEXT NOT NULL,
136 label TEXT NOT NULL,
137 weight REAL NOT NULL DEFAULT 1.0,
138 metadata TEXT,
139 PRIMARY KEY (from_id, to_id, label),
140 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
141 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
142 )"#,
143 [],
144 )?;
145 conn.execute(
146 r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
147 SELECT o.from_id, o.to_id, o.label,
148 COALESCE(o.weight, 1.0),
149 o.metadata
150 FROM ainl_graph_edges__old o
151 WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
152 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
153 [],
154 )?;
155 conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
156 conn.execute(
157 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
158 [],
159 )?;
160 Ok(())
161 })();
162
163 match res {
164 Ok(()) => {
165 conn.execute("COMMIT", [])?;
166 }
167 Err(e) => {
168 let _ = conn.execute("ROLLBACK", []);
169 return Err(e);
170 }
171 }
172 Ok(())
173}
174
175fn node_type_name(node: &AinlMemoryNode) -> &'static str {
176 match &node.node_type {
177 AinlNodeType::Episode { .. } => "episode",
178 AinlNodeType::Semantic { .. } => "semantic",
179 AinlNodeType::Procedural { .. } => "procedural",
180 AinlNodeType::Persona { .. } => "persona",
181 AinlNodeType::RuntimeState { .. } => "runtime_state",
182 }
183}
184
185fn node_timestamp(node: &AinlMemoryNode) -> i64 {
186 match &node.node_type {
187 AinlNodeType::Episode { episodic } => episodic.timestamp,
188 AinlNodeType::RuntimeState { runtime_state } => runtime_state.updated_at,
189 _ => chrono::Utc::now().timestamp(),
190 }
191}
192
193fn persist_edge(
194 conn: &rusqlite::Connection,
195 from_id: Uuid,
196 to_id: Uuid,
197 label: &str,
198 weight: f32,
199 metadata: Option<&str>,
200) -> Result<(), String> {
201 conn.execute(
202 "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
203 VALUES (?1, ?2, ?3, ?4, ?5)",
204 rusqlite::params![
205 from_id.to_string(),
206 to_id.to_string(),
207 label,
208 weight,
209 metadata
210 ],
211 )
212 .map_err(|e| e.to_string())?;
213 Ok(())
214}
215
216fn collect_snapshot_edges_for_id_set(
218 conn: &rusqlite::Connection,
219 id_set: &HashSet<String>,
220) -> Result<Vec<SnapshotEdge>, String> {
221 let mut edge_stmt = conn
222 .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
223 .map_err(|e| e.to_string())?;
224 let edge_rows = edge_stmt
225 .query_map([], |row| {
226 Ok((
227 row.get::<_, String>(0)?,
228 row.get::<_, String>(1)?,
229 row.get::<_, String>(2)?,
230 row.get::<_, f64>(3)?,
231 row.get::<_, Option<String>>(4)?,
232 ))
233 })
234 .map_err(|e| e.to_string())?
235 .collect::<Result<Vec<_>, _>>()
236 .map_err(|e| e.to_string())?;
237
238 let mut edges = Vec::new();
239 for (from_id, to_id, label, weight, meta) in edge_rows {
240 if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
241 continue;
242 }
243 let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
244 let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
245 let metadata = match meta {
246 Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
247 _ => None,
248 };
249 edges.push(SnapshotEdge {
250 source_id,
251 target_id,
252 edge_type: label,
253 weight: weight as f32,
254 metadata,
255 });
256 }
257 Ok(edges)
258}
259
260fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
261 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
262 let type_name = node_type_name(node);
263 let timestamp = node_timestamp(node);
264
265 conn.execute(
266 "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
267 VALUES (?1, ?2, ?3, ?4)",
268 rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
269 )
270 .map_err(|e| e.to_string())?;
271
272 for edge in &node.edges {
273 persist_edge(
274 conn,
275 node.id,
276 edge.target_id,
277 &edge.label,
278 1.0,
279 None::<&str>,
280 )?;
281 }
282
283 Ok(())
284}
285
286fn try_insert_node_ignore(
287 conn: &rusqlite::Connection,
288 node: &AinlMemoryNode,
289) -> Result<(), String> {
290 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
291 let type_name = node_type_name(node);
292 let timestamp = node_timestamp(node);
293 conn.execute(
294 "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
295 VALUES (?1, ?2, ?3, ?4)",
296 rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
297 )
298 .map_err(|e| e.to_string())?;
299 Ok(())
300}
301
302fn try_insert_edge_ignore(conn: &rusqlite::Connection, edge: &SnapshotEdge) -> Result<(), String> {
303 let meta = match &edge.metadata {
304 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
305 None => None,
306 };
307 conn.execute(
308 "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
309 VALUES (?1, ?2, ?3, ?4, ?5)",
310 rusqlite::params![
311 edge.source_id.to_string(),
312 edge.target_id.to_string(),
313 edge.edge_type,
314 edge.weight,
315 meta.as_deref(),
316 ],
317 )
318 .map_err(|e| e.to_string())?;
319 Ok(())
320}
321
322impl SqliteGraphStore {
323 pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
325 conn.execute(
326 "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
327 id TEXT PRIMARY KEY,
328 node_type TEXT NOT NULL,
329 payload TEXT NOT NULL,
330 timestamp INTEGER NOT NULL
331 )",
332 [],
333 )?;
334
335 conn.execute(
336 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
337 ON ainl_graph_nodes(timestamp DESC)",
338 [],
339 )?;
340
341 conn.execute(
342 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
343 ON ainl_graph_nodes(node_type)",
344 [],
345 )?;
346
347 conn.execute(
348 "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
349 from_id TEXT NOT NULL,
350 to_id TEXT NOT NULL,
351 label TEXT NOT NULL,
352 weight REAL NOT NULL DEFAULT 1.0,
353 metadata TEXT,
354 PRIMARY KEY (from_id, to_id, label),
355 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
356 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
357 )",
358 [],
359 )?;
360
361 conn.execute(
362 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
363 ON ainl_graph_edges(from_id, label)",
364 [],
365 )?;
366
367 migrate_edge_columns(conn)?;
368 migrate_edges_add_foreign_keys(conn)?;
369 Ok(())
370 }
371
372 pub fn open(path: &std::path::Path) -> Result<Self, String> {
374 let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
375 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
376 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
377 Ok(Self { conn })
378 }
379
380 pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
382 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
383 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
384 Ok(Self { conn })
385 }
386
387 pub(crate) fn conn(&self) -> &rusqlite::Connection {
389 &self.conn
390 }
391
392 pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
394 persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
395 }
396
397 pub fn insert_graph_edge_checked(
399 &self,
400 from_id: Uuid,
401 to_id: Uuid,
402 label: &str,
403 ) -> Result<(), String> {
404 if !self.node_row_exists(&from_id.to_string())? {
405 return Err(format!(
406 "insert_graph_edge_checked: missing source node row {}",
407 from_id
408 ));
409 }
410 if !self.node_row_exists(&to_id.to_string())? {
411 return Err(format!(
412 "insert_graph_edge_checked: missing target node row {}",
413 to_id
414 ));
415 }
416 self.insert_graph_edge(from_id, to_id, label)
417 }
418
419 pub fn insert_graph_edge_with_meta(
421 &self,
422 from_id: Uuid,
423 to_id: Uuid,
424 label: &str,
425 weight: f32,
426 metadata: Option<&serde_json::Value>,
427 ) -> Result<(), String> {
428 let meta = metadata
429 .map(serde_json::to_string)
430 .transpose()
431 .map_err(|e| e.to_string())?;
432 persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
433 }
434
435 pub fn query_nodes_by_type_since(
437 &self,
438 node_type: &str,
439 since_timestamp: i64,
440 limit: usize,
441 ) -> Result<Vec<AinlMemoryNode>, String> {
442 let mut stmt = self
443 .conn
444 .prepare(
445 "SELECT payload FROM ainl_graph_nodes
446 WHERE node_type = ?1 AND timestamp >= ?2
447 ORDER BY timestamp DESC
448 LIMIT ?3",
449 )
450 .map_err(|e| e.to_string())?;
451
452 let rows = stmt
453 .query_map(
454 rusqlite::params![node_type, since_timestamp, limit as i64],
455 |row| {
456 let payload: String = row.get(0)?;
457 Ok(payload)
458 },
459 )
460 .map_err(|e| e.to_string())?;
461
462 let mut nodes = Vec::new();
463 for row in rows {
464 let payload = row.map_err(|e| e.to_string())?;
465 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
466 nodes.push(node);
467 }
468
469 Ok(nodes)
470 }
471
472 pub fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
476 if agent_id.is_empty() {
477 return Ok(None);
478 }
479 let mut stmt = self
480 .conn
481 .prepare(
482 "SELECT payload FROM ainl_graph_nodes
483 WHERE node_type = 'runtime_state'
484 AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
485 ORDER BY timestamp DESC
486 LIMIT 1",
487 )
488 .map_err(|e| e.to_string())?;
489
490 let payload_opt: Option<String> = stmt
491 .query_row([agent_id], |row| row.get(0))
492 .optional()
493 .map_err(|e| e.to_string())?;
494
495 let Some(payload) = payload_opt else {
496 return Ok(None);
497 };
498
499 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
500 match node.node_type {
501 AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
502 _ => Err("runtime_state row had unexpected node_type payload".to_string()),
503 }
504 }
505
506 pub fn write_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
508 let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
509 let node = AinlMemoryNode {
510 id,
511 memory_category: MemoryCategory::RuntimeState,
512 importance_score: 0.5,
513 agent_id: state.agent_id.clone(),
514 node_type: AinlNodeType::RuntimeState {
515 runtime_state: state.clone(),
516 },
517 edges: Vec::new(),
518 };
519 self.write_node(&node)
520 }
521
522 pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
524 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
525 for edge in &node.edges {
526 let exists: Option<i32> = tx
527 .query_row(
528 "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
529 [edge.target_id.to_string()],
530 |_| Ok(1),
531 )
532 .optional()
533 .map_err(|e| e.to_string())?;
534 if exists.is_none() {
535 return Err(format!(
536 "write_node_with_edges: missing target node {}",
537 edge.target_id
538 ));
539 }
540 }
541 persist_node(&tx, node)?;
542 tx.commit().map_err(|e| e.to_string())?;
543 Ok(())
544 }
545
546 pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
548 use std::collections::HashSet;
549
550 let agent_nodes = self.agent_node_ids(agent_id)?;
551 let node_count = agent_nodes.len();
552
553 let mut stmt = self
554 .conn
555 .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
556 .map_err(|e| e.to_string())?;
557 let all_edges: Vec<(String, String, String)> = stmt
558 .query_map([], |row| {
559 Ok((
560 row.get::<_, String>(0)?,
561 row.get::<_, String>(1)?,
562 row.get::<_, String>(2)?,
563 ))
564 })
565 .map_err(|e| e.to_string())?
566 .collect::<Result<Vec<_>, _>>()
567 .map_err(|e| e.to_string())?;
568
569 let mut edge_pairs = Vec::new();
570 for (from_id, to_id, label) in all_edges {
571 let touches_agent = agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
572 if touches_agent {
573 edge_pairs.push((from_id, to_id, label));
574 }
575 }
576
577 let edge_count = edge_pairs.len();
578 let mut dangling_edges = Vec::new();
579 let mut dangling_edge_details = Vec::new();
580 let mut cross_agent_boundary_edges = 0usize;
581
582 for (from_id, to_id, label) in &edge_pairs {
583 let from_ok = self.node_row_exists(from_id)?;
584 let to_ok = self.node_row_exists(to_id)?;
585 if !from_ok || !to_ok {
586 dangling_edges.push((from_id.clone(), to_id.clone()));
587 dangling_edge_details.push(DanglingEdgeDetail {
588 source_id: from_id.clone(),
589 target_id: to_id.clone(),
590 edge_type: label.clone(),
591 });
592 continue;
593 }
594 let fa = agent_nodes.contains(from_id);
595 let ta = agent_nodes.contains(to_id);
596 if fa ^ ta {
597 cross_agent_boundary_edges += 1;
598 }
599 }
600
601 let mut touched: HashSet<String> =
602 HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
603 for (a, b, _) in &edge_pairs {
604 if agent_nodes.contains(a) {
605 touched.insert(a.clone());
606 }
607 if agent_nodes.contains(b) {
608 touched.insert(b.clone());
609 }
610 }
611
612 let mut orphan_nodes = Vec::new();
613 for id in &agent_nodes {
614 if !touched.contains(id) {
615 orphan_nodes.push(id.clone());
616 }
617 }
618
619 let is_valid = dangling_edges.is_empty();
620 Ok(GraphValidationReport {
621 agent_id: agent_id.to_string(),
622 node_count,
623 edge_count,
624 dangling_edges,
625 dangling_edge_details,
626 cross_agent_boundary_edges,
627 orphan_nodes,
628 is_valid,
629 })
630 }
631
632 fn node_row_exists(&self, id: &str) -> Result<bool, String> {
633 let v: Option<i32> = self
634 .conn
635 .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| {
636 Ok(1)
637 })
638 .optional()
639 .map_err(|e| e.to_string())?;
640 Ok(v.is_some())
641 }
642
643 fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
644 let mut stmt = self
645 .conn
646 .prepare(
647 "SELECT id FROM ainl_graph_nodes
648 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
649 )
650 .map_err(|e| e.to_string())?;
651 let ids = stmt
652 .query_map([agent_id], |row| row.get::<_, String>(0))
653 .map_err(|e| e.to_string())?
654 .collect::<Result<HashSet<_>, _>>()
655 .map_err(|e| e.to_string())?;
656 Ok(ids)
657 }
658
659 pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
661 let id_set = self.agent_node_ids(agent_id)?;
662 collect_snapshot_edges_for_id_set(&self.conn, &id_set)
663 }
664
665 pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
667 let mut stmt = self
668 .conn
669 .prepare(
670 "SELECT payload FROM ainl_graph_nodes
671 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
672 )
673 .map_err(|e| e.to_string())?;
674 let nodes: Vec<AinlMemoryNode> = stmt
675 .query_map([agent_id], |row| {
676 let payload: String = row.get(0)?;
677 Ok(payload)
678 })
679 .map_err(|e| e.to_string())?
680 .map(|r| {
681 let payload = r.map_err(|e| e.to_string())?;
682 serde_json::from_str(&payload).map_err(|e| e.to_string())
683 })
684 .collect::<Result<Vec<_>, _>>()?;
685
686 let id_set: std::collections::HashSet<String> =
687 nodes.iter().map(|n| n.id.to_string()).collect();
688
689 let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
690
691 Ok(AgentGraphSnapshot {
692 agent_id: agent_id.to_string(),
693 exported_at: Utc::now(),
694 schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
695 nodes,
696 edges,
697 })
698 }
699
700 pub fn import_graph(
709 &mut self,
710 snapshot: &AgentGraphSnapshot,
711 allow_dangling_edges: bool,
712 ) -> Result<(), String> {
713 if allow_dangling_edges {
714 self.conn
715 .execute_batch("PRAGMA foreign_keys = OFF;")
716 .map_err(|e| e.to_string())?;
717 }
718
719 let result: Result<(), String> = (|| {
720 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
721 for node in &snapshot.nodes {
722 try_insert_node_ignore(&tx, node)?;
723 }
724 for edge in &snapshot.edges {
725 try_insert_edge_ignore(&tx, edge)?;
726 }
727 tx.commit().map_err(|e| e.to_string())?;
728 Ok(())
729 })();
730
731 if allow_dangling_edges {
732 self.conn
733 .execute_batch("PRAGMA foreign_keys = ON;")
734 .map_err(|e| e.to_string())?;
735 }
736
737 result
738 }
739}
740
741impl GraphStore for SqliteGraphStore {
742 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
745 persist_node(&self.conn, node)
746 }
747
748 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
749 let payload: Option<String> = self
750 .conn
751 .query_row(
752 "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
753 [id.to_string()],
754 |row| row.get::<_, String>(0),
755 )
756 .optional()
757 .map_err(|e: rusqlite::Error| e.to_string())?;
758
759 match payload {
760 Some(p) => {
761 let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
762 Ok(Some(node))
763 }
764 None => Ok(None),
765 }
766 }
767
768 fn query_episodes_since(
769 &self,
770 since_timestamp: i64,
771 limit: usize,
772 ) -> Result<Vec<AinlMemoryNode>, String> {
773 let mut stmt = self
774 .conn
775 .prepare(
776 "SELECT payload FROM ainl_graph_nodes
777 WHERE node_type = 'episode' AND timestamp >= ?1
778 ORDER BY timestamp DESC
779 LIMIT ?2",
780 )
781 .map_err(|e| e.to_string())?;
782
783 let rows = stmt
784 .query_map([since_timestamp, limit as i64], |row| {
785 let payload: String = row.get(0)?;
786 Ok(payload)
787 })
788 .map_err(|e| e.to_string())?;
789
790 let mut nodes = Vec::new();
791 for row in rows {
792 let payload = row.map_err(|e| e.to_string())?;
793 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
794 nodes.push(node);
795 }
796
797 Ok(nodes)
798 }
799
800 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
801 let mut stmt = self
802 .conn
803 .prepare(
804 "SELECT payload FROM ainl_graph_nodes
805 WHERE node_type = ?1
806 ORDER BY timestamp DESC",
807 )
808 .map_err(|e| e.to_string())?;
809
810 let rows = stmt
811 .query_map([type_name], |row| {
812 let payload: String = row.get(0)?;
813 Ok(payload)
814 })
815 .map_err(|e| e.to_string())?;
816
817 let mut nodes = Vec::new();
818 for row in rows {
819 let payload = row.map_err(|e| e.to_string())?;
820 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
821 nodes.push(node);
822 }
823
824 Ok(nodes)
825 }
826
827 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
828 let mut stmt = self
829 .conn
830 .prepare(
831 "SELECT to_id FROM ainl_graph_edges
832 WHERE from_id = ?1 AND label = ?2",
833 )
834 .map_err(|e| e.to_string())?;
835
836 let target_ids: Vec<String> = stmt
837 .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
838 .map_err(|e| e.to_string())?
839 .collect::<Result<Vec<_>, _>>()
840 .map_err(|e| e.to_string())?;
841
842 let mut nodes = Vec::new();
843 for target_id in target_ids {
844 let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
845 if let Some(node) = self.read_node(id)? {
846 nodes.push(node);
847 }
848 }
849
850 Ok(nodes)
851 }
852}