ainl_graph_extractor/
extractor.rs1use crate::persona_signals::{
8 extract_pass_collect, flush_episode_pattern_tags, ExtractPassCollected,
9 PersonaSignalExtractorState,
10};
11use crate::recurrence::update_semantic_recurrence;
12use ainl_memory::{GraphStore, SqliteGraphStore};
13use ainl_persona::{EvolutionEngine, PersonaSnapshot, RawSignal};
14use chrono::{DateTime, Utc};
15
16fn store_has_any_persona_for_agent(
17 store: &SqliteGraphStore,
18 agent_id: &str,
19) -> Result<bool, String> {
20 for n in store.find_by_type("persona")? {
21 if n.agent_id == agent_id {
22 return Ok(true);
23 }
24 }
25 Ok(false)
26}
27
28fn merge_err_slot(slot: &mut Option<String>, e: String) {
29 match slot {
30 None => *slot = Some(e),
31 Some(prev) => {
32 prev.push_str("; ");
33 prev.push_str(&e);
34 }
35 }
36}
37
38pub struct GraphExtractorTask {
39 pub agent_id: String,
40 pub evolution_engine: EvolutionEngine,
41 pub signal_state: PersonaSignalExtractorState,
42 #[doc(hidden)]
44 pub test_inject_extract_error: Option<String>,
45 #[doc(hidden)]
47 pub test_inject_pattern_error: Option<String>,
48 #[doc(hidden)]
50 pub test_inject_persona_error: Option<String>,
51}
52
53#[derive(Debug, Clone)]
56pub struct ExtractionReport {
57 pub agent_id: String,
58 pub merged_signals: Vec<RawSignal>,
60 pub facts_written: Option<u32>,
62 pub extract_error: Option<String>,
64 pub pattern_error: Option<String>,
66 pub persona_error: Option<String>,
68 pub semantic_nodes_updated: usize,
69 pub signals_extracted: usize,
70 pub signals_applied: usize,
71 pub persona_snapshot: PersonaSnapshot,
72 pub timestamp: DateTime<Utc>,
73}
74
75impl ExtractionReport {
76 pub fn has_errors(&self) -> bool {
77 self.extract_error.is_some() || self.pattern_error.is_some() || self.persona_error.is_some()
78 }
79}
80
81impl GraphExtractorTask {
82 pub fn new(agent_id: &str) -> Self {
83 Self {
84 agent_id: agent_id.to_string(),
85 evolution_engine: EvolutionEngine::new(agent_id),
86 signal_state: PersonaSignalExtractorState::new(),
87 test_inject_extract_error: None,
88 test_inject_pattern_error: None,
89 test_inject_persona_error: None,
90 }
91 }
92
93 pub fn run_pass(&mut self, store: &SqliteGraphStore) -> ExtractionReport {
98 let agent_id = self.agent_id.clone();
99 let ts = Utc::now();
100 let mut report = ExtractionReport {
101 agent_id: agent_id.clone(),
102 merged_signals: Vec::new(),
103 facts_written: None,
104 extract_error: None,
105 pattern_error: None,
106 persona_error: None,
107 semantic_nodes_updated: 0,
108 signals_extracted: 0,
109 signals_applied: 0,
110 persona_snapshot: self.evolution_engine.snapshot(),
111 timestamp: ts,
112 };
113
114 let semantic_nodes_updated = match update_semantic_recurrence(store, &agent_id) {
115 Ok(n) => n,
116 Err(e) => {
117 merge_err_slot(&mut report.pattern_error, e);
118 0
119 }
120 };
121 report.semantic_nodes_updated = semantic_nodes_updated;
122 if report.pattern_error.is_none() {
123 report.facts_written = Some(semantic_nodes_updated as u32);
124 }
125
126 let mut graph_signals = match self.evolution_engine.extract_signals(store) {
127 Ok(s) => s,
128 Err(e) => {
129 merge_err_slot(&mut report.extract_error, e);
130 Vec::new()
131 }
132 };
133
134 let collected = match extract_pass_collect(store, &agent_id, &mut self.signal_state) {
135 Ok(c) => c,
136 Err(e) => {
137 merge_err_slot(&mut report.extract_error, e);
138 ExtractPassCollected::default()
139 }
140 };
141 graph_signals.extend(collected.signals);
142
143 if let Some(e) = self.test_inject_extract_error.take() {
144 merge_err_slot(&mut report.extract_error, e);
145 }
146
147 if let Some(e) = self.test_inject_pattern_error.take() {
148 merge_err_slot(&mut report.pattern_error, e);
149 } else if let Err(e) = flush_episode_pattern_tags(store, &collected.pending_tags) {
150 merge_err_slot(&mut report.pattern_error, e);
151 }
152
153 let signals_extracted = graph_signals.len();
154 report.signals_extracted = signals_extracted;
155 report.merged_signals = graph_signals.clone();
156
157 let signals_applied = self.evolution_engine.ingest_signals(graph_signals);
158 report.signals_applied = signals_applied;
159
160 let persona_snapshot = self.evolution_engine.snapshot();
161 report.persona_snapshot = persona_snapshot.clone();
162
163 let had_prior_persona = match store_has_any_persona_for_agent(store, &agent_id) {
164 Ok(b) => b,
165 Err(e) => {
166 merge_err_slot(&mut report.extract_error, format!("persona_row_probe: {e}"));
167 false
168 }
169 };
170 let should_persist_persona =
171 signals_extracted > 0 || semantic_nodes_updated > 0 || had_prior_persona;
172
173 if should_persist_persona {
174 if let Some(e) = self.test_inject_persona_error.take() {
175 merge_err_slot(&mut report.persona_error, e);
176 } else if let Err(e) = self
177 .evolution_engine
178 .write_persona_node(store, &persona_snapshot)
179 {
180 merge_err_slot(&mut report.persona_error, e);
181 }
182 }
183
184 report
185 }
186}