Skip to main content

ainl_runtime/
runtime.rs

1//! Unified-graph orchestration runtime (v0.2): load, compile context, patch dispatch, record, emit, extract.
2
3use std::collections::{BTreeSet, HashMap, HashSet};
4use std::time::Instant;
5
6use ainl_graph_extractor::GraphExtractorTask;
7use ainl_memory::{
8    AinlMemoryNode, AinlNodeType, GraphStore, GraphValidationReport, PersonaNode, ProceduralNode,
9    RuntimeStateNode, SqliteGraphStore,
10};
11use ainl_persona::axes::default_axis_map;
12use ainl_persona::{
13    EvolutionEngine, PersonaAxis, PersonaSnapshot, RawSignal, INGEST_SCORE_EPSILON,
14};
15use ainl_semantic_tagger::infer_topic_tags;
16use ainl_semantic_tagger::tag_tool_names;
17use ainl_semantic_tagger::TagNamespace;
18use uuid::Uuid;
19
20use crate::adapters::{AdapterRegistry, GraphPatchAdapter};
21use crate::engine::{
22    AinlGraphArtifact, MemoryContext, PatchDispatchContext, PatchDispatchResult, PatchSkipReason,
23    TurnInput, TurnOutcome, TurnOutput, EMIT_TO_EDGE,
24};
25use crate::hooks::{NoOpHooks, TurnHooks};
26use crate::RuntimeConfig;
27
/// Orchestrates ainl-memory, persona snapshot state, and graph extraction for one agent.
///
/// ## Evolution writes vs ArmaraOS / openfang-runtime
///
/// In production ArmaraOS, **openfang-runtime**’s `GraphMemoryWriter::run_persona_evolution_pass`
/// is the active writer of the evolution persona row ([`crate::EVOLUTION_TRAIT_NAME`]) to each
/// agent’s `ainl_memory.db`. This struct holds its own [`GraphExtractorTask`] and
/// [`EvolutionEngine`]. Calling [`Self::persist_evolution_snapshot`] or
/// [`Self::evolve_persona_from_graph_signals`] **concurrently** with that pass on the **same**
/// SQLite store is undefined (competing last-writer wins on the same persona node).
///
/// Prefer [`Self::with_evolution_writes_enabled`]`(false)` when a host embeds `AinlRuntime`
/// alongside openfang while openfang remains the sole evolution writer.
/// [`Self::evolution_engine_mut`] can still mutate in-memory axis state; calling
/// [`EvolutionEngine::write_persona_node`] yourself bypasses this guard and must be avoided in
/// that configuration.
pub struct AinlRuntime {
    /// Host-supplied configuration; this file reads `agent_id`, `max_delegation_depth`,
    /// `enable_graph_memory`, `max_steps` and `extraction_interval` from it.
    config: RuntimeConfig,
    /// Graph memory wrapping the SQLite store handed to [`Self::new`].
    memory: ainl_memory::GraphMemory,
    /// Extraction task for this agent; also owns the persona `evolution_engine`.
    extractor: GraphExtractorTask,
    /// Turn counter; restored from persisted runtime state in [`Self::new`] and advanced by
    /// [`Self::run_turn`] after an episode is recorded.
    turn_count: u32,
    /// Value of `turn_count` at the most recent scheduled extraction attempt.
    last_extraction_turn: u32,
    /// Nesting depth of in-flight [`Self::run_turn`] calls, compared against
    /// `config.max_delegation_depth`.
    delegation_depth: u32,
    /// Host callbacks invoked at each phase of a turn; [`NoOpHooks`] until [`Self::with_hooks`].
    hooks: Box<dyn TurnHooks>,
    /// When `false`, [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
    /// return [`Err`] immediately so this runtime does not compete with another evolution writer
    /// (e.g. openfang’s post-turn pass) on the same DB. Default: `true`.
    evolution_writes_enabled: bool,
    /// Cached persona prompt contribution; filled lazily in [`Self::run_turn`] and cleared after
    /// a successful extraction pass.
    persona_cache: Option<String>,
    /// Test hook: when set, the next scheduled extraction pass is treated as failed (`PartialSuccess`).
    #[doc(hidden)]
    test_force_extraction_failure: bool,
    /// Patch adapters registered through [`Self::register_adapter`].
    adapter_registry: AdapterRegistry,
}
61
62impl AinlRuntime {
63    pub fn new(config: RuntimeConfig, store: SqliteGraphStore) -> Self {
64        let agent_id = config.agent_id.clone();
65        let memory = ainl_memory::GraphMemory::from_sqlite_store(store);
66        let (init_turn_count, init_persona_cache, init_last_extraction_turn) =
67            if agent_id.is_empty() {
68                (0, None, 0)
69            } else {
70                match memory.sqlite_store().load_runtime_state(&agent_id) {
71                    Ok(Some(state)) => {
72                        tracing::info!(
73                            agent_id = %agent_id,
74                            turn_count = state.turn_count,
75                            "restored runtime state"
76                        );
77                        (
78                            state.turn_count,
79                            state.last_persona_prompt,
80                            state.last_extraction_turn,
81                        )
82                    }
83                    Ok(None) => (0, None, 0),
84                    Err(e) => {
85                        tracing::warn!(error = %e, "failed to load runtime state — starting fresh");
86                        (0, None, 0)
87                    }
88                }
89            };
90        Self {
91            extractor: GraphExtractorTask::new(&agent_id),
92            memory,
93            config,
94            turn_count: init_turn_count,
95            last_extraction_turn: init_last_extraction_turn,
96            delegation_depth: 0,
97            hooks: Box::new(NoOpHooks),
98            evolution_writes_enabled: true,
99            persona_cache: init_persona_cache,
100            test_force_extraction_failure: false,
101            adapter_registry: AdapterRegistry::new(),
102        }
103    }
104
105    /// Register a [`crate::PatchAdapter`] keyed by [`PatchAdapter::name`] (e.g. procedural patch label).
106    pub fn register_adapter(&mut self, adapter: impl crate::PatchAdapter + 'static) {
107        self.adapter_registry.register(adapter);
108    }
109
110    /// Install the reference [`GraphPatchAdapter`] as fallback for procedural patches without a
111    /// label-specific adapter (see [`PatchDispatchContext`]).
112    pub fn register_default_patch_adapters(&mut self) {
113        self.register_adapter(GraphPatchAdapter::new());
114    }
115
116    /// Names of currently registered patch adapters.
117    pub fn registered_adapters(&self) -> Vec<&str> {
118        self.adapter_registry.registered_names()
119    }
120
121    #[doc(hidden)]
122    pub fn test_turn_count(&self) -> u32 {
123        self.turn_count
124    }
125
126    #[doc(hidden)]
127    pub fn test_delegation_depth(&self) -> u32 {
128        self.delegation_depth
129    }
130
131    #[doc(hidden)]
132    pub fn test_set_delegation_depth(&mut self, depth: u32) {
133        self.delegation_depth = depth;
134    }
135
136    #[doc(hidden)]
137    pub fn test_set_force_extraction_failure(&mut self, fail: bool) {
138        self.test_force_extraction_failure = fail;
139    }
140
141    pub fn with_hooks(mut self, hooks: impl TurnHooks + 'static) -> Self {
142        self.hooks = Box::new(hooks);
143        self
144    }
145
146    /// Set whether [`Self::persist_evolution_snapshot`] and [`Self::evolve_persona_from_graph_signals`]
147    /// may write the evolution persona row. When `false`, those methods return [`Err`]. Chaining
148    /// after [`Self::new`] is the supported way to disable writes for hosts that delegate evolution
149    /// persistence elsewhere (see struct-level docs).
150    pub fn with_evolution_writes_enabled(mut self, enabled: bool) -> Self {
151        self.evolution_writes_enabled = enabled;
152        self
153    }
154
155    fn require_evolution_writes_enabled(&self) -> Result<(), String> {
156        if self.evolution_writes_enabled {
157            Ok(())
158        } else {
159            Err(
160                "ainl_runtime: evolution_writes_enabled is false — persist_evolution_snapshot and \
161                 evolve_persona_from_graph_signals are disabled so this runtime does not compete \
162                 with openfang-runtime GraphMemoryWriter::run_persona_evolution_pass on the same \
163                 ainl_memory.db"
164                    .to_string(),
165            )
166        }
167    }
168
169    /// Borrow the backing SQLite store (same connection as graph memory).
170    pub fn sqlite_store(&self) -> &SqliteGraphStore {
171        self.memory.sqlite_store()
172    }
173
174    /// Borrow the persona [`EvolutionEngine`] for this runtime’s agent.
175    ///
176    /// This is the **same** `EvolutionEngine` instance held by [`GraphExtractorTask::evolution_engine`].
177    /// Scheduled [`GraphExtractorTask::run_pass`] continues to feed graph + pattern signals into it;
178    /// hosts may also call [`EvolutionEngine::ingest_signals`], [`EvolutionEngine::correction_tick`],
179    /// [`EvolutionEngine::extract_signals`], or [`EvolutionEngine::evolve`] directly, then
180    /// [`Self::persist_evolution_snapshot`] to write the [`PersonaSnapshot`] row ([`crate::EVOLUTION_TRAIT_NAME`]).
181    pub fn evolution_engine(&self) -> &EvolutionEngine {
182        &self.extractor.evolution_engine
183    }
184
185    /// Mutable access to the persona [`EvolutionEngine`] (see [`Self::evolution_engine`]).
186    ///
187    /// Direct calls to [`EvolutionEngine::write_persona_node`] bypass [`Self::evolution_writes_enabled`].
188    pub fn evolution_engine_mut(&mut self) -> &mut EvolutionEngine {
189        &mut self.extractor.evolution_engine
190    }
191
192    /// Ingest explicit [`RawSignal`]s without reading the graph (wrapper for [`EvolutionEngine::ingest_signals`]).
193    pub fn apply_evolution_signals(&mut self, signals: Vec<RawSignal>) -> usize {
194        self.extractor.evolution_engine.ingest_signals(signals)
195    }
196
197    /// Apply a host correction nudge on one axis ([`EvolutionEngine::correction_tick`]).
198    pub fn evolution_correction_tick(&mut self, axis: PersonaAxis, correction: f32) {
199        self.extractor
200            .evolution_engine
201            .correction_tick(axis, correction);
202    }
203
204    /// Snapshot current axis EMA state and persist the evolution persona bundle to the store.
205    ///
206    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
207    pub fn persist_evolution_snapshot(&mut self) -> Result<PersonaSnapshot, String> {
208        self.require_evolution_writes_enabled()?;
209        let store = self.memory.sqlite_store();
210        let snap = self.extractor.evolution_engine.snapshot();
211        self.extractor
212            .evolution_engine
213            .write_persona_node(store, &snap)?;
214        Ok(snap)
215    }
216
217    /// Graph-backed evolution only: extract signals from the store, ingest, write ([`EvolutionEngine::evolve`]).
218    ///
219    /// This does **not** run semantic `recurrence_count` bumps or the extractor’s `extract_pass`
220    /// heuristics — use [`GraphExtractorTask::run_pass`] for the full scheduled pipeline.
221    ///
222    /// Returns [`Err`] when [`Self::evolution_writes_enabled`] is `false` (see [`Self::with_evolution_writes_enabled`]).
223    pub fn evolve_persona_from_graph_signals(&mut self) -> Result<PersonaSnapshot, String> {
224        self.require_evolution_writes_enabled()?;
225        let store = self.memory.sqlite_store();
226        self.extractor.evolution_engine.evolve(store)
227    }
228
229    /// Boot: export + validate the agent subgraph.
230    pub fn load_artifact(&self) -> Result<AinlGraphArtifact, String> {
231        AinlGraphArtifact::load(self.memory.sqlite_store(), &self.config.agent_id)
232    }
233
234    /// Same as [`Self::compile_memory_context_for`] with `user_message: None` (semantic relevance falls back
235    /// to the latest episode’s `user_message` when present).
236    pub fn compile_memory_context(&self) -> Result<MemoryContext, String> {
237        self.compile_memory_context_for(None)
238    }
239
240    /// Build [`MemoryContext`] from the live store plus current extractor axis state.
241    pub fn compile_memory_context_for(
242        &self,
243        user_message: Option<&str>,
244    ) -> Result<MemoryContext, String> {
245        if self.config.agent_id.is_empty() {
246            return Err("RuntimeConfig.agent_id must be set".to_string());
247        }
248        let store = self.memory.sqlite_store();
249        let q = store.query(&self.config.agent_id);
250        let recent_episodes = q.recent_episodes(5)?;
251        let effective_user = user_message
252            .map(|s| s.to_string())
253            .filter(|s| !s.is_empty())
254            .or_else(|| {
255                recent_episodes.first().and_then(|n| {
256                    if let AinlNodeType::Episode { episodic } = &n.node_type {
257                        episodic.user_message.clone().filter(|m| !m.is_empty())
258                    } else {
259                        None
260                    }
261                })
262            });
263        let all_semantic = q.semantic_nodes()?;
264        let relevant_semantic = match effective_user.as_deref() {
265            Some(msg) => self.relevant_semantic_nodes(msg, all_semantic, 10),
266            None => fallback_high_recurrence_semantic(all_semantic, 10),
267        };
268        let active_patches = q.active_patches()?;
269        let persona_snapshot = persona_snapshot_if_evolved(&self.extractor);
270        Ok(MemoryContext {
271            recent_episodes,
272            relevant_semantic,
273            active_patches,
274            persona_snapshot,
275            compiled_at: chrono::Utc::now(),
276        })
277    }
278
279    /// Route `EMIT_TO` edges from an episode to hook targets (host implements [`TurnHooks::on_emit`]).
280    pub fn route_emit_edges(
281        &self,
282        episode_id: Uuid,
283        turn_output_payload: &serde_json::Value,
284    ) -> Result<(), String> {
285        let store = self.memory.sqlite_store();
286        let neighbors = store
287            .query(&self.config.agent_id)
288            .neighbors(episode_id, EMIT_TO_EDGE)?;
289        for n in neighbors {
290            let target = emit_target_name(&n);
291            self.hooks.on_emit(&target, turn_output_payload);
292        }
293        Ok(())
294    }
295
296    /// Full single-turn orchestration (no LLM / no IR parse).
297    pub fn run_turn(&mut self, input: TurnInput) -> Result<TurnOutput, String> {
298        self.delegation_depth += 1;
299        let rt_ptr = self as *mut Self;
300        // Safety: `rt_ptr` aliases `self` for the synchronous body of `run_turn` only; the defer runs on
301        // return before `self` is invalidated.
302        // `scopeguard::defer!` only supports expression statements; use `guard` for the same drop semantics.
303        let _depth_guard = scopeguard::guard((), |()| unsafe {
304            if (*rt_ptr).delegation_depth > 0 {
305                (*rt_ptr).delegation_depth -= 1;
306            }
307        });
308
309        if self.delegation_depth > self.config.max_delegation_depth {
310            let out = TurnOutput {
311                outcome: TurnOutcome::DepthLimitExceeded,
312                ..Default::default()
313            };
314            self.hooks.on_turn_complete(&out);
315            return Ok(out);
316        }
317
318        if !self.config.enable_graph_memory {
319            let memory_context = MemoryContext::default();
320            let out = TurnOutput {
321                memory_context,
322                outcome: TurnOutcome::GraphMemoryDisabled,
323                ..Default::default()
324            };
325            self.hooks.on_turn_complete(&out);
326            return Ok(out);
327        }
328
329        if self.config.agent_id.is_empty() {
330            return Err("RuntimeConfig.agent_id must be set for run_turn".to_string());
331        }
332
333        let span = tracing::info_span!(
334            "ainl_runtime.run_turn",
335            agent_id = %self.config.agent_id,
336            turn = self.turn_count,
337            depth = input.depth,
338        );
339        let _span_enter = span.enter();
340
341        let validation: GraphValidationReport = self
342            .memory
343            .sqlite_store()
344            .validate_graph(&self.config.agent_id)?;
345        if !validation.is_valid {
346            let mut msg = String::from("graph validation failed before turn");
347            for d in &validation.dangling_edge_details {
348                msg.push_str(&format!(
349                    "; {} -> {} [{}]",
350                    d.source_id, d.target_id, d.edge_type
351                ));
352            }
353            return Err(msg);
354        }
355
356        self.hooks
357            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
358
359        let mut patches_failed: Vec<String> = Vec::new();
360        let mut warnings: Vec<String> = Vec::new();
361
362        let t_persona = Instant::now();
363        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
364            Some(cached.clone())
365        } else {
366            let nodes = self
367                .memory
368                .sqlite_store()
369                .query(&self.config.agent_id)
370                .persona_nodes()?;
371            let compiled = compile_persona_from_nodes(&nodes)?;
372            self.persona_cache = compiled.clone();
373            compiled
374        };
375        self.hooks
376            .on_persona_compiled(persona_prompt_contribution.as_deref());
377        tracing::debug!(
378            target: "ainl_runtime",
379            duration_ms = t_persona.elapsed().as_millis() as u64,
380            has_contribution = persona_prompt_contribution.is_some(),
381            "persona_phase"
382        );
383
384        let t_memory = Instant::now();
385        let memory_context = self.compile_memory_context_for(Some(&input.user_message))?;
386        self.hooks.on_memory_context_ready(&memory_context);
387        tracing::debug!(
388            target: "ainl_runtime",
389            duration_ms = t_memory.elapsed().as_millis() as u64,
390            episode_count = memory_context.recent_episodes.len(),
391            semantic_count = memory_context.relevant_semantic.len(),
392            patch_count = memory_context.active_patches.len(),
393            "memory_context"
394        );
395
396        let t_patches = Instant::now();
397        let patch_dispatch_results = if self.config.enable_graph_memory {
398            self.dispatch_patches_collect(
399                &memory_context.active_patches,
400                &input.frame,
401                &mut patches_failed,
402            )
403        } else {
404            Vec::new()
405        };
406        for r in &patch_dispatch_results {
407            tracing::debug!(
408                target: "ainl_runtime",
409                label = %r.label,
410                dispatched = r.dispatched,
411                fitness_before = r.fitness_before,
412                fitness_after = r.fitness_after,
413                "patch_dispatch"
414            );
415        }
416        tracing::debug!(
417            target: "ainl_runtime",
418            duration_ms = t_patches.elapsed().as_millis() as u64,
419            "patch_dispatch_phase"
420        );
421
422        let dispatched_count = patch_dispatch_results
423            .iter()
424            .filter(|r| r.dispatched)
425            .count() as u32;
426        if dispatched_count >= self.config.max_steps {
427            let out = TurnOutput {
428                patch_dispatch_results,
429                memory_context,
430                persona_prompt_contribution,
431                steps_executed: dispatched_count,
432                outcome: TurnOutcome::StepLimitExceeded {
433                    steps_executed: dispatched_count,
434                },
435                ..Default::default()
436            };
437            self.hooks.on_turn_complete(&out);
438            return Ok(out);
439        }
440
441        let t_episode = Instant::now();
442        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
443        let episode_id = record_turn_episode(
444            &self.memory,
445            &self.config.agent_id,
446            &input,
447            &tools_canonical,
448        )?;
449        self.hooks.on_episode_recorded(episode_id);
450        tracing::debug!(
451            target: "ainl_runtime",
452            duration_ms = t_episode.elapsed().as_millis() as u64,
453            episode_id = %episode_id,
454            "episode_record"
455        );
456
457        for &tid in &input.emit_targets {
458            self.memory
459                .sqlite_store()
460                .insert_graph_edge_checked(episode_id, tid, EMIT_TO_EDGE)?;
461        }
462
463        let emit_payload = serde_json::json!({
464            "episode_id": episode_id.to_string(),
465            "user_message": input.user_message,
466            "tools_invoked": tools_canonical,
467            "persona_contribution": persona_prompt_contribution,
468            "turn_count": self.turn_count.wrapping_add(1),
469        });
470        if let Err(e) = self.route_emit_edges(episode_id, &emit_payload) {
471            tracing::warn!(error = %e, "emit routing failed — continuing");
472            warnings.push(format!("emit_routing: {e}"));
473        }
474
475        self.turn_count = self.turn_count.wrapping_add(1);
476
477        let should_extract = self.config.extraction_interval > 0
478            && self.turn_count.saturating_sub(self.last_extraction_turn)
479                >= self.config.extraction_interval;
480
481        let t_extract = Instant::now();
482        let (extraction_report, extraction_failed) = if should_extract {
483            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
484
485            let res = if force_fail {
486                tracing::warn!(error = "test_forced", "extraction pass failed — continuing");
487                tracing::debug!(
488                    target: "ainl_runtime",
489                    duration_ms = t_extract.elapsed().as_millis() as u64,
490                    signals_ingested = 0u64,
491                    skipped = false,
492                    "extraction_pass"
493                );
494                (None, true)
495            } else {
496                match self.extractor.run_pass(self.memory.sqlite_store()) {
497                    Ok(report) => {
498                        tracing::info!(
499                            agent_id = %report.agent_id,
500                            signals_extracted = report.signals_extracted,
501                            signals_applied = report.signals_applied,
502                            semantic_nodes_updated = report.semantic_nodes_updated,
503                            "ainl-graph-extractor pass completed (scheduled)"
504                        );
505                        self.hooks.on_extraction_complete(&report);
506                        self.persona_cache = None;
507                        tracing::debug!(
508                            target: "ainl_runtime",
509                            duration_ms = t_extract.elapsed().as_millis() as u64,
510                            signals_ingested = report.signals_extracted as u64,
511                            skipped = false,
512                            "extraction_pass"
513                        );
514                        (Some(report), false)
515                    }
516                    Err(e) => {
517                        tracing::warn!(error = %e, "extraction pass failed — continuing");
518                        tracing::debug!(
519                            target: "ainl_runtime",
520                            duration_ms = t_extract.elapsed().as_millis() as u64,
521                            signals_ingested = 0u64,
522                            skipped = false,
523                            "extraction_pass"
524                        );
525                        (None, true)
526                    }
527                }
528            };
529            self.last_extraction_turn = self.turn_count;
530            res
531        } else {
532            tracing::debug!(
533                target: "ainl_runtime",
534                duration_ms = t_extract.elapsed().as_millis() as u64,
535                signals_ingested = 0u64,
536                skipped = true,
537                "extraction_pass"
538            );
539            (None, false)
540        };
541
542        let outcome = if extraction_failed || !patches_failed.is_empty() || !warnings.is_empty() {
543            TurnOutcome::PartialSuccess {
544                episode_recorded: true,
545                extraction_failed,
546                patches_failed,
547                warnings,
548            }
549        } else {
550            TurnOutcome::Success
551        };
552
553        let out = TurnOutput {
554            episode_id,
555            persona_prompt_contribution,
556            memory_context,
557            extraction_report,
558            steps_executed: dispatched_count,
559            outcome,
560            patch_dispatch_results,
561        };
562
563        if !self.config.agent_id.is_empty() {
564            let persist_state = RuntimeStateNode {
565                agent_id: self.config.agent_id.clone(),
566                turn_count: self.turn_count,
567                last_extraction_turn: self.last_extraction_turn,
568                last_persona_prompt: self.persona_cache.clone(),
569                updated_at: chrono::Utc::now().to_rfc3339(),
570            };
571            if let Err(e) = self
572                .memory
573                .sqlite_store()
574                .save_runtime_state(&persist_state)
575            {
576                tracing::warn!(error = %e, "failed to persist runtime state — non-fatal");
577            }
578        }
579
580        self.hooks.on_turn_complete(&out);
581        Ok(out)
582    }
583
584    /// Score and rank semantic nodes for the current user text (`ainl-semantic-tagger` topic tags + recurrence).
585    pub fn relevant_semantic_nodes(
586        &self,
587        user_message: &str,
588        all_semantic: Vec<AinlMemoryNode>,
589        limit: usize,
590    ) -> Vec<AinlMemoryNode> {
591        let user_tags = infer_topic_tags(user_message);
592        let user_topics: HashSet<String> = user_tags
593            .iter()
594            .filter(|t| t.namespace == TagNamespace::Topic)
595            .map(|t| t.value.to_lowercase())
596            .collect();
597
598        if user_topics.is_empty() {
599            return fallback_high_recurrence_semantic(all_semantic, limit);
600        }
601
602        let mut scored: Vec<(f32, u32, AinlMemoryNode)> = Vec::new();
603        for n in all_semantic {
604            let (score, rec) = match &n.node_type {
605                AinlNodeType::Semantic { semantic } => {
606                    let mut s = 0f32;
607                    if let Some(cluster) = &semantic.topic_cluster {
608                        for slug in cluster
609                            .split([',', ';'])
610                            .map(|s| s.trim().to_lowercase())
611                            .filter(|s| !s.is_empty())
612                        {
613                            if user_topics.contains(&slug) {
614                                s += 1.0;
615                            }
616                        }
617                    }
618                    if s == 0.0 {
619                        for tag in &semantic.tags {
620                            let tl = tag.to_lowercase();
621                            if let Some(rest) = tl.strip_prefix("topic:") {
622                                let slug = rest.trim();
623                                if user_topics.contains(slug) {
624                                    s = 0.5;
625                                    break;
626                                }
627                            }
628                        }
629                    }
630                    (s, semantic.recurrence_count)
631                }
632                _ => (0.0, 0),
633            };
634            if score > 0.0 {
635                scored.push((score, rec, n));
636            }
637        }
638
639        scored.sort_by(|a, b| {
640            b.0.partial_cmp(&a.0)
641                .unwrap_or(std::cmp::Ordering::Equal)
642                .then_with(|| b.1.cmp(&a.1))
643        });
644        scored.into_iter().take(limit).map(|t| t.2).collect()
645    }
646
647    pub fn dispatch_patches(
648        &mut self,
649        patches: &[AinlMemoryNode],
650        frame: &HashMap<String, serde_json::Value>,
651    ) -> Vec<PatchDispatchResult> {
652        let mut discarded = Vec::new();
653        self.dispatch_patches_collect(patches, frame, &mut discarded)
654    }
655
656    fn dispatch_patches_collect(
657        &mut self,
658        patches: &[AinlMemoryNode],
659        frame: &HashMap<String, serde_json::Value>,
660        patches_failed: &mut Vec<String>,
661    ) -> Vec<PatchDispatchResult> {
662        let mut out = Vec::new();
663        for node in patches {
664            let res = self.dispatch_one_patch(node, frame);
665            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
666                tracing::warn!(label = %res.label, error = %e, "patch fitness write failed — continuing");
667                patches_failed.push(res.label.clone());
668            }
669            out.push(res);
670        }
671        out
672    }
673
674    fn dispatch_one_patch(
675        &mut self,
676        node: &AinlMemoryNode,
677        frame: &HashMap<String, serde_json::Value>,
678    ) -> PatchDispatchResult {
679        let label_default = String::new();
680        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
681            AinlNodeType::Procedural { procedural } => (
682                procedural_label(procedural),
683                procedural.patch_version,
684                procedural.retired,
685                procedural.declared_reads.clone(),
686                procedural.fitness,
687            ),
688            _ => {
689                return PatchDispatchResult {
690                    label: label_default,
691                    patch_version: 0,
692                    fitness_before: 0.0,
693                    fitness_after: 0.0,
694                    dispatched: false,
695                    skip_reason: Some(PatchSkipReason::NotProcedural),
696                    adapter_output: None,
697                    adapter_name: None,
698                };
699            }
700        };
701
702        if pv == 0 {
703            return PatchDispatchResult {
704                label: label_src,
705                patch_version: pv,
706                fitness_before: fitness_opt.unwrap_or(0.5),
707                fitness_after: fitness_opt.unwrap_or(0.5),
708                dispatched: false,
709                skip_reason: Some(PatchSkipReason::ZeroVersion),
710                adapter_output: None,
711                adapter_name: None,
712            };
713        }
714        if retired {
715            return PatchDispatchResult {
716                label: label_src.clone(),
717                patch_version: pv,
718                fitness_before: fitness_opt.unwrap_or(0.5),
719                fitness_after: fitness_opt.unwrap_or(0.5),
720                dispatched: false,
721                skip_reason: Some(PatchSkipReason::Retired),
722                adapter_output: None,
723                adapter_name: None,
724            };
725        }
726        for key in &reads {
727            if !frame.contains_key(key) {
728                return PatchDispatchResult {
729                    label: label_src.clone(),
730                    patch_version: pv,
731                    fitness_before: fitness_opt.unwrap_or(0.5),
732                    fitness_after: fitness_opt.unwrap_or(0.5),
733                    dispatched: false,
734                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
735                    adapter_output: None,
736                    adapter_name: None,
737                };
738            }
739        }
740
741        let patch_label = label_src.clone();
742        let adapter_key = patch_label.as_str();
743        let ctx = PatchDispatchContext {
744            patch_label: adapter_key,
745            node,
746            frame,
747        };
748        let (adapter_output, adapter_name) = if let Some(adapter) = self
749            .adapter_registry
750            .get(adapter_key)
751            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
752        {
753            let aname = adapter.name().to_string();
754            match adapter.execute_patch(&ctx) {
755                Ok(output) => {
756                    tracing::debug!(
757                        label = %patch_label,
758                        adapter = %aname,
759                        "adapter executed patch"
760                    );
761                    (Some(output), Some(aname))
762                }
763                Err(e) => {
764                    tracing::warn!(
765                        label = %patch_label,
766                        adapter = %aname,
767                        error = %e,
768                        "adapter execution failed — continuing as metadata dispatch"
769                    );
770                    (None, Some(aname))
771                }
772            }
773        } else {
774            (None, None)
775        };
776
777        let fitness_before = fitness_opt.unwrap_or(0.5);
778        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
779
780        let store = self.memory.sqlite_store();
781        let updated = match store.read_node(node.id) {
782            Ok(Some(mut n)) => {
783                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
784                    procedural.fitness = Some(fitness_after);
785                }
786                n
787            }
788            Ok(None) => {
789                return PatchDispatchResult {
790                    label: label_src,
791                    patch_version: pv,
792                    fitness_before,
793                    fitness_after: fitness_before,
794                    dispatched: false,
795                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
796                    adapter_output,
797                    adapter_name,
798                };
799            }
800            Err(e) => {
801                return PatchDispatchResult {
802                    label: label_src,
803                    patch_version: pv,
804                    fitness_before,
805                    fitness_after: fitness_before,
806                    dispatched: false,
807                    skip_reason: Some(PatchSkipReason::PersistFailed(e)),
808                    adapter_output,
809                    adapter_name,
810                };
811            }
812        };
813
814        if let Err(e) = self.memory.write_node(&updated) {
815            return PatchDispatchResult {
816                label: label_src.clone(),
817                patch_version: pv,
818                fitness_before,
819                fitness_after: fitness_before,
820                dispatched: false,
821                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
822                adapter_output,
823                adapter_name,
824            };
825        }
826
827        self.hooks
828            .on_patch_dispatched(label_src.as_str(), fitness_after);
829
830        PatchDispatchResult {
831            label: label_src,
832            patch_version: pv,
833            fitness_before,
834            fitness_after,
835            dispatched: true,
836            skip_reason: None,
837            adapter_output,
838            adapter_name,
839        }
840    }
841}
842
843fn emit_target_name(n: &AinlMemoryNode) -> String {
844    match &n.node_type {
845        AinlNodeType::Persona { persona } => persona.trait_name.clone(),
846        AinlNodeType::Procedural { procedural } => procedural_label(procedural),
847        AinlNodeType::Semantic { semantic } => semantic.fact.chars().take(64).collect(),
848        AinlNodeType::Episode { episodic } => episodic.turn_id.to_string(),
849        AinlNodeType::RuntimeState { runtime_state } => {
850            format!("runtime_state:{}", runtime_state.agent_id)
851        }
852    }
853}
854
855fn procedural_label(p: &ProceduralNode) -> String {
856    if !p.label.is_empty() {
857        p.label.clone()
858    } else {
859        p.pattern_name.clone()
860    }
861}
862
863fn fallback_high_recurrence_semantic(
864    all: Vec<AinlMemoryNode>,
865    limit: usize,
866) -> Vec<AinlMemoryNode> {
867    let mut v: Vec<_> = all
868        .into_iter()
869        .filter(|n| {
870            matches!(&n.node_type, AinlNodeType::Semantic { semantic } if semantic.recurrence_count >= 2)
871        })
872        .collect();
873    v.sort_by(|a, b| {
874        let ra = match &a.node_type {
875            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
876            _ => 0,
877        };
878        let rb = match &b.node_type {
879            AinlNodeType::Semantic { semantic } => semantic.recurrence_count,
880            _ => 0,
881        };
882        rb.cmp(&ra)
883    });
884    v.into_iter().take(limit).collect()
885}
886
887fn persona_snapshot_if_evolved(
888    extractor: &GraphExtractorTask,
889) -> Option<ainl_persona::PersonaSnapshot> {
890    let snap = extractor.evolution_engine.snapshot();
891    let defaults = default_axis_map(0.5);
892    for axis in PersonaAxis::ALL {
893        let s = snap.axes.get(&axis).map(|a| a.score).unwrap_or(0.5);
894        let d = defaults.get(&axis).map(|a| a.score).unwrap_or(0.5);
895        if (s - d).abs() > INGEST_SCORE_EPSILON {
896            return Some(snap);
897        }
898    }
899    None
900}
901
902fn compile_persona_from_nodes(nodes: &[AinlMemoryNode]) -> Result<Option<String>, String> {
903    if nodes.is_empty() {
904        return Ok(None);
905    }
906    let mut lines = Vec::new();
907    for n in nodes {
908        if let AinlNodeType::Persona { persona } = &n.node_type {
909            lines.push(format_persona_line(persona));
910        }
911    }
912    if lines.is_empty() {
913        Ok(None)
914    } else {
915        Ok(Some(lines.join("\n")))
916    }
917}
918
919fn format_persona_line(p: &PersonaNode) -> String {
920    format!(
921        "- {} (strength {:.2}, layer {:?}, source {:?})",
922        p.trait_name, p.strength, p.layer, p.source
923    )
924}
925
926/// Canonical tool names for episodic storage: [`tag_tool_names`] → `TagNamespace::Tool` values,
927/// deduplicated and sorted (lexicographic). Empty input yields `["turn"]` (same sentinel as before).
928fn normalize_tools_for_episode(tools_invoked: &[String]) -> Vec<String> {
929    if tools_invoked.is_empty() {
930        return vec!["turn".to_string()];
931    }
932    let tags = tag_tool_names(tools_invoked);
933    let mut seen: BTreeSet<String> = BTreeSet::new();
934    for t in tags {
935        if t.namespace == TagNamespace::Tool {
936            seen.insert(t.value);
937        }
938    }
939    if seen.is_empty() {
940        vec!["turn".to_string()]
941    } else {
942        seen.into_iter().collect()
943    }
944}
945
946fn record_turn_episode(
947    memory: &ainl_memory::GraphMemory,
948    agent_id: &str,
949    input: &TurnInput,
950    tools_invoked_canonical: &[String],
951) -> Result<Uuid, String> {
952    let turn_id = Uuid::new_v4();
953    let timestamp = chrono::Utc::now().timestamp();
954    let tools = tools_invoked_canonical.to_vec();
955    let mut node = AinlMemoryNode::new_episode(
956        turn_id,
957        timestamp,
958        tools.clone(),
959        None,
960        input.trace_event.clone(),
961    );
962    node.agent_id = agent_id.to_string();
963    if let AinlNodeType::Episode { ref mut episodic } = node.node_type {
964        episodic.user_message = Some(input.user_message.clone());
965        episodic.tools_invoked = tools;
966    }
967    memory.write_node(&node)?;
968    Ok(node.id)
969}