Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use crate::persistence::{
15    CompensationAutoTrigger, CompensationContext, CompensationHandle,
16    CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17    PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use ranvier_core::bus::Bus;
20use ranvier_core::outcome::Outcome;
21use ranvier_core::schematic::{
22    BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
23};
24use ranvier_core::timeline::{Timeline, TimelineEvent};
25use ranvier_core::transition::Transition;
26use std::any::type_name;
27use std::ffi::OsString;
28use std::fs;
29use std::future::Future;
30use std::panic::Location;
31use std::path::{Path, PathBuf};
32use std::pin::Pin;
33use std::sync::{Arc, Mutex, OnceLock};
34use std::time::{SystemTime, UNIX_EPOCH};
35use tracing::Instrument;
36
/// Type alias for async boxed futures used in Axon execution.
///
/// The future is heap-allocated, pinned, `Send`, and may borrow data for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
39
/// Executor type for Axon steps.
///
/// Takes an input state `In`, a shared reference to the resource bundle `Res`, and a mutable
/// reference to the [`Bus`], and returns a boxed future resolving to an `Outcome<Out, E>`.
/// The returned future may borrow both the resources and the bus for `'a`.
/// Must be Send + Sync to be reusable across threads and clones.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
45
/// Helper to extract a readable type name from a type.
///
/// Module paths are stripped from *every* path inside the name, so generic arguments and
/// tuple members stay intact: `core::option::Option<alloc::string::String>` becomes
/// `Option<String>` rather than the truncated `String>` that a plain
/// `split("::").last()` would produce.
///
/// The input comes from [`std::any::type_name`], whose exact format is not guaranteed by the
/// standard library, so the result is meant for display/diagnostics only.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path segment currently being read begins.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch == ':' && chars.peek() == Some(&':') {
            // A `::` separator: everything collected for this path so far was a module
            // prefix, so discard it and keep reading the next segment.
            chars.next();
            short.truncate(segment_start);
            continue;
        }

        short.push(ch);
        if !(ch.is_alphanumeric() || ch == '_') {
            // Punctuation such as `<`, `,`, ` `, `(` or `&` ends the current path; the
            // next identifier starts a fresh one.
            segment_start = short.len();
        }
    }

    short
}
51
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree.
/// It is reusable and thread-safe.
///
/// Type parameters:
/// * `In`  - input accepted by [`Axon::execute`]
/// * `Out` - payload carried by the final `Outcome::Next`
/// * `E`   - error type carried by the `Outcome`
/// * `Res` - resource bundle shared by every step (defaults to `()`)
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times; `execute` takes the input, the resource bundle and a bus.
/// let res1 = axon.execute((), &resources, &mut bus1).await;
/// let res2 = axon.execute((), &resources, &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (for visualization/analysis)
    pub schematic: Schematic,
    /// The runtime executor
    executor: Executor<In, Out, E, Res>,
}
77
/// Schematic export request derived from command-line args/env.
///
/// A value of this type only exists when schematic export mode is active; see
/// `schematic_export_request_from_process` for how it is derived.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional output file path. If omitted, schematic is written to stdout.
    pub output: Option<PathBuf>,
}
84
85impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
86    fn clone(&self) -> Self {
87        Self {
88            schematic: self.schematic.clone(),
89            executor: self.executor.clone(),
90        }
91    }
92}
93
94impl<In, E, Res> Axon<In, In, E, Res>
95where
96    In: Send + Sync + 'static,
97    E: Send + 'static,
98    Res: ranvier_core::transition::ResourceRequirement,
99{
100    /// Create a new Axon flow with the given label.
101    /// This is the preferred entry point per Flat API guidelines.
102    #[track_caller]
103    pub fn new(label: &str) -> Self {
104        let caller = Location::caller();
105        Self::start_with_source(label, caller)
106    }
107
108    /// Start defining a new Axon flow.
109    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
110    #[track_caller]
111    pub fn start(label: &str) -> Self {
112        let caller = Location::caller();
113        Self::start_with_source(label, caller)
114    }
115
116    fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
117        let node_id = uuid::Uuid::new_v4().to_string();
118        let node = Node {
119            id: node_id,
120            kind: NodeKind::Ingress,
121            label: label.to_string(),
122            description: None,
123            input_type: "void".to_string(),
124            output_type: type_name_of::<In>(),
125            resource_type: type_name_of::<Res>(),
126            metadata: Default::default(),
127            bus_capability: None,
128            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
129        };
130
131        let mut schematic = Schematic::new(label);
132        schematic.nodes.push(node);
133
134        let executor: Executor<In, In, E, Res> =
135            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
136
137        Self {
138            schematic,
139            executor,
140        }
141    }
142}
143
144impl<In, Out, E, Res> Axon<In, Out, E, Res>
145where
146    In: Send + Sync + 'static,
147    Out: Send + Sync + 'static,
148    E: Send + 'static,
149    Res: ranvier_core::transition::ResourceRequirement,
150{
    /// Chain a transition to this Axon.
    ///
    /// Requires the transition to use the SAME resource bundle as the previous steps.
    ///
    /// Structurally, a new `Atom` node is appended to the schematic and linked from the
    /// previously last node with a `Linear` edge labelled `"Next"`. At runtime the returned
    /// Axon first runs every earlier step; only when that yields `Outcome::Next` does it run
    /// `transition`. Any other outcome is returned without running `transition`.
    ///
    /// The call site is recorded as the new node's source location.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        // Decompose self to avoid partial move issues
        let Axon {
            mut schematic,
            executor: prev_executor,
        } = self;

        // Update Schematic
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
        };

        // Id of the node the new edge starts from; falls back to the default (empty) id when
        // the schematic has no nodes yet.
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Compose Executor
        // Values captured once here and cloned on every invocation of the executor closure.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                // The closure is `Fn`, so each call clones what the async block needs to own.
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_for_exec.clone();

                Box::pin(async move {
                    // Run previous step
                    let prev_result = prev(input, res, bus).await;

                    // Unpack
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        // Non-`Next` outcomes short-circuit the chain. NOTE(review): this
                        // relies on `Outcome::map` only invoking the closure for `Next`,
                        // which cannot occur in this arm — confirm against `Outcome::map`.
                        other => return other.map(|_| unreachable!()),
                    };

                    // Run this step with automatic instrumentation
                    let label = trans.label();
                    // Last `::` segment of the resource type name, used as a span field.
                    let res_type = std::any::type_name::<Res>()
                        .split("::")
                        .last()
                        .unwrap_or("unknown");

                    // Timeline events are only recorded when a `Timeline` is present on the bus.
                    let enter_ts = now_ms();
                    if let Some(timeline) = bus.read_mut::<Timeline>() {
                        timeline.push(TimelineEvent::NodeEnter {
                            node_id: timeline_node_id.clone(),
                            node_label: timeline_node_label.clone(),
                            timestamp: enter_ts,
                        });
                    }

                    // Outcome fields start empty and are filled in once the step has finished.
                    let node_span = tracing::info_span!(
                        "Node",
                        ranvier.node = %label,
                        ranvier.resource_type = %res_type,
                        ranvier.outcome_kind = tracing::field::Empty,
                        ranvier.outcome_target = tracing::field::Empty
                    );
                    let started = std::time::Instant::now();
                    // The transition's bus access policy is active only while it runs.
                    bus.set_access_policy(label.clone(), transition_bus_policy.clone());
                    let result = trans
                        .run(state, res, bus)
                        .instrument(node_span.clone())
                        .await;
                    bus.clear_access_policy();
                    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
                    if let Some(target) = outcome_target(&result) {
                        node_span
                            .record("ranvier.outcome_target", tracing::field::display(&target));
                    }
                    let duration_ms = started.elapsed().as_millis() as u64;
                    let exit_ts = now_ms();

                    if let Some(timeline) = bus.read_mut::<Timeline>() {
                        timeline.push(TimelineEvent::NodeExit {
                            node_id: timeline_node_id.clone(),
                            outcome_type: outcome_type_name(&result),
                            duration_ms,
                            timestamp: exit_ts,
                        });

                        // A branch outcome additionally records which branch was taken.
                        if let Outcome::Branch(branch_id, _) = &result {
                            timeline.push(TimelineEvent::Branchtaken {
                                branch_id: branch_id.clone(),
                                timestamp: exit_ts,
                            });
                        }
                    }

                    result
                })
            },
        );

        Axon {
            schematic,
            executor: next_executor,
        }
    }
