1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use ranvier_core::bus::Bus;
20use ranvier_core::outcome::Outcome;
21use ranvier_core::schematic::{
22 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
23};
24use ranvier_core::timeline::{Timeline, TimelineEvent};
25use ranvier_core::transition::Transition;
26use std::any::type_name;
27use std::ffi::OsString;
28use std::fs;
29use std::future::Future;
30use std::panic::Location;
31use std::path::{Path, PathBuf};
32use std::pin::Pin;
33use std::sync::{Arc, Mutex, OnceLock};
34use std::time::{SystemTime, UNIX_EPOCH};
35use tracing::Instrument;
36
/// Boxed, `Send` future produced by every [`Executor`] step; the lifetime ties the
/// future to the resources and bus it borrows.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Type-erased pipeline body of an [`Axon`]: takes the circuit input plus borrowed
/// resources and a mutable [`Bus`], and resolves to an [`Outcome`].
///
/// Held behind an `Arc` so an `Axon` can be cloned cheaply and each `then` step can
/// capture (and clone) the previous executor.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
45
/// Returns a human-readable type name for `T` with every module path stripped.
///
/// `std::any::type_name` yields fully-qualified names such as
/// `alloc::vec::Vec<alloc::string::String>`. Every path segment is shortened, so that
/// becomes `Vec<String>` and `(i32, a::B)` becomes `(i32, B)`. Taking only the text
/// after the last `::` would turn those into `String>` and `B)`. Non-generic names
/// (`my::mod::Type` -> `Type`, `()`, `i32`, `&str`) are unchanged.
fn type_name_of<T: ?Sized>() -> String {
    short_type_name(type_name::<T>())
}

/// Removes module paths from every path segment of a `type_name` string while keeping
/// generic brackets, tuples, references and spacing intact.
fn short_type_name(full: &str) -> String {
    let mut out = String::with_capacity(full.len());
    // Byte offset in `out` where the current path segment (identifier run) began.
    let mut segment_start = 0;
    let mut rest = full;
    while let Some(ch) = rest.chars().next() {
        if rest.starts_with("::") {
            // Everything since the segment start was a module prefix: drop it.
            out.truncate(segment_start);
            rest = &rest[2..];
            continue;
        }
        out.push(ch);
        if !(ch.is_alphanumeric() || ch == '_') {
            // Punctuation (`<`, `,`, ` `, `&`, `(` ...) starts a new segment after it.
            segment_start = out.len();
        }
        rest = &rest[ch.len_utf8()..];
    }
    out
}
51
/// A typed, composable circuit.
///
/// Pairs a static [`Schematic`] (the node/edge graph used for inspection and export)
/// with an [`Executor`] that runs the composed transitions. `In` is the circuit input,
/// `Out` the output of the last step, `E` the fault type and `Res` the resource bundle
/// passed to every transition.
pub struct Axon<In, Out, E, Res = ()> {
    /// Static structure of the circuit, built up by `start`/`new`, `then` and `branch`.
    pub schematic: Schematic,
    /// Composed runtime pipeline; each `then` wraps the previous executor.
    executor: Executor<In, Out, E, Res>,
}
77
/// A request to export an Axon's schematic as JSON.
///
/// Resolved from the `RANVIER_SCHEMATIC` / `RANVIER_SCHEMATIC_OUTPUT` environment
/// variables and the `--schematic`, `--schematic-output` and `--output` CLI flags.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination file; `None` means the JSON is printed to stdout.
    pub output: Option<PathBuf>,
}
84
85impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
86 fn clone(&self) -> Self {
87 Self {
88 schematic: self.schematic.clone(),
89 executor: self.executor.clone(),
90 }
91 }
92}
93
94impl<In, E, Res> Axon<In, In, E, Res>
95where
96 In: Send + Sync + 'static,
97 E: Send + 'static,
98 Res: ranvier_core::transition::ResourceRequirement,
99{
100 #[track_caller]
103 pub fn new(label: &str) -> Self {
104 let caller = Location::caller();
105 Self::start_with_source(label, caller)
106 }
107
108 #[track_caller]
111 pub fn start(label: &str) -> Self {
112 let caller = Location::caller();
113 Self::start_with_source(label, caller)
114 }
115
116 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
117 let node_id = uuid::Uuid::new_v4().to_string();
118 let node = Node {
119 id: node_id,
120 kind: NodeKind::Ingress,
121 label: label.to_string(),
122 description: None,
123 input_type: "void".to_string(),
124 output_type: type_name_of::<In>(),
125 resource_type: type_name_of::<Res>(),
126 metadata: Default::default(),
127 bus_capability: None,
128 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
129 };
130
131 let mut schematic = Schematic::new(label);
132 schematic.nodes.push(node);
133
134 let executor: Executor<In, In, E, Res> =
135 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
136
137 Self {
138 schematic,
139 executor,
140 }
141 }
142}
143
144impl<In, Out, E, Res> Axon<In, Out, E, Res>
145where
146 In: Send + Sync + 'static,
147 Out: Send + Sync + 'static,
148 E: Send + 'static,
149 Res: ranvier_core::transition::ResourceRequirement,
150{
    /// Appends `transition` as a new `Atom` node and returns an `Axon` that produces `Next`.
    ///
    /// Schematic changes:
    /// - A node is pushed carrying the transition's label and description, the
    ///   `Out -> Next` types and the caller's source location.
    /// - The node's bus capability comes from `transition.bus_access_policy()`.
    /// - A `Linear` edge is added from the previous last node to the new node.
    ///
    /// Runtime behaviour of the returned executor:
    /// - It first runs the previous executor. Any non-`Next` outcome short-circuits.
    /// - It runs the transition inside a `Node` tracing span, with the transition's
    ///   bus access policy installed for the duration of the run.
    /// - When a `Timeline` is on the bus, it records `NodeEnter`/`NodeExit`, plus
    ///   `Branchtaken` for a branch outcome.
    /// - It returns the transition's outcome.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        // Captured up front so the recorded source location is the caller of `then`.
        let caller = Location::caller();
        let Axon {
            mut schematic,
            executor: prev_executor,
        } = self;

        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
        };

        // Link from whichever node is currently last (the ingress node for the first step).
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Owned copies moved into the executor closure.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                // Per-call clones: the `async move` future below takes ownership of them.
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_for_exec.clone();

                Box::pin(async move {
                    let prev_result = prev(input, res, bus).await;

                    // Only a `Next` value continues into this transition; any other outcome
                    // is passed through. The `map` only re-types the value (presumably
                    // `Outcome::map` touches only the `Next` payload, which was matched
                    // above, so the closure never runs).
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        other => return other.map(|_| unreachable!()),
                    };

                    let label = trans.label();
                    // Last path segment of the resource type name, for the span field.
                    let res_type = std::any::type_name::<Res>()
                        .split("::")
                        .last()
                        .unwrap_or("unknown");

                    let enter_ts = now_ms();
                    if let Some(timeline) = bus.read_mut::<Timeline>() {
                        timeline.push(TimelineEvent::NodeEnter {
                            node_id: timeline_node_id.clone(),
                            node_label: timeline_node_label.clone(),
                            timestamp: enter_ts,
                        });
                    }

                    let node_span = tracing::info_span!(
                        "Node",
                        ranvier.node = %label,
                        ranvier.resource_type = %res_type,
                        ranvier.outcome_kind = tracing::field::Empty,
                        ranvier.outcome_target = tracing::field::Empty
                    );
                    let started = std::time::Instant::now();
                    // The transition's bus access policy is active only while it runs.
                    // NOTE(review): it is not cleared if `run` panics or this future is
                    // dropped before the await completes.
                    bus.set_access_policy(label.clone(), transition_bus_policy.clone());
                    let result = trans
                        .run(state, res, bus)
                        .instrument(node_span.clone())
                        .await;
                    bus.clear_access_policy();
                    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
                    if let Some(target) = outcome_target(&result) {
                        node_span
                            .record("ranvier.outcome_target", tracing::field::display(&target));
                    }
                    let duration_ms = started.elapsed().as_millis() as u64;
                    let exit_ts = now_ms();

                    if let Some(timeline) = bus.read_mut::<Timeline>() {
                        timeline.push(TimelineEvent::NodeExit {
                            node_id: timeline_node_id.clone(),
                            outcome_type: outcome_type_name(&result),
                            duration_ms,
                            timestamp: exit_ts,
                        });

                        // A branch outcome additionally records which branch was taken.
                        if let Outcome::Branch(branch_id, _) = &result {
                            timeline.push(TimelineEvent::Branchtaken {
                                branch_id: branch_id.clone(),
                                timestamp: exit_ts,
                            });
                        }
                    }

                    result
                })
            },
        );

        Axon {
            schematic,
            executor: next_executor,
        }
    }
