Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::time::Instant;
5
6use ainl_graph_extractor::GraphExtractorTask;
7use ainl_memory::{
8    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
9    RuntimeStateNode, SqliteGraphStore,
10};
11use ainl_persona::axes::default_axis_map;
12use ainl_persona::{EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON};
13use ainl_semantic_tagger::infer_topic_tags;
14use ainl_semantic_tagger::tag_tool_names;
15use ainl_semantic_tagger::TagNamespace;
16use uuid::Uuid;
17
18use crate::adapters::AdapterRegistry;
19use crate::engine::{
20    AinlGraphArtifact, MemoryContext, PatchDispatchResult, PatchSkipReason, TurnInput, TurnOutcome,
21    TurnOutput, EMIT_TO_EDGE,
22};
23use crate::hooks::{NoOpHooks, TurnHooks};
24use crate::RuntimeConfig;
25
/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
///
/// ## Evolution writes vs ArmaraOS / openfang-runtime
///
/// In production ArmaraOS, **openfang-runtime**'s `GraphMemoryWriter::run_persona_evolution_pass`
/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
/// agent's `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
/// SQLite store is undefined (competing last-writer wins on the same persona node).
///
/// Prefer `with_evolution_writes_enabled(false)` (see [`Self::with_evolution_writes_enabled`])
/// when a host embeds `AinlRuntime` alongside openfang while openfang remains the sole evolution
/// writer. [`Self::evolution_engine_mut`] can still mutate in-memory axis state; calling
/// [`EvolutionEngine::write_persona_node`] yourself bypasses this guard and must be avoided in
/// that configuration.
pub struct AinlRuntime {
    /// Runtime configuration. The visible code reads `agent_id`, `max_delegation_depth`,
    /// `enable_graph_memory`, `max_steps` and `extraction_interval` from it.
    config: RuntimeConfig,
    /// Graph memory built from the SQLite store handed to [`Self::new`].
    memory: ainl_memory::GraphMemory,
    /// Extraction task for this agent; also owns the `evolution_engine` exposed by
    /// [`Self::evolution_engine`] / [`Self::evolution_engine_mut`].
    extractor: GraphExtractorTask,
    /// Number of turns recorded so far. Restored from persisted runtime state in [`Self::new`]
    /// and bumped (wrapping) by [`Self::run_turn`] after the episode is recorded.
    turn_count: u32,
    /// Value of `turn_count` at the last turn on which an extraction pass was scheduled
    /// (whether or not that pass succeeded).
    last_extraction_turn: u32,
    /// Current nesting depth of [`Self::run_turn`]; incremented on entry, decremented on exit,
    /// and compared against `config.max_delegation_depth`.
    delegation_depth: u32,
    /// Host callbacks invoked at each phase of a turn. Defaults to [`NoOpHooks`].
    hooks: Box<dyn TurnHooks>,
    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
    /// (e.g. openfang's post-turn pass) on the same DB. Default: `true`.
    evolution_writes_enabled: bool,
    /// Cached persona prompt contribution. Filled lazily by [`Self::run_turn`], cleared after a
    /// successful extraction pass, and persisted as `last_persona_prompt` in the runtime state.
    persona_cache: Option<String>,
    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
    #[doc(hidden)]
    test_force_extraction_failure: bool,
    /// Patch adapters registered through [`Self::register_adapter`], looked up by patch label
    /// during dispatch.
    adapter_registry: AdapterRegistry,
}
59
60impl AinlRuntime {
61    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
62        let agent_id = config.agent_id.clone();
63        let memory = ainl_memory::GraphMemory::from_sqlite_store(store);
64        let (init_turn_count, init_persona_cache, init_last_extraction_turn) = if agent_id.is_empty()
65        {
66            (0, None, 0)
67        } else {
68            match memory.sqlite_store().load_runtime_state(&agent_id) {
69                Ok(Some(state)) => {
70                    tracing::info!(
71                        agent_id = %agent_id,
72                        turn_count = state.turn_count,
73                        "restored runtime state"
74                    );
75                    (
76                        state.turn_count,
77                        state.last_persona_prompt,
78                        state.last_extraction_turn,
79                    )
80                }
81                Ok(None) => (0, None, 0),
82                Err(e) => {
83                    tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
84                    (0, None, 0)
85                }
86            }
87        };
88        Self {
89            extractor: GraphExtractorTask::new(&agent_id),
90            memory,
91            config,
92            turn_count: init_turn_count,
93            last_extraction_turn: init_last_extraction_turn,
94            delegation_depth: 0,
95            hooks: Box::new(NoOpHooks),
96            evolution_writes_enabled: true,
97            persona_cache: init_persona_cache,
98            test_force_extraction_failure: false,
99            adapter_registry: AdapterRegistry::new(),
100        }
101    }
102
    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
    ///
    /// The adapter is handed to the internal adapter registry; patch dispatch later looks
    /// adapters up by the patch's label.
    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
        self.adapter_registry.register(adapter);
    }
