1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
/// Selects how an [`Axon`] is executed.
///
/// NOTE(review): the code that interprets this mode is not visible in this
/// part of the file; the variant notes below are derived from names and field
/// types and should be confirmed against the execution path.
#[derive(Clone)]
pub enum ExecutionMode {
    /// The mode assigned by `Axon::new` / `Axon::start`.
    Local,
    /// Execution associated with a [`DistributedLock`] — presumably so that
    /// only one instance runs at a time across a cluster (TODO confirm).
    Singleton {
        /// Key handed to the lock provider (presumably identifies the lock).
        lock_key: String,
        /// Lock time-to-live in milliseconds, per the field name.
        ttl_ms: u64,
        /// Shared lock implementation.
        lock_provider: Arc<dyn DistributedLock>,
    },
}
57
/// Strategy tag passed to `Axon::parallel`; it is also rendered (via `Debug`)
/// into the descriptions of the fan-out and fan-in schematic nodes.
///
/// NOTE(review): the code that applies the strategy to branch results is not
/// visible here; the variant notes are inferred from the names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelStrategy {
    /// Presumably: every parallel branch has to succeed.
    AllMustSucceed,
    /// Presumably: individual branches are allowed to fail.
    AnyCanFail,
}
69
/// A pinned, heap-allocated, `Send` future yielding `T` that may borrow data
/// for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
72
/// Shared, type-erased function that drives an [`Axon`]'s chain of steps.
///
/// It takes the pipeline input `In`, a shared reference to the resources
/// `Res`, and a mutable reference to the [`Bus`], and returns a boxed future
/// resolving to `Outcome<Out, E>`. The returned future may borrow both
/// references for `'a`. The function itself is `Send + Sync` and wrapped in an
/// `Arc`, so cloning it is a reference-count bump.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
78
/// Bus entry requesting that execution jump directly to a specific step.
///
/// Steps built with `Axon::then` / `Axon::then_compensated` look this up on
/// the [`Bus`] before running; the step whose node id or label equals
/// `target_node` runs immediately without invoking the preceding steps.
#[derive(Debug, Clone)]
pub struct ManualJump {
    /// Node id or node label of the step to jump to (both are compared).
    pub target_node: String,
    /// JSON that is deserialized into the target step's input type. When it is
    /// `None`, the target step emits `execution.jump.missing_payload` instead
    /// of running.
    pub payload_override: Option<serde_json::Value>,
}
85
/// Bus entry holding the step index at which a resumed run should start.
///
/// Each step compares it with its own index (the node's position in
/// `schematic.nodes` at build time). NOTE(review): the code that places this
/// value on the bus is outside the visible part of the file.
#[derive(Debug, Clone, Copy)]
struct StartStep(u64);
89
/// Bus entry marking a run as a resumption and carrying the stored payload.
///
/// Its presence, together with a matching [`StartStep`], makes a step resume
/// at itself instead of running the preceding steps.
#[derive(Debug, Clone)]
struct ResumptionState {
    /// Stored JSON state; used as the step input when the fresh pipeline input
    /// cannot be converted into the step's input type.
    payload: Option<serde_json::Value>,
}
95
/// Returns a short, human-readable name for `T` with module paths removed.
///
/// Every `path::to::` qualifier is dropped while generic arguments, tuples,
/// references and other punctuation are preserved, so
/// `core::option::Option<alloc::string::String>` becomes `Option<String>`.
/// Non-generic names produce the same result as taking the last `::` segment
/// (e.g. `alloc::string::String` -> `String`).
///
/// Splitting the *whole* name on `::` and keeping the last piece is not
/// sufficient: for the generic example above it yields `String>`.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Identifier characters collected since the last separator. The segment is
    // discarded when it turns out to be a qualifier (i.e. is followed by `::`).
    let mut segment = String::new();
    let mut chars = full.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch == ':' && chars.peek() == Some(&':') {
            chars.next();
            if segment.is_empty() {
                // `::` directly after punctuation (e.g. `<T as Trait>::Item`):
                // there is no qualifier to drop, so keep the separator.
                short.push_str("::");
            } else {
                segment.clear();
            }
        } else if ch.is_alphanumeric() || ch == '_' {
            segment.push(ch);
        } else {
            // Punctuation ends the current path; the pending segment is its
            // final (kept) component.
            short.push_str(&segment);
            segment.clear();
            short.push(ch);
        }
    }

    short.push_str(&segment);
    short
}
101
/// A typed pipeline from `In` to `Out` with error type `E` and shared
/// resources `Res` (defaulting to `()`).
///
/// An `Axon` pairs a [`Schematic`] describing the pipeline's nodes and edges
/// with an [`Executor`] closure that runs it. Builder methods such as `then`
/// consume the value and return a new one with an extended schematic and a
/// wrapping executor.
pub struct Axon<In, Out, E, Res = ()> {
    /// Structural description of the pipeline; builder methods append nodes
    /// and edges to it.
    pub schematic: Schematic,
    /// Composed closure that runs every step added so far.
    executor: Executor<In, Out, E, Res>,
    /// Execution mode; `ExecutionMode::Local` unless overridden.
    pub execution_mode: ExecutionMode,
    /// Optional persistence backend. The `StateInspector` implementation uses
    /// it to load trace state and to save interventions.
    pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
    /// Optional audit sink; `force_resume` appends an audit event to it.
    pub audit_sink: Option<Arc<dyn AuditSink>>,
    /// Optional dead-letter sink (consumer not visible in this part of the file).
    pub dlq_sink: Option<Arc<dyn DlqSink>>,
    /// Dead-letter policy; starts as `DlqPolicy::default()`.
    pub dlq_policy: DlqPolicy,
    /// Optional dynamic dead-letter policy. NOTE(review): presumably takes
    /// precedence over `dlq_policy` at run time — confirm.
    pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
    /// Saga policy; starts as `SagaPolicy::default()`.
    pub saga_policy: SagaPolicy,
    /// Optional dynamic saga policy. NOTE(review): presumably takes precedence
    /// over `saga_policy` at run time — confirm.
    pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
    /// Compensation handlers keyed by node id; `then_compensated` registers
    /// into it. Shared (via `Arc`) between clones of this `Axon`.
    pub saga_compensation_registry:
        Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
    /// Optional IAM handle installed by `with_iam`.
    pub iam_handle: Option<ranvier_core::iam::IamHandle>,
}
148
/// Parameters of a schematic export.
///
/// NOTE(review): the code consuming this request is not visible here.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Optional output path. Presumably `None` selects a default destination
    /// such as stdout — TODO confirm against the export implementation.
    pub output: Option<PathBuf>,
}
155
156impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
157 fn clone(&self) -> Self {
158 Self {
159 schematic: self.schematic.clone(),
160 executor: self.executor.clone(),
161 execution_mode: self.execution_mode.clone(),
162 persistence_store: self.persistence_store.clone(),
163 audit_sink: self.audit_sink.clone(),
164 dlq_sink: self.dlq_sink.clone(),
165 dlq_policy: self.dlq_policy.clone(),
166 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
167 saga_policy: self.saga_policy.clone(),
168 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
169 saga_compensation_registry: self.saga_compensation_registry.clone(),
170 iam_handle: self.iam_handle.clone(),
171 }
172 }
173}
174
175impl<In, E, Res> Axon<In, In, E, Res>
176where
177 In: Send + Sync + Serialize + DeserializeOwned + 'static,
178 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
179 Res: ranvier_core::transition::ResourceRequirement,
180{
181 #[track_caller]
184 pub fn new(label: &str) -> Self {
185 let caller = Location::caller();
186 Self::start_with_source(label, caller)
187 }
188
189 #[track_caller]
192 pub fn start(label: &str) -> Self {
193 let caller = Location::caller();
194 Self::start_with_source(label, caller)
195 }
196
197 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
198 let node_id = uuid::Uuid::new_v4().to_string();
199 let node = Node {
200 id: node_id,
201 kind: NodeKind::Ingress,
202 label: label.to_string(),
203 description: None,
204 input_type: "void".to_string(),
205 output_type: type_name_of::<In>(),
206 resource_type: type_name_of::<Res>(),
207 metadata: Default::default(),
208 bus_capability: None,
209 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
210 position: None,
211 compensation_node_id: None,
212 input_schema: None,
213 output_schema: None,
214 };
215
216 let mut schematic = Schematic::new(label);
217 schematic.nodes.push(node);
218
219 let executor: Executor<In, In, E, Res> =
220 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
221
222 Self {
223 schematic,
224 executor,
225 execution_mode: ExecutionMode::Local,
226 persistence_store: None,
227 audit_sink: None,
228 dlq_sink: None,
229 dlq_policy: DlqPolicy::default(),
230 dynamic_dlq_policy: None,
231 saga_policy: SagaPolicy::default(),
232 dynamic_saga_policy: None,
233 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
234 ranvier_core::saga::SagaCompensationRegistry::new(),
235 )),
236 iam_handle: None,
237 }
238 }
239}
240
241impl<In, Out, E, Res> Axon<In, Out, E, Res>
242where
243 In: Send + Sync + Serialize + DeserializeOwned + 'static,
244 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
245 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
246 Res: ranvier_core::transition::ResourceRequirement,
247{
248 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
250 self.execution_mode = mode;
251 self
252 }
253
254 pub fn with_version(mut self, version: impl Into<String>) -> Self {
256 self.schematic.schema_version = version.into();
257 self
258 }
259
260 pub fn with_persistence_store<S>(mut self, store: S) -> Self
262 where
263 S: crate::persistence::PersistenceStore + 'static,
264 {
265 self.persistence_store = Some(Arc::new(store));
266 self
267 }
268
269 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
271 where
272 S: AuditSink + 'static,
273 {
274 self.audit_sink = Some(Arc::new(sink));
275 self
276 }
277
278 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
280 where
281 S: DlqSink + 'static,
282 {
283 self.dlq_sink = Some(Arc::new(sink));
284 self
285 }
286
287 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
289 self.dlq_policy = policy;
290 self
291 }
292
293 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
295 self.saga_policy = policy;
296 self
297 }
298
299 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
302 self.dynamic_dlq_policy = Some(dynamic);
303 self
304 }
305
306 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
309 self.dynamic_saga_policy = Some(dynamic);
310 self
311 }
312
313 pub fn with_iam(
321 mut self,
322 policy: ranvier_core::iam::IamPolicy,
323 verifier: impl ranvier_core::iam::IamVerifier + 'static,
324 ) -> Self {
325 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
326 policy,
327 Arc::new(verifier),
328 ));
329 self
330 }
331
332 #[cfg(feature = "schema")]
343 pub fn with_input_schema<T>(mut self) -> Self
344 where
345 T: schemars::JsonSchema,
346 {
347 if let Some(last_node) = self.schematic.nodes.last_mut() {
348 let schema = schemars::schema_for!(T);
349 last_node.input_schema =
350 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
351 }
352 self
353 }
354
355 #[cfg(feature = "schema")]
359 pub fn with_output_schema<T>(mut self) -> Self
360 where
361 T: schemars::JsonSchema,
362 {
363 if let Some(last_node) = self.schematic.nodes.last_mut() {
364 let schema = schemars::schema_for!(T);
365 last_node.output_schema =
366 Some(serde_json::to_value(schema).unwrap_or(serde_json::Value::Null));
367 }
368 self
369 }
370
371 pub fn with_input_schema_value(mut self, schema: serde_json::Value) -> Self {
375 if let Some(last_node) = self.schematic.nodes.last_mut() {
376 last_node.input_schema = Some(schema);
377 }
378 self
379 }
380
381 pub fn with_output_schema_value(mut self, schema: serde_json::Value) -> Self {
383 if let Some(last_node) = self.schematic.nodes.last_mut() {
384 last_node.output_schema = Some(schema);
385 }
386 self
387 }
388}
389
390#[async_trait]
391impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
392where
393 In: Send + Sync + Serialize + DeserializeOwned + 'static,
394 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
395 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
396 Res: ranvier_core::transition::ResourceRequirement,
397{
398 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
399 let store = self.persistence_store.as_ref()?;
400 let trace = store.load(trace_id).await.ok().flatten()?;
401 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
402 }
403
404 async fn force_resume(
405 &self,
406 trace_id: &str,
407 target_node: &str,
408 payload_override: Option<Value>,
409 ) -> Result<(), String> {
410 let store = self
411 .persistence_store
412 .as_ref()
413 .ok_or("No persistence store attached to Axon")?;
414
415 let intervention = crate::persistence::Intervention {
416 target_node: target_node.to_string(),
417 payload_override,
418 timestamp_ms: now_ms(),
419 };
420
421 store
422 .save_intervention(trace_id, intervention)
423 .await
424 .map_err(|e| format!("Failed to save intervention: {}", e))?;
425
426 if let Some(sink) = self.audit_sink.as_ref() {
427 let event = AuditEvent::new(
428 uuid::Uuid::new_v4().to_string(),
429 "Inspector".to_string(),
430 "ForceResume".to_string(),
431 trace_id.to_string(),
432 )
433 .with_metadata("target_node", target_node);
434
435 let _ = sink.append(&event).await;
436 }
437
438 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
439 Ok(())
440 }
441}
442
443impl<In, Out, E, Res> Axon<In, Out, E, Res>
444where
445 In: Send + Sync + Serialize + DeserializeOwned + 'static,
446 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
447 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
448 Res: ranvier_core::transition::ResourceRequirement,
449{
    /// Appends `transition` as the next step of the pipeline.
    ///
    /// Schematic: adds an `Atom` node for the transition (recording the
    /// caller's source location) and a `Linear` edge labelled `"Next"` from the
    /// previously last node.
    ///
    /// Execution: the returned `Axon`'s executor wraps the previous one. When
    /// invoked it does, in order:
    /// 1. If a [`ManualJump`] on the bus targets this node (by id or label),
    ///    the previous steps are skipped and this step runs on the
    ///    deserialized `payload_override`. A missing payload emits
    ///    `execution.jump.missing_payload`; an undeserializable one emits
    ///    `execution.jump.payload_error`.
    /// 2. If a `StartStep` equal to this step's index and a `ResumptionState`
    ///    are on the bus, this step runs on the fresh input converted to `Out`
    ///    via JSON or, failing that, on the persisted payload. If neither
    ///    converts, `execution.resumption.payload_error` is emitted.
    /// 3. Otherwise the previous executor runs; a `Next` value is fed into
    ///    this step and any other outcome is returned unchanged.
    #[track_caller]
    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
    where
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Schematic node describing this transition.
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
            input_schema: transition.input_schema(),
            output_schema: None,
        };

        // The edge starts at whatever node is currently last in the schematic.
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next".to_string()),
        });

        // Values captured by the executor closure; each invocation clones them
        // into the returned future.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // Index of the node just pushed; compared against `StartStep`.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;

                Box::pin(async move {
                    // (1) Manual jump addressed to this node: skip predecessors.
                    if let Some(jump) = bus.read::<ManualJump>()
                        && (jump.target_node == timeline_node_id
                            || jump.target_node == timeline_node_label)
                    {
                        tracing::info!(
                            node_id = %timeline_node_id,
                            node_label = %timeline_node_label,
                            "Manual jump target reached; skipping previous steps"
                        );

                        let state = if let Some(ow) = jump.payload_override.clone() {
                            match serde_json::from_value::<Out>(ow) {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::error!(
                                        "Payload override deserialization failed: {}",
                                        e
                                    );
                                    // NOTE(review): the mapping closure is not
                                    // expected to run for an emitted outcome;
                                    // it only adapts the value type — confirm
                                    // against `Outcome::map`.
                                    return Outcome::emit(
                                        "execution.jump.payload_error",
                                        Some(serde_json::json!({"error": e.to_string()})),
                                    )
                                    .map(|_: ()| unreachable!());
                                }
                            }
                        } else {
                            // Without a payload there is no input for this step.
                            return Outcome::emit(
                                "execution.jump.missing_payload",
                                Some(serde_json::json!({"node_id": timeline_node_id})),
                            );
                        };

                        return run_this_step::<Out, Next, E, Res>(
                            &trans,
                            state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;
                    }

                    // (2) Resumption checkpoint located at this step.
                    if let Some(start) = bus.read::<StartStep>()
                        && step_idx == start.0
                        && bus.read::<ResumptionState>().is_some()
                    {
                        // Fresh input round-tripped through JSON into `Out`
                        // takes precedence over the persisted payload.
                        let fresh_state = serde_json::to_value(&input)
                            .ok()
                            .and_then(|v| serde_json::from_value::<Out>(v).ok());
                        let persisted_state = bus
                            .read::<ResumptionState>()
                            .and_then(|r| r.payload.clone())
                            .and_then(|p| serde_json::from_value::<Out>(p).ok());

                        if let Some(s) = fresh_state.or(persisted_state) {
                            tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
                            return run_this_step::<Out, Next, E, Res>(
                                &trans,
                                s,
                                res,
                                bus,
                                &timeline_node_id,
                                &timeline_node_label,
                                &transition_bus_policy,
                                step_idx,
                            )
                            .await;
                        }

                        return Outcome::emit(
                            "execution.resumption.payload_error",
                            Some(serde_json::json!({"error": "no compatible resumption state"})),
                        )
                        .map(|_: ()| unreachable!());
                    }

                    // (3) Normal path: run everything before this step first.
                    let prev_result = prev(input, res, bus).await;

                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        // `Next` is handled above, so the mapping closure is
                        // not expected to be called; the call only changes the
                        // outcome's value type.
                        other => return other.map(|_| unreachable!()),
                    };

                    run_this_step::<Out, Next, E, Res>(
                        &trans,
                        state,
                        res,
                        bus,
                        &timeline_node_id,
                        &timeline_node_label,
                        &transition_bus_policy,
                        step_idx,
                    )
                    .await
                })
            },
        );
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
653
    /// Appends `transition` as the next step and retries it on faults
    /// according to `policy`.
    ///
    /// Schematic: adds an `Atom` node and a `Linear` edge labelled
    /// `"Next (retryable)"` from the previously last node.
    ///
    /// Execution: the previous executor runs first; any outcome other than
    /// `Next` is returned unchanged. The transition is then attempted up to
    /// `policy.max_retries + 1` times, each time on a clone of the incoming
    /// state (hence `Out: Clone`). Only `Outcome::Fault` triggers a retry;
    /// before each retry a `NodeRetry` event is pushed to the bus `Timeline`
    /// (if present) and the task sleeps for `policy.delay_for_attempt(attempt)`.
    /// `Next` is returned immediately; any other outcome, or the fault of the
    /// final attempt, ends the loop and is returned.
    ///
    /// NOTE(review): unlike `then`, this step does not check the bus for
    /// `ManualJump` or `StartStep`/`ResumptionState` — confirm that is intended.
    #[track_caller]
    pub fn then_with_retry<Next, Trans>(
        self,
        transition: Trans,
        policy: crate::retry::RetryPolicy,
    ) -> Axon<In, Next, E, Res>
    where
        Out: Clone,
        Next: Send + Sync + Serialize + DeserializeOwned + 'static,
        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
    {
        let caller = Location::caller();
        let Axon {
            mut schematic,
            executor: prev_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        } = self;

        // Schematic node describing this transition.
        let next_node_id = uuid::Uuid::new_v4().to_string();
        let next_node = Node {
            id: next_node_id.clone(),
            kind: NodeKind::Atom,
            label: transition.label(),
            description: transition.description(),
            input_type: type_name_of::<Out>(),
            output_type: type_name_of::<Next>(),
            resource_type: type_name_of::<Res>(),
            metadata: Default::default(),
            bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
            position: transition
                .position()
                .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
            compensation_node_id: None,
            input_schema: transition.input_schema(),
            output_schema: None,
        };

        // The edge starts at whatever node is currently last in the schematic.
        let last_node_id = schematic
            .nodes
            .last()
            .map(|n| n.id.clone())
            .unwrap_or_default();

        schematic.nodes.push(next_node);
        schematic.edges.push(Edge {
            from: last_node_id,
            to: next_node_id.clone(),
            kind: EdgeType::Linear,
            label: Some("Next (retryable)".to_string()),
        });

        // Values captured by the executor closure; each invocation clones them
        // into the returned future.
        let node_id_for_exec = next_node_id.clone();
        let node_label_for_exec = transition.label();
        let bus_policy_for_exec = transition.bus_access_policy();
        let bus_policy_clone = bus_policy_for_exec.clone();
        // Index of the node just pushed.
        let current_step_idx = schematic.nodes.len() as u64 - 1;
        let next_executor: Executor<In, Next, E, Res> = Arc::new(
            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
                let prev = prev_executor.clone();
                let trans = transition.clone();
                let timeline_node_id = node_id_for_exec.clone();
                let timeline_node_label = node_label_for_exec.clone();
                let transition_bus_policy = bus_policy_clone.clone();
                let step_idx = current_step_idx;
                let retry_policy = policy.clone();

                Box::pin(async move {
                    // Run everything before this step; forward non-`Next` outcomes.
                    let prev_result = prev(input, res, bus).await;
                    let state = match prev_result {
                        Outcome::Next(t) => t,
                        other => return other.map(|_| unreachable!()),
                    };

                    let mut last_result = None;
                    // Attempt 0 is the initial try; attempts 1..=max_retries are retries.
                    for attempt in 0..=retry_policy.max_retries {
                        // Every attempt starts from the same input state.
                        let attempt_state = state.clone();

                        let result = run_this_step::<Out, Next, E, Res>(
                            &trans,
                            attempt_state,
                            res,
                            bus,
                            &timeline_node_id,
                            &timeline_node_label,
                            &transition_bus_policy,
                            step_idx,
                        )
                        .await;

                        match &result {
                            Outcome::Next(_) => return result,
                            // A fault with retries remaining: record, back off, loop.
                            Outcome::Fault(_) if attempt < retry_policy.max_retries => {
                                let delay = retry_policy.delay_for_attempt(attempt);
                                tracing::warn!(
                                    node_id = %timeline_node_id,
                                    attempt = attempt + 1,
                                    max = retry_policy.max_retries,
                                    delay_ms = delay.as_millis() as u64,
                                    "Transition failed, retrying"
                                );
                                if let Some(timeline) = bus.read_mut::<Timeline>() {
                                    timeline.push(TimelineEvent::NodeRetry {
                                        node_id: timeline_node_id.clone(),
                                        attempt: attempt + 1,
                                        max_attempts: retry_policy.max_retries,
                                        backoff_ms: delay.as_millis() as u64,
                                        timestamp: now_ms(),
                                    });
                                }
                                tokio::time::sleep(delay).await;
                            }
                            // Final fault, or any non-fault/non-next outcome:
                            // stop retrying and return it.
                            _ => {
                                last_result = Some(result);
                                break;
                            }
                        }
                    }

                    // The final iteration always returns or records a result,
                    // so this fallback is a defensive default.
                    last_result.unwrap_or_else(|| {
                        Outcome::emit("execution.retry.exhausted", None)
                    })
                })
            },
        );
        Axon {
            schematic,
            executor: next_executor,
            execution_mode,
            persistence_store,
            audit_sink,
            dlq_sink,
            dlq_policy,
            dynamic_dlq_policy,
            saga_policy,
            dynamic_saga_policy,
            saga_compensation_registry,
            iam_handle,
        }
    }