282
283 #[track_caller]
285 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
286 let caller = Location::caller();
287 let branch_id_str = branch_id.into();
288 let last_node_id = self
289 .schematic
290 .nodes
291 .last()
292 .map(|n| n.id.clone())
293 .unwrap_or_default();
294
295 let branch_node = Node {
296 id: uuid::Uuid::new_v4().to_string(),
297 kind: NodeKind::Synapse,
298 label: label.to_string(),
299 description: None,
300 input_type: type_name_of::<Out>(),
301 output_type: type_name_of::<Out>(),
302 resource_type: type_name_of::<Res>(),
303 metadata: Default::default(),
304 bus_capability: None,
305 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
306 };
307
308 self.schematic.nodes.push(branch_node);
309 self.schematic.edges.push(Edge {
310 from: last_node_id,
311 to: branch_id_str.clone(),
312 kind: EdgeType::Branch(branch_id_str),
313 label: Some("Branch".to_string()),
314 });
315
316 self
317 }
318
    /// Runs the circuit once on `input` and returns its final `Outcome`.
    ///
    /// Around the executor this method:
    /// - With a `PersistenceHandle` on the bus, appends an `Enter` envelope and an
    ///   envelope named after the outcome kind. It then completes the trace, unless the
    ///   bus's `PersistenceAutoComplete` disables that.
    /// - On a `Fault`, with a `CompensationHandle` present and `CompensationAutoTrigger`
    ///   not disabled, runs the compensation hook. This uses the bus's retry policy and
    ///   optional idempotency store. On success it records a `Compensated` envelope and
    ///   completion state.
    /// - Records ingress `NodeEnter`/`NodeExit` timeline events when a `Timeline` is
    ///   already on the bus or `RANVIER_TIMELINE_OUTPUT` is set. It may then export the
    ///   timeline, and it removes any `Timeline` it inserted itself.
    /// - Wraps the run in a `Circuit` tracing span tagged with the outcome kind and target.
    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
        // Snapshot persistence/compensation configuration from the bus before running.
        let trace_id = persistence_trace_id(bus);
        let label = self.schematic.name.clone();
        let persistence_handle = bus.read::<PersistenceHandle>().cloned();
        let compensation_handle = bus.read::<CompensationHandle>().cloned();
        let compensation_retry_policy = compensation_retry_policy(bus);
        let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
        // Continue step numbering after any events already stored for this trace id.
        let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
            let start_step = next_persistence_step(handle, &trace_id).await;
            persist_execution_event(handle, &trace_id, &label, start_step, "Enter").await;
            Some(start_step)
        } else {
            None
        };

        // `inserted_timeline` is true only when this call added the Timeline, so it can be
        // removed again before returning.
        let should_capture = should_attach_timeline(bus);
        let inserted_timeline = if should_capture {
            ensure_timeline(bus)
        } else {
            false
        };
        let ingress_started = std::time::Instant::now();
        let ingress_enter_ts = now_ms();
        if should_capture
            && let (Some(timeline), Some(ingress)) =
                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
        {
            timeline.push(TimelineEvent::NodeEnter {
                node_id: ingress.id.clone(),
                node_label: ingress.label.clone(),
                timestamp: ingress_enter_ts,
            });
        }

        let circuit_span = tracing::info_span!(
            "Circuit",
            ranvier.circuit = %label,
            ranvier.outcome_kind = tracing::field::Empty,
            ranvier.outcome_target = tracing::field::Empty
        );
        let outcome = (self.executor)(input, resources, bus)
            .instrument(circuit_span.clone())
            .await;
        circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
        if let Some(target) = outcome_target(&outcome) {
            circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
        }

        // The ingress node's exit event covers the whole pipeline run.
        let ingress_exit_ts = now_ms();
        if should_capture
            && let (Some(timeline), Some(ingress)) =
                (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
        {
            timeline.push(TimelineEvent::NodeExit {
                node_id: ingress.id.clone(),
                outcome_type: outcome_type_name(&outcome),
                duration_ms: ingress_started.elapsed().as_millis() as u64,
                timestamp: ingress_exit_ts,
            });
        }

        if let Some(handle) = persistence_handle.as_ref() {
            // Step for the outcome envelope: one past the `Enter` step. It is named
            // `fault_step`, but it is used for every outcome kind.
            let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
            persist_execution_event(
                handle,
                &trace_id,
                &label,
                fault_step,
                outcome_kind_name(&outcome),
            )
            .await;

            let mut completion = completion_from_outcome(&outcome);
            if matches!(outcome, Outcome::Fault(_))
                && let Some(compensation) = compensation_handle.as_ref()
                && compensation_auto_trigger(bus)
            {
                let context = CompensationContext {
                    trace_id: trace_id.clone(),
                    circuit: label.clone(),
                    fault_kind: outcome_kind_name(&outcome).to_string(),
                    fault_step,
                    timestamp_ms: now_ms(),
                };

                if run_compensation(
                    compensation,
                    context,
                    compensation_retry_policy,
                    compensation_idempotency.clone(),
                )
                .await
                {
                    persist_execution_event(
                        handle,
                        &trace_id,
                        &label,
                        fault_step.saturating_add(1),
                        "Compensated",
                    )
                    .await;
                    completion = CompletionState::Compensated;
                }
            }

            if persistence_auto_complete(bus) {
                persist_completion(handle, &trace_id, completion).await;
            }
        }

        if should_capture {
            maybe_export_timeline(bus, &outcome);
        }
        if inserted_timeline {
            let _ = bus.remove::<Timeline>();
        }

        outcome
    }