107
    /// Names of currently registered patch adapters.
    ///
    /// Delegates to the adapter registry; the returned slices borrow from `self`.
    pub fn registered_adapters(&self) -> Vec<&str> {
        self.adapter_registry.registered_names()
    }
112
    /// Test hook: current value of the internal turn counter.
    #[doc(hidden)]
    pub fn test_turn_count(&self) -> u32 {
        self.turn_count
    }
117
    /// Test hook: current delegation depth (non-zero only while a turn is in progress,
    /// unless overridden via [`Self::test_set_delegation_depth`]).
    #[doc(hidden)]
    pub fn test_delegation_depth(&self) -> u32 {
        self.delegation_depth
    }
122
    /// Test hook: overwrite the delegation depth, e.g. to exercise the depth-limit path
    /// of [`Self::run_turn`].
    #[doc(hidden)]
    pub fn test_set_delegation_depth(&mut self, depth: u32) {
        self.delegation_depth = depth;
    }
127
    /// Test hook: when `fail` is `true`, the next scheduled extraction pass in
    /// [`Self::run_turn`] is reported as failed. The flag is consumed (reset to `false`)
    /// by that pass.
    #[doc(hidden)]
    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
        self.test_force_extraction_failure = fail;
    }
132
    /// Builder-style setter: replace the turn hooks (the default installed by [`Self::new`]
    /// is [`NoOpHooks`]) and return the runtime.
    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
        self.hooks = Box::new(hooks);
        self
    }
137
    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
    /// persistence elsewhere (see struct-level docs).
    ///
    /// Consumes and returns `self` so it can be chained builder-style.
    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
        self.evolution_writes_enabled = enabled;
        self
    }
146
147    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
148        if self.evolution_writes_enabled {
149            Ok(())
150        } else {
151            Err(
152                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
153                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
154                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
155                 ainl_memory.db"
156                    .to_string(),
157            )
158        }
159    }
160
    /// Borrow the backing SQLite store (same connection as graph memory).
    ///
    /// This is the store owned by the runtime's graph memory, not a copy.
    pub fn sqlite_store(&self) -> &SqliteGraphStore {
        self.memory.sqlite_store()
    }
165
    /// Borrow the persona [`EvolutionEngine`] for this runtime's agent.
    ///
    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
    ///
    /// Read-only view; use [`Self::evolution_engine_mut`] for calls that need `&mut`.
    pub fn evolution_engine(&self) -> &EvolutionEngine {
        &self.extractor.evolution_engine
    }
176
    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
    ///
    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass the
    /// `evolution_writes_enabled` guard (see [`Self::with_evolution_writes_enabled`]); this
    /// accessor itself performs no check.
    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
        &mut self.extractor.evolution_engine
    }
183
    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
    ///
    /// Returns whatever `ingest_signals` returns — presumably the number of signals applied;
    /// confirm against [`EvolutionEngine::ingest_signals`]. Nothing is written to the store here.
    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
        self.extractor.evolution_engine.ingest_signals(signals)
    }
