Skip to main content

ainl_runtime/
runtime_async.rs

1//! Async turn execution ([`super::AinlRuntime::run_turn_async`]): SQLite and graph reads/writes run
2//! on Tokio’s blocking pool (`tokio::task::spawn_blocking`) while other runtime state stays on the
3//! async caller. Graph memory is shared as `Arc<std::sync::Mutex<ainl_memory::GraphMemory>>`; we
4//! intentionally do **not** use `tokio::sync::Mutex` for that inner lock so [`super::AinlRuntime::new`]
5//! and short borrows such as [`super::AinlRuntime::sqlite_store`] are safe on any thread—including
6//! Tokio worker threads used by `#[tokio::test]`—without the “block inside async context” failure
7//! mode of `Mutex::blocking_lock` on an async mutex (see [Tokio mutex blocking_lock](https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#method.blocking_lock)).
8
9use std::collections::HashMap;
10use std::sync::atomic::Ordering;
11use std::sync::{Arc, Mutex};
12use std::time::Instant;
13
14use ainl_graph_extractor::GraphExtractorTask;
15use ainl_memory::{
16    AinlMemoryNode, AinlNodeType, GraphMemory, GraphStore, GraphValidationReport, RuntimeStateNode,
17};
18use uuid::Uuid;
19
20use super::{
21    compile_persona_from_nodes, emit_target_name, maybe_persist_trajectory_after_episode,
22    normalize_tools_for_episode, persona_snapshot_if_evolved, procedural_label,
23    record_turn_episode, try_export_graph_json_armaraos,
24};
25use crate::adapters::GraphPatchAdapter;
26use crate::engine::{
27    AinlRuntimeError, MemoryContext, PatchDispatchContext, PatchDispatchResult, PatchSkipReason,
28    TurnInput, TurnOutcome, TurnPhase, TurnResult, TurnStatus, TurnWarning, EMIT_TO_EDGE,
29};
30
31async fn graph_spawn<T, F>(arc: Arc<Mutex<GraphMemory>>, f: F) -> Result<T, AinlRuntimeError>
32where
33    T: Send + 'static,
34    F: FnOnce(&GraphMemory) -> Result<T, String> + Send + 'static,
35{
36    tokio::task::spawn_blocking(move || {
37        let guard = arc.lock().expect("graph mutex poisoned");
38        f(&guard)
39    })
40    .await
41    .map_err(|e| AinlRuntimeError::AsyncJoinError(e.to_string()))?
42    .map_err(AinlRuntimeError::from)
43}
44
45impl super::AinlRuntime {
46    /// Async single-turn orchestration: graph SQLite I/O is offloaded with `spawn_blocking`.
47    ///
48    /// The graph remains `Arc<std::sync::Mutex<GraphMemory>>` (see crate `README.md`); this method
49    /// does not switch that inner lock to `tokio::sync::Mutex`.
50    ///
51    /// Requires the `async` crate feature and a Tokio runtime (multi-thread recommended).
52    pub async fn run_turn_async(
53        &mut self,
54        input: TurnInput,
55    ) -> Result<TurnOutcome, AinlRuntimeError> {
56        let depth = self.current_depth.fetch_add(1, Ordering::SeqCst);
57        let cd = Arc::clone(&self.current_depth);
58        let _depth_guard = scopeguard::guard((), move |()| {
59            cd.fetch_sub(1, Ordering::SeqCst);
60        });
61
62        if depth >= self.config.max_delegation_depth {
63            return Err(AinlRuntimeError::DelegationDepthExceeded {
64                depth,
65                max: self.config.max_delegation_depth,
66            });
67        }
68
69        if let Some(ref hooks_async) = self.hooks_async {
70            hooks_async.on_turn_start(&input).await;
71        }
72
73        if !self.config.enable_graph_memory {
74            let memory_context = MemoryContext::default();
75            let result = TurnResult {
76                memory_context,
77                status: TurnStatus::GraphMemoryDisabled,
78                ..Default::default()
79            };
80            let outcome = TurnOutcome::Complete(result);
81            self.hooks.on_turn_complete(&outcome);
82            if let Some(ref hooks_async) = self.hooks_async {
83                hooks_async.on_turn_complete(&outcome).await;
84            }
85            return Ok(outcome);
86        }
87
88        if self.config.agent_id.is_empty() {
89            return Err(AinlRuntimeError::Message(
90                "RuntimeConfig.agent_id must be set for run_turn".into(),
91            ));
92        }
93
94        let span = tracing::info_span!(
95            "ainl_runtime.run_turn_async",
96            agent_id = %self.config.agent_id,
97            turn = self.turn_count,
98            depth = input.depth,
99        );
100        let _span_enter = span.enter();
101
102        let arc = self.memory.shared_arc();
103        let agent_id = self.config.agent_id.clone();
104
105        let validation: GraphValidationReport = graph_spawn(Arc::clone(&arc), {
106            let agent_id = agent_id.clone();
107            move |m| m.sqlite_store().validate_graph(&agent_id)
108        })
109        .await?;
110
111        if !validation.is_valid {
112            let mut msg = String::from("graph validation failed before turn");
113            for d in &validation.dangling_edge_details {
114                msg.push_str(&format!(
115                    "; {} -> {} [{}]",
116                    d.source_id, d.target_id, d.edge_type
117                ));
118            }
119            return Err(AinlRuntimeError::Message(msg));
120        }
121
122        self.hooks
123            .on_artifact_loaded(&self.config.agent_id, validation.node_count);
124
125        let mut turn_warnings: Vec<TurnWarning> = Vec::new();
126
127        let t_persona = Instant::now();
128        let persona_prompt_contribution = if let Some(cached) = &self.persona_cache {
129            Some(cached.clone())
130        } else {
131            let nodes = graph_spawn(Arc::clone(&arc), {
132                let agent_id = agent_id.clone();
133                move |m| m.sqlite_store().query(&agent_id).persona_nodes()
134            })
135            .await?;
136            let compiled = compile_persona_from_nodes(&nodes).map_err(AinlRuntimeError::from)?;
137            self.persona_cache = compiled.clone();
138            compiled
139        };
140        self.hooks
141            .on_persona_compiled(persona_prompt_contribution.as_deref());
142        tracing::debug!(
143            target: "ainl_runtime",
144            duration_ms = t_persona.elapsed().as_millis() as u64,
145            has_contribution = persona_prompt_contribution.is_some(),
146            "persona_phase_async"
147        );
148
149        let t_memory = Instant::now();
150        let (recent_episodes, all_semantic, active_patches) = graph_spawn(Arc::clone(&arc), {
151            let agent_id = agent_id.clone();
152            move |m| {
153                let store = m.sqlite_store();
154                let q = store.query(&agent_id);
155                let recent_episodes = q.recent_episodes(5)?;
156                let all_semantic = q.semantic_nodes()?;
157                let active_patches = q.active_patches()?;
158                Ok((recent_episodes, all_semantic, active_patches))
159            }
160        })
161        .await?;
162
163        let relevant_semantic =
164            self.relevant_semantic_nodes(input.user_message.as_str(), all_semantic, 10);
165        let memory_context = MemoryContext {
166            recent_episodes,
167            relevant_semantic,
168            active_patches,
169            persona_snapshot: persona_snapshot_if_evolved(&self.extractor),
170            compiled_at: chrono::Utc::now(),
171        };
172
173        self.hooks.on_memory_context_ready(&memory_context);
174        tracing::debug!(
175            target: "ainl_runtime",
176            duration_ms = t_memory.elapsed().as_millis() as u64,
177            episode_count = memory_context.recent_episodes.len(),
178            semantic_count = memory_context.relevant_semantic.len(),
179            patch_count = memory_context.active_patches.len(),
180            "memory_context_async"
181        );
182
183        let t_patches = Instant::now();
184        let patch_dispatch_results = if self.config.enable_graph_memory {
185            self.dispatch_patches_collect_async(
186                &memory_context.active_patches,
187                &input.frame,
188                &arc,
189                &mut turn_warnings,
190            )
191            .await?
192        } else {
193            Vec::new()
194        };
195        for r in &patch_dispatch_results {
196            tracing::debug!(
197                target: "ainl_runtime",
198                label = %r.label,
199                dispatched = r.dispatched,
200                fitness_before = r.fitness_before,
201                fitness_after = r.fitness_after,
202                "patch_dispatch_async"
203            );
204        }
205        tracing::debug!(
206            target: "ainl_runtime",
207            duration_ms = t_patches.elapsed().as_millis() as u64,
208            "patch_dispatch_phase_async"
209        );
210
211        let dispatched_count = patch_dispatch_results
212            .iter()
213            .filter(|r| r.dispatched)
214            .count() as u32;
215        if dispatched_count >= self.config.max_steps {
216            let result = TurnResult {
217                patch_dispatch_results,
218                memory_context,
219                persona_prompt_contribution,
220                steps_executed: dispatched_count,
221                status: TurnStatus::StepLimitExceeded {
222                    steps_executed: dispatched_count,
223                },
224                ..Default::default()
225            };
226            let outcome = TurnOutcome::Complete(result);
227            self.hooks.on_turn_complete(&outcome);
228            if let Some(ref hooks_async) = self.hooks_async {
229                hooks_async.on_turn_complete(&outcome).await;
230            }
231            return Ok(outcome);
232        }
233
234        let t_episode = Instant::now();
235        let tools_canonical = normalize_tools_for_episode(&input.tools_invoked);
236        let tools_for_episode = tools_canonical.clone();
237        let input_clone = input.clone();
238        let episode_id = match graph_spawn(Arc::clone(&arc), {
239            let agent_id = agent_id.clone();
240            move |m| record_turn_episode(m, &agent_id, &input_clone, &tools_for_episode)
241        })
242        .await
243        {
244            Ok(id) => id,
245            Err(e) => {
246                let e = e.message_str().unwrap_or("episode write").to_string();
247                tracing::warn!(
248                    phase = ?TurnPhase::EpisodeWrite,
249                    error = %e,
250                    "non-fatal turn write failed — continuing"
251                );
252                turn_warnings.push(TurnWarning {
253                    phase: TurnPhase::EpisodeWrite,
254                    error: e,
255                });
256                Uuid::nil()
257            }
258        };
259        self.hooks.on_episode_recorded(episode_id);
260        tracing::debug!(
261            target: "ainl_runtime",
262            duration_ms = t_episode.elapsed().as_millis() as u64,
263            episode_id = %episode_id,
264            "episode_record_async"
265        );
266
267        if !episode_id.is_nil() {
268            for &tid in &input.emit_targets {
269                let eid = episode_id;
270                if let Err(e) = graph_spawn(Arc::clone(&arc), move |m| {
271                    m.sqlite_store()
272                        .insert_graph_edge_checked(eid, tid, EMIT_TO_EDGE)
273                })
274                .await
275                {
276                    let e = e.message_str().unwrap_or("edge").to_string();
277                    tracing::warn!(
278                        phase = ?TurnPhase::EpisodeWrite,
279                        error = %e,
280                        "non-fatal turn write failed — continuing"
281                    );
282                    turn_warnings.push(TurnWarning {
283                        phase: TurnPhase::EpisodeWrite,
284                        error: e,
285                    });
286                }
287            }
288        }
289
290        let emit_payload = serde_json::json!({
291            "episode_id": episode_id.to_string(),
292            "user_message": input.user_message,
293            "tools_invoked": tools_canonical,
294            "persona_contribution": persona_prompt_contribution,
295            "turn_count": self.turn_count.wrapping_add(1),
296        });
297        let emit_neighbors = graph_spawn(Arc::clone(&arc), {
298            let agent_id = agent_id.clone();
299            let eid = episode_id;
300            move |m| {
301                let store = m.sqlite_store();
302                store.query(&agent_id).neighbors(eid, EMIT_TO_EDGE)
303            }
304        })
305        .await;
306        match emit_neighbors {
307            Ok(neighbors) => {
308                for n in neighbors {
309                    let target = emit_target_name(&n);
310                    self.hooks.on_emit(&target, &emit_payload);
311                }
312            }
313            Err(e) => {
314                let e = e.message_str().unwrap_or("emit").to_string();
315                tracing::warn!(
316                    phase = ?TurnPhase::EpisodeWrite,
317                    error = %e,
318                    "non-fatal turn write failed — continuing"
319                );
320                turn_warnings.push(TurnWarning {
321                    phase: TurnPhase::EpisodeWrite,
322                    error: format!("emit_routing: {e}"),
323                });
324            }
325        }
326
327        if !episode_id.is_nil() {
328            let agent_id_traj = agent_id.clone();
329            let input_traj = input.clone();
330            let tools_traj = tools_canonical.clone();
331            let patches_traj = patch_dispatch_results.clone();
332            let eid = episode_id;
333            match graph_spawn(Arc::clone(&arc), move |m| {
334                maybe_persist_trajectory_after_episode(
335                    m,
336                    &agent_id_traj,
337                    eid,
338                    &tools_traj,
339                    &patches_traj,
340                    &input_traj,
341                )
342            })
343            .await
344            {
345                Ok(()) => {}
346                Err(e) => {
347                    let e = e.to_string();
348                    tracing::warn!(
349                        phase = ?TurnPhase::EpisodeWrite,
350                        error = %e,
351                        "non-fatal trajectory persist failed — continuing"
352                    );
353                    turn_warnings.push(TurnWarning {
354                        phase: TurnPhase::EpisodeWrite,
355                        error: format!("trajectory_persist: {e}"),
356                    });
357                }
358            }
359        }
360
361        self.turn_count = self.turn_count.wrapping_add(1);
362
363        let should_extract = self.config.extraction_interval > 0
364            && self.turn_count.saturating_sub(self.last_extraction_at_turn)
365                >= self.config.extraction_interval as u64;
366
367        let t_extract = Instant::now();
368        let (extraction_report, _extraction_failed) = if should_extract {
369            let force_fail = std::mem::take(&mut self.test_force_extraction_failure);
370
371            let res = if force_fail {
372                let e = "test_forced".to_string();
373                tracing::warn!(
374                    phase = ?TurnPhase::ExtractionPass,
375                    error = %e,
376                    "non-fatal turn write failed — continuing"
377                );
378                turn_warnings.push(TurnWarning {
379                    phase: TurnPhase::ExtractionPass,
380                    error: e,
381                });
382                tracing::debug!(
383                    target: "ainl_runtime",
384                    duration_ms = t_extract.elapsed().as_millis() as u64,
385                    signals_ingested = 0u64,
386                    skipped = false,
387                    "extraction_pass_async"
388                );
389                (None, true)
390            } else {
391                let mem = Arc::clone(&arc);
392                let placeholder = GraphExtractorTask::new(&agent_id);
393                let mut task = std::mem::replace(&mut self.extractor, placeholder);
394                let (task_back, report) = tokio::task::spawn_blocking(move || {
395                    let g = mem.lock().expect("graph mutex poisoned");
396                    let report = task.run_pass(g.sqlite_store());
397                    (task, report)
398                })
399                .await
400                .map_err(|e| AinlRuntimeError::AsyncJoinError(e.to_string()))?;
401                self.extractor = task_back;
402
403                if let Some(ref e) = report.extract_error {
404                    tracing::warn!(
405                        phase = ?TurnPhase::ExtractionPass,
406                        error = %e,
407                        "non-fatal turn write failed — continuing"
408                    );
409                    turn_warnings.push(TurnWarning {
410                        phase: TurnPhase::ExtractionPass,
411                        error: e.clone(),
412                    });
413                }
414                if let Some(ref e) = report.pattern_error {
415                    tracing::warn!(
416                        phase = ?TurnPhase::PatternPersistence,
417                        error = %e,
418                        "non-fatal turn write failed — continuing"
419                    );
420                    turn_warnings.push(TurnWarning {
421                        phase: TurnPhase::PatternPersistence,
422                        error: e.clone(),
423                    });
424                }
425                if let Some(ref e) = report.persona_error {
426                    tracing::warn!(
427                        phase = ?TurnPhase::PersonaEvolution,
428                        error = %e,
429                        "non-fatal turn write failed — continuing"
430                    );
431                    turn_warnings.push(TurnWarning {
432                        phase: TurnPhase::PersonaEvolution,
433                        error: e.clone(),
434                    });
435                }
436                let extraction_failed = report.has_errors();
437                if !extraction_failed {
438                    tracing::info!(
439                        agent_id = %report.agent_id,
440                        signals_extracted = report.signals_extracted,
441                        signals_applied = report.signals_applied,
442                        semantic_nodes_updated = report.semantic_nodes_updated,
443                        "ainl-graph-extractor pass completed (scheduled, async)"
444                    );
445                }
446                self.hooks.on_extraction_complete(&report);
447                self.persona_cache = None;
448                tracing::debug!(
449                    target: "ainl_runtime",
450                    duration_ms = t_extract.elapsed().as_millis() as u64,
451                    signals_ingested = report.signals_extracted as u64,
452                    skipped = false,
453                    "extraction_pass_async"
454                );
455                (Some(report), extraction_failed)
456            };
457            self.last_extraction_at_turn = self.turn_count;
458            res
459        } else {
460            tracing::debug!(
461                target: "ainl_runtime",
462                duration_ms = t_extract.elapsed().as_millis() as u64,
463                signals_ingested = 0u64,
464                skipped = true,
465                "extraction_pass_async"
466            );
467            (None, false)
468        };
469
470        if let Err(e) = graph_spawn(Arc::clone(&arc), {
471            let agent_id = agent_id.clone();
472            move |m| try_export_graph_json_armaraos(m.sqlite_store(), &agent_id)
473        })
474        .await
475        {
476            let e = e.message_str().unwrap_or("export").to_string();
477            tracing::warn!(
478                phase = ?TurnPhase::ExportRefresh,
479                error = %e,
480                "non-fatal turn write failed — continuing"
481            );
482            turn_warnings.push(TurnWarning {
483                phase: TurnPhase::ExportRefresh,
484                error: e,
485            });
486        }
487
488        if !self.config.agent_id.is_empty() {
489            let state = RuntimeStateNode {
490                agent_id: self.config.agent_id.clone(),
491                turn_count: self.turn_count,
492                last_extraction_at_turn: self.last_extraction_at_turn,
493                persona_snapshot_json: self
494                    .persona_cache
495                    .as_ref()
496                    .and_then(|p| serde_json::to_string(p).ok()),
497                updated_at: chrono::Utc::now().timestamp(),
498            };
499            let force_fail = std::mem::take(&mut self.test_force_runtime_state_write_failure);
500            let write_res: Result<(), AinlRuntimeError> = if force_fail {
501                Err(AinlRuntimeError::Message(
502                    "injected runtime state write failure".into(),
503                ))
504            } else {
505                graph_spawn(Arc::clone(&arc), move |m| m.write_runtime_state(&state)).await
506            };
507            if let Err(e) = write_res {
508                let e = e.to_string();
509                tracing::warn!(
510                    phase = ?TurnPhase::RuntimeStatePersist,
511                    error = %e,
512                    "failed to persist runtime state — cadence will reset on next restart"
513                );
514                turn_warnings.push(TurnWarning {
515                    phase: TurnPhase::RuntimeStatePersist,
516                    error: e,
517                });
518            }
519        }
520
521        let result = TurnResult {
522            episode_id,
523            persona_prompt_contribution,
524            memory_context,
525            extraction_report,
526            steps_executed: dispatched_count,
527            patch_dispatch_results,
528            status: TurnStatus::Ok,
529            vitals_gate: input.vitals_gate.clone(),
530            vitals_phase: input.vitals_phase.clone(),
531            vitals_trust: input.vitals_trust,
532        };
533
534        let outcome = if turn_warnings.is_empty() {
535            TurnOutcome::Complete(result)
536        } else {
537            TurnOutcome::PartialSuccess {
538                result,
539                warnings: turn_warnings,
540            }
541        };
542
543        self.hooks.on_turn_complete(&outcome);
544        if let Some(ref hooks_async) = self.hooks_async {
545            hooks_async.on_turn_complete(&outcome).await;
546        }
547        Ok(outcome)
548    }
549
550    async fn dispatch_patches_collect_async(
551        &mut self,
552        patches: &[AinlMemoryNode],
553        frame: &HashMap<String, serde_json::Value>,
554        arc: &Arc<Mutex<GraphMemory>>,
555        turn_warnings: &mut Vec<TurnWarning>,
556    ) -> Result<Vec<PatchDispatchResult>, AinlRuntimeError> {
557        let mut out = Vec::new();
558        for node in patches {
559            let res = self
560                .dispatch_one_patch_async(node, frame, Arc::clone(arc))
561                .await?;
562            if let Some(PatchSkipReason::PersistFailed(ref e)) = res.skip_reason {
563                tracing::warn!(
564                    phase = ?TurnPhase::FitnessWriteBack,
565                    error = %e,
566                    "non-fatal turn write failed — continuing"
567                );
568                turn_warnings.push(TurnWarning {
569                    phase: TurnPhase::FitnessWriteBack,
570                    error: format!("{}: {}", res.label, e),
571                });
572            }
573            out.push(res);
574        }
575        Ok(out)
576    }
577
578    async fn dispatch_one_patch_async(
579        &mut self,
580        node: &AinlMemoryNode,
581        frame: &HashMap<String, serde_json::Value>,
582        arc: Arc<Mutex<GraphMemory>>,
583    ) -> Result<PatchDispatchResult, AinlRuntimeError> {
584        let label_default = String::new();
585        let (label_src, pv, retired, reads, fitness_opt) = match &node.node_type {
586            AinlNodeType::Procedural { procedural } => (
587                procedural_label(procedural),
588                procedural.patch_version,
589                procedural.retired,
590                procedural.declared_reads.clone(),
591                procedural.fitness,
592            ),
593            _ => {
594                return Ok(PatchDispatchResult {
595                    label: label_default,
596                    patch_version: 0,
597                    fitness_before: 0.0,
598                    fitness_after: 0.0,
599                    dispatched: false,
600                    skip_reason: Some(PatchSkipReason::NotProcedural),
601                    adapter_output: None,
602                    adapter_name: None,
603                    dispatch_duration_ms: 0,
604                });
605            }
606        };
607
608        if pv == 0 {
609            return Ok(PatchDispatchResult {
610                label: label_src,
611                patch_version: pv,
612                fitness_before: fitness_opt.unwrap_or(0.5),
613                fitness_after: fitness_opt.unwrap_or(0.5),
614                dispatched: false,
615                skip_reason: Some(PatchSkipReason::ZeroVersion),
616                adapter_output: None,
617                adapter_name: None,
618                dispatch_duration_ms: 0,
619            });
620        }
621        if retired {
622            return Ok(PatchDispatchResult {
623                label: label_src.clone(),
624                patch_version: pv,
625                fitness_before: fitness_opt.unwrap_or(0.5),
626                fitness_after: fitness_opt.unwrap_or(0.5),
627                dispatched: false,
628                skip_reason: Some(PatchSkipReason::Retired),
629                adapter_output: None,
630                adapter_name: None,
631                dispatch_duration_ms: 0,
632            });
633        }
634        for key in &reads {
635            if !frame.contains_key(key) {
636                return Ok(PatchDispatchResult {
637                    label: label_src.clone(),
638                    patch_version: pv,
639                    fitness_before: fitness_opt.unwrap_or(0.5),
640                    fitness_after: fitness_opt.unwrap_or(0.5),
641                    dispatched: false,
642                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead(key.clone())),
643                    adapter_output: None,
644                    adapter_name: None,
645                    dispatch_duration_ms: 0,
646                });
647            }
648        }
649
650        let patch_label = label_src.clone();
651        let adapter_key = patch_label.as_str();
652        let ctx = PatchDispatchContext {
653            patch_label: adapter_key,
654            node,
655            frame,
656        };
657        let (adapter_output, adapter_name, dispatch_duration_ms) = if let Some(adapter) = self
658            .adapter_registry
659            .get(adapter_key)
660            .or_else(|| self.adapter_registry.get(GraphPatchAdapter::NAME))
661        {
662            let aname = adapter.name().to_string();
663            let t_exec = Instant::now();
664            let (out, name) = match adapter.execute_patch(&ctx) {
665                Ok(output) => {
666                    tracing::debug!(
667                        label = %patch_label,
668                        adapter = %aname,
669                        "adapter executed patch (async)"
670                    );
671                    (Some(output), Some(aname))
672                }
673                Err(e) => {
674                    tracing::warn!(
675                        label = %patch_label,
676                        adapter = %aname,
677                        error = %e,
678                        "adapter execution failed — continuing as metadata dispatch"
679                    );
680                    (None, Some(aname))
681                }
682            };
683            let ms = t_exec.elapsed().as_millis() as u64;
684            (out, name, ms)
685        } else {
686            (None, None, 0u64)
687        };
688
689        let fitness_before = fitness_opt.unwrap_or(0.5);
690        let fitness_after = 0.2_f32 * 1.0 + 0.8 * fitness_before;
691
692        let nid = node.id;
693        let updated = match graph_spawn(Arc::clone(&arc), move |m| {
694            let store = m.sqlite_store();
695            store.read_node(nid)
696        })
697        .await?
698        {
699            Some(mut n) => {
700                if let AinlNodeType::Procedural { ref mut procedural } = n.node_type {
701                    procedural.fitness = Some(fitness_after);
702                }
703                n
704            }
705            None => {
706                return Ok(PatchDispatchResult {
707                    label: label_src,
708                    patch_version: pv,
709                    fitness_before,
710                    fitness_after: fitness_before,
711                    dispatched: false,
712                    skip_reason: Some(PatchSkipReason::MissingDeclaredRead("node_row".into())),
713                    adapter_output,
714                    adapter_name,
715                    dispatch_duration_ms,
716                });
717            }
718        };
719
720        if self.test_force_fitness_write_failure {
721            self.test_force_fitness_write_failure = false;
722            let e = "injected fitness write failure".to_string();
723            return Ok(PatchDispatchResult {
724                label: label_src.clone(),
725                patch_version: pv,
726                fitness_before,
727                fitness_after: fitness_before,
728                dispatched: false,
729                skip_reason: Some(PatchSkipReason::PersistFailed(e)),
730                adapter_output,
731                adapter_name,
732                dispatch_duration_ms,
733            });
734        }
735
736        let updated_clone = updated.clone();
737        if let Err(e) = graph_spawn(arc, move |m| m.write_node(&updated_clone)).await {
738            return Ok(PatchDispatchResult {
739                label: label_src.clone(),
740                patch_version: pv,
741                fitness_before,
742                fitness_after: fitness_before,
743                dispatched: false,
744                skip_reason: Some(PatchSkipReason::PersistFailed(
745                    e.message_str().unwrap_or("write").to_string(),
746                )),
747                adapter_output,
748                adapter_name,
749                dispatch_duration_ms,
750            });
751        }
752
753        self.hooks
754            .on_patch_dispatched(label_src.as_str(), fitness_after);
755        if let Some(ref hooks_async) = self.hooks_async {
756            let hook_ctx = PatchDispatchContext {
757                patch_label: adapter_key,
758                node,
759                frame,
760            };
761            let _ = hooks_async.on_patch_dispatched(&hook_ctx).await;
762        }
763
764        Ok(PatchDispatchResult {
765            label: label_src,
766            patch_version: pv,
767            fitness_before,
768            fitness_after,
769            dispatched: true,
770            skip_reason: None,
771            adapter_output,
772            adapter_name,
773            dispatch_duration_ms,
774        })
775    }
776}