439
440 pub fn serve_inspector(self, port: u16) -> Self {
443 if !inspector_dev_mode_from_env() {
444 tracing::info!("Inspector disabled because RANVIER_MODE is production");
445 return self;
446 }
447 if !inspector_enabled_from_env() {
448 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
449 return self;
450 }
451
452 let schematic = self.schematic.clone();
453 tokio::spawn(async move {
454 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
455 .with_projection_files_from_env()
456 .with_mode_from_env()
457 .with_auth_policy_from_env()
458 .serve()
459 .await
460 {
461 tracing::error!("Inspector server failed: {}", e);
462 }
463 });
464 self
465 }
466
467 pub fn schematic(&self) -> &Schematic {
469 &self.schematic
470 }
471
472 pub fn into_schematic(self) -> Schematic {
474 self.schematic
475 }
476
477 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
488 schematic_export_request_from_process()
489 }
490
491 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
503 self.maybe_export_and_exit_with(|_| ())
504 }
505
506 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
511 where
512 F: FnOnce(&SchematicExportRequest),
513 {
514 let Some(request) = self.schematic_export_request() else {
515 return Ok(false);
516 };
517 on_before_exit(&request);
518 self.export_schematic(&request)?;
519 Ok(true)
520 }
521
522 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
524 let json = serde_json::to_string_pretty(self.schematic())?;
525 if let Some(path) = &request.output {
526 if let Some(parent) = path.parent() {
527 if !parent.as_os_str().is_empty() {
528 fs::create_dir_all(parent)?;
529 }
530 }
531 fs::write(path, json.as_bytes())?;
532 return Ok(());
533 }
534 println!("{}", json);
535 Ok(())
536 }
537}
538
539fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
540 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
541 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
542 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
543
544 let mut i = 0;
545 while i < args.len() {
546 let arg = args[i].to_string_lossy();
547
548 if arg == "--schematic" {
549 enabled = true;
550 i += 1;
551 continue;
552 }
553
554 if arg == "--schematic-output" || arg == "--output" {
555 if let Some(next) = args.get(i + 1) {
556 output = Some(PathBuf::from(next));
557 i += 2;
558 continue;
559 }
560 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
561 output = Some(PathBuf::from(value));
562 } else if let Some(value) = arg.strip_prefix("--output=") {
563 output = Some(PathBuf::from(value));
564 }
565
566 i += 1;
567 }
568
569 if enabled {
570 Some(SchematicExportRequest { output })
571 } else {
572 None
573 }
574}
575
/// Returns `true` when environment variable `key` is `1`, `true`, `on` or `yes`
/// (ASCII case-insensitive, no trimming).
///
/// Unset or non-Unicode values count as `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| {
            let lowered = raw.to_ascii_lowercase();
            ["1", "true", "on", "yes"].contains(&lowered.as_str())
        })
        .unwrap_or(false)
}
582
583fn inspector_enabled_from_env() -> bool {
584 let raw = std::env::var("RANVIER_INSPECTOR").ok();
585 inspector_enabled_from_value(raw.as_deref())
586}
587
/// Decides whether the inspector is enabled.
///
/// An absent value means enabled. Otherwise only `1`, `true`, `on` or `yes` (ASCII
/// case-insensitive) enable it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    let lowered = raw.to_ascii_lowercase();
    lowered == "1" || lowered == "true" || lowered == "on" || lowered == "yes"
}
594
595fn inspector_dev_mode_from_env() -> bool {
596 let raw = std::env::var("RANVIER_MODE").ok();
597 inspector_dev_mode_from_value(raw.as_deref())
598}
599
/// Returns `false` only for `prod` / `production` (ASCII case-insensitive).
///
/// Every other mode, including an unset one, is treated as development.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    value.map_or(true, |mode| {
        let mode = mode.to_ascii_lowercase();
        mode != "prod" && mode != "production"
    })
}
606
607fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
608 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
609 Ok(v) if !v.trim().is_empty() => v,
610 _ => return,
611 };
612
613 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
614 let policy = timeline_adaptive_policy();
615 let forced = should_force_export(outcome, &policy);
616 let should_export = sampled || forced;
617 if !should_export {
618 record_sampling_stats(false, sampled, forced, "none", &policy);
619 return;
620 }
621
622 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
623 timeline.sort();
624
625 let mode = std::env::var("RANVIER_TIMELINE_MODE")
626 .unwrap_or_else(|_| "overwrite".to_string())
627 .to_ascii_lowercase();
628
629 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
630 tracing::warn!(
631 "Failed to persist timeline file {} (mode={}): {}",
632 path,
633 mode,
634 err
635 );
636 record_sampling_stats(false, sampled, forced, &mode, &policy);
637 } else {
638 record_sampling_stats(true, sampled, forced, &mode, &policy);
639 }
640}
641
642fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
643 match outcome {
644 Outcome::Next(_) => "Next".to_string(),
645 Outcome::Branch(id, _) => format!("Branch:{}", id),
646 Outcome::Jump(id, _) => format!("Jump:{}", id),
647 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
648 Outcome::Fault(_) => "Fault".to_string(),
649 }
650}
651
652fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
653 match outcome {
654 Outcome::Next(_) => "Next",
655 Outcome::Branch(_, _) => "Branch",
656 Outcome::Jump(_, _) => "Jump",
657 Outcome::Emit(_, _) => "Emit",
658 Outcome::Fault(_) => "Fault",
659 }
660}
661
662fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
663 match outcome {
664 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
665 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
666 Outcome::Emit(event_type, _) => Some(event_type.clone()),
667 Outcome::Next(_) | Outcome::Fault(_) => None,
668 }
669}
670
671fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
672 match outcome {
673 Outcome::Fault(_) => CompletionState::Fault,
674 _ => CompletionState::Success,
675 }
676}
677
678fn persistence_trace_id(bus: &Bus) -> String {
679 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
680 explicit.0.clone()
681 } else {
682 format!("{}:{}", bus.id, now_ms())
683 }
684}
685
686fn persistence_auto_complete(bus: &Bus) -> bool {
687 bus.read::<PersistenceAutoComplete>()
688 .map(|v| v.0)
689 .unwrap_or(true)
690}
691
692fn compensation_auto_trigger(bus: &Bus) -> bool {
693 bus.read::<CompensationAutoTrigger>()
694 .map(|v| v.0)
695 .unwrap_or(true)
696}
697
698fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
699 bus.read::<CompensationRetryPolicy>()
700 .copied()
701 .unwrap_or_default()
702}
703
704async fn next_persistence_step(handle: &PersistenceHandle, trace_id: &str) -> u64 {
705 let store = handle.store();
706 match store.load(trace_id).await {
707 Ok(Some(trace)) => trace
708 .events
709 .last()
710 .map(|event| event.step.saturating_add(1))
711 .unwrap_or(0),
712 Ok(None) => 0,
713 Err(err) => {
714 tracing::warn!(
715 trace_id = %trace_id,
716 error = %err,
717 "Failed to load persistence trace; falling back to step=0"
718 );
719 0
720 }
721 }
722}
723
724async fn persist_execution_event(
725 handle: &PersistenceHandle,
726 trace_id: &str,
727 circuit: &str,
728 step: u64,
729 outcome_kind: &str,
730) {
731 let store = handle.store();
732 let envelope = PersistenceEnvelope {
733 trace_id: trace_id.to_string(),
734 circuit: circuit.to_string(),
735 step,
736 outcome_kind: outcome_kind.to_string(),
737 timestamp_ms: now_ms(),
738 payload_hash: None,
739 };
740
741 if let Err(err) = store.append(envelope).await {
742 tracing::warn!(
743 trace_id = %trace_id,
744 circuit = %circuit,
745 step,
746 outcome_kind = %outcome_kind,
747 error = %err,
748 "Failed to append persistence envelope"
749 );
750 }
751}
752
753async fn persist_completion(
754 handle: &PersistenceHandle,
755 trace_id: &str,
756 completion: CompletionState,
757) {
758 let store = handle.store();
759 if let Err(err) = store.complete(trace_id, completion).await {
760 tracing::warn!(
761 trace_id = %trace_id,
762 error = %err,
763 "Failed to complete persistence trace"
764 );
765 }
766}
767
768fn compensation_idempotency_key(context: &CompensationContext) -> String {
769 format!(
770 "{}:{}:{}",
771 context.trace_id, context.circuit, context.fault_kind
772 )
773}
774
/// Runs the compensation hook for a faulted execution, with retries and optional
/// idempotency.
///
/// Behaviour:
/// - If `idempotency` is given and its store already marks the key
///   `trace_id:circuit:fault_kind`, the hook is skipped and `true` is returned.
/// - A failed idempotency lookup is logged, and the hook still runs.
/// - Otherwise the hook is tried up to `max(retry_policy.max_attempts, 1)` times,
///   sleeping `backoff_ms` between failed attempts but not after the last one.
/// - On success the key is marked compensated and `true` is returned; a failure to mark
///   is only logged.
///
/// Returns `false` when every attempt failed.
async fn run_compensation(
    handle: &CompensationHandle,
    context: CompensationContext,
    retry_policy: CompensationRetryPolicy,
    idempotency: Option<CompensationIdempotencyHandle>,
) -> bool {
    let hook = handle.hook();
    let key = compensation_idempotency_key(&context);

    // Skip the hook entirely if this fault was already compensated.
    if let Some(store_handle) = idempotency.as_ref() {
        let store = store_handle.store();
        match store.was_compensated(&key).await {
            Ok(true) => {
                tracing::info!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    key = %key,
                    "Compensation already recorded; skipping duplicate hook execution"
                );
                return true;
            }
            Ok(false) => {}
            Err(err) => {
                tracing::warn!(
                    trace_id = %context.trace_id,
                    key = %key,
                    error = %err,
                    "Failed to check compensation idempotency state"
                );
            }
        }
    }

    // At least one attempt, even if the policy says zero.
    let max_attempts = retry_policy.max_attempts.max(1);
    for attempt in 1..=max_attempts {
        match hook.compensate(context.clone()).await {
            Ok(()) => {
                if let Some(store_handle) = idempotency.as_ref() {
                    let store = store_handle.store();
                    if let Err(err) = store.mark_compensated(&key).await {
                        tracing::warn!(
                            trace_id = %context.trace_id,
                            key = %key,
                            error = %err,
                            "Failed to mark compensation idempotency state"
                        );
                    }
                }
                return true;
            }
            Err(err) => {
                let is_last = attempt == max_attempts;
                tracing::warn!(
                    trace_id = %context.trace_id,
                    circuit = %context.circuit,
                    fault_kind = %context.fault_kind,
                    fault_step = context.fault_step,
                    attempt,
                    max_attempts,
                    error = %err,
                    "Compensation hook attempt failed"
                );
                // Fixed backoff between attempts; no wait after the final failure.
                if !is_last && retry_policy.backoff_ms > 0 {
                    tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
                        .await;
                }
            }
        }
    }
    false
}
846
847fn ensure_timeline(bus: &mut Bus) -> bool {
848 if bus.has::<Timeline>() {
849 false
850 } else {
851 bus.insert(Timeline::new());
852 true
853 }
854}
855
856fn should_attach_timeline(bus: &Bus) -> bool {
857 if bus.has::<Timeline>() {
859 return true;
860 }
861
862 has_timeline_output_path()
864}
865
/// `true` when `RANVIER_TIMELINE_OUTPUT` is set to something other than whitespace.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
872
/// Fraction of executions whose timeline is exported, from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `[0.0, 1.0]`. Unset, unparsable
/// or `NaN` values fall back to `1.0` (sample everything). `NaN` must be rejected
/// explicitly because `f64::clamp` passes it through unchanged, which would silently
/// disable sampling.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
880
881fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
882 if rate <= 0.0 {
883 return false;
884 }
885 if rate >= 1.0 {
886 return true;
887 }
888 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
889 bucket < rate
890}
891
/// Lower-cased adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, defaulting to
/// `fault_branch`.
fn timeline_adaptive_policy() -> String {
    let raw = std::env::var("RANVIER_TIMELINE_ADAPTIVE")
        .unwrap_or_else(|_| String::from("fault_branch"));
    raw.to_ascii_lowercase()
}
897
898fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
899 match policy {
900 "off" => false,
901 "fault_only" => matches!(outcome, Outcome::Fault(_)),
902 "fault_branch_emit" => {
903 matches!(
904 outcome,
905 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
906 )
907 }
908 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
909 }
910}
911
/// Cumulative counters for timeline export decisions, updated by `record_sampling_stats`.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Every recorded decision, exported or not.
    total_decisions: u64,
    /// Decisions that ended in a successful timeline write.
    exported: u64,
    /// Decisions that were neither sampled nor forced, or whose write failed.
    skipped: u64,
    /// Successful exports where the execution was selected by sampling.
    sampled_exports: u64,
    /// Successful exports where the adaptive policy forced the export.
    forced_exports: u64,
    /// Write mode of the latest decision (`"none"` when it was skipped before writing).
    last_mode: String,
    /// Adaptive policy in effect at the latest decision.
    last_policy: String,
    /// Time of the latest update, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