188
    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
    ///
    /// Only the in-memory engine is touched by this call; use
    /// [`Self::persist_evolution_snapshot`] to write the result to the store.
    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
        self.extractor.evolution_engine.correction_tick(axis, correction);
    }
193
194    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
195    ///
196    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
197    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
198        self.require_evolution_writes_enabled()?;
199        let store = self.memory.sqlite_store();
200        let snap = self.extractor.evolution_engine.snapshot();
201        self.extractor
202            .evolution_engine
203            .write_persona_node(store, &snap)?;
204        Ok(snap)
205    }
206
207    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
208    ///
209    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
210    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
211    ///
212    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
213    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
214        self.require_evolution_writes_enabled()?;
215        let store = self.memory.sqlite_store();
216        self.extractor.evolution_engine.evolve(store)
217    }
218
    /// Boot: export + validate the agent subgraph.
    ///
    /// Thin wrapper around [`AinlGraphArtifact::load`] using this runtime's store and
    /// `config.agent_id`; any error from the load is returned unchanged.
    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
        AinlGraphArtifact::load(self.memory.sqlite_store(), &self.config.agent_id)
    }
223
    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (semantic relevance falls back
    /// to the latest episode's `user_message` when present).
    ///
    /// Errors are those of [`Self::compile_memory_context_for`].
    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
        self.compile_memory_context_for(None)
    }
