Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12    RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16    EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25    AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26    PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27    EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30#[cfg(feature = "async")]
31use crate::hooks::TurnHooksAsync;
32use crate::hooks::{NoOpHooks, TurnHooks};
33use crate::RuntimeConfig;
34
35/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
36///
37/// ## Evolution writes vs ArmaraOS / openfang-runtime
38///
39/// In production ArmaraOS, **openfang-runtime**’s `GraphMemoryWriter::run_persona_evolution_pass`
40/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
41/// agent’s `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
42/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
43/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
44/// SQLite store is undefined (competing last-writer wins on the same persona node).
45///
46/// Prefer [`Self::with_evolution_writes_enabled(false)`] when a host embeds `AinlRuntime` alongside
47/// openfang while openfang remains the sole evolution writer. [`Self::evolution_engine_mut`] can
48/// still mutate in-memory axis state; calling [`EvolutionEngine::write_persona_node`] yourself
49/// bypasses this guard and must be avoided in that configuration.
50pub struct AinlRuntime {
51    config: RuntimeConfig,
52    memory: GraphCell,
53    extractor: GraphExtractorTask,
54    turn_count: u64,
55    last_extraction_at_turn: u64,
56    /// Current delegation depth for the active `run_turn` call chain (incremented per nested entry).
57    current_depth: Arc<AtomicU32>,
58    hooks: Box<dyn TurnHooks>,
59    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
60    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
61    /// (e.g. openfang’s post-turn pass) on the same DB. Default: `true`.
62    evolution_writes_enabled: bool,
63    persona_cache: Option<String>,
64    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
65    #[doc(hidden)]
66    test_force_extraction_failure: bool,
67    /// Test hook: next fitness write-back from procedural dispatch fails without touching SQLite.
68    #[doc(hidden)]
69    test_force_fitness_write_failure: bool,
70    /// Test hook: next runtime-state SQLite persist fails (non-fatal warning).
71    #[doc(hidden)]
72    test_force_runtime_state_write_failure: bool,
73    adapter_registry: AdapterRegistry,
74    /// Optional async hooks for [`Self::run_turn_async`] (see `async` feature).
75    #[cfg(feature = "async")]
76    hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
77}
78
79impl AinlRuntime {
80    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81        let agent_id = config.agent_id.clone();
82        let memory = GraphCell::new(store);
83        let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84            if agent_id.is_empty() {
85                (0, None, 0)
86            } else {
87                match memory.read_runtime_state(&agent_id) {
88                    Ok(Some(state)) => {
89                        tracing::info!(
90                            agent_id = %agent_id,
91                            turn_count = state.turn_count,
92                            "restored runtime state"
93                        );
94                        let persona_cache = state
95                            .persona_snapshot_json
96                            .as_ref()
97                            .and_then(|json| serde_json::from_str::<String>(json).ok());
98                        (
99                            state.turn_count,
100                            persona_cache,
101                            state.last_extraction_at_turn,
102                        )
103                    }
104                    Ok(None) => (0, None, 0),
105                    Err(e) => {
106                        tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107                        (0, None, 0)
108                    }
109                }
110            };
111        Self {
112            extractor: GraphExtractorTask::new(&agent_id),
113            memory,
114            config,
115            turn_count: init_turn_count,
116            last_extraction_at_turn: init_last_extraction_at_turn,
117            current_depth: Arc::new(AtomicU32::new(0)),
118            hooks: Box::new(NoOpHooks),
119            evolution_writes_enabled: true,
120            persona_cache: init_persona_cache,
121            test_force_extraction_failure: false,
122            test_force_fitness_write_failure: false,
123            test_force_runtime_state_write_failure: false,
124            adapter_registry: AdapterRegistry::new(),
125            #[cfg(feature = "async")]
126            hooks_async: None,
127        }
128    }
129
130    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
131    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
132        self.adapter_registry.register(adapter);
133    }
134
135    /// Install the reference [`GraphPatchAdapter`] as fallback for procedural patches without a
136    /// label-specific adapter (see [`PatchDispatchContext`]).
137    pub fn register_default_patch_adapters(&mut self) {
138        self.register_adapter(GraphPatchAdapter::new());
139    }
140
141    /// Names of currently registered patch adapters.
142    pub fn registered_adapters(&self) -> Vec<&str> {
143        self.adapter_registry.registered_names()
144    }
145
146    #[doc(hidden)]
147    #[cfg(any(test, debug_assertions))]
148    pub fn test_turn_count(&self) -> u64 {
149        self.turn_count
150    }
151
152    #[doc(hidden)]
153    #[cfg(any(test, debug_assertions))]
154    pub fn test_persona_cache(&self) -> Option<&str> {
155        self.persona_cache.as_deref()
156    }
157
158    #[doc(hidden)]
159    #[cfg(any(test, debug_assertions))]
160    pub fn test_delegation_depth(&self) -> u32 {
161        self.current_depth.load(Ordering::SeqCst)
162    }
163
164    #[doc(hidden)]
165    #[cfg(any(test, debug_assertions))]
166    pub fn test_set_delegation_depth(&mut self, depth: u32) {
167        self.current_depth.store(depth, Ordering::SeqCst);
168    }
169
170    #[doc(hidden)]
171    #[cfg(any(test, debug_assertions))]
172    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
173        self.test_force_extraction_failure = fail;
174    }
175
176    #[doc(hidden)]
177    #[cfg(any(test, debug_assertions))]
178    pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
179        self.test_force_fitness_write_failure = fail;
180    }
181
182    /// Test hook: access the graph extractor task for per-phase error injection.
183    #[doc(hidden)]
184    #[cfg(any(test, debug_assertions))]
185    pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
186        &mut self.extractor
187    }
188
189    #[doc(hidden)]
190    #[cfg(any(test, debug_assertions))]
191    pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
192        self.test_force_runtime_state_write_failure = fail;
193    }
194
195    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
196        self.hooks = Box::new(hooks);
197        self
198    }
199
200    /// Install async turn hooks ([`TurnHooksAsync`]) for [`Self::run_turn_async`].
201    #[cfg(feature = "async")]
202    pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
203        self.hooks_async = Some(hooks);
204        self
205    }
206
207    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
208    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
209    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
210    /// persistence elsewhere (see struct-level docs).
211    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
212        self.evolution_writes_enabled = enabled;
213        self
214    }
215
216    /// Current evolution-write mode for this runtime instance.
217    ///
218    /// Hosts embedding `AinlRuntime` alongside OpenFang should keep this `false`
219    /// so only one writer owns the evolution persona row in SQLite.
220    pub fn evolution_writes_enabled(&self) -> bool {
221        self.evolution_writes_enabled
222    }
223
224    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
225        if self.evolution_writes_enabled {
226            Ok(())
227        } else {
228            Err(
229                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
230                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
231                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
232                 ainl_memory.db"
233                    .to_string(),
234            )
235        }
236    }
237
238    /// Borrow the backing SQLite store (same connection as graph memory).
239    ///
240    /// When built with the `async` feature, this locks the in-runtime graph mutex for the lifetime
241    /// of the returned guard (see [`SqliteStoreRef`]). That mutex is [`std::sync::Mutex`] (shared
242    /// via [`std::sync::Arc`]), not `tokio::sync::Mutex`, so this helper remains usable from Tokio
243    /// worker threads for quick reads without forcing an async lock API.
244    pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
245        self.memory.sqlite_ref()
246    }
247
248    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
249    ///
250    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
251    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
252    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
253    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
254    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
255    pub fn evolution_engine(&self) -> &EvolutionEngine {
256        &self.extractor.evolution_engine
257    }
258
259    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
260    ///
261    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass [`Self::evolution_writes_enabled`].
262    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
263        &mut self.extractor.evolution_engine
264    }
265
266    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
267    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
268        self.extractor.evolution_engine.ingest_signals(signals)
269    }
270
271    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
272    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
273        self.extractor
274            .evolution_engine
275            .correction_tick(axis, correction);
276    }
277
278    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
279    ///
280    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
281    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
282        self.require_evolution_writes_enabled()?;
283        let snap = self.extractor.evolution_engine.snapshot();
284        self.memory.with(|m| {
285            self.extractor
286                .evolution_engine
287                .write_persona_node(m.sqlite_store(), &snap)
288        })?;
289        Ok(snap)
290    }
291
292    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
293    ///
294    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
295    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
296    ///
297    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
298    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
299        self.require_evolution_writes_enabled()?;
300        self.memory
301            .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
302    }
303
304    /// Boot: export + validate the agent subgraph.
305    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
306        self.memory
307            .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
308    }
309
310    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (treated as empty for
311    /// semantic ranking; see [`Self::compile_memory_context_for`]).
312    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
313        self.compile_memory_context_for(None)
314    }
315
316    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
317    ///
318    /// `relevant_semantic` is ranked from this `user_message` only (`ainl-semantic-tagger` topic tags
319    /// + recurrence); `None` is treated as empty (high-recurrence fallback), not the latest episode text.
320    pub fn compile_memory_context_for(
321        &self,
322        user_message: Option<&str>,
323    ) -> Result<MemoryContext, String> {
324        if self.config.agent_id.is_empty() {
325            return Err("RuntimeConfig.agent_id must be set".to_string());
326        }
327        self.memory.with(|m| {
328            let store = m.sqlite_store();
329            let q = store.query(&self.config.agent_id);
330            let recent_episodes = q.recent_episodes(5)?;
331            let all_semantic = q.semantic_nodes()?;
332            let relevant_semantic =
333                self.relevant_semantic_nodes(user_message.unwrap_or(""), all_semantic, 10);
334            let active_patches = q.active_patches()?;
335            let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
336            Ok(MemoryContext {
337                recent_episodes,
338                relevant_semantic,
339                active_patches,
340                persona_snapshot,
341                compiled_at: chrono::Utc::now(),
342            })
343        })
344    }
345
346    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
347    pub fn route_emit_edges(
348        &self,
349        episode_id: Uuid,
350        turn_output_payload: &serde_json::Value,
351    ) -> Result<(), String> {
352        self.memory.with(|m| {
353            let store = m.sqlite_store();
354            let neighbors = store
355                .query(&self.config.agent_id)
356                .neighbors(episode_id, EMIT_TO_EDGE)?;
357            for n in neighbors {
358                let target = emit_target_name(&n);
359                self.hooks.on_emit(&target, turn_output_payload);
360            }
361            Ok(())
362        })
363    }
364
365    /// Full single-turn orchestration (no LLM / no IR parse).
366    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
367        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
368        let cd = Arc::clone(&self.current_depth);
369        let _depth_guard = scopeguard::guard((), move |()| {
370            cd.fetch_sub(1, Ordering::SeqCst);
371        });
372
373        if depth >= self.config.max_delegation_depth {
374            return Err(AinlRuntimeError::DelegationDepthExceeded {
375                depth,
376                max: self.config.max_delegation_depth,
377            });
378        }
379
380        if !self.config.enable_graph_memory {
381            let memory_context = MemoryContext::default();
382            let result = TurnResult {
383                memory_context,
384                status: TurnStatus::GraphMemoryDisabled,
385                ..Default::default()
386            };
387            let outcome = TurnOutcome::Complete(result);
388            self.hooks.on_turn_complete(&outcome);
389            return Ok(outcome);
390        }
391
392        if self.config.agent_id.is_empty() {
393            return Err(AinlRuntimeError::Message(
394                "RuntimeConfig.agent_id must be set for run_turn".into(),
395            ));
396        }
397
398        let span = tracing::info_span!(
399            "ainl_runtime.run_turn",
400            agent_id = %self.config.agent_id,
401            turn = self.turn_count,
402            depth = input.depth,
403        );
404        let _span_enter = span.enter();
405
406        let validation: GraphValidationReport = self
407            .memory
408            .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
409            .map_err(AinlRuntimeError::from)?;
410        if !validation.is_valid {
411            let mut msg = String::from("graph validation failed before turn");
412            for d in &validation.dangling_edge_details {
413                msg.push_str(&format!(
414                    "; {} -> {} [{}]",
415                    d.source_id, d.target_id, d.edge_type
416                ));
417            }
418            return Err(AinlRuntimeError::Message(msg));
419        }
420
421        self.hooks
422            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
423
424        let mut turn_warnings: Vec<TurnWarning> = Vec::new();
425
426        let t_persona = Instant::now();
427        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
428            Some(cached.clone())
429        } else {
430            let nodes = self
431                .memory
432                .with(|m| {
433                    m.sqlite_store()
434                        .query(&self.config.agent_id)
435                        .persona_nodes()
436                })
437                .map_err(AinlRuntimeError::from)?;
438            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
439            self.persona_cache = compiled.clone();
440            compiled
441        };
442        self.hooks
443            .on_persona_compiled(persona_prompt_contribution.as_deref());
444        tracing::debug!(
445            target: "ainl_runtime",
446            duration_ms = t_persona.elapsed().as_millis() as u64,
447            has_contribution = persona_prompt_contribution.is_some(),
448            "persona_phase"
449        );
450
451        let t_memory = Instant::now();
452        let memory_context = self
453            .compile_memory_context_for(Some(&input.user_message))
454            .map_err(AinlRuntimeError::from)?;
455        self.hooks.on_memory_context_ready(&memory_context);
456        tracing::debug!(
457            target: "ainl_runtime",
458            duration_ms = t_memory.elapsed().as_millis() as u64,
459            episode_count = memory_context.recent_episodes.len(),
460            semantic_count = memory_context.relevant_semantic.len(),
461            patch_count = memory_context.active_patches.len(),
462            "memory_context"
463        );
464
465        let t_patches = Instant::now();
466        let patch_dispatch_results = if self.config.enable_graph_memory {
467            self.dispatch_patches_collect(
468                &memory_context.active_patches,
469                &input.frame,
470                &mut turn_warnings,
471            )
472        } else {
473            Vec::new()
474        };
475        for r in &patch_dispatch_results {
476            tracing::debug!(
477                target: "ainl_runtime",
478                label = %r.label,
479                dispatched = r.dispatched,
480                fitness_before = r.fitness_before,
481                fitness_after = r.fitness_after,
482                "patch_dispatch"
483            );
484        }
485        tracing::debug!(
486            target: "ainl_runtime",
487            duration_ms = t_patches.elapsed().as_millis() as u64,
488            "patch_dispatch_phase"
489        );
490
491        let dispatched_count = patch_dispatch_results
492            .iter()
493            .filter(|r| r.dispatched)
494            .count() as u32;
495        if dispatched_count >= self.config.max_steps {
496            let result = TurnResult {
497                patch_dispatch_results,
498                memory_context,
499                persona_prompt_contribution,
500                steps_executed: dispatched_count,
501                status: TurnStatus::StepLimitExceeded {
502                    steps_executed: dispatched_count,
503                },
504                ..Default::default()
505            };
506            let outcome = TurnOutcome::Complete(result);
507            self.hooks.on_turn_complete(&outcome);
508            return Ok(outcome);
509        }
510
511        let t_episode = Instant::now();
512        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
513        let episode_id = match self
514            .memory
515            .with(|m| record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical))
516        {
517            Ok(id) => id,
518            Err(e) => {
519                tracing::warn!(
520                    phase = ?TurnPhase::EpisodeWrite,
521                    error = %e,
522                    "non-fatal turn write failed — continuing"
523                );
524                turn_warnings.push(TurnWarning {
525                    phase: TurnPhase::EpisodeWrite,
526                    error: e,
527                });
528                Uuid::nil()
529            }
530        };
531        self.hooks.on_episode_recorded(episode_id);
532        tracing::debug!(
533            target: "ainl_runtime",
534            duration_ms = t_episode.elapsed().as_millis() as u64,
535            episode_id = %episode_id,
536            "episode_record"
537        );
538
539        if !episode_id.is_nil() {
540            for &tid in &input.emit_targets {
541                if let Err(e) = self.memory.with(|m| {
542                    m.sqlite_store()
543                        .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
544                }) {
545                    tracing::warn!(
546                        phase = ?TurnPhase::EpisodeWrite,
547                        error = %e,
548                        "non-fatal turn write failed — continuing"
549                    );
550                    turn_warnings.push(TurnWarning {
551                        phase: TurnPhase::EpisodeWrite,
552                        error: e,
553                    });
554                }
555            }
556        }
557
558        let emit_payload = serde_json::json!({
559            "episode_id": episode_id.to_string(),
560            "user_message": input.user_message,
561            "tools_invoked": tools_canonical,
562            "persona_contribution": persona_prompt_contribution,
563            "turn_count": self.turn_count.wrapping_add(1),
564        });
565        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
566            tracing::warn!(
567                phase = ?TurnPhase::EpisodeWrite,
568                error = %e,
569                "non-fatal turn write failed — continuing"
570            );
571            turn_warnings.push(TurnWarning {
572                phase: TurnPhase::EpisodeWrite,
573                error: format!("emit_routing: {e}"),
574            });
575        }
576
577        self.turn_count = self.turn_count.wrapping_add(1);
578
579        let should_extract = self.config.extraction_interval > 0
580            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
581                >= self.config.extraction_interval as u64;
582
583        let t_extract = Instant::now();
584        let (extraction_report, _extraction_failed) = if should_extract {
585            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
586
587            let res = if force_fail {
588                let e = "test_forced".to_string();
589                tracing::warn!(
590                    phase = ?TurnPhase::ExtractionPass,
591                    error = %e,
592                    "non-fatal turn write failed — continuing"
593                );
594                turn_warnings.push(TurnWarning {
595                    phase: TurnPhase::ExtractionPass,
596                    error: e,
597                });
598                tracing::debug!(
599                    target: "ainl_runtime",
600                    duration_ms = t_extract.elapsed().as_millis() as u64,
601                    signals_ingested = 0u64,
602                    skipped = false,
603                    "extraction_pass"
604                );
605                (None, true)
606            } else {
607                let report = self
608                    .memory
609                    .with(|m| self.extractor.run_pass(m.sqlite_store()));
610                if let Some(ref e) = report.extract_error {
611                    tracing::warn!(
612                        phase = ?TurnPhase::ExtractionPass,
613                        error = %e,
614                        "non-fatal turn write failed — continuing"
615                    );
616                    turn_warnings.push(TurnWarning {
617                        phase: TurnPhase::ExtractionPass,
618                        error: e.clone(),
619                    });
620                }
621                if let Some(ref e) = report.pattern_error {
622                    tracing::warn!(
623                        phase = ?TurnPhase::PatternPersistence,
624                        error = %e,
625                        "non-fatal turn write failed — continuing"
626                    );
627                    turn_warnings.push(TurnWarning {
628                        phase: TurnPhase::PatternPersistence,
629                        error: e.clone(),
630                    });
631                }
632                if let Some(ref e) = report.persona_error {
633                    tracing::warn!(
634                        phase = ?TurnPhase::PersonaEvolution,
635                        error = %e,
636                        "non-fatal turn write failed — continuing"
637                    );
638                    turn_warnings.push(TurnWarning {
639                        phase: TurnPhase::PersonaEvolution,
640                        error: e.clone(),
641                    });
642                }
643                let extraction_failed = report.has_errors();
644                if !extraction_failed {
645                    tracing::info!(
646                        agent_id = %report.agent_id,
647                        signals_extracted = report.signals_extracted,
648                        signals_applied = report.signals_applied,
649                        semantic_nodes_updated = report.semantic_nodes_updated,
650                        "ainl-graph-extractor pass completed (scheduled)"
651                    );
652                }
653                self.hooks.on_extraction_complete(&report);
654                self.persona_cache = None;
655                tracing::debug!(
656                    target: "ainl_runtime",
657                    duration_ms = t_extract.elapsed().as_millis() as u64,
658                    signals_ingested = report.signals_extracted as u64,
659                    skipped = false,
660                    "extraction_pass"
661                );
662                (Some(report), extraction_failed)
663            };
664            self.last_extraction_at_turn = self.turn_count;
665            res
666        } else {
667            tracing::debug!(
668                target: "ainl_runtime",
669                duration_ms = t_extract.elapsed().as_millis() as u64,
670                signals_ingested = 0u64,
671                skipped = true,
672                "extraction_pass"
673            );
674            (None, false)
675        };
676
677        if let Err(e) = self
678            .memory
679            .with(|m| try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id))
680        {
681            tracing::warn!(
682                phase = ?TurnPhase::ExportRefresh,
683                error = %e,
684                "non-fatal turn write failed — continuing"
685            );
686            turn_warnings.push(TurnWarning {
687                phase: TurnPhase::ExportRefresh,
688                error: e,
689            });
690        }
691
692        if !self.config.agent_id.is_empty() {
693            let state = RuntimeStateNode {
694                agent_id: self.config.agent_id.clone(),
695                turn_count: self.turn_count,
696                last_extraction_at_turn: self.last_extraction_at_turn,
697                persona_snapshot_json: self
698                    .persona_cache
699                    .as_ref()
700                    .and_then(|p| serde_json::to_string(p).ok()),
701                updated_at: chrono::Utc::now().timestamp(),
702            };
703            let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
704                Err("injected runtime state write failure".to_string())
705            } else {
706                self.memory.with(|m| m.write_runtime_state(&state))
707            };
708            if let Err(e) = write_res {
709                tracing::warn!(
710                    phase = ?TurnPhase::RuntimeStatePersist,
711                    error = %e,
712                    "failed to persist runtime state — cadence will reset on next restart"
713                );
714                turn_warnings.push(TurnWarning {
715                    phase: TurnPhase::RuntimeStatePersist,
716                    error: e,
717                });
718            }
719        }
720
721        // Fire vitals hook before building the outcome so hosts can act synchronously.
722        if let (Some(gate), Some(phase), Some(trust)) = (
723            input.vitals_gate.as_deref(),
724            input.vitals_phase.as_deref(),
725            input.vitals_trust,
726        ) {
727            self.hooks.on_vitals_classified(gate, phase, trust);
728        }
729
730        let result = TurnResult {
731            episode_id,
732            persona_prompt_contribution,
733            memory_context,
734            extraction_report,
735            steps_executed: dispatched_count,
736            patch_dispatch_results,
737            status: TurnStatus::Ok,
738            vitals_gate: input.vitals_gate.clone(),
739            vitals_phase: input.vitals_phase.clone(),
740            vitals_trust: input.vitals_trust,
741        };
742
743        let outcome = if turn_warnings.is_empty() {
744            TurnOutcome::Complete(result)
745        } else {
746            TurnOutcome::PartialSuccess {
747                result,
748                warnings: turn_warnings,
749            }
750        };
751
752        self.hooks.on_turn_complete(&outcome);
753        Ok(outcome)
754    }
755
756    /// Score and rank semantic nodes from `user_message` via `infer_topic_tags` (topic overlap +
757    /// recurrence tiebreaker), or high-recurrence fallback when the message is empty or yields no topic tags.
758    ///
759    /// **Vitals trust bonus (Gap L):** nodes tagged `vitals:*:pass` receive a `+0.2 * confidence`
760    /// bonus so high-trust episodes surface above zero-topic-overlap peers. Nodes tagged
761    /// `vitals:elevated` receive a `-0.1` penalty to deprioritise warn/fail episodes in recall.
762    /// This is a secondary signal — topic overlap still dominates.
763    fn relevant_semantic_nodes(
764        &self,
765        user_message: &str,
766        all_semantic: Vec<AinlMemoryNode>,
767        limit: usize,
768    ) -> Vec<AinlMemoryNode> {
769        let user_tags = infer_topic_tags(user_message);
770        let user_topics: HashSet<String> = user_tags
771            .iter()
772            .filter(|t| t.namespace == TagNamespace::Topic)
773            .map(|t| t.value.to_lowercase())
774            .collect();
775
776        if user_message.trim().is_empty() || user_topics.is_empty() {
777            return fallback_high_recurrence_semantic(all_semantic, limit);
778        }
779
780        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
781        for n in all_semantic {
782            let (score, rec) = match &n.node_type {
783                AinlNodeType::Semantic { semantic } => {
784                    let mut s = 0f32;
785                    if let Some(cluster) = &semantic.topic_cluster {
786                        for slug in cluster
787                            .split([',', ';'])
788                            .map(|s| s.trim().to_lowercase())
789                            .filter(|s| !s.is_empty())
790                        {
791                            if user_topics.contains(&slug) {
792                                s += 1.0;
793                            }
794                        }
795                    }
796                    if s == 0.0 {
797                        for tag in &semantic.tags {
798                            let tl = tag.to_lowercase();
799                            if let Some(rest) = tl.strip_prefix("topic:") {
800                                let slug = rest.trim().to_lowercase();
801                                if user_topics.contains(&slug) {
802                                    s = 0.5;
803                                    break;
804                                }
805                            }
806                        }
807                    }
808                    // Vitals trust bonus: `vitals:*:pass` → +0.2 * node confidence.
809                    // `vitals:elevated` → -0.1 penalty (warn/fail gate).
810                    // Uses the node's own confidence as a proxy for vitals trust since
811                    // SemanticNode.tags is Vec<String> without per-tag confidence.
812                    let confidence = semantic.confidence;
813                    for tag in &semantic.tags {
814                        let tl = tag.to_lowercase();
815                        if tl.starts_with("vitals:") {
816                            if tl.ends_with(":pass") {
817                                s += 0.2 * confidence;
818                            } else if tl == "vitals:elevated" {
819                                s -= 0.1;
820                            }
821                        }
822                    }
823                    (s, semantic.recurrence_count)
824                }
825                _ => (0.0, 0),
826            };
827            scored.push((score, rec, n));
828        }
829
830        scored.sort_by(|a, b| {
831            b.0.partial_cmp(&a.0)
832                .unwrap_or(std::cmp::Ordering::Equal)
833                .then_with(|| b.1.cmp(&a.1))
834        });
835        scored.into_iter().take(limit).map(|t| t.2).collect()
836    }
837
838    pub fn dispatch_patches(
839        &mut self,
840        patches: &[AinlMemoryNode],
841        frame: &HashMap<String, serde_json::Value>,
842    ) -> Vec<PatchDispatchResult> {
843        let mut w = Vec::new();
844        self.dispatch_patches_collect(patches, frame, &mut w)
845    }
846
847    fn dispatch_patches_collect(
848        &mut self,
849        patches: &[AinlMemoryNode],
850        frame: &HashMap<String, serde_json::Value>,
851        turn_warnings: &mut Vec<TurnWarning>,
852    ) -> Vec<PatchDispatchResult> {
853        let mut out = Vec::new();
854        for node in patches {
855            let res = self.dispatch_one_patch(node, frame);
856            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
857                tracing::warn!(
858                    phase = ?TurnPhase::FitnessWriteBack,
859                    error = %e,
860                    "non-fatal turn write failed — continuing"
861                );
862                turn_warnings.push(TurnWarning {
863                    phase: TurnPhase::FitnessWriteBack,
864                    error: format!("{}: {}", res.label, e),
865                });
866            }
867            out.push(res);
868        }
869        out
870    }
871
872    fn dispatch_one_patch(
873        &mut self,
874        node: &AinlMemoryNode,
875        frame: &HashMap<String, serde_json::Value>,
876    ) -> PatchDispatchResult {
877        let label_default = String::new();
878        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
879            AinlNodeType::Procedural { procedural } => (
880                procedural_label(procedural),
881                procedural.patch_version,
882                procedural.retired,
883                procedural.declared_reads.clone(),
884                procedural.fitness,
885            ),
886            _ => {
887                return PatchDispatchResult {
888                    label: label_default,
889                    patch_version: 0,
890                    fitness_before: 0.0,
891                    fitness_after: 0.0,
892                    dispatched: false,
893                    skip_reason: Some(PatchSkipReason::NotProcedural),
894                    adapter_output: None,
895                    adapter_name: None,
896                };
897            }
898        };
899
900        if pv == 0 {
901            return PatchDispatchResult {
902                label: label_src,
903                patch_version: pv,
904                fitness_before: fitness_opt.unwrap_or(0.5),
905                fitness_after: fitness_opt.unwrap_or(0.5),
906                dispatched: false,
907                skip_reason: Some(PatchSkipReason::ZeroVersion),
908                adapter_output: None,
909                adapter_name: None,
910            };
911        }
912        if retired {
913            return PatchDispatchResult {
914                label: label_src.clone(),
915                patch_version: pv,
916                fitness_before: fitness_opt.unwrap_or(0.5),
917                fitness_after: fitness_opt.unwrap_or(0.5),
918                dispatched: false,
919                skip_reason: Some(PatchSkipReason::Retired),
920                adapter_output: None,
921                adapter_name: None,
922            };
923        }
924        for key in &reads {
925            if !frame.contains_key(key) {
926                return PatchDispatchResult {
927                    label: label_src.clone(),
928                    patch_version: pv,
929                    fitness_before: fitness_opt.unwrap_or(0.5),
930                    fitness_after: fitness_opt.unwrap_or(0.5),
931                    dispatched: false,
932                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
933                    adapter_output: None,
934                    adapter_name: None,
935                };
936            }
937        }
938
939        let patch_label = label_src.clone();
940        let adapter_key = patch_label.as_str();
941        let ctx = PatchDispatchContext {
942            patch_label: adapter_key,
943            node,
944            frame,
945        };
946        let (adapter_output, adapter_name) = if let Some(adapter) = self
947            .adapter_registry
948            .get(adapter_key)
949            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
950        {
951            let aname = adapter.name().to_string();
952            match adapter.execute_patch(&ctx) {
953                Ok(output) => {
954                    tracing::debug!(
955                        label = %patch_label,
956                        adapter = %aname,
957                        "adapter executed patch"
958                    );
959                    (Some(output), Some(aname))
960                }
961                Err(e) => {
962                    tracing::warn!(
963                        label = %patch_label,
964                        adapter = %aname,
965                        error = %e,
966                        "adapter execution failed — continuing as metadata dispatch"
967                    );
968                    (None, Some(aname))
969                }
970            }
971        } else {
972            (None, None)
973        };
974
975        let fitness_before = fitness_opt.unwrap_or(0.5);
976        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
977
978        let updated = match self.memory.with(|m| {
979            let store = m.sqlite_store();
980            store.read_node(node.id)
981        }) {
982            Ok(Some(mut n)) => {
983                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
984                    procedural.fitness = Some(fitness_after);
985                }
986                n
987            }
988            Ok(None) => {
989                return PatchDispatchResult {
990                    label: label_src,
991                    patch_version: pv,
992                    fitness_before,
993                    fitness_after: fitness_before,
994                    dispatched: false,
995                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
996                    adapter_output,
997                    adapter_name,
998                };
999            }
1000            Err(e) => {
1001                return PatchDispatchResult {
1002                    label: label_src,
1003                    patch_version: pv,
1004                    fitness_before,
1005                    fitness_after: fitness_before,
1006                    dispatched: false,
1007                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1008                    adapter_output,
1009                    adapter_name,
1010                };
1011            }
1012        };
1013
1014        if self.test_force_fitness_write_failure {
1015            self.test_force_fitness_write_failure = false;
1016            let e = "injected fitness write failure".to_string();
1017            return PatchDispatchResult {
1018                label: label_src.clone(),
1019                patch_version: pv,
1020                fitness_before,
1021                fitness_after: fitness_before,
1022                dispatched: false,
1023                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1024                adapter_output,
1025                adapter_name,
1026            };
1027        }
1028
1029        if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1030            return PatchDispatchResult {
1031                label: label_src.clone(),
1032                patch_version: pv,
1033                fitness_before,
1034                fitness_after: fitness_before,
1035                dispatched: false,
1036                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1037                adapter_output,
1038                adapter_name,
1039            };
1040        }
1041
1042        self.hooks
1043            .on_patch_dispatched(label_src.as_str(), fitness_after);
1044
1045        PatchDispatchResult {
1046            label: label_src,
1047            patch_version: pv,
1048            fitness_before,
1049            fitness_after,
1050            dispatched: true,
1051            skip_reason: None,
1052            adapter_output,
1053            adapter_name,
1054        }
1055    }
1056}
1057
1058pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1059    match &n.node_type {
1060        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1061        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1062        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1063        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1064        AinlNodeType::RuntimeState { runtime_state } => {
1065            format!("runtime_state:{}", runtime_state.agent_id)
1066        }
1067    }
1068}
1069
1070pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1071    if !p.label.is_empty() {
1072        p.label.clone()
1073    } else {
1074        p.pattern_name.clone()
1075    }
1076}
1077
1078pub(crate) fn fallback_high_recurrence_semantic(
1079    all: Vec<AinlMemoryNode>,
1080    limit: usize,
1081) -> Vec<AinlMemoryNode> {
1082    let mut v: Vec<_> = all
1083        .into_iter()
1084        .filter(|n| {
1085            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1086        })
1087        .collect();
1088    v.sort_by(|a, b| {
1089        let ra = match &a.node_type {
1090            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1091            _ => 0,
1092        };
1093        let rb = match &b.node_type {
1094            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1095            _ => 0,
1096        };
1097        rb.cmp(&ra)
1098    });
1099    v.into_iter().take(limit).collect()
1100}
1101
1102pub(crate) fn persona_snapshot_if_evolved(
1103    extractor: &GraphExtractorTask,
1104) -> Option<ainl_persona::PersonaSnapshot> {
1105    let snap = extractor.evolution_engine.snapshot();
1106    let defaults = default_axis_map(0.5);
1107    for axis in PersonaAxis::ALL {
1108        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1109        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1110        if (s - d).abs() > INGEST_SCORE_EPSILON {
1111            return Some(snap);
1112        }
1113    }
1114    None
1115}
1116
1117pub(crate) fn compile_persona_from_nodes(
1118    nodes: &[AinlMemoryNode],
1119) -> Result<Option<String>, String> {
1120    if nodes.is_empty() {
1121        return Ok(None);
1122    }
1123    let mut lines = Vec::new();
1124    for n in nodes {
1125        if let AinlNodeType::Persona { persona } = &n.node_type {
1126            lines.push(format_persona_line(persona));
1127        }
1128    }
1129    if lines.is_empty() {
1130        Ok(None)
1131    } else {
1132        Ok(Some(lines.join("\n")))
1133    }
1134}
1135
1136fn format_persona_line(p: &PersonaNode) -> String {
1137    format!(
1138        "- {} (strength {:.2}, layer {:?}, source {:?})",
1139        p.trait_name, p.strength, p.layer, p.source
1140    )
1141}
1142
1143/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
1144/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
1145/// Refresh `{AINL_GRAPH_MEMORY_ARMARAOS_EXPORT}/{agent_id}_graph_export.json` when the env var is set.
1146pub(crate) fn try_export_graph_json_armaraos(
1147    store: &SqliteGraphStore,
1148    agent_id: &str,
1149) -> Result<(), String> {
1150    let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1151    let dir = trimmed.trim();
1152    if dir.is_empty() {
1153        return Ok(());
1154    }
1155    let dir_path = PathBuf::from(dir);
1156    std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1157    let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1158    let snap = store.export_graph(agent_id)?;
1159    let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1160    std::fs::write(
1161        &path,
1162        serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1163    )
1164    .map_err(|e| format!("write export: {e}"))?;
1165    Ok(())
1166}
1167
1168pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1169    if tools_invoked.is_empty() {
1170        return vec!["turn".to_string()];
1171    }
1172    let tags = tag_tool_names(tools_invoked);
1173    let mut seen: BTreeSet<String> = BTreeSet::new();
1174    for t in tags {
1175        if t.namespace == TagNamespace::Tool {
1176            seen.insert(t.value);
1177        }
1178    }
1179    if seen.is_empty() {
1180        vec!["turn".to_string()]
1181    } else {
1182        seen.into_iter().collect()
1183    }
1184}
1185
1186pub(crate) fn record_turn_episode(
1187    memory: &ainl_memory::GraphMemory,
1188    agent_id: &str,
1189    input: &TurnInput,
1190    tools_invoked_canonical: &[String],
1191) -> Result<Uuid, String> {
1192    let turn_id = Uuid::new_v4();
1193    let timestamp = chrono::Utc::now().timestamp();
1194    let tools = tools_invoked_canonical.to_vec();
1195    let mut node = AinlMemoryNode::new_episode(
1196        turn_id,
1197        timestamp,
1198        tools.clone(),
1199        None,
1200        input.trace_event.clone(),
1201    );
1202    node.agent_id = agent_id.to_string();
1203    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1204        episodic.user_message = Some(input.user_message.clone());
1205        episodic.tools_invoked = tools;
1206        // Persist cognitive vitals from the host LLM completion (fail-open: all fields are Option).
1207        episodic.vitals_gate = input.vitals_gate.clone();
1208        episodic.vitals_phase = input.vitals_phase.clone();
1209        episodic.vitals_trust = input.vitals_trust;
1210    }
1211    memory.write_node(&node)?;
1212    Ok(node.id)
1213}
1214
1215#[cfg(feature = "async")]
1216#[path = "runtime_async.rs"]
1217mod runtime_async_impl;