820
821 #[track_caller]
826 pub fn then_compensated<Next, Trans, Comp>(
827 self,
828 transition: Trans,
829 compensation: Comp,
830 ) -> Axon<In, Next, E, Res>
831 where
832 Out: Clone,
833 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
834 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
835 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
836 {
837 let caller = Location::caller();
838 let Axon {
839 mut schematic,
840 executor: prev_executor,
841 execution_mode,
842 persistence_store,
843 audit_sink,
844 dlq_sink,
845 dlq_policy,
846 dynamic_dlq_policy,
847 saga_policy,
848 dynamic_saga_policy,
849 saga_compensation_registry,
850 iam_handle,
851 } = self;
852
853 let next_node_id = uuid::Uuid::new_v4().to_string();
855 let comp_node_id = uuid::Uuid::new_v4().to_string();
856
857 let next_node = Node {
858 id: next_node_id.clone(),
859 kind: NodeKind::Atom,
860 label: transition.label(),
861 description: transition.description(),
862 input_type: type_name_of::<Out>(),
863 output_type: type_name_of::<Next>(),
864 resource_type: type_name_of::<Res>(),
865 metadata: Default::default(),
866 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
867 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
868 position: transition
869 .position()
870 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
871 compensation_node_id: Some(comp_node_id.clone()),
872 input_schema: None,
873 output_schema: None,
874 };
875
876 let comp_node = Node {
878 id: comp_node_id.clone(),
879 kind: NodeKind::Atom,
880 label: format!("Compensate: {}", compensation.label()),
881 description: compensation.description(),
882 input_type: type_name_of::<Out>(),
883 output_type: "void".to_string(),
884 resource_type: type_name_of::<Res>(),
885 metadata: Default::default(),
886 bus_capability: None,
887 source_location: None,
888 position: compensation
889 .position()
890 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
891 compensation_node_id: None,
892 input_schema: None,
893 output_schema: None,
894 };
895
896 let last_node_id = schematic
897 .nodes
898 .last()
899 .map(|n| n.id.clone())
900 .unwrap_or_default();
901
902 schematic.nodes.push(next_node);
903 schematic.nodes.push(comp_node);
904 schematic.edges.push(Edge {
905 from: last_node_id,
906 to: next_node_id.clone(),
907 kind: EdgeType::Linear,
908 label: Some("Next".to_string()),
909 });
910
911 let node_id_for_exec = next_node_id.clone();
913 let comp_id_for_exec = comp_node_id.clone();
914 let node_label_for_exec = transition.label();
915 let bus_policy_for_exec = transition.bus_access_policy();
916 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
917 let comp_for_exec = compensation.clone();
918 let bus_policy_for_executor = bus_policy_for_exec.clone();
919 let bus_policy_for_registry = bus_policy_for_exec.clone();
920 let next_executor: Executor<In, Next, E, Res> = Arc::new(
921 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
922 let prev = prev_executor.clone();
923 let trans = transition.clone();
924 let comp = comp_for_exec.clone();
925 let timeline_node_id = node_id_for_exec.clone();
926 let timeline_comp_id = comp_id_for_exec.clone();
927 let timeline_node_label = node_label_for_exec.clone();
928 let transition_bus_policy = bus_policy_for_executor.clone();
929 let step_idx = step_idx_for_exec;
930
931 Box::pin(async move {
932 if let Some(jump) = bus.read::<ManualJump>()
934 && (jump.target_node == timeline_node_id
935 || jump.target_node == timeline_node_label)
936 {
937 tracing::info!(
938 node_id = %timeline_node_id,
939 node_label = %timeline_node_label,
940 "Manual jump target reached (compensated); skipping previous steps"
941 );
942
943 let state = if let Some(ow) = jump.payload_override.clone() {
944 match serde_json::from_value::<Out>(ow) {
945 Ok(s) => s,
946 Err(e) => {
947 tracing::error!(
948 "Payload override deserialization failed: {}",
949 e
950 );
951 return Outcome::emit(
952 "execution.jump.payload_error",
953 Some(serde_json::json!({"error": e.to_string()})),
954 )
955 .map(|_: ()| unreachable!());
956 }
957 }
958 } else {
959 return Outcome::emit(
960 "execution.jump.missing_payload",
961 Some(serde_json::json!({"node_id": timeline_node_id})),
962 )
963 .map(|_: ()| unreachable!());
964 };
965
966 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
968 &trans,
969 &comp,
970 state,
971 res,
972 bus,
973 &timeline_node_id,
974 &timeline_comp_id,
975 &timeline_node_label,
976 &transition_bus_policy,
977 step_idx,
978 )
979 .await;
980 }
981
982 if let Some(start) = bus.read::<StartStep>()
984 && step_idx == start.0
985 && bus.read::<ResumptionState>().is_some()
986 {
987 let fresh_state = serde_json::to_value(&input)
988 .ok()
989 .and_then(|v| serde_json::from_value::<Out>(v).ok());
990 let persisted_state = bus
991 .read::<ResumptionState>()
992 .and_then(|r| r.payload.clone())
993 .and_then(|p| serde_json::from_value::<Out>(p).ok());
994
995 if let Some(s) = fresh_state.or(persisted_state) {
996 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
997 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
998 &trans,
999 &comp,
1000 s,
1001 res,
1002 bus,
1003 &timeline_node_id,
1004 &timeline_comp_id,
1005 &timeline_node_label,
1006 &transition_bus_policy,
1007 step_idx,
1008 )
1009 .await;
1010 }
1011
1012 return Outcome::emit(
1013 "execution.resumption.payload_error",
1014 Some(serde_json::json!({"error": "no compatible resumption state"})),
1015 )
1016 .map(|_: ()| unreachable!());
1017 }
1018
1019 let prev_result = prev(input, res, bus).await;
1021
1022 let state = match prev_result {
1024 Outcome::Next(t) => t,
1025 other => return other.map(|_| unreachable!()),
1026 };
1027
1028 run_this_compensated_step::<Out, Next, E, Res, Comp>(
1029 &trans,
1030 &comp,
1031 state,
1032 res,
1033 bus,
1034 &timeline_node_id,
1035 &timeline_comp_id,
1036 &timeline_node_label,
1037 &transition_bus_policy,
1038 step_idx,
1039 )
1040 .await
1041 })
1042 },
1043 );
1044 {
1046 let mut registry = saga_compensation_registry.write().expect("saga compensation registry lock poisoned");
1047 let comp_fn = compensation.clone();
1048 let transition_bus_policy = bus_policy_for_registry.clone();
1049
1050 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
1051 Arc::new(move |input_data, res, bus| {
1052 let comp = comp_fn.clone();
1053 let bus_policy = transition_bus_policy.clone();
1054 Box::pin(async move {
1055 let input: Out = serde_json::from_slice(&input_data).expect("saga compensation input deserialization failed — type mismatch between snapshot and compensation handler");
1056 bus.set_access_policy(comp.label(), bus_policy);
1057 let res = comp.run(input, res, bus).await;
1058 bus.clear_access_policy();
1059 res
1060 })
1061 });
1062 registry.register(next_node_id.clone(), handler);
1063 }
1064
1065 Axon {
1066 schematic,
1067 executor: next_executor,
1068 execution_mode,
1069 persistence_store,
1070 audit_sink,
1071 dlq_sink,
1072 dlq_policy,
1073 dynamic_dlq_policy,
1074 saga_policy,
1075 dynamic_saga_policy,
1076 saga_compensation_registry,
1077 iam_handle,
1078 }
1079 }
1080
1081 #[track_caller]
1084 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
1085 where
1086 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
1087 {
1088 let caller = Location::caller();
1091 let comp_node_id = uuid::Uuid::new_v4().to_string();
1092
1093 let comp_node = Node {
1094 id: comp_node_id.clone(),
1095 kind: NodeKind::Atom,
1096 label: transition.label(),
1097 description: transition.description(),
1098 input_type: type_name_of::<Out>(),
1099 output_type: "void".to_string(),
1100 resource_type: type_name_of::<Res>(),
1101 metadata: Default::default(),
1102 bus_capability: None,
1103 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1104 position: transition
1105 .position()
1106 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
1107 compensation_node_id: None,
1108 input_schema: None,
1109 output_schema: None,
1110 };
1111
1112 if let Some(last_node) = self.schematic.nodes.last_mut() {
1113 last_node.compensation_node_id = Some(comp_node_id.clone());
1114 }
1115
1116 self.schematic.nodes.push(comp_node);
1117 self
1118 }
1119
1120 #[track_caller]
1122 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
1123 let caller = Location::caller();
1124 let branch_id_str = branch_id.into();
1125 let last_node_id = self
1126 .schematic
1127 .nodes
1128 .last()
1129 .map(|n| n.id.clone())
1130 .unwrap_or_default();
1131
1132 let branch_node = Node {
1133 id: uuid::Uuid::new_v4().to_string(),
1134 kind: NodeKind::Synapse,
1135 label: label.to_string(),
1136 description: None,
1137 input_type: type_name_of::<Out>(),
1138 output_type: type_name_of::<Out>(),
1139 resource_type: type_name_of::<Res>(),
1140 metadata: Default::default(),
1141 bus_capability: None,
1142 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1143 position: None,
1144 compensation_node_id: None,
1145 input_schema: None,
1146 output_schema: None,
1147 };
1148
1149 self.schematic.nodes.push(branch_node);
1150 self.schematic.edges.push(Edge {
1151 from: last_node_id,
1152 to: branch_id_str.clone(),
1153 kind: EdgeType::Branch(branch_id_str),
1154 label: Some("Branch".to_string()),
1155 });
1156
1157 self
1158 }
1159
1160 #[track_caller]
1197 pub fn parallel(
1198 self,
1199 transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
1200 strategy: ParallelStrategy,
1201 ) -> Axon<In, Out, E, Res>
1202 where
1203 Out: Clone,
1204 {
1205 let caller = Location::caller();
1206 let Axon {
1207 mut schematic,
1208 executor: prev_executor,
1209 execution_mode,
1210 persistence_store,
1211 audit_sink,
1212 dlq_sink,
1213 dlq_policy,
1214 dynamic_dlq_policy,
1215 saga_policy,
1216 dynamic_saga_policy,
1217 saga_compensation_registry,
1218 iam_handle,
1219 } = self;
1220
1221 let fanout_id = uuid::Uuid::new_v4().to_string();
1223 let fanin_id = uuid::Uuid::new_v4().to_string();
1224
1225 let last_node_id = schematic
1226 .nodes
1227 .last()
1228 .map(|n| n.id.clone())
1229 .unwrap_or_default();
1230
1231 let fanout_node = Node {
1232 id: fanout_id.clone(),
1233 kind: NodeKind::FanOut,
1234 label: "FanOut".to_string(),
1235 description: Some(format!(
1236 "Parallel split ({} branches, {:?})",
1237 transitions.len(),
1238 strategy
1239 )),
1240 input_type: type_name_of::<Out>(),
1241 output_type: type_name_of::<Out>(),
1242 resource_type: type_name_of::<Res>(),
1243 metadata: Default::default(),
1244 bus_capability: None,
1245 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1246 position: None,
1247 compensation_node_id: None,
1248 input_schema: None,
1249 output_schema: None,
1250 };
1251
1252 schematic.nodes.push(fanout_node);
1253 schematic.edges.push(Edge {
1254 from: last_node_id,
1255 to: fanout_id.clone(),
1256 kind: EdgeType::Linear,
1257 label: Some("Next".to_string()),
1258 });
1259
1260 let mut branch_node_ids = Vec::with_capacity(transitions.len());
1262 for (i, trans) in transitions.iter().enumerate() {
1263 let branch_id = uuid::Uuid::new_v4().to_string();
1264 let branch_node = Node {
1265 id: branch_id.clone(),
1266 kind: NodeKind::Atom,
1267 label: trans.label(),
1268 description: trans.description(),
1269 input_type: type_name_of::<Out>(),
1270 output_type: type_name_of::<Out>(),
1271 resource_type: type_name_of::<Res>(),
1272 metadata: Default::default(),
1273 bus_capability: bus_capability_schema_from_policy(trans.bus_access_policy()),
1274 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1275 position: None,
1276 compensation_node_id: None,
1277 input_schema: trans.input_schema(),
1278 output_schema: None,
1279 };
1280 schematic.nodes.push(branch_node);
1281 schematic.edges.push(Edge {
1282 from: fanout_id.clone(),
1283 to: branch_id.clone(),
1284 kind: EdgeType::Parallel,
1285 label: Some(format!("Branch {}", i)),
1286 });
1287 branch_node_ids.push(branch_id);
1288 }
1289
1290 let fanin_node = Node {
1292 id: fanin_id.clone(),
1293 kind: NodeKind::FanIn,
1294 label: "FanIn".to_string(),
1295 description: Some(format!("Parallel join ({:?})", strategy)),
1296 input_type: type_name_of::<Out>(),
1297 output_type: type_name_of::<Out>(),
1298 resource_type: type_name_of::<Res>(),
1299 metadata: Default::default(),
1300 bus_capability: None,
1301 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
1302 position: None,
1303 compensation_node_id: None,
1304 input_schema: None,
1305 output_schema: None,
1306 };
1307
1308 schematic.nodes.push(fanin_node);
1309 for branch_id in &branch_node_ids {
1310 schematic.edges.push(Edge {
1311 from: branch_id.clone(),
1312 to: fanin_id.clone(),
1313 kind: EdgeType::Parallel,
1314 label: Some("Join".to_string()),
1315 });
1316 }
1317
1318 let fanout_node_id = fanout_id.clone();
1320 let fanin_node_id = fanin_id.clone();
1321 let branch_ids_for_exec = branch_node_ids.clone();
1322 let step_idx = schematic.nodes.len() as u64 - 1;
1323
1324 let next_executor: Executor<In, Out, E, Res> = Arc::new(
1325 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Out, E>> {
1326 let prev = prev_executor.clone();
1327 let branches = transitions.clone();
1328 let fanout_id = fanout_node_id.clone();
1329 let fanin_id = fanin_node_id.clone();
1330 let branch_ids = branch_ids_for_exec.clone();
1331
1332 Box::pin(async move {
1333 let prev_result = prev(input, res, bus).await;
1335 let state = match prev_result {
1336 Outcome::Next(t) => t,
1337 other => return other.map(|_| unreachable!()),
1338 };
1339
1340 let fanout_enter_ts = now_ms();
1342 if let Some(timeline) = bus.read_mut::<Timeline>() {
1343 timeline.push(TimelineEvent::NodeEnter {
1344 node_id: fanout_id.clone(),
1345 node_label: "FanOut".to_string(),
1346 timestamp: fanout_enter_ts,
1347 });
1348 }
1349
1350 let futs: Vec<_> = branches
1354 .iter()
1355 .enumerate()
1356 .map(|(i, trans)| {
1357 let branch_state = state.clone();
1358 let branch_node_id = branch_ids[i].clone();
1359 let trans = trans.clone();
1360
1361 async move {
1362 let mut branch_bus = Bus::new();
1363 let label = trans.label();
1364 let bus_policy = trans.bus_access_policy();
1365
1366 branch_bus.set_access_policy(label.clone(), bus_policy);
1367 let result = trans.run(branch_state, res, &mut branch_bus).await;
1368 branch_bus.clear_access_policy();
1369
1370 (i, branch_node_id, label, result)
1371 }
1372 })
1373 .collect();
1374
1375 let results: Vec<(usize, String, String, Outcome<Out, E>)> =
1377 futures_util::future::join_all(futs).await;
1378
1379 for (_, branch_node_id, branch_label, outcome) in &results {
1381 if let Some(timeline) = bus.read_mut::<Timeline>() {
1382 let ts = now_ms();
1383 timeline.push(TimelineEvent::NodeEnter {
1384 node_id: branch_node_id.clone(),
1385 node_label: branch_label.clone(),
1386 timestamp: ts,
1387 });
1388 timeline.push(TimelineEvent::NodeExit {
1389 node_id: branch_node_id.clone(),
1390 outcome_type: outcome_type_name(outcome),
1391 duration_ms: 0,
1392 timestamp: ts,
1393 });
1394 }
1395 }
1396
1397 if let Some(timeline) = bus.read_mut::<Timeline>() {
1399 timeline.push(TimelineEvent::NodeExit {
1400 node_id: fanout_id.clone(),
1401 outcome_type: "Next".to_string(),
1402 duration_ms: 0,
1403 timestamp: now_ms(),
1404 });
1405 }
1406
1407 let combined = match strategy {
1409 ParallelStrategy::AllMustSucceed => {
1410 let mut first_fault = None;
1411 let mut first_success = None;
1412
1413 for (_, _, _, outcome) in results {
1414 match outcome {
1415 Outcome::Fault(e) => {
1416 if first_fault.is_none() {
1417 first_fault = Some(Outcome::Fault(e));
1418 }
1419 }
1420 Outcome::Next(val) => {
1421 if first_success.is_none() {
1422 first_success = Some(Outcome::Next(val));
1423 }
1424 }
1425 other => {
1426 if first_fault.is_none() {
1429 first_fault = Some(other);
1430 }
1431 }
1432 }
1433 }
1434
1435 if let Some(fault) = first_fault {
1436 fault
1437 } else {
1438 first_success.unwrap_or_else(|| {
1439 Outcome::emit("execution.parallel.no_results", None)
1440 })
1441 }
1442 }
1443 ParallelStrategy::AnyCanFail => {
1444 let mut first_success = None;
1445 let mut first_fault = None;
1446
1447 for (_, _, _, outcome) in results {
1448 match outcome {
1449 Outcome::Next(val) => {
1450 if first_success.is_none() {
1451 first_success = Some(Outcome::Next(val));
1452 }
1453 }
1454 Outcome::Fault(e) => {
1455 if first_fault.is_none() {
1456 first_fault = Some(Outcome::Fault(e));
1457 }
1458 }
1459 _ => {}
1460 }
1461 }
1462
1463 first_success.unwrap_or_else(|| {
1464 first_fault.unwrap_or_else(|| {
1465 Outcome::emit("execution.parallel.no_results", None)
1466 })
1467 })
1468 }
1469 };
1470
1471 let fanin_enter_ts = now_ms();
1473 if let Some(timeline) = bus.read_mut::<Timeline>() {
1474 timeline.push(TimelineEvent::NodeEnter {
1475 node_id: fanin_id.clone(),
1476 node_label: "FanIn".to_string(),
1477 timestamp: fanin_enter_ts,
1478 });
1479 timeline.push(TimelineEvent::NodeExit {
1480 node_id: fanin_id.clone(),
1481 outcome_type: outcome_type_name(&combined),
1482 duration_ms: 0,
1483 timestamp: fanin_enter_ts,
1484 });
1485 }
1486
1487 if let Some(handle) = bus.read::<PersistenceHandle>() {
1489 let trace_id = persistence_trace_id(bus);
1490 let circuit = bus
1491 .read::<ranvier_core::schematic::Schematic>()
1492 .map(|s| s.name.clone())
1493 .unwrap_or_default();
1494 let version = bus
1495 .read::<ranvier_core::schematic::Schematic>()
1496 .map(|s| s.schema_version.clone())
1497 .unwrap_or_default();
1498
1499 persist_execution_event(
1500 handle,
1501 &trace_id,
1502 &circuit,
1503 &version,
1504 step_idx,
1505 Some(fanin_id.clone()),
1506 outcome_kind_name(&combined),
1507 Some(combined.to_json_value()),
1508 )
1509 .await;
1510 }
1511
1512 combined
1513 })
1514 },
1515 );
1516
1517 Axon {
1518 schematic,
1519 executor: next_executor,
1520 execution_mode,
1521 persistence_store,
1522 audit_sink,
1523 dlq_sink,
1524 dlq_policy,
1525 dynamic_dlq_policy,
1526 saga_policy,
1527 dynamic_saga_policy,
1528 saga_compensation_registry,
1529 iam_handle,
1530 }
1531 }
1532
1533 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
1535 if let ExecutionMode::Singleton {
1536 lock_key,
1537 ttl_ms,
1538 lock_provider,
1539 } = &self.execution_mode
1540 {
1541 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
1542 let _enter = trace_span.enter();
1543 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
1544 Ok(true) => {
1545 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
1546 }
1547 Ok(false) => {
1548 tracing::debug!(
1549 "Singleton lock {} already held, aborting execution.",
1550 lock_key
1551 );
1552 return Outcome::emit(
1554 "execution.skipped.lock_held",
1555 Some(serde_json::json!({
1556 "lock_key": lock_key
1557 })),
1558 );
1559 }
1560 Err(e) => {
1561 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
1562 return Outcome::emit(
1563 "execution.skipped.lock_error",
1564 Some(serde_json::json!({
1565 "error": e.to_string()
1566 })),
1567 );
1568 }
1569 }
1570 }
1571
1572 if let Some(iam) = &self.iam_handle {
1574 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
1575
1576 if matches!(iam.policy, IamPolicy::None) {
1577 } else {
1579 let token = bus.read::<IamToken>().map(|t| t.0.clone());
1580
1581 match token {
1582 Some(raw_token) => {
1583 match iam.verifier.verify(&raw_token).await {
1584 Ok(identity) => {
1585 if let Err(e) = enforce_policy(&iam.policy, &identity) {
1586 tracing::warn!(
1587 policy = ?iam.policy,
1588 subject = %identity.subject,
1589 "IAM policy enforcement failed: {}",
1590 e
1591 );
1592 return Outcome::emit(
1593 "iam.policy_denied",
1594 Some(serde_json::json!({
1595 "error": e.to_string(),
1596 "subject": identity.subject,
1597 })),
1598 );
1599 }
1600 bus.insert(identity);
1602 }
1603 Err(e) => {
1604 tracing::warn!("IAM token verification failed: {}", e);
1605 return Outcome::emit(
1606 "iam.verification_failed",
1607 Some(serde_json::json!({
1608 "error": e.to_string()
1609 })),
1610 );
1611 }
1612 }
1613 }
1614 None => {
1615 tracing::warn!("IAM policy requires token but none found in Bus");
1616 return Outcome::emit("iam.missing_token", None);
1617 }
1618 }
1619 }
1620 }
1621
1622 let trace_id = persistence_trace_id(bus);
1623 let label = self.schematic.name.clone();
1624
1625 if let Some(sink) = &self.dlq_sink {
1627 bus.insert(sink.clone());
1628 }
1629 let effective_dlq_policy = self
1631 .dynamic_dlq_policy
1632 .as_ref()
1633 .map(|d| d.current())
1634 .unwrap_or_else(|| self.dlq_policy.clone());
1635 bus.insert(effective_dlq_policy);
1636 bus.insert(self.schematic.clone());
1637 let effective_saga_policy = self
1638 .dynamic_saga_policy
1639 .as_ref()
1640 .map(|d| d.current())
1641 .unwrap_or_else(|| self.saga_policy.clone());
1642 bus.insert(effective_saga_policy.clone());
1643
1644 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1646 bus.insert(SagaStack::new());
1647 }
1648
1649 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1650 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1651 let compensation_retry_policy = compensation_retry_policy(bus);
1652 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1653 let version = self.schematic.schema_version.clone();
1654 let migration_registry = bus
1655 .read::<ranvier_core::schematic::MigrationRegistry>()
1656 .cloned();
1657
1658 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1659 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1660 load_persistence_version(handle, &trace_id).await;
1661
1662 if let Some(interv) = intervention {
1663 tracing::info!(
1664 trace_id = %trace_id,
1665 target_node = %interv.target_node,
1666 "Applying manual intervention command"
1667 );
1668
1669 if let Some(target_idx) = self
1671 .schematic
1672 .nodes
1673 .iter()
1674 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1675 {
1676 tracing::info!(
1677 trace_id = %trace_id,
1678 target_node = %interv.target_node,
1679 target_step = target_idx,
1680 "Intervention: Jumping to target node"
1681 );
1682 start_step = target_idx as u64;
1683
1684 bus.insert(ManualJump {
1686 target_node: interv.target_node.clone(),
1687 payload_override: interv.payload_override.clone(),
1688 });
1689
1690 if let Some(sink) = self.audit_sink.as_ref() {
1692 let event = AuditEvent::new(
1693 uuid::Uuid::new_v4().to_string(),
1694 "System".to_string(),
1695 "ApplyIntervention".to_string(),
1696 trace_id.to_string(),
1697 )
1698 .with_metadata("target_node", interv.target_node.clone())
1699 .with_metadata("target_step", target_idx);
1700
1701 let _ = sink.append(&event).await;
1702 }
1703 } else {
1704 tracing::warn!(
1705 trace_id = %trace_id,
1706 target_node = %interv.target_node,
1707 "Intervention target node not found in schematic; ignoring jump"
1708 );
1709 }
1710 }
1711
1712 if let Some(old_version) = trace_version
1713 && old_version != version
1714 {
1715 tracing::info!(
1716 trace_id = %trace_id,
1717 old_version = %old_version,
1718 current_version = %version,
1719 "Version mismatch detected during resumption"
1720 );
1721
1722 let migration_path = migration_registry
1724 .as_ref()
1725 .and_then(|r| r.find_migration_path(&old_version, &version));
1726
1727 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1728 if path.is_empty() {
1729 (None, last_payload.clone())
1730 } else {
1731 let mut payload = last_payload.clone();
1733 for hop in &path {
1734 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1735 {
1736 match mapper.map_state(p) {
1737 Ok(mapped) => payload = Some(mapped),
1738 Err(e) => {
1739 tracing::error!(
1740 trace_id = %trace_id,
1741 from = %hop.from_version,
1742 to = %hop.to_version,
1743 error = %e,
1744 "Payload migration mapper failed"
1745 );
1746 return Outcome::emit(
1747 "execution.resumption.payload_migration_failed",
1748 Some(serde_json::json!({
1749 "trace_id": trace_id,
1750 "from": hop.from_version,
1751 "to": hop.to_version,
1752 "error": e.to_string()
1753 })),
1754 );
1755 }
1756 }
1757 }
1758 }
1759 let hops: Vec<String> = path
1760 .iter()
1761 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1762 .collect();
1763 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1764 (path.last().copied(), payload)
1765 }
1766 } else {
1767 (None, last_payload.clone())
1768 };
1769
1770 let migration = final_migration.or_else(|| {
1772 migration_registry
1773 .as_ref()
1774 .and_then(|r| r.find_migration(&old_version, &version))
1775 });
1776
1777 if mapped_payload.is_some() {
1779 last_payload = mapped_payload;
1780 }
1781
1782 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1783 {
1784 m.node_mapping
1785 .get(node_id)
1786 .cloned()
1787 .unwrap_or(m.default_strategy.clone())
1788 } else {
1789 migration
1790 .map(|m| m.default_strategy.clone())
1791 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1792 };
1793
1794 match strategy {
1795 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1796 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1797 start_step = 0;
1798 }
1799 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1800 new_node_id,
1801 ..
1802 } => {
1803 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1804 if let Some(target_idx) = self
1805 .schematic
1806 .nodes
1807 .iter()
1808 .position(|n| n.id == new_node_id || n.label == new_node_id)
1809 {
1810 start_step = target_idx as u64;
1811 } else {
1812 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1813 return Outcome::emit(
1814 "execution.resumption.migration_target_not_found",
1815 Some(serde_json::json!({ "node_id": new_node_id })),
1816 );
1817 }
1818 }
1819 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1820 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1821 if let Some(target_idx) = self
1822 .schematic
1823 .nodes
1824 .iter()
1825 .position(|n| n.id == node_id || n.label == node_id)
1826 {
1827 start_step = target_idx as u64;
1828 } else {
1829 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1830 return Outcome::emit(
1831 "execution.resumption.migration_target_not_found",
1832 Some(serde_json::json!({ "node_id": node_id })),
1833 );
1834 }
1835 }
1836 ranvier_core::schematic::MigrationStrategy::Fail => {
1837 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1838 return Outcome::emit(
1839 "execution.resumption.version_mismatch_failed",
1840 Some(serde_json::json!({
1841 "trace_id": trace_id,
1842 "old_version": old_version,
1843 "current_version": version
1844 })),
1845 );
1846 }
1847 _ => {
1848 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1849 return Outcome::emit(
1850 "execution.resumption.unsupported_migration",
1851 Some(serde_json::json!({
1852 "trace_id": trace_id,
1853 "strategy": format!("{:?}", strategy)
1854 })),
1855 );
1856 }
1857 }
1858 }
1859
1860 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1861 persist_execution_event(
1862 handle,
1863 &trace_id,
1864 &label,
1865 &version,
1866 start_step,
1867 ingress_node_id,
1868 "Enter",
1869 None,
1870 )
1871 .await;
1872
1873 bus.insert(StartStep(start_step));
1874 if start_step > 0 {
1875 bus.insert(ResumptionState {
1876 payload: last_payload,
1877 });
1878 }
1879
1880 Some(start_step)
1881 } else {
1882 None
1883 };
1884
1885 let should_capture = should_attach_timeline(bus);
1886 let inserted_timeline = if should_capture {
1887 ensure_timeline(bus)
1888 } else {
1889 false
1890 };
1891 let ingress_started = std::time::Instant::now();
1892 let ingress_enter_ts = now_ms();
1893 if should_capture
1894 && let (Some(timeline), Some(ingress)) =
1895 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1896 {
1897 timeline.push(TimelineEvent::NodeEnter {
1898 node_id: ingress.id.clone(),
1899 node_label: ingress.label.clone(),
1900 timestamp: ingress_enter_ts,
1901 });
1902 }
1903
1904 let circuit_span = tracing::info_span!(
1905 "Circuit",
1906 ranvier.circuit = %label,
1907 ranvier.outcome_kind = tracing::field::Empty,
1908 ranvier.outcome_target = tracing::field::Empty
1909 );
1910 let outcome = (self.executor)(input, resources, bus)
1911 .instrument(circuit_span.clone())
1912 .await;
1913 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1914 if let Some(target) = outcome_target(&outcome) {
1915 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1916 }
1917
1918 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1920 while let Some(task) = {
1921 let mut stack = bus.read_mut::<SagaStack>();
1922 stack.as_mut().and_then(|s| s.pop())
1923 } {
1924 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1925
1926 let handler = {
1927 let registry = self.saga_compensation_registry.read().expect("saga compensation registry lock poisoned");
1928 registry.get(&task.node_id)
1929 };
1930 if let Some(handler) = handler {
1931 let res = handler(task.input_snapshot, resources, bus).await;
1932 if let Outcome::Fault(e) = res {
1933 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1934 }
1935 } else {
1936 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1937 }
1938 }
1939 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1940 }
1941
1942 let ingress_exit_ts = now_ms();
1943 if should_capture
1944 && let (Some(timeline), Some(ingress)) =
1945 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1946 {
1947 timeline.push(TimelineEvent::NodeExit {
1948 node_id: ingress.id.clone(),
1949 outcome_type: outcome_type_name(&outcome),
1950 duration_ms: ingress_started.elapsed().as_millis() as u64,
1951 timestamp: ingress_exit_ts,
1952 });
1953 }
1954
1955 if let Some(handle) = persistence_handle.as_ref() {
1956 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1957 persist_execution_event(
1958 handle,
1959 &trace_id,
1960 &label,
1961 &version,
1962 fault_step,
1963 None, outcome_kind_name(&outcome),
1965 Some(outcome.to_json_value()),
1966 )
1967 .await;
1968
1969 let mut completion = completion_from_outcome(&outcome);
1970 if matches!(outcome, Outcome::Fault(_))
1971 && let Some(compensation) = compensation_handle.as_ref()
1972 && compensation_auto_trigger(bus)
1973 {
1974 let context = CompensationContext {
1975 trace_id: trace_id.clone(),
1976 circuit: label.clone(),
1977 fault_kind: outcome_kind_name(&outcome).to_string(),
1978 fault_step,
1979 timestamp_ms: now_ms(),
1980 };
1981
1982 if run_compensation(
1983 compensation,
1984 context,
1985 compensation_retry_policy,
1986 compensation_idempotency.clone(),
1987 )
1988 .await
1989 {
1990 persist_execution_event(
1991 handle,
1992 &trace_id,
1993 &label,
1994 &version,
1995 fault_step.saturating_add(1),
1996 None,
1997 "Compensated",
1998 None,
1999 )
2000 .await;
2001 completion = CompletionState::Compensated;
2002 }
2003 }
2004
2005 if persistence_auto_complete(bus) {
2006 persist_completion(handle, &trace_id, completion).await;
2007 }
2008 }
2009
2010 if should_capture {
2011 maybe_export_timeline(bus, &outcome);
2012 }
2013 if inserted_timeline {
2014 let _ = bus.remove::<Timeline>();
2015 }
2016
2017 outcome
2018 }
2019
2020 pub fn serve_inspector(self, port: u16) -> Self {
2023 if !inspector_dev_mode_from_env() {
2024 tracing::info!("Inspector disabled because RANVIER_MODE is production");
2025 return self;
2026 }
2027 if !inspector_enabled_from_env() {
2028 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
2029 return self;
2030 }
2031
2032 let schematic = self.schematic.clone();
2033 let axon_inspector = Arc::new(self.clone());
2034 tokio::spawn(async move {
2035 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
2036 .with_projection_files_from_env()
2037 .with_mode_from_env()
2038 .with_auth_policy_from_env()
2039 .with_state_inspector(axon_inspector)
2040 .serve()
2041 .await
2042 {
2043 tracing::error!("Inspector server failed: {}", e);
2044 }
2045 });
2046 self
2047 }
2048
2049 pub fn schematic(&self) -> &Schematic {
2051 &self.schematic
2052 }
2053
2054 pub fn into_schematic(self) -> Schematic {
2056 self.schematic
2057 }
2058
2059 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
2070 schematic_export_request_from_process()
2071 }
2072
2073 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
2085 self.maybe_export_and_exit_with(|_| ())
2086 }
2087
2088 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
2093 where
2094 F: FnOnce(&SchematicExportRequest),
2095 {
2096 let Some(request) = self.schematic_export_request() else {
2097 return Ok(false);
2098 };
2099 on_before_exit(&request);
2100 self.export_schematic(&request)?;
2101 Ok(true)
2102 }
2103
2104 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
2106 let json = serde_json::to_string_pretty(self.schematic())?;
2107 if let Some(path) = &request.output {
2108 if let Some(parent) = path.parent()
2109 && !parent.as_os_str().is_empty()
2110 {
2111 fs::create_dir_all(parent)?;
2112 }
2113 fs::write(path, json.as_bytes())?;
2114 return Ok(());
2115 }
2116 println!("{}", json);
2117 Ok(())
2118 }
2119}
2120
2121fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
2122 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
2123 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
2124 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
2125
2126 let mut i = 0;
2127 while i < args.len() {
2128 let arg = args[i].to_string_lossy();
2129
2130 if arg == "--schematic" {
2131 enabled = true;
2132 i += 1;
2133 continue;
2134 }
2135
2136 if arg == "--schematic-output" || arg == "--output" {
2137 if let Some(next) = args.get(i + 1) {
2138 output = Some(PathBuf::from(next));
2139 i += 2;
2140 continue;
2141 }
2142 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
2143 output = Some(PathBuf::from(value));
2144 } else if let Some(value) = arg.strip_prefix("--output=") {
2145 output = Some(PathBuf::from(value));
2146 }
2147
2148 i += 1;
2149 }
2150
2151 if enabled {
2152 Some(SchematicExportRequest { output })
2153 } else {
2154 None
2155 }
2156}
2157
/// Reports whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `on` and `yes`, compared ASCII
/// case-insensitively. An unset or unreadable variable counts as `false`.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var(key)
        .map(|raw| TRUTHY.iter().any(|candidate| raw.eq_ignore_ascii_case(candidate)))
        .unwrap_or(false)
}
2164
2165fn inspector_enabled_from_env() -> bool {
2166 let raw = std::env::var("RANVIER_INSPECTOR").ok();
2167 inspector_enabled_from_value(raw.as_deref())
2168}
2169
/// Interprets an optional inspector switch value.
///
/// No value means enabled. A present value enables the inspector only when it is
/// `1`, `true`, `on` or `yes` (ASCII case-insensitive); anything else disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    let Some(raw) = value else {
        return true;
    };
    TRUTHY.iter().any(|candidate| raw.eq_ignore_ascii_case(candidate))
}
2176
2177fn inspector_dev_mode_from_env() -> bool {
2178 let raw = std::env::var("RANVIER_MODE").ok();
2179 inspector_dev_mode_from_value(raw.as_deref())
2180}
2181
/// Interprets an optional mode value: everything except `prod` / `production`
/// (ASCII case-insensitive) counts as development mode, including no value.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    match value {
        Some(mode) => {
            let is_production =
                mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production");
            !is_production
        }
        None => true,
    }
}
2188
2189fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
2190 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
2191 Ok(v) if !v.trim().is_empty() => v,
2192 _ => return,
2193 };
2194
2195 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
2196 let policy = timeline_adaptive_policy();
2197 let forced = should_force_export(outcome, &policy);
2198 let should_export = sampled || forced;
2199 if !should_export {
2200 record_sampling_stats(false, sampled, forced, "none", &policy);
2201 return;
2202 }
2203
2204 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
2205 timeline.sort();
2206
2207 let mode = std::env::var("RANVIER_TIMELINE_MODE")
2208 .unwrap_or_else(|_| "overwrite".to_string())
2209 .to_ascii_lowercase();
2210
2211 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
2212 tracing::warn!(
2213 "Failed to persist timeline file {} (mode={}): {}",
2214 path,
2215 mode,
2216 err
2217 );
2218 record_sampling_stats(false, sampled, forced, &mode, &policy);
2219 } else {
2220 record_sampling_stats(true, sampled, forced, &mode, &policy);
2221 }
2222}
2223
2224fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
2225 match outcome {
2226 Outcome::Next(_) => "Next".to_string(),
2227 Outcome::Branch(id, _) => format!("Branch:{}", id),
2228 Outcome::Jump(id, _) => format!("Jump:{}", id),
2229 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
2230 Outcome::Fault(_) => "Fault".to_string(),
2231 }
2232}
2233
2234fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
2235 match outcome {
2236 Outcome::Next(_) => "Next",
2237 Outcome::Branch(_, _) => "Branch",
2238 Outcome::Jump(_, _) => "Jump",
2239 Outcome::Emit(_, _) => "Emit",
2240 Outcome::Fault(_) => "Fault",
2241 }
2242}
2243
2244fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
2245 match outcome {
2246 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
2247 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
2248 Outcome::Emit(event_type, _) => Some(event_type.clone()),
2249 Outcome::Next(_) | Outcome::Fault(_) => None,
2250 }
2251}
2252
2253fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
2254 match outcome {
2255 Outcome::Fault(_) => CompletionState::Fault,
2256 _ => CompletionState::Success,
2257 }
2258}
2259
2260fn persistence_trace_id(bus: &Bus) -> String {
2261 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
2262 explicit.0.clone()
2263 } else {
2264 format!("{}:{}", bus.id, now_ms())
2265 }
2266}
2267
2268fn persistence_auto_complete(bus: &Bus) -> bool {
2269 bus.read::<PersistenceAutoComplete>()
2270 .map(|v| v.0)
2271 .unwrap_or(true)
2272}
2273
2274fn compensation_auto_trigger(bus: &Bus) -> bool {
2275 bus.read::<CompensationAutoTrigger>()
2276 .map(|v| v.0)
2277 .unwrap_or(true)
2278}
2279
2280fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
2281 bus.read::<CompensationRetryPolicy>()
2282 .copied()
2283 .unwrap_or_default()
2284}
2285
2286fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
2292 payload.map(|p| {
2293 p.get("Next")
2294 .or_else(|| p.get("Branch"))
2295 .or_else(|| p.get("Jump"))
2296 .cloned()
2297 .unwrap_or_else(|| p.clone())
2298 })
2299}
2300
2301async fn load_persistence_version(
2302 handle: &PersistenceHandle,
2303 trace_id: &str,
2304) -> (
2305 u64,
2306 Option<String>,
2307 Option<crate::persistence::Intervention>,
2308 Option<String>,
2309 Option<serde_json::Value>,
2310) {
2311 let store = handle.store();
2312 match store.load(trace_id).await {
2313 Ok(Some(trace)) => {
2314 let (next_step, last_node_id, last_payload) =
2315 if let Some(resume_from_step) = trace.resumed_from_step {
2316 let anchor_event = trace
2317 .events
2318 .iter()
2319 .rev()
2320 .find(|event| {
2321 event.step <= resume_from_step
2322 && event.outcome_kind == "Next"
2323 && event.payload.is_some()
2324 })
2325 .or_else(|| {
2326 trace.events.iter().rev().find(|event| {
2327 event.step <= resume_from_step
2328 && event.outcome_kind != "Fault"
2329 && event.payload.is_some()
2330 })
2331 })
2332 .or_else(|| {
2333 trace.events.iter().rev().find(|event| {
2334 event.step <= resume_from_step && event.payload.is_some()
2335 })
2336 })
2337 .or_else(|| trace.events.last());
2338
2339 (
2340 resume_from_step.saturating_add(1),
2341 anchor_event.and_then(|event| event.node_id.clone()),
2342 anchor_event.and_then(|event| {
2343 unwrap_outcome_payload(event.payload.as_ref())
2344 }),
2345 )
2346 } else {
2347 let last_event = trace.events.last();
2348 (
2349 last_event
2350 .map(|event| event.step.saturating_add(1))
2351 .unwrap_or(0),
2352 last_event.and_then(|event| event.node_id.clone()),
2353 last_event.and_then(|event| {
2354 unwrap_outcome_payload(event.payload.as_ref())
2355 }),
2356 )
2357 };
2358
2359 (
2360 next_step,
2361 Some(trace.schematic_version),
2362 trace.interventions.last().cloned(),
2363 last_node_id,
2364 last_payload,
2365 )
2366 }
2367 Ok(None) => (0, None, None, None, None),
2368 Err(err) => {
2369 tracing::warn!(
2370 trace_id = %trace_id,
2371 error = %err,
2372 "Failed to load persistence trace; falling back to step=0"
2373 );
2374 (0, None, None, None, None)
2375 }
2376 }
2377}
2378
2379#[allow(clippy::too_many_arguments)]
2380async fn run_this_step<In, Out, E, Res>(
2381 trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
2382 state: In,
2383 res: &Res,
2384 bus: &mut Bus,
2385 node_id: &str,
2386 node_label: &str,
2387 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2388 step_idx: u64,
2389) -> Outcome<Out, E>
2390where
2391 In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2392 Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2393 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2394 Res: ranvier_core::transition::ResourceRequirement,
2395{
2396 let label = trans.label();
2397 let res_type = std::any::type_name::<Res>()
2398 .split("::")
2399 .last()
2400 .unwrap_or("unknown");
2401
2402 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2404 debug.should_pause(node_id)
2405 } else {
2406 false
2407 };
2408
2409 if should_pause {
2410 let trace_id = persistence_trace_id(bus);
2411 tracing::event!(
2412 target: "ranvier.debugger",
2413 tracing::Level::INFO,
2414 trace_id = %trace_id,
2415 node_id = %node_id,
2416 "Node paused"
2417 );
2418
2419 if let Some(timeline) = bus.read_mut::<Timeline>() {
2420 timeline.push(TimelineEvent::NodePaused {
2421 node_id: node_id.to_string(),
2422 timestamp: now_ms(),
2423 });
2424 }
2425 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2426 debug.wait().await;
2427 }
2428 }
2429
2430 let enter_ts = now_ms();
2431 if let Some(timeline) = bus.read_mut::<Timeline>() {
2432 timeline.push(TimelineEvent::NodeEnter {
2433 node_id: node_id.to_string(),
2434 node_label: node_label.to_string(),
2435 timestamp: enter_ts,
2436 });
2437 }
2438
2439 let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
2441 if let DlqPolicy::RetryThenDlq {
2442 max_attempts,
2443 backoff_ms,
2444 } = p
2445 {
2446 Some((*max_attempts, *backoff_ms))
2447 } else {
2448 None
2449 }
2450 });
2451 let retry_state_snapshot = if dlq_retry_config.is_some() {
2452 serde_json::to_value(&state).ok()
2453 } else {
2454 None
2455 };
2456
2457 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2459 Some(serde_json::to_vec(&state).unwrap_or_default())
2460 } else {
2461 None
2462 };
2463
2464 let node_span = tracing::info_span!(
2465 "Node",
2466 ranvier.node = %label,
2467 ranvier.resource_type = %res_type,
2468 ranvier.outcome_kind = tracing::field::Empty,
2469 ranvier.outcome_target = tracing::field::Empty
2470 );
2471 let started = std::time::Instant::now();
2472 bus.set_access_policy(label.clone(), bus_policy.clone());
2473 let result = trans
2474 .run(state, res, bus)
2475 .instrument(node_span.clone())
2476 .await;
2477 bus.clear_access_policy();
2478
2479 let result = if let Outcome::Fault(_) = &result {
2482 if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
2483 (dlq_retry_config, &retry_state_snapshot)
2484 {
2485 let mut final_result = result;
2486 for attempt in 2..=max_attempts {
2488 let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));
2489
2490 tracing::info!(
2491 ranvier.node = %label,
2492 attempt = attempt,
2493 max_attempts = max_attempts,
2494 backoff_ms = delay,
2495 "Retrying faulted node"
2496 );
2497
2498 if let Some(timeline) = bus.read_mut::<Timeline>() {
2499 timeline.push(TimelineEvent::NodeRetry {
2500 node_id: node_id.to_string(),
2501 attempt,
2502 max_attempts,
2503 backoff_ms: delay,
2504 timestamp: now_ms(),
2505 });
2506 }
2507
2508 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
2509
2510 if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
2511 bus.set_access_policy(label.clone(), bus_policy.clone());
2512 let retry_result = trans
2513 .run(retry_state, res, bus)
2514 .instrument(tracing::info_span!(
2515 "NodeRetry",
2516 ranvier.node = %label,
2517 attempt = attempt
2518 ))
2519 .await;
2520 bus.clear_access_policy();
2521
2522 match &retry_result {
2523 Outcome::Fault(_) => {
2524 final_result = retry_result;
2525 }
2526 _ => {
2527 final_result = retry_result;
2528 break;
2529 }
2530 }
2531 }
2532 }
2533 final_result
2534 } else {
2535 result
2536 }
2537 } else {
2538 result
2539 };
2540
2541 node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
2542 if let Some(target) = outcome_target(&result) {
2543 node_span.record("ranvier.outcome_target", tracing::field::display(&target));
2544 }
2545 let duration_ms = started.elapsed().as_millis() as u64;
2546 let exit_ts = now_ms();
2547
2548 if let Some(timeline) = bus.read_mut::<Timeline>() {
2549 timeline.push(TimelineEvent::NodeExit {
2550 node_id: node_id.to_string(),
2551 outcome_type: outcome_type_name(&result),
2552 duration_ms,
2553 timestamp: exit_ts,
2554 });
2555
2556 if let Outcome::Branch(branch_id, _) = &result {
2557 timeline.push(TimelineEvent::Branchtaken {
2558 branch_id: branch_id.clone(),
2559 timestamp: exit_ts,
2560 });
2561 }
2562 }
2563
2564 if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
2566 && let Some(stack) = bus.read_mut::<SagaStack>()
2567 {
2568 stack.push(node_id.to_string(), label.clone(), snapshot);
2569 }
2570
2571 if let Some(handle) = bus.read::<PersistenceHandle>() {
2572 let trace_id = persistence_trace_id(bus);
2573 let circuit = bus
2574 .read::<ranvier_core::schematic::Schematic>()
2575 .map(|s| s.name.clone())
2576 .unwrap_or_default();
2577 let version = bus
2578 .read::<ranvier_core::schematic::Schematic>()
2579 .map(|s| s.schema_version.clone())
2580 .unwrap_or_default();
2581
2582 persist_execution_event(
2583 handle,
2584 &trace_id,
2585 &circuit,
2586 &version,
2587 step_idx,
2588 Some(node_id.to_string()),
2589 outcome_kind_name(&result),
2590 Some(result.to_json_value()),
2591 )
2592 .await;
2593 }
2594
2595 if let Outcome::Fault(f) = &result {
2598 let dlq_action = {
2600 let policy = bus.read::<DlqPolicy>();
2601 let sink = bus.read::<Arc<dyn DlqSink>>();
2602 match (sink, policy) {
2603 (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
2604 _ => None,
2605 }
2606 };
2607
2608 if let Some(sink) = dlq_action {
2609 if let Some((max_attempts, _)) = dlq_retry_config
2610 && let Some(timeline) = bus.read_mut::<Timeline>()
2611 {
2612 timeline.push(TimelineEvent::DlqExhausted {
2613 node_id: node_id.to_string(),
2614 total_attempts: max_attempts,
2615 timestamp: now_ms(),
2616 });
2617 }
2618
2619 let trace_id = persistence_trace_id(bus);
2620 let circuit = bus
2621 .read::<ranvier_core::schematic::Schematic>()
2622 .map(|s| s.name.clone())
2623 .unwrap_or_default();
2624 let _ = sink
2625 .store_dead_letter(
2626 &trace_id,
2627 &circuit,
2628 node_id,
2629 &format!("{:?}", f),
2630 &serde_json::to_vec(&f).unwrap_or_default(),
2631 )
2632 .await;
2633 }
2634 }
2635
2636 result
2637}
2638
2639#[allow(clippy::too_many_arguments)]
2640async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2641 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2642 comp: &Comp,
2643 state: Out,
2644 res: &Res,
2645 bus: &mut Bus,
2646 node_id: &str,
2647 comp_node_id: &str,
2648 node_label: &str,
2649 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2650 step_idx: u64,
2651) -> Outcome<Next, E>
2652where
2653 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2654 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2655 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2656 Res: ranvier_core::transition::ResourceRequirement,
2657 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2658{
2659 let label = trans.label();
2660
2661 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2663 debug.should_pause(node_id)
2664 } else {
2665 false
2666 };
2667
2668 if should_pause {
2669 let trace_id = persistence_trace_id(bus);
2670 tracing::event!(
2671 target: "ranvier.debugger",
2672 tracing::Level::INFO,
2673 trace_id = %trace_id,
2674 node_id = %node_id,
2675 "Node paused (compensated)"
2676 );
2677
2678 if let Some(timeline) = bus.read_mut::<Timeline>() {
2679 timeline.push(TimelineEvent::NodePaused {
2680 node_id: node_id.to_string(),
2681 timestamp: now_ms(),
2682 });
2683 }
2684 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2685 debug.wait().await;
2686 }
2687 }
2688
2689 let enter_ts = now_ms();
2690 if let Some(timeline) = bus.read_mut::<Timeline>() {
2691 timeline.push(TimelineEvent::NodeEnter {
2692 node_id: node_id.to_string(),
2693 node_label: node_label.to_string(),
2694 timestamp: enter_ts,
2695 });
2696 }
2697
2698 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2700 Some(serde_json::to_vec(&state).unwrap_or_default())
2701 } else {
2702 None
2703 };
2704
2705 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2706 bus.set_access_policy(label.clone(), bus_policy.clone());
2707 let result = trans
2708 .run(state.clone(), res, bus)
2709 .instrument(node_span)
2710 .await;
2711 bus.clear_access_policy();
2712
2713 let duration_ms = 0; let exit_ts = now_ms();
2715
2716 if let Some(timeline) = bus.read_mut::<Timeline>() {
2717 timeline.push(TimelineEvent::NodeExit {
2718 node_id: node_id.to_string(),
2719 outcome_type: outcome_type_name(&result),
2720 duration_ms,
2721 timestamp: exit_ts,
2722 });
2723 }
2724
2725 if let Outcome::Fault(ref err) = result {
2727 if compensation_auto_trigger(bus) {
2728 tracing::info!(
2729 ranvier.node = %label,
2730 ranvier.compensation.trigger = "saga",
2731 error = ?err,
2732 "Saga compensation triggered"
2733 );
2734
2735 if let Some(timeline) = bus.read_mut::<Timeline>() {
2736 timeline.push(TimelineEvent::NodeEnter {
2737 node_id: comp_node_id.to_string(),
2738 node_label: format!("Compensate: {}", comp.label()),
2739 timestamp: exit_ts,
2740 });
2741 }
2742
2743 let _ = comp.run(state, res, bus).await;
2745
2746 if let Some(timeline) = bus.read_mut::<Timeline>() {
2747 timeline.push(TimelineEvent::NodeExit {
2748 node_id: comp_node_id.to_string(),
2749 outcome_type: "Compensated".to_string(),
2750 duration_ms: 0,
2751 timestamp: now_ms(),
2752 });
2753 }
2754
2755 if let Some(handle) = bus.read::<PersistenceHandle>() {
2756 let trace_id = persistence_trace_id(bus);
2757 let circuit = bus
2758 .read::<ranvier_core::schematic::Schematic>()
2759 .map(|s| s.name.clone())
2760 .unwrap_or_default();
2761 let version = bus
2762 .read::<ranvier_core::schematic::Schematic>()
2763 .map(|s| s.schema_version.clone())
2764 .unwrap_or_default();
2765
2766 persist_execution_event(
2767 handle,
2768 &trace_id,
2769 &circuit,
2770 &version,
2771 step_idx + 1, Some(comp_node_id.to_string()),
2773 "Compensated",
2774 None,
2775 )
2776 .await;
2777 }
2778 }
2779 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2780 if let Some(stack) = bus.read_mut::<SagaStack>() {
2782 stack.push(node_id.to_string(), label.clone(), snapshot);
2783 }
2784
2785 if let Some(handle) = bus.read::<PersistenceHandle>() {
2786 let trace_id = persistence_trace_id(bus);
2787 let circuit = bus
2788 .read::<ranvier_core::schematic::Schematic>()
2789 .map(|s| s.name.clone())
2790 .unwrap_or_default();
2791 let version = bus
2792 .read::<ranvier_core::schematic::Schematic>()
2793 .map(|s| s.schema_version.clone())
2794 .unwrap_or_default();
2795
2796 persist_execution_event(
2797 handle,
2798 &trace_id,
2799 &circuit,
2800 &version,
2801 step_idx,
2802 Some(node_id.to_string()),
2803 outcome_kind_name(&result),
2804 Some(result.to_json_value()),
2805 )
2806 .await;
2807 }
2808 }
2809
2810 if let Outcome::Fault(f) = &result
2812 && let (Some(sink), Some(policy)) =
2813 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2814 {
2815 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2816 if should_dlq {
2817 let trace_id = persistence_trace_id(bus);
2818 let circuit = bus
2819 .read::<ranvier_core::schematic::Schematic>()
2820 .map(|s| s.name.clone())
2821 .unwrap_or_default();
2822 let _ = sink
2823 .store_dead_letter(
2824 &trace_id,
2825 &circuit,
2826 node_id,
2827 &format!("{:?}", f),
2828 &serde_json::to_vec(&f).unwrap_or_default(),
2829 )
2830 .await;
2831 }
2832 }
2833
2834 result
2835}
2836
2837#[allow(clippy::too_many_arguments)]
2838pub async fn persist_execution_event(
2839 handle: &PersistenceHandle,
2840 trace_id: &str,
2841 circuit: &str,
2842 schematic_version: &str,
2843 step: u64,
2844 node_id: Option<String>,
2845 outcome_kind: &str,
2846 payload: Option<serde_json::Value>,
2847) {
2848 let store = handle.store();
2849 let envelope = PersistenceEnvelope {
2850 trace_id: trace_id.to_string(),
2851 circuit: circuit.to_string(),
2852 schematic_version: schematic_version.to_string(),
2853 step,
2854 node_id,
2855 outcome_kind: outcome_kind.to_string(),
2856 timestamp_ms: now_ms(),
2857 payload_hash: None,
2858 payload,
2859 };
2860
2861 if let Err(err) = store.append(envelope).await {
2862 tracing::warn!(
2863 trace_id = %trace_id,
2864 circuit = %circuit,
2865 step,
2866 outcome_kind = %outcome_kind,
2867 error = %err,
2868 "Failed to append persistence envelope"
2869 );
2870 }
2871}
2872
2873async fn persist_completion(
2874 handle: &PersistenceHandle,
2875 trace_id: &str,
2876 completion: CompletionState,
2877) {
2878 let store = handle.store();
2879 if let Err(err) = store.complete(trace_id, completion).await {
2880 tracing::warn!(
2881 trace_id = %trace_id,
2882 error = %err,
2883 "Failed to complete persistence trace"
2884 );
2885 }
2886}
2887
2888fn compensation_idempotency_key(context: &CompensationContext) -> String {
2889 format!(
2890 "{}:{}:{}",
2891 context.trace_id, context.circuit, context.fault_kind
2892 )
2893}
2894
2895async fn run_compensation(
2896 handle: &CompensationHandle,
2897 context: CompensationContext,
2898 retry_policy: CompensationRetryPolicy,
2899 idempotency: Option<CompensationIdempotencyHandle>,
2900) -> bool {
2901 let hook = handle.hook();
2902 let key = compensation_idempotency_key(&context);
2903
2904 if let Some(store_handle) = idempotency.as_ref() {
2905 let store = store_handle.store();
2906 match store.was_compensated(&key).await {
2907 Ok(true) => {
2908 tracing::info!(
2909 trace_id = %context.trace_id,
2910 circuit = %context.circuit,
2911 key = %key,
2912 "Compensation already recorded; skipping duplicate hook execution"
2913 );
2914 return true;
2915 }
2916 Ok(false) => {}
2917 Err(err) => {
2918 tracing::warn!(
2919 trace_id = %context.trace_id,
2920 key = %key,
2921 error = %err,
2922 "Failed to check compensation idempotency state"
2923 );
2924 }
2925 }
2926 }
2927
2928 let max_attempts = retry_policy.max_attempts.max(1);
2929 for attempt in 1..=max_attempts {
2930 match hook.compensate(context.clone()).await {
2931 Ok(()) => {
2932 if let Some(store_handle) = idempotency.as_ref() {
2933 let store = store_handle.store();
2934 if let Err(err) = store.mark_compensated(&key).await {
2935 tracing::warn!(
2936 trace_id = %context.trace_id,
2937 key = %key,
2938 error = %err,
2939 "Failed to mark compensation idempotency state"
2940 );
2941 }
2942 }
2943 return true;
2944 }
2945 Err(err) => {
2946 let is_last = attempt == max_attempts;
2947 tracing::warn!(
2948 trace_id = %context.trace_id,
2949 circuit = %context.circuit,
2950 fault_kind = %context.fault_kind,
2951 fault_step = context.fault_step,
2952 attempt,
2953 max_attempts,
2954 error = %err,
2955 "Compensation hook attempt failed"
2956 );
2957 if !is_last && retry_policy.backoff_ms > 0 {
2958 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2959 .await;
2960 }
2961 }
2962 }
2963 }
2964 false
2965}
2966
2967fn ensure_timeline(bus: &mut Bus) -> bool {
2968 if bus.has::<Timeline>() {
2969 false
2970 } else {
2971 bus.insert(Timeline::new());
2972 true
2973 }
2974}
2975
2976fn should_attach_timeline(bus: &Bus) -> bool {
2977 if bus.has::<Timeline>() {
2979 return true;
2980 }
2981
2982 has_timeline_output_path()
2984}
2985
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to a value that is not blank.
///
/// An unset variable, a non-Unicode value, or a value made only of whitespace all yield `false`.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
2992
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `0.0..=1.0`. When the variable is unset,
/// cannot be parsed, or parses to NaN, the default of `1.0` (sample everything) is returned.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and `clamp` passes NaN through unchanged; every later
        // comparison against NaN is false, which would silently disable sampling. Treat it as
        // an invalid setting instead.
        .filter(|rate| !rate.is_nan())
        .map_or(1.0, |rate| rate.clamp(0.0, 1.0))
}
3000
3001fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
3002 if rate <= 0.0 {
3003 return false;
3004 }
3005 if rate >= 1.0 {
3006 return true;
3007 }
3008 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
3009 bucket < rate
3010}
3011
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, ASCII-lowercased.
///
/// Falls back to `"fault_branch"` when the variable is unset or not valid Unicode.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(mut configured) => {
            configured.make_ascii_lowercase();
            configured
        }
        Err(_) => String::from("fault_branch"),
    }
}
3017
3018fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
3019 match policy {
3020 "off" => false,
3021 "fault_only" => matches!(outcome, Outcome::Fault(_)),
3022 "fault_branch_emit" => {
3023 matches!(
3024 outcome,
3025 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
3026 )
3027 }
3028 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
3029 }
3030}
3031
/// Running counters describing timeline export decisions, updated by `record_sampling_stats`.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in an export.
    exported: u64,
    /// Decisions that did not result in an export.
    skipped: u64,
    /// Exports for which the `sampled` flag was set.
    sampled_exports: u64,
    /// Exports for which the `forced` flag was set.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent decision, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
