// ainl_runtime/runtime.rs

//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
10use ainl_graph_extractor::GraphExtractorTask;
11use ainl_memory::{
12    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
13    RuntimeStateNode, SqliteGraphStore,
14};
15use ainl_persona::axes::default_axis_map;
16use ainl_persona::{
17    EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
18};
19use ainl_semantic_tagger::infer_topic_tags;
20use ainl_semantic_tagger::tag_tool_names;
21use ainl_semantic_tagger::TagNamespace;
22use uuid::Uuid;
23
24use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
25use crate::engine::{
26    AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
27    PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
28    EMIT_TO_EDGE,
29};
30use crate::graph_cell::{GraphCell, SqliteStoreRef};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::hooks::{NoOpHooks, TurnHooks};
34use crate::RuntimeConfig;
35
/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
///
/// ## Evolution writes vs ArmaraOS / openfang-runtime
///
/// In production ArmaraOS, **openfang-runtime**’s `GraphMemoryWriter::run_persona_evolution_pass`
/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
/// agent’s `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
/// SQLite store is undefined (competing last-writer wins on the same persona node).
///
/// Prefer `with_evolution_writes_enabled(false)` (see [`Self::with_evolution_writes_enabled`])
/// when a host embeds `AinlRuntime` alongside openfang while openfang remains the sole evolution
/// writer. [`Self::evolution_engine_mut`] can still mutate in-memory axis state; calling
/// [`EvolutionEngine::write_persona_node`] yourself bypasses this guard and must be avoided in
/// that configuration.
pub struct AinlRuntime {
    /// Runtime configuration supplied to [`Self::new`]; `agent_id` scopes every store query.
    config: RuntimeConfig,
    /// Graph memory handle wrapping the SQLite store supplied to [`Self::new`].
    memory: GraphCell,
    /// Graph extractor task; also owns the persona [`EvolutionEngine`] exposed by
    /// [`Self::evolution_engine`].
    extractor: GraphExtractorTask,
    /// Turn counter: restored from persisted runtime state in [`Self::new`] and advanced by
    /// [`Self::run_turn`].
    turn_count: u64,
    /// Value of `turn_count` when the last scheduled extraction pass ran (drives the
    /// `extraction_interval` cadence).
    last_extraction_at_turn: u64,
    /// Current delegation depth for the active `run_turn` call chain (incremented per nested entry).
    current_depth: Arc<AtomicU32>,
    /// Synchronous turn hooks; [`NoOpHooks`] until replaced via [`Self::with_hooks`].
    hooks: Box<dyn TurnHooks>,
    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
    /// (e.g. openfang’s post-turn pass) on the same DB. Default: `true`.
    evolution_writes_enabled: bool,
    /// Cached persona prompt contribution; filled lazily by [`Self::run_turn`] and cleared after
    /// each extraction pass that actually runs.
    persona_cache: Option<String>,
    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
    #[doc(hidden)]
    test_force_extraction_failure: bool,
    /// Test hook: next fitness write-back from procedural dispatch fails without touching SQLite.
    #[doc(hidden)]
    test_force_fitness_write_failure: bool,
    /// Test hook: next runtime-state SQLite persist fails (non-fatal warning).
    #[doc(hidden)]
    test_force_runtime_state_write_failure: bool,
    /// Patch adapters registered through [`Self::register_adapter`].
    adapter_registry: AdapterRegistry,
    /// Optional async hooks for [`Self::run_turn_async`] (see `async` feature).
    #[cfg(feature = "async")]
    hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
}
79
80impl AinlRuntime {
81    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
82        let agent_id = config.agent_id.clone();
83        let memory = GraphCell::new(store);
84        let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
85            if agent_id.is_empty() {
86                (0, None, 0)
87            } else {
88                match memory.read_runtime_state(&agent_id) {
89                    Ok(Some(state)) => {
90                        tracing::info!(
91                            agent_id = %agent_id,
92                            turn_count = state.turn_count,
93                            "restored runtime state"
94                        );
95                        let persona_cache = state
96                            .persona_snapshot_json
97                            .as_ref()
98                            .and_then(|json| serde_json::from_str::<String>(json).ok());
99                        (
100                            state.turn_count,
101                            persona_cache,
102                            state.last_extraction_at_turn,
103                        )
104                    }
105                    Ok(None) => (0, None, 0),
106                    Err(e) => {
107                        tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
108                        (0, None, 0)
109                    }
110                }
111            };
112        Self {
113            extractor: GraphExtractorTask::new(&agent_id),
114            memory,
115            config,
116            turn_count: init_turn_count,
117            last_extraction_at_turn: init_last_extraction_at_turn,
118            current_depth: Arc::new(AtomicU32::new(0)),
119            hooks: Box::new(NoOpHooks),
120            evolution_writes_enabled: true,
121            persona_cache: init_persona_cache,
122            test_force_extraction_failure: false,
123            test_force_fitness_write_failure: false,
124            test_force_runtime_state_write_failure: false,
125            adapter_registry: AdapterRegistry::new(),
126            #[cfg(feature = "async")]
127            hooks_async: None,
128        }
129    }
130
131    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
132    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
133        self.adapter_registry.register(adapter);
134    }
135
136    /// Install the reference [`GraphPatchAdapter`] as fallback for procedural patches without a
137    /// label-specific adapter (see [`PatchDispatchContext`]).
138    pub fn register_default_patch_adapters(&mut self) {
139        self.register_adapter(GraphPatchAdapter::new());
140    }
141
142    /// Names of currently registered patch adapters.
143    pub fn registered_adapters(&self) -> Vec<&str> {
144        self.adapter_registry.registered_names()
145    }
146
147    #[doc(hidden)]
148    #[cfg(any(test, debug_assertions))]
149    pub fn test_turn_count(&self) -> u64 {
150        self.turn_count
151    }
152
153    #[doc(hidden)]
154    #[cfg(any(test, debug_assertions))]
155    pub fn test_persona_cache(&self) -> Option<&str> {
156        self.persona_cache.as_deref()
157    }
158
159    #[doc(hidden)]
160    #[cfg(any(test, debug_assertions))]
161    pub fn test_delegation_depth(&self) -> u32 {
162        self.current_depth.load(Ordering::SeqCst)
163    }
164
165    #[doc(hidden)]
166    #[cfg(any(test, debug_assertions))]
167    pub fn test_set_delegation_depth(&mut self, depth: u32) {
168        self.current_depth.store(depth, Ordering::SeqCst);
169    }
170
171    #[doc(hidden)]
172    #[cfg(any(test, debug_assertions))]
173    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
174        self.test_force_extraction_failure = fail;
175    }
176
177    #[doc(hidden)]
178    #[cfg(any(test, debug_assertions))]
179    pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
180        self.test_force_fitness_write_failure = fail;
181    }
182
183    /// Test hook: access the graph extractor task for per-phase error injection.
184    #[doc(hidden)]
185    #[cfg(any(test, debug_assertions))]
186    pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
187        &mut self.extractor
188    }
189
190    #[doc(hidden)]
191    #[cfg(any(test, debug_assertions))]
192    pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
193        self.test_force_runtime_state_write_failure = fail;
194    }
195
196    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
197        self.hooks = Box::new(hooks);
198        self
199    }
200
201    /// Install async turn hooks ([`TurnHooksAsync`]) for [`Self::run_turn_async`].
202    #[cfg(feature = "async")]
203    pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
204        self.hooks_async = Some(hooks);
205        self
206    }
207
208    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
209    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
210    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
211    /// persistence elsewhere (see struct-level docs).
212    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
213        self.evolution_writes_enabled = enabled;
214        self
215    }
216
217    /// Current evolution-write mode for this runtime instance.
218    ///
219    /// Hosts embedding `AinlRuntime` alongside OpenFang should keep this `false`
220    /// so only one writer owns the evolution persona row in SQLite.
221    pub fn evolution_writes_enabled(&self) -> bool {
222        self.evolution_writes_enabled
223    }
224
225    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
226        if self.evolution_writes_enabled {
227            Ok(())
228        } else {
229            Err(
230                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
231                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
232                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
233                 ainl_memory.db"
234                    .to_string(),
235            )
236        }
237    }
238
239    /// Borrow the backing SQLite store (same connection as graph memory).
240    ///
241    /// When built with the `async` feature, this locks the in-runtime graph mutex for the lifetime
242    /// of the returned guard (see [`SqliteStoreRef`]). That mutex is [`std::sync::Mutex`] (shared
243    /// via [`std::sync::Arc`]), not `tokio::sync::Mutex`, so this helper remains usable from Tokio
244    /// worker threads for quick reads without forcing an async lock API.
245    pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
246        self.memory.sqlite_ref()
247    }
248
249    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
250    ///
251    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
252    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
253    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
254    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
255    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
256    pub fn evolution_engine(&self) -> &EvolutionEngine {
257        &self.extractor.evolution_engine
258    }
259
260    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
261    ///
262    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass [`Self::evolution_writes_enabled`].
263    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
264        &mut self.extractor.evolution_engine
265    }
266
267    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
268    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
269        self.extractor.evolution_engine.ingest_signals(signals)
270    }
271
272    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
273    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
274        self.extractor
275            .evolution_engine
276            .correction_tick(axis, correction);
277    }
278
279    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
280    ///
281    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
282    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
283        self.require_evolution_writes_enabled()?;
284        let snap = self.extractor.evolution_engine.snapshot();
285        self.memory.with(|m| {
286            self.extractor
287                .evolution_engine
288                .write_persona_node(m.sqlite_store(), &snap)
289        })?;
290        Ok(snap)
291    }
292
293    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
294    ///
295    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
296    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
297    ///
298    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
299    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
300        self.require_evolution_writes_enabled()?;
301        self.memory
302            .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
303    }
304
305    /// Boot: export + validate the agent subgraph.
306    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
307        self.memory
308            .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
309    }
310
311    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (treated as empty for
312    /// semantic ranking; see [`Self::compile_memory_context_for`]).
313    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
314        self.compile_memory_context_for(None)
315    }
316
317    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
318    ///
319    /// `relevant_semantic` is ranked from this `user_message` only (`ainl-semantic-tagger` topic tags
320    /// + recurrence); `None` is treated as empty (high-recurrence fallback), not the latest episode text.
321    pub fn compile_memory_context_for(
322        &self,
323        user_message: Option<&str>,
324    ) -> Result<MemoryContext, String> {
325        if self.config.agent_id.is_empty() {
326            return Err("RuntimeConfig.agent_id must be set".to_string());
327        }
328        self.memory.with(|m| {
329            let store = m.sqlite_store();
330            let q = store.query(&self.config.agent_id);
331            let recent_episodes = q.recent_episodes(5)?;
332            let all_semantic = q.semantic_nodes()?;
333            let relevant_semantic =
334                self.relevant_semantic_nodes(user_message.unwrap_or(""), all_semantic, 10);
335            let active_patches = q.active_patches()?;
336            let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
337            Ok(MemoryContext {
338                recent_episodes,
339                relevant_semantic,
340                active_patches,
341                persona_snapshot,
342                compiled_at: chrono::Utc::now(),
343            })
344        })
345    }
346
347    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
348    pub fn route_emit_edges(
349        &self,
350        episode_id: Uuid,
351        turn_output_payload: &serde_json::Value,
352    ) -> Result<(), String> {
353        self.memory.with(|m| {
354            let store = m.sqlite_store();
355            let neighbors = store
356                .query(&self.config.agent_id)
357                .neighbors(episode_id, EMIT_TO_EDGE)?;
358            for n in neighbors {
359                let target = emit_target_name(&n);
360                self.hooks.on_emit(&target, turn_output_payload);
361            }
362            Ok(())
363        })
364    }
365
366    /// Full single-turn orchestration (no LLM / no IR parse).
367    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
368        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
369        let cd = Arc::clone(&self.current_depth);
370        let _depth_guard = scopeguard::guard((), move |()| {
371            cd.fetch_sub(1, Ordering::SeqCst);
372        });
373
374        if depth >= self.config.max_delegation_depth {
375            return Err(AinlRuntimeError::DelegationDepthExceeded {
376                depth,
377                max: self.config.max_delegation_depth,
378            });
379        }
380
381        if !self.config.enable_graph_memory {
382            let memory_context = MemoryContext::default();
383            let result = TurnResult {
384                memory_context,
385                status: TurnStatus::GraphMemoryDisabled,
386                ..Default::default()
387            };
388            let outcome = TurnOutcome::Complete(result);
389            self.hooks.on_turn_complete(&outcome);
390            return Ok(outcome);
391        }
392
393        if self.config.agent_id.is_empty() {
394            return Err(AinlRuntimeError::Message(
395                "RuntimeConfig.agent_id must be set for run_turn".into(),
396            ));
397        }
398
399        let span = tracing::info_span!(
400            "ainl_runtime.run_turn",
401            agent_id = %self.config.agent_id,
402            turn = self.turn_count,
403            depth = input.depth,
404        );
405        let _span_enter = span.enter();
406
407        let validation: GraphValidationReport = self
408            .memory
409            .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
410            .map_err(AinlRuntimeError::from)?;
411        if !validation.is_valid {
412            let mut msg = String::from("graph validation failed before turn");
413            for d in &validation.dangling_edge_details {
414                msg.push_str(&format!(
415                    "; {} -> {} [{}]",
416                    d.source_id, d.target_id, d.edge_type
417                ));
418            }
419            return Err(AinlRuntimeError::Message(msg));
420        }
421
422        self.hooks
423            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
424
425        let mut turn_warnings: Vec<TurnWarning> = Vec::new();
426
427        let t_persona = Instant::now();
428        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
429            Some(cached.clone())
430        } else {
431            let nodes = self
432                .memory
433                .with(|m| {
434                    m.sqlite_store()
435                        .query(&self.config.agent_id)
436                        .persona_nodes()
437                })
438                .map_err(AinlRuntimeError::from)?;
439            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
440            self.persona_cache = compiled.clone();
441            compiled
442        };
443        self.hooks
444            .on_persona_compiled(persona_prompt_contribution.as_deref());
445        tracing::debug!(
446            target: "ainl_runtime",
447            duration_ms = t_persona.elapsed().as_millis() as u64,
448            has_contribution = persona_prompt_contribution.is_some(),
449            "persona_phase"
450        );
451
452        let t_memory = Instant::now();
453        let memory_context = self
454            .compile_memory_context_for(Some(&input.user_message))
455            .map_err(AinlRuntimeError::from)?;
456        self.hooks.on_memory_context_ready(&memory_context);
457        tracing::debug!(
458            target: "ainl_runtime",
459            duration_ms = t_memory.elapsed().as_millis() as u64,
460            episode_count = memory_context.recent_episodes.len(),
461            semantic_count = memory_context.relevant_semantic.len(),
462            patch_count = memory_context.active_patches.len(),
463            "memory_context"
464        );
465
466        let t_patches = Instant::now();
467        let patch_dispatch_results = if self.config.enable_graph_memory {
468            self.dispatch_patches_collect(
469                &memory_context.active_patches,
470                &input.frame,
471                &mut turn_warnings,
472            )
473        } else {
474            Vec::new()
475        };
476        for r in &patch_dispatch_results {
477            tracing::debug!(
478                target: "ainl_runtime",
479                label = %r.label,
480                dispatched = r.dispatched,
481                fitness_before = r.fitness_before,
482                fitness_after = r.fitness_after,
483                "patch_dispatch"
484            );
485        }
486        tracing::debug!(
487            target: "ainl_runtime",
488            duration_ms = t_patches.elapsed().as_millis() as u64,
489            "patch_dispatch_phase"
490        );
491
492        let dispatched_count = patch_dispatch_results
493            .iter()
494            .filter(|r| r.dispatched)
495            .count() as u32;
496        if dispatched_count >= self.config.max_steps {
497            let result = TurnResult {
498                patch_dispatch_results,
499                memory_context,
500                persona_prompt_contribution,
501                steps_executed: dispatched_count,
502                status: TurnStatus::StepLimitExceeded {
503                    steps_executed: dispatched_count,
504                },
505                ..Default::default()
506            };
507            let outcome = TurnOutcome::Complete(result);
508            self.hooks.on_turn_complete(&outcome);
509            return Ok(outcome);
510        }
511
512        let t_episode = Instant::now();
513        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
514        let episode_id = match self
515            .memory
516            .with(|m| record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical))
517        {
518            Ok(id) => id,
519            Err(e) => {
520                tracing::warn!(
521                    phase = ?TurnPhase::EpisodeWrite,
522                    error = %e,
523                    "non-fatal turn write failed — continuing"
524                );
525                turn_warnings.push(TurnWarning {
526                    phase: TurnPhase::EpisodeWrite,
527                    error: e,
528                });
529                Uuid::nil()
530            }
531        };
532        self.hooks.on_episode_recorded(episode_id);
533        tracing::debug!(
534            target: "ainl_runtime",
535            duration_ms = t_episode.elapsed().as_millis() as u64,
536            episode_id = %episode_id,
537            "episode_record"
538        );
539
540        if !episode_id.is_nil() {
541            for &tid in &input.emit_targets {
542                if let Err(e) = self.memory.with(|m| {
543                    m.sqlite_store()
544                        .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
545                }) {
546                    tracing::warn!(
547                        phase = ?TurnPhase::EpisodeWrite,
548                        error = %e,
549                        "non-fatal turn write failed — continuing"
550                    );
551                    turn_warnings.push(TurnWarning {
552                        phase: TurnPhase::EpisodeWrite,
553                        error: e,
554                    });
555                }
556            }
557        }
558
559        let emit_payload = serde_json::json!({
560            "episode_id": episode_id.to_string(),
561            "user_message": input.user_message,
562            "tools_invoked": tools_canonical,
563            "persona_contribution": persona_prompt_contribution,
564            "turn_count": self.turn_count.wrapping_add(1),
565        });
566        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
567            tracing::warn!(
568                phase = ?TurnPhase::EpisodeWrite,
569                error = %e,
570                "non-fatal turn write failed — continuing"
571            );
572            turn_warnings.push(TurnWarning {
573                phase: TurnPhase::EpisodeWrite,
574                error: format!("emit_routing: {e}"),
575            });
576        }
577
578        if let Err(e) = self.memory.with(|m| {
579            maybe_persist_trajectory_after_episode(
580                m,
581                &self.config.agent_id,
582                episode_id,
583                &tools_canonical,
584                &patch_dispatch_results,
585                &input,
586            )
587        }) {
588            tracing::warn!(
589                phase = ?TurnPhase::EpisodeWrite,
590                error = %e,
591                "non-fatal trajectory persist failed — continuing"
592            );
593            turn_warnings.push(TurnWarning {
594                phase: TurnPhase::EpisodeWrite,
595                error: format!("trajectory_persist: {e}"),
596            });
597        }
598
599        self.turn_count = self.turn_count.wrapping_add(1);
600
601        let should_extract = self.config.extraction_interval > 0
602            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
603                >= self.config.extraction_interval as u64;
604
605        let t_extract = Instant::now();
606        let (extraction_report, _extraction_failed) = if should_extract {
607            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
608
609            let res = if force_fail {
610                let e = "test_forced".to_string();
611                tracing::warn!(
612                    phase = ?TurnPhase::ExtractionPass,
613                    error = %e,
614                    "non-fatal turn write failed — continuing"
615                );
616                turn_warnings.push(TurnWarning {
617                    phase: TurnPhase::ExtractionPass,
618                    error: e,
619                });
620                tracing::debug!(
621                    target: "ainl_runtime",
622                    duration_ms = t_extract.elapsed().as_millis() as u64,
623                    signals_ingested = 0u64,
624                    skipped = false,
625                    "extraction_pass"
626                );
627                (None, true)
628            } else {
629                let report = self
630                    .memory
631                    .with(|m| self.extractor.run_pass(m.sqlite_store()));
632                if let Some(ref e) = report.extract_error {
633                    tracing::warn!(
634                        phase = ?TurnPhase::ExtractionPass,
635                        error = %e,
636                        "non-fatal turn write failed — continuing"
637                    );
638                    turn_warnings.push(TurnWarning {
639                        phase: TurnPhase::ExtractionPass,
640                        error: e.clone(),
641                    });
642                }
643                if let Some(ref e) = report.pattern_error {
644                    tracing::warn!(
645                        phase = ?TurnPhase::PatternPersistence,
646                        error = %e,
647                        "non-fatal turn write failed — continuing"
648                    );
649                    turn_warnings.push(TurnWarning {
650                        phase: TurnPhase::PatternPersistence,
651                        error: e.clone(),
652                    });
653                }
654                if let Some(ref e) = report.persona_error {
655                    tracing::warn!(
656                        phase = ?TurnPhase::PersonaEvolution,
657                        error = %e,
658                        "non-fatal turn write failed — continuing"
659                    );
660                    turn_warnings.push(TurnWarning {
661                        phase: TurnPhase::PersonaEvolution,
662                        error: e.clone(),
663                    });
664                }
665                let extraction_failed = report.has_errors();
666                if !extraction_failed {
667                    tracing::info!(
668                        agent_id = %report.agent_id,
669                        signals_extracted = report.signals_extracted,
670                        signals_applied = report.signals_applied,
671                        semantic_nodes_updated = report.semantic_nodes_updated,
672                        "ainl-graph-extractor pass completed (scheduled)"
673                    );
674                }
675                self.hooks.on_extraction_complete(&report);
676                self.persona_cache = None;
677                tracing::debug!(
678                    target: "ainl_runtime",
679                    duration_ms = t_extract.elapsed().as_millis() as u64,
680                    signals_ingested = report.signals_extracted as u64,
681                    skipped = false,
682                    "extraction_pass"
683                );
684                (Some(report), extraction_failed)
685            };
686            self.last_extraction_at_turn = self.turn_count;
687            res
688        } else {
689            tracing::debug!(
690                target: "ainl_runtime",
691                duration_ms = t_extract.elapsed().as_millis() as u64,
692                signals_ingested = 0u64,
693                skipped = true,
694                "extraction_pass"
695            );
696            (None, false)
697        };
698
699        if let Err(e) = self
700            .memory
701            .with(|m| try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id))
702        {
703            tracing::warn!(
704                phase = ?TurnPhase::ExportRefresh,
705                error = %e,
706                "non-fatal turn write failed — continuing"
707            );
708            turn_warnings.push(TurnWarning {
709                phase: TurnPhase::ExportRefresh,
710                error: e,
711            });
712        }
713
714        if !self.config.agent_id.is_empty() {
715            let state = RuntimeStateNode {
716                agent_id: self.config.agent_id.clone(),
717                turn_count: self.turn_count,
718                last_extraction_at_turn: self.last_extraction_at_turn,
719                persona_snapshot_json: self
720                    .persona_cache
721                    .as_ref()
722                    .and_then(|p| serde_json::to_string(p).ok()),
723                updated_at: chrono::Utc::now().timestamp(),
724            };
725            let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
726                Err("injected runtime state write failure".to_string())
727            } else {
728                self.memory.with(|m| m.write_runtime_state(&state))
729            };
730            if let Err(e) = write_res {
731                tracing::warn!(
732                    phase = ?TurnPhase::RuntimeStatePersist,
733                    error = %e,
734                    "failed to persist runtime state — cadence will reset on next restart"
735                );
736                turn_warnings.push(TurnWarning {
737                    phase: TurnPhase::RuntimeStatePersist,
738                    error: e,
739                });
740            }
741        }
742
743        // Fire vitals hook before building the outcome so hosts can act synchronously.
744        if let (Some(gate), Some(phase), Some(trust)) = (
745            input.vitals_gate.as_deref(),
746            input.vitals_phase.as_deref(),
747            input.vitals_trust,
748        ) {
749            self.hooks.on_vitals_classified(gate, phase, trust);
750        }
751
752        let result = TurnResult {
753            episode_id,
754            persona_prompt_contribution,
755            memory_context,
756            extraction_report,
757            steps_executed: dispatched_count,
758            patch_dispatch_results,
759            status: TurnStatus::Ok,
760            vitals_gate: input.vitals_gate.clone(),
761            vitals_phase: input.vitals_phase.clone(),
762            vitals_trust: input.vitals_trust,
763        };
764
765        let outcome = if turn_warnings.is_empty() {
766            TurnOutcome::Complete(result)
767        } else {
768            TurnOutcome::PartialSuccess {
769                result,
770                warnings: turn_warnings,
771            }
772        };
773
774        self.hooks.on_turn_complete(&outcome);
775        Ok(outcome)
776    }
777
778    /// Score and rank semantic nodes from `user_message` via `infer_topic_tags` (topic overlap +
779    /// recurrence tiebreaker), or high-recurrence fallback when the message is empty or yields no topic tags.
780    ///
781    /// **Vitals trust bonus (Gap L):** nodes tagged `vitals:*:pass` receive a `+0.2 * confidence`
782    /// bonus so high-trust episodes surface above zero-topic-overlap peers. Nodes tagged
783    /// `vitals:elevated` receive a `-0.1` penalty to deprioritise warn/fail episodes in recall.
784    /// This is a secondary signal — topic overlap still dominates.
785    fn relevant_semantic_nodes(
786        &self,
787        user_message: &str,
788        all_semantic: Vec<AinlMemoryNode>,
789        limit: usize,
790    ) -> Vec<AinlMemoryNode> {
791        let user_tags = infer_topic_tags(user_message);
792        let user_topics: HashSet<String> = user_tags
793            .iter()
794            .filter(|t| t.namespace == TagNamespace::Topic)
795            .map(|t| t.value.to_lowercase())
796            .collect();
797
798        if user_message.trim().is_empty() || user_topics.is_empty() {
799            return fallback_high_recurrence_semantic(all_semantic, limit);
800        }
801
802        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
803        for n in all_semantic {
804            let (score, rec) = match &n.node_type {
805                AinlNodeType::Semantic { semantic } => {
806                    let mut s = 0f32;
807                    if let Some(cluster) = &semantic.topic_cluster {
808                        for slug in cluster
809                            .split([',', ';'])
810                            .map(|s| s.trim().to_lowercase())
811                            .filter(|s| !s.is_empty())
812                        {
813                            if user_topics.contains(&slug) {
814                                s += 1.0;
815                            }
816                        }
817                    }
818                    if s == 0.0 {
819                        for tag in &semantic.tags {
820                            let tl = tag.to_lowercase();
821                            if let Some(rest) = tl.strip_prefix("topic:") {
822                                let slug = rest.trim().to_lowercase();
823                                if user_topics.contains(&slug) {
824                                    s = 0.5;
825                                    break;
826                                }
827                            }
828                        }
829                    }
830                    // Vitals trust bonus: `vitals:*:pass` → +0.2 * node confidence.
831                    // `vitals:elevated` → -0.1 penalty (warn/fail gate).
832                    // Uses the node's own confidence as a proxy for vitals trust since
833                    // SemanticNode.tags is Vec<String> without per-tag confidence.
834                    let confidence = semantic.confidence;
835                    for tag in &semantic.tags {
836                        let tl = tag.to_lowercase();
837                        if tl.starts_with("vitals:") {
838                            if tl.ends_with(":pass") {
839                                s += 0.2 * confidence;
840                            } else if tl == "vitals:elevated" {
841                                s -= 0.1;
842                            }
843                        }
844                    }
845                    (s, semantic.recurrence_count)
846                }
847                _ => (0.0, 0),
848            };
849            scored.push((score, rec, n));
850        }
851
852        scored.sort_by(|a, b| {
853            b.0.partial_cmp(&a.0)
854                .unwrap_or(std::cmp::Ordering::Equal)
855                .then_with(|| b.1.cmp(&a.1))
856        });
857        scored.into_iter().take(limit).map(|t| t.2).collect()
858    }
859
860    pub fn dispatch_patches(
861        &mut self,
862        patches: &[AinlMemoryNode],
863        frame: &HashMap<String, serde_json::Value>,
864    ) -> Vec<PatchDispatchResult> {
865        let mut w = Vec::new();
866        self.dispatch_patches_collect(patches, frame, &mut w)
867    }
868
869    fn dispatch_patches_collect(
870        &mut self,
871        patches: &[AinlMemoryNode],
872        frame: &HashMap<String, serde_json::Value>,
873        turn_warnings: &mut Vec<TurnWarning>,
874    ) -> Vec<PatchDispatchResult> {
875        let mut out = Vec::new();
876        for node in patches {
877            let res = self.dispatch_one_patch(node, frame);
878            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
879                tracing::warn!(
880                    phase = ?TurnPhase::FitnessWriteBack,
881                    error = %e,
882                    "non-fatal turn write failed — continuing"
883                );
884                turn_warnings.push(TurnWarning {
885                    phase: TurnPhase::FitnessWriteBack,
886                    error: format!("{}: {}", res.label, e),
887                });
888            }
889            out.push(res);
890        }
891        out
892    }
893
894    fn dispatch_one_patch(
895        &mut self,
896        node: &AinlMemoryNode,
897        frame: &HashMap<String, serde_json::Value>,
898    ) -> PatchDispatchResult {
899        let label_default = String::new();
900        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
901            AinlNodeType::Procedural { procedural } => (
902                procedural_label(procedural),
903                procedural.patch_version,
904                procedural.retired,
905                procedural.declared_reads.clone(),
906                procedural.fitness,
907            ),
908            _ => {
909                return PatchDispatchResult {
910                    label: label_default,
911                    patch_version: 0,
912                    fitness_before: 0.0,
913                    fitness_after: 0.0,
914                    dispatched: false,
915                    skip_reason: Some(PatchSkipReason::NotProcedural),
916                    adapter_output: None,
917                    adapter_name: None,
918                    dispatch_duration_ms: 0,
919                };
920            }
921        };
922
923        if pv == 0 {
924            return PatchDispatchResult {
925                label: label_src,
926                patch_version: pv,
927                fitness_before: fitness_opt.unwrap_or(0.5),
928                fitness_after: fitness_opt.unwrap_or(0.5),
929                dispatched: false,
930                skip_reason: Some(PatchSkipReason::ZeroVersion),
931                adapter_output: None,
932                adapter_name: None,
933                dispatch_duration_ms: 0,
934            };
935        }
936        if retired {
937            return PatchDispatchResult {
938                label: label_src.clone(),
939                patch_version: pv,
940                fitness_before: fitness_opt.unwrap_or(0.5),
941                fitness_after: fitness_opt.unwrap_or(0.5),
942                dispatched: false,
943                skip_reason: Some(PatchSkipReason::Retired),
944                adapter_output: None,
945                adapter_name: None,
946                dispatch_duration_ms: 0,
947            };
948        }
949        for key in &reads {
950            if !frame.contains_key(key) {
951                return PatchDispatchResult {
952                    label: label_src.clone(),
953                    patch_version: pv,
954                    fitness_before: fitness_opt.unwrap_or(0.5),
955                    fitness_after: fitness_opt.unwrap_or(0.5),
956                    dispatched: false,
957                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
958                    adapter_output: None,
959                    adapter_name: None,
960                    dispatch_duration_ms: 0,
961                };
962            }
963        }
964
965        let patch_label = label_src.clone();
966        let adapter_key = patch_label.as_str();
967        let ctx = PatchDispatchContext {
968            patch_label: adapter_key,
969            node,
970            frame,
971        };
972        let (adapter_output, adapter_name, dispatch_duration_ms) = if let Some(adapter) = self
973            .adapter_registry
974            .get(adapter_key)
975            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
976        {
977            let aname = adapter.name().to_string();
978            let t_exec = Instant::now();
979            let (out, name) = match adapter.execute_patch(&ctx) {
980                Ok(output) => {
981                    tracing::debug!(
982                        label = %patch_label,
983                        adapter = %aname,
984                        "adapter executed patch"
985                    );
986                    (Some(output), Some(aname))
987                }
988                Err(e) => {
989                    tracing::warn!(
990                        label = %patch_label,
991                        adapter = %aname,
992                        error = %e,
993                        "adapter execution failed — continuing as metadata dispatch"
994                    );
995                    (None, Some(aname))
996                }
997            };
998            let ms = t_exec.elapsed().as_millis() as u64;
999            (out, name, ms)
1000        } else {
1001            (None, None, 0u64)
1002        };
1003
1004        let fitness_before = fitness_opt.unwrap_or(0.5);
1005        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
1006
1007        let updated = match self.memory.with(|m| {
1008            let store = m.sqlite_store();
1009            store.read_node(node.id)
1010        }) {
1011            Ok(Some(mut n)) => {
1012                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
1013                    procedural.fitness = Some(fitness_after);
1014                }
1015                n
1016            }
1017            Ok(None) => {
1018                return PatchDispatchResult {
1019                    label: label_src,
1020                    patch_version: pv,
1021                    fitness_before,
1022                    fitness_after: fitness_before,
1023                    dispatched: false,
1024                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
1025                    adapter_output,
1026                    adapter_name,
1027                    dispatch_duration_ms,
1028                };
1029            }
1030            Err(e) => {
1031                return PatchDispatchResult {
1032                    label: label_src,
1033                    patch_version: pv,
1034                    fitness_before,
1035                    fitness_after: fitness_before,
1036                    dispatched: false,
1037                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1038                    adapter_output,
1039                    adapter_name,
1040                    dispatch_duration_ms,
1041                };
1042            }
1043        };
1044
1045        if self.test_force_fitness_write_failure {
1046            self.test_force_fitness_write_failure = false;
1047            let e = "injected fitness write failure".to_string();
1048            return PatchDispatchResult {
1049                label: label_src.clone(),
1050                patch_version: pv,
1051                fitness_before,
1052                fitness_after: fitness_before,
1053                dispatched: false,
1054                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1055                adapter_output,
1056                adapter_name,
1057                dispatch_duration_ms,
1058            };
1059        }
1060
1061        if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
1062            return PatchDispatchResult {
1063                label: label_src.clone(),
1064                patch_version: pv,
1065                fitness_before,
1066                fitness_after: fitness_before,
1067                dispatched: false,
1068                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
1069                adapter_output,
1070                adapter_name,
1071                dispatch_duration_ms,
1072            };
1073        }
1074
1075        self.hooks
1076            .on_patch_dispatched(label_src.as_str(), fitness_after);
1077
1078        PatchDispatchResult {
1079            label: label_src,
1080            patch_version: pv,
1081            fitness_before,
1082            fitness_after,
1083            dispatched: true,
1084            skip_reason: None,
1085            adapter_output,
1086            adapter_name,
1087            dispatch_duration_ms,
1088        }
1089    }
1090}
1091
1092pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1093    match &n.node_type {
1094        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1095        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1096        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1097        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1098        AinlNodeType::RuntimeState { runtime_state } => {
1099            format!("runtime_state:{}", runtime_state.agent_id)
1100        }
1101        AinlNodeType::Trajectory { trajectory } => {
1102            format!("trajectory:{}", trajectory.episode_id)
1103        }
1104        AinlNodeType::Failure { failure } => {
1105            format!(
1106                "failure:{}:{}",
1107                failure.source,
1108                failure.tool_name.as_deref().unwrap_or("")
1109            )
1110        }
1111    }
1112}
1113
1114pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1115    if !p.label.is_empty() {
1116        p.label.clone()
1117    } else {
1118        p.pattern_name.clone()
1119    }
1120}
1121
1122pub(crate) fn fallback_high_recurrence_semantic(
1123    all: Vec<AinlMemoryNode>,
1124    limit: usize,
1125) -> Vec<AinlMemoryNode> {
1126    let mut v: Vec<_> = all
1127        .into_iter()
1128        .filter(|n| {
1129            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1130        })
1131        .collect();
1132    v.sort_by(|a, b| {
1133        let ra = match &a.node_type {
1134            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1135            _ => 0,
1136        };
1137        let rb = match &b.node_type {
1138            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1139            _ => 0,
1140        };
1141        rb.cmp(&ra)
1142    });
1143    v.into_iter().take(limit).collect()
1144}
1145
1146pub(crate) fn persona_snapshot_if_evolved(
1147    extractor: &GraphExtractorTask,
1148) -> Option<ainl_persona::PersonaSnapshot> {
1149    let snap = extractor.evolution_engine.snapshot();
1150    let defaults = default_axis_map(0.5);
1151    for axis in PersonaAxis::ALL {
1152        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1153        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1154        if (s - d).abs() > INGEST_SCORE_EPSILON {
1155            return Some(snap);
1156        }
1157    }
1158    None
1159}
1160
1161pub(crate) fn compile_persona_from_nodes(
1162    nodes: &[AinlMemoryNode],
1163) -> Result<Option<String>, String> {
1164    if nodes.is_empty() {
1165        return Ok(None);
1166    }
1167    let mut lines = Vec::new();
1168    for n in nodes {
1169        if let AinlNodeType::Persona { persona } = &n.node_type {
1170            lines.push(format_persona_line(persona));
1171        }
1172    }
1173    if lines.is_empty() {
1174        Ok(None)
1175    } else {
1176        Ok(Some(lines.join("\n")))
1177    }
1178}
1179
1180fn format_persona_line(p: &PersonaNode) -> String {
1181    format!(
1182        "- {} (strength {:.2}, layer {:?}, source {:?})",
1183        p.trait_name, p.strength, p.layer, p.source
1184    )
1185}
1186
1187/// Refresh `{AINL_GRAPH_MEMORY_ARMARAOS_EXPORT}/{agent_id}_graph_export.json` when the env var is set.
1188pub(crate) fn try_export_graph_json_armaraos(
1189    store: &SqliteGraphStore,
1190    agent_id: &str,
1191) -> Result<(), String> {
1192    let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1193    let dir = trimmed.trim();
1194    if dir.is_empty() {
1195        return Ok(());
1196    }
1197    let dir_path = PathBuf::from(dir);
1198    std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1199    let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1200    let snap = store.export_graph(agent_id)?;
1201    let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1202    std::fs::write(
1203        &path,
1204        serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1205    )
1206    .map_err(|e| format!("write export: {e}"))?;
1207    Ok(())
1208}
1209
1210/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
1211/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
1212pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1213    if tools_invoked.is_empty() {
1214        return vec!["turn".to_string()];
1215    }
1216    let tags = tag_tool_names(tools_invoked);
1217    let mut seen: BTreeSet<String> = BTreeSet::new();
1218    for t in tags {
1219        if t.namespace == TagNamespace::Tool {
1220            seen.insert(t.value);
1221        }
1222    }
1223    if seen.is_empty() {
1224        vec!["turn".to_string()]
1225    } else {
1226        seen.into_iter().collect()
1227    }
1228}
1229
/// Maximum number of bytes of a preview string kept before the ellipsis is appended.
const TRAJECTORY_PREVIEW_MAX: usize = 512;