923
924static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
925
926fn stats_cell() -> &'static Mutex<SamplingStats> {
927 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
928}
929
930fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
931 let snapshot = {
932 let mut stats = match stats_cell().lock() {
933 Ok(guard) => guard,
934 Err(_) => return,
935 };
936
937 stats.total_decisions += 1;
938 if exported {
939 stats.exported += 1;
940 } else {
941 stats.skipped += 1;
942 }
943 if sampled && exported {
944 stats.sampled_exports += 1;
945 }
946 if forced && exported {
947 stats.forced_exports += 1;
948 }
949 stats.last_mode = mode.to_string();
950 stats.last_policy = policy.to_string();
951 stats.last_updated_ms = now_ms();
952 stats.clone()
953 };
954
955 tracing::debug!(
956 ranvier.timeline.total_decisions = snapshot.total_decisions,
957 ranvier.timeline.exported = snapshot.exported,
958 ranvier.timeline.skipped = snapshot.skipped,
959 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
960 ranvier.timeline.forced_exports = snapshot.forced_exports,
961 ranvier.timeline.mode = %snapshot.last_mode,
962 ranvier.timeline.policy = %snapshot.last_policy,
963 "Timeline sampling stats updated"
964 );
965
966 if let Some(path) = timeline_stats_output_path() {
967 let payload = serde_json::json!({
968 "total_decisions": snapshot.total_decisions,
969 "exported": snapshot.exported,
970 "skipped": snapshot.skipped,
971 "sampled_exports": snapshot.sampled_exports,
972 "forced_exports": snapshot.forced_exports,
973 "last_mode": snapshot.last_mode,
974 "last_policy": snapshot.last_policy,
975 "last_updated_ms": snapshot.last_updated_ms
976 });
977 if let Some(parent) = Path::new(&path).parent() {
978 let _ = fs::create_dir_all(parent);
979 }
980 if let Err(err) = fs::write(&path, payload.to_string()) {
981 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
982 }
983 }
984}
985
/// Path from `RANVIER_TIMELINE_STATS_OUTPUT`, ignoring unset or whitespace-only values.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
991
992fn write_timeline_with_policy(
993 path: &str,
994 mode: &str,
995 mut timeline: Timeline,
996) -> Result<(), String> {
997 match mode {
998 "append" => {
999 if Path::new(path).exists() {
1000 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
1001 match serde_json::from_str::<Timeline>(&content) {
1002 Ok(mut existing) => {
1003 existing.events.append(&mut timeline.events);
1004 existing.sort();
1005 if let Some(max_events) = max_events_limit() {
1006 truncate_timeline_events(&mut existing, max_events);
1007 }
1008 write_timeline_json(path, &existing)
1009 }
1010 Err(_) => {
1011 if let Some(max_events) = max_events_limit() {
1013 truncate_timeline_events(&mut timeline, max_events);
1014 }
1015 write_timeline_json(path, &timeline)
1016 }
1017 }
1018 } else {
1019 if let Some(max_events) = max_events_limit() {
1020 truncate_timeline_events(&mut timeline, max_events);
1021 }
1022 write_timeline_json(path, &timeline)
1023 }
1024 }
1025 "rotate" => {
1026 let rotated_path = rotated_path(path, now_ms());
1027 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
1028 if let Some(keep) = rotate_keep_limit() {
1029 cleanup_rotated_files(path, keep)?;
1030 }
1031 Ok(())
1032 }
1033 _ => write_timeline_json(path, &timeline),
1034 }
1035}
1036
1037fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
1038 if let Some(parent) = Path::new(path).parent() {
1039 if !parent.as_os_str().is_empty() {
1040 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1041 }
1042 }
1043 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
1044 fs::write(path, json).map_err(|e| e.to_string())
1045}
1046
/// Sibling path `<stem>_<suffix>.<ext>` next to `path`.
///
/// Falls back to the stem `timeline` and the extension `json` when `path` lacks them.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = original
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = original
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");
    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
