1use crate::node::{
15 AinlMemoryNode, AinlNodeType, EpisodicNode, PersonaLayer, PersonaNode, ProceduralNode,
16 ProcedureType, RuntimeStateNode, SemanticNode, StrengthEvent,
17};
18use crate::snapshot::SnapshotEdge;
19use crate::store::{GraphStore, SqliteGraphStore};
20use chrono::{DateTime, Utc};
21use rusqlite::{params, OptionalExtension};
22use std::collections::{HashMap, HashSet, VecDeque};
23use uuid::Uuid;
24
25fn node_matches_agent(node: &AinlMemoryNode, agent_id: &str) -> bool {
26 node.agent_id.is_empty() || node.agent_id == agent_id
27}
28
29pub fn walk_from(
31 store: &dyn GraphStore,
32 start_id: Uuid,
33 edge_label: &str,
34 max_depth: usize,
35) -> Result<Vec<AinlMemoryNode>, String> {
36 let mut visited = std::collections::HashSet::new();
37 let mut result = Vec::new();
38 let mut current_level = vec![start_id];
39
40 for _ in 0..max_depth {
41 if current_level.is_empty() {
42 break;
43 }
44
45 let mut next_level = Vec::new();
46
47 for node_id in current_level {
48 if visited.contains(&node_id) {
49 continue;
50 }
51 visited.insert(node_id);
52
53 if let Some(node) = store.read_node(node_id)? {
54 result.push(node.clone());
55
56 for next_node in store.walk_edges(node_id, edge_label)? {
57 if !visited.contains(&next_node.id) {
58 next_level.push(next_node.id);
59 }
60 }
61 }
62 }
63
64 current_level = next_level;
65 }
66
67 Ok(result)
68}
69
70pub fn recall_recent(
72 store: &dyn GraphStore,
73 since_timestamp: i64,
74 limit: usize,
75 tool_filter: Option<&str>,
76) -> Result<Vec<AinlMemoryNode>, String> {
77 let episodes = store.query_episodes_since(since_timestamp, limit)?;
78
79 if let Some(tool_name) = tool_filter {
80 Ok(episodes
81 .into_iter()
82 .filter(|node| match &node.node_type {
83 AinlNodeType::Episode { episodic } => {
84 episodic.effective_tools().contains(&tool_name.to_string())
85 }
86 _ => false,
87 })
88 .collect())
89 } else {
90 Ok(episodes)
91 }
92}
93
94pub fn find_patterns(
96 store: &dyn GraphStore,
97 name_prefix: &str,
98) -> Result<Vec<AinlMemoryNode>, String> {
99 let all_procedural = store.find_by_type("procedural")?;
100
101 Ok(all_procedural
102 .into_iter()
103 .filter(|node| match &node.node_type {
104 AinlNodeType::Procedural { procedural } => {
105 procedural.pattern_name.starts_with(name_prefix)
106 }
107 _ => false,
108 })
109 .collect())
110}
111
112pub fn find_high_confidence_facts(
114 store: &dyn GraphStore,
115 min_confidence: f32,
116) -> Result<Vec<AinlMemoryNode>, String> {
117 let all_semantic = store.find_by_type("semantic")?;
118
119 Ok(all_semantic
120 .into_iter()
121 .filter(|node| match &node.node_type {
122 AinlNodeType::Semantic { semantic } => semantic.confidence >= min_confidence,
123 _ => false,
124 })
125 .collect())
126}
127
128pub fn find_strong_traits(store: &dyn GraphStore) -> Result<Vec<AinlMemoryNode>, String> {
130 let mut all_persona = store.find_by_type("persona")?;
131
132 all_persona.sort_by(|a, b| {
133 let strength_a = match &a.node_type {
134 AinlNodeType::Persona { persona } => persona.strength,
135 _ => 0.0,
136 };
137 let strength_b = match &b.node_type {
138 AinlNodeType::Persona { persona } => persona.strength,
139 _ => 0.0,
140 };
141 strength_b
142 .partial_cmp(&strength_a)
143 .unwrap_or(std::cmp::Ordering::Equal)
144 });
145
146 Ok(all_persona)
147}
148
149pub fn recall_by_topic_cluster(
152 store: &dyn GraphStore,
153 agent_id: &str,
154 cluster: &str,
155) -> Result<Vec<SemanticNode>, String> {
156 let mut out = Vec::new();
157 for node in store.find_by_type("semantic")? {
158 if !node_matches_agent(&node, agent_id) {
159 continue;
160 }
161 if let AinlNodeType::Semantic { semantic } = &node.node_type {
162 if semantic.topic_cluster.as_deref() == Some(cluster) {
163 out.push(semantic.clone());
164 }
165 }
166 }
167 Ok(out)
168}
169
170pub fn recall_contradictions(
171 store: &dyn GraphStore,
172 node_id: Uuid,
173) -> Result<Vec<SemanticNode>, String> {
174 let Some(node) = store.read_node(node_id)? else {
175 return Ok(Vec::new());
176 };
177 let contradiction_ids: Vec<String> = match &node.node_type {
178 AinlNodeType::Semantic { semantic } => semantic.contradiction_ids.clone(),
179 _ => return Ok(Vec::new()),
180 };
181 let mut out = Vec::new();
182 for cid in contradiction_ids {
183 if let Ok(uuid) = Uuid::parse_str(&cid) {
184 if let Some(n) = store.read_node(uuid)? {
185 if let AinlNodeType::Semantic { semantic } = &n.node_type {
186 out.push(semantic.clone());
187 }
188 }
189 }
190 }
191 Ok(out)
192}
193
194pub fn count_by_topic_cluster(
195 store: &dyn GraphStore,
196 agent_id: &str,
197) -> Result<HashMap<String, usize>, String> {
198 let mut counts: HashMap<String, usize> = HashMap::new();
199 for node in store.find_by_type("semantic")? {
200 if !node_matches_agent(&node, agent_id) {
201 continue;
202 }
203 if let AinlNodeType::Semantic { semantic } = &node.node_type {
204 if let Some(cluster) = semantic.topic_cluster.as_deref() {
205 if cluster.is_empty() {
206 continue;
207 }
208 *counts.entry(cluster.to_string()).or_insert(0) += 1;
209 }
210 }
211 }
212 Ok(counts)
213}
214
215pub fn recall_flagged_episodes(
218 store: &dyn GraphStore,
219 agent_id: &str,
220 limit: usize,
221) -> Result<Vec<EpisodicNode>, String> {
222 let mut out: Vec<(i64, EpisodicNode)> = Vec::new();
223 for node in store.find_by_type("episode")? {
224 if !node_matches_agent(&node, agent_id) {
225 continue;
226 }
227 if let AinlNodeType::Episode { episodic } = &node.node_type {
228 if episodic.flagged {
229 out.push((episodic.timestamp, episodic.clone()));
230 }
231 }
232 }
233 out.sort_by(|a, b| b.0.cmp(&a.0));
234 out.truncate(limit);
235 Ok(out.into_iter().map(|(_, e)| e).collect())
236}
237
238pub fn recall_episodes_by_conversation(
239 store: &dyn GraphStore,
240 conversation_id: &str,
241) -> Result<Vec<EpisodicNode>, String> {
242 let mut out: Vec<(u32, EpisodicNode)> = Vec::new();
243 for node in store.find_by_type("episode")? {
244 if let AinlNodeType::Episode { episodic } = &node.node_type {
245 if episodic.conversation_id == conversation_id {
246 out.push((episodic.turn_index, episodic.clone()));
247 }
248 }
249 }
250 out.sort_by(|a, b| a.0.cmp(&b.0));
251 Ok(out.into_iter().map(|(_, e)| e).collect())
252}
253
254pub fn recall_episodes_with_signal(
255 store: &dyn GraphStore,
256 agent_id: &str,
257 signal_type: &str,
258) -> Result<Vec<EpisodicNode>, String> {
259 let mut out = Vec::new();
260 for node in store.find_by_type("episode")? {
261 if !node_matches_agent(&node, agent_id) {
262 continue;
263 }
264 if let AinlNodeType::Episode { episodic } = &node.node_type {
265 if episodic
266 .persona_signals_emitted
267 .iter()
268 .any(|s| s == signal_type)
269 {
270 out.push(episodic.clone());
271 }
272 }
273 }
274 Ok(out)
275}
276
277pub fn recall_task_scoped_episodes(
281 store: &dyn GraphStore,
282 agent_id: &str,
283 conversation_id: Option<&str>,
284 topic_tags: &[String],
285 limit: usize,
286) -> Result<Vec<EpisodicNode>, String> {
287 let mut in_conversation: Vec<(i64, EpisodicNode)> = Vec::new();
288 let mut by_topic: Vec<(i64, EpisodicNode)> = Vec::new();
289 for node in store.find_by_type("episode")? {
290 if !node_matches_agent(&node, agent_id) {
291 continue;
292 }
293 let AinlNodeType::Episode { episodic } = &node.node_type else {
294 continue;
295 };
296 if let Some(cid) = conversation_id {
297 if !cid.is_empty() && episodic.conversation_id == cid {
298 in_conversation.push((episodic.timestamp, episodic.clone()));
299 continue;
300 }
301 }
302 if !topic_tags.is_empty()
303 && episodic
304 .tags
305 .iter()
306 .any(|t| topic_tags.iter().any(|q| q == t))
307 {
308 by_topic.push((episodic.timestamp, episodic.clone()));
309 }
310 }
311 in_conversation.sort_by(|a, b| b.0.cmp(&a.0));
312 by_topic.sort_by(|a, b| b.0.cmp(&a.0));
313 let mut out: Vec<EpisodicNode> = in_conversation.into_iter().map(|(_, e)| e).collect();
314 if out.len() < limit {
315 for (_, ep) in by_topic {
316 if out.len() >= limit {
317 break;
318 }
319 if !out.iter().any(|e| e.turn_id == ep.turn_id) {
320 out.push(ep);
321 }
322 }
323 }
324 out.truncate(limit);
325 Ok(out)
326}
327
328pub fn recall_by_procedure_type(
331 store: &dyn GraphStore,
332 agent_id: &str,
333 procedure_type: ProcedureType,
334) -> Result<Vec<ProceduralNode>, String> {
335 let mut out = Vec::new();
336 for node in store.find_by_type("procedural")? {
337 if !node_matches_agent(&node, agent_id) {
338 continue;
339 }
340 if let AinlNodeType::Procedural { procedural } = &node.node_type {
341 if procedural.procedure_type == procedure_type {
342 out.push(procedural.clone());
343 }
344 }
345 }
346 Ok(out)
347}
348
349pub fn recall_low_success_procedures(
350 store: &dyn GraphStore,
351 agent_id: &str,
352 threshold: f32,
353) -> Result<Vec<ProceduralNode>, String> {
354 let mut out = Vec::new();
355 for node in store.find_by_type("procedural")? {
356 if !node_matches_agent(&node, agent_id) {
357 continue;
358 }
359 if let AinlNodeType::Procedural { procedural } = &node.node_type {
360 let total = procedural
361 .success_count
362 .saturating_add(procedural.failure_count);
363 if total > 0 && procedural.success_rate < threshold {
364 out.push(procedural.clone());
365 }
366 }
367 }
368 Ok(out)
369}
370
371pub fn recall_strength_history(
374 store: &dyn GraphStore,
375 node_id: Uuid,
376) -> Result<Vec<StrengthEvent>, String> {
377 let Some(node) = store.read_node(node_id)? else {
378 return Ok(Vec::new());
379 };
380 let mut events = match &node.node_type {
381 AinlNodeType::Persona { persona } => persona.evolution_log.clone(),
382 _ => return Ok(Vec::new()),
383 };
384 events.sort_by_key(|e| e.timestamp);
385 Ok(events)
386}
387
388pub fn recall_delta_by_relevance(
389 store: &dyn GraphStore,
390 agent_id: &str,
391 min_relevance: f32,
392) -> Result<Vec<PersonaNode>, String> {
393 let mut out = Vec::new();
394 for node in store.find_by_type("persona")? {
395 if !node_matches_agent(&node, agent_id) {
396 continue;
397 }
398 if let AinlNodeType::Persona { persona } = &node.node_type {
399 if persona.layer == PersonaLayer::Delta && persona.relevance_score >= min_relevance {
400 out.push(persona.clone());
401 }
402 }
403 }
404 Ok(out)
405}
406
/// Agent-scoped query handle over a [`SqliteGraphStore`].
///
/// Created by `SqliteGraphStore::query`; it borrows the store for `'a` and owns
/// a copy of the agent id that its methods bind into their SQL.
pub struct GraphQuery<'a> {
    /// Store whose connection the queries run against.
    store: &'a SqliteGraphStore,
    /// Agent id the queries are restricted to.
    agent_id: String,
}
414
415impl SqliteGraphStore {
416 pub fn query<'a>(&'a self, agent_id: &str) -> GraphQuery<'a> {
417 GraphQuery {
418 store: self,
419 agent_id: agent_id.to_string(),
420 }
421 }
422}
423
424fn load_nodes_from_payload_rows(
425 rows: impl Iterator<Item = Result<String, rusqlite::Error>>,
426) -> Result<Vec<AinlMemoryNode>, String> {
427 let mut out = Vec::new();
428 for row in rows {
429 let payload = row.map_err(|e| e.to_string())?;
430 let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
431 out.push(node);
432 }
433 Ok(out)
434}
435
436impl<'a> GraphQuery<'a> {
437 fn conn(&self) -> &rusqlite::Connection {
438 self.store.conn()
439 }
440
441 pub fn episodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
442 let mut stmt = self
443 .conn()
444 .prepare(
445 "SELECT payload FROM ainl_graph_nodes
446 WHERE node_type = 'episode'
447 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
448 )
449 .map_err(|e| e.to_string())?;
450 let rows = stmt
451 .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
452 .map_err(|e| e.to_string())?;
453 load_nodes_from_payload_rows(rows)
454 }
455
456 pub fn semantic_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
457 let mut stmt = self
458 .conn()
459 .prepare(
460 "SELECT payload FROM ainl_graph_nodes
461 WHERE node_type = 'semantic'
462 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
463 )
464 .map_err(|e| e.to_string())?;
465 let rows = stmt
466 .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
467 .map_err(|e| e.to_string())?;
468 load_nodes_from_payload_rows(rows)
469 }
470
471 pub fn procedural_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
472 let mut stmt = self
473 .conn()
474 .prepare(
475 "SELECT payload FROM ainl_graph_nodes
476 WHERE node_type = 'procedural'
477 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
478 )
479 .map_err(|e| e.to_string())?;
480 let rows = stmt
481 .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
482 .map_err(|e| e.to_string())?;
483 load_nodes_from_payload_rows(rows)
484 }
485
486 pub fn persona_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
487 let mut stmt = self
488 .conn()
489 .prepare(
490 "SELECT payload FROM ainl_graph_nodes
491 WHERE node_type = 'persona'
492 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
493 )
494 .map_err(|e| e.to_string())?;
495 let rows = stmt
496 .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
497 .map_err(|e| e.to_string())?;
498 load_nodes_from_payload_rows(rows)
499 }
500
501 pub fn recent_episodes(&self, limit: usize) -> Result<Vec<AinlMemoryNode>, String> {
502 let mut stmt = self
503 .conn()
504 .prepare(
505 "SELECT payload FROM ainl_graph_nodes
506 WHERE node_type = 'episode'
507 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
508 ORDER BY timestamp DESC
509 LIMIT ?2",
510 )
511 .map_err(|e| e.to_string())?;
512 let rows = stmt
513 .query_map(params![&self.agent_id, limit as i64], |row| {
514 row.get::<_, String>(0)
515 })
516 .map_err(|e| e.to_string())?;
517 load_nodes_from_payload_rows(rows)
518 }
519
520 pub fn since(&self, ts: DateTime<Utc>, node_type: &str) -> Result<Vec<AinlMemoryNode>, String> {
521 let col = node_type.to_ascii_lowercase();
522 let since_ts = ts.timestamp();
523 let mut stmt = self
524 .conn()
525 .prepare(
526 "SELECT payload FROM ainl_graph_nodes
527 WHERE node_type = ?1
528 AND timestamp >= ?2
529 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?3
530 ORDER BY timestamp ASC",
531 )
532 .map_err(|e| e.to_string())?;
533 let rows = stmt
534 .query_map(params![&col, since_ts, &self.agent_id], |row| {
535 row.get::<_, String>(0)
536 })
537 .map_err(|e| e.to_string())?;
538 load_nodes_from_payload_rows(rows)
539 }
540
541 pub fn subgraph_edges(&self) -> Result<Vec<SnapshotEdge>, String> {
543 self.store.agent_subgraph_edges(&self.agent_id)
544 }
545
546 pub fn neighbors(&self, node_id: Uuid, edge_type: &str) -> Result<Vec<AinlMemoryNode>, String> {
547 let mut stmt = self
548 .conn()
549 .prepare(
550 "SELECT to_id FROM ainl_graph_edges
551 WHERE from_id = ?1 AND label = ?2",
552 )
553 .map_err(|e| e.to_string())?;
554 let ids: Vec<String> = stmt
555 .query_map(params![node_id.to_string(), edge_type], |row| row.get(0))
556 .map_err(|e| e.to_string())?
557 .collect::<Result<Vec<_>, _>>()
558 .map_err(|e| e.to_string())?;
559 let mut out = Vec::new();
560 for sid in ids {
561 let id = Uuid::parse_str(&sid).map_err(|e| e.to_string())?;
562 if let Some(n) = self.store.read_node(id)? {
563 out.push(n);
564 }
565 }
566 Ok(out)
567 }
568
569 pub fn lineage(&self, node_id: Uuid) -> Result<Vec<AinlMemoryNode>, String> {
570 let mut visited: HashSet<Uuid> = HashSet::new();
571 let mut out = Vec::new();
572 let mut queue: VecDeque<(Uuid, u32)> = VecDeque::new();
573 visited.insert(node_id);
574 queue.push_back((node_id, 0));
575
576 while let Some((nid, depth)) = queue.pop_front() {
577 if depth >= 20 {
578 continue;
579 }
580 let mut stmt = self
581 .conn()
582 .prepare(
583 "SELECT to_id FROM ainl_graph_edges
584 WHERE from_id = ?1 AND label IN ('DERIVED_FROM', 'CAUSED_PATCH')",
585 )
586 .map_err(|e| e.to_string())?;
587 let targets: Vec<String> = stmt
588 .query_map(params![nid.to_string()], |row| row.get(0))
589 .map_err(|e| e.to_string())?
590 .collect::<Result<Vec<_>, _>>()
591 .map_err(|e| e.to_string())?;
592 for sid in targets {
593 let tid = Uuid::parse_str(&sid).map_err(|e| e.to_string())?;
594 if visited.insert(tid) {
595 if let Some(n) = self.store.read_node(tid)? {
596 out.push(n.clone());
597 queue.push_back((tid, depth + 1));
598 }
599 }
600 }
601 }
602
603 Ok(out)
604 }
605
606 pub fn by_tag(&self, tag: &str) -> Result<Vec<AinlMemoryNode>, String> {
607 let mut stmt = self
608 .conn()
609 .prepare(
610 "SELECT DISTINCT n.payload FROM ainl_graph_nodes n
611 WHERE COALESCE(json_extract(n.payload, '$.agent_id'), '') = ?1
612 AND (
613 EXISTS (
614 SELECT 1 FROM json_each(n.payload, '$.node_type.persona_signals_emitted') j
615 WHERE j.value = ?2
616 )
617 OR EXISTS (
618 SELECT 1 FROM json_each(n.payload, '$.node_type.tags') j
619 WHERE j.value = ?2
620 )
621 )",
622 )
623 .map_err(|e| e.to_string())?;
624 let rows = stmt
625 .query_map(params![&self.agent_id, tag], |row| row.get::<_, String>(0))
626 .map_err(|e| e.to_string())?;
627 load_nodes_from_payload_rows(rows)
628 }
629
630 pub fn latest_semantic_with_tag(&self, tag: &str) -> Result<Option<AinlMemoryNode>, String> {
632 let mut stmt = self
633 .conn()
634 .prepare(
635 "SELECT payload FROM ainl_graph_nodes
636 WHERE node_type = 'semantic'
637 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
638 AND EXISTS (
639 SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tags')) j
640 WHERE j.value = ?2
641 )
642 ORDER BY timestamp DESC, id DESC
643 LIMIT 1",
644 )
645 .map_err(|e| e.to_string())?;
646 let row = stmt
647 .query_row(params![&self.agent_id, tag], |row| row.get::<_, String>(0))
648 .optional()
649 .map_err(|e| e.to_string())?;
650 match row {
651 Some(payload) => {
652 let node: AinlMemoryNode =
653 serde_json::from_str(&payload).map_err(|e| e.to_string())?;
654 Ok(Some(node))
655 }
656 None => Ok(None),
657 }
658 }
659
660 pub fn by_topic_cluster(&self, cluster: &str) -> Result<Vec<AinlMemoryNode>, String> {
661 let like = format!("%{cluster}%");
662 let mut stmt = self
663 .conn()
664 .prepare(
665 "SELECT payload FROM ainl_graph_nodes
666 WHERE node_type = 'semantic'
667 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
668 AND json_extract(payload, '$.node_type.topic_cluster') LIKE ?2 ESCAPE '\\'",
669 )
670 .map_err(|e| e.to_string())?;
671 let rows = stmt
672 .query_map(params![&self.agent_id, like], |row| row.get::<_, String>(0))
673 .map_err(|e| e.to_string())?;
674 load_nodes_from_payload_rows(rows)
675 }
676
677 pub fn pattern_by_name(&self, name: &str) -> Result<Option<AinlMemoryNode>, String> {
678 let mut stmt = self
679 .conn()
680 .prepare(
681 "SELECT payload FROM ainl_graph_nodes
682 WHERE node_type = 'procedural'
683 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
684 AND (
685 json_extract(payload, '$.node_type.pattern_name') = ?2
686 OR json_extract(payload, '$.node_type.label') = ?2
687 )
688 ORDER BY timestamp DESC
689 LIMIT 1",
690 )
691 .map_err(|e| e.to_string())?;
692 let row = stmt
693 .query_row(params![&self.agent_id, name], |row| row.get::<_, String>(0))
694 .optional()
695 .map_err(|e| e.to_string())?;
696 match row {
697 Some(payload) => {
698 let node: AinlMemoryNode =
699 serde_json::from_str(&payload).map_err(|e| e.to_string())?;
700 Ok(Some(node))
701 }
702 None => Ok(None),
703 }
704 }
705
706 pub fn active_patches(&self) -> Result<Vec<AinlMemoryNode>, String> {
707 let mut stmt = self
708 .conn()
709 .prepare(
710 "SELECT payload FROM ainl_graph_nodes
711 WHERE node_type = 'procedural'
712 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
713 AND (
714 json_extract(payload, '$.node_type.retired') IS NULL
715 OR json_extract(payload, '$.node_type.retired') = 0
716 OR CAST(json_extract(payload, '$.node_type.retired') AS TEXT) = 'false'
717 )",
718 )
719 .map_err(|e| e.to_string())?;
720 let rows = stmt
721 .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
722 .map_err(|e| e.to_string())?;
723 load_nodes_from_payload_rows(rows)
724 }
725
726 pub fn successful_episodes(&self, limit: usize) -> Result<Vec<AinlMemoryNode>, String> {
727 let mut stmt = self
728 .conn()
729 .prepare(
730 "SELECT payload FROM ainl_graph_nodes
731 WHERE node_type = 'episode'
732 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
733 AND json_extract(payload, '$.node_type.outcome') = 'success'
734 ORDER BY timestamp DESC
735 LIMIT ?2",
736 )
737 .map_err(|e| e.to_string())?;
738 let rows = stmt
739 .query_map(params![&self.agent_id, limit as i64], |row| {
740 row.get::<_, String>(0)
741 })
742 .map_err(|e| e.to_string())?;
743 load_nodes_from_payload_rows(rows)
744 }
745
746 pub fn episodes_with_tool(
747 &self,
748 tool_name: &str,
749 limit: usize,
750 ) -> Result<Vec<AinlMemoryNode>, String> {
751 let mut stmt = self
752 .conn()
753 .prepare(
754 "SELECT payload FROM ainl_graph_nodes
755 WHERE node_type = 'episode'
756 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
757 AND (
758 EXISTS (
759 SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tools_invoked')) e
760 WHERE e.value = ?2
761 )
762 OR EXISTS (
763 SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tool_calls')) e
764 WHERE e.value = ?2
765 )
766 )
767 ORDER BY timestamp DESC
768 LIMIT ?3",
769 )
770 .map_err(|e| e.to_string())?;
771 let rows = stmt
772 .query_map(params![&self.agent_id, tool_name, limit as i64], |row| {
773 row.get::<_, String>(0)
774 })
775 .map_err(|e| e.to_string())?;
776 load_nodes_from_payload_rows(rows)
777 }
778
779 pub fn evolved_persona(&self) -> Result<Option<AinlMemoryNode>, String> {
780 let mut stmt = self
781 .conn()
782 .prepare(
783 "SELECT payload FROM ainl_graph_nodes
784 WHERE node_type = 'persona'
785 AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
786 AND json_extract(payload, '$.node_type.trait_name') = 'axis_evolution_snapshot'
787 ORDER BY timestamp DESC
788 LIMIT 1",
789 )
790 .map_err(|e| e.to_string())?;
791 let row = stmt
792 .query_row(params![&self.agent_id], |row| row.get::<_, String>(0))
793 .optional()
794 .map_err(|e| e.to_string())?;
795 match row {
796 Some(payload) => {
797 let node: AinlMemoryNode =
798 serde_json::from_str(&payload).map_err(|e| e.to_string())?;
799 Ok(Some(node))
800 }
801 None => Ok(None),
802 }
803 }
804
805 pub fn read_runtime_state(&self) -> Result<Option<RuntimeStateNode>, String> {
807 self.store.read_runtime_state(&self.agent_id)
808 }
809}
810
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::AinlMemoryNode;
    use crate::store::SqliteGraphStore;

    /// Opens a store on a uniquely named database file in the OS temp
    /// directory, so tests cannot collide with each other or with leftovers
    /// from earlier runs.
    fn open_temp_store(tag: &str) -> SqliteGraphStore {
        let path = std::env::temp_dir().join(format!(
            "ainl_query_{tag}_{}.db",
            uuid::Uuid::new_v4()
        ));
        let _ = std::fs::remove_file(&path);
        SqliteGraphStore::open(&path).expect("Failed to open store")
    }

    /// `recall_recent` with a tool filter keeps only episodes using that tool.
    #[test]
    fn test_recall_recent_with_tool_filter() {
        let store = open_temp_store("test_recall");

        let now = chrono::Utc::now().timestamp();

        let node1 = AinlMemoryNode::new_episode(
            uuid::Uuid::new_v4(),
            now,
            vec!["file_read".to_string()],
            None,
            None,
        );

        let node2 = AinlMemoryNode::new_episode(
            uuid::Uuid::new_v4(),
            now + 1,
            vec!["agent_delegate".to_string()],
            Some("agent-B".to_string()),
            None,
        );

        store.write_node(&node1).expect("Write failed");
        store.write_node(&node2).expect("Write failed");

        let delegations =
            recall_recent(&store, now - 100, 10, Some("agent_delegate")).expect("Query failed");

        assert_eq!(delegations.len(), 1);
    }

    /// Only facts at or above the confidence threshold are returned.
    #[test]
    fn test_find_high_confidence_facts() {
        let store = open_temp_store("test_facts");

        let turn_id = uuid::Uuid::new_v4();

        let fact1 = AinlMemoryNode::new_fact("User prefers Rust".to_string(), 0.95, turn_id);
        let fact2 = AinlMemoryNode::new_fact("User dislikes Python".to_string(), 0.45, turn_id);

        store.write_node(&fact1).expect("Write failed");
        store.write_node(&fact2).expect("Write failed");

        let high_conf = find_high_confidence_facts(&store, 0.7).expect("Query failed");

        assert_eq!(high_conf.len(), 1);
    }

    /// A procedural node whose payload is marked `retired: true` is excluded
    /// from `active_patches`.
    #[test]
    fn test_query_active_patches() {
        let store = open_temp_store("active_patch");
        let ag = "agent-active-patch";
        let mut p1 = AinlMemoryNode::new_pattern("pat_one".into(), vec![1, 2]);
        p1.agent_id = ag.into();
        let mut p2 = AinlMemoryNode::new_pattern("pat_two".into(), vec![3, 4]);
        p2.agent_id = ag.into();
        store.write_node(&p1).expect("w1");
        store.write_node(&p2).expect("w2");

        // Patch the stored JSON directly to mark the second pattern retired.
        let conn = store.conn();
        let payload2: String = conn
            .query_row(
                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
                [p2.id.to_string()],
                |row| row.get(0),
            )
            .unwrap();
        let mut v: serde_json::Value = serde_json::from_str(&payload2).unwrap();
        v["node_type"]["retired"] = serde_json::json!(true);
        conn.execute(
            "UPDATE ainl_graph_nodes SET payload = ?1 WHERE id = ?2",
            rusqlite::params![v.to_string(), p2.id.to_string()],
        )
        .unwrap();

        let active = store.query(ag).active_patches().expect("q");
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, p1.id);
    }

    /// Conversation matches come first; topic matches from other conversations
    /// fill the remaining slots.
    #[test]
    fn test_recall_task_scoped_episodes_prefers_conversation_then_topic() {
        let store = open_temp_store("task_scope");
        let now = chrono::Utc::now().timestamp();
        let ag = "agent-task-scope";

        let mut ep_conv =
            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now, vec![], None, None);
        ep_conv.agent_id = ag.into();
        if let AinlNodeType::Episode { ref mut episodic } = ep_conv.node_type {
            episodic.conversation_id = "conv-1".into();
            episodic.tags = vec!["topic:tax".into()];
        }
        let mut ep_topic =
            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now - 10, vec![], None, None);
        ep_topic.agent_id = ag.into();
        if let AinlNodeType::Episode { ref mut episodic } = ep_topic.node_type {
            episodic.conversation_id = "conv-2".into();
            episodic.tags = vec!["topic:tax".into()];
        }
        let mut ep_other =
            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now - 20, vec![], None, None);
        ep_other.agent_id = ag.into();
        if let AinlNodeType::Episode { ref mut episodic } = ep_other.node_type {
            episodic.conversation_id = "conv-3".into();
            episodic.tags = vec!["topic:other".into()];
        }
        store.write_node(&ep_conv).expect("w1");
        store.write_node(&ep_topic).expect("w2");
        store.write_node(&ep_other).expect("w3");

        let rows =
            recall_task_scoped_episodes(&store, ag, Some("conv-1"), &["topic:tax".to_string()], 2)
                .expect("query");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].conversation_id, "conv-1");
        assert!(rows[1].tags.iter().any(|t| t == "topic:tax"));
    }
}
951}