Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::time::Instant;
5
6use ainl_graph_extractor::GraphExtractorTask;
7use ainl_memory::{
8    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
9    RuntimeStateNode, SqliteGraphStore,
10};
11use ainl_persona::axes::default_axis_map;
12use ainl_persona::{EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON};
13use ainl_semantic_tagger::infer_topic_tags;
14use ainl_semantic_tagger::tag_tool_names;
15use ainl_semantic_tagger::TagNamespace;
16use uuid::Uuid;
17
18use crate::adapters::AdapterRegistry;
19use crate::engine::{
20    AinlGraphArtifact, MemoryContext, PatchDispatchResult, PatchSkipReason, TurnInput, TurnOutcome,
21    TurnOutput, EMIT_TO_EDGE,
22};
23use crate::hooks::{NoOpHooks, TurnHooks};
24use crate::RuntimeConfig;
25
26/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
27pub struct AinlRuntime {
28    config: RuntimeConfig,
29    memory: ainl_memory::GraphMemory,
30    extractor: GraphExtractorTask,
31    turn_count: u32,
32    last_extraction_turn: u32,
33    delegation_depth: u32,
34    hooks: Box<dyn TurnHooks>,
35    persona_cache: Option<String>,
36    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
37    #[doc(hidden)]
38    test_force_extraction_failure: bool,
39    adapter_registry: AdapterRegistry,
40}
41
42impl AinlRuntime {
43    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
44        let agent_id = config.agent_id.clone();
45        let memory = ainl_memory::GraphMemory::from_sqlite_store(store);
46        let (init_turn_count, init_persona_cache, init_last_extraction_turn) = if agent_id.is_empty()
47        {
48            (0, None, 0)
49        } else {
50            match memory.sqlite_store().load_runtime_state(&agent_id) {
51                Ok(Some(state)) => {
52                    tracing::info!(
53                        agent_id = %agent_id,
54                        turn_count = state.turn_count,
55                        "restored runtime state"
56                    );
57                    (
58                        state.turn_count,
59                        state.last_persona_prompt,
60                        state.last_extraction_turn,
61                    )
62                }
63                Ok(None) => (0, None, 0),
64                Err(e) => {
65                    tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
66                    (0, None, 0)
67                }
68            }
69        };
70        Self {
71            extractor: GraphExtractorTask::new(&agent_id),
72            memory,
73            config,
74            turn_count: init_turn_count,
75            last_extraction_turn: init_last_extraction_turn,
76            delegation_depth: 0,
77            hooks: Box::new(NoOpHooks),
78            persona_cache: init_persona_cache,
79            test_force_extraction_failure: false,
80            adapter_registry: AdapterRegistry::new(),
81        }
82    }
83
84    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
85    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
86        self.adapter_registry.register(adapter);
87    }
88
89    /// Names of currently registered patch adapters.
90    pub fn registered_adapters(&self) -> Vec<&str> {
91        self.adapter_registry.registered_names()
92    }
93
94    #[doc(hidden)]
95    pub fn test_turn_count(&self) -> u32 {
96        self.turn_count
97    }
98
99    #[doc(hidden)]
100    pub fn test_delegation_depth(&self) -> u32 {
101        self.delegation_depth
102    }
103
104    #[doc(hidden)]
105    pub fn test_set_delegation_depth(&mut self, depth: u32) {
106        self.delegation_depth = depth;
107    }
108
109    #[doc(hidden)]
110    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
111        self.test_force_extraction_failure = fail;
112    }
113
114    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
115        self.hooks = Box::new(hooks);
116        self
117    }
118
119    /// Borrow the backing SQLite store (same connection as graph memory).
120    pub fn sqlite_store(&self) -> &SqliteGraphStore {
121        self.memory.sqlite_store()
122    }
123
124    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
125    ///
126    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
127    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
128    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
129    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
130    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
131    pub fn evolution_engine(&self) -> &EvolutionEngine {
132        &self.extractor.evolution_engine
133    }
134
135    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
136    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
137        &mut self.extractor.evolution_engine
138    }
139
140    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
141    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
142        self.extractor.evolution_engine.ingest_signals(signals)
143    }
144
145    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
146    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
147        self.extractor.evolution_engine.correction_tick(axis, correction);
148    }
149
150    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
151    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
152        let store = self.memory.sqlite_store();
153        let snap = self.extractor.evolution_engine.snapshot();
154        self.extractor
155            .evolution_engine
156            .write_persona_node(store, &snap)?;
157        Ok(snap)
158    }
159
160    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
161    ///
162    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
163    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
164    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
165        let store = self.memory.sqlite_store();
166        self.extractor.evolution_engine.evolve(store)
167    }
168
169    /// Boot: export + validate the agent subgraph.
170    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
171        AinlGraphArtifact::load(self.memory.sqlite_store(), &self.config.agent_id)
172    }
173
174    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (semantic relevance falls back
175    /// to the latest episode’s `user_message` when present).
176    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
177        self.compile_memory_context_for(None)
178    }
179
180    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
181    pub fn compile_memory_context_for(&self, user_message: Option<&str>) -> Result<MemoryContext, String> {
182        if self.config.agent_id.is_empty() {
183            return Err("RuntimeConfig.agent_id must be set".to_string());
184        }
185        let store = self.memory.sqlite_store();
186        let q = store.query(&self.config.agent_id);
187        let recent_episodes = q.recent_episodes(5)?;
188        let effective_user = user_message
189            .map(|s| s.to_string())
190            .filter(|s| !s.is_empty())
191            .or_else(|| {
192                recent_episodes.first().and_then(|n| {
193                    if let AinlNodeType::Episode { episodic } = &n.node_type {
194                        episodic.user_message.clone().filter(|m| !m.is_empty())
195                    } else {
196                        None
197                    }
198                })
199            });
200        let all_semantic = q.semantic_nodes()?;
201        let relevant_semantic = match effective_user.as_deref() {
202            Some(msg) => self.relevant_semantic_nodes(msg, all_semantic, 10),
203            None => fallback_high_recurrence_semantic(all_semantic, 10),
204        };
205        let active_patches = q.active_patches()?;
206        let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
207        Ok(MemoryContext {
208            recent_episodes,
209            relevant_semantic,
210            active_patches,
211            persona_snapshot,
212            compiled_at: chrono::Utc::now(),
213        })
214    }
215
216    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
217    pub fn route_emit_edges(
218        &self,
219        episode_id: Uuid,
220        turn_output_payload: &serde_json::Value,
221    ) -> Result<(), String> {
222        let store = self.memory.sqlite_store();
223        let neighbors = store
224            .query(&self.config.agent_id)
225            .neighbors(episode_id, EMIT_TO_EDGE)?;
226        for n in neighbors {
227            let target = emit_target_name(&n);
228            self.hooks.on_emit(&target, turn_output_payload);
229        }
230        Ok(())
231    }
232
233    /// Full single-turn orchestration (no LLM / no IR parse).
234    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutput, String> {
235        self.delegation_depth += 1;
236        let rt_ptr = self as *mut Self;
237        // Safety: `rt_ptr` aliases `self` for the synchronous body of `run_turn` only; the defer runs on
238        // return before `self` is invalidated.
239        // `scopeguard::defer!` only supports expression statements; use `guard` for the same drop semantics.
240        let _depth_guard = scopeguard::guard((), |()| unsafe {
241            if (*rt_ptr).delegation_depth > 0 {
242                (*rt_ptr).delegation_depth -= 1;
243            }
244        });
245
246        if self.delegation_depth > self.config.max_delegation_depth {
247            let out = TurnOutput {
248                outcome: TurnOutcome::DepthLimitExceeded,
249                ..Default::default()
250            };
251            self.hooks.on_turn_complete(&out);
252            return Ok(out);
253        }
254
255        if !self.config.enable_graph_memory {
256            let memory_context = MemoryContext::default();
257            let out = TurnOutput {
258                memory_context,
259                outcome: TurnOutcome::GraphMemoryDisabled,
260                ..Default::default()
261            };
262            self.hooks.on_turn_complete(&out);
263            return Ok(out);
264        }
265
266        if self.config.agent_id.is_empty() {
267            return Err("RuntimeConfig.agent_id must be set for run_turn".to_string());
268        }
269
270        let span = tracing::info_span!(
271            "ainl_runtime.run_turn",
272            agent_id = %self.config.agent_id,
273            turn = self.turn_count,
274            depth = input.depth,
275        );
276        let _span_enter = span.enter();
277
278        let validation: GraphValidationReport = self
279            .memory
280            .sqlite_store()
281            .validate_graph(&self.config.agent_id)?;
282        if !validation.is_valid {
283            let mut msg = String::from("graph validation failed before turn");
284            for d in &validation.dangling_edge_details {
285                msg.push_str(&format!(
286                    "; {} -> {} [{}]",
287                    d.source_id, d.target_id, d.edge_type
288                ));
289            }
290            return Err(msg);
291        }
292
293        self.hooks
294            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
295
296        let mut patches_failed: Vec<String> = Vec::new();
297        let mut warnings: Vec<String> = Vec::new();
298
299        let t_persona = Instant::now();
300        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
301            Some(cached.clone())
302        } else {
303            let nodes = self
304                .memory
305                .sqlite_store()
306                .query(&self.config.agent_id)
307                .persona_nodes()?;
308            let compiled = compile_persona_from_nodes(&nodes)?;
309            self.persona_cache = compiled.clone();
310            compiled
311        };
312        self.hooks
313            .on_persona_compiled(persona_prompt_contribution.as_deref());
314        tracing::debug!(
315            target: "ainl_runtime",
316            duration_ms = t_persona.elapsed().as_millis() as u64,
317            has_contribution = persona_prompt_contribution.is_some(),
318            "persona_phase"
319        );
320
321        let t_memory = Instant::now();
322        let memory_context = self.compile_memory_context_for(Some(&input.user_message))?;
323        self.hooks.on_memory_context_ready(&memory_context);
324        tracing::debug!(
325            target: "ainl_runtime",
326            duration_ms = t_memory.elapsed().as_millis() as u64,
327            episode_count = memory_context.recent_episodes.len(),
328            semantic_count = memory_context.relevant_semantic.len(),
329            patch_count = memory_context.active_patches.len(),
330            "memory_context"
331        );
332
333        let t_patches = Instant::now();
334        let patch_dispatch_results = if self.config.enable_graph_memory {
335            self.dispatch_patches_collect(
336                &memory_context.active_patches,
337                &input.frame,
338                &mut patches_failed,
339            )
340        } else {
341            Vec::new()
342        };
343        for r in &patch_dispatch_results {
344            tracing::debug!(
345                target: "ainl_runtime",
346                label = %r.label,
347                dispatched = r.dispatched,
348                fitness_before = r.fitness_before,
349                fitness_after = r.fitness_after,
350                "patch_dispatch"
351            );
352        }
353        tracing::debug!(
354            target: "ainl_runtime",
355            duration_ms = t_patches.elapsed().as_millis() as u64,
356            "patch_dispatch_phase"
357        );
358
359        let dispatched_count = patch_dispatch_results
360            .iter()
361            .filter(|r| r.dispatched)
362            .count() as u32;
363        if dispatched_count >= self.config.max_steps {
364            let out = TurnOutput {
365                patch_dispatch_results,
366                memory_context,
367                persona_prompt_contribution,
368                steps_executed: dispatched_count,
369                outcome: TurnOutcome::StepLimitExceeded {
370                    steps_executed: dispatched_count,
371                },
372                ..Default::default()
373            };
374            self.hooks.on_turn_complete(&out);
375            return Ok(out);
376        }
377
378        let t_episode = Instant::now();
379        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
380        let episode_id = record_turn_episode(
381            &self.memory,
382            &self.config.agent_id,
383            &input,
384            &tools_canonical,
385        )?;
386        self.hooks.on_episode_recorded(episode_id);
387        tracing::debug!(
388            target: "ainl_runtime",
389            duration_ms = t_episode.elapsed().as_millis() as u64,
390            episode_id = %episode_id,
391            "episode_record"
392        );
393
394        for &tid in &input.emit_targets {
395            self.memory
396                .sqlite_store()
397                .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)?;
398        }
399
400        let emit_payload = serde_json::json!({
401            "episode_id": episode_id.to_string(),
402            "user_message": input.user_message,
403            "tools_invoked": tools_canonical,
404            "persona_contribution": persona_prompt_contribution,
405            "turn_count": self.turn_count.wrapping_add(1),
406        });
407        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
408            tracing::warn!(error = %e, "emit routing failed — continuing");
409            warnings.push(format!("emit_routing: {e}"));
410        }
411
412        self.turn_count = self.turn_count.wrapping_add(1);
413
414        let should_extract = self.config.extraction_interval > 0
415            && self
416                .turn_count
417                .saturating_sub(self.last_extraction_turn)
418                >= self.config.extraction_interval;
419
420        let t_extract = Instant::now();
421        let (extraction_report, extraction_failed) = if should_extract {
422            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
423
424            let res = if force_fail {
425                tracing::warn!(error = "test_forced", "extraction pass failed — continuing");
426                tracing::debug!(
427                    target: "ainl_runtime",
428                    duration_ms = t_extract.elapsed().as_millis() as u64,
429                    signals_ingested = 0u64,
430                    skipped = false,
431                    "extraction_pass"
432                );
433                (None, true)
434            } else {
435                match self.extractor.run_pass(self.memory.sqlite_store()) {
436                    Ok(report) => {
437                        tracing::info!(
438                            agent_id = %report.agent_id,
439                            signals_extracted = report.signals_extracted,
440                            signals_applied = report.signals_applied,
441                            semantic_nodes_updated = report.semantic_nodes_updated,
442                            "ainl-graph-extractor pass completed (scheduled)"
443                        );
444                        self.hooks.on_extraction_complete(&report);
445                        self.persona_cache = None;
446                        tracing::debug!(
447                            target: "ainl_runtime",
448                            duration_ms = t_extract.elapsed().as_millis() as u64,
449                            signals_ingested = report.signals_extracted as u64,
450                            skipped = false,
451                            "extraction_pass"
452                        );
453                        (Some(report), false)
454                    }
455                    Err(e) => {
456                        tracing::warn!(error = %e, "extraction pass failed — continuing");
457                        tracing::debug!(
458                            target: "ainl_runtime",
459                            duration_ms = t_extract.elapsed().as_millis() as u64,
460                            signals_ingested = 0u64,
461                            skipped = false,
462                            "extraction_pass"
463                        );
464                        (None, true)
465                    }
466                }
467            };
468            self.last_extraction_turn = self.turn_count;
469            res
470        } else {
471            tracing::debug!(
472                target: "ainl_runtime",
473                duration_ms = t_extract.elapsed().as_millis() as u64,
474                signals_ingested = 0u64,
475                skipped = true,
476                "extraction_pass"
477            );
478            (None, false)
479        };
480
481        let outcome = if extraction_failed
482            || !patches_failed.is_empty()
483            || !warnings.is_empty()
484        {
485            TurnOutcome::PartialSuccess {
486                episode_recorded: true,
487                extraction_failed,
488                patches_failed,
489                warnings,
490            }
491        } else {
492            TurnOutcome::Success
493        };
494
495        let out = TurnOutput {
496            episode_id,
497            persona_prompt_contribution,
498            memory_context,
499            extraction_report,
500            steps_executed: dispatched_count,
501            outcome,
502            patch_dispatch_results,
503        };
504
505        if !self.config.agent_id.is_empty() {
506            let persist_state = RuntimeStateNode {
507                agent_id: self.config.agent_id.clone(),
508                turn_count: self.turn_count,
509                last_extraction_turn: self.last_extraction_turn,
510                last_persona_prompt: self.persona_cache.clone(),
511                updated_at: chrono::Utc::now().to_rfc3339(),
512            };
513            if let Err(e) = self.memory.sqlite_store().save_runtime_state(&persist_state) {
514                tracing::warn!(error = %e, "failed to persist runtime state — non-fatal");
515            }
516        }
517
518        self.hooks.on_turn_complete(&out);
519        Ok(out)
520    }
521
522    /// Score and rank semantic nodes for the current user text (`ainl-semantic-tagger` topic tags + recurrence).
523    pub fn relevant_semantic_nodes(
524        &self,
525        user_message: &str,
526        all_semantic: Vec<AinlMemoryNode>,
527        limit: usize,
528    ) -> Vec<AinlMemoryNode> {
529        let user_tags = infer_topic_tags(user_message);
530        let user_topics: HashSet<String> = user_tags
531            .iter()
532            .filter(|t| t.namespace == TagNamespace::Topic)
533            .map(|t| t.value.to_lowercase())
534            .collect();
535
536        if user_topics.is_empty() {
537            return fallback_high_recurrence_semantic(all_semantic, limit);
538        }
539
540        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
541        for n in all_semantic {
542            let (score, rec) = match &n.node_type {
543                AinlNodeType::Semantic { semantic } => {
544                    let mut s = 0f32;
545                    if let Some(cluster) = &semantic.topic_cluster {
546                        for slug in cluster
547                            .split([',', ';'])
548                            .map(|s| s.trim().to_lowercase())
549                            .filter(|s| !s.is_empty())
550                        {
551                            if user_topics.contains(&slug) {
552                                s += 1.0;
553                            }
554                        }
555                    }
556                    if s == 0.0 {
557                        for tag in &semantic.tags {
558                            let tl = tag.to_lowercase();
559                            if let Some(rest) = tl.strip_prefix("topic:") {
560                                let slug = rest.trim();
561                                if user_topics.contains(slug) {
562                                    s = 0.5;
563                                    break;
564                                }
565                            }
566                        }
567                    }
568                    (s, semantic.recurrence_count)
569                }
570                _ => (0.0, 0),
571            };
572            if score > 0.0 {
573                scored.push((score, rec, n));
574            }
575        }
576
577        scored.sort_by(|a, b| {
578            b.0.partial_cmp(&a.0)
579                .unwrap_or(std::cmp::Ordering::Equal)
580                .then_with(|| b.1.cmp(&a.1))
581        });
582        scored.into_iter().take(limit).map(|t| t.2).collect()
583    }
584
585    pub fn dispatch_patches(
586        &mut self,
587        patches: &[AinlMemoryNode],
588        frame: &HashMap<String, serde_json::Value>,
589    ) -> Vec<PatchDispatchResult> {
590        let mut discarded = Vec::new();
591        self.dispatch_patches_collect(patches, frame, &mut discarded)
592    }
593
594    fn dispatch_patches_collect(
595        &mut self,
596        patches: &[AinlMemoryNode],
597        frame: &HashMap<String, serde_json::Value>,
598        patches_failed: &mut Vec<String>,
599    ) -> Vec<PatchDispatchResult> {
600        let mut out = Vec::new();
601        for node in patches {
602            let res = self.dispatch_one_patch(node, frame);
603            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
604                tracing::warn!(label = %res.label, error = %e, "patch fitness write failed — continuing");
605                patches_failed.push(res.label.clone());
606            }
607            out.push(res);
608        }
609        out
610    }
611
612    fn dispatch_one_patch(
613        &mut self,
614        node: &AinlMemoryNode,
615        frame: &HashMap<String, serde_json::Value>,
616    ) -> PatchDispatchResult {
617        let label_default = String::new();
618        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
619            AinlNodeType::Procedural { procedural } => (
620                procedural_label(procedural),
621                procedural.patch_version,
622                procedural.retired,
623                procedural.declared_reads.clone(),
624                procedural.fitness,
625            ),
626            _ => {
627                return PatchDispatchResult {
628                    label: label_default,
629                    patch_version: 0,
630                    fitness_before: 0.0,
631                    fitness_after: 0.0,
632                    dispatched: false,
633                    skip_reason: Some(PatchSkipReason::NotProcedural),
634                    adapter_output: None,
635                    adapter_name: None,
636                };
637            }
638        };
639
640        if pv == 0 {
641            return PatchDispatchResult {
642                label: label_src,
643                patch_version: pv,
644                fitness_before: fitness_opt.unwrap_or(0.5),
645                fitness_after: fitness_opt.unwrap_or(0.5),
646                dispatched: false,
647                skip_reason: Some(PatchSkipReason::ZeroVersion),
648                adapter_output: None,
649                adapter_name: None,
650            };
651        }
652        if retired {
653            return PatchDispatchResult {
654                label: label_src.clone(),
655                patch_version: pv,
656                fitness_before: fitness_opt.unwrap_or(0.5),
657                fitness_after: fitness_opt.unwrap_or(0.5),
658                dispatched: false,
659                skip_reason: Some(PatchSkipReason::Retired),
660                adapter_output: None,
661                adapter_name: None,
662            };
663        }
664        for key in &reads {
665            if !frame.contains_key(key) {
666                return PatchDispatchResult {
667                    label: label_src.clone(),
668                    patch_version: pv,
669                    fitness_before: fitness_opt.unwrap_or(0.5),
670                    fitness_after: fitness_opt.unwrap_or(0.5),
671                    dispatched: false,
672                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
673                    adapter_output: None,
674                    adapter_name: None,
675                };
676            }
677        }
678
679        let patch_label = label_src.clone();
680        let adapter_key = patch_label.as_str();
681        let (adapter_output, adapter_name) =
682            if let Some(adapter) = self.adapter_registry.get(adapter_key) {
683                match adapter.execute(adapter_key, frame) {
684                    Ok(output) => {
685                        tracing::debug!(
686                            label = %patch_label,
687                            adapter = %adapter_key,
688                            "adapter executed patch"
689                        );
690                        (Some(output), Some(adapter_key.to_string()))
691                    }
692                    Err(e) => {
693                        tracing::warn!(
694                            label = %patch_label,
695                            error = %e,
696                            "adapter execution failed — continuing as metadata dispatch"
697                        );
698                        (None, Some(adapter_key.to_string()))
699                    }
700                }
701            } else {
702                (None, None)
703            };
704
705        let fitness_before = fitness_opt.unwrap_or(0.5);
706        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
707
708        let store = self.memory.sqlite_store();
709        let updated = match store.read_node(node.id) {
710            Ok(Some(mut n)) => {
711                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
712                    procedural.fitness = Some(fitness_after);
713                }
714                n
715            }
716            Ok(None) => {
717                return PatchDispatchResult {
718                    label: label_src,
719                    patch_version: pv,
720                    fitness_before,
721                    fitness_after: fitness_before,
722                    dispatched: false,
723                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(
724                        "node_row".into(),
725                    )),
726                    adapter_output,
727                    adapter_name,
728                };
729            }
730            Err(e) => {
731                return PatchDispatchResult {
732                    label: label_src,
733                    patch_version: pv,
734                    fitness_before,
735                    fitness_after: fitness_before,
736                    dispatched: false,
737                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
738                    adapter_output,
739                    adapter_name,
740                };
741            }
742        };
743
744        if let Err(e) = self.memory.write_node(&updated) {
745            return PatchDispatchResult {
746                label: label_src.clone(),
747                patch_version: pv,
748                fitness_before,
749                fitness_after: fitness_before,
750                dispatched: false,
751                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
752                adapter_output,
753                adapter_name,
754            };
755        }
756
757        self.hooks
758            .on_patch_dispatched(label_src.as_str(), fitness_after);
759
760        PatchDispatchResult {
761            label: label_src,
762            patch_version: pv,
763            fitness_before,
764            fitness_after,
765            dispatched: true,
766            skip_reason: None,
767            adapter_output,
768            adapter_name,
769        }
770    }
771}
772
773fn emit_target_name(n: &AinlMemoryNode) -> String {
774    match &n.node_type {
775        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
776        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
777        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
778        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
779        AinlNodeType::RuntimeState { runtime_state } => {
780            format!("runtime_state:{}", runtime_state.agent_id)
781        }
782    }
783}
784
785fn procedural_label(p: &ProceduralNode) -> String {
786    if !p.label.is_empty() {
787        p.label.clone()
788    } else {
789        p.pattern_name.clone()
790    }
791}
792
793fn fallback_high_recurrence_semantic(all: Vec<AinlMemoryNode>, limit: usize) -> Vec<AinlMemoryNode> {
794    let mut v: Vec<_> = all
795        .into_iter()
796        .filter(|n| {
797            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
798        })
799        .collect();
800    v.sort_by(|a, b| {
801        let ra = match &a.node_type {
802            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
803            _ => 0,
804        };
805        let rb = match &b.node_type {
806            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
807            _ => 0,
808        };
809        rb.cmp(&ra)
810    });
811    v.into_iter().take(limit).collect()
812}
813
814fn persona_snapshot_if_evolved(extractor: &GraphExtractorTask) -> Option<ainl_persona::PersonaSnapshot> {
815    let snap = extractor.evolution_engine.snapshot();
816    let defaults = default_axis_map(0.5);
817    for axis in PersonaAxis::ALL {
818        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
819        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
820        if (s - d).abs() > INGEST_SCORE_EPSILON {
821            return Some(snap);
822        }
823    }
824    None
825}
826
827fn compile_persona_from_nodes(nodes: &[AinlMemoryNode]) -> Result<Option<String>, String> {
828    if nodes.is_empty() {
829        return Ok(None);
830    }
831    let mut lines = Vec::new();
832    for n in nodes {
833        if let AinlNodeType::Persona { persona } = &n.node_type {
834            lines.push(format_persona_line(persona));
835        }
836    }
837    if lines.is_empty() {
838        Ok(None)
839    } else {
840        Ok(Some(lines.join("\n")))
841    }
842}
843
844fn format_persona_line(p: &PersonaNode) -> String {
845    format!(
846        "- {} (strength {:.2}, layer {:?}, source {:?})",
847        p.trait_name, p.strength, p.layer, p.source
848    )
849}
850
851/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
852/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
853fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
854    if tools_invoked.is_empty() {
855        return vec!["turn".to_string()];
856    }
857    let tags = tag_tool_names(tools_invoked);
858    let mut seen: BTreeSet<String> = BTreeSet::new();
859    for t in tags {
860        if t.namespace == TagNamespace::Tool {
861            seen.insert(t.value);
862        }
863    }
864    if seen.is_empty() {
865        vec!["turn".to_string()]
866    } else {
867        seen.into_iter().collect()
868    }
869}
870
871fn record_turn_episode(
872    memory: &ainl_memory::GraphMemory,
873    agent_id: &str,
874    input: &TurnInput,
875    tools_invoked_canonical: &[String],
876) -> Result<Uuid, String> {
877    let turn_id = Uuid::new_v4();
878    let timestamp = chrono::Utc::now().timestamp();
879    let tools = tools_invoked_canonical.to_vec();
880    let mut node = AinlMemoryNode::new_episode(
881        turn_id,
882        timestamp,
883        tools.clone(),
884        None,
885        input.trace_event.clone(),
886    );
887    node.agent_id = agent_id.to_string();
888    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
889        episodic.user_message = Some(input.user_message.clone());
890        episodic.tools_invoked = tools;
891    }
892    memory.write_node(&node)?;
893    Ok(node.id)
894}