282
283    /// Add a branch point
284    #[track_caller]
285    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
286        let caller = Location::caller();
287        let branch_id_str = branch_id.into();
288        let last_node_id = self
289            .schematic
290            .nodes
291            .last()
292            .map(|n| n.id.clone())
293            .unwrap_or_default();
294
295        let branch_node = Node {
296            id: uuid::Uuid::new_v4().to_string(),
297            kind: NodeKind::Synapse,
298            label: label.to_string(),
299            description: None,
300            input_type: type_name_of::<Out>(),
301            output_type: type_name_of::<Out>(),
302            resource_type: type_name_of::<Res>(),
303            metadata: Default::default(),
304            bus_capability: None,
305            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
306        };
307
308        self.schematic.nodes.push(branch_node);
309        self.schematic.edges.push(Edge {
310            from: last_node_id,
311            to: branch_id_str.clone(),
312            kind: EdgeType::Branch(branch_id_str),
313            label: Some("Branch".to_string()),
314        });
315
316        self
317    }
318
    /// Execute the Axon with the given input and resources.
    ///
    /// Besides running the composed executor, this method:
    /// * appends `"Enter"` and outcome events to the persistence store when a
    ///   `PersistenceHandle` is present on the bus;
    /// * records ingress enter/exit events on a `Timeline` when one is on the bus or
    ///   `RANVIER_TIMELINE_OUTPUT` is set, inserting a temporary timeline if needed;
    /// * on `Outcome::Fault`, runs the compensation hook when a `CompensationHandle` is on
    ///   the bus and auto-trigger is enabled (only evaluated when persistence is configured);
    /// * marks the persistence trace complete unless auto-complete is disabled.
    ///
    /// Returns the outcome produced by the executor unchanged.
    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
        // Snapshot everything needed from the bus up front, before it is mutably borrowed
        // by the executor.
        let trace_id = persistence_trace_id(bus);
        let label = self.schematic.name.clone();
        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
        let compensation_handle = bus.read::<CompensationHandle>().cloned();
        let compensation_retry_policy = compensation_retry_policy(bus);
        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
        // `Some(step)` exactly when persistence is configured.
        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
            let start_step = next_persistence_step(handle, &trace_id).await;
            persist_execution_event(handle, &trace_id, &label, start_step, "Enter").await;
            Some(start_step)
        } else {
            None
        };

        let should_capture = should_attach_timeline(bus);
        // `true` only when this call inserted the timeline itself, so it can be removed again.
        let inserted_timeline = if should_capture {
            ensure_timeline(bus)
        } else {
            false
        };
        let ingress_started = std::time::Instant::now();
        let ingress_enter_ts = now_ms();
        if should_capture
            && let (Some(timeline), Some(ingress)) =
                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
        {
            timeline.push(TimelineEvent::NodeEnter {
                node_id: ingress.id.clone(),
                node_label: ingress.label.clone(),
                timestamp: ingress_enter_ts,
            });
        }

        // Outcome fields start empty and are recorded after the executor finishes.
        let circuit_span = tracing::info_span!(
            "Circuit",
            ranvier.circuit = %label,
            ranvier.outcome_kind = tracing::field::Empty,
            ranvier.outcome_target = tracing::field::Empty
        );
        let outcome = (self.executor)(input, resources, bus)
            .instrument(circuit_span.clone())
            .await;
        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
        if let Some(target) = outcome_target(&outcome) {
            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
        }

        // The ingress node's exit covers the whole circuit run.
        let ingress_exit_ts = now_ms();
        if should_capture
            && let (Some(timeline), Some(ingress)) =
                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
        {
            timeline.push(TimelineEvent::NodeExit {
                node_id: ingress.id.clone(),
                outcome_type: outcome_type_name(&outcome),
                duration_ms: ingress_started.elapsed().as_millis() as u64,
                timestamp: ingress_exit_ts,
            });
        }

        if let Some(handle) = persistence_handle.as_ref() {
            // Step of the outcome event: one past the "Enter" step. Despite its name this is
            // used for every outcome kind, not only faults.
            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
            persist_execution_event(
                handle,
                &trace_id,
                &label,
                fault_step,
                outcome_kind_name(&outcome),
            )
            .await;

            let mut completion = completion_from_outcome(&outcome);
            // Compensation runs only for faults, with a hook configured and auto-trigger on.
            if matches!(outcome, Outcome::Fault(_))
                && let Some(compensation) = compensation_handle.as_ref()
                && compensation_auto_trigger(bus)
            {
                let context = CompensationContext {
                    trace_id: trace_id.clone(),
                    circuit: label.clone(),
                    fault_kind: outcome_kind_name(&outcome).to_string(),
                    fault_step,
                    timestamp_ms: now_ms(),
                };

                // A successful compensation is persisted as its own event and overrides the
                // completion state.
                if run_compensation(
                    compensation,
                    context,
                    compensation_retry_policy,
                    compensation_idempotency.clone(),
                )
                .await
                {
                    persist_execution_event(
                        handle,
                        &trace_id,
                        &label,
                        fault_step.saturating_add(1),
                        "Compensated",
                    )
                    .await;
                    completion = CompletionState::Compensated;
                }
            }

            if persistence_auto_complete(bus) {
                persist_completion(handle, &trace_id, completion).await;
            }
        }

        if should_capture {
            maybe_export_timeline(bus, &outcome);
        }
        // Remove the timeline only if this call added it; a caller-provided one stays.
        if inserted_timeline {
            let _ = bus.remove::<Timeline>();
        }

        outcome
    }
