1use crate::node::{AinlMemoryNode, AinlNodeType, MemoryCategory, RuntimeStateNode};
30use crate::snapshot::{
31 AgentGraphSnapshot, DanglingEdgeDetail, GraphValidationReport, SnapshotEdge, SNAPSHOT_SCHEMA_VERSION,
32};
33use chrono::Utc;
34use rusqlite::OptionalExtension;
35use std::collections::HashSet;
36use uuid::Uuid;
37
/// Storage interface for the agent memory graph.
///
/// Every method reports failure as a plain `String` message.
pub trait GraphStore {
    /// Stores `node`. The SQLite implementation in this file writes the node row
    /// plus one edge row for every entry in `node.edges`.
    fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String>;

    /// Fetches the node stored under `id`. The SQLite implementation returns
    /// `Ok(None)` when no row matches.
    fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String>;

    /// Returns episode nodes. The SQLite implementation selects rows of type
    /// `episode` whose timestamp is `>= since_timestamp`, newest first, capped
    /// at `limit` rows.
    fn query_episodes_since(
        &self,
        since_timestamp: i64,
        limit: usize,
    ) -> Result<Vec<AinlMemoryNode>, String>;

    /// Returns nodes whose stored type name equals `type_name`; the SQLite
    /// implementation orders them newest first.
    fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String>;