3043
3044static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
3045
3046fn stats_cell() -> &'static Mutex<SamplingStats> {
3047 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
3048}
3049
3050fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
3051 let snapshot = {
3052 let mut stats = match stats_cell().lock() {
3053 Ok(guard) => guard,
3054 Err(_) => return,
3055 };
3056
3057 stats.total_decisions += 1;
3058 if exported {
3059 stats.exported += 1;
3060 } else {
3061 stats.skipped += 1;
3062 }
3063 if sampled && exported {
3064 stats.sampled_exports += 1;
3065 }
3066 if forced && exported {
3067 stats.forced_exports += 1;
3068 }
3069 stats.last_mode = mode.to_string();
3070 stats.last_policy = policy.to_string();
3071 stats.last_updated_ms = now_ms();
3072 stats.clone()
3073 };
3074
3075 tracing::debug!(
3076 ranvier.timeline.total_decisions = snapshot.total_decisions,
3077 ranvier.timeline.exported = snapshot.exported,
3078 ranvier.timeline.skipped = snapshot.skipped,
3079 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
3080 ranvier.timeline.forced_exports = snapshot.forced_exports,
3081 ranvier.timeline.mode = %snapshot.last_mode,
3082 ranvier.timeline.policy = %snapshot.last_policy,
3083 "Timeline sampling stats updated"
3084 );
3085
3086 if let Some(path) = timeline_stats_output_path() {
3087 let payload = serde_json::json!({
3088 "total_decisions": snapshot.total_decisions,
3089 "exported": snapshot.exported,
3090 "skipped": snapshot.skipped,
3091 "sampled_exports": snapshot.sampled_exports,
3092 "forced_exports": snapshot.forced_exports,
3093 "last_mode": snapshot.last_mode,
3094 "last_policy": snapshot.last_policy,
3095 "last_updated_ms": snapshot.last_updated_ms
3096 });
3097 if let Some(parent) = Path::new(&path).parent() {
3098 let _ = fs::create_dir_all(parent);
3099 }
3100 if let Err(err) = fs::write(&path, payload.to_string()) {
3101 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
3102 }
3103 }
3104}
3105
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and not blank.
///
/// The value is returned as-is (not trimmed); a whitespace-only or unset variable yields `None`.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
3111
3112fn write_timeline_with_policy(
3113 path: &str,
3114 mode: &str,
3115 mut timeline: Timeline,
3116) -> Result<(), String> {
3117 match mode {
3118 "append" => {
3119 if Path::new(path).exists() {
3120 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
3121 match serde_json::from_str::<Timeline>(&content) {
3122 Ok(mut existing) => {
3123 existing.events.append(&mut timeline.events);
3124 existing.sort();
3125 if let Some(max_events) = max_events_limit() {
3126 truncate_timeline_events(&mut existing, max_events);
3127 }
3128 write_timeline_json(path, &existing)
3129 }
3130 Err(_) => {
3131 if let Some(max_events) = max_events_limit() {
3133 truncate_timeline_events(&mut timeline, max_events);
3134 }
3135 write_timeline_json(path, &timeline)
3136 }
3137 }
3138 } else {
3139 if let Some(max_events) = max_events_limit() {
3140 truncate_timeline_events(&mut timeline, max_events);
3141 }
3142 write_timeline_json(path, &timeline)
3143 }
3144 }
3145 "rotate" => {
3146 let rotated_path = rotated_path(path, now_ms());
3147 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
3148 if let Some(keep) = rotate_keep_limit() {
3149 cleanup_rotated_files(path, keep)?;
3150 }
3151 Ok(())
3152 }
3153 _ => write_timeline_json(path, &timeline),
3154 }
3155}
3156
3157fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
3158 if let Some(parent) = Path::new(path).parent()
3159 && !parent.as_os_str().is_empty()
3160 {
3161 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
3162 }
3163 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
3164 fs::write(path, json).map_err(|e| e.to_string())
3165}
3166
/// Builds the rotated sibling of `path`: `<parent>/<stem>_<suffix>.<ext>`.
///
/// The stem defaults to `"timeline"` and the extension to `"json"` when `path` lacks them (or
/// they are not valid UTF-8).
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };

    let file_name = format!("{stem}_{suffix}.{ext}");
    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