/// Shorten `s` to at most [`TRAJECTORY_PREVIEW_MAX`] bytes and append `…` when anything was cut.
///
/// Strings that already fit are returned unchanged. The cut is moved back to the nearest UTF-8
/// character boundary, so the kept prefix may be up to three bytes shorter than the limit.
/// `String::truncate` panics when asked to cut inside a multi-byte character, which would
/// otherwise happen for previews containing non-ASCII text.
fn truncate_trajectory_preview(s: String) -> String {
    if s.len() <= TRAJECTORY_PREVIEW_MAX {
        return s;
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = TRAJECTORY_PREVIEW_MAX;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut t = s;
    t.truncate(cut);
    t.push('…');
    t
}
1241
1242/// Build [`TrajectoryStep`]s in turn order: procedural patch dispatch attempts, then normalized tool names.
1243pub(crate) fn trajectory_steps_for_runtime_turn(
1244    patch_results: &[PatchDispatchResult],
1245    tools_canonical: &[String],
1246) -> Vec<TrajectoryStep> {
1247    let base_ms = chrono::Utc::now().timestamp_millis();
1248    let mut steps = Vec::new();
1249
1250    for (i, r) in patch_results.iter().enumerate() {
1251        let adapter = r
1252            .adapter_name
1253            .clone()
1254            .unwrap_or_else(|| "patch".to_string());
1255        let operation = if r.label.is_empty() {
1256            "procedural_patch".to_string()
1257        } else {
1258            r.label.clone()
1259        };
1260        let (success, error) = if r.dispatched {
1261            if r.adapter_output.is_some() {
1262                (true, None)
1263            } else if r.adapter_name.is_some() {
1264                (false, Some("adapter_execute_patch_failed".to_string()))
1265            } else {
1266                (true, None)
1267            }
1268        } else {
1269            (true, None)
1270        };
1271        let outputs_preview = r
1272            .adapter_output
1273            .as_ref()
1274            .and_then(|v| serde_json::to_string(v).ok())
1275            .map(truncate_trajectory_preview);
1276
1277        steps.push(TrajectoryStep {
1278            step_id: format!("patch_{i}"),
1279            timestamp_ms: base_ms + i as i64,
1280            adapter,
1281            operation,
1282            inputs_preview: None,
1283            outputs_preview,
1284            duration_ms: r.dispatch_duration_ms,
1285            success,
1286            error,
1287            vitals: None,
1288            freshness_at_step: None,
1289            frame_vars: None,
1290            tool_telemetry: None,
1291        });
1292    }
1293
1294    let tool_base = base_ms + patch_results.len() as i64;
1295    for (i, name) in tools_canonical.iter().enumerate() {
1296        steps.push(TrajectoryStep {
1297            step_id: format!("tool_{i}"),
1298            timestamp_ms: tool_base + i as i64,
1299            adapter: "builtin".into(),
1300            operation: name.clone(),
1301            inputs_preview: None,
1302            outputs_preview: None,
1303            duration_ms: 0,
1304            success: true,
1305            error: None,
1306            vitals: None,
1307            freshness_at_step: None,
1308            frame_vars: None,
1309            tool_telemetry: None,
1310        });
1311    }
1312
1313    steps
1314}
1315
1316/// After a successful [`record_turn_episode`], persist trajectory graph node + `ainl_trajectories`
1317/// row when [`ainl_memory::trajectory_env_enabled`]: per-patch dispatch (timing + adapter) then tool steps.
1318pub(crate) fn maybe_persist_trajectory_after_episode(
1319    memory: &ainl_memory::GraphMemory,
1320    agent_id: &str,
1321    episode_id: Uuid,
1322    tools_canonical: &[String],
1323    patch_dispatch_results: &[PatchDispatchResult],
1324    input: &TurnInput,
1325) -> Result<(), String> {
1326    if episode_id.is_nil() || !ainl_memory::trajectory_env_enabled() {
1327        return Ok(());
1328    }
1329    let session_id = input
1330        .trace_event
1331        .as_ref()
1332        .and_then(|v| v.get("session_id").and_then(|x| x.as_str()))
1333        .unwrap_or("ainl-runtime");
1334    let project_id = std::env::var("AINL_MEMORY_PROJECT_ID").ok();
1335    let hash = std::env::var("AINL_SOURCE_HASH")
1336        .ok()
1337        .map(|s| s.trim().to_string())
1338        .filter(|s| !s.is_empty());
1339
1340    let steps = trajectory_steps_for_runtime_turn(patch_dispatch_results, tools_canonical);
1341    let duration_ms: u64 = steps.iter().map(|s| s.duration_ms).sum();
1342    let outcome = if steps.iter().any(|s| !s.success) {
1343        TrajectoryOutcome::PartialSuccess
1344    } else {
1345        TrajectoryOutcome::Success
1346    };
1347
1348    ainl_memory::persist_trajectory_for_episode(
1349        memory,
1350        agent_id,
1351        episode_id,
1352        steps,
1353        outcome,
1354        session_id,
1355        project_id.as_deref(),
1356        hash.as_deref(),
1357        duration_ms,
1358        None,
1359        None,
1360    )?;
1361    Ok(())
1362}
1363
1364pub(crate) fn record_turn_episode(
1365    memory: &ainl_memory::GraphMemory,
1366    agent_id: &str,
1367    input: &TurnInput,
1368    tools_invoked_canonical: &[String],
1369) -> Result<Uuid, String> {
1370    let turn_id = Uuid::new_v4();
1371    let timestamp = chrono::Utc::now().timestamp();
1372    let tools = tools_invoked_canonical.to_vec();
1373    let mut node = AinlMemoryNode::new_episode(
1374        turn_id,
1375        timestamp,
1376        tools.clone(),
1377        None,
1378        input.trace_event.clone(),
1379    );
1380    node.agent_id = agent_id.to_string();
1381    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1382        episodic.user_message = Some(input.user_message.clone());
1383        episodic.tools_invoked = tools;
1384        // Persist cognitive vitals from the host LLM completion (fail-open: all fields are Option).
1385        episodic.vitals_gate = input.vitals_gate.clone();
1386        episodic.vitals_phase = input.vitals_phase.clone();
1387        episodic.vitals_trust = input.vitals_trust;
1388    }
1389    memory.write_node(&node)?;
1390    Ok(node.id)
1391}
1392
#[cfg(test)]
mod trajectory_step_builder_tests {
    use super::trajectory_steps_for_runtime_turn;
    use crate::engine::PatchDispatchResult;