1054
/// Positive event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Unset, unparsable or zero values mean "no limit".
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(0) | Err(_) => None,
        Ok(limit) => Some(limit),
    }
}
1061
/// Positive number of rotated files to keep, from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Unset, unparsable or zero values disable pruning.
fn rotate_keep_limit() -> Option<usize> {
    match std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(keep)) if keep > 0 => Some(keep),
        _ => None,
    }
}
1068
1069fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
1070 let len = timeline.events.len();
1071 if len > max_events {
1072 let keep_from = len - max_events;
1073 timeline.events = timeline.events.split_off(keep_from);
1074 }
1075}
1076
/// Deletes old rotated timeline files, keeping the newest `keep`.
///
/// Rotated files are those `rotated_path` produces for `base_path`: files in the same
/// directory named `<stem>_<digits>.<ext>` (defaults `timeline` / `json`). They are
/// ranked by that numeric suffix, which is the write timestamp in ms, so the ranking
/// does not depend on filesystem mtime resolution. Files that do not match the pattern
/// exactly are left alone, for example `timeline_stats.json`. Removal failures are
/// ignored because cleanup is best-effort.
///
/// # Errors
/// The stringified I/O error when the directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and `read_dir("")` fails,
    // so an empty parent means the current directory.
    let dir = base
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    let mut rotated: Vec<(u64, PathBuf)> = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let stamp = name
                .to_str()?
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // Only an all-digit middle part marks a rotated file.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            Some((stamp, entry.path()))
        })
        .collect();

    // Newest first; everything past the first `keep` entries is removed.
    rotated.sort_unstable_by(|a, b| b.0.cmp(&a.0));
    for (_, path) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
