Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12    RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16    EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25    AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26    PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27    EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30use crate::hooks::{NoOpHooks, TurnHooks};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::RuntimeConfig;
34
35/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
36///
37/// ## Evolution writes vs ArmaraOS / openfang-runtime
38///
39/// In production ArmaraOS, **openfang-runtime**’s `GraphMemoryWriter::run_persona_evolution_pass`
40/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
41/// agent’s `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
42/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
43/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
44/// SQLite store is undefined (competing last-writer wins on the same persona node).
45///
46/// Prefer [`Self::with_evolution_writes_enabled(false)`] when a host embeds `AinlRuntime` alongside
47/// openfang while openfang remains the sole evolution writer. [`Self::evolution_engine_mut`] can
48/// still mutate in-memory axis state; calling [`EvolutionEngine::write_persona_node`] yourself
49/// bypasses this guard and must be avoided in that configuration.
50pub struct AinlRuntime {
51    config: RuntimeConfig,
52    memory: GraphCell,
53    extractor: GraphExtractorTask,
54    turn_count: u64,
55    last_extraction_at_turn: u64,
56    /// Current delegation depth for the active `run_turn` call chain (incremented per nested entry).
57    current_depth: Arc<AtomicU32>,
58    hooks: Box<dyn TurnHooks>,
59    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
60    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
61    /// (e.g. openfang’s post-turn pass) on the same DB. Default: `true`.
62    evolution_writes_enabled: bool,
63    persona_cache: Option<String>,
64    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
65    #[doc(hidden)]
66    test_force_extraction_failure: bool,
67    /// Test hook: next fitness write-back from procedural dispatch fails without touching SQLite.
68    #[doc(hidden)]
69    test_force_fitness_write_failure: bool,
70    /// Test hook: next runtime-state SQLite persist fails (non-fatal warning).
71    #[doc(hidden)]
72    test_force_runtime_state_write_failure: bool,
73    adapter_registry: AdapterRegistry,
74    /// Optional async hooks for [`Self::run_turn_async`] (see `async` feature).
75    #[cfg(feature = "async")]
76    hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
77}
78
79impl AinlRuntime {
80    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81        let agent_id = config.agent_id.clone();
82        let memory = GraphCell::new(store);
83        let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84            if agent_id.is_empty() {
85                (0, None, 0)
86            } else {
87                match memory.read_runtime_state(&agent_id) {
88                    Ok(Some(state)) => {
89                        tracing::info!(
90                            agent_id = %agent_id,
91                            turn_count = state.turn_count,
92                            "restored runtime state"
93                        );
94                        let persona_cache = state
95                            .persona_snapshot_json
96                            .as_ref()
97                            .and_then(|json| serde_json::from_str::<String>(json).ok());
98                        (
99                            state.turn_count,
100                            persona_cache,
101                            state.last_extraction_at_turn,
102                        )
103                    }
104                    Ok(None) => (0, None, 0),
105                    Err(e) => {
106                        tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107                        (0, None, 0)
108                    }
109                }
110            };
111        Self {
112            extractor: GraphExtractorTask::new(&agent_id),
113            memory,
114            config,
115            turn_count: init_turn_count,
116            last_extraction_at_turn: init_last_extraction_at_turn,
117            current_depth: Arc::new(AtomicU32::new(0)),
118            hooks: Box::new(NoOpHooks),
119            evolution_writes_enabled: true,
120            persona_cache: init_persona_cache,
121            test_force_extraction_failure: false,
122            test_force_fitness_write_failure: false,
123            test_force_runtime_state_write_failure: false,
124            adapter_registry: AdapterRegistry::new(),
125            #[cfg(feature = "async")]
126            hooks_async: None,
127        }
128    }
129
130    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
131    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
132        self.adapter_registry.register(adapter);
133    }
134
135    /// Install the reference [`GraphPatchAdapter`] as fallback for procedural patches without a
136    /// label-specific adapter (see [`PatchDispatchContext`]).
137    pub fn register_default_patch_adapters(&mut self) {
138        self.register_adapter(GraphPatchAdapter::new());
139    }
140
141    /// Names of currently registered patch adapters.
142    pub fn registered_adapters(&self) -> Vec<&str> {
143        self.adapter_registry.registered_names()
144    }
145
146    #[doc(hidden)]
147    pub fn test_turn_count(&self) -> u64 {
148        self.turn_count
149    }
150
151    #[doc(hidden)]
152    pub fn test_persona_cache(&self) -> Option<&str> {
153        self.persona_cache.as_deref()
154    }
155
156    #[doc(hidden)]
157    pub fn test_delegation_depth(&self) -> u32 {
158        self.current_depth.load(Ordering::SeqCst)
159    }
160
161    #[doc(hidden)]
162    pub fn test_set_delegation_depth(&mut self, depth: u32) {
163        self.current_depth.store(depth, Ordering::SeqCst);
164    }
165
166    #[doc(hidden)]
167    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
168        self.test_force_extraction_failure = fail;
169    }
170
171    #[doc(hidden)]
172    pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
173        self.test_force_fitness_write_failure = fail;
174    }
175
176    /// Test hook: access the graph extractor task for per-phase error injection.
177    #[doc(hidden)]
178    pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
179        &mut self.extractor
180    }
181
182    #[doc(hidden)]
183    pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
184        self.test_force_runtime_state_write_failure = fail;
185    }
186
187    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
188        self.hooks = Box::new(hooks);
189        self
190    }
191
192    /// Install async turn hooks ([`TurnHooksAsync`]) for [`Self::run_turn_async`].
193    #[cfg(feature = "async")]
194    pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
195        self.hooks_async = Some(hooks);
196        self
197    }
198
199    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
200    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
201    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
202    /// persistence elsewhere (see struct-level docs).
203    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
204        self.evolution_writes_enabled = enabled;
205        self
206    }
207
208    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
209        if self.evolution_writes_enabled {
210            Ok(())
211        } else {
212            Err(
213                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
214                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
215                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
216                 ainl_memory.db"
217                    .to_string(),
218            )
219        }
220    }
221
222    /// Borrow the backing SQLite store (same connection as graph memory).
223    ///
224    /// When built with the `async` feature, this locks the in-runtime graph mutex for the lifetime
225    /// of the returned guard (see [`SqliteStoreRef`]). That mutex is [`std::sync::Mutex`] (shared
226    /// via [`std::sync::Arc`]), not `tokio::sync::Mutex`, so this helper remains usable from Tokio
227    /// worker threads for quick reads without forcing an async lock API.
228    pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
229        self.memory.sqlite_ref()
230    }
231
232    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
233    ///
234    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
235    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
236    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
237    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
238    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
239    pub fn evolution_engine(&self) -> &EvolutionEngine {
240        &self.extractor.evolution_engine
241    }
242
243    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
244    ///
245    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass [`Self::evolution_writes_enabled`].
246    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
247        &mut self.extractor.evolution_engine
248    }
249
250    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
251    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
252        self.extractor.evolution_engine.ingest_signals(signals)
253    }
254
255    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
256    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
257        self.extractor
258            .evolution_engine
259            .correction_tick(axis, correction);
260    }
261
262    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
263    ///
264    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
265    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
266        self.require_evolution_writes_enabled()?;
267        let snap = self.extractor.evolution_engine.snapshot();
268        self.memory.with(|m| {
269            self.extractor
270                .evolution_engine
271                .write_persona_node(m.sqlite_store(), &snap)
272        })?;
273        Ok(snap)
274    }
275
276    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
277    ///
278    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
279    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
280    ///
281    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
282    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
283        self.require_evolution_writes_enabled()?;
284        self.memory
285            .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
286    }
287
288    /// Boot: export + validate the agent subgraph.
289    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
290        self.memory
291            .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
292    }
293
294    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (treated as empty for
295    /// semantic ranking; see [`Self::compile_memory_context_for`]).
296    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
297        self.compile_memory_context_for(None)
298    }
299
300    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
301    ///
302    /// `relevant_semantic` is ranked from this `user_message` only (`ainl-semantic-tagger` topic tags
303    /// + recurrence); `None` is treated as empty (high-recurrence fallback), not the latest episode text.
304    pub fn compile_memory_context_for(
305        &self,
306        user_message: Option<&str>,
307    ) -> Result<MemoryContext, String> {
308        if self.config.agent_id.is_empty() {
309            return Err("RuntimeConfig.agent_id must be set".to_string());
310        }
311        self.memory.with(|m| {
312        let store = m.sqlite_store();
313        let q = store.query(&self.config.agent_id);
314        let recent_episodes = q.recent_episodes(5)?;
315        let all_semantic = q.semantic_nodes()?;
316        let relevant_semantic = self.relevant_semantic_nodes(
317            user_message.unwrap_or(""),
318            all_semantic,
319            10,
320        );
321        let active_patches = q.active_patches()?;
322        let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
323        Ok(MemoryContext {
324            recent_episodes,
325            relevant_semantic,
326            active_patches,
327            persona_snapshot,
328            compiled_at: chrono::Utc::now(),
329        })
330        })
331    }
332
333    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
334    pub fn route_emit_edges(
335        &self,
336        episode_id: Uuid,
337        turn_output_payload: &serde_json::Value,
338    ) -> Result<(), String> {
339        self.memory.with(|m| {
340            let store = m.sqlite_store();
341            let neighbors = store
342                .query(&self.config.agent_id)
343                .neighbors(episode_id, EMIT_TO_EDGE)?;
344            for n in neighbors {
345                let target = emit_target_name(&n);
346                self.hooks.on_emit(&target, turn_output_payload);
347            }
348            Ok(())
349        })
350    }
351
352    /// Full single-turn orchestration (no LLM / no IR parse).
353    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
354        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
355        let cd = Arc::clone(&self.current_depth);
356        let _depth_guard = scopeguard::guard((), move |()| {
357            cd.fetch_sub(1, Ordering::SeqCst);
358        });
359
360        if depth >= self.config.max_delegation_depth {
361            return Err(AinlRuntimeError::DelegationDepthExceeded {
362                depth,
363                max: self.config.max_delegation_depth,
364            });
365        }
366
367        if !self.config.enable_graph_memory {
368            let memory_context = MemoryContext::default();
369            let result = TurnResult {
370                memory_context,
371                status: TurnStatus::GraphMemoryDisabled,
372                ..Default::default()
373            };
374            let outcome = TurnOutcome::Complete(result);
375            self.hooks.on_turn_complete(&outcome);
376            return Ok(outcome);
377        }
378
379        if self.config.agent_id.is_empty() {
380            return Err(AinlRuntimeError::Message(
381                "RuntimeConfig.agent_id must be set for run_turn".into(),
382            ));
383        }
384
385        let span = tracing::info_span!(
386            "ainl_runtime.run_turn",
387            agent_id = %self.config.agent_id,
388            turn = self.turn_count,
389            depth = input.depth,
390        );
391        let _span_enter = span.enter();
392
393        let validation: GraphValidationReport = self
394            .memory
395            .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
396            .map_err(AinlRuntimeError::from)?;
397        if !validation.is_valid {
398            let mut msg = String::from("graph validation failed before turn");
399            for d in &validation.dangling_edge_details {
400                msg.push_str(&format!(
401                    "; {} -> {} [{}]",
402                    d.source_id, d.target_id, d.edge_type
403                ));
404            }
405            return Err(AinlRuntimeError::Message(msg));
406        }
407
408        self.hooks
409            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
410
411        let mut turn_warnings: Vec<TurnWarning> = Vec::new();
412
413        let t_persona = Instant::now();
414        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
415            Some(cached.clone())
416        } else {
417            let nodes = self
418                .memory
419                .with(|m| {
420                    m.sqlite_store()
421                        .query(&self.config.agent_id)
422                        .persona_nodes()
423                })
424                .map_err(AinlRuntimeError::from)?;
425            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
426            self.persona_cache = compiled.clone();
427            compiled
428        };
429        self.hooks
430            .on_persona_compiled(persona_prompt_contribution.as_deref());
431        tracing::debug!(
432            target: "ainl_runtime",
433            duration_ms = t_persona.elapsed().as_millis() as u64,
434            has_contribution = persona_prompt_contribution.is_some(),
435            "persona_phase"
436        );
437
438        let t_memory = Instant::now();
439        let memory_context = self
440            .compile_memory_context_for(Some(&input.user_message))
441            .map_err(AinlRuntimeError::from)?;
442        self.hooks.on_memory_context_ready(&memory_context);
443        tracing::debug!(
444            target: "ainl_runtime",
445            duration_ms = t_memory.elapsed().as_millis() as u64,
446            episode_count = memory_context.recent_episodes.len(),
447            semantic_count = memory_context.relevant_semantic.len(),
448            patch_count = memory_context.active_patches.len(),
449            "memory_context"
450        );
451
452        let t_patches = Instant::now();
453        let patch_dispatch_results = if self.config.enable_graph_memory {
454            self.dispatch_patches_collect(
455                &memory_context.active_patches,
456                &input.frame,
457                &mut turn_warnings,
458            )
459        } else {
460            Vec::new()
461        };
462        for r in &patch_dispatch_results {
463            tracing::debug!(
464                target: "ainl_runtime",
465                label = %r.label,
466                dispatched = r.dispatched,
467                fitness_before = r.fitness_before,
468                fitness_after = r.fitness_after,
469                "patch_dispatch"
470            );
471        }
472        tracing::debug!(
473            target: "ainl_runtime",
474            duration_ms = t_patches.elapsed().as_millis() as u64,
475            "patch_dispatch_phase"
476        );
477
478        let dispatched_count = patch_dispatch_results
479            .iter()
480            .filter(|r| r.dispatched)
481            .count() as u32;
482        if dispatched_count >= self.config.max_steps {
483            let result = TurnResult {
484                patch_dispatch_results,
485                memory_context,
486                persona_prompt_contribution,
487                steps_executed: dispatched_count,
488                status: TurnStatus::StepLimitExceeded {
489                    steps_executed: dispatched_count,
490                },
491                ..Default::default()
492            };
493            let outcome = TurnOutcome::Complete(result);
494            self.hooks.on_turn_complete(&outcome);
495            return Ok(outcome);
496        }
497
498        let t_episode = Instant::now();
499        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
500        let episode_id = match self.memory.with(|m| {
501            record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical)
502        }) {
503            Ok(id) => id,
504            Err(e) => {
505                tracing::warn!(
506                    phase = ?TurnPhase::EpisodeWrite,
507                    error = %e,
508                    "non-fatal turn write failed — continuing"
509                );
510                turn_warnings.push(TurnWarning {
511                    phase: TurnPhase::EpisodeWrite,
512                    error: e,
513                });
514                Uuid::nil()
515            }
516        };
517        self.hooks.on_episode_recorded(episode_id);
518        tracing::debug!(
519            target: "ainl_runtime",
520            duration_ms = t_episode.elapsed().as_millis() as u64,
521            episode_id = %episode_id,
522            "episode_record"
523        );
524
525        if !episode_id.is_nil() {
526            for &tid in &input.emit_targets {
527                if let Err(e) = self.memory.with(|m| {
528                    m.sqlite_store()
529                        .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
530                }) {
531                    tracing::warn!(
532                        phase = ?TurnPhase::EpisodeWrite,
533                        error = %e,
534                        "non-fatal turn write failed — continuing"
535                    );
536                    turn_warnings.push(TurnWarning {
537                        phase: TurnPhase::EpisodeWrite,
538                        error: e,
539                    });
540                }
541            }
542        }
543
544        let emit_payload = serde_json::json!({
545            "episode_id": episode_id.to_string(),
546            "user_message": input.user_message,
547            "tools_invoked": tools_canonical,
548            "persona_contribution": persona_prompt_contribution,
549            "turn_count": self.turn_count.wrapping_add(1),
550        });
551        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
552            tracing::warn!(
553                phase = ?TurnPhase::EpisodeWrite,
554                error = %e,
555                "non-fatal turn write failed — continuing"
556            );
557            turn_warnings.push(TurnWarning {
558                phase: TurnPhase::EpisodeWrite,
559                error: format!("emit_routing: {e}"),
560            });
561        }
562
563        self.turn_count = self.turn_count.wrapping_add(1);
564
565        let should_extract = self.config.extraction_interval > 0
566            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
567                >= self.config.extraction_interval as u64;
568
569        let t_extract = Instant::now();
570        let (extraction_report, _extraction_failed) = if should_extract {
571            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
572
573            let res = if force_fail {
574                let e = "test_forced".to_string();
575                tracing::warn!(
576                    phase = ?TurnPhase::ExtractionPass,
577                    error = %e,
578                    "non-fatal turn write failed — continuing"
579                );
580                turn_warnings.push(TurnWarning {
581                    phase: TurnPhase::ExtractionPass,
582                    error: e,
583                });
584                tracing::debug!(
585                    target: "ainl_runtime",
586                    duration_ms = t_extract.elapsed().as_millis() as u64,
587                    signals_ingested = 0u64,
588                    skipped = false,
589                    "extraction_pass"
590                );
591                (None, true)
592            } else {
593                let report = self
594                    .memory
595                    .with(|m| self.extractor.run_pass(m.sqlite_store()));
596                if let Some(ref e) = report.extract_error {
597                    tracing::warn!(
598                        phase = ?TurnPhase::ExtractionPass,
599                        error = %e,
600                        "non-fatal turn write failed — continuing"
601                    );
602                    turn_warnings.push(TurnWarning {
603                        phase: TurnPhase::ExtractionPass,
604                        error: e.clone(),
605                    });
606                }
607                if let Some(ref e) = report.pattern_error {
608                    tracing::warn!(
609                        phase = ?TurnPhase::PatternPersistence,
610                        error = %e,
611                        "non-fatal turn write failed — continuing"
612                    );
613                    turn_warnings.push(TurnWarning {
614                        phase: TurnPhase::PatternPersistence,
615                        error: e.clone(),
616                    });
617                }
618                if let Some(ref e) = report.persona_error {
619                    tracing::warn!(
620                        phase = ?TurnPhase::PersonaEvolution,
621                        error = %e,
622                        "non-fatal turn write failed — continuing"
623                    );
624                    turn_warnings.push(TurnWarning {
625                        phase: TurnPhase::PersonaEvolution,
626                        error: e.clone(),
627                    });
628                }
629                let extraction_failed = report.has_errors();
630                if !extraction_failed {
631                    tracing::info!(
632                        agent_id = %report.agent_id,
633                        signals_extracted = report.signals_extracted,
634                        signals_applied = report.signals_applied,
635                        semantic_nodes_updated = report.semantic_nodes_updated,
636                        "ainl-graph-extractor pass completed (scheduled)"
637                    );
638                }
639                self.hooks.on_extraction_complete(&report);
640                self.persona_cache = None;
641                tracing::debug!(
642                    target: "ainl_runtime",
643                    duration_ms = t_extract.elapsed().as_millis() as u64,
644                    signals_ingested = report.signals_extracted as u64,
645                    skipped = false,
646                    "extraction_pass"
647                );
648                (Some(report), extraction_failed)
649            };
650            self.last_extraction_at_turn = self.turn_count;
651            res
652        } else {
653            tracing::debug!(
654                target: "ainl_runtime",
655                duration_ms = t_extract.elapsed().as_millis() as u64,
656                signals_ingested = 0u64,
657                skipped = true,
658                "extraction_pass"
659            );
660            (None, false)
661        };
662
663        if let Err(e) = self.memory.with(|m| {
664            try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id)
665        }) {
666            tracing::warn!(
667                phase = ?TurnPhase::ExportRefresh,
668                error = %e,
669                "non-fatal turn write failed — continuing"
670            );
671            turn_warnings.push(TurnWarning {
672                phase: TurnPhase::ExportRefresh,
673                error: e,
674            });
675        }
676
677        if !self.config.agent_id.is_empty() {
678            let state = RuntimeStateNode {
679                agent_id: self.config.agent_id.clone(),
680                turn_count: self.turn_count,
681                last_extraction_at_turn: self.last_extraction_at_turn,
682                persona_snapshot_json: self
683                    .persona_cache
684                    .as_ref()
685                    .and_then(|p| serde_json::to_string(p).ok()),
686                updated_at: chrono::Utc::now().timestamp(),
687            };
688            let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
689                Err("injected runtime state write failure".to_string())
690            } else {
691                self.memory.with(|m| m.write_runtime_state(&state))
692            };
693            if let Err(e) = write_res {
694                tracing::warn!(
695                    phase = ?TurnPhase::RuntimeStatePersist,
696                    error = %e,
697                    "failed to persist runtime state — cadence will reset on next restart"
698                );
699                turn_warnings.push(TurnWarning {
700                    phase: TurnPhase::RuntimeStatePersist,
701                    error: e,
702                });
703            }
704        }
705
706        // Fire vitals hook before building the outcome so hosts can act synchronously.
707        if let (Some(gate), Some(phase), Some(trust)) = (
708            input.vitals_gate.as_deref(),
709            input.vitals_phase.as_deref(),
710            input.vitals_trust,
711        ) {
712            self.hooks.on_vitals_classified(gate, phase, trust);
713        }
714
715        let result = TurnResult {
716            episode_id,
717            persona_prompt_contribution,
718            memory_context,
719            extraction_report,
720            steps_executed: dispatched_count,
721            patch_dispatch_results,
722            status: TurnStatus::Ok,
723            vitals_gate: input.vitals_gate.clone(),
724            vitals_phase: input.vitals_phase.clone(),
725            vitals_trust: input.vitals_trust,
726        };
727
728        let outcome = if turn_warnings.is_empty() {
729            TurnOutcome::Complete(result)
730        } else {
731            TurnOutcome::PartialSuccess {
732                result,
733                warnings: turn_warnings,
734            }
735        };
736
737        self.hooks.on_turn_complete(&outcome);
738        Ok(outcome)
739    }
740
741    /// Score and rank semantic nodes from `user_message` via `infer_topic_tags` (topic overlap +
742    /// recurrence tiebreaker), or high-recurrence fallback when the message is empty or yields no topic tags.
743    ///
744    /// **Vitals trust bonus (Gap L):** nodes tagged `vitals:*:pass` receive a `+0.2 * confidence`
745    /// bonus so high-trust episodes surface above zero-topic-overlap peers. Nodes tagged
746    /// `vitals:elevated` receive a `-0.1` penalty to deprioritise warn/fail episodes in recall.
747    /// This is a secondary signal — topic overlap still dominates.
748    fn relevant_semantic_nodes(
749        &self,
750        user_message: &str,
751        all_semantic: Vec<AinlMemoryNode>,
752        limit: usize,
753    ) -> Vec<AinlMemoryNode> {
754        let user_tags = infer_topic_tags(user_message);
755        let user_topics: HashSet<String> = user_tags
756            .iter()
757            .filter(|t| t.namespace == TagNamespace::Topic)
758            .map(|t| t.value.to_lowercase())
759            .collect();
760
761        if user_message.trim().is_empty() || user_topics.is_empty() {
762            return fallback_high_recurrence_semantic(all_semantic, limit);
763        }
764
765        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
766        for n in all_semantic {
767            let (score, rec) = match &n.node_type {
768                AinlNodeType::Semantic { semantic } => {
769                    let mut s = 0f32;
770                    if let Some(cluster) = &semantic.topic_cluster {
771                        for slug in cluster
772                            .split([',', ';'])
773                            .map(|s| s.trim().to_lowercase())
774                            .filter(|s| !s.is_empty())
775                        {
776                            if user_topics.contains(&slug) {
777                                s += 1.0;
778                            }
779                        }
780                    }
781                    if s == 0.0 {
782                        for tag in &semantic.tags {
783                            let tl = tag.to_lowercase();
784                            if let Some(rest) = tl.strip_prefix("topic:") {
785                                let slug = rest.trim().to_lowercase();
786                                if user_topics.contains(&slug) {
787                                    s = 0.5;
788                                    break;
789                                }
790                            }
791                        }
792                    }
793                    // Vitals trust bonus: `vitals:*:pass` → +0.2 * node confidence.
794                    // `vitals:elevated` → -0.1 penalty (warn/fail gate).
795                    // Uses the node's own confidence as a proxy for vitals trust since
796                    // SemanticNode.tags is Vec<String> without per-tag confidence.
797                    let confidence = semantic.confidence;
798                    for tag in &semantic.tags {
799                        let tl = tag.to_lowercase();
800                        if tl.starts_with("vitals:") {
801                            if tl.ends_with(":pass") {
802                                s += 0.2 * confidence;
803                            } else if tl == "vitals:elevated" {
804                                s -= 0.1;
805                            }
806                        }
807                    }
808                    (s, semantic.recurrence_count)
809                }
810                _ => (0.0, 0),
811            };
812            scored.push((score, rec, n));
813        }
814
815        scored.sort_by(|a, b| {
816            b.0.partial_cmp(&a.0)
817                .unwrap_or(std::cmp::Ordering::Equal)
818                .then_with(|| b.1.cmp(&a.1))
819        });
820        scored.into_iter().take(limit).map(|t| t.2).collect()
821    }
822
823    pub fn dispatch_patches(
824        &mut self,
825        patches: &[AinlMemoryNode],
826        frame: &HashMap<String, serde_json::Value>,
827    ) -> Vec<PatchDispatchResult> {
828        let mut w = Vec::new();
829        self.dispatch_patches_collect(patches, frame, &mut w)
830    }
831
832    fn dispatch_patches_collect(
833        &mut self,
834        patches: &[AinlMemoryNode],
835        frame: &HashMap<String, serde_json::Value>,
836        turn_warnings: &mut Vec<TurnWarning>,
837    ) -> Vec<PatchDispatchResult> {
838        let mut out = Vec::new();
839        for node in patches {
840            let res = self.dispatch_one_patch(node, frame);
841            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
842                tracing::warn!(
843                    phase = ?TurnPhase::FitnessWriteBack,
844                    error = %e,
845                    "non-fatal turn write failed — continuing"
846                );
847                turn_warnings.push(TurnWarning {
848                    phase: TurnPhase::FitnessWriteBack,
849                    error: format!("{}: {}", res.label, e),
850                });
851            }
852            out.push(res);
853        }
854        out
855    }
856
857    fn dispatch_one_patch(
858        &mut self,
859        node: &AinlMemoryNode,
860        frame: &HashMap<String, serde_json::Value>,
861    ) -> PatchDispatchResult {
862        let label_default = String::new();
863        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
864            AinlNodeType::Procedural { procedural } => (
865                procedural_label(procedural),
866                procedural.patch_version,
867                procedural.retired,
868                procedural.declared_reads.clone(),
869                procedural.fitness,
870            ),
871            _ => {
872                return PatchDispatchResult {
873                    label: label_default,
874                    patch_version: 0,
875                    fitness_before: 0.0,
876                    fitness_after: 0.0,
877                    dispatched: false,
878                    skip_reason: Some(PatchSkipReason::NotProcedural),
879                    adapter_output: None,
880                    adapter_name: None,
881                };
882            }
883        };
884
885        if pv == 0 {
886            return PatchDispatchResult {
887                label: label_src,
888                patch_version: pv,
889                fitness_before: fitness_opt.unwrap_or(0.5),
890                fitness_after: fitness_opt.unwrap_or(0.5),
891                dispatched: false,
892                skip_reason: Some(PatchSkipReason::ZeroVersion),
893                adapter_output: None,
894                adapter_name: None,
895            };
896        }
897        if retired {
898            return PatchDispatchResult {
899                label: label_src.clone(),
900                patch_version: pv,
901                fitness_before: fitness_opt.unwrap_or(0.5),
902                fitness_after: fitness_opt.unwrap_or(0.5),
903                dispatched: false,
904                skip_reason: Some(PatchSkipReason::Retired),
905                adapter_output: None,
906                adapter_name: None,
907            };
908        }
909        for key in &reads {
910            if !frame.contains_key(key) {
911                return PatchDispatchResult {
912                    label: label_src.clone(),
913                    patch_version: pv,
914                    fitness_before: fitness_opt.unwrap_or(0.5),
915                    fitness_after: fitness_opt.unwrap_or(0.5),
916                    dispatched: false,
917                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
918                    adapter_output: None,
919                    adapter_name: None,
920                };
921            }
922        }
923
924        let patch_label = label_src.clone();
925        let adapter_key = patch_label.as_str();
926        let ctx = PatchDispatchContext {
927            patch_label: adapter_key,
928            node,
929            frame,
930        };
931        let (adapter_output, adapter_name) = if let Some(adapter) = self
932            .adapter_registry
933            .get(adapter_key)
934            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
935        {
936            let aname = adapter.name().to_string();
937            match adapter.execute_patch(&ctx) {
938                Ok(output) => {
939                    tracing::debug!(
940                        label = %patch_label,
941                        adapter = %aname,
942                        "adapter executed patch"
943                    );
944                    (Some(output), Some(aname))
945                }
946                Err(e) => {
947                    tracing::warn!(
948                        label = %patch_label,
949                        adapter = %aname,
950                        error = %e,
951                        "adapter execution failed — continuing as metadata dispatch"
952                    );
953                    (None, Some(aname))
954                }
955            }
956        } else {
957            (None, None)
958        };
959
960        let fitness_before = fitness_opt.unwrap_or(0.5);
961        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
962
963        let updated = match self.memory.with(|m| {
964            let store = m.sqlite_store();
965            store.read_node(node.id)
966        }) {
967            Ok(Some(mut n)) => {
968                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
969                    procedural.fitness = Some(fitness_after);
970                }
971                n
972            }
973            Ok(None) => {
974                return PatchDispatchResult {
975                    label: label_src,
976                    patch_version: pv,
977                    fitness_before,
978                    fitness_after: fitness_before,
979                    dispatched: false,
980                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
981                    adapter_output,
982                    adapter_name,
983                };
984            }
985            Err(e) => {
986                return PatchDispatchResult {
987                    label: label_src,
988                    patch_version: pv,
989                    fitness_before,
990                    fitness_after: fitness_before,
991                    dispatched: false,
992                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
993                    adapter_output,
994                    adapter_name,
995                };
996            }
997        };
998
999        if self.test_force_fitness_write_failure {
1000            self.test_force_fitness_write_failure = false;
1001            let e = "injected fitness write failure".to_string();
1002            return PatchDispatchResult {
1003                label: label_src.clone(),
1004                patch_version: pv,
1005                fitness_before,
1006                fitness_after: fitness_before,
1007                dispatched: false,
1008                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1009                adapter_output,
1010                adapter_name,
1011            };
1012        }
1013
1014        if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1015            return PatchDispatchResult {
1016                label: label_src.clone(),
1017                patch_version: pv,
1018                fitness_before,
1019                fitness_after: fitness_before,
1020                dispatched: false,
1021                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1022                adapter_output,
1023                adapter_name,
1024            };
1025        }
1026
1027        self.hooks
1028            .on_patch_dispatched(label_src.as_str(), fitness_after);
1029
1030        PatchDispatchResult {
1031            label: label_src,
1032            patch_version: pv,
1033            fitness_before,
1034            fitness_after,
1035            dispatched: true,
1036            skip_reason: None,
1037            adapter_output,
1038            adapter_name,
1039        }
1040    }
1041}
1042
1043pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1044    match &n.node_type {
1045        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1046        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1047        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1048        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1049        AinlNodeType::RuntimeState { runtime_state } => {
1050            format!("runtime_state:{}", runtime_state.agent_id)
1051        }
1052    }
1053}
1054
1055pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1056    if !p.label.is_empty() {
1057        p.label.clone()
1058    } else {
1059        p.pattern_name.clone()
1060    }
1061}
1062
1063pub(crate) fn fallback_high_recurrence_semantic(
1064    all: Vec<AinlMemoryNode>,
1065    limit: usize,
1066) -> Vec<AinlMemoryNode> {
1067    let mut v: Vec<_> = all
1068        .into_iter()
1069        .filter(|n| {
1070            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1071        })
1072        .collect();
1073    v.sort_by(|a, b| {
1074        let ra = match &a.node_type {
1075            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1076            _ => 0,
1077        };
1078        let rb = match &b.node_type {
1079            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1080            _ => 0,
1081        };
1082        rb.cmp(&ra)
1083    });
1084    v.into_iter().take(limit).collect()
1085}
1086
1087pub(crate) fn persona_snapshot_if_evolved(
1088    extractor: &GraphExtractorTask,
1089) -> Option<ainl_persona::PersonaSnapshot> {
1090    let snap = extractor.evolution_engine.snapshot();
1091    let defaults = default_axis_map(0.5);
1092    for axis in PersonaAxis::ALL {
1093        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1094        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1095        if (s - d).abs() > INGEST_SCORE_EPSILON {
1096            return Some(snap);
1097        }
1098    }
1099    None
1100}
1101
1102pub(crate) fn compile_persona_from_nodes(
1103    nodes: &[AinlMemoryNode],
1104) -> Result<Option<String>, String> {
1105    if nodes.is_empty() {
1106        return Ok(None);
1107    }
1108    let mut lines = Vec::new();
1109    for n in nodes {
1110        if let AinlNodeType::Persona { persona } = &n.node_type {
1111            lines.push(format_persona_line(persona));
1112        }
1113    }
1114    if lines.is_empty() {
1115        Ok(None)
1116    } else {
1117        Ok(Some(lines.join("\n")))
1118    }
1119}
1120
1121fn format_persona_line(p: &PersonaNode) -> String {
1122    format!(
1123        "- {} (strength {:.2}, layer {:?}, source {:?})",
1124        p.trait_name, p.strength, p.layer, p.source
1125    )
1126}
1127
1128/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
1129/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
1130/// Refresh `{AINL_GRAPH_MEMORY_ARMARAOS_EXPORT}/{agent_id}_graph_export.json` when the env var is set.
1131pub(crate) fn try_export_graph_json_armaraos(
1132    store: &SqliteGraphStore,
1133    agent_id: &str,
1134) -> Result<(), String> {
1135    let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1136    let dir = trimmed.trim();
1137    if dir.is_empty() {
1138        return Ok(());
1139    }
1140    let dir_path = PathBuf::from(dir);
1141    std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1142    let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1143    let snap = store.export_graph(agent_id)?;
1144    let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1145    std::fs::write(
1146        &path,
1147        serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1148    )
1149    .map_err(|e| format!("write export: {e}"))?;
1150    Ok(())
1151}
1152
1153pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1154    if tools_invoked.is_empty() {
1155        return vec!["turn".to_string()];
1156    }
1157    let tags = tag_tool_names(tools_invoked);
1158    let mut seen: BTreeSet<String> = BTreeSet::new();
1159    for t in tags {
1160        if t.namespace == TagNamespace::Tool {
1161            seen.insert(t.value);
1162        }
1163    }
1164    if seen.is_empty() {
1165        vec!["turn".to_string()]
1166    } else {
1167        seen.into_iter().collect()
1168    }
1169}
1170
1171pub(crate) fn record_turn_episode(
1172    memory: &ainl_memory::GraphMemory,
1173    agent_id: &str,
1174    input: &TurnInput,
1175    tools_invoked_canonical: &[String],
1176) -> Result<Uuid, String> {
1177    let turn_id = Uuid::new_v4();
1178    let timestamp = chrono::Utc::now().timestamp();
1179    let tools = tools_invoked_canonical.to_vec();
1180    let mut node = AinlMemoryNode::new_episode(
1181        turn_id,
1182        timestamp,
1183        tools.clone(),
1184        None,
1185        input.trace_event.clone(),
1186    );
1187    node.agent_id = agent_id.to_string();
1188    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1189        episodic.user_message = Some(input.user_message.clone());
1190        episodic.tools_invoked = tools;
1191        // Persist cognitive vitals from the host LLM completion (fail-open: all fields are Option).
1192        episodic.vitals_gate = input.vitals_gate.clone();
1193        episodic.vitals_phase = input.vitals_phase.clone();
1194        episodic.vitals_trust = input.vitals_trust;
1195    }
1196    memory.write_node(&node)?;
1197    Ok(node.id)
1198}
1199
1200#[cfg(feature = "async")]
1201#[path = "runtime_async.rs"]
1202mod runtime_async_impl;