1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45#[derive(Clone)]
47pub enum ExecutionMode {
48 Local,
50 Singleton {
52 lock_key: String,
53 ttl_ms: u64,
54 lock_provider: Arc<dyn DistributedLock>,
55 },
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum ParallelStrategy {
63 AllMustSucceed,
65 AnyCanFail,
68}
69
70pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
73pub type Executor<In, Out, E, Res> =
77 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
79#[derive(Debug, Clone)]
81pub struct ManualJump {
82 pub target_node: String,
83 pub payload_override: Option<serde_json::Value>,
84}
85
86#[derive(Debug, Clone, Copy)]
88struct StartStep(u64);
89
90#[derive(Debug, Clone)]
92struct ResumptionState {
93 payload: Option<serde_json::Value>,
94}
95
96fn type_name_of<T: ?Sized>() -> String {
98 let full = type_name::<T>();
99 full.split("::").last().unwrap_or(full).to_string()
100}
101
102pub struct Axon<In, Out, E, Res = ()> {
122 pub schematic: Schematic,
124 executor: Executor<In, Out, E, Res>,
126 pub execution_mode: ExecutionMode,
128 pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
130 pub audit_sink: Option<Arc<dyn AuditSink>>,
132 pub dlq_sink: Option<Arc<dyn DlqSink>>,
134 pub dlq_policy: DlqPolicy,
136 pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
138 pub saga_policy: SagaPolicy,
140 pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
142 pub saga_compensation_registry:
144 Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
145 pub iam_handle: Option<ranvier_core::iam::IamHandle>,
147}
148
149#[derive(Debug, Clone)]
151pub struct SchematicExportRequest {
152 pub output: Option<PathBuf>,
154}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157 fn clone(&self) -> Self {
158 Self {
159 schematic: self.schematic.clone(),
160 executor: self.executor.clone(),
161 execution_mode: self.execution_mode.clone(),
162 persistence_store: self.persistence_store.clone(),
163 audit_sink: self.audit_sink.clone(),
164 dlq_sink: self.dlq_sink.clone(),
165 dlq_policy: self.dlq_policy.clone(),
166 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167 saga_policy: self.saga_policy.clone(),
168 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169 saga_compensation_registry: self.saga_compensation_registry.clone(),
170 iam_handle: self.iam_handle.clone(),
171 }
172 }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177 In: Send + Sync + Serialize + DeserializeOwned + 'static,
178 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179 Res: ranvier_core::transition::ResourceRequirement,
180{
181 #[track_caller]
184 pub fn new(label: &str) -> Self {
185 let caller = Location::caller();
186 Self::start_with_source(label, caller)
187 }
188
189 #[track_caller]
192 pub fn start(label: &str) -> Self {
193 let caller = Location::caller();
194 Self::start_with_source(label, caller)
195 }
196
197 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198 let node_id = uuid::Uuid::new_v4().to_string();
199 let node = Node {
200 id: node_id,
201 kind: NodeKind::Ingress,
202 label: label.to_string(),
203 description: None,
204 input_type: "void".to_string(),
205 output_type: type_name_of::<In>(),
206 resource_type: type_name_of::<Res>(),
207 metadata: Default::default(),
208 bus_capability: None,
209 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210 position: None,
211 compensation_node_id: None,
212 input_schema: None,
213 output_schema: None,
214 };
215
216 let mut schematic = Schematic::new(label);
217 schematic.nodes.push(node);
218
219 let executor: Executor<In, In, E, Res> =
220 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222 Self {
223 schematic,
224 executor,
225 execution_mode: ExecutionMode::Local,
226 persistence_store: None,
227 audit_sink: None,
228 dlq_sink: None,
229 dlq_policy: DlqPolicy::default(),
230 dynamic_dlq_policy: None,
231 saga_policy: SagaPolicy::default(),
232 dynamic_saga_policy: None,
233 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234 ranvier_core::saga::SagaCompensationRegistry::new(),
235 )),
236 iam_handle: None,
237 }
238 }
239}
240
241impl Axon<(), (), (), ()> {
242 #[track_caller]
257 pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
258 where
259 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
260 {
261 let caller = Location::caller();
262 <Axon<(), (), E, ()>>::start_with_source(label, caller)
263 }
264
265 #[track_caller]
284 pub fn typed<In, E>(label: &str) -> Axon<In, In, E, ()>
285 where
286 In: Send + Sync + Serialize + DeserializeOwned + 'static,
287 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
288 {
289 let caller = Location::caller();
290 <Axon<In, In, E, ()>>::start_with_source(label, caller)
291 }
292}
293
294impl<In, Out, E, Res> Axon<In, Out, E, Res>
295where
296 In: Send + Sync + Serialize + DeserializeOwned + 'static,
297 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
298 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
299 Res: ranvier_core::transition::ResourceRequirement,
300{
301 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
303 self.execution_mode = mode;
304 self
305 }
306
307 pub fn with_version(mut self, version: impl Into<String>) -> Self {
309 self.schematic.schema_version = version.into();
310 self
311 }
312
313 pub fn with_persistence_store<S>(mut self, store: S) -> Self
315 where
316 S: crate::persistence::PersistenceStore + 'static,
317 {
318 self.persistence_store = Some(Arc::new(store));
319 self
320 }
321
322 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
324 where
325 S: AuditSink + 'static,
326 {
327 self.audit_sink = Some(Arc::new(sink));
328 self
329 }
330
331 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
333 where
334 S: DlqSink + 'static,
335 {
336 self.dlq_sink = Some(Arc::new(sink));
337 self
338 }
339
340 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
342 self.dlq_policy = policy;
343 self
344 }
345
346 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
348 self.saga_policy = policy;
349 self
350 }
351
352 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
355 self.dynamic_dlq_policy = Some(dynamic);
356 self
357 }
358
359 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
362 self.dynamic_saga_policy = Some(dynamic);
363 self
364 }
365
366 pub fn with_iam(
374 mut self,
375 policy: ranvier_core::iam::IamPolicy,
376 verifier: impl ranvier_core::iam::IamVerifier + 'static,
377 ) -> Self {
378 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
379 policy,
380 Arc::new(verifier),
381 ));
382 self
383 }
384
385 #[cfg(feature = "schema")]
396 pub fn with_input_schema<T>(mut self) -> Self
397 where
398 T: schemars::JsonSchema,
399 {
400 if let Some(last_node) = self.schematic.nodes.last_mut() {
401 let schema = schemars::schema_for!(T);
402 last_node.input_schema =
403 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
404 }
405 self
406 }
407
408 #[cfg(feature = "schema")]
412 pub fn with_output_schema<T>(mut self) -> Self
413 where
414 T: schemars::JsonSchema,
415 {
416 if let Some(last_node) = self.schematic.nodes.last_mut() {
417 let schema = schemars::schema_for!(T);
418 last_node.output_schema =
419 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
420 }
421 self
422 }
423
424 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
428 if let Some(last_node) = self.schematic.nodes.last_mut() {
429 last_node.input_schema = Some(schema);
430 }
431 self
432 }
433
434 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
436 if let Some(last_node) = self.schematic.nodes.last_mut() {
437 last_node.output_schema = Some(schema);
438 }
439 self
440 }
441}
442
443#[async_trait]
444impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
445where
446 In: Send + Sync + Serialize + DeserializeOwned + 'static,
447 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
448 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
449 Res: ranvier_core::transition::ResourceRequirement,
450{
451 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
452 let store = self.persistence_store.as_ref()?;
453 let trace = store.load(trace_id).await.ok().flatten()?;
454 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
455 }
456
457 async fn force_resume(
458 &self,
459 trace_id: &str,
460 target_node: &str,
461 payload_override: Option<Value>,
462 ) -> Result<(), String> {
463 let store = self
464 .persistence_store
465 .as_ref()
466 .ok_or("No persistence store attached to Axon")?;
467
468 let intervention = crate::persistence::Intervention {
469 target_node: target_node.to_string(),
470 payload_override,
471 timestamp_ms: now_ms(),
472 };
473
474 store
475 .save_intervention(trace_id, intervention)
476 .await
477 .map_err(|e| format!("Failed to save intervention: {}", e))?;
478
479 if let Some(sink) = self.audit_sink.as_ref() {
480 let event = AuditEvent::new(
481 uuid::Uuid::new_v4().to_string(),
482 "Inspector".to_string(),
483 "ForceResume".to_string(),
484 trace_id.to_string(),
485 )
486 .with_metadata("target_node", target_node);
487
488 let _ = sink.append(&event).await;
489 }
490
491 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
492 Ok(())
493 }
494}
495
496impl<In, Out, E, Res> Axon<In, Out, E, Res>
497where
498 In: Send + Sync + Serialize + DeserializeOwned + 'static,
499 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
500 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
501 Res: ranvier_core::transition::ResourceRequirement,
502{
503 #[track_caller]
507 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
508 where
509 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
510 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
511 {
512 let caller = Location::caller();
513 let Axon {
515 mut schematic,
516 executor: prev_executor,
517 execution_mode,
518 persistence_store,
519 audit_sink,
520 dlq_sink,
521 dlq_policy,
522 dynamic_dlq_policy,
523 saga_policy,
524 dynamic_saga_policy,
525 saga_compensation_registry,
526 iam_handle,
527 } = self;
528
529 let next_node_id = uuid::Uuid::new_v4().to_string();
531 let next_node = Node {
532 id: next_node_id.clone(),
533 kind: NodeKind::Atom,
534 label: transition.label(),
535 description: transition.description(),
536 input_type: type_name_of::<Out>(),
537 output_type: type_name_of::<Next>(),
538 resource_type: type_name_of::<Res>(),
539 metadata: Default::default(),
540 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
541 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
542 position: transition
543 .position()
544 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
545 compensation_node_id: None,
546 input_schema: transition.input_schema(),
547 output_schema: None,
548 };
549
550 let last_node_id = schematic
551 .nodes
552 .last()
553 .map(|n| n.id.clone())
554 .unwrap_or_default();
555
556 schematic.nodes.push(next_node);
557 schematic.edges.push(Edge {
558 from: last_node_id,
559 to: next_node_id.clone(),
560 kind: EdgeType::Linear,
561 label: Some("Next".to_string()),
562 });
563
564 let node_id_for_exec = next_node_id.clone();
566 let node_label_for_exec = transition.label();
567 let bus_policy_for_exec = transition.bus_access_policy();
568 let bus_policy_clone = bus_policy_for_exec.clone();
569 let current_step_idx = schematic.nodes.len() as u64 - 1;
570 let next_executor: Executor<In, Next, E, Res> = Arc::new(
571 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
572 let prev = prev_executor.clone();
573 let trans = transition.clone();
574 let timeline_node_id = node_id_for_exec.clone();
575 let timeline_node_label = node_label_for_exec.clone();
576 let transition_bus_policy = bus_policy_clone.clone();
577 let step_idx = current_step_idx;
578
579 Box::pin(async move {
580 if let Some(jump) = bus.read::<ManualJump>()
582 && (jump.target_node == timeline_node_id
583 || jump.target_node == timeline_node_label)
584 {
585 tracing::info!(
586 node_id = %timeline_node_id,
587 node_label = %timeline_node_label,
588 "Manual jump target reached; skipping previous steps"
589 );
590
591 let state = if let Some(ow) = jump.payload_override.clone() {
592 match serde_json::from_value::<Out>(ow) {
593 Ok(s) => s,
594 Err(e) => {
595 tracing::error!(
596 "Payload override deserialization failed: {}",
597 e
598 );
599 return Outcome::emit(
600 "execution.jump.payload_error",
601 Some(serde_json::json!({"error": e.to_string()})),
602 )
603 .map(|_: ()| unreachable!());
604 }
605 }
606 } else {
607 return Outcome::emit(
611 "execution.jump.missing_payload",
612 Some(serde_json::json!({"node_id": timeline_node_id})),
613 );
614 };
615
616 return run_this_step::<Out, Next, E, Res>(
618 &trans,
619 state,
620 res,
621 bus,
622 &timeline_node_id,
623 &timeline_node_label,
624 &transition_bus_policy,
625 step_idx,
626 )
627 .await;
628 }
629
630 if let Some(start) = bus.read::<StartStep>()
632 && step_idx == start.0
633 && bus.read::<ResumptionState>().is_some()
634 {
635 let fresh_state = serde_json::to_value(&input)
639 .ok()
640 .and_then(|v| serde_json::from_value::<Out>(v).ok());
641 let persisted_state = bus
642 .read::<ResumptionState>()
643 .and_then(|r| r.payload.clone())
644 .and_then(|p| serde_json::from_value::<Out>(p).ok());
645
646 if let Some(s) = fresh_state.or(persisted_state) {
647 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
648 return run_this_step::<Out, Next, E, Res>(
649 &trans,
650 s,
651 res,
652 bus,
653 &timeline_node_id,
654 &timeline_node_label,
655 &transition_bus_policy,
656 step_idx,
657 )
658 .await;
659 }
660
661 return Outcome::emit(
662 "execution.resumption.payload_error",
663 Some(serde_json::json!({"error": "no compatible resumption state"})),
664 )
665 .map(|_: ()| unreachable!());
666 }
667
668 let prev_result = prev(input, res, bus).await;
670
671 let state = match prev_result {
673 Outcome::Next(t) => t,
674 other => return other.map(|_| unreachable!()),
675 };
676
677 run_this_step::<Out, Next, E, Res>(
678 &trans,
679 state,
680 res,
681 bus,
682 &timeline_node_id,
683 &timeline_node_label,
684 &transition_bus_policy,
685 step_idx,
686 )
687 .await
688 })
689 },
690 );
691 Axon {
692 schematic,
693 executor: next_executor,
694 execution_mode,
695 persistence_store,
696 audit_sink,
697 dlq_sink,
698 dlq_policy,
699 dynamic_dlq_policy,
700 saga_policy,
701 dynamic_saga_policy,
702 saga_compensation_registry,
703 iam_handle,
704 }
705 }
706
707 #[track_caller]
733 pub fn then_fn<Next, F>(self, label: &str, f: F) -> Axon<In, Next, E, Res>
734 where
735 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
736 F: Fn(Out, &mut Bus) -> Outcome<Next, E> + Clone + Send + Sync + 'static,
737 {
738 self.then(crate::closure_transition::ClosureTransition::new(label, f))
739 }
740
741 #[track_caller]
757 pub fn then_with_retry<Next, Trans>(
758 self,
759 transition: Trans,
760 policy: crate::retry::RetryPolicy,
761 ) -> Axon<In, Next, E, Res>
762 where
763 Out: Clone,
764 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
765 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
766 {
767 let caller = Location::caller();
768 let Axon {
769 mut schematic,
770 executor: prev_executor,
771 execution_mode,
772 persistence_store,
773 audit_sink,
774 dlq_sink,
775 dlq_policy,
776 dynamic_dlq_policy,
777 saga_policy,
778 dynamic_saga_policy,
779 saga_compensation_registry,
780 iam_handle,
781 } = self;
782
783 let next_node_id = uuid::Uuid::new_v4().to_string();
784 let next_node = Node {
785 id: next_node_id.clone(),
786 kind: NodeKind::Atom,
787 label: transition.label(),
788 description: transition.description(),
789 input_type: type_name_of::<Out>(),
790 output_type: type_name_of::<Next>(),
791 resource_type: type_name_of::<Res>(),
792 metadata: Default::default(),
793 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
794 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
795 position: transition
796 .position()
797 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
798 compensation_node_id: None,
799 input_schema: transition.input_schema(),
800 output_schema: None,
801 };
802
803 let last_node_id = schematic
804 .nodes
805 .last()
806 .map(|n| n.id.clone())
807 .unwrap_or_default();
808
809 schematic.nodes.push(next_node);
810 schematic.edges.push(Edge {
811 from: last_node_id,
812 to: next_node_id.clone(),
813 kind: EdgeType::Linear,
814 label: Some("Next (retryable)".to_string()),
815 });
816
817 let node_id_for_exec = next_node_id.clone();
818 let node_label_for_exec = transition.label();
819 let bus_policy_for_exec = transition.bus_access_policy();
820 let bus_policy_clone = bus_policy_for_exec.clone();
821 let current_step_idx = schematic.nodes.len() as u64 - 1;
822 let next_executor: Executor<In, Next, E, Res> = Arc::new(
823 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
824 let prev = prev_executor.clone();
825 let trans = transition.clone();
826 let timeline_node_id = node_id_for_exec.clone();
827 let timeline_node_label = node_label_for_exec.clone();
828 let transition_bus_policy = bus_policy_clone.clone();
829 let step_idx = current_step_idx;
830 let retry_policy = policy.clone();
831
832 Box::pin(async move {
833 let prev_result = prev(input, res, bus).await;
835 let state = match prev_result {
836 Outcome::Next(t) => t,
837 other => return other.map(|_| unreachable!()),
838 };
839
840 let mut last_result = None;
842 for attempt in 0..=retry_policy.max_retries {
843 let attempt_state = state.clone();
844
845 let result = run_this_step::<Out, Next, E, Res>(
846 &trans,
847 attempt_state,
848 res,
849 bus,
850 &timeline_node_id,
851 &timeline_node_label,
852 &transition_bus_policy,
853 step_idx,
854 )
855 .await;
856
857 match &result {
858 Outcome::Next(_) => return result,
859 Outcome::Fault(_) if attempt < retry_policy.max_retries => {
860 let delay = retry_policy.delay_for_attempt(attempt);
861 tracing::warn!(
862 node_id = %timeline_node_id,
863 attempt = attempt + 1,
864 max = retry_policy.max_retries,
865 delay_ms = delay.as_millis() as u64,
866 "Transition failed, retrying"
867 );
868 if let Some(timeline) = bus.read_mut::<Timeline>() {
869 timeline.push(TimelineEvent::NodeRetry {
870 node_id: timeline_node_id.clone(),
871 attempt: attempt + 1,
872 max_attempts: retry_policy.max_retries,
873 backoff_ms: delay.as_millis() as u64,
874 timestamp: now_ms(),
875 });
876 }
877 tokio::time::sleep(delay).await;
878 }
879 _ => {
880 last_result = Some(result);
881 break;
882 }
883 }
884 }
885
886 last_result.unwrap_or_else(|| {
887 Outcome::emit("execution.retry.exhausted", None)
888 })
889 })
890 },
891 );
892 Axon {
893 schematic,
894 executor: next_executor,
895 execution_mode,
896 persistence_store,
897 audit_sink,
898 dlq_sink,
899 dlq_policy,
900 dynamic_dlq_policy,
901 saga_policy,
902 dynamic_saga_policy,
903 saga_compensation_registry,
904 iam_handle,
905 }
906 }
907
908 #[track_caller]
927 pub fn then_with_timeout<Next, Trans, F>(
928 self,
929 transition: Trans,
930 duration: std::time::Duration,
931 make_timeout_error: F,
932 ) -> Axon<In, Next, E, Res>
933 where
934 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
935 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
936 F: Fn() -> E + Clone + Send + Sync + 'static,
937 {
938 let caller = Location::caller();
939 let Axon {
940 mut schematic,
941 executor: prev_executor,
942 execution_mode,
943 persistence_store,
944 audit_sink,
945 dlq_sink,
946 dlq_policy,
947 dynamic_dlq_policy,
948 saga_policy,
949 dynamic_saga_policy,
950 saga_compensation_registry,
951 iam_handle,
952 } = self;
953
954 let next_node_id = uuid::Uuid::new_v4().to_string();
955 let next_node = Node {
956 id: next_node_id.clone(),
957 kind: NodeKind::Atom,
958 label: transition.label(),
959 description: transition.description(),
960 input_type: type_name_of::<Out>(),
961 output_type: type_name_of::<Next>(),
962 resource_type: type_name_of::<Res>(),
963 metadata: Default::default(),
964 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
965 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
966 position: transition
967 .position()
968 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
969 compensation_node_id: None,
970 input_schema: transition.input_schema(),
971 output_schema: None,
972 };
973
974 let last_node_id = schematic
975 .nodes
976 .last()
977 .map(|n| n.id.clone())
978 .unwrap_or_default();
979
980 schematic.nodes.push(next_node);
981 schematic.edges.push(Edge {
982 from: last_node_id,
983 to: next_node_id.clone(),
984 kind: EdgeType::Linear,
985 label: Some("Next (timeout-guarded)".to_string()),
986 });
987
988 let node_id_for_exec = next_node_id.clone();
989 let node_label_for_exec = transition.label();
990 let bus_policy_for_exec = transition.bus_access_policy();
991 let bus_policy_clone = bus_policy_for_exec.clone();
992 let current_step_idx = schematic.nodes.len() as u64 - 1;
993 let next_executor: Executor<In, Next, E, Res> = Arc::new(
994 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
995 let prev = prev_executor.clone();
996 let trans = transition.clone();
997 let timeline_node_id = node_id_for_exec.clone();
998 let timeline_node_label = node_label_for_exec.clone();
999 let transition_bus_policy = bus_policy_clone.clone();
1000 let step_idx = current_step_idx;
1001 let timeout_duration = duration;
1002 let error_factory = make_timeout_error.clone();
1003
1004 Box::pin(async move {
1005 let prev_result = prev(input, res, bus).await;
1007 let state = match prev_result {
1008 Outcome::Next(t) => t,
1009 other => return other.map(|_| unreachable!()),
1010 };
1011
1012 match tokio::time::timeout(
1014 timeout_duration,
1015 run_this_step::<Out, Next, E, Res>(
1016 &trans,
1017 state,
1018 res,
1019 bus,
1020 &timeline_node_id,
1021 &timeline_node_label,
1022 &transition_bus_policy,
1023 step_idx,
1024 ),
1025 )
1026 .await
1027 {
1028 Ok(result) => result,
1029 Err(_elapsed) => {
1030 tracing::warn!(
1031 node_id = %timeline_node_id,
1032 timeout_ms = timeout_duration.as_millis() as u64,
1033 "Transition timed out"
1034 );
1035 if let Some(timeline) = bus.read_mut::<Timeline>() {
1036 timeline.push(TimelineEvent::NodeTimeout {
1037 node_id: timeline_node_id.clone(),
1038 timeout_ms: timeout_duration.as_millis() as u64,
1039 timestamp: now_ms(),
1040 });
1041 }
1042 Outcome::Fault(error_factory())
1043 }
1044 }
1045 })
1046 },
1047 );
1048 Axon {
1049 schematic,
1050 executor: next_executor,
1051 execution_mode,
1052 persistence_store,
1053 audit_sink,
1054 dlq_sink,
1055 dlq_policy,
1056 dynamic_dlq_policy,
1057 saga_policy,
1058 dynamic_saga_policy,
1059 saga_compensation_registry,
1060 iam_handle,
1061 }
1062 }
1063
1064 #[track_caller]
1069 pub fn then_compensated<Next, Trans, Comp>(
1070 self,
1071 transition: Trans,
1072 compensation: Comp,
1073 ) -> Axon<In, Next, E, Res>
1074 where
1075 Out: Clone,
1076 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
1077 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1078 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1079 {
1080 let caller = Location::caller();
1081 let Axon {
1082 mut schematic,
1083 executor: prev_executor,
1084 execution_mode,
1085 persistence_store,
1086 audit_sink,
1087 dlq_sink,
1088 dlq_policy,
1089 dynamic_dlq_policy,
1090 saga_policy,
1091 dynamic_saga_policy,
1092 saga_compensation_registry,
1093 iam_handle,
1094 } = self;
1095
1096 let next_node_id = uuid::Uuid::new_v4().to_string();
1098 let comp_node_id = uuid::Uuid::new_v4().to_string();
1099
1100 let next_node = Node {
1101 id: next_node_id.clone(),
1102 kind: NodeKind::Atom,
1103 label: transition.label(),
1104 description: transition.description(),
1105 input_type: type_name_of::<Out>(),
1106 output_type: type_name_of::<Next>(),
1107 resource_type: type_name_of::<Res>(),
1108 metadata: Default::default(),
1109 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
1110 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1111 position: transition
1112 .position()
1113 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1114 compensation_node_id: Some(comp_node_id.clone()),
1115 input_schema: None,
1116 output_schema: None,
1117 };
1118
1119 let comp_node = Node {
1121 id: comp_node_id.clone(),
1122 kind: NodeKind::Atom,
1123 label: format!("Compensate: {}", compensation.label()),
1124 description: compensation.description(),
1125 input_type: type_name_of::<Out>(),
1126 output_type: "void".to_string(),
1127 resource_type: type_name_of::<Res>(),
1128 metadata: Default::default(),
1129 bus_capability: None,
1130 source_location: None,
1131 position: compensation
1132 .position()
1133 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1134 compensation_node_id: None,
1135 input_schema: None,
1136 output_schema: None,
1137 };
1138
1139 let last_node_id = schematic
1140 .nodes
1141 .last()
1142 .map(|n| n.id.clone())
1143 .unwrap_or_default();
1144
1145 schematic.nodes.push(next_node);
1146 schematic.nodes.push(comp_node);
1147 schematic.edges.push(Edge {
1148 from: last_node_id,
1149 to: next_node_id.clone(),
1150 kind: EdgeType::Linear,
1151 label: Some("Next".to_string()),
1152 });
1153
1154 let node_id_for_exec = next_node_id.clone();
1156 let comp_id_for_exec = comp_node_id.clone();
1157 let node_label_for_exec = transition.label();
1158 let bus_policy_for_exec = transition.bus_access_policy();
1159 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
1160 let comp_for_exec = compensation.clone();
1161 let bus_policy_for_executor = bus_policy_for_exec.clone();
1162 let bus_policy_for_registry = bus_policy_for_exec.clone();
1163 let next_executor: Executor<In, Next, E, Res> = Arc::new(
1164 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
1165 let prev = prev_executor.clone();
1166 let trans = transition.clone();
1167 let comp = comp_for_exec.clone();
1168 let timeline_node_id = node_id_for_exec.clone();
1169 let timeline_comp_id = comp_id_for_exec.clone();
1170 let timeline_node_label = node_label_for_exec.clone();
1171 let transition_bus_policy = bus_policy_for_executor.clone();
1172 let step_idx = step_idx_for_exec;
1173
1174 Box::pin(async move {
1175 if let Some(jump) = bus.read::<ManualJump>()
1177 && (jump.target_node == timeline_node_id
1178 || jump.target_node == timeline_node_label)
1179 {
1180 tracing::info!(
1181 node_id = %timeline_node_id,
1182 node_label = %timeline_node_label,
1183 "Manual jump target reached (compensated); skipping previous steps"
1184 );
1185
1186 let state = if let Some(ow) = jump.payload_override.clone() {
1187 match serde_json::from_value::<Out>(ow) {
1188 Ok(s) => s,
1189 Err(e) => {
1190 tracing::error!(
1191 "Payload override deserialization failed: {}",
1192 e
1193 );
1194 return Outcome::emit(
1195 "execution.jump.payload_error",
1196 Some(serde_json::json!({"error": e.to_string()})),
1197 )
1198 .map(|_: ()| unreachable!());
1199 }
1200 }
1201 } else {
1202 return Outcome::emit(
1203 "execution.jump.missing_payload",
1204 Some(serde_json::json!({"node_id": timeline_node_id})),
1205 )
1206 .map(|_: ()| unreachable!());
1207 };
1208
1209 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1211 &trans,
1212 &comp,
1213 state,
1214 res,
1215 bus,
1216 &timeline_node_id,
1217 &timeline_comp_id,
1218 &timeline_node_label,
1219 &transition_bus_policy,
1220 step_idx,
1221 )
1222 .await;
1223 }
1224
1225 if let Some(start) = bus.read::<StartStep>()
1227 && step_idx == start.0
1228 && bus.read::<ResumptionState>().is_some()
1229 {
1230 let fresh_state = serde_json::to_value(&input)
1231 .ok()
1232 .and_then(|v| serde_json::from_value::<Out>(v).ok());
1233 let persisted_state = bus
1234 .read::<ResumptionState>()
1235 .and_then(|r| r.payload.clone())
1236 .and_then(|p| serde_json::from_value::<Out>(p).ok());
1237
1238 if let Some(s) = fresh_state.or(persisted_state) {
1239 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
1240 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
1241 &trans,
1242 &comp,
1243 s,
1244 res,
1245 bus,
1246 &timeline_node_id,
1247 &timeline_comp_id,
1248 &timeline_node_label,
1249 &transition_bus_policy,
1250 step_idx,
1251 )
1252 .await;
1253 }
1254
1255 return Outcome::emit(
1256 "execution.resumption.payload_error",
1257 Some(serde_json::json!({"error": "no compatible resumption state"})),
1258 )
1259 .map(|_: ()| unreachable!());
1260 }
1261
1262 let prev_result = prev(input, res, bus).await;
1264
1265 let state = match prev_result {
1267 Outcome::Next(t) => t,
1268 other => return other.map(|_| unreachable!()),
1269 };
1270
1271 run_this_compensated_step::<Out, Next, E, Res, Comp>(
1272 &trans,
1273 &comp,
1274 state,
1275 res,
1276 bus,
1277 &timeline_node_id,
1278 &timeline_comp_id,
1279 &timeline_node_label,
1280 &transition_bus_policy,
1281 step_idx,
1282 )
1283 .await
1284 })
1285 },
1286 );
1287 {
1289 let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1290 let comp_fn = compensation.clone();
1291 let transition_bus_policy = bus_policy_for_registry.clone();
1292
1293 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1294 Arc::new(move |input_data, res, bus| {
1295 let comp = comp_fn.clone();
1296 let bus_policy = transition_bus_policy.clone();
1297 Box::pin(async move {
1298 let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1299 bus.set_access_policy(comp.label(), bus_policy);
1300 let res = comp.run(input, res, bus).await;
1301 bus.clear_access_policy();
1302 res
1303 })
1304 });
1305 registry.register(next_node_id.clone(), handler);
1306 }
1307
1308 Axon {
1309 schematic,
1310 executor: next_executor,
1311 execution_mode,
1312 persistence_store,
1313 audit_sink,
1314 dlq_sink,
1315 dlq_policy,
1316 dynamic_dlq_policy,
1317 saga_policy,
1318 dynamic_saga_policy,
1319 saga_compensation_registry,
1320 iam_handle,
1321 }
1322 }
1323
1324 #[track_caller]
1327 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1328 where
1329 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1330 {
1331 let caller = Location::caller();
1334 let comp_node_id = uuid::Uuid::new_v4().to_string();
1335
1336 let comp_node = Node {
1337 id: comp_node_id.clone(),
1338 kind: NodeKind::Atom,
1339 label: transition.label(),
1340 description: transition.description(),
1341 input_type: type_name_of::<Out>(),
1342 output_type: "void".to_string(),
1343 resource_type: type_name_of::<Res>(),
1344 metadata: Default::default(),
1345 bus_capability: None,
1346 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1347 position: transition
1348 .position()
1349 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1350 compensation_node_id: None,
1351 input_schema: None,
1352 output_schema: None,
1353 };
1354
1355 if let Some(last_node) = self.schematic.nodes.last_mut() {
1356 last_node.compensation_node_id = Some(comp_node_id.clone());
1357 }
1358
1359 self.schematic.nodes.push(comp_node);
1360 self
1361 }
1362
1363 #[track_caller]
1365 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1366 let caller = Location::caller();
1367 let branch_id_str = branch_id.into();
1368 let last_node_id = self
1369 .schematic
1370 .nodes
1371 .last()
1372 .map(|n| n.id.clone())
1373 .unwrap_or_default();
1374
1375 let branch_node = Node {
1376 id: uuid::Uuid::new_v4().to_string(),
1377 kind: NodeKind::Synapse,
1378 label: label.to_string(),
1379 description: None,
1380 input_type: type_name_of::<Out>(),
1381 output_type: type_name_of::<Out>(),
1382 resource_type: type_name_of::<Res>(),
1383 metadata: Default::default(),
1384 bus_capability: None,
1385 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1386 position: None,
1387 compensation_node_id: None,
1388 input_schema: None,
1389 output_schema: None,
1390 };
1391
1392 self.schematic.nodes.push(branch_node);
1393 self.schematic.edges.push(Edge {
1394 from: last_node_id,
1395 to: branch_id_str.clone(),
1396 kind: EdgeType::Branch(branch_id_str),
1397 label: Some("Branch".to_string()),
1398 });
1399
1400 self
1401 }
1402
1403 #[track_caller]
1440 pub fn parallel(
1441 self,
1442 transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1443 strategy: ParallelStrategy,
1444 ) -> Axon<In, Out, E, Res>
1445 where
1446 Out: Clone,
1447 {
1448 let caller = Location::caller();
1449 let Axon {
1450 mut schematic,
1451 executor: prev_executor,
1452 execution_mode,
1453 persistence_store,
1454 audit_sink,
1455 dlq_sink,
1456 dlq_policy,
1457 dynamic_dlq_policy,
1458 saga_policy,
1459 dynamic_saga_policy,
1460 saga_compensation_registry,
1461 iam_handle,
1462 } = self;
1463
1464 let fanout_id = uuid::Uuid::new_v4().to_string();
1466 let fanin_id = uuid::Uuid::new_v4().to_string();
1467
1468 let last_node_id = schematic
1469 .nodes
1470 .last()
1471 .map(|n| n.id.clone())
1472 .unwrap_or_default();
1473
1474 let fanout_node = Node {
1475 id: fanout_id.clone(),
1476 kind: NodeKind::FanOut,
1477 label: "FanOut".to_string(),
1478 description: Some(format!(
1479 "Parallel split ({} branches, {:?})",
1480 transitions.len(),
1481 strategy
1482 )),
1483 input_type: type_name_of::<Out>(),
1484 output_type: type_name_of::<Out>(),
1485 resource_type: type_name_of::<Res>(),
1486 metadata: Default::default(),
1487 bus_capability: None,
1488 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1489 position: None,
1490 compensation_node_id: None,
1491 input_schema: None,
1492 output_schema: None,
1493 };
1494
1495 schematic.nodes.push(fanout_node);
1496 schematic.edges.push(Edge {
1497 from: last_node_id,
1498 to: fanout_id.clone(),
1499 kind: EdgeType::Linear,
1500 label: Some("Next".to_string()),
1501 });
1502
1503 let mut branch_node_ids = Vec::with_capacity(transitions.len());
1505 for (i, trans) in transitions.iter().enumerate() {
1506 let branch_id = uuid::Uuid::new_v4().to_string();
1507 let branch_node = Node {
1508 id: branch_id.clone(),
1509 kind: NodeKind::Atom,
1510 label: trans.label(),
1511 description: trans.description(),
1512 input_type: type_name_of::<Out>(),
1513 output_type: type_name_of::<Out>(),
1514 resource_type: type_name_of::<Res>(),
1515 metadata: Default::default(),
1516 bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1517 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1518 position: None,
1519 compensation_node_id: None,
1520 input_schema: trans.input_schema(),
1521 output_schema: None,
1522 };
1523 schematic.nodes.push(branch_node);
1524 schematic.edges.push(Edge {
1525 from: fanout_id.clone(),
1526 to: branch_id.clone(),
1527 kind: EdgeType::Parallel,
1528 label: Some(format!("Branch {}", i)),
1529 });
1530 branch_node_ids.push(branch_id);
1531 }
1532
1533 let fanin_node = Node {
1535 id: fanin_id.clone(),
1536 kind: NodeKind::FanIn,
1537 label: "FanIn".to_string(),
1538 description: Some(format!("Parallel join ({:?})", strategy)),
1539 input_type: type_name_of::<Out>(),
1540 output_type: type_name_of::<Out>(),
1541 resource_type: type_name_of::<Res>(),
1542 metadata: Default::default(),
1543 bus_capability: None,
1544 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1545 position: None,
1546 compensation_node_id: None,
1547 input_schema: None,
1548 output_schema: None,
1549 };
1550
1551 schematic.nodes.push(fanin_node);
1552 for branch_id in &branch_node_ids {
1553 schematic.edges.push(Edge {
1554 from: branch_id.clone(),
1555 to: fanin_id.clone(),
1556 kind: EdgeType::Parallel,
1557 label: Some("Join".to_string()),
1558 });
1559 }
1560
1561 let fanout_node_id = fanout_id.clone();
1563 let fanin_node_id = fanin_id.clone();
1564 let branch_ids_for_exec = branch_node_ids.clone();
1565 let step_idx = schematic.nodes.len() as u64 - 1;
1566
1567 let next_executor: Executor<In, Out, E, Res> = Arc::new(
1568 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1569 let prev = prev_executor.clone();
1570 let branches = transitions.clone();
1571 let fanout_id = fanout_node_id.clone();
1572 let fanin_id = fanin_node_id.clone();
1573 let branch_ids = branch_ids_for_exec.clone();
1574
1575 Box::pin(async move {
1576 let prev_result = prev(input, res, bus).await;
1578 let state = match prev_result {
1579 Outcome::Next(t) => t,
1580 other => return other.map(|_| unreachable!()),
1581 };
1582
1583 let fanout_enter_ts = now_ms();
1585 if let Some(timeline) = bus.read_mut::<Timeline>() {
1586 timeline.push(TimelineEvent::NodeEnter {
1587 node_id: fanout_id.clone(),
1588 node_label: "FanOut".to_string(),
1589 timestamp: fanout_enter_ts,
1590 });
1591 }
1592
1593 let futs: Vec<_> = branches
1597 .iter()
1598 .enumerate()
1599 .map(|(i, trans)| {
1600 let branch_state = state.clone();
1601 let branch_node_id = branch_ids[i].clone();
1602 let trans = trans.clone();
1603
1604 async move {
1605 let mut branch_bus = Bus::new();
1606 let label = trans.label();
1607 let bus_policy = trans.bus_access_policy();
1608
1609 branch_bus.set_access_policy(label.clone(), bus_policy);
1610 let result = trans.run(branch_state, res, &mut branch_bus).await;
1611 branch_bus.clear_access_policy();
1612
1613 (i, branch_node_id, label, result)
1614 }
1615 })
1616 .collect();
1617
1618 let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1620 futures_util::future::join_all(futs).await;
1621
1622 for (_, branch_node_id, branch_label, outcome) in &results {
1624 if let Some(timeline) = bus.read_mut::<Timeline>() {
1625 let ts = now_ms();
1626 timeline.push(TimelineEvent::NodeEnter {
1627 node_id: branch_node_id.clone(),
1628 node_label: branch_label.clone(),
1629 timestamp: ts,
1630 });
1631 timeline.push(TimelineEvent::NodeExit {
1632 node_id: branch_node_id.clone(),
1633 outcome_type: outcome_type_name(outcome),
1634 duration_ms: 0,
1635 timestamp: ts,
1636 });
1637 }
1638 }
1639
1640 if let Some(timeline) = bus.read_mut::<Timeline>() {
1642 timeline.push(TimelineEvent::NodeExit {
1643 node_id: fanout_id.clone(),
1644 outcome_type: "Next".to_string(),
1645 duration_ms: 0,
1646 timestamp: now_ms(),
1647 });
1648 }
1649
1650 let combined = match strategy {
1652 ParallelStrategy::AllMustSucceed => {
1653 let mut first_fault = None;
1654 let mut first_success = None;
1655
1656 for (_, _, _, outcome) in results {
1657 match outcome {
1658 Outcome::Fault(e) => {
1659 if first_fault.is_none() {
1660 first_fault = Some(Outcome::Fault(e));
1661 }
1662 }
1663 Outcome::Next(val) => {
1664 if first_success.is_none() {
1665 first_success = Some(Outcome::Next(val));
1666 }
1667 }
1668 other => {
1669 if first_fault.is_none() {
1672 first_fault = Some(other);
1673 }
1674 }
1675 }
1676 }
1677
1678 if let Some(fault) = first_fault {
1679 fault
1680 } else {
1681 first_success.unwrap_or_else(|| {
1682 Outcome::emit("execution.parallel.no_results", None)
1683 })
1684 }
1685 }
1686 ParallelStrategy::AnyCanFail => {
1687 let mut first_success = None;
1688 let mut first_fault = None;
1689
1690 for (_, _, _, outcome) in results {
1691 match outcome {
1692 Outcome::Next(val) => {
1693 if first_success.is_none() {
1694 first_success = Some(Outcome::Next(val));
1695 }
1696 }
1697 Outcome::Fault(e) => {
1698 if first_fault.is_none() {
1699 first_fault = Some(Outcome::Fault(e));
1700 }
1701 }
1702 _ => {}
1703 }
1704 }
1705
1706 first_success.unwrap_or_else(|| {
1707 first_fault.unwrap_or_else(|| {
1708 Outcome::emit("execution.parallel.no_results", None)
1709 })
1710 })
1711 }
1712 };
1713
1714 let fanin_enter_ts = now_ms();
1716 if let Some(timeline) = bus.read_mut::<Timeline>() {
1717 timeline.push(TimelineEvent::NodeEnter {
1718 node_id: fanin_id.clone(),
1719 node_label: "FanIn".to_string(),
1720 timestamp: fanin_enter_ts,
1721 });
1722 timeline.push(TimelineEvent::NodeExit {
1723 node_id: fanin_id.clone(),
1724 outcome_type: outcome_type_name(&combined),
1725 duration_ms: 0,
1726 timestamp: fanin_enter_ts,
1727 });
1728 }
1729
1730 if let Some(handle) = bus.read::<PersistenceHandle>() {
1732 let trace_id = persistence_trace_id(bus);
1733 let circuit = bus
1734 .read::<ranvier_core::schematic::Schematic>()
1735 .map(|s| s.name.clone())
1736 .unwrap_or_default();
1737 let version = bus
1738 .read::<ranvier_core::schematic::Schematic>()
1739 .map(|s| s.schema_version.clone())
1740 .unwrap_or_default();
1741
1742 persist_execution_event(
1743 handle,
1744 &trace_id,
1745 &circuit,
1746 &version,
1747 step_idx,
1748 Some(fanin_id.clone()),
1749 outcome_kind_name(&combined),
1750 Some(combined.to_json_value()),
1751 )
1752 .await;
1753 }
1754
1755 combined
1756 })
1757 },
1758 );
1759
1760 Axon {
1761 schematic,
1762 executor: next_executor,
1763 execution_mode,
1764 persistence_store,
1765 audit_sink,
1766 dlq_sink,
1767 dlq_policy,
1768 dynamic_dlq_policy,
1769 saga_policy,
1770 dynamic_saga_policy,
1771 saga_compensation_registry,
1772 iam_handle,
1773 }
1774 }
1775
1776 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1778 if let ExecutionMode::Singleton {
1779 lock_key,
1780 ttl_ms,
1781 lock_provider,
1782 } = &self.execution_mode
1783 {
1784 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1785 let _enter = trace_span.enter();
1786 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1787 Ok(true) => {
1788 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1789 }
1790 Ok(false) => {
1791 tracing::debug!(
1792 "Singleton lock {} already held, aborting execution.",
1793 lock_key
1794 );
1795 return Outcome::emit(
1797 "execution.skipped.lock_held",
1798 Some(serde_json::json!({
1799 "lock_key": lock_key
1800 })),
1801 );
1802 }
1803 Err(e) => {
1804 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1805 return Outcome::emit(
1806 "execution.skipped.lock_error",
1807 Some(serde_json::json!({
1808 "error": e.to_string()
1809 })),
1810 );
1811 }
1812 }
1813 }
1814
1815 if let Some(iam) = &self.iam_handle {
1817 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1818
1819 if matches!(iam.policy, IamPolicy::None) {
1820 } else {
1822 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1823
1824 match token {
1825 Some(raw_token) => {
1826 match iam.verifier.verify(&raw_token).await {
1827 Ok(identity) => {
1828 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1829 tracing::warn!(
1830 policy = ?iam.policy,
1831 subject = %identity.subject,
1832 "IAM policy enforcement failed: {}",
1833 e
1834 );
1835 return Outcome::emit(
1836 "iam.policy_denied",
1837 Some(serde_json::json!({
1838 "error": e.to_string(),
1839 "subject": identity.subject,
1840 })),
1841 );
1842 }
1843 bus.insert(identity);
1845 }
1846 Err(e) => {
1847 tracing::warn!("IAM token verification failed: {}", e);
1848 return Outcome::emit(
1849 "iam.verification_failed",
1850 Some(serde_json::json!({
1851 "error": e.to_string()
1852 })),
1853 );
1854 }
1855 }
1856 }
1857 None => {
1858 tracing::warn!("IAM policy requires token but none found in Bus");
1859 return Outcome::emit("iam.missing_token", None);
1860 }
1861 }
1862 }
1863 }
1864
1865 let trace_id = persistence_trace_id(bus);
1866 let label = self.schematic.name.clone();
1867
1868 if let Some(sink) = &self.dlq_sink {
1870 bus.insert(sink.clone());
1871 }
1872 let effective_dlq_policy = self
1874 .dynamic_dlq_policy
1875 .as_ref()
1876 .map(|d| d.current())
1877 .unwrap_or_else(|| self.dlq_policy.clone());
1878 bus.insert(effective_dlq_policy);
1879 bus.insert(self.schematic.clone());
1880 let effective_saga_policy = self
1881 .dynamic_saga_policy
1882 .as_ref()
1883 .map(|d| d.current())
1884 .unwrap_or_else(|| self.saga_policy.clone());
1885 bus.insert(effective_saga_policy.clone());
1886
1887 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1889 bus.insert(SagaStack::new());
1890 }
1891
1892 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1893 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1894 let compensation_retry_policy = compensation_retry_policy(bus);
1895 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1896 let version = self.schematic.schema_version.clone();
1897 let migration_registry = bus
1898 .read::<ranvier_core::schematic::MigrationRegistry>()
1899 .cloned();
1900
1901 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1902 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1903 load_persistence_version(handle, &trace_id).await;
1904
1905 if let Some(interv) = intervention {
1906 tracing::info!(
1907 trace_id = %trace_id,
1908 target_node = %interv.target_node,
1909 "Applying manual intervention command"
1910 );
1911
1912 if let Some(target_idx) = self
1914 .schematic
1915 .nodes
1916 .iter()
1917 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1918 {
1919 tracing::info!(
1920 trace_id = %trace_id,
1921 target_node = %interv.target_node,
1922 target_step = target_idx,
1923 "Intervention: Jumping to target node"
1924 );
1925 start_step = target_idx as u64;
1926
1927 bus.insert(ManualJump {
1929 target_node: interv.target_node.clone(),
1930 payload_override: interv.payload_override.clone(),
1931 });
1932
1933 if let Some(sink) = self.audit_sink.as_ref() {
1935 let event = AuditEvent::new(
1936 uuid::Uuid::new_v4().to_string(),
1937 "System".to_string(),
1938 "ApplyIntervention".to_string(),
1939 trace_id.to_string(),
1940 )
1941 .with_metadata("target_node", interv.target_node.clone())
1942 .with_metadata("target_step", target_idx);
1943
1944 let _ = sink.append(&event).await;
1945 }
1946 } else {
1947 tracing::warn!(
1948 trace_id = %trace_id,
1949 target_node = %interv.target_node,
1950 "Intervention target node not found in schematic; ignoring jump"
1951 );
1952 }
1953 }
1954
1955 if let Some(old_version) = trace_version
1956 && old_version != version
1957 {
1958 tracing::info!(
1959 trace_id = %trace_id,
1960 old_version = %old_version,
1961 current_version = %version,
1962 "Version mismatch detected during resumption"
1963 );
1964
1965 let migration_path = migration_registry
1967 .as_ref()
1968 .and_then(|r| r.find_migration_path(&old_version, &version));
1969
1970 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1971 if path.is_empty() {
1972 (None, last_payload.clone())
1973 } else {
1974 let mut payload = last_payload.clone();
1976 for hop in &path {
1977 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1978 {
1979 match mapper.map_state(p) {
1980 Ok(mapped) => payload = Some(mapped),
1981 Err(e) => {
1982 tracing::error!(
1983 trace_id = %trace_id,
1984 from = %hop.from_version,
1985 to = %hop.to_version,
1986 error = %e,
1987 "Payload migration mapper failed"
1988 );
1989 return Outcome::emit(
1990 "execution.resumption.payload_migration_failed",
1991 Some(serde_json::json!({
1992 "trace_id": trace_id,
1993 "from": hop.from_version,
1994 "to": hop.to_version,
1995 "error": e.to_string()
1996 })),
1997 );
1998 }
1999 }
2000 }
2001 }
2002 let hops: Vec<String> = path
2003 .iter()
2004 .map(|h| format!("{}->{}", h.from_version, h.to_version))
2005 .collect();
2006 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
2007 (path.last().copied(), payload)
2008 }
2009 } else {
2010 (None, last_payload.clone())
2011 };
2012
2013 let migration = final_migration.or_else(|| {
2015 migration_registry
2016 .as_ref()
2017 .and_then(|r| r.find_migration(&old_version, &version))
2018 });
2019
2020 if mapped_payload.is_some() {
2022 last_payload = mapped_payload;
2023 }
2024
2025 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
2026 {
2027 m.node_mapping
2028 .get(node_id)
2029 .cloned()
2030 .unwrap_or(m.default_strategy.clone())
2031 } else {
2032 migration
2033 .map(|m| m.default_strategy.clone())
2034 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
2035 };
2036
2037 match strategy {
2038 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
2039 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
2040 start_step = 0;
2041 }
2042 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
2043 new_node_id,
2044 ..
2045 } => {
2046 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
2047 if let Some(target_idx) = self
2048 .schematic
2049 .nodes
2050 .iter()
2051 .position(|n| n.id == new_node_id || n.label == new_node_id)
2052 {
2053 start_step = target_idx as u64;
2054 } else {
2055 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
2056 return Outcome::emit(
2057 "execution.resumption.migration_target_not_found",
2058 Some(serde_json::json!({ "node_id": new_node_id })),
2059 );
2060 }
2061 }
2062 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
2063 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
2064 if let Some(target_idx) = self
2065 .schematic
2066 .nodes
2067 .iter()
2068 .position(|n| n.id == node_id || n.label == node_id)
2069 {
2070 start_step = target_idx as u64;
2071 } else {
2072 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
2073 return Outcome::emit(
2074 "execution.resumption.migration_target_not_found",
2075 Some(serde_json::json!({ "node_id": node_id })),
2076 );
2077 }
2078 }
2079 ranvier_core::schematic::MigrationStrategy::Fail => {
2080 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
2081 return Outcome::emit(
2082 "execution.resumption.version_mismatch_failed",
2083 Some(serde_json::json!({
2084 "trace_id": trace_id,
2085 "old_version": old_version,
2086 "current_version": version
2087 })),
2088 );
2089 }
2090 _ => {
2091 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
2092 return Outcome::emit(
2093 "execution.resumption.unsupported_migration",
2094 Some(serde_json::json!({
2095 "trace_id": trace_id,
2096 "strategy": format!("{:?}", strategy)
2097 })),
2098 );
2099 }
2100 }
2101 }
2102
2103 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
2104 persist_execution_event(
2105 handle,
2106 &trace_id,
2107 &label,
2108 &version,
2109 start_step,
2110 ingress_node_id,
2111 "Enter",
2112 None,
2113 )
2114 .await;
2115
2116 bus.insert(StartStep(start_step));
2117 if start_step > 0 {
2118 bus.insert(ResumptionState {
2119 payload: last_payload,
2120 });
2121 }
2122
2123 Some(start_step)
2124 } else {
2125 None
2126 };
2127
2128 let should_capture = should_attach_timeline(bus);
2129 let inserted_timeline = if should_capture {
2130 ensure_timeline(bus)
2131 } else {
2132 false
2133 };
2134 let ingress_started = std::time::Instant::now();
2135 let ingress_enter_ts = now_ms();
2136 if should_capture
2137 && let (Some(timeline), Some(ingress)) =
2138 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2139 {
2140 timeline.push(TimelineEvent::NodeEnter {
2141 node_id: ingress.id.clone(),
2142 node_label: ingress.label.clone(),
2143 timestamp: ingress_enter_ts,
2144 });
2145 }
2146
2147 let circuit_span = tracing::info_span!(
2148 "Circuit",
2149 ranvier.circuit = %label,
2150 ranvier.outcome_kind = tracing::field::Empty,
2151 ranvier.outcome_target = tracing::field::Empty
2152 );
2153 let outcome = (self.executor)(input, resources, bus)
2154 .instrument(circuit_span.clone())
2155 .await;
2156 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
2157 if let Some(target) = outcome_target(&outcome) {
2158 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
2159 }
2160
2161 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
2163 while let Some(task) = {
2164 let mut stack = bus.read_mut::<SagaStack>();
2165 stack.as_mut().and_then(|s| s.pop())
2166 } {
2167 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
2168
2169 let handler = {
2170 let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
2171 registry.get(&task.node_id)
2172 };
2173 if let Some(handler) = handler {
2174 let res = handler(task.input_snapshot, resources, bus).await;
2175 if let Outcome::Fault(e) = res {
2176 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
2177 }
2178 } else {
2179 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
2180 }
2181 }
2182 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
2183 }
2184
2185 let ingress_exit_ts = now_ms();
2186 if should_capture
2187 && let (Some(timeline), Some(ingress)) =
2188 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
2189 {
2190 timeline.push(TimelineEvent::NodeExit {
2191 node_id: ingress.id.clone(),
2192 outcome_type: outcome_type_name(&outcome),
2193 duration_ms: ingress_started.elapsed().as_millis() as u64,
2194 timestamp: ingress_exit_ts,
2195 });
2196 }
2197
2198 if let Some(handle) = persistence_handle.as_ref() {
2199 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
2200 persist_execution_event(
2201 handle,
2202 &trace_id,
2203 &label,
2204 &version,
2205 fault_step,
2206 None, outcome_kind_name(&outcome),
2208 Some(outcome.to_json_value()),
2209 )
2210 .await;
2211
2212 let mut completion = completion_from_outcome(&outcome);
2213 if matches!(outcome, Outcome::Fault(_))
2214 && let Some(compensation) = compensation_handle.as_ref()
2215 && compensation_auto_trigger(bus)
2216 {
2217 let context = CompensationContext {
2218 trace_id: trace_id.clone(),
2219 circuit: label.clone(),
2220 fault_kind: outcome_kind_name(&outcome).to_string(),
2221 fault_step,
2222 timestamp_ms: now_ms(),
2223 };
2224
2225 if run_compensation(
2226 compensation,
2227 context,
2228 compensation_retry_policy,
2229 compensation_idempotency.clone(),
2230 )
2231 .await
2232 {
2233 persist_execution_event(
2234 handle,
2235 &trace_id,
2236 &label,
2237 &version,
2238 fault_step.saturating_add(1),
2239 None,
2240 "Compensated",
2241 None,
2242 )
2243 .await;
2244 completion = CompletionState::Compensated;
2245 }
2246 }
2247
2248 if persistence_auto_complete(bus) {
2249 persist_completion(handle, &trace_id, completion).await;
2250 }
2251 }
2252
2253 if should_capture {
2254 maybe_export_timeline(bus, &outcome);
2255 }
2256 if inserted_timeline {
2257 let _ = bus.remove::<Timeline>();
2258 }
2259
2260 outcome
2261 }
2262
2263 pub fn serve_inspector(self, port: u16) -> Self {
2266 if !inspector_dev_mode_from_env() {
2267 tracing::info!("Inspector disabled because RANVIER_MODE is production");
2268 return self;
2269 }
2270 if !inspector_enabled_from_env() {
2271 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2272 return self;
2273 }
2274
2275 let schematic = self.schematic.clone();
2276 let axon_inspector = Arc::new(self.clone());
2277 tokio::spawn(async move {
2278 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2279 .with_projection_files_from_env()
2280 .with_mode_from_env()
2281 .with_auth_policy_from_env()
2282 .with_state_inspector(axon_inspector)
2283 .serve()
2284 .await
2285 {
2286 tracing::error!("Inspector server failed: {}", e);
2287 }
2288 });
2289 self
2290 }
2291
2292 pub fn schematic(&self) -> &Schematic {
2294 &self.schematic
2295 }
2296
2297 pub fn into_schematic(self) -> Schematic {
2299 self.schematic
2300 }
2301
2302 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2313 schematic_export_request_from_process()
2314 }
2315
2316 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2328 self.maybe_export_and_exit_with(|_| ())
2329 }
2330
2331 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2336 where
2337 F: FnOnce(&SchematicExportRequest),
2338 {
2339 let Some(request) = self.schematic_export_request() else {
2340 return Ok(false);
2341 };
2342 on_before_exit(&request);
2343 self.export_schematic(&request)?;
2344 Ok(true)
2345 }
2346
2347 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2349 let json = serde_json::to_string_pretty(self.schematic())?;
2350 if let Some(path) = &request.output {
2351 if let Some(parent) = path.parent()
2352 && !parent.as_os_str().is_empty()
2353 {
2354 fs::create_dir_all(parent)?;
2355 }
2356 fs::write(path, json.as_bytes())?;
2357 return Ok(());
2358 }
2359 println!("{}", json);
2360 Ok(())
2361 }
2362}
2363
2364fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2365 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2366 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2367 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2368
2369 let mut i = 0;
2370 while i < args.len() {
2371 let arg = args[i].to_string_lossy();
2372
2373 if arg == "--schematic" {
2374 enabled = true;
2375 i += 1;
2376 continue;
2377 }
2378
2379 if arg == "--schematic-output" || arg == "--output" {
2380 if let Some(next) = args.get(i + 1) {
2381 output = Some(PathBuf::from(next));
2382 i += 2;
2383 continue;
2384 }
2385 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2386 output = Some(PathBuf::from(value));
2387 } else if let Some(value) = arg.strip_prefix("--output=") {
2388 output = Some(PathBuf::from(value));
2389 }
2390
2391 i += 1;
2392 }
2393
2394 if enabled {
2395 Some(SchematicExportRequest { output })
2396 } else {
2397 None
2398 }
2399}
2400
2401fn env_flag_is_true(key: &str) -> bool {
2402 match std::env::var(key) {
2403 Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
2404 Err(_) => false,
2405 }
2406}
2407
2408fn inspector_enabled_from_env() -> bool {
2409 let raw = std::env::var("RANVIER_INSPECTOR").ok();
2410 inspector_enabled_from_value(raw.as_deref())
2411}
2412
2413fn inspector_enabled_from_value(value: Option<&str>) -> bool {
2414 match value {
2415 Some(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
2416 None => true,
2417 }
2418}
2419
2420fn inspector_dev_mode_from_env() -> bool {
2421 let raw = std::env::var("RANVIER_MODE").ok();
2422 inspector_dev_mode_from_value(raw.as_deref())
2423}
2424
2425fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
2426 !matches!(
2427 value.map(|v| v.to_ascii_lowercase()),
2428 Some(mode) if mode == "prod" || mode == "production"
2429 )
2430}
2431
2432fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2433 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2434 Ok(v) if !v.trim().is_empty() => v,
2435 _ => return,
2436 };
2437
2438 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2439 let policy = timeline_adaptive_policy();
2440 let forced = should_force_export(outcome, &policy);
2441 let should_export = sampled || forced;
2442 if !should_export {
2443 record_sampling_stats(false, sampled, forced, "none", &policy);
2444 return;
2445 }
2446
2447 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2448 timeline.sort();
2449
2450 let mode = std::env::var("RANVIER_TIMELINE_MODE")
2451 .unwrap_or_else(|_| "overwrite".to_string())
2452 .to_ascii_lowercase();
2453
2454 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2455 tracing::warn!(
2456 "Failed to persist timeline file {} (mode={}): {}",
2457 path,
2458 mode,
2459 err
2460 );
2461 record_sampling_stats(false, sampled, forced, &mode, &policy);
2462 } else {
2463 record_sampling_stats(true, sampled, forced, &mode, &policy);
2464 }
2465}
2466
2467fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2468 match outcome {
2469 Outcome::Next(_) => "Next".to_string(),
2470 Outcome::Branch(id, _) => format!("Branch:{}", id),
2471 Outcome::Jump(id, _) => format!("Jump:{}", id),
2472 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2473 Outcome::Fault(_) => "Fault".to_string(),
2474 }
2475}
2476
2477fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2478 match outcome {
2479 Outcome::Next(_) => "Next",
2480 Outcome::Branch(_, _) => "Branch",
2481 Outcome::Jump(_, _) => "Jump",
2482 Outcome::Emit(_, _) => "Emit",
2483 Outcome::Fault(_) => "Fault",
2484 }
2485}
2486
2487fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2488 match outcome {
2489 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2490 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2491 Outcome::Emit(event_type, _) => Some(event_type.clone()),
2492 Outcome::Next(_) | Outcome::Fault(_) => None,
2493 }
2494}
2495
2496fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2497 match outcome {
2498 Outcome::Fault(_) => CompletionState::Fault,
2499 _ => CompletionState::Success,
2500 }
2501}
2502
2503fn persistence_trace_id(bus: &Bus) -> String {
2504 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2505 explicit.0.clone()
2506 } else {
2507 format!("{}:{}", bus.id, now_ms())
2508 }
2509}
2510
2511fn persistence_auto_complete(bus: &Bus) -> bool {
2512 bus.read::<PersistenceAutoComplete>()
2513 .map(|v| v.0)
2514 .unwrap_or(true)
2515}
2516
2517fn compensation_auto_trigger(bus: &Bus) -> bool {
2518 bus.read::<CompensationAutoTrigger>()
2519 .map(|v| v.0)
2520 .unwrap_or(true)
2521}
2522
2523fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2524 bus.read::<CompensationRetryPolicy>()
2525 .copied()
2526 .unwrap_or_default()
2527}
2528
2529fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2535 payload.map(|p| {
2536 p.get("Next")
2537 .or_else(|| p.get("Branch"))
2538 .or_else(|| p.get("Jump"))
2539 .cloned()
2540 .unwrap_or_else(|| p.clone())
2541 })
2542}
2543
2544async fn load_persistence_version(
2545 handle: &PersistenceHandle,
2546 trace_id: &str,
2547) -> (
2548 u64,
2549 Option<String>,
2550 Option<crate::persistence::Intervention>,
2551 Option<String>,
2552 Option<serde_json::Value>,
2553) {
2554 let store = handle.store();
2555 match store.load(trace_id).await {
2556 Ok(Some(trace)) => {
2557 let (next_step, last_node_id, last_payload) =
2558 if let Some(resume_from_step) = trace.resumed_from_step {
2559 let anchor_event = trace
2560 .events
2561 .iter()
2562 .rev()
2563 .find(|event| {
2564 event.step <= resume_from_step
2565 && event.outcome_kind == "Next"
2566 && event.payload.is_some()
2567 })
2568 .or_else(|| {
2569 trace.events.iter().rev().find(|event| {
2570 event.step <= resume_from_step
2571 && event.outcome_kind != "Fault"
2572 && event.payload.is_some()
2573 })
2574 })
2575 .or_else(|| {
2576 trace.events.iter().rev().find(|event| {
2577 event.step <= resume_from_step && event.payload.is_some()
2578 })
2579 })
2580 .or_else(|| trace.events.last());
2581
2582 (
2583 resume_from_step.saturating_add(1),
2584 anchor_event.and_then(|event| event.node_id.clone()),
2585 anchor_event.and_then(|event| {
2586 unwrap_outcome_payload(event.payload.as_ref())
2587 }),
2588 )
2589 } else {
2590 let last_event = trace.events.last();
2591 (
2592 last_event
2593 .map(|event| event.step.saturating_add(1))
2594 .unwrap_or(0),
2595 last_event.and_then(|event| event.node_id.clone()),
2596 last_event.and_then(|event| {
2597 unwrap_outcome_payload(event.payload.as_ref())
2598 }),
2599 )
2600 };
2601
2602 (
2603 next_step,
2604 Some(trace.schematic_version),
2605 trace.interventions.last().cloned(),
2606 last_node_id,
2607 last_payload,
2608 )
2609 }
2610 Ok(None) => (0, None, None, None, None),
2611 Err(err) => {
2612 tracing::warn!(
2613 trace_id = %trace_id,
2614 error = %err,
2615 "Failed to load persistence trace; falling back to step=0"
2616 );
2617 (0, None, None, None, None)
2618 }
2619 }
2620}
2621
2622#[allow(clippy::too_many_arguments)]
2623async fn run_this_step<In, Out, E, Res>(
2624 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2625 state: In,
2626 res: &Res,
2627 bus: &mut Bus,
2628 node_id: &str,
2629 node_label: &str,
2630 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2631 step_idx: u64,
2632) -> Outcome<Out, E>
2633where
2634 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2635 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2636 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2637 Res: ranvier_core::transition::ResourceRequirement,
2638{
2639 let label = trans.label();
2640 let res_type = std::any::type_name::<Res>()
2641 .split("::")
2642 .last()
2643 .unwrap_or("unknown");
2644
2645 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2647 debug.should_pause(node_id)
2648 } else {
2649 false
2650 };
2651
2652 if should_pause {
2653 let trace_id = persistence_trace_id(bus);
2654 tracing::event!(
2655 target: "ranvier.debugger",
2656 tracing::Level::INFO,
2657 trace_id = %trace_id,
2658 node_id = %node_id,
2659 "Node paused"
2660 );
2661
2662 if let Some(timeline) = bus.read_mut::<Timeline>() {
2663 timeline.push(TimelineEvent::NodePaused {
2664 node_id: node_id.to_string(),
2665 timestamp: now_ms(),
2666 });
2667 }
2668 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2669 debug.wait().await;
2670 }
2671 }
2672
2673 let enter_ts = now_ms();
2674 if let Some(timeline) = bus.read_mut::<Timeline>() {
2675 timeline.push(TimelineEvent::NodeEnter {
2676 node_id: node_id.to_string(),
2677 node_label: node_label.to_string(),
2678 timestamp: enter_ts,
2679 });
2680 }
2681
2682 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2684 if let DlqPolicy::RetryThenDlq {
2685 max_attempts,
2686 backoff_ms,
2687 } = p
2688 {
2689 Some((*max_attempts, *backoff_ms))
2690 } else {
2691 None
2692 }
2693 });
2694 let retry_state_snapshot = if dlq_retry_config.is_some() {
2695 serde_json::to_value(&state).ok()
2696 } else {
2697 None
2698 };
2699
2700 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2702 Some(serde_json::to_vec(&state).unwrap_or_default())
2703 } else {
2704 None
2705 };
2706
2707 let node_span = tracing::info_span!(
2708 "Node",
2709 ranvier.node = %label,
2710 ranvier.resource_type = %res_type,
2711 ranvier.outcome_kind = tracing::field::Empty,
2712 ranvier.outcome_target = tracing::field::Empty
2713 );
2714 let started = std::time::Instant::now();
2715 bus.set_access_policy(label.clone(), bus_policy.clone());
2716 let result = trans
2717 .run(state, res, bus)
2718 .instrument(node_span.clone())
2719 .await;
2720 bus.clear_access_policy();
2721
2722 let result = if let Outcome::Fault(_) = &result {
2725 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2726 (dlq_retry_config, &retry_state_snapshot)
2727 {
2728 let mut final_result = result;
2729 for attempt in 2..=max_attempts {
2731 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2732
2733 tracing::info!(
2734 ranvier.node = %label,
2735 attempt = attempt,
2736 max_attempts = max_attempts,
2737 backoff_ms = delay,
2738 "Retrying faulted node"
2739 );
2740
2741 if let Some(timeline) = bus.read_mut::<Timeline>() {
2742 timeline.push(TimelineEvent::NodeRetry {
2743 node_id: node_id.to_string(),
2744 attempt,
2745 max_attempts,
2746 backoff_ms: delay,
2747 timestamp: now_ms(),
2748 });
2749 }
2750
2751 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2752
2753 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2754 bus.set_access_policy(label.clone(), bus_policy.clone());
2755 let retry_result = trans
2756 .run(retry_state, res, bus)
2757 .instrument(tracing::info_span!(
2758 "NodeRetry",
2759 ranvier.node = %label,
2760 attempt = attempt
2761 ))
2762 .await;
2763 bus.clear_access_policy();
2764
2765 match &retry_result {
2766 Outcome::Fault(_) => {
2767 final_result = retry_result;
2768 }
2769 _ => {
2770 final_result = retry_result;
2771 break;
2772 }
2773 }
2774 }
2775 }
2776 final_result
2777 } else {
2778 result
2779 }
2780 } else {
2781 result
2782 };
2783
2784 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2785 if let Some(target) = outcome_target(&result) {
2786 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2787 }
2788
2789 if let Outcome::Fault(ref err) = result {
2791 let pipeline_name = bus
2792 .read::<ranvier_core::schematic::Schematic>()
2793 .map(|s| s.name.clone())
2794 .unwrap_or_default();
2795 let ctx = ranvier_core::error::TransitionErrorContext {
2796 pipeline_name: pipeline_name.clone(),
2797 transition_name: label.clone(),
2798 step_index: step_idx as usize,
2799 };
2800 tracing::error!(
2801 pipeline = %pipeline_name,
2802 transition = %label,
2803 step = step_idx,
2804 error = ?err,
2805 "Transition fault"
2806 );
2807 bus.insert(ctx);
2808 }
2809
2810 let duration_ms = started.elapsed().as_millis() as u64;
2811 let exit_ts = now_ms();
2812
2813 if let Some(timeline) = bus.read_mut::<Timeline>() {
2814 timeline.push(TimelineEvent::NodeExit {
2815 node_id: node_id.to_string(),
2816 outcome_type: outcome_type_name(&result),
2817 duration_ms,
2818 timestamp: exit_ts,
2819 });
2820
2821 if let Outcome::Branch(branch_id, _) = &result {
2822 timeline.push(TimelineEvent::Branchtaken {
2823 branch_id: branch_id.clone(),
2824 timestamp: exit_ts,
2825 });
2826 }
2827 }
2828
2829 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2831 && let Some(stack) = bus.read_mut::<SagaStack>()
2832 {
2833 stack.push(node_id.to_string(), label.clone(), snapshot);
2834 }
2835
2836 if let Some(handle) = bus.read::<PersistenceHandle>() {
2837 let trace_id = persistence_trace_id(bus);
2838 let circuit = bus
2839 .read::<ranvier_core::schematic::Schematic>()
2840 .map(|s| s.name.clone())
2841 .unwrap_or_default();
2842 let version = bus
2843 .read::<ranvier_core::schematic::Schematic>()
2844 .map(|s| s.schema_version.clone())
2845 .unwrap_or_default();
2846
2847 persist_execution_event(
2848 handle,
2849 &trace_id,
2850 &circuit,
2851 &version,
2852 step_idx,
2853 Some(node_id.to_string()),
2854 outcome_kind_name(&result),
2855 Some(result.to_json_value()),
2856 )
2857 .await;
2858 }
2859
2860 if let Outcome::Fault(f) = &result {
2863 let dlq_action = {
2865 let policy = bus.read::<DlqPolicy>();
2866 let sink = bus.read::<Arc<dyn DlqSink>>();
2867 match (sink, policy) {
2868 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2869 _ => None,
2870 }
2871 };
2872
2873 if let Some(sink) = dlq_action {
2874 if let Some((max_attempts, _)) = dlq_retry_config
2875 && let Some(timeline) = bus.read_mut::<Timeline>()
2876 {
2877 timeline.push(TimelineEvent::DlqExhausted {
2878 node_id: node_id.to_string(),
2879 total_attempts: max_attempts,
2880 timestamp: now_ms(),
2881 });
2882 }
2883
2884 let trace_id = persistence_trace_id(bus);
2885 let circuit = bus
2886 .read::<ranvier_core::schematic::Schematic>()
2887 .map(|s| s.name.clone())
2888 .unwrap_or_default();
2889 let _ = sink
2890 .store_dead_letter(
2891 &trace_id,
2892 &circuit,
2893 node_id,
2894 &format!("{:?}", f),
2895 &serde_json::to_vec(&f).unwrap_or_default(),
2896 )
2897 .await;
2898 }
2899 }
2900
2901 result
2902}
2903
2904#[allow(clippy::too_many_arguments)]
2905async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2906 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2907 comp: &Comp,
2908 state: Out,
2909 res: &Res,
2910 bus: &mut Bus,
2911 node_id: &str,
2912 comp_node_id: &str,
2913 node_label: &str,
2914 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2915 step_idx: u64,
2916) -> Outcome<Next, E>
2917where
2918 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2919 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2920 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2921 Res: ranvier_core::transition::ResourceRequirement,
2922 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2923{
2924 let label = trans.label();
2925
2926 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2928 debug.should_pause(node_id)
2929 } else {
2930 false
2931 };
2932
2933 if should_pause {
2934 let trace_id = persistence_trace_id(bus);
2935 tracing::event!(
2936 target: "ranvier.debugger",
2937 tracing::Level::INFO,
2938 trace_id = %trace_id,
2939 node_id = %node_id,
2940 "Node paused (compensated)"
2941 );
2942
2943 if let Some(timeline) = bus.read_mut::<Timeline>() {
2944 timeline.push(TimelineEvent::NodePaused {
2945 node_id: node_id.to_string(),
2946 timestamp: now_ms(),
2947 });
2948 }
2949 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2950 debug.wait().await;
2951 }
2952 }
2953
2954 let enter_ts = now_ms();
2955 if let Some(timeline) = bus.read_mut::<Timeline>() {
2956 timeline.push(TimelineEvent::NodeEnter {
2957 node_id: node_id.to_string(),
2958 node_label: node_label.to_string(),
2959 timestamp: enter_ts,
2960 });
2961 }
2962
2963 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2965 Some(serde_json::to_vec(&state).unwrap_or_default())
2966 } else {
2967 None
2968 };
2969
2970 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2971 bus.set_access_policy(label.clone(), bus_policy.clone());
2972 let result = trans
2973 .run(state.clone(), res, bus)
2974 .instrument(node_span)
2975 .await;
2976 bus.clear_access_policy();
2977
2978 let duration_ms = 0; let exit_ts = now_ms();
2980
2981 if let Some(timeline) = bus.read_mut::<Timeline>() {
2982 timeline.push(TimelineEvent::NodeExit {
2983 node_id: node_id.to_string(),
2984 outcome_type: outcome_type_name(&result),
2985 duration_ms,
2986 timestamp: exit_ts,
2987 });
2988 }
2989
2990 if let Outcome::Fault(ref err) = result {
2992 if compensation_auto_trigger(bus) {
2993 tracing::info!(
2994 ranvier.node = %label,
2995 ranvier.compensation.trigger = "saga",
2996 error = ?err,
2997 "Saga compensation triggered"
2998 );
2999
3000 if let Some(timeline) = bus.read_mut::<Timeline>() {
3001 timeline.push(TimelineEvent::NodeEnter {
3002 node_id: comp_node_id.to_string(),
3003 node_label: format!("Compensate: {}", comp.label()),
3004 timestamp: exit_ts,
3005 });
3006 }
3007
3008 let _ = comp.run(state, res, bus).await;
3010
3011 if let Some(timeline) = bus.read_mut::<Timeline>() {
3012 timeline.push(TimelineEvent::NodeExit {
3013 node_id: comp_node_id.to_string(),
3014 outcome_type: "Compensated".to_string(),
3015 duration_ms: 0,
3016 timestamp: now_ms(),
3017 });
3018 }
3019
3020 if let Some(handle) = bus.read::<PersistenceHandle>() {
3021 let trace_id = persistence_trace_id(bus);
3022 let circuit = bus
3023 .read::<ranvier_core::schematic::Schematic>()
3024 .map(|s| s.name.clone())
3025 .unwrap_or_default();
3026 let version = bus
3027 .read::<ranvier_core::schematic::Schematic>()
3028 .map(|s| s.schema_version.clone())
3029 .unwrap_or_default();
3030
3031 persist_execution_event(
3032 handle,
3033 &trace_id,
3034 &circuit,
3035 &version,
3036 step_idx + 1, Some(comp_node_id.to_string()),
3038 "Compensated",
3039 None,
3040 )
3041 .await;
3042 }
3043 }
3044 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
3045 if let Some(stack) = bus.read_mut::<SagaStack>() {
3047 stack.push(node_id.to_string(), label.clone(), snapshot);
3048 }
3049
3050 if let Some(handle) = bus.read::<PersistenceHandle>() {
3051 let trace_id = persistence_trace_id(bus);
3052 let circuit = bus
3053 .read::<ranvier_core::schematic::Schematic>()
3054 .map(|s| s.name.clone())
3055 .unwrap_or_default();
3056 let version = bus
3057 .read::<ranvier_core::schematic::Schematic>()
3058 .map(|s| s.schema_version.clone())
3059 .unwrap_or_default();
3060
3061 persist_execution_event(
3062 handle,
3063 &trace_id,
3064 &circuit,
3065 &version,
3066 step_idx,
3067 Some(node_id.to_string()),
3068 outcome_kind_name(&result),
3069 Some(result.to_json_value()),
3070 )
3071 .await;
3072 }
3073 }
3074
3075 if let Outcome::Fault(f) = &result
3077 && let (Some(sink), Some(policy)) =
3078 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
3079 {
3080 let should_dlq = !matches!(policy, DlqPolicy::Drop);
3081 if should_dlq {
3082 let trace_id = persistence_trace_id(bus);
3083 let circuit = bus
3084 .read::<ranvier_core::schematic::Schematic>()
3085 .map(|s| s.name.clone())
3086 .unwrap_or_default();
3087 let _ = sink
3088 .store_dead_letter(
3089 &trace_id,
3090 &circuit,
3091 node_id,
3092 &format!("{:?}", f),
3093 &serde_json::to_vec(&f).unwrap_or_default(),
3094 )
3095 .await;
3096 }
3097 }
3098
3099 result
3100}
3101
3102#[allow(clippy::too_many_arguments)]
3103pub async fn persist_execution_event(
3104 handle: &PersistenceHandle,
3105 trace_id: &str,
3106 circuit: &str,
3107 schematic_version: &str,
3108 step: u64,
3109 node_id: Option<String>,
3110 outcome_kind: &str,
3111 payload: Option<serde_json::Value>,
3112) {
3113 let store = handle.store();
3114 let envelope = PersistenceEnvelope {
3115 trace_id: trace_id.to_string(),
3116 circuit: circuit.to_string(),
3117 schematic_version: schematic_version.to_string(),
3118 step,
3119 node_id,
3120 outcome_kind: outcome_kind.to_string(),
3121 timestamp_ms: now_ms(),
3122 payload_hash: None,
3123 payload,
3124 };
3125
3126 if let Err(err) = store.append(envelope).await {
3127 tracing::warn!(
3128 trace_id = %trace_id,
3129 circuit = %circuit,
3130 step,
3131 outcome_kind = %outcome_kind,
3132 error = %err,
3133 "Failed to append persistence envelope"
3134 );
3135 }
3136}
3137
3138async fn persist_completion(
3139 handle: &PersistenceHandle,
3140 trace_id: &str,
3141 completion: CompletionState,
3142) {
3143 let store = handle.store();
3144 if let Err(err) = store.complete(trace_id, completion).await {
3145 tracing::warn!(
3146 trace_id = %trace_id,
3147 error = %err,
3148 "Failed to complete persistence trace"
3149 );
3150 }
3151}
3152
3153fn compensation_idempotency_key(context: &CompensationContext) -> String {
3154 format!(
3155 "{}:{}:{}",
3156 context.trace_id, context.circuit, context.fault_kind
3157 )
3158}
3159
3160async fn run_compensation(
3161 handle: &CompensationHandle,
3162 context: CompensationContext,
3163 retry_policy: CompensationRetryPolicy,
3164 idempotency: Option<CompensationIdempotencyHandle>,
3165) -> bool {
3166 let hook = handle.hook();
3167 let key = compensation_idempotency_key(&context);
3168
3169 if let Some(store_handle) = idempotency.as_ref() {
3170 let store = store_handle.store();
3171 match store.was_compensated(&key).await {
3172 Ok(true) => {
3173 tracing::info!(
3174 trace_id = %context.trace_id,
3175 circuit = %context.circuit,
3176 key = %key,
3177 "Compensation already recorded; skipping duplicate hook execution"
3178 );
3179 return true;
3180 }
3181 Ok(false) => {}
3182 Err(err) => {
3183 tracing::warn!(
3184 trace_id = %context.trace_id,
3185 key = %key,
3186 error = %err,
3187 "Failed to check compensation idempotency state"
3188 );
3189 }
3190 }
3191 }
3192
3193 let max_attempts = retry_policy.max_attempts.max(1);
3194 for attempt in 1..=max_attempts {
3195 match hook.compensate(context.clone()).await {
3196 Ok(()) => {
3197 if let Some(store_handle) = idempotency.as_ref() {
3198 let store = store_handle.store();
3199 if let Err(err) = store.mark_compensated(&key).await {
3200 tracing::warn!(
3201 trace_id = %context.trace_id,
3202 key = %key,
3203 error = %err,
3204 "Failed to mark compensation idempotency state"
3205 );
3206 }
3207 }
3208 return true;
3209 }
3210 Err(err) => {
3211 let is_last = attempt == max_attempts;
3212 tracing::warn!(
3213 trace_id = %context.trace_id,
3214 circuit = %context.circuit,
3215 fault_kind = %context.fault_kind,
3216 fault_step = context.fault_step,
3217 attempt,
3218 max_attempts,
3219 error = %err,
3220 "Compensation hook attempt failed"
3221 );
3222 if !is_last && retry_policy.backoff_ms > 0 {
3223 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
3224 .await;
3225 }
3226 }
3227 }
3228 }
3229 false
3230}
3231
3232fn ensure_timeline(bus: &mut Bus) -> bool {
3233 if bus.has::<Timeline>() {
3234 false
3235 } else {
3236 bus.insert(Timeline::new());
3237 true
3238 }
3239}
3240
3241fn should_attach_timeline(bus: &Bus) -> bool {
3242 if bus.has::<Timeline>() {
3244 return true;
3245 }
3246
3247 has_timeline_output_path()
3249}
3250
3251fn has_timeline_output_path() -> bool {
3252 std::env::var("RANVIER_TIMELINE_OUTPUT")
3253 .ok()
3254 .map(|v| !v.trim().is_empty())
3255 .unwrap_or(false)
3256}
3257
3258fn timeline_sample_rate() -> f64 {
3259 std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
3260 .ok()
3261 .and_then(|v| v.parse::<f64>().ok())
3262 .map(|v| v.clamp(0.0, 1.0))
3263 .unwrap_or(1.0)
3264}
3265
3266fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3267 if rate <= 0.0 {
3268 return false;
3269 }
3270 if rate >= 1.0 {
3271 return true;
3272 }
3273 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3274 bucket < rate
3275}
3276
3277fn timeline_adaptive_policy() -> String {
3278 std::env::var("RANVIER_TIMELINE_ADAPTIVE")
3279 .unwrap_or_else(|_| "fault_branch".to_string())
3280 .to_ascii_lowercase()
3281}
3282
3283fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3284 match policy {
3285 "off" => false,
3286 "fault_only" => matches!(outcome, Outcome::Fault(_)),
3287 "fault_branch_emit" => {
3288 matches!(
3289 outcome,
3290 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3291 )
3292 }
3293 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3294 }
3295}
3296
3297#[derive(Default, Clone)]
3298struct SamplingStats {
3299 total_decisions: u64,
3300 exported: u64,
3301 skipped: u64,
3302 sampled_exports: u64,
3303 forced_exports: u64,
3304 last_mode: String,
3305 last_policy: String,
3306 last_updated_ms: u64,
3307}
3308
3309static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3310
3311fn stats_cell() -> &'static Mutex<SamplingStats> {
3312 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3313}
3314
3315fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3316 let snapshot = {
3317 let mut stats = match stats_cell().lock() {
3318 Ok(guard) => guard,
3319 Err(_) => return,
3320 };
3321
3322 stats.total_decisions += 1;
3323 if exported {
3324 stats.exported += 1;
3325 } else {
3326 stats.skipped += 1;
3327 }
3328 if sampled && exported {
3329 stats.sampled_exports += 1;
3330 }
3331 if forced && exported {
3332 stats.forced_exports += 1;
3333 }
3334 stats.last_mode = mode.to_string();
3335 stats.last_policy = policy.to_string();
3336 stats.last_updated_ms = now_ms();
3337 stats.clone()
3338 };
3339
3340 tracing::debug!(
3341 ranvier.timeline.total_decisions = snapshot.total_decisions,
3342 ranvier.timeline.exported = snapshot.exported,
3343 ranvier.timeline.skipped = snapshot.skipped,
3344 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3345 ranvier.timeline.forced_exports = snapshot.forced_exports,
3346 ranvier.timeline.mode = %snapshot.last_mode,
3347 ranvier.timeline.policy = %snapshot.last_policy,
3348 "Timeline sampling stats updated"
3349 );
3350
3351 if let Some(path) = timeline_stats_output_path() {
3352 let payload = serde_json::json!({
3353 "total_decisions": snapshot.total_decisions,
3354 "exported": snapshot.exported,
3355 "skipped": snapshot.skipped,
3356 "sampled_exports": snapshot.sampled_exports,
3357 "forced_exports": snapshot.forced_exports,
3358 "last_mode": snapshot.last_mode,
3359 "last_policy": snapshot.last_policy,
3360 "last_updated_ms": snapshot.last_updated_ms
3361 });
3362 if let Some(parent) = Path::new(&path).parent() {
3363 let _ = fs::create_dir_all(parent);
3364 }
3365 if let Err(err) = fs::write(&path, payload.to_string()) {
3366 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3367 }
3368 }
3369}
3370
3371fn timeline_stats_output_path() -> Option<String> {
3372 std::env::var("RANVIER_TIMELINE_STATS_OUTPUT")
3373 .ok()
3374 .filter(|v| !v.trim().is_empty())
3375}
3376
3377fn write_timeline_with_policy(
3378 path: &str,
3379 mode: &str,
3380 mut timeline: Timeline,
3381) -> Result<(), String> {
3382 match mode {
3383 "append" => {
3384 if Path::new(path).exists() {
3385 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3386 match serde_json::from_str::<Timeline>(&content) {
3387 Ok(mut existing) => {
3388 existing.events.append(&mut timeline.events);
3389 existing.sort();
3390 if let Some(max_events) = max_events_limit() {
3391 truncate_timeline_events(&mut existing, max_events);
3392 }
3393 write_timeline_json(path, &existing)
3394 }
3395 Err(_) => {
3396 if let Some(max_events) = max_events_limit() {
3398 truncate_timeline_events(&mut timeline, max_events);
3399 }
3400 write_timeline_json(path, &timeline)
3401 }
3402 }
3403 } else {
3404 if let Some(max_events) = max_events_limit() {
3405 truncate_timeline_events(&mut timeline, max_events);
3406 }
3407 write_timeline_json(path, &timeline)
3408 }
3409 }
3410 "rotate" => {
3411 let rotated_path = rotated_path(path, now_ms());
3412 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3413 if let Some(keep) = rotate_keep_limit() {
3414 cleanup_rotated_files(path, keep)?;
3415 }
3416 Ok(())
3417 }
3418 _ => write_timeline_json(path, &timeline),
3419 }
3420}
3421
3422fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3423 if let Some(parent) = Path::new(path).parent()
3424 && !parent.as_os_str().is_empty()
3425 {
3426 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3427 }
3428 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3429 fs::write(path, json).map_err(|e| e.to_string())
3430}
3431
3432fn rotated_path(path: &str, suffix: u64) -> PathBuf {
3433 let p = Path::new(path);
3434 let parent = p.parent().unwrap_or_else(|| Path::new(""));
3435 let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
3436 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
3437 parent.join(format!("{}_{}.{}", stem, suffix, ext))
3438}
3439
3440fn max_events_limit() -> Option<usize> {
3441 std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
3442 .ok()
3443 .and_then(|v| v.parse::<usize>().ok())
3444 .filter(|v| *v > 0)
3445}
3446
3447fn rotate_keep_limit() -> Option<usize> {
3448 std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
3449 .ok()
3450 .and_then(|v| v.parse::<usize>().ok())
3451 .filter(|v| *v > 0)
3452}
3453
3454fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3455 let len = timeline.events.len();
3456 if len > max_events {
3457 let keep_from = len - max_events;
3458 timeline.events = timeline.events.split_off(keep_from);
3459 }
3460}
3461
3462fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
3463 let p = Path::new(base_path);
3464 let parent = p.parent().unwrap_or_else(|| Path::new("."));
3465 let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
3466 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
3467 let prefix = format!("{}_", stem);
3468 let suffix = format!(".{}", ext);
3469
3470 let mut files = fs::read_dir(parent)
3471 .map_err(|e| e.to_string())?
3472 .filter_map(|entry| entry.ok())
3473 .filter(|entry| {
3474 let name = entry.file_name();
3475 let name = name.to_string_lossy();
3476 name.starts_with(&prefix) && name.ends_with(&suffix)
3477 })
3478 .map(|entry| {
3479 let modified = entry
3480 .metadata()
3481 .ok()
3482 .and_then(|m| m.modified().ok())
3483 .unwrap_or(SystemTime::UNIX_EPOCH);
3484 (entry.path(), modified)
3485 })
3486 .collect::<Vec<_>>();
3487
3488 files.sort_by(|a, b| b.1.cmp(&a.1));
3489 for (path, _) in files.into_iter().skip(keep) {
3490 let _ = fs::remove_file(path);
3491 }
3492 Ok(())
3493}
3494
3495fn bus_capability_schema_from_policy(
3496 policy: Option<ranvier_core::bus::BusAccessPolicy>,
3497) -> Option<BusCapabilitySchema> {
3498 let policy = policy?;
3499
3500 let mut allow = policy
3501 .allow
3502 .unwrap_or_default()
3503 .into_iter()
3504 .map(|entry| entry.type_name.to_string())
3505 .collect::<Vec<_>>();
3506 let mut deny = policy
3507 .deny
3508 .into_iter()
3509 .map(|entry| entry.type_name.to_string())
3510 .collect::<Vec<_>>();
3511 allow.sort();
3512 deny.sort();
3513
3514 if allow.is_empty() && deny.is_empty() {
3515 return None;
3516 }
3517
3518 Some(BusCapabilitySchema { allow, deny })
3519}
3520
3521fn now_ms() -> u64 {
3522 SystemTime::now()
3523 .duration_since(UNIX_EPOCH)
3524 .map(|d| d.as_millis() as u64)
3525 .unwrap_or(0)
3526}
3527
3528#[cfg(test)]
3529mod tests {
3530 use super::{
3531 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3532 should_force_export,
3533 };
3534 use crate::persistence::{
3535 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3536 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3537 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3538 PersistenceHandle, PersistenceStore, PersistenceTraceId,
3539 };
3540 use anyhow::Result;
3541 use async_trait::async_trait;
3542 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3543 use ranvier_core::event::{DlqPolicy, DlqSink};
3544 use ranvier_core::saga::SagaStack;
3545 use ranvier_core::timeline::{Timeline, TimelineEvent};
3546 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3547 use serde::{Deserialize, Serialize};
3548 use std::sync::Arc;
3549 use tokio::sync::Mutex;
3550 use uuid::Uuid;
3551
3552 struct MockAuditSink {
3553 events: Arc<Mutex<Vec<AuditEvent>>>,
3554 }
3555
3556 #[async_trait]
3557 impl AuditSink for MockAuditSink {
3558 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3559 self.events.lock().await.push(event.clone());
3560 Ok(())
3561 }
3562 }
3563
3564 #[tokio::test]
3565 async fn execute_logs_audit_events_for_intervention() {
3566 use ranvier_inspector::StateInspector;
3567
3568 let trace_id = "test-audit-trace";
3569 let store_impl = InMemoryPersistenceStore::new();
3570 let events = Arc::new(Mutex::new(Vec::new()));
3571 let sink = MockAuditSink {
3572 events: events.clone(),
3573 };
3574
3575 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3576 .then(AddOne)
3577 .with_persistence_store(store_impl.clone())
3578 .with_audit_sink(sink);
3579
3580 let mut bus = Bus::new();
3581 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3582 bus.insert(PersistenceTraceId::new(trace_id));
3583 let target_node_id = axon.schematic.nodes[0].id.clone();
3584
3585 store_impl
3587 .append(crate::persistence::PersistenceEnvelope {
3588 trace_id: trace_id.to_string(),
3589 circuit: "AuditTest".to_string(),
3590 schematic_version: "v1.0".to_string(),
3591 step: 0,
3592 node_id: None,
3593 outcome_kind: "Next".to_string(),
3594 timestamp_ms: 0,
3595 payload_hash: None,
3596 payload: None,
3597 })
3598 .await
3599 .unwrap();
3600
3601 axon.force_resume(trace_id, &target_node_id, None)
3603 .await
3604 .unwrap();
3605
3606 axon.execute(10, &(), &mut bus).await;
3608
3609 let recorded = events.lock().await;
3610 assert_eq!(
3611 recorded.len(),
3612 2,
3613 "Should have 2 audit events: ForceResume and ApplyIntervention"
3614 );
3615 assert_eq!(recorded[0].action, "ForceResume");
3616 assert_eq!(recorded[0].target, trace_id);
3617 assert_eq!(recorded[1].action, "ApplyIntervention");
3618 assert_eq!(recorded[1].target, trace_id);
3619 }
3620
3621 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3622 pub enum TestInfallible {}
3623
3624 #[test]
3625 fn inspector_enabled_flag_matrix() {
3626 assert!(inspector_enabled_from_value(None));
3627 assert!(inspector_enabled_from_value(Some("1")));
3628 assert!(inspector_enabled_from_value(Some("true")));
3629 assert!(inspector_enabled_from_value(Some("on")));
3630 assert!(!inspector_enabled_from_value(Some("0")));
3631 assert!(!inspector_enabled_from_value(Some("false")));
3632 }
3633
3634 #[test]
3635 fn inspector_dev_mode_matrix() {
3636 assert!(inspector_dev_mode_from_value(None));
3637 assert!(inspector_dev_mode_from_value(Some("dev")));
3638 assert!(inspector_dev_mode_from_value(Some("staging")));
3639 assert!(!inspector_dev_mode_from_value(Some("prod")));
3640 assert!(!inspector_dev_mode_from_value(Some("production")));
3641 }
3642
3643 #[test]
3644 fn adaptive_policy_force_export_matrix() {
3645 let next = Outcome::<(), &'static str>::Next(());
3646 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3647 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3648 let fault = Outcome::<(), &'static str>::Fault("boom");
3649
3650 assert!(!should_force_export(&next, "off"));
3651 assert!(!should_force_export(&fault, "off"));
3652
3653 assert!(!should_force_export(&branch, "fault_only"));
3654 assert!(should_force_export(&fault, "fault_only"));
3655
3656 assert!(should_force_export(&branch, "fault_branch"));
3657 assert!(!should_force_export(&emit, "fault_branch"));
3658 assert!(should_force_export(&fault, "fault_branch"));
3659
3660 assert!(should_force_export(&branch, "fault_branch_emit"));
3661 assert!(should_force_export(&emit, "fault_branch_emit"));
3662 assert!(should_force_export(&fault, "fault_branch_emit"));
3663 }
3664
3665 #[test]
3666 fn sampling_and_adaptive_combination_decisions() {
3667 let bus_id = Uuid::nil();
3668 let next = Outcome::<(), &'static str>::Next(());
3669 let fault = Outcome::<(), &'static str>::Fault("boom");
3670
3671 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3672 assert!(!sampled_never);
3673 assert!(!(sampled_never || should_force_export(&next, "off")));
3674 assert!(sampled_never || should_force_export(&fault, "fault_only"));
3675
3676 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3677 assert!(sampled_always);
3678 assert!(sampled_always || should_force_export(&next, "off"));
3679 assert!(sampled_always || should_force_export(&fault, "off"));
3680 }
3681
3682 #[derive(Clone)]
3683 struct AddOne;
3684
3685 #[async_trait]
3686 impl Transition<i32, i32> for AddOne {
3687 type Error = TestInfallible;
3688 type Resources = ();
3689
3690 async fn run(
3691 &self,
3692 state: i32,
3693 _resources: &Self::Resources,
3694 _bus: &mut Bus,
3695 ) -> Outcome<i32, Self::Error> {
3696 Outcome::Next(state + 1)
3697 }
3698 }
3699
3700 #[derive(Clone)]
3701 struct AlwaysFault;
3702
3703 #[async_trait]
3704 impl Transition<i32, i32> for AlwaysFault {
3705 type Error = String;
3706 type Resources = ();
3707
3708 async fn run(
3709 &self,
3710 _state: i32,
3711 _resources: &Self::Resources,
3712 _bus: &mut Bus,
3713 ) -> Outcome<i32, Self::Error> {
3714 Outcome::Fault("boom".to_string())
3715 }
3716 }
3717
3718 #[derive(Clone)]
3719 struct CapabilityGuarded;
3720
3721 #[async_trait]
3722 impl Transition<(), ()> for CapabilityGuarded {
3723 type Error = String;
3724 type Resources = ();
3725
3726 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3727 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3728 }
3729
3730 async fn run(
3731 &self,
3732 _state: (),
3733 _resources: &Self::Resources,
3734 bus: &mut Bus,
3735 ) -> Outcome<(), Self::Error> {
3736 match bus.get::<String>() {
3737 Ok(_) => Outcome::Next(()),
3738 Err(err) => Outcome::Fault(err.to_string()),
3739 }
3740 }
3741 }
3742
3743 #[derive(Clone)]
3744 struct RecordingCompensationHook {
3745 calls: Arc<Mutex<Vec<CompensationContext>>>,
3746 should_fail: bool,
3747 }
3748
3749 #[async_trait]
3750 impl CompensationHook for RecordingCompensationHook {
3751 async fn compensate(&self, context: CompensationContext) -> Result<()> {
3752 self.calls.lock().await.push(context);
3753 if self.should_fail {
3754 return Err(anyhow::anyhow!("compensation failed"));
3755 }
3756 Ok(())
3757 }
3758 }
3759
3760 #[derive(Clone)]
3761 struct FlakyCompensationHook {
3762 calls: Arc<Mutex<u32>>,
3763 failures_remaining: Arc<Mutex<u32>>,
3764 }
3765
3766 #[async_trait]
3767 impl CompensationHook for FlakyCompensationHook {
3768 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3769 {
3770 let mut calls = self.calls.lock().await;
3771 *calls += 1;
3772 }
3773 let mut failures_remaining = self.failures_remaining.lock().await;
3774 if *failures_remaining > 0 {
3775 *failures_remaining -= 1;
3776 return Err(anyhow::anyhow!("transient compensation failure"));
3777 }
3778 Ok(())
3779 }
3780 }
3781
3782 #[derive(Clone)]
3783 struct FailingCompensationIdempotencyStore {
3784 read_calls: Arc<Mutex<u32>>,
3785 write_calls: Arc<Mutex<u32>>,
3786 }
3787
3788 #[async_trait]
3789 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3790 async fn was_compensated(&self, _key: &str) -> Result<bool> {
3791 let mut read_calls = self.read_calls.lock().await;
3792 *read_calls += 1;
3793 Err(anyhow::anyhow!("forced idempotency read failure"))
3794 }
3795
3796 async fn mark_compensated(&self, _key: &str) -> Result<()> {
3797 let mut write_calls = self.write_calls.lock().await;
3798 *write_calls += 1;
3799 Err(anyhow::anyhow!("forced idempotency write failure"))
3800 }
3801 }
3802
3803 #[tokio::test]
3804 async fn execute_persists_success_trace_when_handle_exists() {
3805 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3806 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3807
3808 let mut bus = Bus::new();
3809 bus.insert(PersistenceHandle::from_arc(store_dyn));
3810 bus.insert(PersistenceTraceId::new("trace-success"));
3811
3812 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3813 let outcome = axon.execute(41, &(), &mut bus).await;
3814 assert!(matches!(outcome, Outcome::Next(42)));
3815
3816 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3817 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3819 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3822 }
3823
3824 #[tokio::test]
3825 async fn execute_persists_fault_completion_state() {
3826 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3827 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3828
3829 let mut bus = Bus::new();
3830 bus.insert(PersistenceHandle::from_arc(store_dyn));
3831 bus.insert(PersistenceTraceId::new("trace-fault"));
3832
3833 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3834 let outcome = axon.execute(41, &(), &mut bus).await;
3835 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3836
3837 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3838 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3842 }
3843
3844 #[tokio::test]
3845 async fn execute_respects_persistence_auto_complete_off() {
3846 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3847 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3848
3849 let mut bus = Bus::new();
3850 bus.insert(PersistenceHandle::from_arc(store_dyn));
3851 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3852 bus.insert(PersistenceAutoComplete(false));
3853
3854 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3855 let outcome = axon.execute(1, &(), &mut bus).await;
3856 assert!(matches!(outcome, Outcome::Next(2)));
3857
3858 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3859 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3861 }
3862
3863 #[tokio::test]
3864 async fn fault_triggers_compensation_and_marks_compensated() {
3865 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3866 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3867 let calls = Arc::new(Mutex::new(Vec::new()));
3868 let compensation = RecordingCompensationHook {
3869 calls: calls.clone(),
3870 should_fail: false,
3871 };
3872
3873 let mut bus = Bus::new();
3874 bus.insert(PersistenceHandle::from_arc(store_dyn));
3875 bus.insert(PersistenceTraceId::new("trace-compensated"));
3876 bus.insert(CompensationHandle::from_hook(compensation));
3877
3878 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3879 let outcome = axon.execute(7, &(), &mut bus).await;
3880 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3881
3882 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3883 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3885 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3888 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3889
3890 let recorded = calls.lock().await;
3891 assert_eq!(recorded.len(), 1);
3892 assert_eq!(recorded[0].trace_id, "trace-compensated");
3893 assert_eq!(recorded[0].fault_kind, "Fault");
3894 }
3895
3896 #[tokio::test]
3897 async fn failed_compensation_keeps_fault_completion() {
3898 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3899 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3900 let calls = Arc::new(Mutex::new(Vec::new()));
3901 let compensation = RecordingCompensationHook {
3902 calls: calls.clone(),
3903 should_fail: true,
3904 };
3905
3906 let mut bus = Bus::new();
3907 bus.insert(PersistenceHandle::from_arc(store_dyn));
3908 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3909 bus.insert(CompensationHandle::from_hook(compensation));
3910
3911 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3912 let outcome = axon.execute(7, &(), &mut bus).await;
3913 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3914
3915 let persisted = store_impl
3916 .load("trace-compensation-failed")
3917 .await
3918 .unwrap()
3919 .unwrap();
3920 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3923
3924 let recorded = calls.lock().await;
3925 assert_eq!(recorded.len(), 1);
3926 }
3927
3928 #[tokio::test]
3929 async fn compensation_retry_policy_succeeds_after_retries() {
3930 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3931 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3932 let calls = Arc::new(Mutex::new(0u32));
3933 let failures_remaining = Arc::new(Mutex::new(2u32));
3934 let compensation = FlakyCompensationHook {
3935 calls: calls.clone(),
3936 failures_remaining,
3937 };
3938
3939 let mut bus = Bus::new();
3940 bus.insert(PersistenceHandle::from_arc(store_dyn));
3941 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3942 bus.insert(CompensationHandle::from_hook(compensation));
3943 bus.insert(CompensationRetryPolicy {
3944 max_attempts: 3,
3945 backoff_ms: 0,
3946 });
3947
3948 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3949 let outcome = axon.execute(7, &(), &mut bus).await;
3950 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3951
3952 let persisted = store_impl
3953 .load("trace-retry-success")
3954 .await
3955 .unwrap()
3956 .unwrap();
3957 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3958 assert_eq!(
3959 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3960 Some("Compensated")
3961 );
3962
3963 let attempt_count = calls.lock().await;
3964 assert_eq!(*attempt_count, 3);
3965 }
3966
3967 #[tokio::test]
3968 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3969 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3970 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3971 let calls = Arc::new(Mutex::new(Vec::new()));
3972 let compensation = RecordingCompensationHook {
3973 calls: calls.clone(),
3974 should_fail: false,
3975 };
3976 let idempotency = InMemoryCompensationIdempotencyStore::new();
3977
3978 let mut bus = Bus::new();
3979 bus.insert(PersistenceHandle::from_arc(store_dyn));
3980 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3981 bus.insert(PersistenceAutoComplete(false));
3982 bus.insert(CompensationHandle::from_hook(compensation));
3983 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3984
3985 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3986
3987 let outcome1 = axon.execute(7, &(), &mut bus).await;
3988 let outcome2 = axon.execute(8, &(), &mut bus).await;
3989 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3990 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3991
3992 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3993 assert_eq!(persisted.completion, None);
3994 let compensated_count = persisted
3996 .events
3997 .iter()
3998 .filter(|e| e.outcome_kind == "Compensated")
3999 .count();
4000 assert_eq!(
4001 compensated_count, 2,
4002 "Should have 2 Compensated events (one per execution)"
4003 );
4004
4005 let recorded = calls.lock().await;
4006 assert_eq!(recorded.len(), 1);
4007 }
4008
4009 #[tokio::test]
4010 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
4011 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4012 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4013 let calls = Arc::new(Mutex::new(Vec::new()));
4014 let read_calls = Arc::new(Mutex::new(0u32));
4015 let write_calls = Arc::new(Mutex::new(0u32));
4016 let compensation = RecordingCompensationHook {
4017 calls: calls.clone(),
4018 should_fail: false,
4019 };
4020 let idempotency = FailingCompensationIdempotencyStore {
4021 read_calls: read_calls.clone(),
4022 write_calls: write_calls.clone(),
4023 };
4024
4025 let mut bus = Bus::new();
4026 bus.insert(PersistenceHandle::from_arc(store_dyn));
4027 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
4028 bus.insert(CompensationHandle::from_hook(compensation));
4029 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
4030
4031 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
4032 let outcome = axon.execute(9, &(), &mut bus).await;
4033 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4034
4035 let persisted = store_impl
4036 .load("trace-idempotency-store-failure")
4037 .await
4038 .unwrap()
4039 .unwrap();
4040 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
4041 assert_eq!(
4042 persisted.events.last().map(|e| e.outcome_kind.as_str()),
4043 Some("Compensated")
4044 );
4045
4046 let recorded = calls.lock().await;
4047 assert_eq!(recorded.len(), 1);
4048 assert_eq!(*read_calls.lock().await, 1);
4049 assert_eq!(*write_calls.lock().await, 1);
4050 }
4051
4052 #[tokio::test]
4053 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
4054 let mut bus = Bus::new();
4055 bus.insert(1_i32);
4056 bus.insert("secret".to_string());
4057
4058 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
4059 let outcome = axon.execute((), &(), &mut bus).await;
4060
4061 match outcome {
4062 Outcome::Fault(msg) => {
4063 assert!(msg.contains("Bus access denied"), "{msg}");
4064 assert!(msg.contains("CapabilityGuarded"), "{msg}");
4065 assert!(msg.contains("alloc::string::String"), "{msg}");
4066 }
4067 other => panic!("expected fault, got {other:?}"),
4068 }
4069 }
4070
4071 #[tokio::test]
4072 async fn execute_fails_on_version_mismatch_without_migration() {
4073 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4074 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4075
4076 let trace_id = "v-mismatch";
4077 let old_envelope = crate::persistence::PersistenceEnvelope {
4079 trace_id: trace_id.to_string(),
4080 circuit: "TestCircuit".to_string(),
4081 schematic_version: "0.9".to_string(),
4082 step: 0,
4083 node_id: None,
4084 outcome_kind: "Enter".to_string(),
4085 timestamp_ms: 0,
4086 payload_hash: None,
4087 payload: None,
4088 };
4089 store_impl.append(old_envelope).await.unwrap();
4090
4091 let mut bus = Bus::new();
4092 bus.insert(PersistenceHandle::from_arc(store_dyn));
4093 bus.insert(PersistenceTraceId::new(trace_id));
4094
4095 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4097 let outcome = axon.execute(10, &(), &mut bus).await;
4098
4099 if let Outcome::Emit(kind, _) = outcome {
4100 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
4101 } else {
4102 panic!("Expected version mismatch emission, got {:?}", outcome);
4103 }
4104 }
4105
4106 #[tokio::test]
4107 async fn execute_resumes_from_start_on_migration_strategy() {
4108 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4109 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4110
4111 let trace_id = "v-migration";
4112 let old_envelope = crate::persistence::PersistenceEnvelope {
4114 trace_id: trace_id.to_string(),
4115 circuit: "TestCircuit".to_string(),
4116 schematic_version: "0.9".to_string(),
4117 step: 5,
4118 node_id: None,
4119 outcome_kind: "Next".to_string(),
4120 timestamp_ms: 0,
4121 payload_hash: None,
4122 payload: None,
4123 };
4124 store_impl.append(old_envelope).await.unwrap();
4125
4126 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
4127 registry.register(ranvier_core::schematic::SnapshotMigration {
4128 name: Some("v0.9 to v1.0".to_string()),
4129 from_version: "0.9".to_string(),
4130 to_version: "1.0".to_string(),
4131 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
4132 node_mapping: std::collections::HashMap::new(),
4133 payload_mapper: None,
4134 });
4135
4136 let mut bus = Bus::new();
4137 bus.insert(PersistenceHandle::from_arc(store_dyn));
4138 bus.insert(PersistenceTraceId::new(trace_id));
4139 bus.insert(registry);
4140
4141 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
4142 let outcome = axon.execute(10, &(), &mut bus).await;
4143
4144 assert!(matches!(outcome, Outcome::Next(11)));
4146
4147 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4149 assert_eq!(persisted.schematic_version, "1.0");
4150 }
4151
4152 #[tokio::test]
4153 async fn execute_applies_manual_intervention_jump_and_payload() {
4154 let store_impl = Arc::new(InMemoryPersistenceStore::new());
4155 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
4156
4157 let trace_id = "intervention-test";
4158 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
4160 .then(AddOne)
4161 .then(AddOne);
4162
4163 let mut bus = Bus::new();
4164 bus.insert(PersistenceHandle::from_arc(store_dyn));
4165 bus.insert(PersistenceTraceId::new(trace_id));
4166
4167 let _target_node_label = "AddOne";
4172 let target_node_id = axon.schematic.nodes[2].id.clone();
4174
4175 store_impl
4177 .append(crate::persistence::PersistenceEnvelope {
4178 trace_id: trace_id.to_string(),
4179 circuit: "TestCircuit".to_string(),
4180 schematic_version: "1.0".to_string(),
4181 step: 0,
4182 node_id: None,
4183 outcome_kind: "Enter".to_string(),
4184 timestamp_ms: 0,
4185 payload_hash: None,
4186 payload: None,
4187 })
4188 .await
4189 .unwrap();
4190
4191 store_impl
4192 .save_intervention(
4193 trace_id,
4194 crate::persistence::Intervention {
4195 target_node: target_node_id.clone(),
4196 payload_override: Some(serde_json::json!(100)),
4197 timestamp_ms: 0,
4198 },
4199 )
4200 .await
4201 .unwrap();
4202
4203 let outcome = axon.execute(10, &(), &mut bus).await;
4206
4207 match outcome {
4208 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
4209 other => panic!("Expected Outcome::Next(101), got {:?}", other),
4210 }
4211
4212 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
4214 assert_eq!(persisted.interventions.len(), 1);
4216 assert_eq!(persisted.interventions[0].target_node, target_node_id);
4217 }
4218
4219 #[derive(Clone)]
4223 struct FailNThenSucceed {
4224 remaining: Arc<tokio::sync::Mutex<u32>>,
4225 }
4226
4227 #[async_trait]
4228 impl Transition<i32, i32> for FailNThenSucceed {
4229 type Error = String;
4230 type Resources = ();
4231
4232 async fn run(
4233 &self,
4234 state: i32,
4235 _resources: &Self::Resources,
4236 _bus: &mut Bus,
4237 ) -> Outcome<i32, Self::Error> {
4238 let mut rem = self.remaining.lock().await;
4239 if *rem > 0 {
4240 *rem -= 1;
4241 Outcome::Fault("transient failure".to_string())
4242 } else {
4243 Outcome::Next(state + 1)
4244 }
4245 }
4246 }
4247
4248 #[derive(Clone)]
4250 struct MockDlqSink {
4251 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
4252 }
4253
4254 #[async_trait]
4255 impl DlqSink for MockDlqSink {
4256 async fn store_dead_letter(
4257 &self,
4258 workflow_id: &str,
4259 _circuit_label: &str,
4260 node_id: &str,
4261 error_msg: &str,
4262 _payload: &[u8],
4263 ) -> Result<(), String> {
4264 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4265 self.letters.lock().await.push(entry);
4266 Ok(())
4267 }
4268 }
4269
4270 #[tokio::test]
4271 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4272 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4274 let trans = FailNThenSucceed { remaining };
4275
4276 let dlq_sink = MockDlqSink {
4277 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4278 };
4279
4280 let mut bus = Bus::new();
4281 bus.insert(Timeline::new());
4282
4283 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4284 .then(trans)
4285 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4286 max_attempts: 5,
4287 backoff_ms: 1,
4288 })
4289 .with_dlq_sink(dlq_sink.clone());
4290 let outcome = axon.execute(10, &(), &mut bus).await;
4291
4292 assert!(
4294 matches!(outcome, Outcome::Next(11)),
4295 "Expected Next(11), got {:?}",
4296 outcome
4297 );
4298
4299 let letters = dlq_sink.letters.lock().await;
4301 assert!(
4302 letters.is_empty(),
4303 "Should have 0 dead letters, got {}",
4304 letters.len()
4305 );
4306
4307 let timeline = bus.read::<Timeline>().unwrap();
4309 let retry_count = timeline
4310 .events
4311 .iter()
4312 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4313 .count();
4314 assert_eq!(retry_count, 2, "Should have 2 retry events");
4315 }
4316
4317 #[tokio::test]
4318 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4319 let mut bus = Bus::new();
4321 bus.insert(Timeline::new());
4322
4323 let dlq_sink = MockDlqSink {
4324 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4325 };
4326
4327 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4328 .then(AlwaysFault)
4329 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4330 max_attempts: 3,
4331 backoff_ms: 1,
4332 })
4333 .with_dlq_sink(dlq_sink.clone());
4334 let outcome = axon.execute(42, &(), &mut bus).await;
4335
4336 assert!(
4337 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4338 "Expected Fault(boom), got {:?}",
4339 outcome
4340 );
4341
4342 let letters = dlq_sink.letters.lock().await;
4344 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4345
4346 let timeline = bus.read::<Timeline>().unwrap();
4348 let retry_count = timeline
4349 .events
4350 .iter()
4351 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4352 .count();
4353 let dlq_count = timeline
4354 .events
4355 .iter()
4356 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4357 .count();
4358 assert_eq!(
4359 retry_count, 2,
4360 "Should have 2 retry events (attempts 2 and 3)"
4361 );
4362 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4363 }
4364
4365 #[tokio::test]
4366 async fn send_to_dlq_policy_sends_immediately_without_retry() {
4367 let mut bus = Bus::new();
4368 bus.insert(Timeline::new());
4369
4370 let dlq_sink = MockDlqSink {
4371 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4372 };
4373
4374 let axon = Axon::<i32, i32, String>::start("SendDlq")
4375 .then(AlwaysFault)
4376 .with_dlq_policy(DlqPolicy::SendToDlq)
4377 .with_dlq_sink(dlq_sink.clone());
4378 let outcome = axon.execute(1, &(), &mut bus).await;
4379
4380 assert!(matches!(outcome, Outcome::Fault(_)));
4381
4382 let letters = dlq_sink.letters.lock().await;
4384 assert_eq!(letters.len(), 1);
4385
4386 let timeline = bus.read::<Timeline>().unwrap();
4388 let retry_count = timeline
4389 .events
4390 .iter()
4391 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4392 .count();
4393 assert_eq!(retry_count, 0);
4394 }
4395
4396 #[tokio::test]
4397 async fn drop_policy_does_not_send_to_dlq() {
4398 let mut bus = Bus::new();
4399
4400 let dlq_sink = MockDlqSink {
4401 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4402 };
4403
4404 let axon = Axon::<i32, i32, String>::start("DropDlq")
4405 .then(AlwaysFault)
4406 .with_dlq_policy(DlqPolicy::Drop)
4407 .with_dlq_sink(dlq_sink.clone());
4408 let outcome = axon.execute(1, &(), &mut bus).await;
4409
4410 assert!(matches!(outcome, Outcome::Fault(_)));
4411
4412 let letters = dlq_sink.letters.lock().await;
4414 assert!(letters.is_empty());
4415 }
4416
4417 #[tokio::test]
4418 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4419 use ranvier_core::policy::DynamicPolicy;
4420
4421 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4423 let dlq_sink = MockDlqSink {
4424 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4425 };
4426
4427 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4428 .then(AlwaysFault)
4429 .with_dynamic_dlq_policy(dynamic)
4430 .with_dlq_sink(dlq_sink.clone());
4431
4432 let mut bus = Bus::new();
4434 let outcome = axon.execute(1, &(), &mut bus).await;
4435 assert!(matches!(outcome, Outcome::Fault(_)));
4436 assert!(
4437 dlq_sink.letters.lock().await.is_empty(),
4438 "Drop policy should produce no DLQ entries"
4439 );
4440
4441 tx.send(DlqPolicy::SendToDlq).unwrap();
4443
4444 let mut bus2 = Bus::new();
4446 let outcome2 = axon.execute(2, &(), &mut bus2).await;
4447 assert!(matches!(outcome2, Outcome::Fault(_)));
4448 assert_eq!(
4449 dlq_sink.letters.lock().await.len(),
4450 1,
4451 "SendToDlq policy should produce 1 DLQ entry"
4452 );
4453 }
4454
4455 #[tokio::test]
4456 async fn dynamic_saga_policy_hot_reload() {
4457 use ranvier_core::policy::DynamicPolicy;
4458 use ranvier_core::saga::SagaPolicy;
4459
4460 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4462
4463 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4464 .then(AddOne)
4465 .with_dynamic_saga_policy(dynamic);
4466
4467 let mut bus = Bus::new();
4469 let _outcome = axon.execute(1, &(), &mut bus).await;
4470 assert!(
4471 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4472 "SagaStack should be absent or empty when disabled"
4473 );
4474
4475 tx.send(SagaPolicy::Enabled).unwrap();
4477
4478 let mut bus2 = Bus::new();
4480 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4481 assert!(
4482 bus2.read::<SagaStack>().is_some(),
4483 "SagaStack should exist when saga is enabled"
4484 );
4485 }
4486
4487 mod iam_tests {
4490 use super::*;
4491 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4492
4493 #[derive(Clone)]
4495 struct MockVerifier {
4496 identity: IamIdentity,
4497 should_fail: bool,
4498 }
4499
4500 #[async_trait]
4501 impl IamVerifier for MockVerifier {
4502 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4503 if self.should_fail {
4504 Err(IamError::InvalidToken("mock verification failure".into()))
4505 } else {
4506 Ok(self.identity.clone())
4507 }
4508 }
4509 }
4510
4511 #[tokio::test]
4512 async fn iam_require_identity_passes_with_valid_token() {
4513 let verifier = MockVerifier {
4514 identity: IamIdentity::new("alice").with_role("user"),
4515 should_fail: false,
4516 };
4517
4518 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4519 .with_iam(IamPolicy::RequireIdentity, verifier)
4520 .then(AddOne);
4521
4522 let mut bus = Bus::new();
4523 bus.insert(IamToken("valid-token".to_string()));
4524 let outcome = axon.execute(10, &(), &mut bus).await;
4525
4526 assert!(matches!(outcome, Outcome::Next(11)));
4527 let identity = bus
4529 .read::<IamIdentity>()
4530 .expect("IamIdentity should be in Bus");
4531 assert_eq!(identity.subject, "alice");
4532 }
4533
4534 #[tokio::test]
4535 async fn iam_require_identity_rejects_missing_token() {
4536 let verifier = MockVerifier {
4537 identity: IamIdentity::new("ignored"),
4538 should_fail: false,
4539 };
4540
4541 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4542 .with_iam(IamPolicy::RequireIdentity, verifier)
4543 .then(AddOne);
4544
4545 let mut bus = Bus::new();
4546 let outcome = axon.execute(10, &(), &mut bus).await;
4548
4549 match &outcome {
4551 Outcome::Emit(label, _) => {
4552 assert_eq!(label, "iam.missing_token");
4553 }
4554 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4555 }
4556 }
4557
4558 #[tokio::test]
4559 async fn iam_rejects_failed_verification() {
4560 let verifier = MockVerifier {
4561 identity: IamIdentity::new("ignored"),
4562 should_fail: true,
4563 };
4564
4565 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4566 .with_iam(IamPolicy::RequireIdentity, verifier)
4567 .then(AddOne);
4568
4569 let mut bus = Bus::new();
4570 bus.insert(IamToken("bad-token".to_string()));
4571 let outcome = axon.execute(10, &(), &mut bus).await;
4572
4573 match &outcome {
4574 Outcome::Emit(label, _) => {
4575 assert_eq!(label, "iam.verification_failed");
4576 }
4577 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4578 }
4579 }
4580
4581 #[tokio::test]
4582 async fn iam_require_role_passes_with_matching_role() {
4583 let verifier = MockVerifier {
4584 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4585 should_fail: false,
4586 };
4587
4588 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4589 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4590 .then(AddOne);
4591
4592 let mut bus = Bus::new();
4593 bus.insert(IamToken("token".to_string()));
4594 let outcome = axon.execute(5, &(), &mut bus).await;
4595
4596 assert!(matches!(outcome, Outcome::Next(6)));
4597 }
4598
4599 #[tokio::test]
4600 async fn iam_require_role_denies_without_role() {
4601 let verifier = MockVerifier {
4602 identity: IamIdentity::new("carol").with_role("user"),
4603 should_fail: false,
4604 };
4605
4606 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4607 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4608 .then(AddOne);
4609
4610 let mut bus = Bus::new();
4611 bus.insert(IamToken("token".to_string()));
4612 let outcome = axon.execute(5, &(), &mut bus).await;
4613
4614 match &outcome {
4615 Outcome::Emit(label, _) => {
4616 assert_eq!(label, "iam.policy_denied");
4617 }
4618 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4619 }
4620 }
4621
4622 #[tokio::test]
4623 async fn iam_policy_none_skips_verification() {
4624 let verifier = MockVerifier {
4625 identity: IamIdentity::new("ignored"),
4626 should_fail: true, };
4628
4629 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4630 .with_iam(IamPolicy::None, verifier)
4631 .then(AddOne);
4632
4633 let mut bus = Bus::new();
4634 let outcome = axon.execute(10, &(), &mut bus).await;
4636
4637 assert!(matches!(outcome, Outcome::Next(11)));
4638 }
4639 }
4640
4641 #[derive(Clone)]
4644 struct SchemaTransition;
4645
4646 #[async_trait]
4647 impl Transition<String, String> for SchemaTransition {
4648 type Error = String;
4649 type Resources = ();
4650
4651 fn input_schema(&self) -> Option<serde_json::Value> {
4652 Some(serde_json::json!({
4653 "type": "object",
4654 "required": ["name"],
4655 "properties": {
4656 "name": { "type": "string" }
4657 }
4658 }))
4659 }
4660
4661 async fn run(
4662 &self,
4663 state: String,
4664 _resources: &Self::Resources,
4665 _bus: &mut Bus,
4666 ) -> Outcome<String, Self::Error> {
4667 Outcome::Next(state)
4668 }
4669 }
4670
4671 #[test]
4672 fn then_auto_populates_input_schema_from_transition() {
4673 let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4674
4675 let last_node = axon.schematic.nodes.last().unwrap();
4677 assert!(last_node.input_schema.is_some());
4678 let schema = last_node.input_schema.as_ref().unwrap();
4679 assert_eq!(schema["type"], "object");
4680 assert_eq!(schema["required"][0], "name");
4681 }
4682
4683 #[test]
4684 fn then_leaves_input_schema_none_when_not_provided() {
4685 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4686
4687 let last_node = axon.schematic.nodes.last().unwrap();
4688 assert!(last_node.input_schema.is_none());
4689 }
4690
4691 #[test]
4692 fn with_input_schema_value_sets_on_last_node() {
4693 let schema = serde_json::json!({"type": "integer"});
4694 let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4695 .then(AddOne)
4696 .with_input_schema_value(schema.clone());
4697
4698 let last_node = axon.schematic.nodes.last().unwrap();
4699 assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4700 }
4701
4702 #[test]
4703 fn with_output_schema_value_sets_on_last_node() {
4704 let schema = serde_json::json!({"type": "integer"});
4705 let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4706 .then(AddOne)
4707 .with_output_schema_value(schema.clone());
4708
4709 let last_node = axon.schematic.nodes.last().unwrap();
4710 assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4711 }
4712
4713 #[test]
4714 fn schematic_export_includes_schema_fields() {
4715 let axon = Axon::<String, String, String>::new("ExportTest")
4716 .then(SchemaTransition)
4717 .with_output_schema_value(serde_json::json!({"type": "string"}));
4718
4719 let json = serde_json::to_value(&axon.schematic).unwrap();
4720 let nodes = json["nodes"].as_array().unwrap();
4721 let last = nodes.last().unwrap();
4723 assert!(last.get("input_schema").is_some());
4724 assert_eq!(last["input_schema"]["type"], "object");
4725 assert_eq!(last["output_schema"]["type"], "string");
4726 }
4727
4728 #[test]
4729 fn schematic_export_omits_schema_fields_when_none() {
4730 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4731
4732 let json = serde_json::to_value(&axon.schematic).unwrap();
4733 let nodes = json["nodes"].as_array().unwrap();
4734 let last = nodes.last().unwrap();
4735 let obj = last.as_object().unwrap();
4736 assert!(!obj.contains_key("input_schema"));
4737 assert!(!obj.contains_key("output_schema"));
4738 }
4739
4740 #[test]
4741 fn schematic_json_roundtrip_preserves_schemas() {
4742 let axon = Axon::<String, String, String>::new("Roundtrip")
4743 .then(SchemaTransition)
4744 .with_output_schema_value(serde_json::json!({"type": "string"}));
4745
4746 let json_str = serde_json::to_string(&axon.schematic).unwrap();
4747 let deserialized: ranvier_core::schematic::Schematic =
4748 serde_json::from_str(&json_str).unwrap();
4749
4750 let last = deserialized.nodes.last().unwrap();
4751 assert!(last.input_schema.is_some());
4752 assert!(last.output_schema.is_some());
4753 assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4754 assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4755 }
4756
4757 #[derive(Clone)]
4759 struct MultiplyByTwo;
4760
4761 #[async_trait]
4762 impl Transition<i32, i32> for MultiplyByTwo {
4763 type Error = TestInfallible;
4764 type Resources = ();
4765
4766 async fn run(
4767 &self,
4768 state: i32,
4769 _resources: &Self::Resources,
4770 _bus: &mut Bus,
4771 ) -> Outcome<i32, Self::Error> {
4772 Outcome::Next(state * 2)
4773 }
4774 }
4775
4776 #[derive(Clone)]
4777 struct AddTen;
4778
4779 #[async_trait]
4780 impl Transition<i32, i32> for AddTen {
4781 type Error = TestInfallible;
4782 type Resources = ();
4783
4784 async fn run(
4785 &self,
4786 state: i32,
4787 _resources: &Self::Resources,
4788 _bus: &mut Bus,
4789 ) -> Outcome<i32, Self::Error> {
4790 Outcome::Next(state + 10)
4791 }
4792 }
4793
4794 #[derive(Clone)]
4795 struct AddOneString;
4796
4797 #[async_trait]
4798 impl Transition<i32, i32> for AddOneString {
4799 type Error = String;
4800 type Resources = ();
4801
4802 async fn run(
4803 &self,
4804 state: i32,
4805 _resources: &Self::Resources,
4806 _bus: &mut Bus,
4807 ) -> Outcome<i32, Self::Error> {
4808 Outcome::Next(state + 1)
4809 }
4810 }
4811
4812 #[derive(Clone)]
4813 struct AddTenString;
4814
4815 #[async_trait]
4816 impl Transition<i32, i32> for AddTenString {
4817 type Error = String;
4818 type Resources = ();
4819
4820 async fn run(
4821 &self,
4822 state: i32,
4823 _resources: &Self::Resources,
4824 _bus: &mut Bus,
4825 ) -> Outcome<i32, Self::Error> {
4826 Outcome::Next(state + 10)
4827 }
4828 }
4829
4830 #[tokio::test]
4831 async fn axon_single_step_chain_executes_and_returns_next() {
4832 let mut bus = Bus::new();
4833 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4834
4835 let outcome = axon.execute(5, &(), &mut bus).await;
4836 assert!(matches!(outcome, Outcome::Next(6)));
4837 }
4838
4839 #[tokio::test]
4840 async fn axon_three_step_chain_executes_in_order() {
4841 let mut bus = Bus::new();
4842 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4843 .then(AddOne)
4844 .then(MultiplyByTwo)
4845 .then(AddTen);
4846
4847 let outcome = axon.execute(5, &(), &mut bus).await;
4849 assert!(matches!(outcome, Outcome::Next(22)));
4850 }
4851
4852 #[tokio::test]
4853 async fn axon_with_fault_in_middle_step_propagates_error() {
4854 let mut bus = Bus::new();
4855
4856 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4861 .then(AddOneString)
4862 .then(AlwaysFault)
4863 .then(AddTenString);
4864
4865 let outcome = axon.execute(5, &(), &mut bus).await;
4866 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4867 }
4868
4869 #[tokio::test]
4870 async fn fault_injects_transition_error_context_into_bus() {
4871 let mut bus = Bus::new();
4872
4873 let axon = Axon::<i32, i32, String>::start("my-pipeline")
4875 .then(AddOneString)
4876 .then(AlwaysFault)
4877 .then(AddTenString);
4878
4879 let outcome = axon.execute(5, &(), &mut bus).await;
4880 assert!(matches!(outcome, Outcome::Fault(_)));
4881
4882 let ctx = bus
4883 .read::<ranvier_core::error::TransitionErrorContext>()
4884 .expect("TransitionErrorContext should be in Bus after fault");
4885 assert_eq!(ctx.pipeline_name, "my-pipeline");
4886 assert_eq!(ctx.transition_name, "AlwaysFault");
4887 assert_eq!(ctx.step_index, 2); }
4889
4890 #[test]
4891 fn axon_schematic_has_correct_node_count_after_chaining() {
4892 let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4893 .then(AddOne)
4894 .then(MultiplyByTwo)
4895 .then(AddTen);
4896
4897 assert_eq!(axon.schematic.nodes.len(), 4);
4899 assert_eq!(axon.schematic.name, "NodeCount");
4900 }
4901
4902 #[tokio::test]
4903 async fn axon_execution_records_timeline_events() {
4904 let mut bus = Bus::new();
4905 bus.insert(Timeline::new());
4906
4907 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4908 .then(AddOne)
4909 .then(MultiplyByTwo);
4910
4911 let outcome = axon.execute(3, &(), &mut bus).await;
4912 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
4915
4916 let enter_count = timeline
4918 .events
4919 .iter()
4920 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4921 .count();
4922 let exit_count = timeline
4923 .events
4924 .iter()
4925 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4926 .count();
4927
4928 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4930 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4931 }
4932
4933 #[tokio::test]
4936 async fn parallel_all_succeed_returns_first_next() {
4937 use super::ParallelStrategy;
4938
4939 let mut bus = Bus::new();
4940 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4941 .parallel(
4942 vec![
4943 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4944 Arc::new(MultiplyByTwo),
4945 ],
4946 ParallelStrategy::AllMustSucceed,
4947 );
4948
4949 let outcome = axon.execute(5, &(), &mut bus).await;
4952 assert!(matches!(outcome, Outcome::Next(6)));
4953 }
4954
4955 #[tokio::test]
4956 async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4957 use super::ParallelStrategy;
4958
4959 let mut bus = Bus::new();
4960 let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4961 .parallel(
4962 vec![
4963 Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4964 Arc::new(AlwaysFault),
4965 ],
4966 ParallelStrategy::AllMustSucceed,
4967 );
4968
4969 let outcome = axon.execute(5, &(), &mut bus).await;
4970 assert!(
4971 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4972 "Expected Fault(boom), got {:?}",
4973 outcome
4974 );
4975 }
4976
4977 #[tokio::test]
4978 async fn parallel_any_can_fail_returns_success_despite_fault() {
4979 use super::ParallelStrategy;
4980
4981 let mut bus = Bus::new();
4982 let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4983 .parallel(
4984 vec![
4985 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4986 Arc::new(AddOneString),
4987 ],
4988 ParallelStrategy::AnyCanFail,
4989 );
4990
4991 let outcome = axon.execute(5, &(), &mut bus).await;
4993 assert!(
4994 matches!(outcome, Outcome::Next(6)),
4995 "Expected Next(6), got {:?}",
4996 outcome
4997 );
4998 }
4999
5000 #[tokio::test]
5001 async fn parallel_any_can_fail_all_fault_returns_first_fault() {
5002 use super::ParallelStrategy;
5003
5004 #[derive(Clone)]
5005 struct AlwaysFault2;
5006 #[async_trait]
5007 impl Transition<i32, i32> for AlwaysFault2 {
5008 type Error = String;
5009 type Resources = ();
5010 async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
5011 Outcome::Fault("boom2".to_string())
5012 }
5013 }
5014
5015 let mut bus = Bus::new();
5016 let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
5017 .parallel(
5018 vec![
5019 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
5020 Arc::new(AlwaysFault2),
5021 ],
5022 ParallelStrategy::AnyCanFail,
5023 );
5024
5025 let outcome = axon.execute(5, &(), &mut bus).await;
5026 assert!(
5028 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
5029 "Expected Fault(boom), got {:?}",
5030 outcome
5031 );
5032 }
5033
5034 #[test]
5035 fn parallel_schematic_has_fanout_fanin_nodes() {
5036 use super::ParallelStrategy;
5037 use ranvier_core::schematic::{EdgeType, NodeKind};
5038
5039 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
5040 .parallel(
5041 vec![
5042 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5043 Arc::new(MultiplyByTwo),
5044 Arc::new(AddTen),
5045 ],
5046 ParallelStrategy::AllMustSucceed,
5047 );
5048
5049 assert_eq!(axon.schematic.nodes.len(), 6);
5051 assert!(matches!(axon.schematic.nodes[1].kind, NodeKind::FanOut));
5052 assert!(matches!(axon.schematic.nodes[2].kind, NodeKind::Atom));
5053 assert!(matches!(axon.schematic.nodes[3].kind, NodeKind::Atom));
5054 assert!(matches!(axon.schematic.nodes[4].kind, NodeKind::Atom));
5055 assert!(matches!(axon.schematic.nodes[5].kind, NodeKind::FanIn));
5056
5057 assert!(axon.schematic.nodes[1]
5059 .description
5060 .as_ref()
5061 .unwrap()
5062 .contains("3 branches"));
5063
5064 let parallel_edges: Vec<_> = axon
5066 .schematic
5067 .edges
5068 .iter()
5069 .filter(|e| matches!(e.kind, EdgeType::Parallel))
5070 .collect();
5071 assert_eq!(parallel_edges.len(), 6);
5073 }
5074
5075 #[tokio::test]
5076 async fn parallel_then_chain_composes_correctly() {
5077 use super::ParallelStrategy;
5078
5079 let mut bus = Bus::new();
5080 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
5081 .then(AddOne)
5082 .parallel(
5083 vec![
5084 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5085 Arc::new(MultiplyByTwo),
5086 ],
5087 ParallelStrategy::AllMustSucceed,
5088 )
5089 .then(AddTen);
5090
5091 let outcome = axon.execute(5, &(), &mut bus).await;
5093 assert!(
5094 matches!(outcome, Outcome::Next(17)),
5095 "Expected Next(17), got {:?}",
5096 outcome
5097 );
5098 }
5099
5100 #[tokio::test]
5101 async fn parallel_records_timeline_events() {
5102 use super::ParallelStrategy;
5103 use ranvier_core::timeline::TimelineEvent;
5104
5105 let mut bus = Bus::new();
5106 bus.insert(Timeline::new());
5107
5108 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
5109 .parallel(
5110 vec![
5111 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
5112 Arc::new(MultiplyByTwo),
5113 ],
5114 ParallelStrategy::AllMustSucceed,
5115 );
5116
5117 let outcome = axon.execute(3, &(), &mut bus).await;
5118 assert!(matches!(outcome, Outcome::Next(4)));
5119
5120 let timeline = bus.read::<Timeline>().unwrap();
5121
5122 let fanout_enters = timeline
5124 .events
5125 .iter()
5126 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
5127 .count();
5128 let fanin_enters = timeline
5129 .events
5130 .iter()
5131 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
5132 .count();
5133
5134 assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
5135 assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
5136 }
5137
5138 #[derive(Clone)]
5141 struct Greet;
5142
5143 #[async_trait]
5144 impl Transition<(), String> for Greet {
5145 type Error = String;
5146 type Resources = ();
5147
5148 async fn run(
5149 &self,
5150 _state: (),
5151 _resources: &Self::Resources,
5152 _bus: &mut Bus,
5153 ) -> Outcome<String, Self::Error> {
5154 Outcome::Next("Hello from simple!".to_string())
5155 }
5156 }
5157
5158 #[tokio::test]
5159 async fn axon_simple_creates_pipeline() {
5160 let axon = Axon::simple::<String>("SimpleTest").then(Greet);
5161
5162 let mut bus = Bus::new();
5163 let result = axon.execute((), &(), &mut bus).await;
5164
5165 match result {
5166 Outcome::Next(msg) => assert_eq!(msg, "Hello from simple!"),
5167 other => panic!("Expected Outcome::Next, got {:?}", other),
5168 }
5169 }
5170
5171 #[tokio::test]
5172 async fn axon_simple_equivalent_to_explicit() {
5173 let simple = Axon::simple::<String>("Equiv").then(Greet);
5175 let explicit = Axon::<(), (), String>::new("Equiv").then(Greet);
5176
5177 let mut bus1 = Bus::new();
5178 let mut bus2 = Bus::new();
5179
5180 let r1 = simple.execute((), &(), &mut bus1).await;
5181 let r2 = explicit.execute((), &(), &mut bus2).await;
5182
5183 match (r1, r2) {
5184 (Outcome::Next(a), Outcome::Next(b)) => assert_eq!(a, b),
5185 _ => panic!("Both should produce Outcome::Next"),
5186 }
5187 }
5188
5189 #[tokio::test]
5190 async fn then_fn_closure_transition() {
5191 let axon = Axon::simple::<String>("ClosureTest")
5192 .then_fn("to_greeting", |_input: (), _bus: &mut Bus| {
5193 Outcome::next("hello from closure".to_string())
5194 });
5195
5196 let mut bus = Bus::new();
5197 let result = axon.execute((), &(), &mut bus).await;
5198
5199 match result {
5200 Outcome::Next(msg) => assert_eq!(msg, "hello from closure"),
5201 other => panic!("Expected Outcome::Next, got {:?}", other),
5202 }
5203 }
5204
5205 #[tokio::test]
5206 async fn then_fn_reads_bus() {
5207 let axon = Axon::simple::<String>("BusReadClosure")
5208 .then_fn("check_score", |_input: (), bus: &mut Bus| {
5209 let score = bus.read::<u32>().copied().unwrap_or(0);
5210 if score > 75 {
5211 Outcome::next("REJECTED".to_string())
5212 } else {
5213 Outcome::next("APPROVED".to_string())
5214 }
5215 });
5216
5217 let mut bus = Bus::new();
5218 bus.insert(80u32);
5219 let result = axon.execute((), &(), &mut bus).await;
5220 match result {
5221 Outcome::Next(msg) => assert_eq!(msg, "REJECTED"),
5222 other => panic!("Expected REJECTED, got {:?}", other),
5223 }
5224 }
5225
5226 #[tokio::test]
5227 async fn then_fn_mixed_with_transition() {
5228 let axon = Axon::simple::<String>("MixedPipeline")
5230 .then(Greet)
5231 .then_fn("uppercase", |input: String, _bus: &mut Bus| {
5232 Outcome::next(input.to_uppercase())
5233 });
5234
5235 let mut bus = Bus::new();
5236 let result = axon.execute((), &(), &mut bus).await;
5237 match result {
5238 Outcome::Next(msg) => assert_eq!(msg, "HELLO FROM SIMPLE!"),
5239 other => panic!("Expected uppercase greeting, got {:?}", other),
5240 }
5241 }
5242
5243 #[tokio::test]
5244 async fn then_fn_schematic_label() {
5245 let axon = Axon::simple::<String>("SchematicTest")
5246 .then_fn("my_custom_label", |_: (), _: &mut Bus| {
5247 Outcome::next("ok".to_string())
5248 });
5249
5250 assert_eq!(axon.schematic.nodes.len(), 2);
5252 assert_eq!(axon.schematic.nodes[1].label, "my_custom_label");
5253 }
5254}