1109
1110fn bus_capability_schema_from_policy(
1111 policy: Option<ranvier_core::bus::BusAccessPolicy>,
1112) -> Option<BusCapabilitySchema> {
1113 let Some(policy) = policy else {
1114 return None;
1115 };
1116
1117 let mut allow = policy
1118 .allow
1119 .unwrap_or_default()
1120 .into_iter()
1121 .map(|entry| entry.type_name.to_string())
1122 .collect::<Vec<_>>();
1123 let mut deny = policy
1124 .deny
1125 .into_iter()
1126 .map(|entry| entry.type_name.to_string())
1127 .collect::<Vec<_>>();
1128 allow.sort();
1129 deny.sort();
1130
1131 if allow.is_empty() && deny.is_empty() {
1132 return None;
1133 }
1134
1135 Some(BusCapabilitySchema { allow, deny })
1136}
1137
/// Milliseconds since the Unix epoch, or 0 if the system clock is before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1144
#[cfg(test)]
mod tests {
    use super::{
        Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
        should_force_export,
    };
    use crate::persistence::{
        CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
        CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
        InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
        PersistenceHandle, PersistenceStore, PersistenceTraceId,
    };
    use anyhow::Result;
    use async_trait::async_trait;
    use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use uuid::Uuid;

    /// Creates a fresh in-memory persistence store and a bus wired to it.
    ///
    /// The bus carries a type-erased `PersistenceHandle` for the store plus a
    /// `PersistenceTraceId` for `trace_id`. The concrete store is returned alongside it
    /// so tests can `load` whatever the execution persisted.
    fn persisted_bus(trace_id: &'static str) -> (Arc<InMemoryPersistenceStore>, Bus) {
        let store = Arc::new(InMemoryPersistenceStore::new());
        let erased: Arc<dyn PersistenceStore> = store.clone();

        let mut bus = Bus::new();
        bus.insert(PersistenceHandle::from_arc(erased));
        bus.insert(PersistenceTraceId::new(trace_id));
        (store, bus)
    }

    #[test]
    fn inspector_enabled_flag_matrix() {
        let cases = [
            (None, true),
            (Some("1"), true),
            (Some("true"), true),
            (Some("on"), true),
            (Some("0"), false),
            (Some("false"), false),
        ];
        for (raw, expected) in cases {
            assert_eq!(inspector_enabled_from_value(raw), expected, "value: {raw:?}");
        }
    }

    #[test]
    fn inspector_dev_mode_matrix() {
        let cases = [
            (None, true),
            (Some("dev"), true),
            (Some("staging"), true),
            (Some("prod"), false),
            (Some("production"), false),
        ];
        for (raw, expected) in cases {
            assert_eq!(inspector_dev_mode_from_value(raw), expected, "value: {raw:?}");
        }
    }

    #[test]
    fn adaptive_policy_force_export_matrix() {
        let next = Outcome::<(), &'static str>::Next(());
        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
        let fault = Outcome::<(), &'static str>::Fault("boom");

        // (policy, outcome, whether the policy must force an export)
        let expectations = [
            ("off", &next, false),
            ("off", &fault, false),
            ("fault_only", &branch, false),
            ("fault_only", &fault, true),
            ("fault_branch", &branch, true),
            ("fault_branch", &emit, false),
            ("fault_branch", &fault, true),
            ("fault_branch_emit", &branch, true),
            ("fault_branch_emit", &emit, true),
            ("fault_branch_emit", &fault, true),
        ];
        for (policy, outcome, expected) in expectations {
            assert_eq!(should_force_export(outcome, policy), expected, "policy: {policy}");
        }
    }