439
440    /// Starts the Ranvier Inspector for this Axon on the specified port.
441    /// This spawns a background task to serve the Schematic.
442    pub fn serve_inspector(self, port: u16) -> Self {
443        if !inspector_dev_mode_from_env() {
444            tracing::info!("Inspector disabled because RANVIER_MODE is production");
445            return self;
446        }
447        if !inspector_enabled_from_env() {
448            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
449            return self;
450        }
451
452        let schematic = self.schematic.clone();
453        tokio::spawn(async move {
454            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
455                .with_projection_files_from_env()
456                .with_mode_from_env()
457                .with_auth_policy_from_env()
458                .serve()
459                .await
460            {
461                tracing::error!("Inspector server failed: {}", e);
462            }
463        });
464        self
465    }
466
    /// Get a reference to the Schematic (structural view).
    ///
    /// Borrows the same value as the public `schematic` field.
    pub fn schematic(&self) -> &Schematic {
        &self.schematic
    }
471
    /// Consume and return the Schematic.
    ///
    /// The executor is dropped along with the rest of the Axon.
    pub fn into_schematic(self) -> Schematic {
        self.schematic
    }
476
    /// Detect schematic export mode from runtime flags.
    ///
    /// Supported triggers:
    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
    /// - `--schematic`
    ///
    /// Optional output path:
    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
    /// - `--schematic-output <path>` / `--schematic-output=<path>`
    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
    ///
    /// Returns `None` when no trigger is present. The result depends only on the process
    /// environment and arguments, not on this particular Axon.
    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
        schematic_export_request_from_process()
    }
