Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use ainl_graph_extractor::GraphExtractorTask;
10use ainl_memory::{
11    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
12    RuntimeStateNode, SqliteGraphStore,
13};
14use ainl_persona::axes::default_axis_map;
15use ainl_persona::{
16    EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
17};
18use ainl_semantic_tagger::infer_topic_tags;
19use ainl_semantic_tagger::tag_tool_names;
20use ainl_semantic_tagger::TagNamespace;
21use uuid::Uuid;
22
23use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
24use crate::engine::{
25    AinlGraphArtifact, AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult,
26    PatchSkipReason, TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning,
27    EMIT_TO_EDGE,
28};
29use crate::graph_cell::{GraphCell, SqliteStoreRef};
30use crate::hooks::{NoOpHooks, TurnHooks};
31#[cfg(feature = "async")]
32use crate::hooks::TurnHooksAsync;
33use crate::RuntimeConfig;
34
35/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
36///
37/// ## Evolution writes vs ArmaraOS / openfang-runtime
38///
39/// In production ArmaraOS, **openfang-runtime**’s `GraphMemoryWriter::run_persona_evolution_pass`
40/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
41/// agent’s `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
42/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
43/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
44/// SQLite store is undefined (competing last-writer wins on the same persona node).
45///
46/// Prefer [`Self::with_evolution_writes_enabled(false)`] when a host embeds `AinlRuntime` alongside
47/// openfang while openfang remains the sole evolution writer. [`Self::evolution_engine_mut`] can
48/// still mutate in-memory axis state; calling [`EvolutionEngine::write_persona_node`] yourself
49/// bypasses this guard and must be avoided in that configuration.
50pub struct AinlRuntime {
51    config: RuntimeConfig,
52    memory: GraphCell,
53    extractor: GraphExtractorTask,
54    turn_count: u64,
55    last_extraction_at_turn: u64,
56    /// Current delegation depth for the active `run_turn` call chain (incremented per nested entry).
57    current_depth: Arc<AtomicU32>,
58    hooks: Box<dyn TurnHooks>,
59    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
60    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
61    /// (e.g. openfang’s post-turn pass) on the same DB. Default: `true`.
62    evolution_writes_enabled: bool,
63    persona_cache: Option<String>,
64    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
65    #[doc(hidden)]
66    test_force_extraction_failure: bool,
67    /// Test hook: next fitness write-back from procedural dispatch fails without touching SQLite.
68    #[doc(hidden)]
69    test_force_fitness_write_failure: bool,
70    /// Test hook: next runtime-state SQLite persist fails (non-fatal warning).
71    #[doc(hidden)]
72    test_force_runtime_state_write_failure: bool,
73    adapter_registry: AdapterRegistry,
74    /// Optional async hooks for [`Self::run_turn_async`] (see `async` feature).
75    #[cfg(feature = "async")]
76    hooks_async: Option<std::sync::Arc<dyn TurnHooksAsync>>,
77}
78
79impl AinlRuntime {
80    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
81        let agent_id = config.agent_id.clone();
82        let memory = GraphCell::new(store);
83        let (init_turn_count, init_persona_cache, init_last_extraction_at_turn) =
84            if agent_id.is_empty() {
85                (0, None, 0)
86            } else {
87                match memory.read_runtime_state(&agent_id) {
88                    Ok(Some(state)) => {
89                        tracing::info!(
90                            agent_id = %agent_id,
91                            turn_count = state.turn_count,
92                            "restored runtime state"
93                        );
94                        let persona_cache = state
95                            .persona_snapshot_json
96                            .as_ref()
97                            .and_then(|json| serde_json::from_str::<String>(json).ok());
98                        (
99                            state.turn_count,
100                            persona_cache,
101                            state.last_extraction_at_turn,
102                        )
103                    }
104                    Ok(None) => (0, None, 0),
105                    Err(e) => {
106                        tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
107                        (0, None, 0)
108                    }
109                }
110            };
111        Self {
112            extractor: GraphExtractorTask::new(&agent_id),
113            memory,
114            config,
115            turn_count: init_turn_count,
116            last_extraction_at_turn: init_last_extraction_at_turn,
117            current_depth: Arc::new(AtomicU32::new(0)),
118            hooks: Box::new(NoOpHooks),
119            evolution_writes_enabled: true,
120            persona_cache: init_persona_cache,
121            test_force_extraction_failure: false,
122            test_force_fitness_write_failure: false,
123            test_force_runtime_state_write_failure: false,
124            adapter_registry: AdapterRegistry::new(),
125            #[cfg(feature = "async")]
126            hooks_async: None,
127        }
128    }
129
130    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
131    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
132        self.adapter_registry.register(adapter);
133    }
134
135    /// Install the reference [`GraphPatchAdapter`] as fallback for procedural patches without a
136    /// label-specific adapter (see [`PatchDispatchContext`]).
137    pub fn register_default_patch_adapters(&mut self) {
138        self.register_adapter(GraphPatchAdapter::new());
139    }
140
141    /// Names of currently registered patch adapters.
142    pub fn registered_adapters(&self) -> Vec<&str> {
143        self.adapter_registry.registered_names()
144    }
145
146    #[doc(hidden)]
147    pub fn test_turn_count(&self) -> u64 {
148        self.turn_count
149    }
150
151    #[doc(hidden)]
152    pub fn test_persona_cache(&self) -> Option<&str> {
153        self.persona_cache.as_deref()
154    }
155
156    #[doc(hidden)]
157    pub fn test_delegation_depth(&self) -> u32 {
158        self.current_depth.load(Ordering::SeqCst)
159    }
160
161    #[doc(hidden)]
162    pub fn test_set_delegation_depth(&mut self, depth: u32) {
163        self.current_depth.store(depth, Ordering::SeqCst);
164    }
165
166    #[doc(hidden)]
167    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
168        self.test_force_extraction_failure = fail;
169    }
170
171    #[doc(hidden)]
172    pub fn test_set_force_fitness_write_failure(&mut self, fail: bool) {
173        self.test_force_fitness_write_failure = fail;
174    }
175
176    /// Test hook: access the graph extractor task for per-phase error injection.
177    #[doc(hidden)]
178    pub fn test_extractor_mut(&mut self) -> &mut GraphExtractorTask {
179        &mut self.extractor
180    }
181
182    #[doc(hidden)]
183    pub fn test_set_force_runtime_state_write_failure(&mut self, fail: bool) {
184        self.test_force_runtime_state_write_failure = fail;
185    }
186
187    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
188        self.hooks = Box::new(hooks);
189        self
190    }
191
192    /// Install async turn hooks ([`TurnHooksAsync`]) for [`Self::run_turn_async`].
193    #[cfg(feature = "async")]
194    pub fn with_hooks_async(mut self, hooks: std::sync::Arc<dyn TurnHooksAsync>) -> Self {
195        self.hooks_async = Some(hooks);
196        self
197    }
198
199    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
200    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
201    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
202    /// persistence elsewhere (see struct-level docs).
203    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
204        self.evolution_writes_enabled = enabled;
205        self
206    }
207
208    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
209        if self.evolution_writes_enabled {
210            Ok(())
211        } else {
212            Err(
213                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
214                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
215                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
216                 ainl_memory.db"
217                    .to_string(),
218            )
219        }
220    }
221
222    /// Borrow the backing SQLite store (same connection as graph memory).
223    ///
224    /// When built with the `async` feature, this locks the in-runtime graph mutex for the lifetime
225    /// of the returned guard (see [`SqliteStoreRef`]). That mutex is [`std::sync::Mutex`] (shared
226    /// via [`std::sync::Arc`]), not `tokio::sync::Mutex`, so this helper remains usable from Tokio
227    /// worker threads for quick reads without forcing an async lock API.
228    pub fn sqlite_store(&self) -> SqliteStoreRef<'_> {
229        self.memory.sqlite_ref()
230    }
231
232    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
233    ///
234    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
235    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
236    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
237    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
238    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
239    pub fn evolution_engine(&self) -> &EvolutionEngine {
240        &self.extractor.evolution_engine
241    }
242
243    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
244    ///
245    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass [`Self::evolution_writes_enabled`].
246    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
247        &mut self.extractor.evolution_engine
248    }
249
250    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
251    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
252        self.extractor.evolution_engine.ingest_signals(signals)
253    }
254
255    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
256    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
257        self.extractor
258            .evolution_engine
259            .correction_tick(axis, correction);
260    }
261
262    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
263    ///
264    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
265    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
266        self.require_evolution_writes_enabled()?;
267        let snap = self.extractor.evolution_engine.snapshot();
268        self.memory.with(|m| {
269            self.extractor
270                .evolution_engine
271                .write_persona_node(m.sqlite_store(), &snap)
272        })?;
273        Ok(snap)
274    }
275
276    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
277    ///
278    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
279    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
280    ///
281    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
282    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
283        self.require_evolution_writes_enabled()?;
284        self.memory
285            .with(|m| self.extractor.evolution_engine.evolve(m.sqlite_store()))
286    }
287
288    /// Boot: export + validate the agent subgraph.
289    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
290        self.memory
291            .with(|m| AinlGraphArtifact::load(m.sqlite_store(), &self.config.agent_id))
292    }
293
294    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (treated as empty for
295    /// semantic ranking; see [`Self::compile_memory_context_for`]).
296    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
297        self.compile_memory_context_for(None)
298    }
299
300    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
301    ///
302    /// `relevant_semantic` is ranked from this `user_message` only (`ainl-semantic-tagger` topic tags
303    /// + recurrence); `None` is treated as empty (high-recurrence fallback), not the latest episode text.
304    pub fn compile_memory_context_for(
305        &self,
306        user_message: Option<&str>,
307    ) -> Result<MemoryContext, String> {
308        if self.config.agent_id.is_empty() {
309            return Err("RuntimeConfig.agent_id must be set".to_string());
310        }
311        self.memory.with(|m| {
312        let store = m.sqlite_store();
313        let q = store.query(&self.config.agent_id);
314        let recent_episodes = q.recent_episodes(5)?;
315        let all_semantic = q.semantic_nodes()?;
316        let relevant_semantic = self.relevant_semantic_nodes(
317            user_message.unwrap_or(""),
318            all_semantic,
319            10,
320        );
321        let active_patches = q.active_patches()?;
322        let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
323        Ok(MemoryContext {
324            recent_episodes,
325            relevant_semantic,
326            active_patches,
327            persona_snapshot,
328            compiled_at: chrono::Utc::now(),
329        })
330        })
331    }
332
333    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
334    pub fn route_emit_edges(
335        &self,
336        episode_id: Uuid,
337        turn_output_payload: &serde_json::Value,
338    ) -> Result<(), String> {
339        self.memory.with(|m| {
340            let store = m.sqlite_store();
341            let neighbors = store
342                .query(&self.config.agent_id)
343                .neighbors(episode_id, EMIT_TO_EDGE)?;
344            for n in neighbors {
345                let target = emit_target_name(&n);
346                self.hooks.on_emit(&target, turn_output_payload);
347            }
348            Ok(())
349        })
350    }
351
352    /// Full single-turn orchestration (no LLM / no IR parse).
353    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutcome, AinlRuntimeError> {
354        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
355        let cd = Arc::clone(&self.current_depth);
356        let _depth_guard = scopeguard::guard((), move |()| {
357            cd.fetch_sub(1, Ordering::SeqCst);
358        });
359
360        if depth >= self.config.max_delegation_depth {
361            return Err(AinlRuntimeError::DelegationDepthExceeded {
362                depth,
363                max: self.config.max_delegation_depth,
364            });
365        }
366
367        if !self.config.enable_graph_memory {
368            let memory_context = MemoryContext::default();
369            let result = TurnResult {
370                memory_context,
371                status: TurnStatus::GraphMemoryDisabled,
372                ..Default::default()
373            };
374            let outcome = TurnOutcome::Complete(result);
375            self.hooks.on_turn_complete(&outcome);
376            return Ok(outcome);
377        }
378
379        if self.config.agent_id.is_empty() {
380            return Err(AinlRuntimeError::Message(
381                "RuntimeConfig.agent_id must be set for run_turn".into(),
382            ));
383        }
384
385        let span = tracing::info_span!(
386            "ainl_runtime.run_turn",
387            agent_id = %self.config.agent_id,
388            turn = self.turn_count,
389            depth = input.depth,
390        );
391        let _span_enter = span.enter();
392
393        let validation: GraphValidationReport = self
394            .memory
395            .with(|m| m.sqlite_store().validate_graph(&self.config.agent_id))
396            .map_err(AinlRuntimeError::from)?;
397        if !validation.is_valid {
398            let mut msg = String::from("graph validation failed before turn");
399            for d in &validation.dangling_edge_details {
400                msg.push_str(&format!(
401                    "; {} -> {} [{}]",
402                    d.source_id, d.target_id, d.edge_type
403                ));
404            }
405            return Err(AinlRuntimeError::Message(msg));
406        }
407
408        self.hooks
409            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
410
411        let mut turn_warnings: Vec<TurnWarning> = Vec::new();
412
413        let t_persona = Instant::now();
414        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
415            Some(cached.clone())
416        } else {
417            let nodes = self
418                .memory
419                .with(|m| {
420                    m.sqlite_store()
421                        .query(&self.config.agent_id)
422                        .persona_nodes()
423                })
424                .map_err(AinlRuntimeError::from)?;
425            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
426            self.persona_cache = compiled.clone();
427            compiled
428        };
429        self.hooks
430            .on_persona_compiled(persona_prompt_contribution.as_deref());
431        tracing::debug!(
432            target: "ainl_runtime",
433            duration_ms = t_persona.elapsed().as_millis() as u64,
434            has_contribution = persona_prompt_contribution.is_some(),
435            "persona_phase"
436        );
437
438        let t_memory = Instant::now();
439        let memory_context = self
440            .compile_memory_context_for(Some(&input.user_message))
441            .map_err(AinlRuntimeError::from)?;
442        self.hooks.on_memory_context_ready(&memory_context);
443        tracing::debug!(
444            target: "ainl_runtime",
445            duration_ms = t_memory.elapsed().as_millis() as u64,
446            episode_count = memory_context.recent_episodes.len(),
447            semantic_count = memory_context.relevant_semantic.len(),
448            patch_count = memory_context.active_patches.len(),
449            "memory_context"
450        );
451
452        let t_patches = Instant::now();
453        let patch_dispatch_results = if self.config.enable_graph_memory {
454            self.dispatch_patches_collect(
455                &memory_context.active_patches,
456                &input.frame,
457                &mut turn_warnings,
458            )
459        } else {
460            Vec::new()
461        };
462        for r in &patch_dispatch_results {
463            tracing::debug!(
464                target: "ainl_runtime",
465                label = %r.label,
466                dispatched = r.dispatched,
467                fitness_before = r.fitness_before,
468                fitness_after = r.fitness_after,
469                "patch_dispatch"
470            );
471        }
472        tracing::debug!(
473            target: "ainl_runtime",
474            duration_ms = t_patches.elapsed().as_millis() as u64,
475            "patch_dispatch_phase"
476        );
477
478        let dispatched_count = patch_dispatch_results
479            .iter()
480            .filter(|r| r.dispatched)
481            .count() as u32;
482        if dispatched_count >= self.config.max_steps {
483            let result = TurnResult {
484                patch_dispatch_results,
485                memory_context,
486                persona_prompt_contribution,
487                steps_executed: dispatched_count,
488                status: TurnStatus::StepLimitExceeded {
489                    steps_executed: dispatched_count,
490                },
491                ..Default::default()
492            };
493            let outcome = TurnOutcome::Complete(result);
494            self.hooks.on_turn_complete(&outcome);
495            return Ok(outcome);
496        }
497
498        let t_episode = Instant::now();
499        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
500        let episode_id = match self.memory.with(|m| {
501            record_turn_episode(m, &self.config.agent_id, &input, &tools_canonical)
502        }) {
503            Ok(id) => id,
504            Err(e) => {
505                tracing::warn!(
506                    phase = ?TurnPhase::EpisodeWrite,
507                    error = %e,
508                    "non-fatal turn write failed — continuing"
509                );
510                turn_warnings.push(TurnWarning {
511                    phase: TurnPhase::EpisodeWrite,
512                    error: e,
513                });
514                Uuid::nil()
515            }
516        };
517        self.hooks.on_episode_recorded(episode_id);
518        tracing::debug!(
519            target: "ainl_runtime",
520            duration_ms = t_episode.elapsed().as_millis() as u64,
521            episode_id = %episode_id,
522            "episode_record"
523        );
524
525        if !episode_id.is_nil() {
526            for &tid in &input.emit_targets {
527                if let Err(e) = self.memory.with(|m| {
528                    m.sqlite_store()
529                        .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)
530                }) {
531                    tracing::warn!(
532                        phase = ?TurnPhase::EpisodeWrite,
533                        error = %e,
534                        "non-fatal turn write failed — continuing"
535                    );
536                    turn_warnings.push(TurnWarning {
537                        phase: TurnPhase::EpisodeWrite,
538                        error: e,
539                    });
540                }
541            }
542        }
543
544        let emit_payload = serde_json::json!({
545            "episode_id": episode_id.to_string(),
546            "user_message": input.user_message,
547            "tools_invoked": tools_canonical,
548            "persona_contribution": persona_prompt_contribution,
549            "turn_count": self.turn_count.wrapping_add(1),
550        });
551        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
552            tracing::warn!(
553                phase = ?TurnPhase::EpisodeWrite,
554                error = %e,
555                "non-fatal turn write failed — continuing"
556            );
557            turn_warnings.push(TurnWarning {
558                phase: TurnPhase::EpisodeWrite,
559                error: format!("emit_routing: {e}"),
560            });
561        }
562
563        self.turn_count = self.turn_count.wrapping_add(1);
564
565        let should_extract = self.config.extraction_interval > 0
566            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
567                >= self.config.extraction_interval as u64;
568
569        let t_extract = Instant::now();
570        let (extraction_report, _extraction_failed) = if should_extract {
571            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
572
573            let res = if force_fail {
574                let e = "test_forced".to_string();
575                tracing::warn!(
576                    phase = ?TurnPhase::ExtractionPass,
577                    error = %e,
578                    "non-fatal turn write failed — continuing"
579                );
580                turn_warnings.push(TurnWarning {
581                    phase: TurnPhase::ExtractionPass,
582                    error: e,
583                });
584                tracing::debug!(
585                    target: "ainl_runtime",
586                    duration_ms = t_extract.elapsed().as_millis() as u64,
587                    signals_ingested = 0u64,
588                    skipped = false,
589                    "extraction_pass"
590                );
591                (None, true)
592            } else {
593                let report = self
594                    .memory
595                    .with(|m| self.extractor.run_pass(m.sqlite_store()));
596                if let Some(ref e) = report.extract_error {
597                    tracing::warn!(
598                        phase = ?TurnPhase::ExtractionPass,
599                        error = %e,
600                        "non-fatal turn write failed — continuing"
601                    );
602                    turn_warnings.push(TurnWarning {
603                        phase: TurnPhase::ExtractionPass,
604                        error: e.clone(),
605                    });
606                }
607                if let Some(ref e) = report.pattern_error {
608                    tracing::warn!(
609                        phase = ?TurnPhase::PatternPersistence,
610                        error = %e,
611                        "non-fatal turn write failed — continuing"
612                    );
613                    turn_warnings.push(TurnWarning {
614                        phase: TurnPhase::PatternPersistence,
615                        error: e.clone(),
616                    });
617                }
618                if let Some(ref e) = report.persona_error {
619                    tracing::warn!(
620                        phase = ?TurnPhase::PersonaEvolution,
621                        error = %e,
622                        "non-fatal turn write failed — continuing"
623                    );
624                    turn_warnings.push(TurnWarning {
625                        phase: TurnPhase::PersonaEvolution,
626                        error: e.clone(),
627                    });
628                }
629                let extraction_failed = report.has_errors();
630                if !extraction_failed {
631                    tracing::info!(
632                        agent_id = %report.agent_id,
633                        signals_extracted = report.signals_extracted,
634                        signals_applied = report.signals_applied,
635                        semantic_nodes_updated = report.semantic_nodes_updated,
636                        "ainl-graph-extractor pass completed (scheduled)"
637                    );
638                }
639                self.hooks.on_extraction_complete(&report);
640                self.persona_cache = None;
641                tracing::debug!(
642                    target: "ainl_runtime",
643                    duration_ms = t_extract.elapsed().as_millis() as u64,
644                    signals_ingested = report.signals_extracted as u64,
645                    skipped = false,
646                    "extraction_pass"
647                );
648                (Some(report), extraction_failed)
649            };
650            self.last_extraction_at_turn = self.turn_count;
651            res
652        } else {
653            tracing::debug!(
654                target: "ainl_runtime",
655                duration_ms = t_extract.elapsed().as_millis() as u64,
656                signals_ingested = 0u64,
657                skipped = true,
658                "extraction_pass"
659            );
660            (None, false)
661        };
662
663        if let Err(e) = self.memory.with(|m| {
664            try_export_graph_json_armaraos(m.sqlite_store(), &self.config.agent_id)
665        }) {
666            tracing::warn!(
667                phase = ?TurnPhase::ExportRefresh,
668                error = %e,
669                "non-fatal turn write failed — continuing"
670            );
671            turn_warnings.push(TurnWarning {
672                phase: TurnPhase::ExportRefresh,
673                error: e,
674            });
675        }
676
677        if !self.config.agent_id.is_empty() {
678            let state = RuntimeStateNode {
679                agent_id: self.config.agent_id.clone(),
680                turn_count: self.turn_count,
681                last_extraction_at_turn: self.last_extraction_at_turn,
682                persona_snapshot_json: self
683                    .persona_cache
684                    .as_ref()
685                    .and_then(|p| serde_json::to_string(p).ok()),
686                updated_at: chrono::Utc::now().timestamp(),
687            };
688            let write_res = if std::mem::take(&mut self.test_force_runtime_state_write_failure) {
689                Err("injected runtime state write failure".to_string())
690            } else {
691                self.memory.with(|m| m.write_runtime_state(&state))
692            };
693            if let Err(e) = write_res {
694                tracing::warn!(
695                    phase = ?TurnPhase::RuntimeStatePersist,
696                    error = %e,
697                    "failed to persist runtime state — cadence will reset on next restart"
698                );
699                turn_warnings.push(TurnWarning {
700                    phase: TurnPhase::RuntimeStatePersist,
701                    error: e,
702                });
703            }
704        }
705
706        let result = TurnResult {
707            episode_id,
708            persona_prompt_contribution,
709            memory_context,
710            extraction_report,
711            steps_executed: dispatched_count,
712            patch_dispatch_results,
713            status: TurnStatus::Ok,
714        };
715
716        let outcome = if turn_warnings.is_empty() {
717            TurnOutcome::Complete(result)
718        } else {
719            TurnOutcome::PartialSuccess {
720                result,
721                warnings: turn_warnings,
722            }
723        };
724
725        self.hooks.on_turn_complete(&outcome);
726        Ok(outcome)
727    }
728
729    /// Score and rank semantic nodes from `user_message` via `infer_topic_tags` (topic overlap +
730    /// recurrence tiebreaker), or high-recurrence fallback when the message is empty or yields no topic tags.
731    fn relevant_semantic_nodes(
732        &self,
733        user_message: &str,
734        all_semantic: Vec<AinlMemoryNode>,
735        limit: usize,
736    ) -> Vec<AinlMemoryNode> {
737        let user_tags = infer_topic_tags(user_message);
738        let user_topics: HashSet<String> = user_tags
739            .iter()
740            .filter(|t| t.namespace == TagNamespace::Topic)
741            .map(|t| t.value.to_lowercase())
742            .collect();
743
744        if user_message.trim().is_empty() || user_topics.is_empty() {
745            return fallback_high_recurrence_semantic(all_semantic, limit);
746        }
747
748        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
749        for n in all_semantic {
750            let (score, rec) = match &n.node_type {
751                AinlNodeType::Semantic { semantic } => {
752                    let mut s = 0f32;
753                    if let Some(cluster) = &semantic.topic_cluster {
754                        for slug in cluster
755                            .split([',', ';'])
756                            .map(|s| s.trim().to_lowercase())
757                            .filter(|s| !s.is_empty())
758                        {
759                            if user_topics.contains(&slug) {
760                                s += 1.0;
761                            }
762                        }
763                    }
764                    if s == 0.0 {
765                        for tag in &semantic.tags {
766                            let tl = tag.to_lowercase();
767                            if let Some(rest) = tl.strip_prefix("topic:") {
768                                let slug = rest.trim().to_lowercase();
769                                if user_topics.contains(&slug) {
770                                    s = 0.5;
771                                    break;
772                                }
773                            }
774                        }
775                    }
776                    (s, semantic.recurrence_count)
777                }
778                _ => (0.0, 0),
779            };
780            scored.push((score, rec, n));
781        }
782
783        scored.sort_by(|a, b| {
784            b.0.partial_cmp(&a.0)
785                .unwrap_or(std::cmp::Ordering::Equal)
786                .then_with(|| b.1.cmp(&a.1))
787        });
788        scored.into_iter().take(limit).map(|t| t.2).collect()
789    }
790
791    pub fn dispatch_patches(
792        &mut self,
793        patches: &[AinlMemoryNode],
794        frame: &HashMap<String, serde_json::Value>,
795    ) -> Vec<PatchDispatchResult> {
796        let mut w = Vec::new();
797        self.dispatch_patches_collect(patches, frame, &mut w)
798    }
799
800    fn dispatch_patches_collect(
801        &mut self,
802        patches: &[AinlMemoryNode],
803        frame: &HashMap<String, serde_json::Value>,
804        turn_warnings: &mut Vec<TurnWarning>,
805    ) -> Vec<PatchDispatchResult> {
806        let mut out = Vec::new();
807        for node in patches {
808            let res = self.dispatch_one_patch(node, frame);
809            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
810                tracing::warn!(
811                    phase = ?TurnPhase::FitnessWriteBack,
812                    error = %e,
813                    "non-fatal turn write failed — continuing"
814                );
815                turn_warnings.push(TurnWarning {
816                    phase: TurnPhase::FitnessWriteBack,
817                    error: format!("{}: {}", res.label, e),
818                });
819            }
820            out.push(res);
821        }
822        out
823    }
824
825    fn dispatch_one_patch(
826        &mut self,
827        node: &AinlMemoryNode,
828        frame: &HashMap<String, serde_json::Value>,
829    ) -> PatchDispatchResult {
830        let label_default = String::new();
831        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
832            AinlNodeType::Procedural { procedural } => (
833                procedural_label(procedural),
834                procedural.patch_version,
835                procedural.retired,
836                procedural.declared_reads.clone(),
837                procedural.fitness,
838            ),
839            _ => {
840                return PatchDispatchResult {
841                    label: label_default,
842                    patch_version: 0,
843                    fitness_before: 0.0,
844                    fitness_after: 0.0,
845                    dispatched: false,
846                    skip_reason: Some(PatchSkipReason::NotProcedural),
847                    adapter_output: None,
848                    adapter_name: None,
849                };
850            }
851        };
852
853        if pv == 0 {
854            return PatchDispatchResult {
855                label: label_src,
856                patch_version: pv,
857                fitness_before: fitness_opt.unwrap_or(0.5),
858                fitness_after: fitness_opt.unwrap_or(0.5),
859                dispatched: false,
860                skip_reason: Some(PatchSkipReason::ZeroVersion),
861                adapter_output: None,
862                adapter_name: None,
863            };
864        }
865        if retired {
866            return PatchDispatchResult {
867                label: label_src.clone(),
868                patch_version: pv,
869                fitness_before: fitness_opt.unwrap_or(0.5),
870                fitness_after: fitness_opt.unwrap_or(0.5),
871                dispatched: false,
872                skip_reason: Some(PatchSkipReason::Retired),
873                adapter_output: None,
874                adapter_name: None,
875            };
876        }
877        for key in &reads {
878            if !frame.contains_key(key) {
879                return PatchDispatchResult {
880                    label: label_src.clone(),
881                    patch_version: pv,
882                    fitness_before: fitness_opt.unwrap_or(0.5),
883                    fitness_after: fitness_opt.unwrap_or(0.5),
884                    dispatched: false,
885                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
886                    adapter_output: None,
887                    adapter_name: None,
888                };
889            }
890        }
891
892        let patch_label = label_src.clone();
893        let adapter_key = patch_label.as_str();
894        let ctx = PatchDispatchContext {
895            patch_label: adapter_key,
896            node,
897            frame,
898        };
899        let (adapter_output, adapter_name) = if let Some(adapter) = self
900            .adapter_registry
901            .get(adapter_key)
902            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
903        {
904            let aname = adapter.name().to_string();
905            match adapter.execute_patch(&ctx) {
906                Ok(output) => {
907                    tracing::debug!(
908                        label = %patch_label,
909                        adapter = %aname,
910                        "adapter executed patch"
911                    );
912                    (Some(output), Some(aname))
913                }
914                Err(e) => {
915                    tracing::warn!(
916                        label = %patch_label,
917                        adapter = %aname,
918                        error = %e,
919                        "adapter execution failed — continuing as metadata dispatch"
920                    );
921                    (None, Some(aname))
922                }
923            }
924        } else {
925            (None, None)
926        };
927
928        let fitness_before = fitness_opt.unwrap_or(0.5);
929        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
930
931        let updated = match self.memory.with(|m| {
932            let store = m.sqlite_store();
933            store.read_node(node.id)
934        }) {
935            Ok(Some(mut n)) => {
936                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
937                    procedural.fitness = Some(fitness_after);
938                }
939                n
940            }
941            Ok(None) => {
942                return PatchDispatchResult {
943                    label: label_src,
944                    patch_version: pv,
945                    fitness_before,
946                    fitness_after: fitness_before,
947                    dispatched: false,
948                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
949                    adapter_output,
950                    adapter_name,
951                };
952            }
953            Err(e) => {
954                return PatchDispatchResult {
955                    label: label_src,
956                    patch_version: pv,
957                    fitness_before,
958                    fitness_after: fitness_before,
959                    dispatched: false,
960                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
961                    adapter_output,
962                    adapter_name,
963                };
964            }
965        };
966
967        if self.test_force_fitness_write_failure {
968            self.test_force_fitness_write_failure = false;
969            let e = "injected fitness write failure".to_string();
970            return PatchDispatchResult {
971                label: label_src.clone(),
972                patch_version: pv,
973                fitness_before,
974                fitness_after: fitness_before,
975                dispatched: false,
976                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
977                adapter_output,
978                adapter_name,
979            };
980        }
981
982        if let Err(e) = self.memory.with(|m| m.write_node(&updated)) {
983            return PatchDispatchResult {
984                label: label_src.clone(),
985                patch_version: pv,
986                fitness_before,
987                fitness_after: fitness_before,
988                dispatched: false,
989                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
990                adapter_output,
991                adapter_name,
992            };
993        }
994
995        self.hooks
996            .on_patch_dispatched(label_src.as_str(), fitness_after);
997
998        PatchDispatchResult {
999            label: label_src,
1000            patch_version: pv,
1001            fitness_before,
1002            fitness_after,
1003            dispatched: true,
1004            skip_reason: None,
1005            adapter_output,
1006            adapter_name,
1007        }
1008    }
1009}
1010
1011pub(crate) fn emit_target_name(n: &AinlMemoryNode) -> String {
1012    match &n.node_type {
1013        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
1014        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
1015        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
1016        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
1017        AinlNodeType::RuntimeState { runtime_state } => {
1018            format!("runtime_state:{}", runtime_state.agent_id)
1019        }
1020    }
1021}
1022
1023pub(crate) fn procedural_label(p: &ProceduralNode) -> String {
1024    if !p.label.is_empty() {
1025        p.label.clone()
1026    } else {
1027        p.pattern_name.clone()
1028    }
1029}
1030
1031pub(crate) fn fallback_high_recurrence_semantic(
1032    all: Vec<AinlMemoryNode>,
1033    limit: usize,
1034) -> Vec<AinlMemoryNode> {
1035    let mut v: Vec<_> = all
1036        .into_iter()
1037        .filter(|n| {
1038            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
1039        })
1040        .collect();
1041    v.sort_by(|a, b| {
1042        let ra = match &a.node_type {
1043            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1044            _ => 0,
1045        };
1046        let rb = match &b.node_type {
1047            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
1048            _ => 0,
1049        };
1050        rb.cmp(&ra)
1051    });
1052    v.into_iter().take(limit).collect()
1053}
1054
1055pub(crate) fn persona_snapshot_if_evolved(
1056    extractor: &GraphExtractorTask,
1057) -> Option<ainl_persona::PersonaSnapshot> {
1058    let snap = extractor.evolution_engine.snapshot();
1059    let defaults = default_axis_map(0.5);
1060    for axis in PersonaAxis::ALL {
1061        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
1062        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
1063        if (s - d).abs() > INGEST_SCORE_EPSILON {
1064            return Some(snap);
1065        }
1066    }
1067    None
1068}
1069
1070pub(crate) fn compile_persona_from_nodes(
1071    nodes: &[AinlMemoryNode],
1072) -> Result<Option<String>, String> {
1073    if nodes.is_empty() {
1074        return Ok(None);
1075    }
1076    let mut lines = Vec::new();
1077    for n in nodes {
1078        if let AinlNodeType::Persona { persona } = &n.node_type {
1079            lines.push(format_persona_line(persona));
1080        }
1081    }
1082    if lines.is_empty() {
1083        Ok(None)
1084    } else {
1085        Ok(Some(lines.join("\n")))
1086    }
1087}
1088
1089fn format_persona_line(p: &PersonaNode) -> String {
1090    format!(
1091        "- {} (strength {:.2}, layer {:?}, source {:?})",
1092        p.trait_name, p.strength, p.layer, p.source
1093    )
1094}
1095
1096/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
1097/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
1098/// Refresh `{AINL_GRAPH_MEMORY_ARMARAOS_EXPORT}/{agent_id}_graph_export.json` when the env var is set.
1099pub(crate) fn try_export_graph_json_armaraos(
1100    store: &SqliteGraphStore,
1101    agent_id: &str,
1102) -> Result<(), String> {
1103    let trimmed = std::env::var("AINL_GRAPH_MEMORY_ARMARAOS_EXPORT").unwrap_or_default();
1104    let dir = trimmed.trim();
1105    if dir.is_empty() {
1106        return Ok(());
1107    }
1108    let dir_path = PathBuf::from(dir);
1109    std::fs::create_dir_all(&dir_path).map_err(|e| format!("export mkdir: {e}"))?;
1110    let path = dir_path.join(format!("{agent_id}_graph_export.json"));
1111    let snap = store.export_graph(agent_id)?;
1112    let v = serde_json::to_value(&snap).map_err(|e| format!("serialize: {e}"))?;
1113    std::fs::write(
1114        &path,
1115        serde_json::to_vec_pretty(&v).map_err(|e| format!("json encode: {e}"))?,
1116    )
1117    .map_err(|e| format!("write export: {e}"))?;
1118    Ok(())
1119}
1120
1121pub(crate) fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
1122    if tools_invoked.is_empty() {
1123        return vec!["turn".to_string()];
1124    }
1125    let tags = tag_tool_names(tools_invoked);
1126    let mut seen: BTreeSet<String> = BTreeSet::new();
1127    for t in tags {
1128        if t.namespace == TagNamespace::Tool {
1129            seen.insert(t.value);
1130        }
1131    }
1132    if seen.is_empty() {
1133        vec!["turn".to_string()]
1134    } else {
1135        seen.into_iter().collect()
1136    }
1137}
1138
1139pub(crate) fn record_turn_episode(
1140    memory: &ainl_memory::GraphMemory,
1141    agent_id: &str,
1142    input: &TurnInput,
1143    tools_invoked_canonical: &[String],
1144) -> Result<Uuid, String> {
1145    let turn_id = Uuid::new_v4();
1146    let timestamp = chrono::Utc::now().timestamp();
1147    let tools = tools_invoked_canonical.to_vec();
1148    let mut node = AinlMemoryNode::new_episode(
1149        turn_id,
1150        timestamp,
1151        tools.clone(),
1152        None,
1153        input.trace_event.clone(),
1154    );
1155    node.agent_id = agent_id.to_string();
1156    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
1157        episodic.user_message = Some(input.user_message.clone());
1158        episodic.tools_invoked = tools;
1159    }
1160    memory.write_node(&node)?;
1161    Ok(node.id)
1162}
1163
1164#[cfg(feature = "async")]
1165#[path = "runtime_async.rs"]
1166mod runtime_async_impl;