    #[test]
    fn sampling_and_adaptive_combination_decisions() {
        let bus_id = Uuid::nil();
        let next = Outcome::<(), &'static str>::Next(());
        let fault = Outcome::<(), &'static str>::Fault("boom");

        // At a 0.0 rate the nil bus id is not sampled, so only the policy can force export.
        let never = sampled_by_bus_id(bus_id, 0.0);
        assert!(!never);
        assert!(!(never || should_force_export(&next, "off")));
        assert!(never || should_force_export(&fault, "fault_only"));

        // At a 1.0 rate the nil bus id is sampled, so export happens whatever the policy says.
        let always = sampled_by_bus_id(bus_id, 1.0);
        assert!(always);
        assert!(always || should_force_export(&next, "off"));
        assert!(always || should_force_export(&fault, "off"));
    }

    /// Transition that increments its input by one and never fails.
    #[derive(Clone)]
    struct AddOne;

    #[async_trait]
    impl Transition<i32, i32> for AddOne {
        type Error = std::convert::Infallible;
        type Resources = ();

        async fn run(
            &self,
            state: i32,
            _resources: &Self::Resources,
            _bus: &mut Bus,
        ) -> Outcome<i32, Self::Error> {
            let incremented = state + 1;
            Outcome::Next(incremented)
        }
    }

    /// Transition that unconditionally faults with `"boom"`.
    #[derive(Clone)]
    struct AlwaysFault;

    #[async_trait]
    impl Transition<i32, i32> for AlwaysFault {
        type Error = &'static str;
        type Resources = ();

        async fn run(
            &self,
            _state: i32,
            _resources: &Self::Resources,
            _bus: &mut Bus,
        ) -> Outcome<i32, Self::Error> {
            Outcome::Fault("boom")
        }
    }

    /// Transition whose bus policy allows only `i32` but which tries to read a `String`.
    #[derive(Clone)]
    struct CapabilityGuarded;

    #[async_trait]
    impl Transition<(), ()> for CapabilityGuarded {
        type Error = String;
        type Resources = ();

        fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
            let allowed = vec![BusTypeRef::of::<i32>()];
            Some(BusAccessPolicy::allow_only(allowed))
        }