3174
/// Reads the maximum number of timeline events to keep from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
3181
/// Reads how many rotated timeline files to retain from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is zero.
fn rotate_keep_limit() -> Option<usize> {
    let keep = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
        .ok()?
        .parse::<usize>()
        .ok()?;
    if keep > 0 { Some(keep) } else { None }
}
3188
3189fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
3190 let len = timeline.events.len();
3191 if len > max_events {
3192 let keep_from = len - max_events;
3193 timeline.events = timeline.events.split_off(keep_from);
3194 }
3195}
3196
/// Prunes rotated timeline files next to `base_path`, keeping only the `keep` newest.
///
/// Rotated files are the ones named `<stem>_<digits>.<ext>` in the directory of `base_path`, the
/// naming scheme `rotated_path` produces with a millisecond timestamp as the digits. Files are
/// ranked by that numeric suffix (largest = newest). Anything whose middle part is not purely
/// numeric — e.g. `timeline_stats.json` next to `timeline.json` — is left untouched.
///
/// Removal is best-effort: a file that cannot be deleted is skipped silently.
///
/// # Errors
/// Returns the stringified I/O error when the directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name; listing "" fails, so scan "." then.
    let dir = base
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{stem}_");
    let suffix = format!(".{ext}");

    let mut rotated: Vec<(u64, PathBuf)> = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let stamp = rotation_stamp(&name.to_string_lossy(), &prefix, &suffix)?;
            Some((stamp, entry.path()))
        })
        .collect();

    // Newest first; the path is a tie-breaker so the order is fully deterministic.
    rotated.sort_unstable_by(|a, b| b.cmp(a));
    for (_, path) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}