    /// Returns the nodes reached from `from_id` over edges carrying `label`.
    /// The SQLite implementation silently skips targets that have no node row.
    fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String>;
}
59
/// Graph store backed by a single owned SQLite connection.
pub struct SqliteGraphStore {
    // Private handle; crate-internal code borrows it through `conn()`.
    conn: rusqlite::Connection,
}
68
69fn enable_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
70 conn.execute_batch("PRAGMA foreign_keys = ON;")
71}
72
73fn migrate_edge_columns(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
74 let mut stmt = conn.prepare("PRAGMA table_info(ainl_graph_edges)")?;
75 let cols = stmt
76 .query_map([], |row| row.get::<_, String>(1))?
77 .collect::<Result<Vec<_>, _>>()?;
78 if !cols.iter().any(|c| c == "weight") {
79 conn.execute(
80 "ALTER TABLE ainl_graph_edges ADD COLUMN weight REAL NOT NULL DEFAULT 1.0",
81 [],
82 )?;
83 }
84 if !cols.iter().any(|c| c == "metadata") {
85 conn.execute("ALTER TABLE ainl_graph_edges ADD COLUMN metadata TEXT", [])?;
86 }
87 Ok(())
88}
89
90fn edges_table_has_foreign_keys(conn: &rusqlite::Connection) -> Result<bool, rusqlite::Error> {
92 let exists: i64 = conn.query_row(
93 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
94 [],
95 |r| r.get(0),
96 )?;
97 if exists == 0 {
98 return Ok(false);
99 }
100 let n: i64 = conn.query_row(
101 "SELECT COUNT(*) FROM pragma_foreign_key_list('ainl_graph_edges')",
102 [],
103 |r| r.get(0),
104 )?;
105 Ok(n > 0)
106}
107
108fn migrate_edges_add_foreign_keys(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
111 if edges_table_has_foreign_keys(conn)? {
112 return Ok(());
113 }
114
115 let exists: i64 = conn.query_row(
116 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='ainl_graph_edges'",
117 [],
118 |r| r.get(0),
119 )?;
120 if exists == 0 {
121 return Ok(());
122 }
123
124 conn.execute("BEGIN IMMEDIATE", [])?;
125 let res: Result<(), rusqlite::Error> = (|| {
126 conn.execute("DROP INDEX IF EXISTS idx_ainl_edges_from", [])?;
127 conn.execute(
128 "ALTER TABLE ainl_graph_edges RENAME TO ainl_graph_edges__old",
129 [],
130 )?;
131 conn.execute(
132 r#"CREATE TABLE ainl_graph_edges (
133 from_id TEXT NOT NULL,
134 to_id TEXT NOT NULL,
135 label TEXT NOT NULL,
136 weight REAL NOT NULL DEFAULT 1.0,
137 metadata TEXT,
138 PRIMARY KEY (from_id, to_id, label),
139 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
140 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
141 )"#,
142 [],
143 )?;
144 conn.execute(
145 r#"INSERT INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
146 SELECT o.from_id, o.to_id, o.label,
147 COALESCE(o.weight, 1.0),
148 o.metadata
149 FROM ainl_graph_edges__old o
150 WHERE EXISTS (SELECT 1 FROM ainl_graph_nodes n WHERE n.id = o.from_id)
151 AND EXISTS (SELECT 1 FROM ainl_graph_nodes n2 WHERE n2.id = o.to_id)"#,
152 [],
153 )?;
154 conn.execute("DROP TABLE ainl_graph_edges__old", [])?;
155 conn.execute(
156 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from ON ainl_graph_edges(from_id, label)",
157 [],
158 )?;
159 Ok(())
160 })();
161
162 match res {
163 Ok(()) => {
164 conn.execute("COMMIT", [])?;
165 }
166 Err(e) => {
167 let _ = conn.execute("ROLLBACK", []);
168 return Err(e);
169 }
170 }
171 Ok(())
172}
173
174fn node_type_name(node: &AinlMemoryNode) -> &'static str {
175 match &node.node_type {
176 AinlNodeType::Episode { .. } => "episode",
177 AinlNodeType::Semantic { .. } => "semantic",
178 AinlNodeType::Procedural { .. } => "procedural",
179 AinlNodeType::Persona { .. } => "persona",
180 AinlNodeType::RuntimeState { .. } => "runtime_state",
181 }
182}
183
184fn runtime_state_timestamp(rs: &RuntimeStateNode) -> i64 {
185 chrono::DateTime::parse_from_rfc3339(&rs.updated_at)
186 .map(|d| d.timestamp())
187 .unwrap_or_else(|_| Utc::now().timestamp())
188}
189
190fn node_timestamp(node: &AinlMemoryNode) -> i64 {
191 match &node.node_type {
192 AinlNodeType::Episode { episodic } => episodic.timestamp,
193 AinlNodeType::RuntimeState { runtime_state } => runtime_state_timestamp(runtime_state),
194 _ => chrono::Utc::now().timestamp(),
195 }
196}
197
198fn persist_edge(
199 conn: &rusqlite::Connection,
200 from_id: Uuid,
201 to_id: Uuid,
202 label: &str,
203 weight: f32,
204 metadata: Option<&str>,
205) -> Result<(), String> {
206 conn.execute(
207 "INSERT OR REPLACE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
208 VALUES (?1, ?2, ?3, ?4, ?5)",
209 rusqlite::params![from_id.to_string(), to_id.to_string(), label, weight, metadata],
210 )
211 .map_err(|e| e.to_string())?;
212 Ok(())
213}
214
215fn collect_snapshot_edges_for_id_set(
217 conn: &rusqlite::Connection,
218 id_set: &HashSet<String>,
219) -> Result<Vec<SnapshotEdge>, String> {
220 let mut edge_stmt = conn
221 .prepare("SELECT from_id, to_id, label, weight, metadata FROM ainl_graph_edges")
222 .map_err(|e| e.to_string())?;
223 let edge_rows = edge_stmt
224 .query_map([], |row| {
225 Ok((
226 row.get::<_, String>(0)?,
227 row.get::<_, String>(1)?,
228 row.get::<_, String>(2)?,
229 row.get::<_, f64>(3)?,
230 row.get::<_, Option<String>>(4)?,
231 ))
232 })
233 .map_err(|e| e.to_string())?
234 .collect::<Result<Vec<_>, _>>()
235 .map_err(|e| e.to_string())?;
236
237 let mut edges = Vec::new();
238 for (from_id, to_id, label, weight, meta) in edge_rows {
239 if !id_set.contains(&from_id) || !id_set.contains(&to_id) {
240 continue;
241 }
242 let source_id = Uuid::parse_str(&from_id).map_err(|e| e.to_string())?;
243 let target_id = Uuid::parse_str(&to_id).map_err(|e| e.to_string())?;
244 let metadata = match meta {
245 Some(s) if !s.is_empty() => Some(serde_json::from_str(&s).map_err(|e| e.to_string())?),
246 _ => None,
247 };
248 edges.push(SnapshotEdge {
249 source_id,
250 target_id,
251 edge_type: label,
252 weight: weight as f32,
253 metadata,
254 });
255 }
256 Ok(edges)
257}
258
259fn persist_node(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
260 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
261 let type_name = node_type_name(node);
262 let timestamp = node_timestamp(node);
263
264 conn.execute(
265 "INSERT OR REPLACE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
266 VALUES (?1, ?2, ?3, ?4)",
267 rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
268 )
269 .map_err(|e| e.to_string())?;
270
271 for edge in &node.edges {
272 persist_edge(conn, node.id, edge.target_id, &edge.label, 1.0, None::<&str>)?;
273 }
274
275 Ok(())
276}
277
278fn try_insert_node_ignore(conn: &rusqlite::Connection, node: &AinlMemoryNode) -> Result<(), String> {
279 let payload = serde_json::to_string(node).map_err(|e| e.to_string())?;
280 let type_name = node_type_name(node);
281 let timestamp = node_timestamp(node);
282 conn.execute(
283 "INSERT OR IGNORE INTO ainl_graph_nodes (id, node_type, payload, timestamp)
284 VALUES (?1, ?2, ?3, ?4)",
285 rusqlite::params![node.id.to_string(), type_name, payload, timestamp,],
286 )
287 .map_err(|e| e.to_string())?;
288 Ok(())
289}
290
291fn try_insert_edge_ignore(
292 conn: &rusqlite::Connection,
293 edge: &SnapshotEdge,
294) -> Result<(), String> {
295 let meta = match &edge.metadata {
296 Some(v) => Some(serde_json::to_string(v).map_err(|e| e.to_string())?),
297 None => None,
298 };
299 conn.execute(
300 "INSERT OR IGNORE INTO ainl_graph_edges (from_id, to_id, label, weight, metadata)
301 VALUES (?1, ?2, ?3, ?4, ?5)",
302 rusqlite::params![
303 edge.source_id.to_string(),
304 edge.target_id.to_string(),
305 edge.edge_type,
306 edge.weight,
307 meta.as_deref(),
308 ],
309 )
310 .map_err(|e| e.to_string())?;
311 Ok(())
312}
313
314impl SqliteGraphStore {
315 pub fn ensure_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
317 conn.execute(
318 "CREATE TABLE IF NOT EXISTS ainl_graph_nodes (
319 id TEXT PRIMARY KEY,
320 node_type TEXT NOT NULL,
321 payload TEXT NOT NULL,
322 timestamp INTEGER NOT NULL
323 )",
324 [],
325 )?;
326
327 conn.execute(
328 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_timestamp
329 ON ainl_graph_nodes(timestamp DESC)",
330 [],
331 )?;
332
333 conn.execute(
334 "CREATE INDEX IF NOT EXISTS idx_ainl_nodes_type
335 ON ainl_graph_nodes(node_type)",
336 [],
337 )?;
338
339 conn.execute(
340 "CREATE TABLE IF NOT EXISTS ainl_graph_edges (
341 from_id TEXT NOT NULL,
342 to_id TEXT NOT NULL,
343 label TEXT NOT NULL,
344 weight REAL NOT NULL DEFAULT 1.0,
345 metadata TEXT,
346 PRIMARY KEY (from_id, to_id, label),
347 FOREIGN KEY (from_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE,
348 FOREIGN KEY (to_id) REFERENCES ainl_graph_nodes(id) ON DELETE CASCADE
349 )",
350 [],
351 )?;
352
353 conn.execute(
354 "CREATE INDEX IF NOT EXISTS idx_ainl_edges_from
355 ON ainl_graph_edges(from_id, label)",
356 [],
357 )?;
358
359 migrate_edge_columns(conn)?;
360 migrate_edges_add_foreign_keys(conn)?;
361 Ok(())
362 }
363
364 pub fn open(path: &std::path::Path) -> Result<Self, String> {
366 let conn = rusqlite::Connection::open(path).map_err(|e| e.to_string())?;
367 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
368 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
369 Ok(Self { conn })
370 }
371
372 pub fn from_connection(conn: rusqlite::Connection) -> Result<Self, String> {
374 enable_foreign_keys(&conn).map_err(|e| e.to_string())?;
375 Self::ensure_schema(&conn).map_err(|e| e.to_string())?;
376 Ok(Self { conn })
377 }
378
379 pub(crate) fn conn(&self) -> &rusqlite::Connection {
381 &self.conn
382 }
383
384 pub fn insert_graph_edge(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
386 persist_edge(&self.conn, from_id, to_id, label, 1.0, None)
387 }
388
389 pub fn insert_graph_edge_checked(&self, from_id: Uuid, to_id: Uuid, label: &str) -> Result<(), String> {
391 if !self.node_row_exists(&from_id.to_string())? {
392 return Err(format!(
393 "insert_graph_edge_checked: missing source node row {}",
394 from_id
395 ));
396 }
397 if !self.node_row_exists(&to_id.to_string())? {
398 return Err(format!(
399 "insert_graph_edge_checked: missing target node row {}",
400 to_id
401 ));
402 }
403 self.insert_graph_edge(from_id, to_id, label)
404 }
405
406 pub fn insert_graph_edge_with_meta(
408 &self,
409 from_id: Uuid,
410 to_id: Uuid,
411 label: &str,
412 weight: f32,
413 metadata: Option<&serde_json::Value>,
414 ) -> Result<(), String> {
415 let meta = metadata.map(serde_json::to_string).transpose().map_err(|e| e.to_string())?;
416 persist_edge(&self.conn, from_id, to_id, label, weight, meta.as_deref())
417 }
418
419 pub fn query_nodes_by_type_since(
421 &self,
422 node_type: &str,
423 since_timestamp: i64,
424 limit: usize,
425 ) -> Result<Vec<AinlMemoryNode>, String> {
426 let mut stmt = self
427 .conn
428 .prepare(
429 "SELECT payload FROM ainl_graph_nodes
430 WHERE node_type = ?1 AND timestamp >= ?2
431 ORDER BY timestamp DESC
432 LIMIT ?3",
433 )
434 .map_err(|e| e.to_string())?;
435
436 let rows = stmt
437 .query_map(
438 rusqlite::params![node_type, since_timestamp, limit as i64],
439 |row| {
440 let payload: String = row.get(0)?;
441 Ok(payload)
442 },
443 )
444 .map_err(|e| e.to_string())?;
445
446 let mut nodes = Vec::new();
447 for row in rows {
448 let payload = row.map_err(|e| e.to_string())?;
449 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
450 nodes.push(node);
451 }
452
453 Ok(nodes)
454 }
455
456 pub fn load_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
460 if agent_id.is_empty() {
461 return Ok(None);
462 }
463 let mut stmt = self
464 .conn
465 .prepare(
466 "SELECT payload FROM ainl_graph_nodes
467 WHERE node_type = 'runtime_state'
468 AND json_extract(payload, '$.node_type.runtime_state.agent_id') = ?1
469 ORDER BY timestamp DESC
470 LIMIT 1",
471 )
472 .map_err(|e| e.to_string())?;
473
474 let payload_opt: Option<String> = stmt
475 .query_row([agent_id], |row| row.get(0))
476 .optional()
477 .map_err(|e| e.to_string())?;
478
479 let Some(payload) = payload_opt else {
480 return Ok(None);
481 };
482
483 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
484 match node.node_type {
485 AinlNodeType::RuntimeState { runtime_state } => Ok(Some(runtime_state)),
486 _ => Err("runtime_state row had unexpected node_type payload".to_string()),
487 }
488 }
489
490 pub fn save_runtime_state(&self, state: &RuntimeStateNode) -> Result<(), String> {
492 let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, state.agent_id.as_bytes());
493 let node = AinlMemoryNode {
494 id,
495 memory_category: MemoryCategory::RuntimeState,
496 importance_score: 0.5,
497 agent_id: state.agent_id.clone(),
498 node_type: AinlNodeType::RuntimeState {
499 runtime_state: state.clone(),
500 },
501 edges: Vec::new(),
502 };
503 self.write_node(&node)
504 }
505
506 pub fn write_node_with_edges(&mut self, node: &AinlMemoryNode) -> Result<(), String> {
508 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
509 for edge in &node.edges {
510 let exists: Option<i32> = tx
511 .query_row(
512 "SELECT 1 FROM ainl_graph_nodes WHERE id = ?1",
513 [edge.target_id.to_string()],
514 |_| Ok(1),
515 )
516 .optional()
517 .map_err(|e| e.to_string())?;
518 if exists.is_none() {
519 return Err(format!(
520 "write_node_with_edges: missing target node {}",
521 edge.target_id
522 ));
523 }
524 }
525 persist_node(&tx, node)?;
526 tx.commit().map_err(|e| e.to_string())?;
527 Ok(())
528 }
529
530 pub fn validate_graph(&self, agent_id: &str) -> Result<GraphValidationReport, String> {
532 use std::collections::HashSet;
533
534 let agent_nodes = self.agent_node_ids(agent_id)?;
535 let node_count = agent_nodes.len();
536
537 let mut stmt = self
538 .conn
539 .prepare("SELECT from_id, to_id, label FROM ainl_graph_edges")
540 .map_err(|e| e.to_string())?;
541 let all_edges: Vec<(String, String, String)> = stmt
542 .query_map([], |row| {
543 Ok((
544 row.get::<_, String>(0)?,
545 row.get::<_, String>(1)?,
546 row.get::<_, String>(2)?,
547 ))
548 })
549 .map_err(|e| e.to_string())?
550 .collect::<Result<Vec<_>, _>>()
551 .map_err(|e| e.to_string())?;
552
553 let mut edge_pairs = Vec::new();
554 for (from_id, to_id, label) in all_edges {
555 let touches_agent =
556 agent_nodes.contains(&from_id) || agent_nodes.contains(&to_id);
557 if touches_agent {
558 edge_pairs.push((from_id, to_id, label));
559 }
560 }
561
562 let edge_count = edge_pairs.len();
563 let mut dangling_edges = Vec::new();
564 let mut dangling_edge_details = Vec::new();
565 let mut cross_agent_boundary_edges = 0usize;
566
567 for (from_id, to_id, label) in &edge_pairs {
568 let from_ok = self.node_row_exists(from_id)?;
569 let to_ok = self.node_row_exists(to_id)?;
570 if !from_ok || !to_ok {
571 dangling_edges.push((from_id.clone(), to_id.clone()));
572 dangling_edge_details.push(DanglingEdgeDetail {
573 source_id: from_id.clone(),
574 target_id: to_id.clone(),
575 edge_type: label.clone(),
576 });
577 continue;
578 }
579 let fa = agent_nodes.contains(from_id);
580 let ta = agent_nodes.contains(to_id);
581 if fa ^ ta {
582 cross_agent_boundary_edges += 1;
583 }
584 }
585
586 let mut touched: HashSet<String> = HashSet::with_capacity(edge_pairs.len().saturating_mul(2));
587 for (a, b, _) in &edge_pairs {
588 if agent_nodes.contains(a) {
589 touched.insert(a.clone());
590 }
591 if agent_nodes.contains(b) {
592 touched.insert(b.clone());
593 }
594 }
595
596 let mut orphan_nodes = Vec::new();
597 for id in &agent_nodes {
598 if !touched.contains(id) {
599 orphan_nodes.push(id.clone());
600 }
601 }
602
603 let is_valid = dangling_edges.is_empty();
604 Ok(GraphValidationReport {
605 agent_id: agent_id.to_string(),
606 node_count,
607 edge_count,
608 dangling_edges,
609 dangling_edge_details,
610 cross_agent_boundary_edges,
611 orphan_nodes,
612 is_valid,
613 })
614 }
615
616 fn node_row_exists(&self, id: &str) -> Result<bool, String> {
617 let v: Option<i32> = self
618 .conn
619 .query_row("SELECT 1 FROM ainl_graph_nodes WHERE id = ?1", [id], |_| Ok(1))
620 .optional()
621 .map_err(|e| e.to_string())?;
622 Ok(v.is_some())
623 }
624
625 fn agent_node_ids(&self, agent_id: &str) -> Result<HashSet<String>, String> {
626 let mut stmt = self
627 .conn
628 .prepare(
629 "SELECT id FROM ainl_graph_nodes
630 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
631 )
632 .map_err(|e| e.to_string())?;
633 let ids = stmt
634 .query_map([agent_id], |row| row.get::<_, String>(0))
635 .map_err(|e| e.to_string())?
636 .collect::<Result<HashSet<_>, _>>()
637 .map_err(|e| e.to_string())?;
638 Ok(ids)
639 }
640
641 pub fn agent_subgraph_edges(&self, agent_id: &str) -> Result<Vec<SnapshotEdge>, String> {
643 let id_set = self.agent_node_ids(agent_id)?;
644 collect_snapshot_edges_for_id_set(&self.conn, &id_set)
645 }
646
647 pub fn export_graph(&self, agent_id: &str) -> Result<AgentGraphSnapshot, String> {
649 let mut stmt = self
650 .conn
651 .prepare(
652 "SELECT payload FROM ainl_graph_nodes
653 WHERE COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
654 )
655 .map_err(|e| e.to_string())?;
656 let nodes: Vec<AinlMemoryNode> = stmt
657 .query_map([agent_id], |row| {
658 let payload: String = row.get(0)?;
659 Ok(payload)
660 })
661 .map_err(|e| e.to_string())?
662 .map(|r| {
663 let payload = r.map_err(|e| e.to_string())?;
664 serde_json::from_str(&payload).map_err(|e| e.to_string())
665 })
666 .collect::<Result<Vec<_>, _>>()?;
667
668 let id_set: std::collections::HashSet<String> =
669 nodes.iter().map(|n| n.id.to_string()).collect();
670
671 let edges = collect_snapshot_edges_for_id_set(&self.conn, &id_set)?;
672
673 Ok(AgentGraphSnapshot {
674 agent_id: agent_id.to_string(),
675 exported_at: Utc::now(),
676 schema_version: std::borrow::Cow::Borrowed(SNAPSHOT_SCHEMA_VERSION),
677 nodes,
678 edges,
679 })
680 }
681
682 pub fn import_graph(
691 &mut self,
692 snapshot: &AgentGraphSnapshot,
693 allow_dangling_edges: bool,
694 ) -> Result<(), String> {
695 if allow_dangling_edges {
696 self.conn
697 .execute_batch("PRAGMA foreign_keys = OFF;")
698 .map_err(|e| e.to_string())?;
699 }
700
701 let result: Result<(), String> = (|| {
702 let tx = self.conn.transaction().map_err(|e| e.to_string())?;
703 for node in &snapshot.nodes {
704 try_insert_node_ignore(&tx, node)?;
705 }
706 for edge in &snapshot.edges {
707 try_insert_edge_ignore(&tx, edge)?;
708 }
709 tx.commit().map_err(|e| e.to_string())?;
710 Ok(())
711 })();
712
713 if allow_dangling_edges {
714 self.conn
715 .execute_batch("PRAGMA foreign_keys = ON;")
716 .map_err(|e| e.to_string())?;
717 }
718
719 result
720 }
721}
722
723impl GraphStore for SqliteGraphStore {
724 fn write_node(&self, node: &AinlMemoryNode) -> Result<(), String> {
727 persist_node(&self.conn, node)
728 }
729
730 fn read_node(&self, id: Uuid) -> Result<Option<AinlMemoryNode>, String> {
731 let payload: Option<String> = self
732 .conn
733 .query_row(
734 "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
735 [id.to_string()],
736 |row| row.get::<_, String>(0),
737 )
738 .optional()
739 .map_err(|e: rusqlite::Error| e.to_string())?;
740
741 match payload {
742 Some(p) => {
743 let node: AinlMemoryNode = serde_json::from_str(&p).map_err(|e| e.to_string())?;
744 Ok(Some(node))
745 }
746 None => Ok(None),
747 }
748 }
749
750 fn query_episodes_since(
751 &self,
752 since_timestamp: i64,
753 limit: usize,
754 ) -> Result<Vec<AinlMemoryNode>, String> {
755 let mut stmt = self
756 .conn
757 .prepare(
758 "SELECT payload FROM ainl_graph_nodes
759 WHERE node_type = 'episode' AND timestamp >= ?1
760 ORDER BY timestamp DESC
761 LIMIT ?2",
762 )
763 .map_err(|e| e.to_string())?;
764
765 let rows = stmt
766 .query_map([since_timestamp, limit as i64], |row| {
767 let payload: String = row.get(0)?;
768 Ok(payload)
769 })
770 .map_err(|e| e.to_string())?;
771
772 let mut nodes = Vec::new();
773 for row in rows {
774 let payload = row.map_err(|e| e.to_string())?;
775 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
776 nodes.push(node);
777 }
778
779 Ok(nodes)
780 }
781
782 fn find_by_type(&self, type_name: &str) -> Result<Vec<AinlMemoryNode>, String> {
783 let mut stmt = self
784 .conn
785 .prepare(
786 "SELECT payload FROM ainl_graph_nodes
787 WHERE node_type = ?1
788 ORDER BY timestamp DESC",
789 )
790 .map_err(|e| e.to_string())?;
791
792 let rows = stmt
793 .query_map([type_name], |row| {
794 let payload: String = row.get(0)?;
795 Ok(payload)
796 })
797 .map_err(|e| e.to_string())?;
798
799 let mut nodes = Vec::new();
800 for row in rows {
801 let payload = row.map_err(|e| e.to_string())?;
802 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
803 nodes.push(node);
804 }
805
806 Ok(nodes)
807 }
808
809 fn walk_edges(&self, from_id: Uuid, label: &str) -> Result<Vec<AinlMemoryNode>, String> {
810 let mut stmt = self
811 .conn
812 .prepare(
813 "SELECT to_id FROM ainl_graph_edges
814 WHERE from_id = ?1 AND label = ?2",
815 )
816 .map_err(|e| e.to_string())?;
817
818 let target_ids: Vec<String> = stmt
819 .query_map([from_id.to_string(), label.to_string()], |row| row.get(0))
820 .map_err(|e| e.to_string())?
821 .collect::<Result<Vec<_>, _>>()
822 .map_err(|e| e.to_string())?;
823
824 let mut nodes = Vec::new();
825 for target_id in target_ids {
826 let id = Uuid::parse_str(&target_id).map_err(|e| e.to_string())?;
827 if let Some(node) = self.read_node(id)? {
828 nodes.push(node);
829 }
830 }
831
832 Ok(nodes)
833 }
834}