        async fn run(
            &self,
            _state: (),
            _resources: &Self::Resources,
            bus: &mut Bus,
        ) -> Outcome<(), Self::Error> {
            if let Err(denied) = bus.get::<String>() {
                return Outcome::Fault(denied.to_string());
            }
            Outcome::Next(())
        }
    }

    /// Compensation hook that records every context it receives and optionally fails.
    #[derive(Clone)]
    struct RecordingCompensationHook {
        calls: Arc<Mutex<Vec<CompensationContext>>>,
        should_fail: bool,
    }

    impl RecordingCompensationHook {
        /// Returns a hook together with a shared handle onto the contexts it records.
        fn recording(should_fail: bool) -> (Self, Arc<Mutex<Vec<CompensationContext>>>) {
            let calls = Arc::new(Mutex::new(Vec::new()));
            let hook = Self {
                calls: Arc::clone(&calls),
                should_fail,
            };
            (hook, calls)
        }
    }

    #[async_trait]
    impl CompensationHook for RecordingCompensationHook {
        async fn compensate(&self, context: CompensationContext) -> Result<()> {
            self.calls.lock().await.push(context);
            if self.should_fail {
                Err(anyhow::anyhow!("compensation failed"))
            } else {
                Ok(())
            }
        }
    }

    /// Compensation hook that counts attempts and fails until `failures_remaining` hits zero.
    #[derive(Clone)]
    struct FlakyCompensationHook {
        calls: Arc<Mutex<u32>>,
        failures_remaining: Arc<Mutex<u32>>,
    }

    #[async_trait]
    impl CompensationHook for FlakyCompensationHook {
        async fn compensate(&self, _context: CompensationContext) -> Result<()> {
            *self.calls.lock().await += 1;

            let mut remaining = self.failures_remaining.lock().await;
            if *remaining == 0 {
                return Ok(());
            }
            *remaining -= 1;
            Err(anyhow::anyhow!("transient compensation failure"))
        }
    }

    /// Idempotency store whose reads and writes always error, counting each call.
    #[derive(Clone)]
    struct FailingCompensationIdempotencyStore {
        read_calls: Arc<Mutex<u32>>,
        write_calls: Arc<Mutex<u32>>,
    }

    #[async_trait]
    impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
        async fn was_compensated(&self, _key: &str) -> Result<bool> {
            *self.read_calls.lock().await += 1;
            Err(anyhow::anyhow!("forced idempotency read failure"))
        }

        async fn mark_compensated(&self, _key: &str) -> Result<()> {
            *self.write_calls.lock().await += 1;
            Err(anyhow::anyhow!("forced idempotency write failure"))
        }
    }

    #[tokio::test]
    async fn execute_persists_success_trace_when_handle_exists() {
        let (store, mut bus) = persisted_bus("trace-success");

        let axon = Axon::<i32, i32, std::convert::Infallible>::start("PersistSuccess").then(AddOne);
        assert!(matches!(axon.execute(41, &(), &mut bus).await, Outcome::Next(42)));

        let trace = store.load("trace-success").await.unwrap().unwrap();
        let kinds: Vec<&str> = trace.events.iter().map(|e| e.outcome_kind.as_str()).collect();
        assert_eq!(kinds, ["Enter", "Next"]);
        assert_eq!(trace.completion, Some(CompletionState::Success));
    }

    #[tokio::test]
    async fn execute_persists_fault_completion_state() {
        let (store, mut bus) = persisted_bus("trace-fault");

        let axon = Axon::<i32, i32, &'static str>::start("PersistFault").then(AlwaysFault);
        assert!(matches!(axon.execute(41, &(), &mut bus).await, Outcome::Fault("boom")));

        let trace = store.load("trace-fault").await.unwrap().unwrap();
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.events[1].outcome_kind, "Fault");
        assert_eq!(trace.completion, Some(CompletionState::Fault));
    }

    #[tokio::test]
    async fn execute_respects_persistence_auto_complete_off() {
        let (store, mut bus) = persisted_bus("trace-no-complete");
        bus.insert(PersistenceAutoComplete(false));

        let axon =
            Axon::<i32, i32, std::convert::Infallible>::start("PersistNoComplete").then(AddOne);
        assert!(matches!(axon.execute(1, &(), &mut bus).await, Outcome::Next(2)));

        let trace = store.load("trace-no-complete").await.unwrap().unwrap();
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.completion, None);
    }

    #[tokio::test]
    async fn fault_triggers_compensation_and_marks_compensated() {
        let (hook, calls) = RecordingCompensationHook::recording(false);
        let (store, mut bus) = persisted_bus("trace-compensated");
        bus.insert(CompensationHandle::from_hook(hook));

        let axon = Axon::<i32, i32, &'static str>::start("CompensatedFault").then(AlwaysFault);
        assert!(matches!(axon.execute(7, &(), &mut bus).await, Outcome::Fault("boom")));

        let trace = store.load("trace-compensated").await.unwrap().unwrap();
        let kinds: Vec<&str> = trace.events.iter().map(|e| e.outcome_kind.as_str()).collect();
        assert_eq!(kinds, ["Enter", "Fault", "Compensated"]);
        assert_eq!(trace.completion, Some(CompletionState::Compensated));

        let recorded = calls.lock().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].trace_id, "trace-compensated");
        assert_eq!(recorded[0].fault_kind, "Fault");
    }

    #[tokio::test]
    async fn failed_compensation_keeps_fault_completion() {
        let (hook, calls) = RecordingCompensationHook::recording(true);
        let (store, mut bus) = persisted_bus("trace-compensation-failed");
        bus.insert(CompensationHandle::from_hook(hook));

        let axon = Axon::<i32, i32, &'static str>::start("CompensationFails").then(AlwaysFault);
        assert!(matches!(axon.execute(7, &(), &mut bus).await, Outcome::Fault("boom")));

        let trace = store
            .load("trace-compensation-failed")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(trace.events.len(), 2);
        assert_eq!(trace.events[1].outcome_kind, "Fault");
        assert_eq!(trace.completion, Some(CompletionState::Fault));

        assert_eq!(calls.lock().await.len(), 1);
    }

    #[tokio::test]
    async fn compensation_retry_policy_succeeds_after_retries() {
        let attempts = Arc::new(Mutex::new(0u32));
        let hook = FlakyCompensationHook {
            calls: Arc::clone(&attempts),
            failures_remaining: Arc::new(Mutex::new(2u32)),
        };

        let (store, mut bus) = persisted_bus("trace-retry-success");
        bus.insert(CompensationHandle::from_hook(hook));
        bus.insert(CompensationRetryPolicy {
            max_attempts: 3,
            backoff_ms: 0,
        });

        let axon = Axon::<i32, i32, &'static str>::start("CompensationRetry").then(AlwaysFault);
        assert!(matches!(axon.execute(7, &(), &mut bus).await, Outcome::Fault("boom")));

        let trace = store.load("trace-retry-success").await.unwrap().unwrap();
        assert_eq!(trace.completion, Some(CompletionState::Compensated));
        assert_eq!(
            trace.events.last().map(|e| e.outcome_kind.as_str()),
            Some("Compensated")
        );

        // Two scripted failures followed by one success.
        assert_eq!(*attempts.lock().await, 3);
    }

    #[tokio::test]
    async fn compensation_idempotency_skips_duplicate_hook_execution() {
        let (hook, calls) = RecordingCompensationHook::recording(false);
        let (store, mut bus) = persisted_bus("trace-idempotent");
        bus.insert(PersistenceAutoComplete(false));
        bus.insert(CompensationHandle::from_hook(hook));
        bus.insert(CompensationIdempotencyHandle::from_store(
            InMemoryCompensationIdempotencyStore::new(),
        ));

        let axon =
            Axon::<i32, i32, &'static str>::start("CompensationIdempotency").then(AlwaysFault);

        // Both runs share one trace id: each records a Compensated event, but the hook
        // itself is expected to run only once.
        let first = axon.execute(7, &(), &mut bus).await;
        let second = axon.execute(8, &(), &mut bus).await;
        assert!(matches!(first, Outcome::Fault("boom")));
        assert!(matches!(second, Outcome::Fault("boom")));

        let trace = store.load("trace-idempotent").await.unwrap().unwrap();
        assert_eq!(trace.completion, None);
        assert_eq!(trace.events.len(), 6);
        assert_eq!(trace.events[2].outcome_kind, "Compensated");
        assert_eq!(trace.events[5].outcome_kind, "Compensated");

        assert_eq!(calls.lock().await.len(), 1);
    }

    #[tokio::test]
    async fn compensation_idempotency_store_failure_does_not_block_compensation() {
        let (hook, calls) = RecordingCompensationHook::recording(false);
        let read_calls = Arc::new(Mutex::new(0u32));
        let write_calls = Arc::new(Mutex::new(0u32));
        let failing_store = FailingCompensationIdempotencyStore {
            read_calls: Arc::clone(&read_calls),
            write_calls: Arc::clone(&write_calls),
        };

        let (store, mut bus) = persisted_bus("trace-idempotency-store-failure");
        bus.insert(CompensationHandle::from_hook(hook));
        bus.insert(CompensationIdempotencyHandle::from_store(failing_store));

        let axon =
            Axon::<i32, i32, &'static str>::start("IdempotencyStoreFailure").then(AlwaysFault);
        assert!(matches!(axon.execute(9, &(), &mut bus).await, Outcome::Fault("boom")));

        let trace = store
            .load("trace-idempotency-store-failure")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(trace.completion, Some(CompletionState::Compensated));
        assert_eq!(
            trace.events.last().map(|e| e.outcome_kind.as_str()),
            Some("Compensated")
        );

        assert_eq!(calls.lock().await.len(), 1);
        assert_eq!(*read_calls.lock().await, 1);
        assert_eq!(*write_calls.lock().await, 1);
    }

    #[tokio::test]
    async fn transition_bus_policy_blocks_unauthorized_resource_access() {
        let mut bus = Bus::new();
        bus.insert(1_i32);
        bus.insert("secret".to_string());

        let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
        let msg = match axon.execute((), &(), &mut bus).await {
            Outcome::Fault(msg) => msg,
            other => panic!("expected fault, got {other:?}"),
        };

        for needle in ["Bus access denied", "CapabilityGuarded", "alloc::string::String"] {
            assert!(msg.contains(needle), "{msg}");
        }
    }
}