Skip to main content

ainl_graph_extractor/
extractor.rs

1//! One-shot extraction pass wired to [`ainl_persona::EvolutionEngine`].
2//!
3//! Instrumentality from episode tools is emitted by [`ainl_persona::GraphExtractor`] when the
4//! episode is processable; `extract_pass` skips its redundant `tool_affinity` leg in that case
5//! (see [`crate::persona_signals::extract_pass`]).
6
7use crate::persona_signals::{
8    extract_pass_collect, flush_episode_pattern_tags, ExtractPassCollected,
9    PersonaSignalExtractorState,
10};
11use crate::recurrence::update_semantic_recurrence;
12use ainl_memory::{GraphStore, SqliteGraphStore};
13use ainl_persona::{EvolutionEngine, PersonaSnapshot, RawSignal};
14use chrono::{DateTime, Utc};
15
16fn store_has_any_persona_for_agent(
17    store: &SqliteGraphStore,
18    agent_id: &str,
19) -> Result<bool, String> {
20    for n in store.find_by_type("persona")? {
21        if n.agent_id == agent_id {
22            return Ok(true);
23        }
24    }
25    Ok(false)
26}
27
/// Records `e` in `slot` without discarding an earlier message.
///
/// An empty slot simply takes ownership of `e`; an occupied slot has `"; "` followed by `e`
/// appended to the message it already holds.
fn merge_err_slot(slot: &mut Option<String>, e: String) {
    if let Some(existing) = slot.as_mut() {
        existing.push_str("; ");
        existing.push_str(&e);
    } else {
        *slot = Some(e);
    }
}
37
/// Per-agent state consumed and mutated by [`GraphExtractorTask::run_pass`].
pub struct GraphExtractorTask {
    /// Agent whose graph rows are processed; copied into every [`ExtractionReport`].
    pub agent_id: String,
    /// Engine that extracts and ingests signals, produces snapshots, and writes the persona node.
    pub evolution_engine: EvolutionEngine,
    /// Mutable state handed to the heuristic collect step on every pass.
    pub signal_state: PersonaSignalExtractorState,
    /// Test-only: message appended to [`ExtractionReport::extract_error`] by the next pass.
    /// The extraction phases still run; the value is consumed (`take`) when applied.
    #[doc(hidden)]
    pub test_inject_extract_error: Option<String>,
    /// Test-only: message recorded in [`ExtractionReport::pattern_error`] by the next pass in
    /// place of the episode pattern tag flush, which is skipped. Consumed (`take`) when applied.
    #[doc(hidden)]
    pub test_inject_pattern_error: Option<String>,
    /// Test-only: force [`ExtractionReport::persona_error`] instead of a real persona write outcome.
    /// Only consulted (and consumed) on a pass that would otherwise persist the persona.
    #[doc(hidden)]
    pub test_inject_persona_error: Option<String>,
}
52
/// Result of [`GraphExtractorTask::run_pass`]. Errors are carried per phase; the pass does not
/// return `Result` so callers can record partial progress and continue.
///
/// When several failures land in the same slot their messages are joined with `"; "`.
#[derive(Debug, Clone)]
pub struct ExtractionReport {
    /// Agent the pass ran for (copied from the task).
    pub agent_id: String,
    /// Signals merged from graph extractors + heuristic pass (before ingest).
    pub merged_signals: Vec<RawSignal>,
    /// Semantic recurrence rows updated this pass (`None` = recurrence phase did not complete).
    pub facts_written: Option<u32>,
    /// Error during recurrence update, graph signal read, or heuristic collect (before pattern flush).
    /// Also receives a failed persona-row probe, prefixed with `persona_row_probe: `.
    pub extract_error: Option<String>,
    /// Error flushing episode tag pattern writes.
    pub pattern_error: Option<String>,
    /// Error persisting the evolution persona row.
    pub persona_error: Option<String>,
    /// Count returned by the semantic recurrence update; `0` when that update failed.
    pub semantic_nodes_updated: usize,
    /// Number of entries in `merged_signals`.
    pub signals_extracted: usize,
    /// Value returned by the evolution engine when it ingested the merged signals.
    pub signals_applied: usize,
    /// Evolution engine snapshot taken after the merged signals were ingested.
    pub persona_snapshot: PersonaSnapshot,
    /// Wall-clock time (UTC) captured at the start of the pass.
    pub timestamp: DateTime<Utc>,
}
74
75impl ExtractionReport {
76    pub fn has_errors(&self) -> bool {
77        self.extract_error.is_some() || self.pattern_error.is_some() || self.persona_error.is_some()
78    }
79}
80
81impl GraphExtractorTask {
82    pub fn new(agent_id: &str) -> Self {
83        Self {
84            agent_id: agent_id.to_string(),
85            evolution_engine: EvolutionEngine::new(agent_id),
86            signal_state: PersonaSignalExtractorState::new(),
87            test_inject_extract_error: None,
88            test_inject_pattern_error: None,
89            test_inject_persona_error: None,
90        }
91    }
92
93    /// Semantic recurrence updates, graph + heuristic signals, ingest, optional evolution write.
94    ///
95    /// Per-phase failures are recorded on [`ExtractionReport`]. Phases keep running so callers
96    /// can observe independent `extract_error`, `pattern_error`, and `persona_error` slots.
97    pub fn run_pass(&mut self, store: &SqliteGraphStore) -> ExtractionReport {
98        let agent_id = self.agent_id.clone();
99        let ts = Utc::now();
100        let mut report = ExtractionReport {
101            agent_id: agent_id.clone(),
102            merged_signals: Vec::new(),
103            facts_written: None,
104            extract_error: None,
105            pattern_error: None,
106            persona_error: None,
107            semantic_nodes_updated: 0,
108            signals_extracted: 0,
109            signals_applied: 0,
110            persona_snapshot: self.evolution_engine.snapshot(),
111            timestamp: ts,
112        };
113
114        let semantic_nodes_updated = match update_semantic_recurrence(store, &agent_id) {
115            Ok(n) => n,
116            Err(e) => {
117                merge_err_slot(&mut report.pattern_error, e);
118                0
119            }
120        };
121        report.semantic_nodes_updated = semantic_nodes_updated;
122        if report.pattern_error.is_none() {
123            report.facts_written = Some(semantic_nodes_updated as u32);
124        }
125
126        let mut graph_signals = match self.evolution_engine.extract_signals(store) {
127            Ok(s) => s,
128            Err(e) => {
129                merge_err_slot(&mut report.extract_error, e);
130                Vec::new()
131            }
132        };
133
134        let collected = match extract_pass_collect(store, &agent_id, &mut self.signal_state) {
135            Ok(c) => c,
136            Err(e) => {
137                merge_err_slot(&mut report.extract_error, e);
138                ExtractPassCollected::default()
139            }
140        };
141        graph_signals.extend(collected.signals);
142
143        if let Some(e) = self.test_inject_extract_error.take() {
144            merge_err_slot(&mut report.extract_error, e);
145        }
146
147        if let Some(e) = self.test_inject_pattern_error.take() {
148            merge_err_slot(&mut report.pattern_error, e);
149        } else if let Err(e) = flush_episode_pattern_tags(store, &collected.pending_tags) {
150            merge_err_slot(&mut report.pattern_error, e);
151        }
152
153        let signals_extracted = graph_signals.len();
154        report.signals_extracted = signals_extracted;
155        report.merged_signals = graph_signals.clone();
156
157        let signals_applied = self.evolution_engine.ingest_signals(graph_signals);
158        report.signals_applied = signals_applied;
159
160        let persona_snapshot = self.evolution_engine.snapshot();
161        report.persona_snapshot = persona_snapshot.clone();
162
163        let had_prior_persona = match store_has_any_persona_for_agent(store, &agent_id) {
164            Ok(b) => b,
165            Err(e) => {
166                merge_err_slot(&mut report.extract_error, format!("persona_row_probe: {e}"));
167                false
168            }
169        };
170        let should_persist_persona =
171            signals_extracted > 0 || semantic_nodes_updated > 0 || had_prior_persona;
172
173        if should_persist_persona {
174            if let Some(e) = self.test_inject_persona_error.take() {
175                merge_err_slot(&mut report.persona_error, e);
176            } else if let Err(e) = self
177                .evolution_engine
178                .write_persona_node(store, &persona_snapshot)
179            {
180                merge_err_slot(&mut report.persona_error, e);
181            }
182        }
183
184        report
185    }
186}