    /// Fixture: a version-1 `my_patch` result with no skip reason; the caller picks the
    /// dispatch flag, the adapter name/output and the measured duration.
    fn sample_patch(
        dispatched: bool,
        adapter_name: Option<String>,
        adapter_output: Option<serde_json::Value>,
        dispatch_duration_ms: u64,
    ) -> PatchDispatchResult {
        PatchDispatchResult {
            label: String::from("my_patch"),
            patch_version: 1,
            fitness_before: 0.5,
            fitness_after: 0.6,
            dispatched,
            skip_reason: None,
            adapter_output,
            adapter_name,
            dispatch_duration_ms,
        }
    }

    /// Patch steps come first and carry the dispatch duration; tool steps follow.
    #[test]
    fn trajectory_steps_orders_patches_then_tools() {
        let executed = sample_patch(
            true,
            Some(String::from("graph")),
            Some(serde_json::json!({"ok": true})),
            12,
        );
        let steps = trajectory_steps_for_runtime_turn(&[executed], &[String::from("turn")]);

        assert_eq!(steps.len(), 2);
        let (patch_step, tool_step) = (&steps[0], &steps[1]);
        assert!(patch_step.step_id.starts_with("patch_"));
        assert!(tool_step.step_id.starts_with("tool_"));
        assert_eq!(patch_step.duration_ms, 12);
        assert!(patch_step.success);
    }

    /// A dispatched patch with a named adapter but no output counts as a failed step.
    #[test]
    fn adapter_failure_marks_step_failed() {
        let failed = sample_patch(true, Some(String::from("graph")), None, 5);
        let steps = trajectory_steps_for_runtime_turn(&[failed], &[]);

        assert_eq!(steps.len(), 1);
        assert!(!steps[0].success);
    }
}
1442
1443#[cfg(feature = "async")]
1444#[path = "runtime_async.rs"]
1445mod runtime_async_impl;