Skip to main content

ainl_memory/
query.rs

1//! Graph traversal and querying utilities.
2//!
3//! Two layers:
4//!
5//! 1. **Free functions** (e.g. [`recall_recent`], [`walk_from`], [`find_patterns`]) — take [`GraphStore`]
6//!    plus parameters such as `agent_id` where filtering applies.
7//! 2. **[`GraphQuery`]** — returned by [`SqliteGraphStore::query`]; holds `agent_id` and offers SQL-backed
8//!    filters (`episodes`, `lineage`, `by_tag`, `subgraph_edges`, [`GraphQuery::read_runtime_state`], …).
9//!
10//! Serialized node JSON uses top-level `agent_id` and nested `node_type` fields (e.g. `$.node_type.outcome`);
11//! edge rows use SQLite columns `from_id`, `to_id`, `label` (see [`crate::snapshot::SnapshotEdge`] for the
12//! export/import naming `source_id` / `target_id` / `edge_type`).
13
14use crate::node::{
15    AinlMemoryNode, AinlNodeType, EpisodicNode, PersonaLayer, PersonaNode, ProceduralNode,
16    ProcedureType, RuntimeStateNode, SemanticNode, StrengthEvent,
17};
18use crate::snapshot::SnapshotEdge;
19use crate::store::{GraphStore, SqliteGraphStore};
20use chrono::{DateTime, Utc};
21use rusqlite::{params, OptionalExtension};
22use std::collections::{HashMap, HashSet, VecDeque};
23use uuid::Uuid;
24
25fn node_matches_agent(node: &AinlMemoryNode, agent_id: &str) -> bool {
26    node.agent_id.is_empty() || node.agent_id == agent_id
27}
28
29/// Walk the graph from a starting node, following edges with a specific label
30pub fn walk_from(
31    store: &dyn GraphStore,
32    start_id: Uuid,
33    edge_label: &str,
34    max_depth: usize,
35) -> Result<Vec<AinlMemoryNode>, String> {
36    let mut visited = std::collections::HashSet::new();
37    let mut result = Vec::new();
38    let mut current_level = vec![start_id];
39
40    for _ in 0..max_depth {
41        if current_level.is_empty() {
42            break;
43        }
44
45        let mut next_level = Vec::new();
46
47        for node_id in current_level {
48            if visited.contains(&node_id) {
49                continue;
50            }
51            visited.insert(node_id);
52
53            if let Some(node) = store.read_node(node_id)? {
54                result.push(node.clone());
55
56                for next_node in store.walk_edges(node_id, edge_label)? {
57                    if !visited.contains(&next_node.id) {
58                        next_level.push(next_node.id);
59                    }
60                }
61            }
62        }
63
64        current_level = next_level;
65    }
66
67    Ok(result)
68}
69
70/// Recall recent episodes, optionally filtered by tool usage
71pub fn recall_recent(
72    store: &dyn GraphStore,
73    since_timestamp: i64,
74    limit: usize,
75    tool_filter: Option<&str>,
76) -> Result<Vec<AinlMemoryNode>, String> {
77    let episodes = store.query_episodes_since(since_timestamp, limit)?;
78
79    if let Some(tool_name) = tool_filter {
80        Ok(episodes
81            .into_iter()
82            .filter(|node| match &node.node_type {
83                AinlNodeType::Episode { episodic } => {
84                    episodic.effective_tools().contains(&tool_name.to_string())
85                }
86                _ => false,
87            })
88            .collect())
89    } else {
90        Ok(episodes)
91    }
92}
93
94/// Find procedural patterns by name prefix
95pub fn find_patterns(
96    store: &dyn GraphStore,
97    name_prefix: &str,
98) -> Result<Vec<AinlMemoryNode>, String> {
99    let all_procedural = store.find_by_type("procedural")?;
100
101    Ok(all_procedural
102        .into_iter()
103        .filter(|node| match &node.node_type {
104            AinlNodeType::Procedural { procedural } => {
105                procedural.pattern_name.starts_with(name_prefix)
106            }
107            _ => false,
108        })
109        .collect())
110}
111
112/// Find semantic facts with confidence above a threshold
113pub fn find_high_confidence_facts(
114    store: &dyn GraphStore,
115    min_confidence: f32,
116) -> Result<Vec<AinlMemoryNode>, String> {
117    let all_semantic = store.find_by_type("semantic")?;
118
119    Ok(all_semantic
120        .into_iter()
121        .filter(|node| match &node.node_type {
122            AinlNodeType::Semantic { semantic } => semantic.confidence >= min_confidence,
123            _ => false,
124        })
125        .collect())
126}
127
128/// Find persona traits sorted by strength
129pub fn find_strong_traits(store: &dyn GraphStore) -> Result<Vec<AinlMemoryNode>, String> {
130    let mut all_persona = store.find_by_type("persona")?;
131
132    all_persona.sort_by(|a, b| {
133        let strength_a = match &a.node_type {
134            AinlNodeType::Persona { persona } => persona.strength,
135            _ => 0.0,
136        };
137        let strength_b = match &b.node_type {
138            AinlNodeType::Persona { persona } => persona.strength,
139            _ => 0.0,
140        };
141        strength_b
142            .partial_cmp(&strength_a)
143            .unwrap_or(std::cmp::Ordering::Equal)
144    });
145
146    Ok(all_persona)
147}
148
149// --- Semantic helpers ---
150
151pub fn recall_by_topic_cluster(
152    store: &dyn GraphStore,
153    agent_id: &str,
154    cluster: &str,
155) -> Result<Vec<SemanticNode>, String> {
156    let mut out = Vec::new();
157    for node in store.find_by_type("semantic")? {
158        if !node_matches_agent(&node, agent_id) {
159            continue;
160        }
161        if let AinlNodeType::Semantic { semantic } = &node.node_type {
162            if semantic.topic_cluster.as_deref() == Some(cluster) {
163                out.push(semantic.clone());
164            }
165        }
166    }
167    Ok(out)
168}
169
170pub fn recall_contradictions(
171    store: &dyn GraphStore,
172    node_id: Uuid,
173) -> Result<Vec<SemanticNode>, String> {
174    let Some(node) = store.read_node(node_id)? else {
175        return Ok(Vec::new());
176    };
177    let contradiction_ids: Vec<String> = match &node.node_type {
178        AinlNodeType::Semantic { semantic } => semantic.contradiction_ids.clone(),
179        _ => return Ok(Vec::new()),
180    };
181    let mut out = Vec::new();
182    for cid in contradiction_ids {
183        if let Ok(uuid) = Uuid::parse_str(&cid) {
184            if let Some(n) = store.read_node(uuid)? {
185                if let AinlNodeType::Semantic { semantic } = &n.node_type {
186                    out.push(semantic.clone());
187                }
188            }
189        }
190    }
191    Ok(out)
192}
193
194pub fn count_by_topic_cluster(
195    store: &dyn GraphStore,
196    agent_id: &str,
197) -> Result<HashMap<String, usize>, String> {
198    let mut counts: HashMap<String, usize> = HashMap::new();
199    for node in store.find_by_type("semantic")? {
200        if !node_matches_agent(&node, agent_id) {
201            continue;
202        }
203        if let AinlNodeType::Semantic { semantic } = &node.node_type {
204            if let Some(cluster) = semantic.topic_cluster.as_deref() {
205                if cluster.is_empty() {
206                    continue;
207                }
208                *counts.entry(cluster.to_string()).or_insert(0) += 1;
209            }
210        }
211    }
212    Ok(counts)
213}
214
215// --- Episodic helpers ---
216
217pub fn recall_flagged_episodes(
218    store: &dyn GraphStore,
219    agent_id: &str,
220    limit: usize,
221) -> Result<Vec<EpisodicNode>, String> {
222    let mut out: Vec<(i64, EpisodicNode)> = Vec::new();
223    for node in store.find_by_type("episode")? {
224        if !node_matches_agent(&node, agent_id) {
225            continue;
226        }
227        if let AinlNodeType::Episode { episodic } = &node.node_type {
228            if episodic.flagged {
229                out.push((episodic.timestamp, episodic.clone()));
230            }
231        }
232    }
233    out.sort_by(|a, b| b.0.cmp(&a.0));
234    out.truncate(limit);
235    Ok(out.into_iter().map(|(_, e)| e).collect())
236}
237
238pub fn recall_episodes_by_conversation(
239    store: &dyn GraphStore,
240    conversation_id: &str,
241) -> Result<Vec<EpisodicNode>, String> {
242    let mut out: Vec<(u32, EpisodicNode)> = Vec::new();
243    for node in store.find_by_type("episode")? {
244        if let AinlNodeType::Episode { episodic } = &node.node_type {
245            if episodic.conversation_id == conversation_id {
246                out.push((episodic.turn_index, episodic.clone()));
247            }
248        }
249    }
250    out.sort_by(|a, b| a.0.cmp(&b.0));
251    Ok(out.into_iter().map(|(_, e)| e).collect())
252}
253
254pub fn recall_episodes_with_signal(
255    store: &dyn GraphStore,
256    agent_id: &str,
257    signal_type: &str,
258) -> Result<Vec<EpisodicNode>, String> {
259    let mut out = Vec::new();
260    for node in store.find_by_type("episode")? {
261        if !node_matches_agent(&node, agent_id) {
262            continue;
263        }
264        if let AinlNodeType::Episode { episodic } = &node.node_type {
265            if episodic
266                .persona_signals_emitted
267                .iter()
268                .any(|s| s == signal_type)
269            {
270                out.push(episodic.clone());
271            }
272        }
273    }
274    Ok(out)
275}
276
277/// Task-scoped episodic recall:
278/// 1) same conversation id first,
279/// 2) then topic tag overlap fallback when sparse.
280pub fn recall_task_scoped_episodes(
281    store: &dyn GraphStore,
282    agent_id: &str,
283    conversation_id: Option<&str>,
284    topic_tags: &[String],
285    limit: usize,
286) -> Result<Vec<EpisodicNode>, String> {
287    let mut in_conversation: Vec<(i64, EpisodicNode)> = Vec::new();
288    let mut by_topic: Vec<(i64, EpisodicNode)> = Vec::new();
289    for node in store.find_by_type("episode")? {
290        if !node_matches_agent(&node, agent_id) {
291            continue;
292        }
293        let AinlNodeType::Episode { episodic } = &node.node_type else {
294            continue;
295        };
296        if let Some(cid) = conversation_id {
297            if !cid.is_empty() && episodic.conversation_id == cid {
298                in_conversation.push((episodic.timestamp, episodic.clone()));
299                continue;
300            }
301        }
302        if !topic_tags.is_empty()
303            && episodic
304                .tags
305                .iter()
306                .any(|t| topic_tags.iter().any(|q| q == t))
307        {
308            by_topic.push((episodic.timestamp, episodic.clone()));
309        }
310    }
311    in_conversation.sort_by(|a, b| b.0.cmp(&a.0));
312    by_topic.sort_by(|a, b| b.0.cmp(&a.0));
313    let mut out: Vec<EpisodicNode> = in_conversation.into_iter().map(|(_, e)| e).collect();
314    if out.len() < limit {
315        for (_, ep) in by_topic {
316            if out.len() >= limit {
317                break;
318            }
319            if !out.iter().any(|e| e.turn_id == ep.turn_id) {
320                out.push(ep);
321            }
322        }
323    }
324    out.truncate(limit);
325    Ok(out)
326}
327
328// --- Procedural helpers ---
329
330pub fn recall_by_procedure_type(
331    store: &dyn GraphStore,
332    agent_id: &str,
333    procedure_type: ProcedureType,
334) -> Result<Vec<ProceduralNode>, String> {
335    let mut out = Vec::new();
336    for node in store.find_by_type("procedural")? {
337        if !node_matches_agent(&node, agent_id) {
338            continue;
339        }
340        if let AinlNodeType::Procedural { procedural } = &node.node_type {
341            if procedural.procedure_type == procedure_type {
342                out.push(procedural.clone());
343            }
344        }
345    }
346    Ok(out)
347}
348
349pub fn recall_low_success_procedures(
350    store: &dyn GraphStore,
351    agent_id: &str,
352    threshold: f32,
353) -> Result<Vec<ProceduralNode>, String> {
354    let mut out = Vec::new();
355    for node in store.find_by_type("procedural")? {
356        if !node_matches_agent(&node, agent_id) {
357            continue;
358        }
359        if let AinlNodeType::Procedural { procedural } = &node.node_type {
360            let total = procedural
361                .success_count
362                .saturating_add(procedural.failure_count);
363            if total > 0 && procedural.success_rate < threshold {
364                out.push(procedural.clone());
365            }
366        }
367    }
368    Ok(out)
369}
370
371// --- Persona helpers ---
372
373pub fn recall_strength_history(
374    store: &dyn GraphStore,
375    node_id: Uuid,
376) -> Result<Vec<StrengthEvent>, String> {
377    let Some(node) = store.read_node(node_id)? else {
378        return Ok(Vec::new());
379    };
380    let mut events = match &node.node_type {
381        AinlNodeType::Persona { persona } => persona.evolution_log.clone(),
382        _ => return Ok(Vec::new()),
383    };
384    events.sort_by_key(|e| e.timestamp);
385    Ok(events)
386}
387
388pub fn recall_delta_by_relevance(
389    store: &dyn GraphStore,
390    agent_id: &str,
391    min_relevance: f32,
392) -> Result<Vec<PersonaNode>, String> {
393    let mut out = Vec::new();
394    for node in store.find_by_type("persona")? {
395        if !node_matches_agent(&node, agent_id) {
396            continue;
397        }
398        if let AinlNodeType::Persona { persona } = &node.node_type {
399            if persona.layer == PersonaLayer::Delta && persona.relevance_score >= min_relevance {
400                out.push(persona.clone());
401            }
402        }
403    }
404    Ok(out)
405}
406
407// --- GraphQuery (builder over SqliteGraphStore, v0.1.4+) ---
408
409/// Builder-style queries scoped to one `agent_id` (matches `json_extract(payload, '$.agent_id')`).
410pub struct GraphQuery<'a> {
411    store: &'a SqliteGraphStore,
412    agent_id: String,
413}
414
415impl SqliteGraphStore {
416    pub fn query<'a>(&'a self, agent_id: &str) -> GraphQuery<'a> {
417        GraphQuery {
418            store: self,
419            agent_id: agent_id.to_string(),
420        }
421    }
422}
423
424fn load_nodes_from_payload_rows(
425    rows: impl Iterator<Item = Result<String, rusqlite::Error>>,
426) -> Result<Vec<AinlMemoryNode>, String> {
427    let mut out = Vec::new();
428    for row in rows {
429        let payload = row.map_err(|e| e.to_string())?;
430        let node: AinlMemoryNode = serde_json::from_str(&payload).map_err(|e| e.to_string())?;
431        out.push(node);
432    }
433    Ok(out)
434}
435
436impl<'a> GraphQuery<'a> {
437    fn conn(&self) -> &rusqlite::Connection {
438        self.store.conn()
439    }
440
441    pub fn episodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
442        let mut stmt = self
443            .conn()
444            .prepare(
445                "SELECT payload FROM ainl_graph_nodes
446                 WHERE node_type = 'episode'
447                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
448            )
449            .map_err(|e| e.to_string())?;
450        let rows = stmt
451            .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
452            .map_err(|e| e.to_string())?;
453        load_nodes_from_payload_rows(rows)
454    }
455
456    pub fn semantic_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
457        let mut stmt = self
458            .conn()
459            .prepare(
460                "SELECT payload FROM ainl_graph_nodes
461                 WHERE node_type = 'semantic'
462                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
463            )
464            .map_err(|e| e.to_string())?;
465        let rows = stmt
466            .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
467            .map_err(|e| e.to_string())?;
468        load_nodes_from_payload_rows(rows)
469    }
470
471    pub fn procedural_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
472        let mut stmt = self
473            .conn()
474            .prepare(
475                "SELECT payload FROM ainl_graph_nodes
476                 WHERE node_type = 'procedural'
477                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
478            )
479            .map_err(|e| e.to_string())?;
480        let rows = stmt
481            .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
482            .map_err(|e| e.to_string())?;
483        load_nodes_from_payload_rows(rows)
484    }
485
486    pub fn persona_nodes(&self) -> Result<Vec<AinlMemoryNode>, String> {
487        let mut stmt = self
488            .conn()
489            .prepare(
490                "SELECT payload FROM ainl_graph_nodes
491                 WHERE node_type = 'persona'
492                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1",
493            )
494            .map_err(|e| e.to_string())?;
495        let rows = stmt
496            .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
497            .map_err(|e| e.to_string())?;
498        load_nodes_from_payload_rows(rows)
499    }
500
501    pub fn recent_episodes(&self, limit: usize) -> Result<Vec<AinlMemoryNode>, String> {
502        let mut stmt = self
503            .conn()
504            .prepare(
505                "SELECT payload FROM ainl_graph_nodes
506                 WHERE node_type = 'episode'
507                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
508                 ORDER BY timestamp DESC
509                 LIMIT ?2",
510            )
511            .map_err(|e| e.to_string())?;
512        let rows = stmt
513            .query_map(params![&self.agent_id, limit as i64], |row| {
514                row.get::<_, String>(0)
515            })
516            .map_err(|e| e.to_string())?;
517        load_nodes_from_payload_rows(rows)
518    }
519
520    pub fn since(&self, ts: DateTime<Utc>, node_type: &str) -> Result<Vec<AinlMemoryNode>, String> {
521        let col = node_type.to_ascii_lowercase();
522        let since_ts = ts.timestamp();
523        let mut stmt = self
524            .conn()
525            .prepare(
526                "SELECT payload FROM ainl_graph_nodes
527                 WHERE node_type = ?1
528                   AND timestamp >= ?2
529                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?3
530                 ORDER BY timestamp ASC",
531            )
532            .map_err(|e| e.to_string())?;
533        let rows = stmt
534            .query_map(params![&col, since_ts, &self.agent_id], |row| {
535                row.get::<_, String>(0)
536            })
537            .map_err(|e| e.to_string())?;
538        load_nodes_from_payload_rows(rows)
539    }
540
541    /// All directed edges whose **both** endpoints are nodes for this `agent_id` (same rule as [`SqliteGraphStore::export_graph`]).
542    pub fn subgraph_edges(&self) -> Result<Vec<SnapshotEdge>, String> {
543        self.store.agent_subgraph_edges(&self.agent_id)
544    }
545
546    pub fn neighbors(&self, node_id: Uuid, edge_type: &str) -> Result<Vec<AinlMemoryNode>, String> {
547        let mut stmt = self
548            .conn()
549            .prepare(
550                "SELECT to_id FROM ainl_graph_edges
551                 WHERE from_id = ?1 AND label = ?2",
552            )
553            .map_err(|e| e.to_string())?;
554        let ids: Vec<String> = stmt
555            .query_map(params![node_id.to_string(), edge_type], |row| row.get(0))
556            .map_err(|e| e.to_string())?
557            .collect::<Result<Vec<_>, _>>()
558            .map_err(|e| e.to_string())?;
559        let mut out = Vec::new();
560        for sid in ids {
561            let id = Uuid::parse_str(&sid).map_err(|e| e.to_string())?;
562            if let Some(n) = self.store.read_node(id)? {
563                out.push(n);
564            }
565        }
566        Ok(out)
567    }
568
569    pub fn lineage(&self, node_id: Uuid) -> Result<Vec<AinlMemoryNode>, String> {
570        let mut visited: HashSet<Uuid> = HashSet::new();
571        let mut out = Vec::new();
572        let mut queue: VecDeque<(Uuid, u32)> = VecDeque::new();
573        visited.insert(node_id);
574        queue.push_back((node_id, 0));
575
576        while let Some((nid, depth)) = queue.pop_front() {
577            if depth >= 20 {
578                continue;
579            }
580            let mut stmt = self
581                .conn()
582                .prepare(
583                    "SELECT to_id FROM ainl_graph_edges
584                     WHERE from_id = ?1 AND label IN ('DERIVED_FROM', 'CAUSED_PATCH')",
585                )
586                .map_err(|e| e.to_string())?;
587            let targets: Vec<String> = stmt
588                .query_map(params![nid.to_string()], |row| row.get(0))
589                .map_err(|e| e.to_string())?
590                .collect::<Result<Vec<_>, _>>()
591                .map_err(|e| e.to_string())?;
592            for sid in targets {
593                let tid = Uuid::parse_str(&sid).map_err(|e| e.to_string())?;
594                if visited.insert(tid) {
595                    if let Some(n) = self.store.read_node(tid)? {
596                        out.push(n.clone());
597                        queue.push_back((tid, depth + 1));
598                    }
599                }
600            }
601        }
602
603        Ok(out)
604    }
605
606    pub fn by_tag(&self, tag: &str) -> Result<Vec<AinlMemoryNode>, String> {
607        let mut stmt = self
608            .conn()
609            .prepare(
610                "SELECT DISTINCT n.payload FROM ainl_graph_nodes n
611                 WHERE COALESCE(json_extract(n.payload, '$.agent_id'), '') = ?1
612                   AND (
613                     EXISTS (
614                       SELECT 1 FROM json_each(n.payload, '$.node_type.persona_signals_emitted') j
615                       WHERE j.value = ?2
616                     )
617                     OR EXISTS (
618                       SELECT 1 FROM json_each(n.payload, '$.node_type.tags') j
619                       WHERE j.value = ?2
620                     )
621                   )",
622            )
623            .map_err(|e| e.to_string())?;
624        let rows = stmt
625            .query_map(params![&self.agent_id, tag], |row| row.get::<_, String>(0))
626            .map_err(|e| e.to_string())?;
627        load_nodes_from_payload_rows(rows)
628    }
629
630    /// Latest semantic fact whose `tags` include `tag` (see `SemanticNode::tags`), by node `timestamp` desc.
631    pub fn latest_semantic_with_tag(&self, tag: &str) -> Result<Option<AinlMemoryNode>, String> {
632        let mut stmt = self
633            .conn()
634            .prepare(
635                "SELECT payload FROM ainl_graph_nodes
636                 WHERE node_type = 'semantic'
637                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
638                   AND EXISTS (
639                     SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tags')) j
640                     WHERE j.value = ?2
641                   )
642                 ORDER BY timestamp DESC, id DESC
643                 LIMIT 1",
644            )
645            .map_err(|e| e.to_string())?;
646        let row = stmt
647            .query_row(params![&self.agent_id, tag], |row| row.get::<_, String>(0))
648            .optional()
649            .map_err(|e| e.to_string())?;
650        match row {
651            Some(payload) => {
652                let node: AinlMemoryNode =
653                    serde_json::from_str(&payload).map_err(|e| e.to_string())?;
654                Ok(Some(node))
655            }
656            None => Ok(None),
657        }
658    }
659
660    pub fn by_topic_cluster(&self, cluster: &str) -> Result<Vec<AinlMemoryNode>, String> {
661        let like = format!("%{cluster}%");
662        let mut stmt = self
663            .conn()
664            .prepare(
665                "SELECT payload FROM ainl_graph_nodes
666                 WHERE node_type = 'semantic'
667                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
668                   AND json_extract(payload, '$.node_type.topic_cluster') LIKE ?2 ESCAPE '\\'",
669            )
670            .map_err(|e| e.to_string())?;
671        let rows = stmt
672            .query_map(params![&self.agent_id, like], |row| row.get::<_, String>(0))
673            .map_err(|e| e.to_string())?;
674        load_nodes_from_payload_rows(rows)
675    }
676
677    pub fn pattern_by_name(&self, name: &str) -> Result<Option<AinlMemoryNode>, String> {
678        let mut stmt = self
679            .conn()
680            .prepare(
681                "SELECT payload FROM ainl_graph_nodes
682                 WHERE node_type = 'procedural'
683                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
684                   AND (
685                     json_extract(payload, '$.node_type.pattern_name') = ?2
686                     OR json_extract(payload, '$.node_type.label') = ?2
687                   )
688                 ORDER BY timestamp DESC
689                 LIMIT 1",
690            )
691            .map_err(|e| e.to_string())?;
692        let row = stmt
693            .query_row(params![&self.agent_id, name], |row| row.get::<_, String>(0))
694            .optional()
695            .map_err(|e| e.to_string())?;
696        match row {
697            Some(payload) => {
698                let node: AinlMemoryNode =
699                    serde_json::from_str(&payload).map_err(|e| e.to_string())?;
700                Ok(Some(node))
701            }
702            None => Ok(None),
703        }
704    }
705
706    pub fn active_patches(&self) -> Result<Vec<AinlMemoryNode>, String> {
707        let mut stmt = self
708            .conn()
709            .prepare(
710                "SELECT payload FROM ainl_graph_nodes
711                 WHERE node_type = 'procedural'
712                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
713                   AND (
714                     json_extract(payload, '$.node_type.retired') IS NULL
715                     OR json_extract(payload, '$.node_type.retired') = 0
716                     OR CAST(json_extract(payload, '$.node_type.retired') AS TEXT) = 'false'
717                   )",
718            )
719            .map_err(|e| e.to_string())?;
720        let rows = stmt
721            .query_map(params![&self.agent_id], |row| row.get::<_, String>(0))
722            .map_err(|e| e.to_string())?;
723        load_nodes_from_payload_rows(rows)
724    }
725
726    pub fn successful_episodes(&self, limit: usize) -> Result<Vec<AinlMemoryNode>, String> {
727        let mut stmt = self
728            .conn()
729            .prepare(
730                "SELECT payload FROM ainl_graph_nodes
731                 WHERE node_type = 'episode'
732                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
733                   AND json_extract(payload, '$.node_type.outcome') = 'success'
734                 ORDER BY timestamp DESC
735                 LIMIT ?2",
736            )
737            .map_err(|e| e.to_string())?;
738        let rows = stmt
739            .query_map(params![&self.agent_id, limit as i64], |row| {
740                row.get::<_, String>(0)
741            })
742            .map_err(|e| e.to_string())?;
743        load_nodes_from_payload_rows(rows)
744    }
745
746    pub fn episodes_with_tool(
747        &self,
748        tool_name: &str,
749        limit: usize,
750    ) -> Result<Vec<AinlMemoryNode>, String> {
751        let mut stmt = self
752            .conn()
753            .prepare(
754                "SELECT payload FROM ainl_graph_nodes
755                 WHERE node_type = 'episode'
756                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
757                   AND (
758                     EXISTS (
759                       SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tools_invoked')) e
760                       WHERE e.value = ?2
761                     )
762                     OR EXISTS (
763                       SELECT 1 FROM json_each(json_extract(payload, '$.node_type.tool_calls')) e
764                       WHERE e.value = ?2
765                     )
766                   )
767                 ORDER BY timestamp DESC
768                 LIMIT ?3",
769            )
770            .map_err(|e| e.to_string())?;
771        let rows = stmt
772            .query_map(params![&self.agent_id, tool_name, limit as i64], |row| {
773                row.get::<_, String>(0)
774            })
775            .map_err(|e| e.to_string())?;
776        load_nodes_from_payload_rows(rows)
777    }
778
779    pub fn evolved_persona(&self) -> Result<Option<AinlMemoryNode>, String> {
780        let mut stmt = self
781            .conn()
782            .prepare(
783                "SELECT payload FROM ainl_graph_nodes
784                 WHERE node_type = 'persona'
785                   AND COALESCE(json_extract(payload, '$.agent_id'), '') = ?1
786                   AND json_extract(payload, '$.node_type.trait_name') = 'axis_evolution_snapshot'
787                 ORDER BY timestamp DESC
788                 LIMIT 1",
789            )
790            .map_err(|e| e.to_string())?;
791        let row = stmt
792            .query_row(params![&self.agent_id], |row| row.get::<_, String>(0))
793            .optional()
794            .map_err(|e| e.to_string())?;
795        match row {
796            Some(payload) => {
797                let node: AinlMemoryNode =
798                    serde_json::from_str(&payload).map_err(|e| e.to_string())?;
799                Ok(Some(node))
800            }
801            None => Ok(None),
802        }
803    }
804
805    /// Latest persisted [`RuntimeStateNode`] for this query's `agent_id`.
806    pub fn read_runtime_state(&self) -> Result<Option<RuntimeStateNode>, String> {
807        self.store.read_runtime_state(&self.agent_id)
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use crate::node::AinlMemoryNode;
815    use crate::store::SqliteGraphStore;
816
817    #[test]
818    fn test_recall_recent_with_tool_filter() {
819        let temp_dir = std::env::temp_dir();
820        let db_path = temp_dir.join("ainl_query_test_recall.db");
821        let _ = std::fs::remove_file(&db_path);
822
823        let store = SqliteGraphStore::open(&db_path).expect("Failed to open store");
824
825        let now = chrono::Utc::now().timestamp();
826
827        let node1 = AinlMemoryNode::new_episode(
828            uuid::Uuid::new_v4(),
829            now,
830            vec!["file_read".to_string()],
831            None,
832            None,
833        );
834
835        let node2 = AinlMemoryNode::new_episode(
836            uuid::Uuid::new_v4(),
837            now + 1,
838            vec!["agent_delegate".to_string()],
839            Some("agent-B".to_string()),
840            None,
841        );
842
843        store.write_node(&node1).expect("Write failed");
844        store.write_node(&node2).expect("Write failed");
845
846        let delegations =
847            recall_recent(&store, now - 100, 10, Some("agent_delegate")).expect("Query failed");
848
849        assert_eq!(delegations.len(), 1);
850    }
851
852    #[test]
853    fn test_find_high_confidence_facts() {
854        let temp_dir = std::env::temp_dir();
855        let db_path = temp_dir.join("ainl_query_test_facts.db");
856        let _ = std::fs::remove_file(&db_path);
857
858        let store = SqliteGraphStore::open(&db_path).expect("Failed to open store");
859
860        let turn_id = uuid::Uuid::new_v4();
861
862        let fact1 = AinlMemoryNode::new_fact("User prefers Rust".to_string(), 0.95, turn_id);
863        let fact2 = AinlMemoryNode::new_fact("User dislikes Python".to_string(), 0.45, turn_id);
864
865        store.write_node(&fact1).expect("Write failed");
866        store.write_node(&fact2).expect("Write failed");
867
868        let high_conf = find_high_confidence_facts(&store, 0.7).expect("Query failed");
869
870        assert_eq!(high_conf.len(), 1);
871    }
872
873    #[test]
874    fn test_query_active_patches() {
875        let path = std::env::temp_dir().join(format!(
876            "ainl_query_active_patch_{}.db",
877            uuid::Uuid::new_v4()
878        ));
879        let _ = std::fs::remove_file(&path);
880        let store = SqliteGraphStore::open(&path).expect("open");
881        let ag = "agent-active-patch";
882        let mut p1 = AinlMemoryNode::new_pattern("pat_one".into(), vec![1, 2]);
883        p1.agent_id = ag.into();
884        let mut p2 = AinlMemoryNode::new_pattern("pat_two".into(), vec![3, 4]);
885        p2.agent_id = ag.into();
886        store.write_node(&p1).expect("w1");
887        store.write_node(&p2).expect("w2");
888
889        let conn = store.conn();
890        let payload2: String = conn
891            .query_row(
892                "SELECT payload FROM ainl_graph_nodes WHERE id = ?1",
893                [p2.id.to_string()],
894                |row| row.get(0),
895            )
896            .unwrap();
897        let mut v: serde_json::Value = serde_json::from_str(&payload2).unwrap();
898        v["node_type"]["retired"] = serde_json::json!(true);
899        conn.execute(
900            "UPDATE ainl_graph_nodes SET payload = ?1 WHERE id = ?2",
901            rusqlite::params![v.to_string(), p2.id.to_string()],
902        )
903        .unwrap();
904
905        let active = store.query(ag).active_patches().expect("q");
906        assert_eq!(active.len(), 1);
907        assert_eq!(active[0].id, p1.id);
908    }
909
910    #[test]
911    fn test_recall_task_scoped_episodes_prefers_conversation_then_topic() {
912        let path =
913            std::env::temp_dir().join(format!("ainl_query_task_scope_{}.db", uuid::Uuid::new_v4()));
914        let _ = std::fs::remove_file(&path);
915        let store = SqliteGraphStore::open(&path).expect("open");
916        let now = chrono::Utc::now().timestamp();
917        let ag = "agent-task-scope";
918
919        let mut ep_conv =
920            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now, vec![], None, None);
921        ep_conv.agent_id = ag.into();
922        if let AinlNodeType::Episode { ref mut episodic } = ep_conv.node_type {
923            episodic.conversation_id = "conv-1".into();
924            episodic.tags = vec!["topic:tax".into()];
925        }
926        let mut ep_topic =
927            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now - 10, vec![], None, None);
928        ep_topic.agent_id = ag.into();
929        if let AinlNodeType::Episode { ref mut episodic } = ep_topic.node_type {
930            episodic.conversation_id = "conv-2".into();
931            episodic.tags = vec!["topic:tax".into()];
932        }
933        let mut ep_other =
934            AinlMemoryNode::new_episode(uuid::Uuid::new_v4(), now - 20, vec![], None, None);
935        ep_other.agent_id = ag.into();
936        if let AinlNodeType::Episode { ref mut episodic } = ep_other.node_type {
937            episodic.conversation_id = "conv-3".into();
938            episodic.tags = vec!["topic:other".into()];
939        }
940        store.write_node(&ep_conv).expect("w1");
941        store.write_node(&ep_topic).expect("w2");
942        store.write_node(&ep_other).expect("w3");
943
944        let rows =
945            recall_task_scoped_episodes(&store, ag, Some("conv-1"), &["topic:tax".to_string()], 2)
946                .expect("query");
947        assert_eq!(rows.len(), 2);
948        assert_eq!(rows[0].conversation_id, "conv-1");
949        assert!(rows[1].tags.iter().any(|t| t == "topic:tax"));
950    }
951}