229
230    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
231    pub fn compile_memory_context_for(&self, user_message: Option<&str>) -> Result<MemoryContext, String> {
232        if self.config.agent_id.is_empty() {
233            return Err("RuntimeConfig.agent_id must be set".to_string());
234        }
235        let store = self.memory.sqlite_store();
236        let q = store.query(&self.config.agent_id);
237        let recent_episodes = q.recent_episodes(5)?;
238        let effective_user = user_message
239            .map(|s| s.to_string())
240            .filter(|s| !s.is_empty())
241            .or_else(|| {
242                recent_episodes.first().and_then(|n| {
243                    if let AinlNodeType::Episode { episodic } = &n.node_type {
244                        episodic.user_message.clone().filter(|m| !m.is_empty())
245                    } else {
246                        None
247                    }
248                })
249            });
250        let all_semantic = q.semantic_nodes()?;
251        let relevant_semantic = match effective_user.as_deref() {
252            Some(msg) => self.relevant_semantic_nodes(msg, all_semantic, 10),
253            None => fallback_high_recurrence_semantic(all_semantic, 10),
254        };
255        let active_patches = q.active_patches()?;
256        let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
257        Ok(MemoryContext {
258            recent_episodes,
259            relevant_semantic,
260            active_patches,
261            persona_snapshot,
262            compiled_at: chrono::Utc::now(),
263        })
264    }
265
266    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
267    pub fn route_emit_edges(
268        &self,
269        episode_id: Uuid,
270        turn_output_payload: &serde_json::Value,
271    ) -> Result<(), String> {
272        let store = self.memory.sqlite_store();
273        let neighbors = store
274            .query(&self.config.agent_id)
275            .neighbors(episode_id, EMIT_TO_EDGE)?;
276        for n in neighbors {
277            let target = emit_target_name(&n);
278            self.hooks.on_emit(&target, turn_output_payload);
279        }
280        Ok(())
281    }
282
283    /// Full single-turn orchestration (no LLM / no IR parse).
284    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutput, String> {
285        self.delegation_depth += 1;
286        let rt_ptr = self as *mut Self;
287        // Safety: `rt_ptr` aliases `self` for the synchronous body of `run_turn` only; the defer runs on
288        // return before `self` is invalidated.
289        // `scopeguard::defer!` only supports expression statements; use `guard` for the same drop semantics.
290        let _depth_guard = scopeguard::guard((), |()| unsafe {
291            if (*rt_ptr).delegation_depth > 0 {
292                (*rt_ptr).delegation_depth -= 1;
293            }
294        });
295
296        if self.delegation_depth > self.config.max_delegation_depth {
297            let out = TurnOutput {
298                outcome: TurnOutcome::DepthLimitExceeded,
299                ..Default::default()
300            };
301            self.hooks.on_turn_complete(&out);
302            return Ok(out);
303        }
304
305        if !self.config.enable_graph_memory {
306            let memory_context = MemoryContext::default();
307            let out = TurnOutput {
308                memory_context,
309                outcome: TurnOutcome::GraphMemoryDisabled,
310                ..Default::default()
311            };
312            self.hooks.on_turn_complete(&out);
313            return Ok(out);
314        }
315
316        if self.config.agent_id.is_empty() {
317            return Err("RuntimeConfig.agent_id must be set for run_turn".to_string());
318        }
319
320        let span = tracing::info_span!(
321            "ainl_runtime.run_turn",
322            agent_id = %self.config.agent_id,
323            turn = self.turn_count,
324            depth = input.depth,
325        );
326        let _span_enter = span.enter();
327
328        let validation: GraphValidationReport = self
329            .memory
330            .sqlite_store()
331            .validate_graph(&self.config.agent_id)?;
332        if !validation.is_valid {
333            let mut msg = String::from("graph validation failed before turn");
334            for d in &validation.dangling_edge_details {
335                msg.push_str(&format!(
336                    "; {} -> {} [{}]",
337                    d.source_id, d.target_id, d.edge_type
338                ));
339            }
340            return Err(msg);
341        }
342
343        self.hooks
344            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
345
346        let mut patches_failed: Vec<String> = Vec::new();
347        let mut warnings: Vec<String> = Vec::new();
348
349        let t_persona = Instant::now();
350        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
351            Some(cached.clone())
352        } else {
353            let nodes = self
354                .memory
355                .sqlite_store()
356                .query(&self.config.agent_id)
357                .persona_nodes()?;
358            let compiled = compile_persona_from_nodes(&nodes)?;
359            self.persona_cache = compiled.clone();
360            compiled
361        };
362        self.hooks
363            .on_persona_compiled(persona_prompt_contribution.as_deref());
364        tracing::debug!(
365            target: "ainl_runtime",
366            duration_ms = t_persona.elapsed().as_millis() as u64,
367            has_contribution = persona_prompt_contribution.is_some(),
368            "persona_phase"
369        );
370
371        let t_memory = Instant::now();
372        let memory_context = self.compile_memory_context_for(Some(&input.user_message))?;
373        self.hooks.on_memory_context_ready(&memory_context);
374        tracing::debug!(
375            target: "ainl_runtime",
376            duration_ms = t_memory.elapsed().as_millis() as u64,
377            episode_count = memory_context.recent_episodes.len(),
378            semantic_count = memory_context.relevant_semantic.len(),
379            patch_count = memory_context.active_patches.len(),
380            "memory_context"
381        );
382
383        let t_patches = Instant::now();
384        let patch_dispatch_results = if self.config.enable_graph_memory {
385            self.dispatch_patches_collect(
386                &memory_context.active_patches,
387                &input.frame,
388                &mut patches_failed,
389            )
390        } else {
391            Vec::new()
392        };
393        for r in &patch_dispatch_results {
394            tracing::debug!(
395                target: "ainl_runtime",
396                label = %r.label,
397                dispatched = r.dispatched,
398                fitness_before = r.fitness_before,
399                fitness_after = r.fitness_after,
400                "patch_dispatch"
401            );
402        }
403        tracing::debug!(
404            target: "ainl_runtime",
405            duration_ms = t_patches.elapsed().as_millis() as u64,
406            "patch_dispatch_phase"
407        );
408
409        let dispatched_count = patch_dispatch_results
410            .iter()
411            .filter(|r| r.dispatched)
412            .count() as u32;
413        if dispatched_count >= self.config.max_steps {
414            let out = TurnOutput {
415                patch_dispatch_results,
416                memory_context,
417                persona_prompt_contribution,
418                steps_executed: dispatched_count,
419                outcome: TurnOutcome::StepLimitExceeded {
420                    steps_executed: dispatched_count,
421                },
422                ..Default::default()
423            };
424            self.hooks.on_turn_complete(&out);
425            return Ok(out);
426        }
427
428        let t_episode = Instant::now();
429        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
430        let episode_id = record_turn_episode(
431            &self.memory,
432            &self.config.agent_id,
433            &input,
434            &tools_canonical,
435        )?;
436        self.hooks.on_episode_recorded(episode_id);
437        tracing::debug!(
438            target: "ainl_runtime",
439            duration_ms = t_episode.elapsed().as_millis() as u64,
440            episode_id = %episode_id,
441            "episode_record"
442        );
443
444        for &tid in &input.emit_targets {
445            self.memory
446                .sqlite_store()
447                .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)?;
448        }
449
450        let emit_payload = serde_json::json!({
451            "episode_id": episode_id.to_string(),
452            "user_message": input.user_message,
453            "tools_invoked": tools_canonical,
454            "persona_contribution": persona_prompt_contribution,
455            "turn_count": self.turn_count.wrapping_add(1),
456        });
457        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
458            tracing::warn!(error = %e, "emit routing failed — continuing");
459            warnings.push(format!("emit_routing: {e}"));
460        }
461
462        self.turn_count = self.turn_count.wrapping_add(1);
463
464        let should_extract = self.config.extraction_interval > 0
465            && self
466                .turn_count
467                .saturating_sub(self.last_extraction_turn)
468                >= self.config.extraction_interval;
469
470        let t_extract = Instant::now();
471        let (extraction_report, extraction_failed) = if should_extract {
472            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
473
474            let res = if force_fail {
475                tracing::warn!(error = "test_forced", "extraction pass failed — continuing");
476                tracing::debug!(
477                    target: "ainl_runtime",
478                    duration_ms = t_extract.elapsed().as_millis() as u64,
479                    signals_ingested = 0u64,
480                    skipped = false,
481                    "extraction_pass"
482                );
483                (None, true)
484            } else {
485                match self.extractor.run_pass(self.memory.sqlite_store()) {
486                    Ok(report) => {
487                        tracing::info!(
488                            agent_id = %report.agent_id,
489                            signals_extracted = report.signals_extracted,
490                            signals_applied = report.signals_applied,
491                            semantic_nodes_updated = report.semantic_nodes_updated,
492                            "ainl-graph-extractor pass completed (scheduled)"
493                        );
494                        self.hooks.on_extraction_complete(&report);
495                        self.persona_cache = None;
496                        tracing::debug!(
497                            target: "ainl_runtime",
498                            duration_ms = t_extract.elapsed().as_millis() as u64,
499                            signals_ingested = report.signals_extracted as u64,
500                            skipped = false,
501                            "extraction_pass"
502                        );
503                        (Some(report), false)
504                    }
505                    Err(e) => {
506                        tracing::warn!(error = %e, "extraction pass failed — continuing");
507                        tracing::debug!(
508                            target: "ainl_runtime",
509                            duration_ms = t_extract.elapsed().as_millis() as u64,
510                            signals_ingested = 0u64,
511                            skipped = false,
512                            "extraction_pass"
513                        );
514                        (None, true)
515                    }
516                }
517            };
518            self.last_extraction_turn = self.turn_count;
519            res
520        } else {
521            tracing::debug!(
522                target: "ainl_runtime",
523                duration_ms = t_extract.elapsed().as_millis() as u64,
524                signals_ingested = 0u64,
525                skipped = true,
526                "extraction_pass"
527            );
528            (None, false)
529        };
530
531        let outcome = if extraction_failed
532            || !patches_failed.is_empty()
533            || !warnings.is_empty()
534        {
535            TurnOutcome::PartialSuccess {
536                episode_recorded: true,
537                extraction_failed,
538                patches_failed,
539                warnings,
540            }
541        } else {
542            TurnOutcome::Success
543        };
544
545        let out = TurnOutput {
546            episode_id,
547            persona_prompt_contribution,
548            memory_context,
549            extraction_report,
550            steps_executed: dispatched_count,
551            outcome,
552            patch_dispatch_results,
553        };
554
555        if !self.config.agent_id.is_empty() {
556            let persist_state = RuntimeStateNode {
557                agent_id: self.config.agent_id.clone(),
558                turn_count: self.turn_count,
559                last_extraction_turn: self.last_extraction_turn,
560                last_persona_prompt: self.persona_cache.clone(),
561                updated_at: chrono::Utc::now().to_rfc3339(),
562            };
563            if let Err(e) = self.memory.sqlite_store().save_runtime_state(&persist_state) {
564                tracing::warn!(error = %e, "failed to persist runtime state — non-fatal");
565            }
566        }
567
568        self.hooks.on_turn_complete(&out);
569        Ok(out)
570    }
571
572    /// Score and rank semantic nodes for the current user text (`ainl-semantic-tagger` topic tags + recurrence).
573    pub fn relevant_semantic_nodes(
574        &self,
575        user_message: &str,
576        all_semantic: Vec<AinlMemoryNode>,
577        limit: usize,
578    ) -> Vec<AinlMemoryNode> {
579        let user_tags = infer_topic_tags(user_message);
580        let user_topics: HashSet<String> = user_tags
581            .iter()
582            .filter(|t| t.namespace == TagNamespace::Topic)
583            .map(|t| t.value.to_lowercase())
584            .collect();
585
586        if user_topics.is_empty() {
587            return fallback_high_recurrence_semantic(all_semantic, limit);
588        }
589
590        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
591        for n in all_semantic {
592            let (score, rec) = match &n.node_type {
593                AinlNodeType::Semantic { semantic } => {
594                    let mut s = 0f32;
595                    if let Some(cluster) = &semantic.topic_cluster {
596                        for slug in cluster
597                            .split([',', ';'])
598                            .map(|s| s.trim().to_lowercase())
599                            .filter(|s| !s.is_empty())
600                        {
601                            if user_topics.contains(&slug) {
602                                s += 1.0;
603                            }
604                        }
605                    }
606                    if s == 0.0 {
607                        for tag in &semantic.tags {
608                            let tl = tag.to_lowercase();
609                            if let Some(rest) = tl.strip_prefix("topic:") {
610                                let slug = rest.trim();
611                                if user_topics.contains(slug) {
612                                    s = 0.5;
613                                    break;
614                                }
615                            }
616                        }
617                    }
618                    (s, semantic.recurrence_count)
619                }
620                _ => (0.0, 0),
621            };
622            if score > 0.0 {
623                scored.push((score, rec, n));
624            }
625        }
626
627        scored.sort_by(|a, b| {
628            b.0.partial_cmp(&a.0)
629                .unwrap_or(std::cmp::Ordering::Equal)
630                .then_with(|| b.1.cmp(&a.1))
631        });
632        scored.into_iter().take(limit).map(|t| t.2).collect()
633    }
634
635    pub fn dispatch_patches(
636        &mut self,
637        patches: &[AinlMemoryNode],
638        frame: &HashMap<String, serde_json::Value>,
639    ) -> Vec<PatchDispatchResult> {
640        let mut discarded = Vec::new();
641        self.dispatch_patches_collect(patches, frame, &mut discarded)
642    }
643
644    fn dispatch_patches_collect(
645        &mut self,
646        patches: &[AinlMemoryNode],
647        frame: &HashMap<String, serde_json::Value>,
648        patches_failed: &mut Vec<String>,
649    ) -> Vec<PatchDispatchResult> {
650        let mut out = Vec::new();
651        for node in patches {
652            let res = self.dispatch_one_patch(node, frame);
653            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
654                tracing::warn!(label = %res.label, error = %e, "patch fitness write failed — continuing");
655                patches_failed.push(res.label.clone());
656            }
657            out.push(res);
658        }
659        out
660    }
661
662    fn dispatch_one_patch(
663        &mut self,
664        node: &AinlMemoryNode,
665        frame: &HashMap<String, serde_json::Value>,
666    ) -> PatchDispatchResult {
667        let label_default = String::new();
668        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
669            AinlNodeType::Procedural { procedural } => (
670                procedural_label(procedural),
671                procedural.patch_version,
672                procedural.retired,
673                procedural.declared_reads.clone(),
674                procedural.fitness,
675            ),
676            _ => {
677                return PatchDispatchResult {
678                    label: label_default,
679                    patch_version: 0,
680                    fitness_before: 0.0,
681                    fitness_after: 0.0,
682                    dispatched: false,
683                    skip_reason: Some(PatchSkipReason::NotProcedural),
684                    adapter_output: None,
685                    adapter_name: None,
686                };
687            }
688        };
689
690        if pv == 0 {
691            return PatchDispatchResult {
692                label: label_src,
693                patch_version: pv,
694                fitness_before: fitness_opt.unwrap_or(0.5),
695                fitness_after: fitness_opt.unwrap_or(0.5),
696                dispatched: false,
697                skip_reason: Some(PatchSkipReason::ZeroVersion),
698                adapter_output: None,
699                adapter_name: None,
700            };
701        }
702        if retired {
703            return PatchDispatchResult {
704                label: label_src.clone(),
705                patch_version: pv,
706                fitness_before: fitness_opt.unwrap_or(0.5),
707                fitness_after: fitness_opt.unwrap_or(0.5),
708                dispatched: false,
709                skip_reason: Some(PatchSkipReason::Retired),
710                adapter_output: None,
711                adapter_name: None,
712            };
713        }
714        for key in &reads {
715            if !frame.contains_key(key) {
716                return PatchDispatchResult {
717                    label: label_src.clone(),
718                    patch_version: pv,
719                    fitness_before: fitness_opt.unwrap_or(0.5),
720                    fitness_after: fitness_opt.unwrap_or(0.5),
721                    dispatched: false,
722                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
723                    adapter_output: None,
724                    adapter_name: None,
725                };
726            }
727        }
728
729        let patch_label = label_src.clone();
730        let adapter_key = patch_label.as_str();
731        let (adapter_output, adapter_name) =
732            if let Some(adapter) = self.adapter_registry.get(adapter_key) {
733                match adapter.execute(adapter_key, frame) {
734                    Ok(output) => {
735                        tracing::debug!(
736                            label = %patch_label,
737                            adapter = %adapter_key,
738                            "adapter executed patch"
739                        );
740                        (Some(output), Some(adapter_key.to_string()))
741                    }
742                    Err(e) => {
743                        tracing::warn!(
744                            label = %patch_label,
745                            error = %e,
746                            "adapter execution failed — continuing as metadata dispatch"
747                        );
748                        (None, Some(adapter_key.to_string()))
749                    }
750                }
751            } else {
752                (None, None)
753            };
754
755        let fitness_before = fitness_opt.unwrap_or(0.5);
756        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
757
758        let store = self.memory.sqlite_store();
759        let updated = match store.read_node(node.id) {
760            Ok(Some(mut n)) => {
761                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
762                    procedural.fitness = Some(fitness_after);
763                }
764                n
765            }
766            Ok(None) => {
767                return PatchDispatchResult {
768                    label: label_src,
769                    patch_version: pv,
770                    fitness_before,
771                    fitness_after: fitness_before,
772                    dispatched: false,
773                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(
774                        "node_row".into(),
775                    )),
776                    adapter_output,
777                    adapter_name,
778                };
779            }
780            Err(e) => {
781                return PatchDispatchResult {
782                    label: label_src,
783                    patch_version: pv,
784                    fitness_before,
785                    fitness_after: fitness_before,
786                    dispatched: false,
787                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
788                    adapter_output,
789                    adapter_name,
790                };
791            }
792        };
793
794        if let Err(e) = self.memory.write_node(&updated) {
795            return PatchDispatchResult {
796                label: label_src.clone(),
797                patch_version: pv,
798                fitness_before,
799                fitness_after: fitness_before,
800                dispatched: false,
801                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
802                adapter_output,
803                adapter_name,
804            };
805        }
806
807        self.hooks
808            .on_patch_dispatched(label_src.as_str(), fitness_after);
809
810        PatchDispatchResult {
811            label: label_src,
812            patch_version: pv,
813            fitness_before,
814            fitness_after,
815            dispatched: true,
816            skip_reason: None,
817            adapter_output,
818            adapter_name,
819        }
820    }
821}
822
823fn emit_target_name(n: &AinlMemoryNode) -> String {
824    match &n.node_type {
825        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
826        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
827        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
828        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
829        AinlNodeType::RuntimeState { runtime_state } => {
830            format!("runtime_state:{}", runtime_state.agent_id)
831        }
832    }
833}
834
835fn procedural_label(p: &ProceduralNode) -> String {
836    if !p.label.is_empty() {
837        p.label.clone()
838    } else {
839        p.pattern_name.clone()
840    }
841}
842
843fn fallback_high_recurrence_semantic(all: Vec<AinlMemoryNode>, limit: usize) -> Vec<AinlMemoryNode> {
844    let mut v: Vec<_> = all
845        .into_iter()
846        .filter(|n| {
847            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
848        })
849        .collect();
850    v.sort_by(|a, b| {
851        let ra = match &a.node_type {
852            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
853            _ => 0,
854        };
855        let rb = match &b.node_type {
856            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
857            _ => 0,
858        };
859        rb.cmp(&ra)
860    });
861    v.into_iter().take(limit).collect()
862}
863
864fn persona_snapshot_if_evolved(extractor: &GraphExtractorTask) -> Option<ainl_persona::PersonaSnapshot> {
865    let snap = extractor.evolution_engine.snapshot();
866    let defaults = default_axis_map(0.5);
867    for axis in PersonaAxis::ALL {
868        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
869        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
870        if (s - d).abs() > INGEST_SCORE_EPSILON {
871            return Some(snap);
872        }
873    }
874    None
875}
876
877fn compile_persona_from_nodes(nodes: &[AinlMemoryNode]) -> Result<Option<String>, String> {
878    if nodes.is_empty() {
879        return Ok(None);
880    }
881    let mut lines = Vec::new();
882    for n in nodes {
883        if let AinlNodeType::Persona { persona } = &n.node_type {
884            lines.push(format_persona_line(persona));
885        }
886    }
887    if lines.is_empty() {
888        Ok(None)
889    } else {
890        Ok(Some(lines.join("\n")))
891    }
892}
893
894fn format_persona_line(p: &PersonaNode) -> String {
895    format!(
896        "- {} (strength {:.2}, layer {:?}, source {:?})",
897        p.trait_name, p.strength, p.layer, p.source
898    )
899}
900
901/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
902/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
903fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
904    if tools_invoked.is_empty() {
905        return vec!["turn".to_string()];
906    }
907    let tags = tag_tool_names(tools_invoked);
908    let mut seen: BTreeSet<String> = BTreeSet::new();
909    for t in tags {
910        if t.namespace == TagNamespace::Tool {
911            seen.insert(t.value);
912        }
913    }
914    if seen.is_empty() {
915        vec!["turn".to_string()]
916    } else {
917        seen.into_iter().collect()
918    }
919}
920
921fn record_turn_episode(
922    memory: &ainl_memory::GraphMemory,
923    agent_id: &str,
924    input: &TurnInput,
925    tools_invoked_canonical: &[String],
926) -> Result<Uuid, String> {
927    let turn_id = Uuid::new_v4();
928    let timestamp = chrono::Utc::now().timestamp();
929    let tools = tools_invoked_canonical.to_vec();
930    let mut node = AinlMemoryNode::new_episode(
931        turn_id,
932        timestamp,
933        tools.clone(),
934        None,
935        input.trace_event.clone(),
936    );
937    node.agent_id = agent_id.to_string();
938    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
939        episodic.user_message = Some(input.user_message.clone());
940        episodic.tools_invoked = tools;
941    }
942    memory.write_node(&node)?;
943    Ok(node.id)
944}