/// Extracts the numeric rotation suffix from `name` if it has the form
/// `<prefix><ascii digits><suffix>`; returns `None` for any other name.
fn rotation_stamp(name: &str, prefix: &str, suffix: &str) -> Option<u64> {
    let digits = name.strip_prefix(prefix)?.strip_suffix(suffix)?;
    // `str::parse::<u64>` also accepts a leading '+', so check the characters explicitly.
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok()
}
3229
3230fn bus_capability_schema_from_policy(
3231 policy: Option<ranvier_core::bus::BusAccessPolicy>,
3232) -> Option<BusCapabilitySchema> {
3233 let policy = policy?;
3234
3235 let mut allow = policy
3236 .allow
3237 .unwrap_or_default()
3238 .into_iter()
3239 .map(|entry| entry.type_name.to_string())
3240 .collect::<Vec<_>>();
3241 let mut deny = policy
3242 .deny
3243 .into_iter()
3244 .map(|entry| entry.type_name.to_string())
3245 .collect::<Vec<_>>();
3246 allow.sort();
3247 deny.sort();
3248
3249 if allow.is_empty() && deny.is_empty() {
3250 return None;
3251 }
3252
3253 Some(BusCapabilitySchema { allow, deny })
3254}
3255
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
3262
3263#[cfg(test)]
3264mod tests {
3265 use super::{
3266 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
3267 should_force_export,
3268 };
3269 use crate::persistence::{
3270 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
3271 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
3272 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
3273 PersistenceHandle, PersistenceStore, PersistenceTraceId,
3274 };
3275 use anyhow::Result;
3276 use async_trait::async_trait;
3277 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
3278 use ranvier_core::event::{DlqPolicy, DlqSink};
3279 use ranvier_core::saga::SagaStack;
3280 use ranvier_core::timeline::{Timeline, TimelineEvent};
3281 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
3282 use serde::{Deserialize, Serialize};
3283 use std::sync::Arc;
3284 use tokio::sync::Mutex;
3285 use uuid::Uuid;
3286
3287 struct MockAuditSink {
3288 events: Arc<Mutex<Vec<AuditEvent>>>,
3289 }
3290
3291 #[async_trait]
3292 impl AuditSink for MockAuditSink {
3293 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
3294 self.events.lock().await.push(event.clone());
3295 Ok(())
3296 }
3297 }
3298
3299 #[tokio::test]
3300 async fn execute_logs_audit_events_for_intervention() {
3301 use ranvier_inspector::StateInspector;
3302
3303 let trace_id = "test-audit-trace";
3304 let store_impl = InMemoryPersistenceStore::new();
3305 let events = Arc::new(Mutex::new(Vec::new()));
3306 let sink = MockAuditSink {
3307 events: events.clone(),
3308 };
3309
3310 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
3311 .then(AddOne)
3312 .with_persistence_store(store_impl.clone())
3313 .with_audit_sink(sink);
3314
3315 let mut bus = Bus::new();
3316 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
3317 bus.insert(PersistenceTraceId::new(trace_id));
3318 let target_node_id = axon.schematic.nodes[0].id.clone();
3319
3320 store_impl
3322 .append(crate::persistence::PersistenceEnvelope {
3323 trace_id: trace_id.to_string(),
3324 circuit: "AuditTest".to_string(),
3325 schematic_version: "v1.0".to_string(),
3326 step: 0,
3327 node_id: None,
3328 outcome_kind: "Next".to_string(),
3329 timestamp_ms: 0,
3330 payload_hash: None,
3331 payload: None,
3332 })
3333 .await
3334 .unwrap();
3335
3336 axon.force_resume(trace_id, &target_node_id, None)
3338 .await
3339 .unwrap();
3340
3341 axon.execute(10, &(), &mut bus).await;
3343
3344 let recorded = events.lock().await;
3345 assert_eq!(
3346 recorded.len(),
3347 2,
3348 "Should have 2 audit events: ForceResume and ApplyIntervention"
3349 );
3350 assert_eq!(recorded[0].action, "ForceResume");
3351 assert_eq!(recorded[0].target, trace_id);
3352 assert_eq!(recorded[1].action, "ApplyIntervention");
3353 assert_eq!(recorded[1].target, trace_id);
3354 }
3355
3356 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
3357 pub enum TestInfallible {}
3358
3359 #[test]
3360 fn inspector_enabled_flag_matrix() {
3361 assert!(inspector_enabled_from_value(None));
3362 assert!(inspector_enabled_from_value(Some("1")));
3363 assert!(inspector_enabled_from_value(Some("true")));
3364 assert!(inspector_enabled_from_value(Some("on")));
3365 assert!(!inspector_enabled_from_value(Some("0")));
3366 assert!(!inspector_enabled_from_value(Some("false")));
3367 }
3368
3369 #[test]
3370 fn inspector_dev_mode_matrix() {
3371 assert!(inspector_dev_mode_from_value(None));
3372 assert!(inspector_dev_mode_from_value(Some("dev")));
3373 assert!(inspector_dev_mode_from_value(Some("staging")));
3374 assert!(!inspector_dev_mode_from_value(Some("prod")));
3375 assert!(!inspector_dev_mode_from_value(Some("production")));
3376 }
3377
3378 #[test]
3379 fn adaptive_policy_force_export_matrix() {
3380 let next = Outcome::<(), &'static str>::Next(());
3381 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
3382 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
3383 let fault = Outcome::<(), &'static str>::Fault("boom");
3384
3385 assert!(!should_force_export(&next, "off"));
3386 assert!(!should_force_export(&fault, "off"));
3387
3388 assert!(!should_force_export(&branch, "fault_only"));
3389 assert!(should_force_export(&fault, "fault_only"));
3390
3391 assert!(should_force_export(&branch, "fault_branch"));
3392 assert!(!should_force_export(&emit, "fault_branch"));
3393 assert!(should_force_export(&fault, "fault_branch"));
3394
3395 assert!(should_force_export(&branch, "fault_branch_emit"));
3396 assert!(should_force_export(&emit, "fault_branch_emit"));
3397 assert!(should_force_export(&fault, "fault_branch_emit"));
3398 }
3399
3400 #[test]
3401 fn sampling_and_adaptive_combination_decisions() {
3402 let bus_id = Uuid::nil();
3403 let next = Outcome::<(), &'static str>::Next(());
3404 let fault = Outcome::<(), &'static str>::Fault("boom");
3405
3406 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
3407 assert!(!sampled_never);
3408 assert!(!(sampled_never || should_force_export(&next, "off")));
3409 assert!(sampled_never || should_force_export(&fault, "fault_only"));
3410
3411 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
3412 assert!(sampled_always);
3413 assert!(sampled_always || should_force_export(&next, "off"));
3414 assert!(sampled_always || should_force_export(&fault, "off"));
3415 }
3416
3417 #[derive(Clone)]
3418 struct AddOne;
3419
3420 #[async_trait]
3421 impl Transition<i32, i32> for AddOne {
3422 type Error = TestInfallible;
3423 type Resources = ();
3424
3425 async fn run(
3426 &self,
3427 state: i32,
3428 _resources: &Self::Resources,
3429 _bus: &mut Bus,
3430 ) -> Outcome<i32, Self::Error> {
3431 Outcome::Next(state + 1)
3432 }
3433 }
3434
3435 #[derive(Clone)]
3436 struct AlwaysFault;
3437
3438 #[async_trait]
3439 impl Transition<i32, i32> for AlwaysFault {
3440 type Error = String;
3441 type Resources = ();
3442
3443 async fn run(
3444 &self,
3445 _state: i32,
3446 _resources: &Self::Resources,
3447 _bus: &mut Bus,
3448 ) -> Outcome<i32, Self::Error> {
3449 Outcome::Fault("boom".to_string())
3450 }
3451 }
3452
3453 #[derive(Clone)]
3454 struct CapabilityGuarded;
3455
3456 #[async_trait]
3457 impl Transition<(), ()> for CapabilityGuarded {
3458 type Error = String;
3459 type Resources = ();
3460
3461 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
3462 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
3463 }
3464
3465 async fn run(
3466 &self,
3467 _state: (),
3468 _resources: &Self::Resources,
3469 bus: &mut Bus,
3470 ) -> Outcome<(), Self::Error> {
3471 match bus.get::<String>() {
3472 Ok(_) => Outcome::Next(()),
3473 Err(err) => Outcome::Fault(err.to_string()),
3474 }
3475 }
3476 }
3477
3478 #[derive(Clone)]
3479 struct RecordingCompensationHook {
3480 calls: Arc<Mutex<Vec<CompensationContext>>>,
3481 should_fail: bool,
3482 }
3483
3484 #[async_trait]
3485 impl CompensationHook for RecordingCompensationHook {
3486 async fn compensate(&self, context: CompensationContext) -> Result<()> {
3487 self.calls.lock().await.push(context);
3488 if self.should_fail {
3489 return Err(anyhow::anyhow!("compensation failed"));
3490 }
3491 Ok(())
3492 }
3493 }
3494
3495 #[derive(Clone)]
3496 struct FlakyCompensationHook {
3497 calls: Arc<Mutex<u32>>,
3498 failures_remaining: Arc<Mutex<u32>>,
3499 }
3500
3501 #[async_trait]
3502 impl CompensationHook for FlakyCompensationHook {
3503 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
3504 {
3505 let mut calls = self.calls.lock().await;
3506 *calls += 1;
3507 }
3508 let mut failures_remaining = self.failures_remaining.lock().await;
3509 if *failures_remaining > 0 {
3510 *failures_remaining -= 1;
3511 return Err(anyhow::anyhow!("transient compensation failure"));
3512 }
3513 Ok(())
3514 }
3515 }
3516
3517 #[derive(Clone)]
3518 struct FailingCompensationIdempotencyStore {
3519 read_calls: Arc<Mutex<u32>>,
3520 write_calls: Arc<Mutex<u32>>,
3521 }
3522
3523 #[async_trait]
3524 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
3525 async fn was_compensated(&self, _key: &str) -> Result<bool> {
3526 let mut read_calls = self.read_calls.lock().await;
3527 *read_calls += 1;
3528 Err(anyhow::anyhow!("forced idempotency read failure"))
3529 }
3530
3531 async fn mark_compensated(&self, _key: &str) -> Result<()> {
3532 let mut write_calls = self.write_calls.lock().await;
3533 *write_calls += 1;
3534 Err(anyhow::anyhow!("forced idempotency write failure"))
3535 }
3536 }
3537
3538 #[tokio::test]
3539 async fn execute_persists_success_trace_when_handle_exists() {
3540 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3541 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3542
3543 let mut bus = Bus::new();
3544 bus.insert(PersistenceHandle::from_arc(store_dyn));
3545 bus.insert(PersistenceTraceId::new("trace-success"));
3546
3547 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
3548 let outcome = axon.execute(41, &(), &mut bus).await;
3549 assert!(matches!(outcome, Outcome::Next(42)));
3550
3551 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
3552 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3554 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
3557 }
3558
3559 #[tokio::test]
3560 async fn execute_persists_fault_completion_state() {
3561 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3562 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3563
3564 let mut bus = Bus::new();
3565 bus.insert(PersistenceHandle::from_arc(store_dyn));
3566 bus.insert(PersistenceTraceId::new("trace-fault"));
3567
3568 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
3569 let outcome = axon.execute(41, &(), &mut bus).await;
3570 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3571
3572 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
3573 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3577 }
3578
3579 #[tokio::test]
3580 async fn execute_respects_persistence_auto_complete_off() {
3581 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3582 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3583
3584 let mut bus = Bus::new();
3585 bus.insert(PersistenceHandle::from_arc(store_dyn));
3586 bus.insert(PersistenceTraceId::new("trace-no-complete"));
3587 bus.insert(PersistenceAutoComplete(false));
3588
3589 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
3590 let outcome = axon.execute(1, &(), &mut bus).await;
3591 assert!(matches!(outcome, Outcome::Next(2)));
3592
3593 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
3594 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
3596 }
3597
3598 #[tokio::test]
3599 async fn fault_triggers_compensation_and_marks_compensated() {
3600 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3601 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3602 let calls = Arc::new(Mutex::new(Vec::new()));
3603 let compensation = RecordingCompensationHook {
3604 calls: calls.clone(),
3605 should_fail: false,
3606 };
3607
3608 let mut bus = Bus::new();
3609 bus.insert(PersistenceHandle::from_arc(store_dyn));
3610 bus.insert(PersistenceTraceId::new("trace-compensated"));
3611 bus.insert(CompensationHandle::from_hook(compensation));
3612
3613 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
3614 let outcome = axon.execute(7, &(), &mut bus).await;
3615 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3616
3617 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
3618 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
3620 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3623 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3624
3625 let recorded = calls.lock().await;
3626 assert_eq!(recorded.len(), 1);
3627 assert_eq!(recorded[0].trace_id, "trace-compensated");
3628 assert_eq!(recorded[0].fault_kind, "Fault");
3629 }
3630
3631 #[tokio::test]
3632 async fn failed_compensation_keeps_fault_completion() {
3633 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3634 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3635 let calls = Arc::new(Mutex::new(Vec::new()));
3636 let compensation = RecordingCompensationHook {
3637 calls: calls.clone(),
3638 should_fail: true,
3639 };
3640
3641 let mut bus = Bus::new();
3642 bus.insert(PersistenceHandle::from_arc(store_dyn));
3643 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3644 bus.insert(CompensationHandle::from_hook(compensation));
3645
3646 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3647 let outcome = axon.execute(7, &(), &mut bus).await;
3648 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3649
3650 let persisted = store_impl
3651 .load("trace-compensation-failed")
3652 .await
3653 .unwrap()
3654 .unwrap();
3655 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3658
3659 let recorded = calls.lock().await;
3660 assert_eq!(recorded.len(), 1);
3661 }
3662
3663 #[tokio::test]
3664 async fn compensation_retry_policy_succeeds_after_retries() {
3665 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3666 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3667 let calls = Arc::new(Mutex::new(0u32));
3668 let failures_remaining = Arc::new(Mutex::new(2u32));
3669 let compensation = FlakyCompensationHook {
3670 calls: calls.clone(),
3671 failures_remaining,
3672 };
3673
3674 let mut bus = Bus::new();
3675 bus.insert(PersistenceHandle::from_arc(store_dyn));
3676 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3677 bus.insert(CompensationHandle::from_hook(compensation));
3678 bus.insert(CompensationRetryPolicy {
3679 max_attempts: 3,
3680 backoff_ms: 0,
3681 });
3682
3683 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3684 let outcome = axon.execute(7, &(), &mut bus).await;
3685 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3686
3687 let persisted = store_impl
3688 .load("trace-retry-success")
3689 .await
3690 .unwrap()
3691 .unwrap();
3692 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3693 assert_eq!(
3694 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3695 Some("Compensated")
3696 );
3697
3698 let attempt_count = calls.lock().await;
3699 assert_eq!(*attempt_count, 3);
3700 }
3701
3702 #[tokio::test]
3703 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3704 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3705 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3706 let calls = Arc::new(Mutex::new(Vec::new()));
3707 let compensation = RecordingCompensationHook {
3708 calls: calls.clone(),
3709 should_fail: false,
3710 };
3711 let idempotency = InMemoryCompensationIdempotencyStore::new();
3712
3713 let mut bus = Bus::new();
3714 bus.insert(PersistenceHandle::from_arc(store_dyn));
3715 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3716 bus.insert(PersistenceAutoComplete(false));
3717 bus.insert(CompensationHandle::from_hook(compensation));
3718 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3719
3720 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3721
3722 let outcome1 = axon.execute(7, &(), &mut bus).await;
3723 let outcome2 = axon.execute(8, &(), &mut bus).await;
3724 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3725 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3726
3727 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3728 assert_eq!(persisted.completion, None);
3729 let compensated_count = persisted
3731 .events
3732 .iter()
3733 .filter(|e| e.outcome_kind == "Compensated")
3734 .count();
3735 assert_eq!(
3736 compensated_count, 2,
3737 "Should have 2 Compensated events (one per execution)"
3738 );
3739
3740 let recorded = calls.lock().await;
3741 assert_eq!(recorded.len(), 1);
3742 }
3743
3744 #[tokio::test]
3745 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3746 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3747 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3748 let calls = Arc::new(Mutex::new(Vec::new()));
3749 let read_calls = Arc::new(Mutex::new(0u32));
3750 let write_calls = Arc::new(Mutex::new(0u32));
3751 let compensation = RecordingCompensationHook {
3752 calls: calls.clone(),
3753 should_fail: false,
3754 };
3755 let idempotency = FailingCompensationIdempotencyStore {
3756 read_calls: read_calls.clone(),
3757 write_calls: write_calls.clone(),
3758 };
3759
3760 let mut bus = Bus::new();
3761 bus.insert(PersistenceHandle::from_arc(store_dyn));
3762 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3763 bus.insert(CompensationHandle::from_hook(compensation));
3764 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3765
3766 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3767 let outcome = axon.execute(9, &(), &mut bus).await;
3768 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3769
3770 let persisted = store_impl
3771 .load("trace-idempotency-store-failure")
3772 .await
3773 .unwrap()
3774 .unwrap();
3775 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3776 assert_eq!(
3777 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3778 Some("Compensated")
3779 );
3780
3781 let recorded = calls.lock().await;
3782 assert_eq!(recorded.len(), 1);
3783 assert_eq!(*read_calls.lock().await, 1);
3784 assert_eq!(*write_calls.lock().await, 1);
3785 }
3786
3787 #[tokio::test]
3788 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3789 let mut bus = Bus::new();
3790 bus.insert(1_i32);
3791 bus.insert("secret".to_string());
3792
3793 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3794 let outcome = axon.execute((), &(), &mut bus).await;
3795
3796 match outcome {
3797 Outcome::Fault(msg) => {
3798 assert!(msg.contains("Bus access denied"), "{msg}");
3799 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3800 assert!(msg.contains("alloc::string::String"), "{msg}");
3801 }
3802 other => panic!("expected fault, got {other:?}"),
3803 }
3804 }
3805
3806 #[tokio::test]
3807 async fn execute_fails_on_version_mismatch_without_migration() {
3808 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3809 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3810
3811 let trace_id = "v-mismatch";
3812 let old_envelope = crate::persistence::PersistenceEnvelope {
3814 trace_id: trace_id.to_string(),
3815 circuit: "TestCircuit".to_string(),
3816 schematic_version: "0.9".to_string(),
3817 step: 0,
3818 node_id: None,
3819 outcome_kind: "Enter".to_string(),
3820 timestamp_ms: 0,
3821 payload_hash: None,
3822 payload: None,
3823 };
3824 store_impl.append(old_envelope).await.unwrap();
3825
3826 let mut bus = Bus::new();
3827 bus.insert(PersistenceHandle::from_arc(store_dyn));
3828 bus.insert(PersistenceTraceId::new(trace_id));
3829
3830 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3832 let outcome = axon.execute(10, &(), &mut bus).await;
3833
3834 if let Outcome::Emit(kind, _) = outcome {
3835 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3836 } else {
3837 panic!("Expected version mismatch emission, got {:?}", outcome);
3838 }
3839 }
3840
3841 #[tokio::test]
3842 async fn execute_resumes_from_start_on_migration_strategy() {
3843 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3844 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3845
3846 let trace_id = "v-migration";
3847 let old_envelope = crate::persistence::PersistenceEnvelope {
3849 trace_id: trace_id.to_string(),
3850 circuit: "TestCircuit".to_string(),
3851 schematic_version: "0.9".to_string(),
3852 step: 5,
3853 node_id: None,
3854 outcome_kind: "Next".to_string(),
3855 timestamp_ms: 0,
3856 payload_hash: None,
3857 payload: None,
3858 };
3859 store_impl.append(old_envelope).await.unwrap();
3860
3861 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3862 registry.register(ranvier_core::schematic::SnapshotMigration {
3863 name: Some("v0.9 to v1.0".to_string()),
3864 from_version: "0.9".to_string(),
3865 to_version: "1.0".to_string(),
3866 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3867 node_mapping: std::collections::HashMap::new(),
3868 payload_mapper: None,
3869 });
3870
3871 let mut bus = Bus::new();
3872 bus.insert(PersistenceHandle::from_arc(store_dyn));
3873 bus.insert(PersistenceTraceId::new(trace_id));
3874 bus.insert(registry);
3875
3876 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3877 let outcome = axon.execute(10, &(), &mut bus).await;
3878
3879 assert!(matches!(outcome, Outcome::Next(11)));
3881
3882 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3884 assert_eq!(persisted.schematic_version, "1.0");
3885 }
3886
3887 #[tokio::test]
3888 async fn execute_applies_manual_intervention_jump_and_payload() {
3889 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3890 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3891
3892 let trace_id = "intervention-test";
3893 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3895 .then(AddOne)
3896 .then(AddOne);
3897
3898 let mut bus = Bus::new();
3899 bus.insert(PersistenceHandle::from_arc(store_dyn));
3900 bus.insert(PersistenceTraceId::new(trace_id));
3901
3902 let _target_node_label = "AddOne";
3907 let target_node_id = axon.schematic.nodes[2].id.clone();
3909
3910 store_impl
3912 .append(crate::persistence::PersistenceEnvelope {
3913 trace_id: trace_id.to_string(),
3914 circuit: "TestCircuit".to_string(),
3915 schematic_version: "1.0".to_string(),
3916 step: 0,
3917 node_id: None,
3918 outcome_kind: "Enter".to_string(),
3919 timestamp_ms: 0,
3920 payload_hash: None,
3921 payload: None,
3922 })
3923 .await
3924 .unwrap();
3925
3926 store_impl
3927 .save_intervention(
3928 trace_id,
3929 crate::persistence::Intervention {
3930 target_node: target_node_id.clone(),
3931 payload_override: Some(serde_json::json!(100)),
3932 timestamp_ms: 0,
3933 },
3934 )
3935 .await
3936 .unwrap();
3937
3938 let outcome = axon.execute(10, &(), &mut bus).await;
3941
3942 match outcome {
3943 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3944 other => panic!("Expected Outcome::Next(101), got {:?}", other),
3945 }
3946
3947 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3949 assert_eq!(persisted.interventions.len(), 1);
3951 assert_eq!(persisted.interventions[0].target_node, target_node_id);
3952 }
3953
3954 #[derive(Clone)]
3958 struct FailNThenSucceed {
3959 remaining: Arc<tokio::sync::Mutex<u32>>,
3960 }
3961
3962 #[async_trait]
3963 impl Transition<i32, i32> for FailNThenSucceed {
3964 type Error = String;
3965 type Resources = ();
3966
3967 async fn run(
3968 &self,
3969 state: i32,
3970 _resources: &Self::Resources,
3971 _bus: &mut Bus,
3972 ) -> Outcome<i32, Self::Error> {
3973 let mut rem = self.remaining.lock().await;
3974 if *rem > 0 {
3975 *rem -= 1;
3976 Outcome::Fault("transient failure".to_string())
3977 } else {
3978 Outcome::Next(state + 1)
3979 }
3980 }
3981 }
3982
3983 #[derive(Clone)]
3985 struct MockDlqSink {
3986 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3987 }
3988
3989 #[async_trait]
3990 impl DlqSink for MockDlqSink {
3991 async fn store_dead_letter(
3992 &self,
3993 workflow_id: &str,
3994 _circuit_label: &str,
3995 node_id: &str,
3996 error_msg: &str,
3997 _payload: &[u8],
3998 ) -> Result<(), String> {
3999 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
4000 self.letters.lock().await.push(entry);
4001 Ok(())
4002 }
4003 }
4004
4005 #[tokio::test]
4006 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
4007 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
4009 let trans = FailNThenSucceed { remaining };
4010
4011 let dlq_sink = MockDlqSink {
4012 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4013 };
4014
4015 let mut bus = Bus::new();
4016 bus.insert(Timeline::new());
4017
4018 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
4019 .then(trans)
4020 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4021 max_attempts: 5,
4022 backoff_ms: 1,
4023 })
4024 .with_dlq_sink(dlq_sink.clone());
4025 let outcome = axon.execute(10, &(), &mut bus).await;
4026
4027 assert!(
4029 matches!(outcome, Outcome::Next(11)),
4030 "Expected Next(11), got {:?}",
4031 outcome
4032 );
4033
4034 let letters = dlq_sink.letters.lock().await;
4036 assert!(
4037 letters.is_empty(),
4038 "Should have 0 dead letters, got {}",
4039 letters.len()
4040 );
4041
4042 let timeline = bus.read::<Timeline>().unwrap();
4044 let retry_count = timeline
4045 .events
4046 .iter()
4047 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4048 .count();
4049 assert_eq!(retry_count, 2, "Should have 2 retry events");
4050 }
4051
4052 #[tokio::test]
4053 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
4054 let mut bus = Bus::new();
4056 bus.insert(Timeline::new());
4057
4058 let dlq_sink = MockDlqSink {
4059 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4060 };
4061
4062 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
4063 .then(AlwaysFault)
4064 .with_dlq_policy(DlqPolicy::RetryThenDlq {
4065 max_attempts: 3,
4066 backoff_ms: 1,
4067 })
4068 .with_dlq_sink(dlq_sink.clone());
4069 let outcome = axon.execute(42, &(), &mut bus).await;
4070
4071 assert!(
4072 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4073 "Expected Fault(boom), got {:?}",
4074 outcome
4075 );
4076
4077 let letters = dlq_sink.letters.lock().await;
4079 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
4080
4081 let timeline = bus.read::<Timeline>().unwrap();
4083 let retry_count = timeline
4084 .events
4085 .iter()
4086 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4087 .count();
4088 let dlq_count = timeline
4089 .events
4090 .iter()
4091 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
4092 .count();
4093 assert_eq!(
4094 retry_count, 2,
4095 "Should have 2 retry events (attempts 2 and 3)"
4096 );
4097 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
4098 }
4099
4100 #[tokio::test]
4101 async fn send_to_dlq_policy_sends_immediately_without_retry() {
4102 let mut bus = Bus::new();
4103 bus.insert(Timeline::new());
4104
4105 let dlq_sink = MockDlqSink {
4106 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4107 };
4108
4109 let axon = Axon::<i32, i32, String>::start("SendDlq")
4110 .then(AlwaysFault)
4111 .with_dlq_policy(DlqPolicy::SendToDlq)
4112 .with_dlq_sink(dlq_sink.clone());
4113 let outcome = axon.execute(1, &(), &mut bus).await;
4114
4115 assert!(matches!(outcome, Outcome::Fault(_)));
4116
4117 let letters = dlq_sink.letters.lock().await;
4119 assert_eq!(letters.len(), 1);
4120
4121 let timeline = bus.read::<Timeline>().unwrap();
4123 let retry_count = timeline
4124 .events
4125 .iter()
4126 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
4127 .count();
4128 assert_eq!(retry_count, 0);
4129 }
4130
4131 #[tokio::test]
4132 async fn drop_policy_does_not_send_to_dlq() {
4133 let mut bus = Bus::new();
4134
4135 let dlq_sink = MockDlqSink {
4136 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4137 };
4138
4139 let axon = Axon::<i32, i32, String>::start("DropDlq")
4140 .then(AlwaysFault)
4141 .with_dlq_policy(DlqPolicy::Drop)
4142 .with_dlq_sink(dlq_sink.clone());
4143 let outcome = axon.execute(1, &(), &mut bus).await;
4144
4145 assert!(matches!(outcome, Outcome::Fault(_)));
4146
4147 let letters = dlq_sink.letters.lock().await;
4149 assert!(letters.is_empty());
4150 }
4151
4152 #[tokio::test]
4153 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
4154 use ranvier_core::policy::DynamicPolicy;
4155
4156 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
4158 let dlq_sink = MockDlqSink {
4159 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
4160 };
4161
4162 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
4163 .then(AlwaysFault)
4164 .with_dynamic_dlq_policy(dynamic)
4165 .with_dlq_sink(dlq_sink.clone());
4166
4167 let mut bus = Bus::new();
4169 let outcome = axon.execute(1, &(), &mut bus).await;
4170 assert!(matches!(outcome, Outcome::Fault(_)));
4171 assert!(
4172 dlq_sink.letters.lock().await.is_empty(),
4173 "Drop policy should produce no DLQ entries"
4174 );
4175
4176 tx.send(DlqPolicy::SendToDlq).unwrap();
4178
4179 let mut bus2 = Bus::new();
4181 let outcome2 = axon.execute(2, &(), &mut bus2).await;
4182 assert!(matches!(outcome2, Outcome::Fault(_)));
4183 assert_eq!(
4184 dlq_sink.letters.lock().await.len(),
4185 1,
4186 "SendToDlq policy should produce 1 DLQ entry"
4187 );
4188 }
4189
4190 #[tokio::test]
4191 async fn dynamic_saga_policy_hot_reload() {
4192 use ranvier_core::policy::DynamicPolicy;
4193 use ranvier_core::saga::SagaPolicy;
4194
4195 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
4197
4198 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
4199 .then(AddOne)
4200 .with_dynamic_saga_policy(dynamic);
4201
4202 let mut bus = Bus::new();
4204 let _outcome = axon.execute(1, &(), &mut bus).await;
4205 assert!(
4206 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
4207 "SagaStack should be absent or empty when disabled"
4208 );
4209
4210 tx.send(SagaPolicy::Enabled).unwrap();
4212
4213 let mut bus2 = Bus::new();
4215 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
4216 assert!(
4217 bus2.read::<SagaStack>().is_some(),
4218 "SagaStack should exist when saga is enabled"
4219 );
4220 }
4221
4222 mod iam_tests {
4225 use super::*;
4226 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
4227
4228 #[derive(Clone)]
4230 struct MockVerifier {
4231 identity: IamIdentity,
4232 should_fail: bool,
4233 }
4234
4235 #[async_trait]
4236 impl IamVerifier for MockVerifier {
4237 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
4238 if self.should_fail {
4239 Err(IamError::InvalidToken("mock verification failure".into()))
4240 } else {
4241 Ok(self.identity.clone())
4242 }
4243 }
4244 }
4245
4246 #[tokio::test]
4247 async fn iam_require_identity_passes_with_valid_token() {
4248 let verifier = MockVerifier {
4249 identity: IamIdentity::new("alice").with_role("user"),
4250 should_fail: false,
4251 };
4252
4253 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
4254 .with_iam(IamPolicy::RequireIdentity, verifier)
4255 .then(AddOne);
4256
4257 let mut bus = Bus::new();
4258 bus.insert(IamToken("valid-token".to_string()));
4259 let outcome = axon.execute(10, &(), &mut bus).await;
4260
4261 assert!(matches!(outcome, Outcome::Next(11)));
4262 let identity = bus
4264 .read::<IamIdentity>()
4265 .expect("IamIdentity should be in Bus");
4266 assert_eq!(identity.subject, "alice");
4267 }
4268
4269 #[tokio::test]
4270 async fn iam_require_identity_rejects_missing_token() {
4271 let verifier = MockVerifier {
4272 identity: IamIdentity::new("ignored"),
4273 should_fail: false,
4274 };
4275
4276 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
4277 .with_iam(IamPolicy::RequireIdentity, verifier)
4278 .then(AddOne);
4279
4280 let mut bus = Bus::new();
4281 let outcome = axon.execute(10, &(), &mut bus).await;
4283
4284 match &outcome {
4286 Outcome::Emit(label, _) => {
4287 assert_eq!(label, "iam.missing_token");
4288 }
4289 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
4290 }
4291 }
4292
4293 #[tokio::test]
4294 async fn iam_rejects_failed_verification() {
4295 let verifier = MockVerifier {
4296 identity: IamIdentity::new("ignored"),
4297 should_fail: true,
4298 };
4299
4300 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
4301 .with_iam(IamPolicy::RequireIdentity, verifier)
4302 .then(AddOne);
4303
4304 let mut bus = Bus::new();
4305 bus.insert(IamToken("bad-token".to_string()));
4306 let outcome = axon.execute(10, &(), &mut bus).await;
4307
4308 match &outcome {
4309 Outcome::Emit(label, _) => {
4310 assert_eq!(label, "iam.verification_failed");
4311 }
4312 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
4313 }
4314 }
4315
4316 #[tokio::test]
4317 async fn iam_require_role_passes_with_matching_role() {
4318 let verifier = MockVerifier {
4319 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
4320 should_fail: false,
4321 };
4322
4323 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
4324 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4325 .then(AddOne);
4326
4327 let mut bus = Bus::new();
4328 bus.insert(IamToken("token".to_string()));
4329 let outcome = axon.execute(5, &(), &mut bus).await;
4330
4331 assert!(matches!(outcome, Outcome::Next(6)));
4332 }
4333
4334 #[tokio::test]
4335 async fn iam_require_role_denies_without_role() {
4336 let verifier = MockVerifier {
4337 identity: IamIdentity::new("carol").with_role("user"),
4338 should_fail: false,
4339 };
4340
4341 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
4342 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
4343 .then(AddOne);
4344
4345 let mut bus = Bus::new();
4346 bus.insert(IamToken("token".to_string()));
4347 let outcome = axon.execute(5, &(), &mut bus).await;
4348
4349 match &outcome {
4350 Outcome::Emit(label, _) => {
4351 assert_eq!(label, "iam.policy_denied");
4352 }
4353 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
4354 }
4355 }
4356
4357 #[tokio::test]
4358 async fn iam_policy_none_skips_verification() {
4359 let verifier = MockVerifier {
4360 identity: IamIdentity::new("ignored"),
4361 should_fail: true, };
4363
4364 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
4365 .with_iam(IamPolicy::None, verifier)
4366 .then(AddOne);
4367
4368 let mut bus = Bus::new();
4369 let outcome = axon.execute(10, &(), &mut bus).await;
4371
4372 assert!(matches!(outcome, Outcome::Next(11)));
4373 }
4374 }
4375
4376 #[derive(Clone)]
4379 struct SchemaTransition;
4380
4381 #[async_trait]
4382 impl Transition<String, String> for SchemaTransition {
4383 type Error = String;
4384 type Resources = ();
4385
4386 fn input_schema(&self) -> Option<serde_json::Value> {
4387 Some(serde_json::json!({
4388 "type": "object",
4389 "required": ["name"],
4390 "properties": {
4391 "name": { "type": "string" }
4392 }
4393 }))
4394 }
4395
4396 async fn run(
4397 &self,
4398 state: String,
4399 _resources: &Self::Resources,
4400 _bus: &mut Bus,
4401 ) -> Outcome<String, Self::Error> {
4402 Outcome::Next(state)
4403 }
4404 }
4405
4406 #[test]
4407 fn then_auto_populates_input_schema_from_transition() {
4408 let axon = Axon::<String, String, String>::new("SchemaTest").then(SchemaTransition);
4409
4410 let last_node = axon.schematic.nodes.last().unwrap();
4412 assert!(last_node.input_schema.is_some());
4413 let schema = last_node.input_schema.as_ref().unwrap();
4414 assert_eq!(schema["type"], "object");
4415 assert_eq!(schema["required"][0], "name");
4416 }
4417
4418 #[test]
4419 fn then_leaves_input_schema_none_when_not_provided() {
4420 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchema").then(AddOne);
4421
4422 let last_node = axon.schematic.nodes.last().unwrap();
4423 assert!(last_node.input_schema.is_none());
4424 }
4425
4426 #[test]
4427 fn with_input_schema_value_sets_on_last_node() {
4428 let schema = serde_json::json!({"type": "integer"});
4429 let axon = Axon::<i32, i32, TestInfallible>::new("ManualSchema")
4430 .then(AddOne)
4431 .with_input_schema_value(schema.clone());
4432
4433 let last_node = axon.schematic.nodes.last().unwrap();
4434 assert_eq!(last_node.input_schema.as_ref().unwrap(), &schema);
4435 }
4436
4437 #[test]
4438 fn with_output_schema_value_sets_on_last_node() {
4439 let schema = serde_json::json!({"type": "integer"});
4440 let axon = Axon::<i32, i32, TestInfallible>::new("OutputSchema")
4441 .then(AddOne)
4442 .with_output_schema_value(schema.clone());
4443
4444 let last_node = axon.schematic.nodes.last().unwrap();
4445 assert_eq!(last_node.output_schema.as_ref().unwrap(), &schema);
4446 }
4447
4448 #[test]
4449 fn schematic_export_includes_schema_fields() {
4450 let axon = Axon::<String, String, String>::new("ExportTest")
4451 .then(SchemaTransition)
4452 .with_output_schema_value(serde_json::json!({"type": "string"}));
4453
4454 let json = serde_json::to_value(&axon.schematic).unwrap();
4455 let nodes = json["nodes"].as_array().unwrap();
4456 let last = nodes.last().unwrap();
4458 assert!(last.get("input_schema").is_some());
4459 assert_eq!(last["input_schema"]["type"], "object");
4460 assert_eq!(last["output_schema"]["type"], "string");
4461 }
4462
4463 #[test]
4464 fn schematic_export_omits_schema_fields_when_none() {
4465 let axon = Axon::<i32, i32, TestInfallible>::new("NoSchemaExport").then(AddOne);
4466
4467 let json = serde_json::to_value(&axon.schematic).unwrap();
4468 let nodes = json["nodes"].as_array().unwrap();
4469 let last = nodes.last().unwrap();
4470 let obj = last.as_object().unwrap();
4471 assert!(!obj.contains_key("input_schema"));
4472 assert!(!obj.contains_key("output_schema"));
4473 }
4474
4475 #[test]
4476 fn schematic_json_roundtrip_preserves_schemas() {
4477 let axon = Axon::<String, String, String>::new("Roundtrip")
4478 .then(SchemaTransition)
4479 .with_output_schema_value(serde_json::json!({"type": "string"}));
4480
4481 let json_str = serde_json::to_string(&axon.schematic).unwrap();
4482 let deserialized: ranvier_core::schematic::Schematic =
4483 serde_json::from_str(&json_str).unwrap();
4484
4485 let last = deserialized.nodes.last().unwrap();
4486 assert!(last.input_schema.is_some());
4487 assert!(last.output_schema.is_some());
4488 assert_eq!(last.input_schema.as_ref().unwrap()["required"][0], "name");
4489 assert_eq!(last.output_schema.as_ref().unwrap()["type"], "string");
4490 }
4491
4492 #[derive(Clone)]
4494 struct MultiplyByTwo;
4495
4496 #[async_trait]
4497 impl Transition<i32, i32> for MultiplyByTwo {
4498 type Error = TestInfallible;
4499 type Resources = ();
4500
4501 async fn run(
4502 &self,
4503 state: i32,
4504 _resources: &Self::Resources,
4505 _bus: &mut Bus,
4506 ) -> Outcome<i32, Self::Error> {
4507 Outcome::Next(state * 2)
4508 }
4509 }
4510
4511 #[derive(Clone)]
4512 struct AddTen;
4513
4514 #[async_trait]
4515 impl Transition<i32, i32> for AddTen {
4516 type Error = TestInfallible;
4517 type Resources = ();
4518
4519 async fn run(
4520 &self,
4521 state: i32,
4522 _resources: &Self::Resources,
4523 _bus: &mut Bus,
4524 ) -> Outcome<i32, Self::Error> {
4525 Outcome::Next(state + 10)
4526 }
4527 }
4528
4529 #[derive(Clone)]
4530 struct AddOneString;
4531
4532 #[async_trait]
4533 impl Transition<i32, i32> for AddOneString {
4534 type Error = String;
4535 type Resources = ();
4536
4537 async fn run(
4538 &self,
4539 state: i32,
4540 _resources: &Self::Resources,
4541 _bus: &mut Bus,
4542 ) -> Outcome<i32, Self::Error> {
4543 Outcome::Next(state + 1)
4544 }
4545 }
4546
4547 #[derive(Clone)]
4548 struct AddTenString;
4549
4550 #[async_trait]
4551 impl Transition<i32, i32> for AddTenString {
4552 type Error = String;
4553 type Resources = ();
4554
4555 async fn run(
4556 &self,
4557 state: i32,
4558 _resources: &Self::Resources,
4559 _bus: &mut Bus,
4560 ) -> Outcome<i32, Self::Error> {
4561 Outcome::Next(state + 10)
4562 }
4563 }
4564
4565 #[tokio::test]
4566 async fn axon_single_step_chain_executes_and_returns_next() {
4567 let mut bus = Bus::new();
4568 let axon = Axon::<i32, i32, TestInfallible>::start("SingleStep").then(AddOne);
4569
4570 let outcome = axon.execute(5, &(), &mut bus).await;
4571 assert!(matches!(outcome, Outcome::Next(6)));
4572 }
4573
4574 #[tokio::test]
4575 async fn axon_three_step_chain_executes_in_order() {
4576 let mut bus = Bus::new();
4577 let axon = Axon::<i32, i32, TestInfallible>::start("ThreeStep")
4578 .then(AddOne)
4579 .then(MultiplyByTwo)
4580 .then(AddTen);
4581
4582 let outcome = axon.execute(5, &(), &mut bus).await;
4584 assert!(matches!(outcome, Outcome::Next(22)));
4585 }
4586
4587 #[tokio::test]
4588 async fn axon_with_fault_in_middle_step_propagates_error() {
4589 let mut bus = Bus::new();
4590
4591 let axon = Axon::<i32, i32, String>::start("FaultInMiddle")
4596 .then(AddOneString)
4597 .then(AlwaysFault)
4598 .then(AddTenString);
4599
4600 let outcome = axon.execute(5, &(), &mut bus).await;
4601 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
4602 }
4603
4604 #[test]
4605 fn axon_schematic_has_correct_node_count_after_chaining() {
4606 let axon = Axon::<i32, i32, TestInfallible>::start("NodeCount")
4607 .then(AddOne)
4608 .then(MultiplyByTwo)
4609 .then(AddTen);
4610
4611 assert_eq!(axon.schematic.nodes.len(), 4);
4613 assert_eq!(axon.schematic.name, "NodeCount");
4614 }
4615
4616 #[tokio::test]
4617 async fn axon_execution_records_timeline_events() {
4618 let mut bus = Bus::new();
4619 bus.insert(Timeline::new());
4620
4621 let axon = Axon::<i32, i32, TestInfallible>::start("TimelineTest")
4622 .then(AddOne)
4623 .then(MultiplyByTwo);
4624
4625 let outcome = axon.execute(3, &(), &mut bus).await;
4626 assert!(matches!(outcome, Outcome::Next(8))); let timeline = bus.read::<Timeline>().unwrap();
4629
4630 let enter_count = timeline
4632 .events
4633 .iter()
4634 .filter(|e| matches!(e, TimelineEvent::NodeEnter { .. }))
4635 .count();
4636 let exit_count = timeline
4637 .events
4638 .iter()
4639 .filter(|e| matches!(e, TimelineEvent::NodeExit { .. }))
4640 .count();
4641
4642 assert!(enter_count >= 1, "Should have at least 1 NodeEnter event");
4644 assert!(exit_count >= 1, "Should have at least 1 NodeExit event");
4645 }
4646
4647 #[tokio::test]
4650 async fn parallel_all_succeed_returns_first_next() {
4651 use super::ParallelStrategy;
4652
4653 let mut bus = Bus::new();
4654 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelAllSucceed")
4655 .parallel(
4656 vec![
4657 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4658 Arc::new(MultiplyByTwo),
4659 ],
4660 ParallelStrategy::AllMustSucceed,
4661 );
4662
4663 let outcome = axon.execute(5, &(), &mut bus).await;
4666 assert!(matches!(outcome, Outcome::Next(6)));
4667 }
4668
4669 #[tokio::test]
4670 async fn parallel_all_must_succeed_returns_fault_when_any_fails() {
4671 use super::ParallelStrategy;
4672
4673 let mut bus = Bus::new();
4674 let axon = Axon::<i32, i32, String>::start("ParallelAllFault")
4675 .parallel(
4676 vec![
4677 Arc::new(AddOneString) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4678 Arc::new(AlwaysFault),
4679 ],
4680 ParallelStrategy::AllMustSucceed,
4681 );
4682
4683 let outcome = axon.execute(5, &(), &mut bus).await;
4684 assert!(
4685 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4686 "Expected Fault(boom), got {:?}",
4687 outcome
4688 );
4689 }
4690
4691 #[tokio::test]
4692 async fn parallel_any_can_fail_returns_success_despite_fault() {
4693 use super::ParallelStrategy;
4694
4695 let mut bus = Bus::new();
4696 let axon = Axon::<i32, i32, String>::start("ParallelAnyCanFail")
4697 .parallel(
4698 vec![
4699 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4700 Arc::new(AddOneString),
4701 ],
4702 ParallelStrategy::AnyCanFail,
4703 );
4704
4705 let outcome = axon.execute(5, &(), &mut bus).await;
4707 assert!(
4708 matches!(outcome, Outcome::Next(6)),
4709 "Expected Next(6), got {:?}",
4710 outcome
4711 );
4712 }
4713
4714 #[tokio::test]
4715 async fn parallel_any_can_fail_all_fault_returns_first_fault() {
4716 use super::ParallelStrategy;
4717
4718 #[derive(Clone)]
4719 struct AlwaysFault2;
4720 #[async_trait]
4721 impl Transition<i32, i32> for AlwaysFault2 {
4722 type Error = String;
4723 type Resources = ();
4724 async fn run(&self, _state: i32, _resources: &(), _bus: &mut Bus) -> Outcome<i32, String> {
4725 Outcome::Fault("boom2".to_string())
4726 }
4727 }
4728
4729 let mut bus = Bus::new();
4730 let axon = Axon::<i32, i32, String>::start("ParallelAllFault2")
4731 .parallel(
4732 vec![
4733 Arc::new(AlwaysFault) as Arc<dyn Transition<i32, i32, Resources = (), Error = String> + Send + Sync>,
4734 Arc::new(AlwaysFault2),
4735 ],
4736 ParallelStrategy::AnyCanFail,
4737 );
4738
4739 let outcome = axon.execute(5, &(), &mut bus).await;
4740 assert!(
4742 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
4743 "Expected Fault(boom), got {:?}",
4744 outcome
4745 );
4746 }
4747
/// Builds a three-branch parallel step without executing it and checks the schematic:
/// six nodes, a `FanOut` node followed by three `Atom` nodes and a `FanIn` node, a
/// fan-out description mentioning "3 branches", and six `Parallel` edges.
#[test]
fn parallel_schematic_has_fanout_fanin_nodes() {
    use super::ParallelStrategy;
    use ranvier_core::schematic::{EdgeType, NodeKind};

    // Trait-object type shared by every branch handed to `parallel`.
    type Branch =
        Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>;

    let branches: Vec<Branch> = vec![
        Arc::new(AddOne),
        Arc::new(MultiplyByTwo),
        Arc::new(AddTen),
    ];
    let pipeline = Axon::<i32, i32, TestInfallible>::start("ParallelSchematic")
        .parallel(branches, ParallelStrategy::AllMustSucceed);

    let schematic = &pipeline.schematic;
    let nodes = &schematic.nodes;

    // Index 0 is not inspected here; presumably it is the entry node created by `start`.
    assert_eq!(nodes.len(), 6);
    assert!(matches!(nodes[1].kind, NodeKind::FanOut));
    for idx in 2..=4 {
        assert!(matches!(nodes[idx].kind, NodeKind::Atom));
    }
    assert!(matches!(nodes[5].kind, NodeKind::FanIn));

    let fanout_description = nodes[1].description.as_ref().unwrap();
    assert!(fanout_description.contains("3 branches"));

    // Six parallel edges for three branches; presumably one into and one out of each branch.
    let parallel_edge_count = schematic
        .edges
        .iter()
        .filter(|edge| matches!(edge.kind, EdgeType::Parallel))
        .count();
    assert_eq!(parallel_edge_count, 6);
}
4788
4789 #[tokio::test]
4790 async fn parallel_then_chain_composes_correctly() {
4791 use super::ParallelStrategy;
4792
4793 let mut bus = Bus::new();
4794 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelThenChain")
4795 .then(AddOne)
4796 .parallel(
4797 vec![
4798 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4799 Arc::new(MultiplyByTwo),
4800 ],
4801 ParallelStrategy::AllMustSucceed,
4802 )
4803 .then(AddTen);
4804
4805 let outcome = axon.execute(5, &(), &mut bus).await;
4807 assert!(
4808 matches!(outcome, Outcome::Next(17)),
4809 "Expected Next(17), got {:?}",
4810 outcome
4811 );
4812 }
4813
4814 #[tokio::test]
4815 async fn parallel_records_timeline_events() {
4816 use super::ParallelStrategy;
4817 use ranvier_core::timeline::TimelineEvent;
4818
4819 let mut bus = Bus::new();
4820 bus.insert(Timeline::new());
4821
4822 let axon = Axon::<i32, i32, TestInfallible>::start("ParallelTimeline")
4823 .parallel(
4824 vec![
4825 Arc::new(AddOne) as Arc<dyn Transition<i32, i32, Resources = (), Error = TestInfallible> + Send + Sync>,
4826 Arc::new(MultiplyByTwo),
4827 ],
4828 ParallelStrategy::AllMustSucceed,
4829 );
4830
4831 let outcome = axon.execute(3, &(), &mut bus).await;
4832 assert!(matches!(outcome, Outcome::Next(4)));
4833
4834 let timeline = bus.read::<Timeline>().unwrap();
4835
4836 let fanout_enters = timeline
4838 .events
4839 .iter()
4840 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanOut"))
4841 .count();
4842 let fanin_enters = timeline
4843 .events
4844 .iter()
4845 .filter(|e| matches!(e, TimelineEvent::NodeEnter { node_label, .. } if node_label == "FanIn"))
4846 .count();
4847
4848 assert_eq!(fanout_enters, 1, "Should have 1 FanOut enter");
4849 assert_eq!(fanin_enters, 1, "Should have 1 FanIn enter");
4850 }
4851}