490
    /// Export schematic and return `true` when schematic mode is active.
    ///
    /// Despite the name, this method does not terminate the process; the caller is expected
    /// to return early when `Ok(true)` comes back.
    ///
    /// Use this once after circuit construction and before server/custom loops:
    ///
    /// ```rust,ignore
    /// let axon = build_axon();
    /// if axon.maybe_export_and_exit()? {
    ///     return Ok(());
    /// }
    /// // Normal runtime path...
    /// ```
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::export_schematic`].
    pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
        self.maybe_export_and_exit_with(|_| ())
    }
505
506    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
507    ///
508    /// This is useful when your app has custom loop/bootstrap behavior and you want
509    /// to skip or cleanup that logic in schematic mode.
510    pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
511    where
512        F: FnOnce(&SchematicExportRequest),
513    {
514        let Some(request) = self.schematic_export_request() else {
515            return Ok(false);
516        };
517        on_before_exit(&request);
518        self.export_schematic(&request)?;
519        Ok(true)
520    }
521
522    /// Export schematic according to the provided request.
523    pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
524        let json = serde_json::to_string_pretty(self.schematic())?;
525        if let Some(path) = &request.output {
526            if let Some(parent) = path.parent() {
527                if !parent.as_os_str().is_empty() {
528                    fs::create_dir_all(parent)?;
529                }
530            }
531            fs::write(path, json.as_bytes())?;
532            return Ok(());
533        }
534        println!("{}", json);
535        Ok(())
536    }
537}
538
539fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
540    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
541    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
542    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
543
544    let mut i = 0;
545    while i < args.len() {
546        let arg = args[i].to_string_lossy();
547
548        if arg == "--schematic" {
549            enabled = true;
550            i += 1;
551            continue;
552        }
553
554        if arg == "--schematic-output" || arg == "--output" {
555            if let Some(next) = args.get(i + 1) {
556                output = Some(PathBuf::from(next));
557                i += 2;
558                continue;
559            }
560        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
561            output = Some(PathBuf::from(value));
562        } else if let Some(value) = arg.strip_prefix("--output=") {
563            output = Some(PathBuf::from(value));
564        }
565
566        i += 1;
567    }
568
569    if enabled {
570        Some(SchematicExportRequest { output })
571    } else {
572        None
573    }
574}
575
/// Return `true` when the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `on` and `yes`, compared ASCII case-insensitively.
/// An unset or non-Unicode variable counts as `false`.
fn env_flag_is_true(key: &str) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return false;
    };
    let normalized = raw.to_ascii_lowercase();
    ["1", "true", "on", "yes"].contains(&normalized.as_str())
}
582
583fn inspector_enabled_from_env() -> bool {
584    let raw = std::env::var("RANVIER_INSPECTOR").ok();
585    inspector_enabled_from_value(raw.as_deref())
586}
587
/// Decide whether the inspector is enabled from an optional raw setting.
///
/// No value means enabled. Otherwise only `1`, `true`, `on` and `yes` (ASCII
/// case-insensitive) enable it; any other value disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let normalized = raw.to_ascii_lowercase();
    matches!(normalized.as_str(), "1" | "true" | "on" | "yes")
}
594
595fn inspector_dev_mode_from_env() -> bool {
596    let raw = std::env::var("RANVIER_MODE").ok();
597    inspector_dev_mode_from_value(raw.as_deref())
598}
599
/// Decide whether a raw mode setting denotes a development mode.
///
/// Only `prod` and `production` (ASCII case-insensitive) count as production; every other
/// value, including no value at all, counts as development.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        None => true,
        Some(raw) => {
            let mode = raw.to_ascii_lowercase();
            mode != "prod" && mode != "production"
        }
    }
}
606
607fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
608    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
609        Ok(v) if !v.trim().is_empty() => v,
610        _ => return,
611    };
612
613    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
614    let policy = timeline_adaptive_policy();
615    let forced = should_force_export(outcome, &policy);
616    let should_export = sampled || forced;
617    if !should_export {
618        record_sampling_stats(false, sampled, forced, "none", &policy);
619        return;
620    }
621
622    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
623    timeline.sort();
624
625    let mode = std::env::var("RANVIER_TIMELINE_MODE")
626        .unwrap_or_else(|_| "overwrite".to_string())
627        .to_ascii_lowercase();
628
629    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
630        tracing::warn!(
631            "Failed to persist timeline file {} (mode={}): {}",
632            path,
633            mode,
634            err
635        );
636        record_sampling_stats(false, sampled, forced, &mode, &policy);
637    } else {
638        record_sampling_stats(true, sampled, forced, &mode, &policy);
639    }
640}
641
642fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
643    match outcome {
644        Outcome::Next(_) => "Next".to_string(),
645        Outcome::Branch(id, _) => format!("Branch:{}", id),
646        Outcome::Jump(id, _) => format!("Jump:{}", id),
647        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
648        Outcome::Fault(_) => "Fault".to_string(),
649    }
650}
651
/// Return the variant name of an outcome (`Next`, `Branch`, `Jump`, `Emit` or `Fault`)
/// without any target information.
///
/// Used as the `ranvier.outcome_kind` span field and as the persisted `outcome_kind`.
fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
    match outcome {
        Outcome::Next(_) => "Next",
        Outcome::Branch(_, _) => "Branch",
        Outcome::Jump(_, _) => "Jump",
        Outcome::Emit(_, _) => "Emit",
        Outcome::Fault(_) => "Fault",
    }
}
661
/// Return the target carried by an outcome, if it has one.
///
/// `Branch` yields its branch id, `Jump` its node id rendered as a string, and `Emit` its
/// event type. `Next` and `Fault` have no target and yield `None`.
fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
    match outcome {
        Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
        Outcome::Jump(node_id, _) => Some(node_id.to_string()),
        Outcome::Emit(event_type, _) => Some(event_type.clone()),
        Outcome::Next(_) | Outcome::Fault(_) => None,
    }
}
670
671fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
672    match outcome {
673        Outcome::Fault(_) => CompletionState::Fault,
674        _ => CompletionState::Success,
675    }
676}
677
678fn persistence_trace_id(bus: &Bus) -> String {
679    if let Some(explicit) = bus.read::<PersistenceTraceId>() {
680        explicit.0.clone()
681    } else {
682        format!("{}:{}", bus.id, now_ms())
683    }
684}
685
686fn persistence_auto_complete(bus: &Bus) -> bool {
687    bus.read::<PersistenceAutoComplete>()
688        .map(|v| v.0)
689        .unwrap_or(true)
690}
691
692fn compensation_auto_trigger(bus: &Bus) -> bool {
693    bus.read::<CompensationAutoTrigger>()
694        .map(|v| v.0)
695        .unwrap_or(true)
696}
697
698fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
699    bus.read::<CompensationRetryPolicy>()
700        .copied()
701        .unwrap_or_default()
702}
703
704async fn next_persistence_step(handle: &PersistenceHandle, trace_id: &str) -> u64 {
705    let store = handle.store();
706    match store.load(trace_id).await {
707        Ok(Some(trace)) => trace
708            .events
709            .last()
710            .map(|event| event.step.saturating_add(1))
711            .unwrap_or(0),
712        Ok(None) => 0,
713        Err(err) => {
714            tracing::warn!(
715                trace_id = %trace_id,
716                error = %err,
717                "Failed to load persistence trace; falling back to step=0"
718            );
719            0
720        }
721    }
722}
723
724async fn persist_execution_event(
725    handle: &PersistenceHandle,
726    trace_id: &str,
727    circuit: &str,
728    step: u64,
729    outcome_kind: &str,
730) {
731    let store = handle.store();
732    let envelope = PersistenceEnvelope {
733        trace_id: trace_id.to_string(),
734        circuit: circuit.to_string(),
735        step,
736        outcome_kind: outcome_kind.to_string(),
737        timestamp_ms: now_ms(),
738        payload_hash: None,
739    };
740
741    if let Err(err) = store.append(envelope).await {
742        tracing::warn!(
743            trace_id = %trace_id,
744            circuit = %circuit,
745            step,
746            outcome_kind = %outcome_kind,
747            error = %err,
748            "Failed to append persistence envelope"
749        );
750    }
751}
752
753async fn persist_completion(
754    handle: &PersistenceHandle,
755    trace_id: &str,
756    completion: CompletionState,
757) {
758    let store = handle.store();
759    if let Err(err) = store.complete(trace_id, completion).await {
760        tracing::warn!(
761            trace_id = %trace_id,
762            error = %err,
763            "Failed to complete persistence trace"
764        );
765    }
766}
767
768fn compensation_idempotency_key(context: &CompensationContext) -> String {
769    format!(
770        "{}:{}:{}",
771        context.trace_id, context.circuit, context.fault_kind
772    )
773}
774
/// Run the compensation hook for a faulted execution, with retries and optional idempotency.
///
/// Returns `true` when compensation is considered done: either the idempotency store
/// reports it was already performed, or one hook attempt succeeded. Returns `false` when
/// every attempt failed.
///
/// * `handle` - provides the compensation hook to invoke.
/// * `context` - fault description passed (cloned) to every hook attempt.
/// * `retry_policy` - `max_attempts` (treated as at least 1) and `backoff_ms` between attempts.
/// * `idempotency` - optional store used to skip duplicate compensation for the same key.
async fn run_compensation(
    handle: &CompensationHandle,
    context: CompensationContext,
    retry_policy: CompensationRetryPolicy,
    idempotency: Option<CompensationIdempotencyHandle>,
) -> bool {
    let hook = handle.hook();
    let key = compensation_idempotency_key(&context);

    if let Some(store_handle) = idempotency.as_ref() {
        let store = store_handle.store();
        match store.was_compensated(&key).await {
            Ok(true) => {
                tracing::info!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    key = %key,
                    "Compensation already recorded; skipping duplicate hook execution"
                );
                return true;
            }
            Ok(false) => {}
            // A failed lookup is not fatal: the hook is run anyway.
            Err(err) => {
                tracing::warn!(
                    trace_id = %context.trace_id,
                    key = %key,
                    error = %err,
                    "Failed to check compensation idempotency state"
                );
            }
        }
    }

    // Always make at least one attempt, even if the policy says zero.
    let max_attempts = retry_policy.max_attempts.max(1);
    for attempt in 1..=max_attempts {
        match hook.compensate(context.clone()).await {
            Ok(()) => {
                if let Some(store_handle) = idempotency.as_ref() {
                    let store = store_handle.store();
                    // Failing to record the success is only logged; the compensation
                    // itself still counts as done.
                    if let Err(err) = store.mark_compensated(&key).await {
                        tracing::warn!(
                            trace_id = %context.trace_id,
                            key = %key,
                            error = %err,
                            "Failed to mark compensation idempotency state"
                        );
                    }
                }
                return true;
            }
            Err(err) => {
                let is_last = attempt == max_attempts;
                tracing::warn!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    fault_kind = %context.fault_kind,
                    fault_step = context.fault_step,
                    attempt,
                    max_attempts,
                    error = %err,
                    "Compensation hook attempt failed"
                );
                // Fixed backoff between attempts; no sleep after the final one.
                if !is_last && retry_policy.backoff_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
                        .await;
                }
            }
        }
    }
    false
}
846
847fn ensure_timeline(bus: &mut Bus) -> bool {
848    if bus.has::<Timeline>() {
849        false
850    } else {
851        bus.insert(Timeline::new());
852        true
853    }
854}
855
856fn should_attach_timeline(bus: &Bus) -> bool {
857    // Respect explicitly provided timeline collector from caller.
858    if bus.has::<Timeline>() {
859        return true;
860    }
861
862    // Attach timeline when runtime export path exists.
863    has_timeline_output_path()
864}
865
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
///
/// An unset variable, a non-Unicode value, or a value consisting only of
/// whitespace all count as "no output path".
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(value) if !value.trim().is_empty()
    )
}
872
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped into `0.0..=1.0`.
/// When the variable is unset, unparseable, or parses to NaN the rate falls
/// back to `1.0` (sample everything).
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and `clamp` would pass it through
        // unchanged; treat it like any other invalid value instead.
        .filter(|rate| !rate.is_nan())
        .map_or(1.0, |rate| rate.clamp(0.0, 1.0))
}
880
881fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
882    if rate <= 0.0 {
883        return false;
884    }
885    if rate >= 1.0 {
886        return true;
887    }
888    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
889    bucket < rate
890}
891
/// Returns the adaptive export policy name from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is ASCII-lowercased; when the variable cannot be read the
/// policy defaults to `"fault_branch"`.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
897
898fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
899    match policy {
900        "off" => false,
901        "fault_only" => matches!(outcome, Outcome::Fault(_)),
902        "fault_branch_emit" => {
903            matches!(
904                outcome,
905                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
906            )
907        }
908        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
909    }
910}
911
/// Running counters describing timeline export decisions, updated by
/// `record_sampling_stats`.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not result in an export.
    skipped: u64,
    /// Exports for which the `sampled` flag was set.
    sampled_exports: u64,
    /// Exports for which the `forced` flag was set.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// `now_ms()` timestamp of the most recent decision.
    last_updated_ms: u64,
}
923
924static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
925
926fn stats_cell() -> &'static Mutex<SamplingStats> {
927    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
928}
929
930fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
931    let snapshot = {
932        let mut stats = match stats_cell().lock() {
933            Ok(guard) => guard,
934            Err(_) => return,
935        };
936
937        stats.total_decisions += 1;
938        if exported {
939            stats.exported += 1;
940        } else {
941            stats.skipped += 1;
942        }
943        if sampled && exported {
944            stats.sampled_exports += 1;
945        }
946        if forced && exported {
947            stats.forced_exports += 1;
948        }
949        stats.last_mode = mode.to_string();
950        stats.last_policy = policy.to_string();
951        stats.last_updated_ms = now_ms();
952        stats.clone()
953    };
954
955    tracing::debug!(
956        ranvier.timeline.total_decisions = snapshot.total_decisions,
957        ranvier.timeline.exported = snapshot.exported,
958        ranvier.timeline.skipped = snapshot.skipped,
959        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
960        ranvier.timeline.forced_exports = snapshot.forced_exports,
961        ranvier.timeline.mode = %snapshot.last_mode,
962        ranvier.timeline.policy = %snapshot.last_policy,
963        "Timeline sampling stats updated"
964    );
965
966    if let Some(path) = timeline_stats_output_path() {
967        let payload = serde_json::json!({
968            "total_decisions": snapshot.total_decisions,
969            "exported": snapshot.exported,
970            "skipped": snapshot.skipped,
971            "sampled_exports": snapshot.sampled_exports,
972            "forced_exports": snapshot.forced_exports,
973            "last_mode": snapshot.last_mode,
974            "last_policy": snapshot.last_policy,
975            "last_updated_ms": snapshot.last_updated_ms
976        });
977        if let Some(parent) = Path::new(&path).parent() {
978            let _ = fs::create_dir_all(parent);
979        }
980        if let Err(err) = fs::write(&path, payload.to_string()) {
981            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
982        }
983    }
984}
985
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set to a
/// non-blank string, and `None` otherwise. The value is returned untrimmed.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(value) if !value.trim().is_empty() => Some(value),
        _ => None,
    }
}
991
992fn write_timeline_with_policy(
993    path: &str,
994    mode: &str,
995    mut timeline: Timeline,
996) -> Result<(), String> {
997    match mode {
998        "append" => {
999            if Path::new(path).exists() {
1000                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
1001                match serde_json::from_str::<Timeline>(&content) {
1002                    Ok(mut existing) => {
1003                        existing.events.append(&mut timeline.events);
1004                        existing.sort();
1005                        if let Some(max_events) = max_events_limit() {
1006                            truncate_timeline_events(&mut existing, max_events);
1007                        }
1008                        write_timeline_json(path, &existing)
1009                    }
1010                    Err(_) => {
1011                        // Fallback: if existing is invalid, replace with current timeline
1012                        if let Some(max_events) = max_events_limit() {
1013                            truncate_timeline_events(&mut timeline, max_events);
1014                        }
1015                        write_timeline_json(path, &timeline)
1016                    }
1017                }
1018            } else {
1019                if let Some(max_events) = max_events_limit() {
1020                    truncate_timeline_events(&mut timeline, max_events);
1021                }
1022                write_timeline_json(path, &timeline)
1023            }
1024        }
1025        "rotate" => {
1026            let rotated_path = rotated_path(path, now_ms());
1027            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
1028            if let Some(keep) = rotate_keep_limit() {
1029                cleanup_rotated_files(path, keep)?;
1030            }
1031            Ok(())
1032        }
1033        _ => write_timeline_json(path, &timeline),
1034    }
1035}
1036
1037fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
1038    if let Some(parent) = Path::new(path).parent() {
1039        if !parent.as_os_str().is_empty() {
1040            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1041        }
1042    }
1043    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
1044    fs::write(path, json).map_err(|e| e.to_string())
1045}
1046
/// Builds the sibling path `<stem>_<suffix>.<ext>` for a rotated copy of `path`.
///
/// The stem defaults to `timeline` and the extension to `json` when `path`
/// lacks them (or they are not valid UTF-8).
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = original
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = original
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => Path::new("").join(file_name),
    }
}
1054
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Yields `None` when the variable is unset, is not a valid `usize`, or is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
1061
/// Reads how many rotated files to retain from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Yields `None` when the variable is unset, is not a valid `usize`, or is zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
1068
1069fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
1070    let len = timeline.events.len();
1071    if len > max_events {
1072        let keep_from = len - max_events;
1073        timeline.events = timeline.events.split_off(keep_from);
1074    }
1075}
1076
/// Deletes all but the `keep` most recent rotated copies of `base_path`.
///
/// Rotated copies are files in the same directory named
/// `<stem>_<digits>.<ext>`, i.e. the shape produced by `rotated_path` with a
/// numeric suffix. Files that merely share the prefix and extension (for
/// example `timeline_stats.json` next to `timeline.json`) are left alone.
///
/// Candidates are ordered newest-first by modification time, with the numeric
/// suffix breaking ties, and everything after the first `keep` is removed.
/// Individual removal failures are ignored (best effort).
///
/// # Errors
///
/// Returns the string form of the I/O error when the directory cannot be read.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name; `read_dir("")`
    // would fail, so fall back to the current directory in that case too.
    let parent = match p.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    let mut files: Vec<(PathBuf, SystemTime, u64)> = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            let stamp = name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Only purely numeric suffixes identify a rotation; anything else
            // is an unrelated file that must not be deleted.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((entry.path(), modified, stamp))
        })
        .collect();

    // Newest first; the embedded timestamp decides between equal mtimes.
    files.sort_by(|a, b| (b.1, b.2).cmp(&(a.1, a.2)));
    for (path, _, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
1109
1110fn bus_capability_schema_from_policy(
1111    policy: Option<ranvier_core::bus::BusAccessPolicy>,
1112) -> Option<BusCapabilitySchema> {
1113    let Some(policy) = policy else {
1114        return None;
1115    };
1116
1117    let mut allow = policy
1118        .allow
1119        .unwrap_or_default()
1120        .into_iter()
1121        .map(|entry| entry.type_name.to_string())
1122        .collect::<Vec<_>>();
1123    let mut deny = policy
1124        .deny
1125        .into_iter()
1126        .map(|entry| entry.type_name.to_string())
1127        .collect::<Vec<_>>();
1128    allow.sort();
1129    deny.sort();
1130
1131    if allow.is_empty() && deny.is_empty() {
1132        return None;
1133    }
1134
1135    Some(BusCapabilitySchema { allow, deny })
1136}
1137
/// Milliseconds elapsed since the Unix epoch according to the system clock,
/// or `0` when the clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::{
1148        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
1149        should_force_export,
1150    };
1151    use crate::persistence::{
1152        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
1153        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
1154        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
1155        PersistenceHandle, PersistenceStore, PersistenceTraceId,
1156    };
1157    use anyhow::Result;
1158    use async_trait::async_trait;
1159    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
1160    use std::sync::Arc;
1161    use tokio::sync::Mutex;
1162    use uuid::Uuid;
1163
1164    #[test]
1165    fn inspector_enabled_flag_matrix() {
1166        assert!(inspector_enabled_from_value(None));
1167        assert!(inspector_enabled_from_value(Some("1")));
1168        assert!(inspector_enabled_from_value(Some("true")));
1169        assert!(inspector_enabled_from_value(Some("on")));
1170        assert!(!inspector_enabled_from_value(Some("0")));
1171        assert!(!inspector_enabled_from_value(Some("false")));
1172    }
1173
1174    #[test]
1175    fn inspector_dev_mode_matrix() {
1176        assert!(inspector_dev_mode_from_value(None));
1177        assert!(inspector_dev_mode_from_value(Some("dev")));
1178        assert!(inspector_dev_mode_from_value(Some("staging")));
1179        assert!(!inspector_dev_mode_from_value(Some("prod")));
1180        assert!(!inspector_dev_mode_from_value(Some("production")));
1181    }
1182
1183    #[test]
1184    fn adaptive_policy_force_export_matrix() {
1185        let next = Outcome::<(), &'static str>::Next(());
1186        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
1187        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
1188        let fault = Outcome::<(), &'static str>::Fault("boom");
1189
1190        assert!(!should_force_export(&next, "off"));
1191        assert!(!should_force_export(&fault, "off"));
1192
1193        assert!(!should_force_export(&branch, "fault_only"));
1194        assert!(should_force_export(&fault, "fault_only"));
1195
1196        assert!(should_force_export(&branch, "fault_branch"));
1197        assert!(!should_force_export(&emit, "fault_branch"));
1198        assert!(should_force_export(&fault, "fault_branch"));
1199
1200        assert!(should_force_export(&branch, "fault_branch_emit"));
1201        assert!(should_force_export(&emit, "fault_branch_emit"));
1202        assert!(should_force_export(&fault, "fault_branch_emit"));
1203    }
1204
1205    #[test]
1206    fn sampling_and_adaptive_combination_decisions() {
1207        let bus_id = Uuid::nil();
1208        let next = Outcome::<(), &'static str>::Next(());
1209        let fault = Outcome::<(), &'static str>::Fault("boom");
1210
1211        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
1212        assert!(!sampled_never);
1213        assert!(!(sampled_never || should_force_export(&next, "off")));
1214        assert!(sampled_never || should_force_export(&fault, "fault_only"));
1215
1216        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
1217        assert!(sampled_always);
1218        assert!(sampled_always || should_force_export(&next, "off"));
1219        assert!(sampled_always || should_force_export(&fault, "off"));
1220    }
1221
1222    #[derive(Clone)]
1223    struct AddOne;
1224
1225    #[async_trait]
1226    impl Transition<i32, i32> for AddOne {
1227        type Error = std::convert::Infallible;
1228        type Resources = ();
1229
1230        async fn run(
1231            &self,
1232            state: i32,
1233            _resources: &Self::Resources,
1234            _bus: &mut Bus,
1235        ) -> Outcome<i32, Self::Error> {
1236            Outcome::Next(state + 1)
1237        }
1238    }
1239
1240    #[derive(Clone)]
1241    struct AlwaysFault;
1242
1243    #[async_trait]
1244    impl Transition<i32, i32> for AlwaysFault {
1245        type Error = &'static str;
1246        type Resources = ();
1247
1248        async fn run(
1249            &self,
1250            _state: i32,
1251            _resources: &Self::Resources,
1252            _bus: &mut Bus,
1253        ) -> Outcome<i32, Self::Error> {
1254            Outcome::Fault("boom")
1255        }
1256    }
1257
1258    #[derive(Clone)]
1259    struct CapabilityGuarded;
1260
1261    #[async_trait]
1262    impl Transition<(), ()> for CapabilityGuarded {
1263        type Error = String;
1264        type Resources = ();
1265
1266        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
1267            Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
1268        }
1269
1270        async fn run(
1271            &self,
1272            _state: (),
1273            _resources: &Self::Resources,
1274            bus: &mut Bus,
1275        ) -> Outcome<(), Self::Error> {
1276            match bus.get::<String>() {
1277                Ok(_) => Outcome::Next(()),
1278                Err(err) => Outcome::Fault(err.to_string()),
1279            }
1280        }
1281    }
1282
1283    #[derive(Clone)]
1284    struct RecordingCompensationHook {
1285        calls: Arc<Mutex<Vec<CompensationContext>>>,
1286        should_fail: bool,
1287    }
1288
1289    #[async_trait]
1290    impl CompensationHook for RecordingCompensationHook {
1291        async fn compensate(&self, context: CompensationContext) -> Result<()> {
1292            self.calls.lock().await.push(context);
1293            if self.should_fail {
1294                return Err(anyhow::anyhow!("compensation failed"));
1295            }
1296            Ok(())
1297        }
1298    }
1299
1300    #[derive(Clone)]
1301    struct FlakyCompensationHook {
1302        calls: Arc<Mutex<u32>>,
1303        failures_remaining: Arc<Mutex<u32>>,
1304    }
1305
1306    #[async_trait]
1307    impl CompensationHook for FlakyCompensationHook {
1308        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
1309            {
1310                let mut calls = self.calls.lock().await;
1311                *calls += 1;
1312            }
1313            let mut failures_remaining = self.failures_remaining.lock().await;
1314            if *failures_remaining > 0 {
1315                *failures_remaining -= 1;
1316                return Err(anyhow::anyhow!("transient compensation failure"));
1317            }
1318            Ok(())
1319        }
1320    }
1321
1322    #[derive(Clone)]
1323    struct FailingCompensationIdempotencyStore {
1324        read_calls: Arc<Mutex<u32>>,
1325        write_calls: Arc<Mutex<u32>>,
1326    }
1327
1328    #[async_trait]
1329    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
1330        async fn was_compensated(&self, _key: &str) -> Result<bool> {
1331            let mut read_calls = self.read_calls.lock().await;
1332            *read_calls += 1;
1333            Err(anyhow::anyhow!("forced idempotency read failure"))
1334        }
1335
1336        async fn mark_compensated(&self, _key: &str) -> Result<()> {
1337            let mut write_calls = self.write_calls.lock().await;
1338            *write_calls += 1;
1339            Err(anyhow::anyhow!("forced idempotency write failure"))
1340        }
1341    }
1342
1343    #[tokio::test]
1344    async fn execute_persists_success_trace_when_handle_exists() {
1345        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1346        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1347
1348        let mut bus = Bus::new();
1349        bus.insert(PersistenceHandle::from_arc(store_dyn));
1350        bus.insert(PersistenceTraceId::new("trace-success"));
1351
1352        let axon = Axon::<i32, i32, std::convert::Infallible>::start("PersistSuccess").then(AddOne);
1353        let outcome = axon.execute(41, &(), &mut bus).await;
1354        assert!(matches!(outcome, Outcome::Next(42)));
1355
1356        let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
1357        assert_eq!(persisted.events.len(), 2);
1358        assert_eq!(persisted.events[0].outcome_kind, "Enter");
1359        assert_eq!(persisted.events[1].outcome_kind, "Next");
1360        assert_eq!(persisted.completion, Some(CompletionState::Success));
1361    }
1362
1363    #[tokio::test]
1364    async fn execute_persists_fault_completion_state() {
1365        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1366        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1367
1368        let mut bus = Bus::new();
1369        bus.insert(PersistenceHandle::from_arc(store_dyn));
1370        bus.insert(PersistenceTraceId::new("trace-fault"));
1371
1372        let axon = Axon::<i32, i32, &'static str>::start("PersistFault").then(AlwaysFault);
1373        let outcome = axon.execute(41, &(), &mut bus).await;
1374        assert!(matches!(outcome, Outcome::Fault("boom")));
1375
1376        let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
1377        assert_eq!(persisted.events.len(), 2);
1378        assert_eq!(persisted.events[1].outcome_kind, "Fault");
1379        assert_eq!(persisted.completion, Some(CompletionState::Fault));
1380    }
1381
1382    #[tokio::test]
1383    async fn execute_respects_persistence_auto_complete_off() {
1384        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1385        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1386
1387        let mut bus = Bus::new();
1388        bus.insert(PersistenceHandle::from_arc(store_dyn));
1389        bus.insert(PersistenceTraceId::new("trace-no-complete"));
1390        bus.insert(PersistenceAutoComplete(false));
1391
1392        let axon =
1393            Axon::<i32, i32, std::convert::Infallible>::start("PersistNoComplete").then(AddOne);
1394        let outcome = axon.execute(1, &(), &mut bus).await;
1395        assert!(matches!(outcome, Outcome::Next(2)));
1396
1397        let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
1398        assert_eq!(persisted.events.len(), 2);
1399        assert_eq!(persisted.completion, None);
1400    }
1401
1402    #[tokio::test]
1403    async fn fault_triggers_compensation_and_marks_compensated() {
1404        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1405        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1406        let calls = Arc::new(Mutex::new(Vec::new()));
1407        let compensation = RecordingCompensationHook {
1408            calls: calls.clone(),
1409            should_fail: false,
1410        };
1411
1412        let mut bus = Bus::new();
1413        bus.insert(PersistenceHandle::from_arc(store_dyn));
1414        bus.insert(PersistenceTraceId::new("trace-compensated"));
1415        bus.insert(CompensationHandle::from_hook(compensation));
1416
1417        let axon = Axon::<i32, i32, &'static str>::start("CompensatedFault").then(AlwaysFault);
1418        let outcome = axon.execute(7, &(), &mut bus).await;
1419        assert!(matches!(outcome, Outcome::Fault("boom")));
1420
1421        let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
1422        assert_eq!(persisted.events.len(), 3);
1423        assert_eq!(persisted.events[0].outcome_kind, "Enter");
1424        assert_eq!(persisted.events[1].outcome_kind, "Fault");
1425        assert_eq!(persisted.events[2].outcome_kind, "Compensated");
1426        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1427
1428        let recorded = calls.lock().await;
1429        assert_eq!(recorded.len(), 1);
1430        assert_eq!(recorded[0].trace_id, "trace-compensated");
1431        assert_eq!(recorded[0].fault_kind, "Fault");
1432    }
1433
1434    #[tokio::test]
1435    async fn failed_compensation_keeps_fault_completion() {
1436        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1437        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1438        let calls = Arc::new(Mutex::new(Vec::new()));
1439        let compensation = RecordingCompensationHook {
1440            calls: calls.clone(),
1441            should_fail: true,
1442        };
1443
1444        let mut bus = Bus::new();
1445        bus.insert(PersistenceHandle::from_arc(store_dyn));
1446        bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
1447        bus.insert(CompensationHandle::from_hook(compensation));
1448
1449        let axon = Axon::<i32, i32, &'static str>::start("CompensationFails").then(AlwaysFault);
1450        let outcome = axon.execute(7, &(), &mut bus).await;
1451        assert!(matches!(outcome, Outcome::Fault("boom")));
1452
1453        let persisted = store_impl
1454            .load("trace-compensation-failed")
1455            .await
1456            .unwrap()
1457            .unwrap();
1458        assert_eq!(persisted.events.len(), 2);
1459        assert_eq!(persisted.events[1].outcome_kind, "Fault");
1460        assert_eq!(persisted.completion, Some(CompletionState::Fault));
1461
1462        let recorded = calls.lock().await;
1463        assert_eq!(recorded.len(), 1);
1464    }
1465
1466    #[tokio::test]
1467    async fn compensation_retry_policy_succeeds_after_retries() {
1468        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1469        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1470        let calls = Arc::new(Mutex::new(0u32));
1471        let failures_remaining = Arc::new(Mutex::new(2u32));
1472        let compensation = FlakyCompensationHook {
1473            calls: calls.clone(),
1474            failures_remaining,
1475        };
1476
1477        let mut bus = Bus::new();
1478        bus.insert(PersistenceHandle::from_arc(store_dyn));
1479        bus.insert(PersistenceTraceId::new("trace-retry-success"));
1480        bus.insert(CompensationHandle::from_hook(compensation));
1481        bus.insert(CompensationRetryPolicy {
1482            max_attempts: 3,
1483            backoff_ms: 0,
1484        });
1485
1486        let axon = Axon::<i32, i32, &'static str>::start("CompensationRetry").then(AlwaysFault);
1487        let outcome = axon.execute(7, &(), &mut bus).await;
1488        assert!(matches!(outcome, Outcome::Fault("boom")));
1489
1490        let persisted = store_impl
1491            .load("trace-retry-success")
1492            .await
1493            .unwrap()
1494            .unwrap();
1495        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1496        assert_eq!(
1497            persisted.events.last().map(|e| e.outcome_kind.as_str()),
1498            Some("Compensated")
1499        );
1500
1501        let attempt_count = calls.lock().await;
1502        assert_eq!(*attempt_count, 3);
1503    }
1504
1505    #[tokio::test]
1506    async fn compensation_idempotency_skips_duplicate_hook_execution() {
1507        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1508        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1509        let calls = Arc::new(Mutex::new(Vec::new()));
1510        let compensation = RecordingCompensationHook {
1511            calls: calls.clone(),
1512            should_fail: false,
1513        };
1514        let idempotency = InMemoryCompensationIdempotencyStore::new();
1515
1516        let mut bus = Bus::new();
1517        bus.insert(PersistenceHandle::from_arc(store_dyn));
1518        bus.insert(PersistenceTraceId::new("trace-idempotent"));
1519        bus.insert(PersistenceAutoComplete(false));
1520        bus.insert(CompensationHandle::from_hook(compensation));
1521        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1522
1523        let axon =
1524            Axon::<i32, i32, &'static str>::start("CompensationIdempotency").then(AlwaysFault);
1525
1526        let outcome1 = axon.execute(7, &(), &mut bus).await;
1527        let outcome2 = axon.execute(8, &(), &mut bus).await;
1528        assert!(matches!(outcome1, Outcome::Fault("boom")));
1529        assert!(matches!(outcome2, Outcome::Fault("boom")));
1530
1531        let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
1532        assert_eq!(persisted.completion, None);
1533        assert_eq!(persisted.events.len(), 6);
1534        assert_eq!(persisted.events[2].outcome_kind, "Compensated");
1535        assert_eq!(persisted.events[5].outcome_kind, "Compensated");
1536
1537        let recorded = calls.lock().await;
1538        assert_eq!(recorded.len(), 1);
1539    }
1540
1541    #[tokio::test]
1542    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
1543        let store_impl = Arc::new(InMemoryPersistenceStore::new());
1544        let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
1545        let calls = Arc::new(Mutex::new(Vec::new()));
1546        let read_calls = Arc::new(Mutex::new(0u32));
1547        let write_calls = Arc::new(Mutex::new(0u32));
1548        let compensation = RecordingCompensationHook {
1549            calls: calls.clone(),
1550            should_fail: false,
1551        };
1552        let idempotency = FailingCompensationIdempotencyStore {
1553            read_calls: read_calls.clone(),
1554            write_calls: write_calls.clone(),
1555        };
1556
1557        let mut bus = Bus::new();
1558        bus.insert(PersistenceHandle::from_arc(store_dyn));
1559        bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
1560        bus.insert(CompensationHandle::from_hook(compensation));
1561        bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
1562
1563        let axon =
1564            Axon::<i32, i32, &'static str>::start("IdempotencyStoreFailure").then(AlwaysFault);
1565        let outcome = axon.execute(9, &(), &mut bus).await;
1566        assert!(matches!(outcome, Outcome::Fault("boom")));
1567
1568        let persisted = store_impl
1569            .load("trace-idempotency-store-failure")
1570            .await
1571            .unwrap()
1572            .unwrap();
1573        assert_eq!(persisted.completion, Some(CompletionState::Compensated));
1574        assert_eq!(
1575            persisted.events.last().map(|e| e.outcome_kind.as_str()),
1576            Some("Compensated")
1577        );
1578
1579        let recorded = calls.lock().await;
1580        assert_eq!(recorded.len(), 1);
1581        assert_eq!(*read_calls.lock().await, 1);
1582        assert_eq!(*write_calls.lock().await, 1);
1583    }
1584
1585    #[tokio::test]
1586    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
1587        let mut bus = Bus::new();
1588        bus.insert(1_i32);
1589        bus.insert("secret".to_string());
1590
1591        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
1592        let outcome = axon.execute((), &(), &mut bus).await;
1593
1594        match outcome {
1595            Outcome::Fault(msg) => {
1596                assert!(msg.contains("Bus access denied"), "{msg}");
1597                assert!(msg.contains("CapabilityGuarded"), "{msg}");
1598                assert!(msg.contains("alloc::string::String"), "{msg}");
1599            }
1600            other => panic!("expected fault, got {other:?}"),
1601        }
1602    }
1603}