1use crate::persistence::{
15 CompensationAutoTrigger, CompensationContext, CompensationHandle,
16 CompensationIdempotencyHandle, CompensationRetryPolicy, CompletionState,
17 PersistenceAutoComplete, PersistenceEnvelope, PersistenceHandle, PersistenceTraceId,
18};
19use async_trait::async_trait;
20use ranvier_audit::{AuditEvent, AuditSink};
21use ranvier_core::bus::Bus;
22use ranvier_core::cluster::DistributedLock;
23use ranvier_core::event::{DlqPolicy, DlqSink};
24use ranvier_core::outcome::Outcome;
25use ranvier_core::policy::DynamicPolicy;
26use ranvier_core::saga::{SagaPolicy, SagaStack};
27use ranvier_core::schematic::{
28 BusCapabilitySchema, Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation,
29};
30use ranvier_core::timeline::{Timeline, TimelineEvent};
31use ranvier_core::transition::Transition;
32use serde::{Serialize, de::DeserializeOwned};
33use serde_json::Value;
34use std::any::type_name;
35use std::ffi::OsString;
36use std::fs;
37use std::future::Future;
38use std::panic::Location;
39use std::path::{Path, PathBuf};
40use std::pin::Pin;
41use std::sync::{Arc, Mutex, OnceLock};
42use std::time::{SystemTime, UNIX_EPOCH};
43use tracing::Instrument;
44
45#[derive(Clone)]
47pub enum ExecutionMode {
48 Local,
50 Singleton {
52 lock_key: String,
53 ttl_ms: u64,
54 lock_provider: Arc<dyn DistributedLock>,
55 },
56}
57
/// A heap-allocated, pinned, `Send` future that may borrow data for `'a`.
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
60
61pub type Executor<In, Out, E, Res> =
65 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
66
67#[derive(Debug, Clone)]
69pub struct ManualJump {
70 pub target_node: String,
71 pub payload_override: Option<serde_json::Value>,
72}
73
/// Bus marker holding the index into `schematic.nodes` of the step at which
/// a persisted run resumes.
#[derive(Clone, Copy, Debug)]
struct StartStep(u64);
77
78#[derive(Debug, Clone)]
80struct ResumptionState {
81 payload: Option<serde_json::Value>,
82}
83
/// Returns a short, human-readable name for `T` with all module paths removed.
///
/// Generic and compound types keep their structure: `Option<alloc::string::String>`
/// becomes `Option<String>` and `(alloc::string::String, u32)` becomes
/// `(String, u32)`. Taking only the text after the last `::` would produce
/// `String>` or `u32)` for these types.
fn type_name_of<T: ?Sized>() -> String {
    strip_module_paths(type_name::<T>())
}

/// Drops every `a::b::` module prefix from each path segment of a rendered type name.
fn strip_module_paths(full: &str) -> String {
    let mut out = String::with_capacity(full.len());
    // Byte offset in `out` where the current path segment began.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();
    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // `::` separator: discard the module prefix accumulated so far.
            chars.next();
            out.truncate(segment_start);
        } else {
            out.push(c);
            // Any non-identifier character (`<`, `,`, ` `, `(`, `&`, ...) starts a new path.
            if !(c.is_alphanumeric() || c == '_') {
                segment_start = out.len();
            }
        }
    }
    out
}
89
90pub struct Axon<In, Out, E, Res = ()> {
110 pub schematic: Schematic,
112 executor: Executor<In, Out, E, Res>,
114 pub execution_mode: ExecutionMode,
116 pub persistence_store: Option<Arc<dyn crate::persistence::PersistenceStore>>,
118 pub audit_sink: Option<Arc<dyn AuditSink>>,
120 pub dlq_sink: Option<Arc<dyn DlqSink>>,
122 pub dlq_policy: DlqPolicy,
124 pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
126 pub saga_policy: SagaPolicy,
128 pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
130 pub saga_compensation_registry:
132 Arc<std::sync::RwLock<ranvier_core::saga::SagaCompensationRegistry<E, Res>>>,
133 pub iam_handle: Option<ranvier_core::iam::IamHandle>,
135}
136
/// Request to export a circuit's schematic.
#[derive(Clone, Debug)]
pub struct SchematicExportRequest {
    /// Destination file; what `None` means is decided by the consumer of
    /// this request (NOTE(review): presumably stdout — confirm).
    pub output: Option<std::path::PathBuf>,
}
143
144impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
145 fn clone(&self) -> Self {
146 Self {
147 schematic: self.schematic.clone(),
148 executor: self.executor.clone(),
149 execution_mode: self.execution_mode.clone(),
150 persistence_store: self.persistence_store.clone(),
151 audit_sink: self.audit_sink.clone(),
152 dlq_sink: self.dlq_sink.clone(),
153 dlq_policy: self.dlq_policy.clone(),
154 dynamic_dlq_policy: self.dynamic_dlq_policy.clone(),
155 saga_policy: self.saga_policy.clone(),
156 dynamic_saga_policy: self.dynamic_saga_policy.clone(),
157 saga_compensation_registry: self.saga_compensation_registry.clone(),
158 iam_handle: self.iam_handle.clone(),
159 }
160 }
161}
162
163impl<In, E, Res> Axon<In, In, E, Res>
164where
165 In: Send + Sync + Serialize + DeserializeOwned + 'static,
166 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
167 Res: ranvier_core::transition::ResourceRequirement,
168{
169 #[track_caller]
172 pub fn new(label: &str) -> Self {
173 let caller = Location::caller();
174 Self::start_with_source(label, caller)
175 }
176
177 #[track_caller]
180 pub fn start(label: &str) -> Self {
181 let caller = Location::caller();
182 Self::start_with_source(label, caller)
183 }
184
185 fn start_with_source(label: &str, caller: &'static Location<'static>) -> Self {
186 let node_id = uuid::Uuid::new_v4().to_string();
187 let node = Node {
188 id: node_id,
189 kind: NodeKind::Ingress,
190 label: label.to_string(),
191 description: None,
192 input_type: "void".to_string(),
193 output_type: type_name_of::<In>(),
194 resource_type: type_name_of::<Res>(),
195 metadata: Default::default(),
196 bus_capability: None,
197 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
198 position: None,
199 compensation_node_id: None,
200 };
201
202 let mut schematic = Schematic::new(label);
203 schematic.nodes.push(node);
204
205 let executor: Executor<In, In, E, Res> =
206 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
207
208 Self {
209 schematic,
210 executor,
211 execution_mode: ExecutionMode::Local,
212 persistence_store: None,
213 audit_sink: None,
214 dlq_sink: None,
215 dlq_policy: DlqPolicy::default(),
216 dynamic_dlq_policy: None,
217 saga_policy: SagaPolicy::default(),
218 dynamic_saga_policy: None,
219 saga_compensation_registry: Arc::new(std::sync::RwLock::new(
220 ranvier_core::saga::SagaCompensationRegistry::new(),
221 )),
222 iam_handle: None,
223 }
224 }
225}
226
227impl<In, Out, E, Res> Axon<In, Out, E, Res>
228where
229 In: Send + Sync + Serialize + DeserializeOwned + 'static,
230 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
231 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
232 Res: ranvier_core::transition::ResourceRequirement,
233{
234 pub fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
236 self.execution_mode = mode;
237 self
238 }
239
240 pub fn with_version(mut self, version: impl Into<String>) -> Self {
242 self.schematic.schema_version = version.into();
243 self
244 }
245
246 pub fn with_persistence_store<S>(mut self, store: S) -> Self
248 where
249 S: crate::persistence::PersistenceStore + 'static,
250 {
251 self.persistence_store = Some(Arc::new(store));
252 self
253 }
254
255 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
257 where
258 S: AuditSink + 'static,
259 {
260 self.audit_sink = Some(Arc::new(sink));
261 self
262 }
263
264 pub fn with_dlq_sink<S>(mut self, sink: S) -> Self
266 where
267 S: DlqSink + 'static,
268 {
269 self.dlq_sink = Some(Arc::new(sink));
270 self
271 }
272
273 pub fn with_dlq_policy(mut self, policy: DlqPolicy) -> Self {
275 self.dlq_policy = policy;
276 self
277 }
278
279 pub fn with_saga_policy(mut self, policy: SagaPolicy) -> Self {
281 self.saga_policy = policy;
282 self
283 }
284
285 pub fn with_dynamic_dlq_policy(mut self, dynamic: DynamicPolicy<DlqPolicy>) -> Self {
288 self.dynamic_dlq_policy = Some(dynamic);
289 self
290 }
291
292 pub fn with_dynamic_saga_policy(mut self, dynamic: DynamicPolicy<SagaPolicy>) -> Self {
295 self.dynamic_saga_policy = Some(dynamic);
296 self
297 }
298
299 pub fn with_iam(
307 mut self,
308 policy: ranvier_core::iam::IamPolicy,
309 verifier: impl ranvier_core::iam::IamVerifier + 'static,
310 ) -> Self {
311 self.iam_handle = Some(ranvier_core::iam::IamHandle::new(
312 policy,
313 Arc::new(verifier),
314 ));
315 self
316 }
317}
318
319#[async_trait]
320impl<In, Out, E, Res> ranvier_inspector::StateInspector for Axon<In, Out, E, Res>
321where
322 In: Send + Sync + Serialize + DeserializeOwned + 'static,
323 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
324 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
325 Res: ranvier_core::transition::ResourceRequirement,
326{
327 async fn get_state(&self, trace_id: &str) -> Option<serde_json::Value> {
328 let store = self.persistence_store.as_ref()?;
329 let trace = store.load(trace_id).await.ok().flatten()?;
330 Some(serde_json::to_value(trace).unwrap_or(serde_json::Value::Null))
331 }
332
333 async fn force_resume(
334 &self,
335 trace_id: &str,
336 target_node: &str,
337 payload_override: Option<Value>,
338 ) -> Result<(), String> {
339 let store = self
340 .persistence_store
341 .as_ref()
342 .ok_or("No persistence store attached to Axon")?;
343
344 let intervention = crate::persistence::Intervention {
345 target_node: target_node.to_string(),
346 payload_override,
347 timestamp_ms: now_ms(),
348 };
349
350 store
351 .save_intervention(trace_id, intervention)
352 .await
353 .map_err(|e| format!("Failed to save intervention: {}", e))?;
354
355 if let Some(sink) = self.audit_sink.as_ref() {
356 let event = AuditEvent::new(
357 uuid::Uuid::new_v4().to_string(),
358 "Inspector".to_string(),
359 "ForceResume".to_string(),
360 trace_id.to_string(),
361 )
362 .with_metadata("target_node", target_node);
363
364 let _ = sink.append(&event).await;
365 }
366
367 tracing::info!(trace_id = %trace_id, target_node = %target_node, "Force resume requested via Inspector");
368 Ok(())
369 }
370}
371
372impl<In, Out, E, Res> Axon<In, Out, E, Res>
373where
374 In: Send + Sync + Serialize + DeserializeOwned + 'static,
375 Out: Send + Sync + Serialize + DeserializeOwned + 'static,
376 E: Send + Sync + Serialize + DeserializeOwned + std::fmt::Debug + 'static,
377 Res: ranvier_core::transition::ResourceRequirement,
378{
379 #[track_caller]
383 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
384 where
385 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
386 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
387 {
388 let caller = Location::caller();
389 let Axon {
391 mut schematic,
392 executor: prev_executor,
393 execution_mode,
394 persistence_store,
395 audit_sink,
396 dlq_sink,
397 dlq_policy,
398 dynamic_dlq_policy,
399 saga_policy,
400 dynamic_saga_policy,
401 saga_compensation_registry,
402 iam_handle,
403 } = self;
404
405 let next_node_id = uuid::Uuid::new_v4().to_string();
407 let next_node = Node {
408 id: next_node_id.clone(),
409 kind: NodeKind::Atom,
410 label: transition.label(),
411 description: transition.description(),
412 input_type: type_name_of::<Out>(),
413 output_type: type_name_of::<Next>(),
414 resource_type: type_name_of::<Res>(),
415 metadata: Default::default(),
416 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
417 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
418 position: transition
419 .position()
420 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
421 compensation_node_id: None,
422 };
423
424 let last_node_id = schematic
425 .nodes
426 .last()
427 .map(|n| n.id.clone())
428 .unwrap_or_default();
429
430 schematic.nodes.push(next_node);
431 schematic.edges.push(Edge {
432 from: last_node_id,
433 to: next_node_id.clone(),
434 kind: EdgeType::Linear,
435 label: Some("Next".to_string()),
436 });
437
438 let node_id_for_exec = next_node_id.clone();
440 let node_label_for_exec = transition.label();
441 let bus_policy_for_exec = transition.bus_access_policy();
442 let bus_policy_clone = bus_policy_for_exec.clone();
443 let current_step_idx = schematic.nodes.len() as u64 - 1;
444 let next_executor: Executor<In, Next, E, Res> = Arc::new(
445 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
446 let prev = prev_executor.clone();
447 let trans = transition.clone();
448 let timeline_node_id = node_id_for_exec.clone();
449 let timeline_node_label = node_label_for_exec.clone();
450 let transition_bus_policy = bus_policy_clone.clone();
451 let step_idx = current_step_idx;
452
453 Box::pin(async move {
454 if let Some(jump) = bus.read::<ManualJump>()
456 && (jump.target_node == timeline_node_id
457 || jump.target_node == timeline_node_label)
458 {
459 tracing::info!(
460 node_id = %timeline_node_id,
461 node_label = %timeline_node_label,
462 "Manual jump target reached; skipping previous steps"
463 );
464
465 let state = if let Some(ow) = jump.payload_override.clone() {
466 match serde_json::from_value::<Out>(ow) {
467 Ok(s) => s,
468 Err(e) => {
469 tracing::error!(
470 "Payload override deserialization failed: {}",
471 e
472 );
473 return Outcome::emit(
474 "execution.jump.payload_error",
475 Some(serde_json::json!({"error": e.to_string()})),
476 )
477 .map(|_: ()| unreachable!());
478 }
479 }
480 } else {
481 return Outcome::emit(
485 "execution.jump.missing_payload",
486 Some(serde_json::json!({"node_id": timeline_node_id})),
487 );
488 };
489
490 return run_this_step::<Out, Next, E, Res>(
492 &trans,
493 state,
494 res,
495 bus,
496 &timeline_node_id,
497 &timeline_node_label,
498 &transition_bus_policy,
499 step_idx,
500 )
501 .await;
502 }
503
504 if let Some(start) = bus.read::<StartStep>()
506 && step_idx == start.0
507 && bus.read::<ResumptionState>().is_some()
508 {
509 let fresh_state = serde_json::to_value(&input)
513 .ok()
514 .and_then(|v| serde_json::from_value::<Out>(v).ok());
515 let persisted_state = bus
516 .read::<ResumptionState>()
517 .and_then(|r| r.payload.clone())
518 .and_then(|p| serde_json::from_value::<Out>(p).ok());
519
520 if let Some(s) = fresh_state.or(persisted_state) {
521 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint");
522 return run_this_step::<Out, Next, E, Res>(
523 &trans,
524 s,
525 res,
526 bus,
527 &timeline_node_id,
528 &timeline_node_label,
529 &transition_bus_policy,
530 step_idx,
531 )
532 .await;
533 }
534
535 return Outcome::emit(
536 "execution.resumption.payload_error",
537 Some(serde_json::json!({"error": "no compatible resumption state"})),
538 )
539 .map(|_: ()| unreachable!());
540 }
541
542 let prev_result = prev(input, res, bus).await;
544
545 let state = match prev_result {
547 Outcome::Next(t) => t,
548 other => return other.map(|_| unreachable!()),
549 };
550
551 run_this_step::<Out, Next, E, Res>(
552 &trans,
553 state,
554 res,
555 bus,
556 &timeline_node_id,
557 &timeline_node_label,
558 &transition_bus_policy,
559 step_idx,
560 )
561 .await
562 })
563 },
564 );
565 Axon {
566 schematic,
567 executor: next_executor,
568 execution_mode,
569 persistence_store,
570 audit_sink,
571 dlq_sink,
572 dlq_policy,
573 dynamic_dlq_policy,
574 saga_policy,
575 dynamic_saga_policy,
576 saga_compensation_registry,
577 iam_handle,
578 }
579 }
580
581 #[track_caller]
586 pub fn then_compensated<Next, Trans, Comp>(
587 self,
588 transition: Trans,
589 compensation: Comp,
590 ) -> Axon<In, Next, E, Res>
591 where
592 Out: Clone,
593 Next: Send + Sync + Serialize + DeserializeOwned + 'static,
594 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
595 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
596 {
597 let caller = Location::caller();
598 let Axon {
599 mut schematic,
600 executor: prev_executor,
601 execution_mode,
602 persistence_store,
603 audit_sink,
604 dlq_sink,
605 dlq_policy,
606 dynamic_dlq_policy,
607 saga_policy,
608 dynamic_saga_policy,
609 saga_compensation_registry,
610 iam_handle,
611 } = self;
612
613 let next_node_id = uuid::Uuid::new_v4().to_string();
615 let comp_node_id = uuid::Uuid::new_v4().to_string();
616
617 let next_node = Node {
618 id: next_node_id.clone(),
619 kind: NodeKind::Atom,
620 label: transition.label(),
621 description: transition.description(),
622 input_type: type_name_of::<Out>(),
623 output_type: type_name_of::<Next>(),
624 resource_type: type_name_of::<Res>(),
625 metadata: Default::default(),
626 bus_capability: bus_capability_schema_from_policy(transition.bus_access_policy()),
627 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
628 position: transition
629 .position()
630 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
631 compensation_node_id: Some(comp_node_id.clone()),
632 };
633
634 let comp_node = Node {
636 id: comp_node_id.clone(),
637 kind: NodeKind::Atom,
638 label: format!("Compensate: {}", compensation.label()),
639 description: compensation.description(),
640 input_type: type_name_of::<Out>(),
641 output_type: "void".to_string(),
642 resource_type: type_name_of::<Res>(),
643 metadata: Default::default(),
644 bus_capability: None,
645 source_location: None,
646 position: compensation
647 .position()
648 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
649 compensation_node_id: None,
650 };
651
652 let last_node_id = schematic
653 .nodes
654 .last()
655 .map(|n| n.id.clone())
656 .unwrap_or_default();
657
658 schematic.nodes.push(next_node);
659 schematic.nodes.push(comp_node);
660 schematic.edges.push(Edge {
661 from: last_node_id,
662 to: next_node_id.clone(),
663 kind: EdgeType::Linear,
664 label: Some("Next".to_string()),
665 });
666
667 let node_id_for_exec = next_node_id.clone();
669 let comp_id_for_exec = comp_node_id.clone();
670 let node_label_for_exec = transition.label();
671 let bus_policy_for_exec = transition.bus_access_policy();
672 let step_idx_for_exec = schematic.nodes.len() as u64 - 2;
673 let comp_for_exec = compensation.clone();
674 let bus_policy_for_executor = bus_policy_for_exec.clone();
675 let bus_policy_for_registry = bus_policy_for_exec.clone();
676 let next_executor: Executor<In, Next, E, Res> = Arc::new(
677 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
678 let prev = prev_executor.clone();
679 let trans = transition.clone();
680 let comp = comp_for_exec.clone();
681 let timeline_node_id = node_id_for_exec.clone();
682 let timeline_comp_id = comp_id_for_exec.clone();
683 let timeline_node_label = node_label_for_exec.clone();
684 let transition_bus_policy = bus_policy_for_executor.clone();
685 let step_idx = step_idx_for_exec;
686
687 Box::pin(async move {
688 if let Some(jump) = bus.read::<ManualJump>()
690 && (jump.target_node == timeline_node_id
691 || jump.target_node == timeline_node_label)
692 {
693 tracing::info!(
694 node_id = %timeline_node_id,
695 node_label = %timeline_node_label,
696 "Manual jump target reached (compensated); skipping previous steps"
697 );
698
699 let state = if let Some(ow) = jump.payload_override.clone() {
700 match serde_json::from_value::<Out>(ow) {
701 Ok(s) => s,
702 Err(e) => {
703 tracing::error!(
704 "Payload override deserialization failed: {}",
705 e
706 );
707 return Outcome::emit(
708 "execution.jump.payload_error",
709 Some(serde_json::json!({"error": e.to_string()})),
710 )
711 .map(|_: ()| unreachable!());
712 }
713 }
714 } else {
715 return Outcome::emit(
716 "execution.jump.missing_payload",
717 Some(serde_json::json!({"node_id": timeline_node_id})),
718 )
719 .map(|_: ()| unreachable!());
720 };
721
722 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
724 &trans,
725 &comp,
726 state,
727 res,
728 bus,
729 &timeline_node_id,
730 &timeline_comp_id,
731 &timeline_node_label,
732 &transition_bus_policy,
733 step_idx,
734 )
735 .await;
736 }
737
738 if let Some(start) = bus.read::<StartStep>()
740 && step_idx == start.0
741 && bus.read::<ResumptionState>().is_some()
742 {
743 let fresh_state = serde_json::to_value(&input)
744 .ok()
745 .and_then(|v| serde_json::from_value::<Out>(v).ok());
746 let persisted_state = bus
747 .read::<ResumptionState>()
748 .and_then(|r| r.payload.clone())
749 .and_then(|p| serde_json::from_value::<Out>(p).ok());
750
751 if let Some(s) = fresh_state.or(persisted_state) {
752 tracing::info!(node_id = %timeline_node_id, "Resuming at checkpoint (compensated)");
753 return run_this_compensated_step::<Out, Next, E, Res, Comp>(
754 &trans,
755 &comp,
756 s,
757 res,
758 bus,
759 &timeline_node_id,
760 &timeline_comp_id,
761 &timeline_node_label,
762 &transition_bus_policy,
763 step_idx,
764 )
765 .await;
766 }
767
768 return Outcome::emit(
769 "execution.resumption.payload_error",
770 Some(serde_json::json!({"error": "no compatible resumption state"})),
771 )
772 .map(|_: ()| unreachable!());
773 }
774
775 let prev_result = prev(input, res, bus).await;
777
778 let state = match prev_result {
780 Outcome::Next(t) => t,
781 other => return other.map(|_| unreachable!()),
782 };
783
784 run_this_compensated_step::<Out, Next, E, Res, Comp>(
785 &trans,
786 &comp,
787 state,
788 res,
789 bus,
790 &timeline_node_id,
791 &timeline_comp_id,
792 &timeline_node_label,
793 &transition_bus_policy,
794 step_idx,
795 )
796 .await
797 })
798 },
799 );
800 {
802 let mut registry = saga_compensation_registry.write().unwrap();
803 let comp_fn = compensation.clone();
804 let transition_bus_policy = bus_policy_for_registry.clone();
805
806 let handler: ranvier_core::saga::SagaCompensationFn<E, Res> =
807 Arc::new(move |input_data, res, bus| {
808 let comp = comp_fn.clone();
809 let bus_policy = transition_bus_policy.clone();
810 Box::pin(async move {
811 let input: Out = serde_json::from_slice(&input_data).unwrap();
812 bus.set_access_policy(comp.label(), bus_policy);
813 let res = comp.run(input, res, bus).await;
814 bus.clear_access_policy();
815 res
816 })
817 });
818 registry.register(next_node_id.clone(), handler);
819 }
820
821 Axon {
822 schematic,
823 executor: next_executor,
824 execution_mode,
825 persistence_store,
826 audit_sink,
827 dlq_sink,
828 dlq_policy,
829 dynamic_dlq_policy,
830 saga_policy,
831 dynamic_saga_policy,
832 saga_compensation_registry,
833 iam_handle,
834 }
835 }
836
837 #[track_caller]
840 pub fn compensate_with<Comp>(mut self, transition: Comp) -> Self
841 where
842 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
843 {
844 let caller = Location::caller();
847 let comp_node_id = uuid::Uuid::new_v4().to_string();
848
849 let comp_node = Node {
850 id: comp_node_id.clone(),
851 kind: NodeKind::Atom,
852 label: transition.label(),
853 description: transition.description(),
854 input_type: type_name_of::<Out>(),
855 output_type: "void".to_string(),
856 resource_type: type_name_of::<Res>(),
857 metadata: Default::default(),
858 bus_capability: None,
859 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
860 position: transition
861 .position()
862 .map(|(x, y)| ranvier_core::schematic::Position { x, y }),
863 compensation_node_id: None,
864 };
865
866 if let Some(last_node) = self.schematic.nodes.last_mut() {
867 last_node.compensation_node_id = Some(comp_node_id.clone());
868 }
869
870 self.schematic.nodes.push(comp_node);
871 self
872 }
873
874 #[track_caller]
876 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
877 let caller = Location::caller();
878 let branch_id_str = branch_id.into();
879 let last_node_id = self
880 .schematic
881 .nodes
882 .last()
883 .map(|n| n.id.clone())
884 .unwrap_or_default();
885
886 let branch_node = Node {
887 id: uuid::Uuid::new_v4().to_string(),
888 kind: NodeKind::Synapse,
889 label: label.to_string(),
890 description: None,
891 input_type: type_name_of::<Out>(),
892 output_type: type_name_of::<Out>(),
893 resource_type: type_name_of::<Res>(),
894 metadata: Default::default(),
895 bus_capability: None,
896 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
897 position: None,
898 compensation_node_id: None,
899 };
900
901 self.schematic.nodes.push(branch_node);
902 self.schematic.edges.push(Edge {
903 from: last_node_id,
904 to: branch_id_str.clone(),
905 kind: EdgeType::Branch(branch_id_str),
906 label: Some("Branch".to_string()),
907 });
908
909 self
910 }
911
912 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
914 if let ExecutionMode::Singleton {
915 lock_key,
916 ttl_ms,
917 lock_provider,
918 } = &self.execution_mode
919 {
920 let trace_span = tracing::info_span!("Singleton Execution", key = %lock_key);
921 let _enter = trace_span.enter();
922 match lock_provider.try_acquire(lock_key, *ttl_ms).await {
923 Ok(true) => {
924 tracing::debug!("Successfully acquired singleton lock: {}", lock_key);
925 }
926 Ok(false) => {
927 tracing::debug!(
928 "Singleton lock {} already held, aborting execution.",
929 lock_key
930 );
931 return Outcome::emit(
933 "execution.skipped.lock_held",
934 Some(serde_json::json!({
935 "lock_key": lock_key
936 })),
937 );
938 }
939 Err(e) => {
940 tracing::error!("Failed to check singleton lock {}: {:?}", lock_key, e);
941 return Outcome::emit(
942 "execution.skipped.lock_error",
943 Some(serde_json::json!({
944 "error": e.to_string()
945 })),
946 );
947 }
948 }
949 }
950
951 if let Some(iam) = &self.iam_handle {
953 use ranvier_core::iam::{IamPolicy, IamToken, enforce_policy};
954
955 if matches!(iam.policy, IamPolicy::None) {
956 } else {
958 let token = bus.read::<IamToken>().map(|t| t.0.clone());
959
960 match token {
961 Some(raw_token) => {
962 match iam.verifier.verify(&raw_token).await {
963 Ok(identity) => {
964 if let Err(e) = enforce_policy(&iam.policy, &identity) {
965 tracing::warn!(
966 policy = ?iam.policy,
967 subject = %identity.subject,
968 "IAM policy enforcement failed: {}",
969 e
970 );
971 return Outcome::emit(
972 "iam.policy_denied",
973 Some(serde_json::json!({
974 "error": e.to_string(),
975 "subject": identity.subject,
976 })),
977 );
978 }
979 bus.insert(identity);
981 }
982 Err(e) => {
983 tracing::warn!("IAM token verification failed: {}", e);
984 return Outcome::emit(
985 "iam.verification_failed",
986 Some(serde_json::json!({
987 "error": e.to_string()
988 })),
989 );
990 }
991 }
992 }
993 None => {
994 tracing::warn!("IAM policy requires token but none found in Bus");
995 return Outcome::emit("iam.missing_token", None);
996 }
997 }
998 }
999 }
1000
1001 let trace_id = persistence_trace_id(bus);
1002 let label = self.schematic.name.clone();
1003
1004 if let Some(sink) = &self.dlq_sink {
1006 bus.insert(sink.clone());
1007 }
1008 let effective_dlq_policy = self
1010 .dynamic_dlq_policy
1011 .as_ref()
1012 .map(|d| d.current())
1013 .unwrap_or_else(|| self.dlq_policy.clone());
1014 bus.insert(effective_dlq_policy);
1015 bus.insert(self.schematic.clone());
1016 let effective_saga_policy = self
1017 .dynamic_saga_policy
1018 .as_ref()
1019 .map(|d| d.current())
1020 .unwrap_or_else(|| self.saga_policy.clone());
1021 bus.insert(effective_saga_policy.clone());
1022
1023 if effective_saga_policy == SagaPolicy::Enabled && bus.read::<SagaStack>().is_none() {
1025 bus.insert(SagaStack::new());
1026 }
1027
1028 let persistence_handle = bus.read::<PersistenceHandle>().cloned();
1029 let compensation_handle = bus.read::<CompensationHandle>().cloned();
1030 let compensation_retry_policy = compensation_retry_policy(bus);
1031 let compensation_idempotency = bus.read::<CompensationIdempotencyHandle>().cloned();
1032 let version = self.schematic.schema_version.clone();
1033 let migration_registry = bus
1034 .read::<ranvier_core::schematic::MigrationRegistry>()
1035 .cloned();
1036
1037 let persistence_start_step = if let Some(handle) = persistence_handle.as_ref() {
1038 let (mut start_step, trace_version, intervention, last_node_id, mut last_payload) =
1039 load_persistence_version(handle, &trace_id).await;
1040
1041 if let Some(interv) = intervention {
1042 tracing::info!(
1043 trace_id = %trace_id,
1044 target_node = %interv.target_node,
1045 "Applying manual intervention command"
1046 );
1047
1048 if let Some(target_idx) = self
1050 .schematic
1051 .nodes
1052 .iter()
1053 .position(|n| n.id == interv.target_node || n.label == interv.target_node)
1054 {
1055 tracing::info!(
1056 trace_id = %trace_id,
1057 target_node = %interv.target_node,
1058 target_step = target_idx,
1059 "Intervention: Jumping to target node"
1060 );
1061 start_step = target_idx as u64;
1062
1063 bus.insert(ManualJump {
1065 target_node: interv.target_node.clone(),
1066 payload_override: interv.payload_override.clone(),
1067 });
1068
1069 if let Some(sink) = self.audit_sink.as_ref() {
1071 let event = AuditEvent::new(
1072 uuid::Uuid::new_v4().to_string(),
1073 "System".to_string(),
1074 "ApplyIntervention".to_string(),
1075 trace_id.to_string(),
1076 )
1077 .with_metadata("target_node", interv.target_node.clone())
1078 .with_metadata("target_step", target_idx);
1079
1080 let _ = sink.append(&event).await;
1081 }
1082 } else {
1083 tracing::warn!(
1084 trace_id = %trace_id,
1085 target_node = %interv.target_node,
1086 "Intervention target node not found in schematic; ignoring jump"
1087 );
1088 }
1089 }
1090
1091 if let Some(old_version) = trace_version
1092 && old_version != version
1093 {
1094 tracing::info!(
1095 trace_id = %trace_id,
1096 old_version = %old_version,
1097 current_version = %version,
1098 "Version mismatch detected during resumption"
1099 );
1100
1101 let migration_path = migration_registry
1103 .as_ref()
1104 .and_then(|r| r.find_migration_path(&old_version, &version));
1105
1106 let (final_migration, mapped_payload) = if let Some(path) = migration_path {
1107 if path.is_empty() {
1108 (None, last_payload.clone())
1109 } else {
1110 let mut payload = last_payload.clone();
1112 for hop in &path {
1113 if let (Some(mapper), Some(p)) = (&hop.payload_mapper, payload.as_ref())
1114 {
1115 match mapper.map_state(p) {
1116 Ok(mapped) => payload = Some(mapped),
1117 Err(e) => {
1118 tracing::error!(
1119 trace_id = %trace_id,
1120 from = %hop.from_version,
1121 to = %hop.to_version,
1122 error = %e,
1123 "Payload migration mapper failed"
1124 );
1125 return Outcome::emit(
1126 "execution.resumption.payload_migration_failed",
1127 Some(serde_json::json!({
1128 "trace_id": trace_id,
1129 "from": hop.from_version,
1130 "to": hop.to_version,
1131 "error": e.to_string()
1132 })),
1133 );
1134 }
1135 }
1136 }
1137 }
1138 let hops: Vec<String> = path
1139 .iter()
1140 .map(|h| format!("{}->{}", h.from_version, h.to_version))
1141 .collect();
1142 tracing::info!(trace_id = %trace_id, hops = ?hops, "Applied multi-hop migration path");
1143 (path.last().copied(), payload)
1144 }
1145 } else {
1146 (None, last_payload.clone())
1147 };
1148
1149 let migration = final_migration.or_else(|| {
1151 migration_registry
1152 .as_ref()
1153 .and_then(|r| r.find_migration(&old_version, &version))
1154 });
1155
1156 if mapped_payload.is_some() {
1158 last_payload = mapped_payload;
1159 }
1160
1161 let strategy = if let (Some(m), Some(node_id)) = (migration, last_node_id.as_ref())
1162 {
1163 m.node_mapping
1164 .get(node_id)
1165 .cloned()
1166 .unwrap_or(m.default_strategy.clone())
1167 } else {
1168 migration
1169 .map(|m| m.default_strategy.clone())
1170 .unwrap_or(ranvier_core::schematic::MigrationStrategy::Fail)
1171 };
1172
1173 match strategy {
1174 ranvier_core::schematic::MigrationStrategy::ResumeFromStart => {
1175 tracing::info!(trace_id = %trace_id, "Applying ResumeFromStart migration strategy");
1176 start_step = 0;
1177 }
1178 ranvier_core::schematic::MigrationStrategy::MigrateActiveNode {
1179 new_node_id,
1180 ..
1181 } => {
1182 tracing::info!(trace_id = %trace_id, to_node = %new_node_id, "Applying MigrateActiveNode strategy");
1183 if let Some(target_idx) = self
1184 .schematic
1185 .nodes
1186 .iter()
1187 .position(|n| n.id == new_node_id || n.label == new_node_id)
1188 {
1189 start_step = target_idx as u64;
1190 } else {
1191 tracing::warn!(trace_id = %trace_id, "MigrateActiveNode: target node {} not found", new_node_id);
1192 return Outcome::emit(
1193 "execution.resumption.migration_target_not_found",
1194 Some(serde_json::json!({ "node_id": new_node_id })),
1195 );
1196 }
1197 }
1198 ranvier_core::schematic::MigrationStrategy::FallbackToNode(node_id) => {
1199 tracing::info!(trace_id = %trace_id, to_node = %node_id, "Applying FallbackToNode strategy");
1200 if let Some(target_idx) = self
1201 .schematic
1202 .nodes
1203 .iter()
1204 .position(|n| n.id == node_id || n.label == node_id)
1205 {
1206 start_step = target_idx as u64;
1207 } else {
1208 tracing::warn!(trace_id = %trace_id, "FallbackToNode: node {} not found", node_id);
1209 return Outcome::emit(
1210 "execution.resumption.migration_target_not_found",
1211 Some(serde_json::json!({ "node_id": node_id })),
1212 );
1213 }
1214 }
1215 ranvier_core::schematic::MigrationStrategy::Fail => {
1216 tracing::error!(trace_id = %trace_id, "Version mismatch: no migration path found. Failing resumption.");
1217 return Outcome::emit(
1218 "execution.resumption.version_mismatch_failed",
1219 Some(serde_json::json!({
1220 "trace_id": trace_id,
1221 "old_version": old_version,
1222 "current_version": version
1223 })),
1224 );
1225 }
1226 _ => {
1227 tracing::error!(trace_id = %trace_id, "Unsupported migration strategy: {:?}", strategy);
1228 return Outcome::emit(
1229 "execution.resumption.unsupported_migration",
1230 Some(serde_json::json!({
1231 "trace_id": trace_id,
1232 "strategy": format!("{:?}", strategy)
1233 })),
1234 );
1235 }
1236 }
1237 }
1238
1239 let ingress_node_id = self.schematic.nodes.first().map(|n| n.id.clone());
1240 persist_execution_event(
1241 handle,
1242 &trace_id,
1243 &label,
1244 &version,
1245 start_step,
1246 ingress_node_id,
1247 "Enter",
1248 None,
1249 )
1250 .await;
1251
1252 bus.insert(StartStep(start_step));
1253 if start_step > 0 {
1254 bus.insert(ResumptionState {
1255 payload: last_payload,
1256 });
1257 }
1258
1259 Some(start_step)
1260 } else {
1261 None
1262 };
1263
1264 let should_capture = should_attach_timeline(bus);
1265 let inserted_timeline = if should_capture {
1266 ensure_timeline(bus)
1267 } else {
1268 false
1269 };
1270 let ingress_started = std::time::Instant::now();
1271 let ingress_enter_ts = now_ms();
1272 if should_capture
1273 && let (Some(timeline), Some(ingress)) =
1274 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1275 {
1276 timeline.push(TimelineEvent::NodeEnter {
1277 node_id: ingress.id.clone(),
1278 node_label: ingress.label.clone(),
1279 timestamp: ingress_enter_ts,
1280 });
1281 }
1282
1283 let circuit_span = tracing::info_span!(
1284 "Circuit",
1285 ranvier.circuit = %label,
1286 ranvier.outcome_kind = tracing::field::Empty,
1287 ranvier.outcome_target = tracing::field::Empty
1288 );
1289 let outcome = (self.executor)(input, resources, bus)
1290 .instrument(circuit_span.clone())
1291 .await;
1292 circuit_span.record("ranvier.outcome_kind", outcome_kind_name(&outcome));
1293 if let Some(target) = outcome_target(&outcome) {
1294 circuit_span.record("ranvier.outcome_target", tracing::field::display(&target));
1295 }
1296
1297 if matches!(outcome, Outcome::Fault(_)) && self.saga_policy == SagaPolicy::Enabled {
1299 while let Some(task) = {
1300 let mut stack = bus.read_mut::<SagaStack>();
1301 stack.as_mut().and_then(|s| s.pop())
1302 } {
1303 tracing::info!(trace_id = %trace_id, node_id = %task.node_id, "Compensating step: {}", task.node_label);
1304
1305 let handler = {
1306 let registry = self.saga_compensation_registry.read().unwrap();
1307 registry.get(&task.node_id)
1308 };
1309 if let Some(handler) = handler {
1310 let res = handler(task.input_snapshot, resources, bus).await;
1311 if let Outcome::Fault(e) = res {
1312 tracing::error!(trace_id = %trace_id, node_id = %task.node_id, "Saga compensation FAILED: {:?}", e);
1313 }
1314 } else {
1315 tracing::warn!(trace_id = %trace_id, node_id = %task.node_id, "No compensation handler found in registry for saga rollback");
1316 }
1317 }
1318 tracing::info!(trace_id = %trace_id, "Saga automated rollback completed");
1319 }
1320
1321 let ingress_exit_ts = now_ms();
1322 if should_capture
1323 && let (Some(timeline), Some(ingress)) =
1324 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
1325 {
1326 timeline.push(TimelineEvent::NodeExit {
1327 node_id: ingress.id.clone(),
1328 outcome_type: outcome_type_name(&outcome),
1329 duration_ms: ingress_started.elapsed().as_millis() as u64,
1330 timestamp: ingress_exit_ts,
1331 });
1332 }
1333
1334 if let Some(handle) = persistence_handle.as_ref() {
1335 let fault_step = persistence_start_step.map(|s| s + 1).unwrap_or(1);
1336 persist_execution_event(
1337 handle,
1338 &trace_id,
1339 &label,
1340 &version,
1341 fault_step,
1342 None, outcome_kind_name(&outcome),
1344 Some(outcome.to_json_value()),
1345 )
1346 .await;
1347
1348 let mut completion = completion_from_outcome(&outcome);
1349 if matches!(outcome, Outcome::Fault(_))
1350 && let Some(compensation) = compensation_handle.as_ref()
1351 && compensation_auto_trigger(bus)
1352 {
1353 let context = CompensationContext {
1354 trace_id: trace_id.clone(),
1355 circuit: label.clone(),
1356 fault_kind: outcome_kind_name(&outcome).to_string(),
1357 fault_step,
1358 timestamp_ms: now_ms(),
1359 };
1360
1361 if run_compensation(
1362 compensation,
1363 context,
1364 compensation_retry_policy,
1365 compensation_idempotency.clone(),
1366 )
1367 .await
1368 {
1369 persist_execution_event(
1370 handle,
1371 &trace_id,
1372 &label,
1373 &version,
1374 fault_step.saturating_add(1),
1375 None,
1376 "Compensated",
1377 None,
1378 )
1379 .await;
1380 completion = CompletionState::Compensated;
1381 }
1382 }
1383
1384 if persistence_auto_complete(bus) {
1385 persist_completion(handle, &trace_id, completion).await;
1386 }
1387 }
1388
1389 if should_capture {
1390 maybe_export_timeline(bus, &outcome);
1391 }
1392 if inserted_timeline {
1393 let _ = bus.remove::<Timeline>();
1394 }
1395
1396 outcome
1397 }
1398
1399 pub fn serve_inspector(self, port: u16) -> Self {
1402 if !inspector_dev_mode_from_env() {
1403 tracing::info!("Inspector disabled because RANVIER_MODE is production");
1404 return self;
1405 }
1406 if !inspector_enabled_from_env() {
1407 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
1408 return self;
1409 }
1410
1411 let schematic = self.schematic.clone();
1412 let axon_inspector = Arc::new(self.clone());
1413 tokio::spawn(async move {
1414 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
1415 .with_projection_files_from_env()
1416 .with_mode_from_env()
1417 .with_auth_policy_from_env()
1418 .with_state_inspector(axon_inspector)
1419 .serve()
1420 .await
1421 {
1422 tracing::error!("Inspector server failed: {}", e);
1423 }
1424 });
1425 self
1426 }
1427
1428 pub fn schematic(&self) -> &Schematic {
1430 &self.schematic
1431 }
1432
1433 pub fn into_schematic(self) -> Schematic {
1435 self.schematic
1436 }
1437
1438 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
1449 schematic_export_request_from_process()
1450 }
1451
1452 pub fn maybe_export_and_exit(&self) -> anyhow::Result<bool> {
1464 self.maybe_export_and_exit_with(|_| ())
1465 }
1466
1467 pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> anyhow::Result<bool>
1472 where
1473 F: FnOnce(&SchematicExportRequest),
1474 {
1475 let Some(request) = self.schematic_export_request() else {
1476 return Ok(false);
1477 };
1478 on_before_exit(&request);
1479 self.export_schematic(&request)?;
1480 Ok(true)
1481 }
1482
1483 pub fn export_schematic(&self, request: &SchematicExportRequest) -> anyhow::Result<()> {
1485 let json = serde_json::to_string_pretty(self.schematic())?;
1486 if let Some(path) = &request.output {
1487 if let Some(parent) = path.parent()
1488 && !parent.as_os_str().is_empty()
1489 {
1490 fs::create_dir_all(parent)?;
1491 }
1492 fs::write(path, json.as_bytes())?;
1493 return Ok(());
1494 }
1495 println!("{}", json);
1496 Ok(())
1497 }
1498}
1499
1500fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
1501 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
1502 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
1503 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
1504
1505 let mut i = 0;
1506 while i < args.len() {
1507 let arg = args[i].to_string_lossy();
1508
1509 if arg == "--schematic" {
1510 enabled = true;
1511 i += 1;
1512 continue;
1513 }
1514
1515 if arg == "--schematic-output" || arg == "--output" {
1516 if let Some(next) = args.get(i + 1) {
1517 output = Some(PathBuf::from(next));
1518 i += 2;
1519 continue;
1520 }
1521 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
1522 output = Some(PathBuf::from(value));
1523 } else if let Some(value) = arg.strip_prefix("--output=") {
1524 output = Some(PathBuf::from(value));
1525 }
1526
1527 i += 1;
1528 }
1529
1530 if enabled {
1531 Some(SchematicExportRequest { output })
1532 } else {
1533 None
1534 }
1535}
1536
/// Reports whether environment variable `key` holds a truthy value.
///
/// Truthy tokens are `1`, `true`, `on` and `yes`, compared ASCII case-insensitively. Unset or
/// non-Unicode variables count as false.
fn env_flag_is_true(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var(key).is_ok_and(|raw| TRUTHY.iter().any(|token| raw.eq_ignore_ascii_case(token)))
}
1543
1544fn inspector_enabled_from_env() -> bool {
1545 let raw = std::env::var("RANVIER_INSPECTOR").ok();
1546 inspector_enabled_from_value(raw.as_deref())
1547}
1548
/// Interprets a raw `RANVIER_INSPECTOR` value.
///
/// `None` (unset) enables the inspector. A present value enables it only if it is one of `1`,
/// `true`, `on` or `yes` (ASCII case-insensitive). Anything else, including an empty string,
/// disables it.
fn inspector_enabled_from_value(value: Option<&str>) -> bool {
    let Some(raw) = value else {
        return true;
    };
    ["1", "true", "on", "yes"]
        .iter()
        .any(|token| raw.eq_ignore_ascii_case(token))
}
1555
1556fn inspector_dev_mode_from_env() -> bool {
1557 let raw = std::env::var("RANVIER_MODE").ok();
1558 inspector_dev_mode_from_value(raw.as_deref())
1559}
1560
/// Interprets a raw `RANVIER_MODE` value.
///
/// Only `prod` or `production` (ASCII case-insensitive) count as production. Every other value,
/// including unset, is treated as development mode and returns `true`.
fn inspector_dev_mode_from_value(value: Option<&str>) -> bool {
    let is_production = value.is_some_and(|mode| {
        mode.eq_ignore_ascii_case("prod") || mode.eq_ignore_ascii_case("production")
    });
    !is_production
}
1567
1568fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
1569 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
1570 Ok(v) if !v.trim().is_empty() => v,
1571 _ => return,
1572 };
1573
1574 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
1575 let policy = timeline_adaptive_policy();
1576 let forced = should_force_export(outcome, &policy);
1577 let should_export = sampled || forced;
1578 if !should_export {
1579 record_sampling_stats(false, sampled, forced, "none", &policy);
1580 return;
1581 }
1582
1583 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
1584 timeline.sort();
1585
1586 let mode = std::env::var("RANVIER_TIMELINE_MODE")
1587 .unwrap_or_else(|_| "overwrite".to_string())
1588 .to_ascii_lowercase();
1589
1590 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
1591 tracing::warn!(
1592 "Failed to persist timeline file {} (mode={}): {}",
1593 path,
1594 mode,
1595 err
1596 );
1597 record_sampling_stats(false, sampled, forced, &mode, &policy);
1598 } else {
1599 record_sampling_stats(true, sampled, forced, &mode, &policy);
1600 }
1601}
1602
1603fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
1604 match outcome {
1605 Outcome::Next(_) => "Next".to_string(),
1606 Outcome::Branch(id, _) => format!("Branch:{}", id),
1607 Outcome::Jump(id, _) => format!("Jump:{}", id),
1608 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
1609 Outcome::Fault(_) => "Fault".to_string(),
1610 }
1611}
1612
1613fn outcome_kind_name<Out, E>(outcome: &Outcome<Out, E>) -> &'static str {
1614 match outcome {
1615 Outcome::Next(_) => "Next",
1616 Outcome::Branch(_, _) => "Branch",
1617 Outcome::Jump(_, _) => "Jump",
1618 Outcome::Emit(_, _) => "Emit",
1619 Outcome::Fault(_) => "Fault",
1620 }
1621}
1622
1623fn outcome_target<Out, E>(outcome: &Outcome<Out, E>) -> Option<String> {
1624 match outcome {
1625 Outcome::Branch(branch_id, _) => Some(branch_id.clone()),
1626 Outcome::Jump(node_id, _) => Some(node_id.to_string()),
1627 Outcome::Emit(event_type, _) => Some(event_type.clone()),
1628 Outcome::Next(_) | Outcome::Fault(_) => None,
1629 }
1630}
1631
1632fn completion_from_outcome<Out, E>(outcome: &Outcome<Out, E>) -> CompletionState {
1633 match outcome {
1634 Outcome::Fault(_) => CompletionState::Fault,
1635 _ => CompletionState::Success,
1636 }
1637}
1638
1639fn persistence_trace_id(bus: &Bus) -> String {
1640 if let Some(explicit) = bus.read::<PersistenceTraceId>() {
1641 explicit.0.clone()
1642 } else {
1643 format!("{}:{}", bus.id, now_ms())
1644 }
1645}
1646
1647fn persistence_auto_complete(bus: &Bus) -> bool {
1648 bus.read::<PersistenceAutoComplete>()
1649 .map(|v| v.0)
1650 .unwrap_or(true)
1651}
1652
1653fn compensation_auto_trigger(bus: &Bus) -> bool {
1654 bus.read::<CompensationAutoTrigger>()
1655 .map(|v| v.0)
1656 .unwrap_or(true)
1657}
1658
1659fn compensation_retry_policy(bus: &Bus) -> CompensationRetryPolicy {
1660 bus.read::<CompensationRetryPolicy>()
1661 .copied()
1662 .unwrap_or_default()
1663}
1664
1665fn unwrap_outcome_payload(payload: Option<&serde_json::Value>) -> Option<serde_json::Value> {
1671 payload.map(|p| {
1672 p.get("Next")
1673 .or_else(|| p.get("Branch"))
1674 .or_else(|| p.get("Jump"))
1675 .cloned()
1676 .unwrap_or_else(|| p.clone())
1677 })
1678}
1679
/// Loads the persisted trace for `trace_id` and derives where execution should resume.
///
/// Returns a tuple of:
/// 1. the next step index to run (`0` when there is no trace, no events, or loading fails),
/// 2. the schematic version recorded on the trace, if a trace exists,
/// 3. the last entry in the trace's intervention list (presumably the most recent), if any,
/// 4. the node id of the event chosen as the resumption anchor, if any,
/// 5. that event's payload with its outcome wrapper stripped via `unwrap_outcome_payload`.
///
/// Load errors are logged as warnings and treated the same as "no trace"; they are never
/// propagated to the caller.
async fn load_persistence_version(
    handle: &PersistenceHandle,
    trace_id: &str,
) -> (
    u64,
    Option<String>,
    Option<crate::persistence::Intervention>,
    Option<String>,
    Option<serde_json::Value>,
) {
    let store = handle.store();
    match store.load(trace_id).await {
        Ok(Some(trace)) => {
            let (next_step, last_node_id, last_payload) =
                if let Some(resume_from_step) = trace.resumed_from_step {
                    // An explicit resume point was recorded. Scan newest-first for an anchor
                    // event at or before that step, preferring in order:
                    //   1. a `Next` event with a payload,
                    //   2. any non-`Fault` event with a payload,
                    //   3. any event with a payload,
                    //   4. the very last event (which may lie after the resume point).
                    let anchor_event = trace
                        .events
                        .iter()
                        .rev()
                        .find(|event| {
                            event.step <= resume_from_step
                                && event.outcome_kind == "Next"
                                && event.payload.is_some()
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step
                                    && event.outcome_kind != "Fault"
                                    && event.payload.is_some()
                            })
                        })
                        .or_else(|| {
                            trace.events.iter().rev().find(|event| {
                                event.step <= resume_from_step && event.payload.is_some()
                            })
                        })
                        .or_else(|| trace.events.last());

                    (
                        // The step comes from the resume point, not from the anchor event.
                        resume_from_step.saturating_add(1),
                        anchor_event.and_then(|event| event.node_id.clone()),
                        anchor_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                } else {
                    // No explicit resume point: continue after the last recorded event.
                    let last_event = trace.events.last();
                    (
                        last_event
                            .map(|event| event.step.saturating_add(1))
                            .unwrap_or(0),
                        last_event.and_then(|event| event.node_id.clone()),
                        last_event.and_then(|event| {
                            unwrap_outcome_payload(event.payload.as_ref())
                        }),
                    )
                };

            (
                next_step,
                Some(trace.schematic_version),
                trace.interventions.last().cloned(),
                last_node_id,
                last_payload,
            )
        }
        Ok(None) => (0, None, None, None, None),
        Err(err) => {
            // Best-effort: a broken store degrades to a fresh run instead of failing.
            tracing::warn!(
                trace_id = %trace_id,
                error = %err,
                "Failed to load persistence trace; falling back to step=0"
            );
            (0, None, None, None, None)
        }
    }
}
1757
/// Runs one transition node and performs the per-node bookkeeping around it.
///
/// In order, this:
/// 1. Pauses when a `DebugControl` on the bus says this node should pause, recording
///    `NodePaused` on the timeline before waiting.
/// 2. Records `NodeEnter` on the timeline.
/// 3. Runs `trans` with `bus_policy` installed as the bus access policy. If the run faults and
///    the bus carries `DlqPolicy::RetryThenDlq`, the node is re-run from a JSON snapshot of its
///    input, up to `max_attempts` total attempts, with exponential backoff between tries. No
///    retries happen if the input could not be serialized.
/// 4. Records the outcome on the node's tracing span and `NodeExit` on the timeline, plus
///    `Branchtaken` for branches.
/// 5. On `Next` with `SagaPolicy::Enabled` on the bus, pushes the serialized input onto the
///    `SagaStack` so a later rollback can replay it.
/// 6. Appends a persistence event at `step_idx` when a `PersistenceHandle` is on the bus.
/// 7. On a final `Fault`, forwards the error to the bus's `DlqSink` unless the DLQ policy is
///    `DlqPolicy::Drop`.
///
/// Timeline events are only recorded when a `Timeline` is already present on the bus.
/// Returns the final outcome, which is the last attempt's outcome when retries ran.
#[allow(clippy::too_many_arguments)]
async fn run_this_step<In, Out, E, Res>(
    trans: &(impl Transition<In, Out, Resources = Res, Error = E> + Clone + 'static),
    state: In,
    res: &Res,
    bus: &mut Bus,
    node_id: &str,
    node_label: &str,
    bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
    step_idx: u64,
) -> Outcome<Out, E>
where
    In: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    Out: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
    E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
    Res: ranvier_core::transition::ResourceRequirement,
{
    let label = trans.label();
    // Short resource type name (last path segment) used as a span attribute.
    let res_type = std::any::type_name::<Res>()
        .split("::")
        .last()
        .unwrap_or("unknown");

    // Debugger breakpoint check.
    let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
        debug.should_pause(node_id)
    } else {
        false
    };

    if should_pause {
        let trace_id = persistence_trace_id(bus);
        tracing::event!(
            target: "ranvier.debugger",
            tracing::Level::INFO,
            trace_id = %trace_id,
            node_id = %node_id,
            "Node paused"
        );

        if let Some(timeline) = bus.read_mut::<Timeline>() {
            timeline.push(TimelineEvent::NodePaused {
                node_id: node_id.to_string(),
                timestamp: now_ms(),
            });
        }
        if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
            debug.wait().await;
        }
    }

    let enter_ts = now_ms();
    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_label.to_string(),
            timestamp: enter_ts,
        });
    }

    // Only `RetryThenDlq` enables in-place retries. Other policies only affect DLQ routing
    // after the final outcome is known.
    let dlq_retry_config = bus.read::<DlqPolicy>().and_then(|p| {
        if let DlqPolicy::RetryThenDlq {
            max_attempts,
            backoff_ms,
        } = p
        {
            Some((*max_attempts, *backoff_ms))
        } else {
            None
        }
    });
    // `state` is moved into `run`, so keep a JSON copy to rebuild it for retries.
    // A serialization failure yields `None`, which disables retries for this node.
    let retry_state_snapshot = if dlq_retry_config.is_some() {
        serde_json::to_value(&state).ok()
    } else {
        None
    };

    // Serialized input kept for saga rollback; a serialization failure yields an empty Vec.
    let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
        Some(serde_json::to_vec(&state).unwrap_or_default())
    } else {
        None
    };

    let node_span = tracing::info_span!(
        "Node",
        ranvier.node = %label,
        ranvier.resource_type = %res_type,
        ranvier.outcome_kind = tracing::field::Empty,
        ranvier.outcome_target = tracing::field::Empty
    );
    let started = std::time::Instant::now();
    bus.set_access_policy(label.clone(), bus_policy.clone());
    let result = trans
        .run(state, res, bus)
        .instrument(node_span.clone())
        .await;
    bus.clear_access_policy();

    let result = if let Outcome::Fault(_) = &result {
        // Retry path: requires both a RetryThenDlq policy and a usable input snapshot.
        if let (Some((max_attempts, backoff_ms)), Some(snapshot)) =
            (dlq_retry_config, &retry_state_snapshot)
        {
            let mut final_result = result;
            // Attempt 1 was the initial run. Attempt N waits backoff_ms * 2^(N-2) first.
            for attempt in 2..=max_attempts {
                let delay = backoff_ms.saturating_mul(2u64.saturating_pow(attempt - 2));

                tracing::info!(
                    ranvier.node = %label,
                    attempt = attempt,
                    max_attempts = max_attempts,
                    backoff_ms = delay,
                    "Retrying faulted node"
                );

                if let Some(timeline) = bus.read_mut::<Timeline>() {
                    timeline.push(TimelineEvent::NodeRetry {
                        node_id: node_id.to_string(),
                        attempt,
                        max_attempts,
                        backoff_ms: delay,
                        timestamp: now_ms(),
                    });
                }

                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                // If the snapshot fails to deserialize, this attempt is silently skipped
                // (its backoff has already been spent).
                if let Ok(retry_state) = serde_json::from_value::<In>(snapshot.clone()) {
                    bus.set_access_policy(label.clone(), bus_policy.clone());
                    let retry_result = trans
                        .run(retry_state, res, bus)
                        .instrument(tracing::info_span!(
                            "NodeRetry",
                            ranvier.node = %label,
                            attempt = attempt
                        ))
                        .await;
                    bus.clear_access_policy();

                    // Keep retrying on Fault; stop at the first non-Fault outcome.
                    match &retry_result {
                        Outcome::Fault(_) => {
                            final_result = retry_result;
                        }
                        _ => {
                            final_result = retry_result;
                            break;
                        }
                    }
                }
            }
            final_result
        } else {
            result
        }
    } else {
        result
    };

    node_span.record("ranvier.outcome_kind", outcome_kind_name(&result));
    if let Some(target) = outcome_target(&result) {
        node_span.record("ranvier.outcome_target", tracing::field::display(&target));
    }
    // Includes any retries and their backoff sleeps.
    let duration_ms = started.elapsed().as_millis() as u64;
    let exit_ts = now_ms();

    if let Some(timeline) = bus.read_mut::<Timeline>() {
        timeline.push(TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: outcome_type_name(&result),
            duration_ms,
            timestamp: exit_ts,
        });

        if let Outcome::Branch(branch_id, _) = &result {
            timeline.push(TimelineEvent::Branchtaken {
                branch_id: branch_id.clone(),
                timestamp: exit_ts,
            });
        }
    }

    // Register this step for saga rollback only after it succeeded.
    if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot)
        && let Some(stack) = bus.read_mut::<SagaStack>()
    {
        stack.push(node_id.to_string(), label.clone(), snapshot);
    }

    if let Some(handle) = bus.read::<PersistenceHandle>() {
        let trace_id = persistence_trace_id(bus);
        let circuit = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.name.clone())
            .unwrap_or_default();
        let version = bus
            .read::<ranvier_core::schematic::Schematic>()
            .map(|s| s.schema_version.clone())
            .unwrap_or_default();

        persist_execution_event(
            handle,
            &trace_id,
            &circuit,
            &version,
            step_idx,
            Some(node_id.to_string()),
            outcome_kind_name(&result),
            Some(result.to_json_value()),
        )
        .await;
    }

    if let Outcome::Fault(f) = &result {
        // DLQ forwarding needs both a sink and a policy other than `Drop`.
        let dlq_action = {
            let policy = bus.read::<DlqPolicy>();
            let sink = bus.read::<Arc<dyn DlqSink>>();
            match (sink, policy) {
                (Some(s), Some(p)) if !matches!(p, DlqPolicy::Drop) => Some(s.clone()),
                _ => None,
            }
        };

        if let Some(sink) = dlq_action {
            if let Some((max_attempts, _)) = dlq_retry_config
                && let Some(timeline) = bus.read_mut::<Timeline>()
            {
                timeline.push(TimelineEvent::DlqExhausted {
                    node_id: node_id.to_string(),
                    total_attempts: max_attempts,
                    timestamp: now_ms(),
                });
            }

            let trace_id = persistence_trace_id(bus);
            let circuit = bus
                .read::<ranvier_core::schematic::Schematic>()
                .map(|s| s.name.clone())
                .unwrap_or_default();
            // Best-effort: a failed DLQ write is ignored.
            let _ = sink
                .store_dead_letter(
                    &trace_id,
                    &circuit,
                    node_id,
                    &format!("{:?}", f),
                    &serde_json::to_vec(&f).unwrap_or_default(),
                )
                .await;
        }
    }

    result
}
2017
2018#[allow(clippy::too_many_arguments)]
2019async fn run_this_compensated_step<Out, Next, E, Res, Comp>(
2020 trans: &(impl Transition<Out, Next, Resources = Res, Error = E> + Clone + 'static),
2021 comp: &Comp,
2022 state: Out,
2023 res: &Res,
2024 bus: &mut Bus,
2025 node_id: &str,
2026 comp_node_id: &str,
2027 node_label: &str,
2028 bus_policy: &Option<ranvier_core::bus::BusAccessPolicy>,
2029 step_idx: u64,
2030) -> Outcome<Next, E>
2031where
2032 Out: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + 'static,
2033 Next: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
2034 E: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
2035 Res: ranvier_core::transition::ResourceRequirement,
2036 Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
2037{
2038 let label = trans.label();
2039
2040 let should_pause = if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2042 debug.should_pause(node_id)
2043 } else {
2044 false
2045 };
2046
2047 if should_pause {
2048 let trace_id = persistence_trace_id(bus);
2049 tracing::event!(
2050 target: "ranvier.debugger",
2051 tracing::Level::INFO,
2052 trace_id = %trace_id,
2053 node_id = %node_id,
2054 "Node paused (compensated)"
2055 );
2056
2057 if let Some(timeline) = bus.read_mut::<Timeline>() {
2058 timeline.push(TimelineEvent::NodePaused {
2059 node_id: node_id.to_string(),
2060 timestamp: now_ms(),
2061 });
2062 }
2063 if let Some(debug) = bus.read::<ranvier_core::debug::DebugControl>() {
2064 debug.wait().await;
2065 }
2066 }
2067
2068 let enter_ts = now_ms();
2069 if let Some(timeline) = bus.read_mut::<Timeline>() {
2070 timeline.push(TimelineEvent::NodeEnter {
2071 node_id: node_id.to_string(),
2072 node_label: node_label.to_string(),
2073 timestamp: enter_ts,
2074 });
2075 }
2076
2077 let saga_snapshot = if let Some(SagaPolicy::Enabled) = bus.read::<SagaPolicy>() {
2079 Some(serde_json::to_vec(&state).unwrap_or_default())
2080 } else {
2081 None
2082 };
2083
2084 let node_span = tracing::info_span!("Node", ranvier.node = %label);
2085 bus.set_access_policy(label.clone(), bus_policy.clone());
2086 let result = trans
2087 .run(state.clone(), res, bus)
2088 .instrument(node_span)
2089 .await;
2090 bus.clear_access_policy();
2091
2092 let duration_ms = 0; let exit_ts = now_ms();
2094
2095 if let Some(timeline) = bus.read_mut::<Timeline>() {
2096 timeline.push(TimelineEvent::NodeExit {
2097 node_id: node_id.to_string(),
2098 outcome_type: outcome_type_name(&result),
2099 duration_ms,
2100 timestamp: exit_ts,
2101 });
2102 }
2103
2104 if let Outcome::Fault(ref err) = result {
2106 if compensation_auto_trigger(bus) {
2107 tracing::info!(
2108 ranvier.node = %label,
2109 ranvier.compensation.trigger = "saga",
2110 error = ?err,
2111 "Saga compensation triggered"
2112 );
2113
2114 if let Some(timeline) = bus.read_mut::<Timeline>() {
2115 timeline.push(TimelineEvent::NodeEnter {
2116 node_id: comp_node_id.to_string(),
2117 node_label: format!("Compensate: {}", comp.label()),
2118 timestamp: exit_ts,
2119 });
2120 }
2121
2122 let _ = comp.run(state, res, bus).await;
2124
2125 if let Some(timeline) = bus.read_mut::<Timeline>() {
2126 timeline.push(TimelineEvent::NodeExit {
2127 node_id: comp_node_id.to_string(),
2128 outcome_type: "Compensated".to_string(),
2129 duration_ms: 0,
2130 timestamp: now_ms(),
2131 });
2132 }
2133
2134 if let Some(handle) = bus.read::<PersistenceHandle>() {
2135 let trace_id = persistence_trace_id(bus);
2136 let circuit = bus
2137 .read::<ranvier_core::schematic::Schematic>()
2138 .map(|s| s.name.clone())
2139 .unwrap_or_default();
2140 let version = bus
2141 .read::<ranvier_core::schematic::Schematic>()
2142 .map(|s| s.schema_version.clone())
2143 .unwrap_or_default();
2144
2145 persist_execution_event(
2146 handle,
2147 &trace_id,
2148 &circuit,
2149 &version,
2150 step_idx + 1, Some(comp_node_id.to_string()),
2152 "Compensated",
2153 None,
2154 )
2155 .await;
2156 }
2157 }
2158 } else if let (Outcome::Next(_), Some(snapshot)) = (&result, saga_snapshot) {
2159 if let Some(stack) = bus.read_mut::<SagaStack>() {
2161 stack.push(node_id.to_string(), label.clone(), snapshot);
2162 }
2163
2164 if let Some(handle) = bus.read::<PersistenceHandle>() {
2165 let trace_id = persistence_trace_id(bus);
2166 let circuit = bus
2167 .read::<ranvier_core::schematic::Schematic>()
2168 .map(|s| s.name.clone())
2169 .unwrap_or_default();
2170 let version = bus
2171 .read::<ranvier_core::schematic::Schematic>()
2172 .map(|s| s.schema_version.clone())
2173 .unwrap_or_default();
2174
2175 persist_execution_event(
2176 handle,
2177 &trace_id,
2178 &circuit,
2179 &version,
2180 step_idx,
2181 Some(node_id.to_string()),
2182 outcome_kind_name(&result),
2183 Some(result.to_json_value()),
2184 )
2185 .await;
2186 }
2187 }
2188
2189 if let Outcome::Fault(f) = &result
2191 && let (Some(sink), Some(policy)) =
2192 (bus.read::<Arc<dyn DlqSink>>(), bus.read::<DlqPolicy>())
2193 {
2194 let should_dlq = !matches!(policy, DlqPolicy::Drop);
2195 if should_dlq {
2196 let trace_id = persistence_trace_id(bus);
2197 let circuit = bus
2198 .read::<ranvier_core::schematic::Schematic>()
2199 .map(|s| s.name.clone())
2200 .unwrap_or_default();
2201 let _ = sink
2202 .store_dead_letter(
2203 &trace_id,
2204 &circuit,
2205 node_id,
2206 &format!("{:?}", f),
2207 &serde_json::to_vec(&f).unwrap_or_default(),
2208 )
2209 .await;
2210 }
2211 }
2212
2213 result
2214}
2215
2216#[allow(clippy::too_many_arguments)]
2217pub async fn persist_execution_event(
2218 handle: &PersistenceHandle,
2219 trace_id: &str,
2220 circuit: &str,
2221 schematic_version: &str,
2222 step: u64,
2223 node_id: Option<String>,
2224 outcome_kind: &str,
2225 payload: Option<serde_json::Value>,
2226) {
2227 let store = handle.store();
2228 let envelope = PersistenceEnvelope {
2229 trace_id: trace_id.to_string(),
2230 circuit: circuit.to_string(),
2231 schematic_version: schematic_version.to_string(),
2232 step,
2233 node_id,
2234 outcome_kind: outcome_kind.to_string(),
2235 timestamp_ms: now_ms(),
2236 payload_hash: None,
2237 payload,
2238 };
2239
2240 if let Err(err) = store.append(envelope).await {
2241 tracing::warn!(
2242 trace_id = %trace_id,
2243 circuit = %circuit,
2244 step,
2245 outcome_kind = %outcome_kind,
2246 error = %err,
2247 "Failed to append persistence envelope"
2248 );
2249 }
2250}
2251
2252async fn persist_completion(
2253 handle: &PersistenceHandle,
2254 trace_id: &str,
2255 completion: CompletionState,
2256) {
2257 let store = handle.store();
2258 if let Err(err) = store.complete(trace_id, completion).await {
2259 tracing::warn!(
2260 trace_id = %trace_id,
2261 error = %err,
2262 "Failed to complete persistence trace"
2263 );
2264 }
2265}
2266
2267fn compensation_idempotency_key(context: &CompensationContext) -> String {
2268 format!(
2269 "{}:{}:{}",
2270 context.trace_id, context.circuit, context.fault_kind
2271 )
2272}
2273
2274async fn run_compensation(
2275 handle: &CompensationHandle,
2276 context: CompensationContext,
2277 retry_policy: CompensationRetryPolicy,
2278 idempotency: Option<CompensationIdempotencyHandle>,
2279) -> bool {
2280 let hook = handle.hook();
2281 let key = compensation_idempotency_key(&context);
2282
2283 if let Some(store_handle) = idempotency.as_ref() {
2284 let store = store_handle.store();
2285 match store.was_compensated(&key).await {
2286 Ok(true) => {
2287 tracing::info!(
2288 trace_id = %context.trace_id,
2289 circuit = %context.circuit,
2290 key = %key,
2291 "Compensation already recorded; skipping duplicate hook execution"
2292 );
2293 return true;
2294 }
2295 Ok(false) => {}
2296 Err(err) => {
2297 tracing::warn!(
2298 trace_id = %context.trace_id,
2299 key = %key,
2300 error = %err,
2301 "Failed to check compensation idempotency state"
2302 );
2303 }
2304 }
2305 }
2306
2307 let max_attempts = retry_policy.max_attempts.max(1);
2308 for attempt in 1..=max_attempts {
2309 match hook.compensate(context.clone()).await {
2310 Ok(()) => {
2311 if let Some(store_handle) = idempotency.as_ref() {
2312 let store = store_handle.store();
2313 if let Err(err) = store.mark_compensated(&key).await {
2314 tracing::warn!(
2315 trace_id = %context.trace_id,
2316 key = %key,
2317 error = %err,
2318 "Failed to mark compensation idempotency state"
2319 );
2320 }
2321 }
2322 return true;
2323 }
2324 Err(err) => {
2325 let is_last = attempt == max_attempts;
2326 tracing::warn!(
2327 trace_id = %context.trace_id,
2328 circuit = %context.circuit,
2329 fault_kind = %context.fault_kind,
2330 fault_step = context.fault_step,
2331 attempt,
2332 max_attempts,
2333 error = %err,
2334 "Compensation hook attempt failed"
2335 );
2336 if !is_last && retry_policy.backoff_ms > 0 {
2337 tokio::time::sleep(tokio::time::Duration::from_millis(retry_policy.backoff_ms))
2338 .await;
2339 }
2340 }
2341 }
2342 }
2343 false
2344}
2345
2346fn ensure_timeline(bus: &mut Bus) -> bool {
2347 if bus.has::<Timeline>() {
2348 false
2349 } else {
2350 bus.insert(Timeline::new());
2351 true
2352 }
2353}
2354
2355fn should_attach_timeline(bus: &Bus) -> bool {
2356 if bus.has::<Timeline>() {
2358 return true;
2359 }
2360
2361 has_timeline_output_path()
2363}
2364
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    std::env::var("RANVIER_TIMELINE_OUTPUT").is_ok_and(|path| !path.trim().is_empty())
}
2371
/// Returns the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`, clamped to `[0, 1]`.
///
/// The value is trimmed before parsing. Unset, unparseable, or NaN values fall back to `1.0`
/// (sample everything). NaN has to be rejected explicitly because `f64::clamp` passes it
/// through unchanged, and a NaN rate would never sample any run.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|v| !v.is_nan())
        .map(|v| v.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
2379
2380fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
2381 if rate <= 0.0 {
2382 return false;
2383 }
2384 if rate >= 1.0 {
2385 return true;
2386 }
2387 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
2388 bucket < rate
2389}
2390
/// Returns the lower-cased `RANVIER_TIMELINE_ADAPTIVE` policy name, defaulting to
/// `fault_branch`.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
2396
2397fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
2398 match policy {
2399 "off" => false,
2400 "fault_only" => matches!(outcome, Outcome::Fault(_)),
2401 "fault_branch_emit" => {
2402 matches!(
2403 outcome,
2404 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
2405 )
2406 }
2407 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
2408 }
2409}
2410
/// Process-wide counters describing timeline export sampling decisions.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that resulted in a successful export.
    exported: u64,
    /// Decisions that did not export (not selected, or the write failed).
    skipped: u64,
    /// Successful exports where the run was selected by sampling.
    sampled_exports: u64,
    /// Successful exports where the adaptive policy forced the export.
    forced_exports: u64,
    /// Export mode of the most recent decision (`"none"` when skipped before writing).
    last_mode: String,
    /// Adaptive policy name of the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent update, in milliseconds.
    last_updated_ms: u64,
}

/// Lazily initialised global holder for the sampling counters.
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();

/// Returns the shared sampling-stats cell, creating it with zeroed counters on first use.
fn stats_cell() -> &'static Mutex<SamplingStats> {
    TIMELINE_SAMPLING_STATS.get_or_init(Mutex::default)
}
2428
2429fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
2430 let snapshot = {
2431 let mut stats = match stats_cell().lock() {
2432 Ok(guard) => guard,
2433 Err(_) => return,
2434 };
2435
2436 stats.total_decisions += 1;
2437 if exported {
2438 stats.exported += 1;
2439 } else {
2440 stats.skipped += 1;
2441 }
2442 if sampled && exported {
2443 stats.sampled_exports += 1;
2444 }
2445 if forced && exported {
2446 stats.forced_exports += 1;
2447 }
2448 stats.last_mode = mode.to_string();
2449 stats.last_policy = policy.to_string();
2450 stats.last_updated_ms = now_ms();
2451 stats.clone()
2452 };
2453
2454 tracing::debug!(
2455 ranvier.timeline.total_decisions = snapshot.total_decisions,
2456 ranvier.timeline.exported = snapshot.exported,
2457 ranvier.timeline.skipped = snapshot.skipped,
2458 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
2459 ranvier.timeline.forced_exports = snapshot.forced_exports,
2460 ranvier.timeline.mode = %snapshot.last_mode,
2461 ranvier.timeline.policy = %snapshot.last_policy,
2462 "Timeline sampling stats updated"
2463 );
2464
2465 if let Some(path) = timeline_stats_output_path() {
2466 let payload = serde_json::json!({
2467 "total_decisions": snapshot.total_decisions,
2468 "exported": snapshot.exported,
2469 "skipped": snapshot.skipped,
2470 "sampled_exports": snapshot.sampled_exports,
2471 "forced_exports": snapshot.forced_exports,
2472 "last_mode": snapshot.last_mode,
2473 "last_policy": snapshot.last_policy,
2474 "last_updated_ms": snapshot.last_updated_ms
2475 });
2476 if let Some(parent) = Path::new(&path).parent() {
2477 let _ = fs::create_dir_all(parent);
2478 }
2479 if let Err(err) = fs::write(&path, payload.to_string()) {
2480 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
2481 }
2482 }
2483}
2484
/// Returns the path from `RANVIER_TIMELINE_STATS_OUTPUT`, if it is set to
/// something other than whitespace.
///
/// The value is returned exactly as set; it is trimmed only for the emptiness
/// check.
fn timeline_stats_output_path() -> Option<String> {
    let raw = std::env::var("RANVIER_TIMELINE_STATS_OUTPUT").ok()?;
    if raw.trim().is_empty() {
        None
    } else {
        Some(raw)
    }
}
2490
2491fn write_timeline_with_policy(
2492 path: &str,
2493 mode: &str,
2494 mut timeline: Timeline,
2495) -> Result<(), String> {
2496 match mode {
2497 "append" => {
2498 if Path::new(path).exists() {
2499 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
2500 match serde_json::from_str::<Timeline>(&content) {
2501 Ok(mut existing) => {
2502 existing.events.append(&mut timeline.events);
2503 existing.sort();
2504 if let Some(max_events) = max_events_limit() {
2505 truncate_timeline_events(&mut existing, max_events);
2506 }
2507 write_timeline_json(path, &existing)
2508 }
2509 Err(_) => {
2510 if let Some(max_events) = max_events_limit() {
2512 truncate_timeline_events(&mut timeline, max_events);
2513 }
2514 write_timeline_json(path, &timeline)
2515 }
2516 }
2517 } else {
2518 if let Some(max_events) = max_events_limit() {
2519 truncate_timeline_events(&mut timeline, max_events);
2520 }
2521 write_timeline_json(path, &timeline)
2522 }
2523 }
2524 "rotate" => {
2525 let rotated_path = rotated_path(path, now_ms());
2526 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
2527 if let Some(keep) = rotate_keep_limit() {
2528 cleanup_rotated_files(path, keep)?;
2529 }
2530 Ok(())
2531 }
2532 _ => write_timeline_json(path, &timeline),
2533 }
2534}
2535
2536fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
2537 if let Some(parent) = Path::new(path).parent()
2538 && !parent.as_os_str().is_empty()
2539 {
2540 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2541 }
2542 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
2543 fs::write(path, json).map_err(|e| e.to_string())
2544}
2545
/// Builds the rotated file name for `path`: `{stem}_{suffix}.{ext}` in the same
/// directory.
///
/// Falls back to the stem `"timeline"` and the extension `"json"` when `path`
/// lacks them (or they are not valid UTF-8).
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let base = Path::new(path);
    let stem = match base.file_stem().and_then(|s| s.to_str()) {
        Some(s) => s,
        None => "timeline",
    };
    let extension = match base.extension().and_then(|e| e.to_str()) {
        Some(e) => e,
        None => "json",
    };
    let file_name = format!("{}_{}.{}", stem, suffix, extension);
    match base.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
2553
/// Reads env var `name` as a strictly positive `usize`.
///
/// Surrounding whitespace is ignored. Returns `None` when the variable is
/// unset, unparsable, or zero.
fn positive_usize_env(name: &str) -> Option<usize> {
    std::env::var(name)
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|v| *v > 0)
}

/// Maximum number of events kept in an appended timeline
/// (`RANVIER_TIMELINE_MAX_EVENTS`); `None` means unlimited.
fn max_events_limit() -> Option<usize> {
    positive_usize_env("RANVIER_TIMELINE_MAX_EVENTS")
}

/// Number of rotated timeline files to keep (`RANVIER_TIMELINE_ROTATE_KEEP`);
/// `None` disables cleanup.
fn rotate_keep_limit() -> Option<usize> {
    positive_usize_env("RANVIER_TIMELINE_ROTATE_KEEP")
}
2567
2568fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
2569 let len = timeline.events.len();
2570 if len > max_events {
2571 let keep_from = len - max_events;
2572 timeline.events = timeline.events.split_off(keep_from);
2573 }
2574}
2575
/// Deletes old rotated timeline files, keeping only the `keep` most recent.
///
/// Only files named like the output of `rotated_path` are considered:
/// `{stem}_{digits}.{ext}` in the same directory as `base_path`. Unrelated
/// files that merely share the prefix (e.g. `timeline_stats.json` next to
/// `timeline.json`) are never touched.
///
/// Files are ordered newest first by modification time, with ties broken by
/// the numeric rotation suffix. Files whose metadata cannot be read sort as
/// oldest. Individual removal failures are ignored.
///
/// # Errors
/// Returns the stringified `read_dir` error if the directory cannot be listed.
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let p = Path::new(base_path);
    // `Path::parent` returns `Some("")` (not `None`) for a bare file name such as
    // "timeline.json", and `read_dir("")` fails, so map an empty parent to ".".
    let parent = p
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = p.file_stem().and_then(|s| s.to_str()).unwrap_or("timeline");
    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    // Extracts the numeric rotation index from a `{stem}_{digits}.{ext}` name.
    let rotation_index = |name: &str| -> Option<u64> {
        let middle = name
            .strip_prefix(prefix.as_str())?
            .strip_suffix(suffix.as_str())?;
        if !middle.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        // An empty middle fails to parse and is rejected here as well.
        middle.parse::<u64>().ok()
    };

    let mut files = fs::read_dir(parent)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let index = rotation_index(&*file_name.to_string_lossy())?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((entry.path(), modified, index))
        })
        .collect::<Vec<_>>();

    files.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.2.cmp(&a.2)));
    for (path, _, _) in files.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
2608
2609fn bus_capability_schema_from_policy(
2610 policy: Option<ranvier_core::bus::BusAccessPolicy>,
2611) -> Option<BusCapabilitySchema> {
2612 let policy = policy?;
2613
2614 let mut allow = policy
2615 .allow
2616 .unwrap_or_default()
2617 .into_iter()
2618 .map(|entry| entry.type_name.to_string())
2619 .collect::<Vec<_>>();
2620 let mut deny = policy
2621 .deny
2622 .into_iter()
2623 .map(|entry| entry.type_name.to_string())
2624 .collect::<Vec<_>>();
2625 allow.sort();
2626 deny.sort();
2627
2628 if allow.is_empty() && deny.is_empty() {
2629 return None;
2630 }
2631
2632 Some(BusCapabilitySchema { allow, deny })
2633}
2634
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
2641
2642#[cfg(test)]
2643mod tests {
2644 use super::{
2645 Axon, inspector_dev_mode_from_value, inspector_enabled_from_value, sampled_by_bus_id,
2646 should_force_export,
2647 };
2648 use crate::persistence::{
2649 CompensationContext, CompensationHandle, CompensationHook, CompensationIdempotencyHandle,
2650 CompensationIdempotencyStore, CompensationRetryPolicy, CompletionState,
2651 InMemoryCompensationIdempotencyStore, InMemoryPersistenceStore, PersistenceAutoComplete,
2652 PersistenceHandle, PersistenceStore, PersistenceTraceId,
2653 };
2654 use anyhow::Result;
2655 use async_trait::async_trait;
2656 use ranvier_audit::{AuditError, AuditEvent, AuditSink};
2657 use ranvier_core::event::{DlqPolicy, DlqSink};
2658 use ranvier_core::saga::SagaStack;
2659 use ranvier_core::timeline::{Timeline, TimelineEvent};
2660 use ranvier_core::{Bus, BusAccessPolicy, BusTypeRef, Outcome, Transition};
2661 use serde::{Deserialize, Serialize};
2662 use std::sync::Arc;
2663 use tokio::sync::Mutex;
2664 use uuid::Uuid;
2665
2666 struct MockAuditSink {
2667 events: Arc<Mutex<Vec<AuditEvent>>>,
2668 }
2669
2670 #[async_trait]
2671 impl AuditSink for MockAuditSink {
2672 async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
2673 self.events.lock().await.push(event.clone());
2674 Ok(())
2675 }
2676 }
2677
2678 #[tokio::test]
2679 async fn execute_logs_audit_events_for_intervention() {
2680 use ranvier_inspector::StateInspector;
2681
2682 let trace_id = "test-audit-trace";
2683 let store_impl = InMemoryPersistenceStore::new();
2684 let events = Arc::new(Mutex::new(Vec::new()));
2685 let sink = MockAuditSink {
2686 events: events.clone(),
2687 };
2688
2689 let axon = Axon::<i32, i32, TestInfallible>::start("AuditTest")
2690 .then(AddOne)
2691 .with_persistence_store(store_impl.clone())
2692 .with_audit_sink(sink);
2693
2694 let mut bus = Bus::new();
2695 bus.insert(PersistenceHandle::from_arc(Arc::new(store_impl.clone())));
2696 bus.insert(PersistenceTraceId::new(trace_id));
2697 let target_node_id = axon.schematic.nodes[0].id.clone();
2698
2699 store_impl
2701 .append(crate::persistence::PersistenceEnvelope {
2702 trace_id: trace_id.to_string(),
2703 circuit: "AuditTest".to_string(),
2704 schematic_version: "v1.0".to_string(),
2705 step: 0,
2706 node_id: None,
2707 outcome_kind: "Next".to_string(),
2708 timestamp_ms: 0,
2709 payload_hash: None,
2710 payload: None,
2711 })
2712 .await
2713 .unwrap();
2714
2715 axon.force_resume(trace_id, &target_node_id, None)
2717 .await
2718 .unwrap();
2719
2720 axon.execute(10, &(), &mut bus).await;
2722
2723 let recorded = events.lock().await;
2724 assert_eq!(
2725 recorded.len(),
2726 2,
2727 "Should have 2 audit events: ForceResume and ApplyIntervention"
2728 );
2729 assert_eq!(recorded[0].action, "ForceResume");
2730 assert_eq!(recorded[0].target, trace_id);
2731 assert_eq!(recorded[1].action, "ApplyIntervention");
2732 assert_eq!(recorded[1].target, trace_id);
2733 }
2734
2735 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2736 pub enum TestInfallible {}
2737
2738 #[test]
2739 fn inspector_enabled_flag_matrix() {
2740 assert!(inspector_enabled_from_value(None));
2741 assert!(inspector_enabled_from_value(Some("1")));
2742 assert!(inspector_enabled_from_value(Some("true")));
2743 assert!(inspector_enabled_from_value(Some("on")));
2744 assert!(!inspector_enabled_from_value(Some("0")));
2745 assert!(!inspector_enabled_from_value(Some("false")));
2746 }
2747
2748 #[test]
2749 fn inspector_dev_mode_matrix() {
2750 assert!(inspector_dev_mode_from_value(None));
2751 assert!(inspector_dev_mode_from_value(Some("dev")));
2752 assert!(inspector_dev_mode_from_value(Some("staging")));
2753 assert!(!inspector_dev_mode_from_value(Some("prod")));
2754 assert!(!inspector_dev_mode_from_value(Some("production")));
2755 }
2756
2757 #[test]
2758 fn adaptive_policy_force_export_matrix() {
2759 let next = Outcome::<(), &'static str>::Next(());
2760 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
2761 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
2762 let fault = Outcome::<(), &'static str>::Fault("boom");
2763
2764 assert!(!should_force_export(&next, "off"));
2765 assert!(!should_force_export(&fault, "off"));
2766
2767 assert!(!should_force_export(&branch, "fault_only"));
2768 assert!(should_force_export(&fault, "fault_only"));
2769
2770 assert!(should_force_export(&branch, "fault_branch"));
2771 assert!(!should_force_export(&emit, "fault_branch"));
2772 assert!(should_force_export(&fault, "fault_branch"));
2773
2774 assert!(should_force_export(&branch, "fault_branch_emit"));
2775 assert!(should_force_export(&emit, "fault_branch_emit"));
2776 assert!(should_force_export(&fault, "fault_branch_emit"));
2777 }
2778
2779 #[test]
2780 fn sampling_and_adaptive_combination_decisions() {
2781 let bus_id = Uuid::nil();
2782 let next = Outcome::<(), &'static str>::Next(());
2783 let fault = Outcome::<(), &'static str>::Fault("boom");
2784
2785 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
2786 assert!(!sampled_never);
2787 assert!(!(sampled_never || should_force_export(&next, "off")));
2788 assert!(sampled_never || should_force_export(&fault, "fault_only"));
2789
2790 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
2791 assert!(sampled_always);
2792 assert!(sampled_always || should_force_export(&next, "off"));
2793 assert!(sampled_always || should_force_export(&fault, "off"));
2794 }
2795
2796 #[derive(Clone)]
2797 struct AddOne;
2798
2799 #[async_trait]
2800 impl Transition<i32, i32> for AddOne {
2801 type Error = TestInfallible;
2802 type Resources = ();
2803
2804 async fn run(
2805 &self,
2806 state: i32,
2807 _resources: &Self::Resources,
2808 _bus: &mut Bus,
2809 ) -> Outcome<i32, Self::Error> {
2810 Outcome::Next(state + 1)
2811 }
2812 }
2813
2814 #[derive(Clone)]
2815 struct AlwaysFault;
2816
2817 #[async_trait]
2818 impl Transition<i32, i32> for AlwaysFault {
2819 type Error = String;
2820 type Resources = ();
2821
2822 async fn run(
2823 &self,
2824 _state: i32,
2825 _resources: &Self::Resources,
2826 _bus: &mut Bus,
2827 ) -> Outcome<i32, Self::Error> {
2828 Outcome::Fault("boom".to_string())
2829 }
2830 }
2831
2832 #[derive(Clone)]
2833 struct CapabilityGuarded;
2834
2835 #[async_trait]
2836 impl Transition<(), ()> for CapabilityGuarded {
2837 type Error = String;
2838 type Resources = ();
2839
2840 fn bus_access_policy(&self) -> Option<BusAccessPolicy> {
2841 Some(BusAccessPolicy::allow_only(vec![BusTypeRef::of::<i32>()]))
2842 }
2843
2844 async fn run(
2845 &self,
2846 _state: (),
2847 _resources: &Self::Resources,
2848 bus: &mut Bus,
2849 ) -> Outcome<(), Self::Error> {
2850 match bus.get::<String>() {
2851 Ok(_) => Outcome::Next(()),
2852 Err(err) => Outcome::Fault(err.to_string()),
2853 }
2854 }
2855 }
2856
2857 #[derive(Clone)]
2858 struct RecordingCompensationHook {
2859 calls: Arc<Mutex<Vec<CompensationContext>>>,
2860 should_fail: bool,
2861 }
2862
2863 #[async_trait]
2864 impl CompensationHook for RecordingCompensationHook {
2865 async fn compensate(&self, context: CompensationContext) -> Result<()> {
2866 self.calls.lock().await.push(context);
2867 if self.should_fail {
2868 return Err(anyhow::anyhow!("compensation failed"));
2869 }
2870 Ok(())
2871 }
2872 }
2873
2874 #[derive(Clone)]
2875 struct FlakyCompensationHook {
2876 calls: Arc<Mutex<u32>>,
2877 failures_remaining: Arc<Mutex<u32>>,
2878 }
2879
2880 #[async_trait]
2881 impl CompensationHook for FlakyCompensationHook {
2882 async fn compensate(&self, _context: CompensationContext) -> Result<()> {
2883 {
2884 let mut calls = self.calls.lock().await;
2885 *calls += 1;
2886 }
2887 let mut failures_remaining = self.failures_remaining.lock().await;
2888 if *failures_remaining > 0 {
2889 *failures_remaining -= 1;
2890 return Err(anyhow::anyhow!("transient compensation failure"));
2891 }
2892 Ok(())
2893 }
2894 }
2895
2896 #[derive(Clone)]
2897 struct FailingCompensationIdempotencyStore {
2898 read_calls: Arc<Mutex<u32>>,
2899 write_calls: Arc<Mutex<u32>>,
2900 }
2901
2902 #[async_trait]
2903 impl CompensationIdempotencyStore for FailingCompensationIdempotencyStore {
2904 async fn was_compensated(&self, _key: &str) -> Result<bool> {
2905 let mut read_calls = self.read_calls.lock().await;
2906 *read_calls += 1;
2907 Err(anyhow::anyhow!("forced idempotency read failure"))
2908 }
2909
2910 async fn mark_compensated(&self, _key: &str) -> Result<()> {
2911 let mut write_calls = self.write_calls.lock().await;
2912 *write_calls += 1;
2913 Err(anyhow::anyhow!("forced idempotency write failure"))
2914 }
2915 }
2916
2917 #[tokio::test]
2918 async fn execute_persists_success_trace_when_handle_exists() {
2919 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2920 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2921
2922 let mut bus = Bus::new();
2923 bus.insert(PersistenceHandle::from_arc(store_dyn));
2924 bus.insert(PersistenceTraceId::new("trace-success"));
2925
2926 let axon = Axon::<i32, i32, TestInfallible>::start("PersistSuccess").then(AddOne);
2927 let outcome = axon.execute(41, &(), &mut bus).await;
2928 assert!(matches!(outcome, Outcome::Next(42)));
2929
2930 let persisted = store_impl.load("trace-success").await.unwrap().unwrap();
2931 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[0].outcome_kind, "Enter");
2933 assert_eq!(persisted.events[1].outcome_kind, "Next"); assert_eq!(persisted.events[2].outcome_kind, "Next"); assert_eq!(persisted.completion, Some(CompletionState::Success));
2936 }
2937
2938 #[tokio::test]
2939 async fn execute_persists_fault_completion_state() {
2940 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2941 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2942
2943 let mut bus = Bus::new();
2944 bus.insert(PersistenceHandle::from_arc(store_dyn));
2945 bus.insert(PersistenceTraceId::new("trace-fault"));
2946
2947 let axon = Axon::<i32, i32, String>::start("PersistFault").then(AlwaysFault);
2948 let outcome = axon.execute(41, &(), &mut bus).await;
2949 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
2950
2951 let persisted = store_impl.load("trace-fault").await.unwrap().unwrap();
2952 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
2956 }
2957
2958 #[tokio::test]
2959 async fn execute_respects_persistence_auto_complete_off() {
2960 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2961 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2962
2963 let mut bus = Bus::new();
2964 bus.insert(PersistenceHandle::from_arc(store_dyn));
2965 bus.insert(PersistenceTraceId::new("trace-no-complete"));
2966 bus.insert(PersistenceAutoComplete(false));
2967
2968 let axon = Axon::<i32, i32, TestInfallible>::start("PersistNoComplete").then(AddOne);
2969 let outcome = axon.execute(1, &(), &mut bus).await;
2970 assert!(matches!(outcome, Outcome::Next(2)));
2971
2972 let persisted = store_impl.load("trace-no-complete").await.unwrap().unwrap();
2973 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.completion, None);
2975 }
2976
2977 #[tokio::test]
2978 async fn fault_triggers_compensation_and_marks_compensated() {
2979 let store_impl = Arc::new(InMemoryPersistenceStore::new());
2980 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
2981 let calls = Arc::new(Mutex::new(Vec::new()));
2982 let compensation = RecordingCompensationHook {
2983 calls: calls.clone(),
2984 should_fail: false,
2985 };
2986
2987 let mut bus = Bus::new();
2988 bus.insert(PersistenceHandle::from_arc(store_dyn));
2989 bus.insert(PersistenceTraceId::new("trace-compensated"));
2990 bus.insert(CompensationHandle::from_hook(compensation));
2991
2992 let axon = Axon::<i32, i32, String>::start("CompensatedFault").then(AlwaysFault);
2993 let outcome = axon.execute(7, &(), &mut bus).await;
2994 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
2995
2996 let persisted = store_impl.load("trace-compensated").await.unwrap().unwrap();
2997 assert_eq!(persisted.events.len(), 4); assert_eq!(persisted.events[0].outcome_kind, "Enter");
2999 assert_eq!(persisted.events[1].outcome_kind, "Fault"); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.events[3].outcome_kind, "Compensated");
3002 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3003
3004 let recorded = calls.lock().await;
3005 assert_eq!(recorded.len(), 1);
3006 assert_eq!(recorded[0].trace_id, "trace-compensated");
3007 assert_eq!(recorded[0].fault_kind, "Fault");
3008 }
3009
3010 #[tokio::test]
3011 async fn failed_compensation_keeps_fault_completion() {
3012 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3013 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3014 let calls = Arc::new(Mutex::new(Vec::new()));
3015 let compensation = RecordingCompensationHook {
3016 calls: calls.clone(),
3017 should_fail: true,
3018 };
3019
3020 let mut bus = Bus::new();
3021 bus.insert(PersistenceHandle::from_arc(store_dyn));
3022 bus.insert(PersistenceTraceId::new("trace-compensation-failed"));
3023 bus.insert(CompensationHandle::from_hook(compensation));
3024
3025 let axon = Axon::<i32, i32, String>::start("CompensationFails").then(AlwaysFault);
3026 let outcome = axon.execute(7, &(), &mut bus).await;
3027 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3028
3029 let persisted = store_impl
3030 .load("trace-compensation-failed")
3031 .await
3032 .unwrap()
3033 .unwrap();
3034 assert_eq!(persisted.events.len(), 3); assert_eq!(persisted.events[2].outcome_kind, "Fault"); assert_eq!(persisted.completion, Some(CompletionState::Fault));
3037
3038 let recorded = calls.lock().await;
3039 assert_eq!(recorded.len(), 1);
3040 }
3041
3042 #[tokio::test]
3043 async fn compensation_retry_policy_succeeds_after_retries() {
3044 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3045 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3046 let calls = Arc::new(Mutex::new(0u32));
3047 let failures_remaining = Arc::new(Mutex::new(2u32));
3048 let compensation = FlakyCompensationHook {
3049 calls: calls.clone(),
3050 failures_remaining,
3051 };
3052
3053 let mut bus = Bus::new();
3054 bus.insert(PersistenceHandle::from_arc(store_dyn));
3055 bus.insert(PersistenceTraceId::new("trace-retry-success"));
3056 bus.insert(CompensationHandle::from_hook(compensation));
3057 bus.insert(CompensationRetryPolicy {
3058 max_attempts: 3,
3059 backoff_ms: 0,
3060 });
3061
3062 let axon = Axon::<i32, i32, String>::start("CompensationRetry").then(AlwaysFault);
3063 let outcome = axon.execute(7, &(), &mut bus).await;
3064 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3065
3066 let persisted = store_impl
3067 .load("trace-retry-success")
3068 .await
3069 .unwrap()
3070 .unwrap();
3071 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3072 assert_eq!(
3073 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3074 Some("Compensated")
3075 );
3076
3077 let attempt_count = calls.lock().await;
3078 assert_eq!(*attempt_count, 3);
3079 }
3080
3081 #[tokio::test]
3082 async fn compensation_idempotency_skips_duplicate_hook_execution() {
3083 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3084 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3085 let calls = Arc::new(Mutex::new(Vec::new()));
3086 let compensation = RecordingCompensationHook {
3087 calls: calls.clone(),
3088 should_fail: false,
3089 };
3090 let idempotency = InMemoryCompensationIdempotencyStore::new();
3091
3092 let mut bus = Bus::new();
3093 bus.insert(PersistenceHandle::from_arc(store_dyn));
3094 bus.insert(PersistenceTraceId::new("trace-idempotent"));
3095 bus.insert(PersistenceAutoComplete(false));
3096 bus.insert(CompensationHandle::from_hook(compensation));
3097 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3098
3099 let axon = Axon::<i32, i32, String>::start("CompensationIdempotency").then(AlwaysFault);
3100
3101 let outcome1 = axon.execute(7, &(), &mut bus).await;
3102 let outcome2 = axon.execute(8, &(), &mut bus).await;
3103 assert!(matches!(outcome1, Outcome::Fault(msg) if msg == "boom"));
3104 assert!(matches!(outcome2, Outcome::Fault(msg) if msg == "boom"));
3105
3106 let persisted = store_impl.load("trace-idempotent").await.unwrap().unwrap();
3107 assert_eq!(persisted.completion, None);
3108 let compensated_count = persisted
3110 .events
3111 .iter()
3112 .filter(|e| e.outcome_kind == "Compensated")
3113 .count();
3114 assert_eq!(
3115 compensated_count, 2,
3116 "Should have 2 Compensated events (one per execution)"
3117 );
3118
3119 let recorded = calls.lock().await;
3120 assert_eq!(recorded.len(), 1);
3121 }
3122
3123 #[tokio::test]
3124 async fn compensation_idempotency_store_failure_does_not_block_compensation() {
3125 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3126 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3127 let calls = Arc::new(Mutex::new(Vec::new()));
3128 let read_calls = Arc::new(Mutex::new(0u32));
3129 let write_calls = Arc::new(Mutex::new(0u32));
3130 let compensation = RecordingCompensationHook {
3131 calls: calls.clone(),
3132 should_fail: false,
3133 };
3134 let idempotency = FailingCompensationIdempotencyStore {
3135 read_calls: read_calls.clone(),
3136 write_calls: write_calls.clone(),
3137 };
3138
3139 let mut bus = Bus::new();
3140 bus.insert(PersistenceHandle::from_arc(store_dyn));
3141 bus.insert(PersistenceTraceId::new("trace-idempotency-store-failure"));
3142 bus.insert(CompensationHandle::from_hook(compensation));
3143 bus.insert(CompensationIdempotencyHandle::from_store(idempotency));
3144
3145 let axon = Axon::<i32, i32, String>::start("IdempotencyStoreFailure").then(AlwaysFault);
3146 let outcome = axon.execute(9, &(), &mut bus).await;
3147 assert!(matches!(outcome, Outcome::Fault(msg) if msg == "boom"));
3148
3149 let persisted = store_impl
3150 .load("trace-idempotency-store-failure")
3151 .await
3152 .unwrap()
3153 .unwrap();
3154 assert_eq!(persisted.completion, Some(CompletionState::Compensated));
3155 assert_eq!(
3156 persisted.events.last().map(|e| e.outcome_kind.as_str()),
3157 Some("Compensated")
3158 );
3159
3160 let recorded = calls.lock().await;
3161 assert_eq!(recorded.len(), 1);
3162 assert_eq!(*read_calls.lock().await, 1);
3163 assert_eq!(*write_calls.lock().await, 1);
3164 }
3165
3166 #[tokio::test]
3167 async fn transition_bus_policy_blocks_unauthorized_resource_access() {
3168 let mut bus = Bus::new();
3169 bus.insert(1_i32);
3170 bus.insert("secret".to_string());
3171
3172 let axon = Axon::<(), (), String>::start("BusPolicy").then(CapabilityGuarded);
3173 let outcome = axon.execute((), &(), &mut bus).await;
3174
3175 match outcome {
3176 Outcome::Fault(msg) => {
3177 assert!(msg.contains("Bus access denied"), "{msg}");
3178 assert!(msg.contains("CapabilityGuarded"), "{msg}");
3179 assert!(msg.contains("alloc::string::String"), "{msg}");
3180 }
3181 other => panic!("expected fault, got {other:?}"),
3182 }
3183 }
3184
3185 #[tokio::test]
3186 async fn execute_fails_on_version_mismatch_without_migration() {
3187 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3188 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3189
3190 let trace_id = "v-mismatch";
3191 let old_envelope = crate::persistence::PersistenceEnvelope {
3193 trace_id: trace_id.to_string(),
3194 circuit: "TestCircuit".to_string(),
3195 schematic_version: "0.9".to_string(),
3196 step: 0,
3197 node_id: None,
3198 outcome_kind: "Enter".to_string(),
3199 timestamp_ms: 0,
3200 payload_hash: None,
3201 payload: None,
3202 };
3203 store_impl.append(old_envelope).await.unwrap();
3204
3205 let mut bus = Bus::new();
3206 bus.insert(PersistenceHandle::from_arc(store_dyn));
3207 bus.insert(PersistenceTraceId::new(trace_id));
3208
3209 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3211 let outcome = axon.execute(10, &(), &mut bus).await;
3212
3213 if let Outcome::Emit(kind, _) = outcome {
3214 assert_eq!(kind, "execution.resumption.version_mismatch_failed");
3215 } else {
3216 panic!("Expected version mismatch emission, got {:?}", outcome);
3217 }
3218 }
3219
3220 #[tokio::test]
3221 async fn execute_resumes_from_start_on_migration_strategy() {
3222 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3223 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3224
3225 let trace_id = "v-migration";
3226 let old_envelope = crate::persistence::PersistenceEnvelope {
3228 trace_id: trace_id.to_string(),
3229 circuit: "TestCircuit".to_string(),
3230 schematic_version: "0.9".to_string(),
3231 step: 5,
3232 node_id: None,
3233 outcome_kind: "Next".to_string(),
3234 timestamp_ms: 0,
3235 payload_hash: None,
3236 payload: None,
3237 };
3238 store_impl.append(old_envelope).await.unwrap();
3239
3240 let mut registry = ranvier_core::schematic::MigrationRegistry::new("TestCircuit");
3241 registry.register(ranvier_core::schematic::SnapshotMigration {
3242 name: Some("v0.9 to v1.0".to_string()),
3243 from_version: "0.9".to_string(),
3244 to_version: "1.0".to_string(),
3245 default_strategy: ranvier_core::schematic::MigrationStrategy::ResumeFromStart,
3246 node_mapping: std::collections::HashMap::new(),
3247 payload_mapper: None,
3248 });
3249
3250 let mut bus = Bus::new();
3251 bus.insert(PersistenceHandle::from_arc(store_dyn));
3252 bus.insert(PersistenceTraceId::new(trace_id));
3253 bus.insert(registry);
3254
3255 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit").then(AddOne);
3256 let outcome = axon.execute(10, &(), &mut bus).await;
3257
3258 assert!(matches!(outcome, Outcome::Next(11)));
3260
3261 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3263 assert_eq!(persisted.schematic_version, "1.0");
3264 }
3265
3266 #[tokio::test]
3267 async fn execute_applies_manual_intervention_jump_and_payload() {
3268 let store_impl = Arc::new(InMemoryPersistenceStore::new());
3269 let store_dyn: Arc<dyn PersistenceStore> = store_impl.clone();
3270
3271 let trace_id = "intervention-test";
3272 let axon = Axon::<i32, i32, TestInfallible>::new("TestCircuit")
3274 .then(AddOne)
3275 .then(AddOne);
3276
3277 let mut bus = Bus::new();
3278 bus.insert(PersistenceHandle::from_arc(store_dyn));
3279 bus.insert(PersistenceTraceId::new(trace_id));
3280
3281 let _target_node_label = "AddOne";
3286 let target_node_id = axon.schematic.nodes[2].id.clone();
3288
3289 store_impl
3291 .append(crate::persistence::PersistenceEnvelope {
3292 trace_id: trace_id.to_string(),
3293 circuit: "TestCircuit".to_string(),
3294 schematic_version: "1.0".to_string(),
3295 step: 0,
3296 node_id: None,
3297 outcome_kind: "Enter".to_string(),
3298 timestamp_ms: 0,
3299 payload_hash: None,
3300 payload: None,
3301 })
3302 .await
3303 .unwrap();
3304
3305 store_impl
3306 .save_intervention(
3307 trace_id,
3308 crate::persistence::Intervention {
3309 target_node: target_node_id.clone(),
3310 payload_override: Some(serde_json::json!(100)),
3311 timestamp_ms: 0,
3312 },
3313 )
3314 .await
3315 .unwrap();
3316
3317 let outcome = axon.execute(10, &(), &mut bus).await;
3320
3321 match outcome {
3322 Outcome::Next(val) => assert_eq!(val, 101, "Should have used payload 100 and added 1"),
3323 other => panic!("Expected Outcome::Next(101), got {:?}", other),
3324 }
3325
3326 let persisted = store_impl.load(trace_id).await.unwrap().unwrap();
3328 assert_eq!(persisted.interventions.len(), 1);
3330 assert_eq!(persisted.interventions[0].target_node, target_node_id);
3331 }
3332
3333 #[derive(Clone)]
3337 struct FailNThenSucceed {
3338 remaining: Arc<tokio::sync::Mutex<u32>>,
3339 }
3340
3341 #[async_trait]
3342 impl Transition<i32, i32> for FailNThenSucceed {
3343 type Error = String;
3344 type Resources = ();
3345
3346 async fn run(
3347 &self,
3348 state: i32,
3349 _resources: &Self::Resources,
3350 _bus: &mut Bus,
3351 ) -> Outcome<i32, Self::Error> {
3352 let mut rem = self.remaining.lock().await;
3353 if *rem > 0 {
3354 *rem -= 1;
3355 Outcome::Fault("transient failure".to_string())
3356 } else {
3357 Outcome::Next(state + 1)
3358 }
3359 }
3360 }
3361
3362 #[derive(Clone)]
3364 struct MockDlqSink {
3365 letters: Arc<tokio::sync::Mutex<Vec<String>>>,
3366 }
3367
3368 #[async_trait]
3369 impl DlqSink for MockDlqSink {
3370 async fn store_dead_letter(
3371 &self,
3372 workflow_id: &str,
3373 _circuit_label: &str,
3374 node_id: &str,
3375 error_msg: &str,
3376 _payload: &[u8],
3377 ) -> Result<(), String> {
3378 let entry = format!("{}:{}:{}", workflow_id, node_id, error_msg);
3379 self.letters.lock().await.push(entry);
3380 Ok(())
3381 }
3382 }
3383
3384 #[tokio::test]
3385 async fn retry_then_dlq_retries_and_succeeds_before_exhaustion() {
3386 let remaining = Arc::new(tokio::sync::Mutex::new(2u32));
3388 let trans = FailNThenSucceed { remaining };
3389
3390 let dlq_sink = MockDlqSink {
3391 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3392 };
3393
3394 let mut bus = Bus::new();
3395 bus.insert(Timeline::new());
3396
3397 let axon = Axon::<i32, i32, String>::start("RetrySucceed")
3398 .then(trans)
3399 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3400 max_attempts: 5,
3401 backoff_ms: 1,
3402 })
3403 .with_dlq_sink(dlq_sink.clone());
3404 let outcome = axon.execute(10, &(), &mut bus).await;
3405
3406 assert!(
3408 matches!(outcome, Outcome::Next(11)),
3409 "Expected Next(11), got {:?}",
3410 outcome
3411 );
3412
3413 let letters = dlq_sink.letters.lock().await;
3415 assert!(
3416 letters.is_empty(),
3417 "Should have 0 dead letters, got {}",
3418 letters.len()
3419 );
3420
3421 let timeline = bus.read::<Timeline>().unwrap();
3423 let retry_count = timeline
3424 .events
3425 .iter()
3426 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3427 .count();
3428 assert_eq!(retry_count, 2, "Should have 2 retry events");
3429 }
3430
3431 #[tokio::test]
3432 async fn retry_then_dlq_exhausts_retries_and_sends_to_dlq() {
3433 let mut bus = Bus::new();
3435 bus.insert(Timeline::new());
3436
3437 let dlq_sink = MockDlqSink {
3438 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3439 };
3440
3441 let axon = Axon::<i32, i32, String>::start("RetryExhaust")
3442 .then(AlwaysFault)
3443 .with_dlq_policy(DlqPolicy::RetryThenDlq {
3444 max_attempts: 3,
3445 backoff_ms: 1,
3446 })
3447 .with_dlq_sink(dlq_sink.clone());
3448 let outcome = axon.execute(42, &(), &mut bus).await;
3449
3450 assert!(
3451 matches!(outcome, Outcome::Fault(ref msg) if msg == "boom"),
3452 "Expected Fault(boom), got {:?}",
3453 outcome
3454 );
3455
3456 let letters = dlq_sink.letters.lock().await;
3458 assert_eq!(letters.len(), 1, "Should have 1 dead letter");
3459
3460 let timeline = bus.read::<Timeline>().unwrap();
3462 let retry_count = timeline
3463 .events
3464 .iter()
3465 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3466 .count();
3467 let dlq_count = timeline
3468 .events
3469 .iter()
3470 .filter(|e| matches!(e, TimelineEvent::DlqExhausted { .. }))
3471 .count();
3472 assert_eq!(
3473 retry_count, 2,
3474 "Should have 2 retry events (attempts 2 and 3)"
3475 );
3476 assert_eq!(dlq_count, 1, "Should have 1 DlqExhausted event");
3477 }
3478
3479 #[tokio::test]
3480 async fn send_to_dlq_policy_sends_immediately_without_retry() {
3481 let mut bus = Bus::new();
3482 bus.insert(Timeline::new());
3483
3484 let dlq_sink = MockDlqSink {
3485 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3486 };
3487
3488 let axon = Axon::<i32, i32, String>::start("SendDlq")
3489 .then(AlwaysFault)
3490 .with_dlq_policy(DlqPolicy::SendToDlq)
3491 .with_dlq_sink(dlq_sink.clone());
3492 let outcome = axon.execute(1, &(), &mut bus).await;
3493
3494 assert!(matches!(outcome, Outcome::Fault(_)));
3495
3496 let letters = dlq_sink.letters.lock().await;
3498 assert_eq!(letters.len(), 1);
3499
3500 let timeline = bus.read::<Timeline>().unwrap();
3502 let retry_count = timeline
3503 .events
3504 .iter()
3505 .filter(|e| matches!(e, TimelineEvent::NodeRetry { .. }))
3506 .count();
3507 assert_eq!(retry_count, 0);
3508 }
3509
3510 #[tokio::test]
3511 async fn drop_policy_does_not_send_to_dlq() {
3512 let mut bus = Bus::new();
3513
3514 let dlq_sink = MockDlqSink {
3515 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3516 };
3517
3518 let axon = Axon::<i32, i32, String>::start("DropDlq")
3519 .then(AlwaysFault)
3520 .with_dlq_policy(DlqPolicy::Drop)
3521 .with_dlq_sink(dlq_sink.clone());
3522 let outcome = axon.execute(1, &(), &mut bus).await;
3523
3524 assert!(matches!(outcome, Outcome::Fault(_)));
3525
3526 let letters = dlq_sink.letters.lock().await;
3528 assert!(letters.is_empty());
3529 }
3530
3531 #[tokio::test]
3532 async fn dynamic_policy_hot_reload_changes_dlq_behavior() {
3533 use ranvier_core::policy::DynamicPolicy;
3534
3535 let (tx, dynamic) = DynamicPolicy::new(DlqPolicy::Drop);
3537 let dlq_sink = MockDlqSink {
3538 letters: Arc::new(tokio::sync::Mutex::new(Vec::new())),
3539 };
3540
3541 let axon = Axon::<i32, i32, String>::start("DynamicDlq")
3542 .then(AlwaysFault)
3543 .with_dynamic_dlq_policy(dynamic)
3544 .with_dlq_sink(dlq_sink.clone());
3545
3546 let mut bus = Bus::new();
3548 let outcome = axon.execute(1, &(), &mut bus).await;
3549 assert!(matches!(outcome, Outcome::Fault(_)));
3550 assert!(
3551 dlq_sink.letters.lock().await.is_empty(),
3552 "Drop policy should produce no DLQ entries"
3553 );
3554
3555 tx.send(DlqPolicy::SendToDlq).unwrap();
3557
3558 let mut bus2 = Bus::new();
3560 let outcome2 = axon.execute(2, &(), &mut bus2).await;
3561 assert!(matches!(outcome2, Outcome::Fault(_)));
3562 assert_eq!(
3563 dlq_sink.letters.lock().await.len(),
3564 1,
3565 "SendToDlq policy should produce 1 DLQ entry"
3566 );
3567 }
3568
3569 #[tokio::test]
3570 async fn dynamic_saga_policy_hot_reload() {
3571 use ranvier_core::policy::DynamicPolicy;
3572 use ranvier_core::saga::SagaPolicy;
3573
3574 let (tx, dynamic) = DynamicPolicy::new(SagaPolicy::Disabled);
3576
3577 let axon = Axon::<i32, i32, TestInfallible>::start("DynamicSaga")
3578 .then(AddOne)
3579 .with_dynamic_saga_policy(dynamic);
3580
3581 let mut bus = Bus::new();
3583 let _outcome = axon.execute(1, &(), &mut bus).await;
3584 assert!(
3585 bus.read::<SagaStack>().is_none() || bus.read::<SagaStack>().unwrap().is_empty(),
3586 "SagaStack should be absent or empty when disabled"
3587 );
3588
3589 tx.send(SagaPolicy::Enabled).unwrap();
3591
3592 let mut bus2 = Bus::new();
3594 let _outcome2 = axon.execute(10, &(), &mut bus2).await;
3595 assert!(
3596 bus2.read::<SagaStack>().is_some(),
3597 "SagaStack should exist when saga is enabled"
3598 );
3599 }
3600
3601 mod iam_tests {
3604 use super::*;
3605 use ranvier_core::iam::{IamError, IamIdentity, IamPolicy, IamToken, IamVerifier};
3606
3607 #[derive(Clone)]
3609 struct MockVerifier {
3610 identity: IamIdentity,
3611 should_fail: bool,
3612 }
3613
3614 #[async_trait]
3615 impl IamVerifier for MockVerifier {
3616 async fn verify(&self, _token: &str) -> Result<IamIdentity, IamError> {
3617 if self.should_fail {
3618 Err(IamError::InvalidToken("mock verification failure".into()))
3619 } else {
3620 Ok(self.identity.clone())
3621 }
3622 }
3623 }
3624
3625 #[tokio::test]
3626 async fn iam_require_identity_passes_with_valid_token() {
3627 let verifier = MockVerifier {
3628 identity: IamIdentity::new("alice").with_role("user"),
3629 should_fail: false,
3630 };
3631
3632 let axon = Axon::<i32, i32, TestInfallible>::new("IamTest")
3633 .with_iam(IamPolicy::RequireIdentity, verifier)
3634 .then(AddOne);
3635
3636 let mut bus = Bus::new();
3637 bus.insert(IamToken("valid-token".to_string()));
3638 let outcome = axon.execute(10, &(), &mut bus).await;
3639
3640 assert!(matches!(outcome, Outcome::Next(11)));
3641 let identity = bus
3643 .read::<IamIdentity>()
3644 .expect("IamIdentity should be in Bus");
3645 assert_eq!(identity.subject, "alice");
3646 }
3647
3648 #[tokio::test]
3649 async fn iam_require_identity_rejects_missing_token() {
3650 let verifier = MockVerifier {
3651 identity: IamIdentity::new("ignored"),
3652 should_fail: false,
3653 };
3654
3655 let axon = Axon::<i32, i32, TestInfallible>::new("IamNoToken")
3656 .with_iam(IamPolicy::RequireIdentity, verifier)
3657 .then(AddOne);
3658
3659 let mut bus = Bus::new();
3660 let outcome = axon.execute(10, &(), &mut bus).await;
3662
3663 match &outcome {
3665 Outcome::Emit(label, _) => {
3666 assert_eq!(label, "iam.missing_token");
3667 }
3668 other => panic!("Expected Emit(iam.missing_token), got {:?}", other),
3669 }
3670 }
3671
3672 #[tokio::test]
3673 async fn iam_rejects_failed_verification() {
3674 let verifier = MockVerifier {
3675 identity: IamIdentity::new("ignored"),
3676 should_fail: true,
3677 };
3678
3679 let axon = Axon::<i32, i32, TestInfallible>::new("IamBadToken")
3680 .with_iam(IamPolicy::RequireIdentity, verifier)
3681 .then(AddOne);
3682
3683 let mut bus = Bus::new();
3684 bus.insert(IamToken("bad-token".to_string()));
3685 let outcome = axon.execute(10, &(), &mut bus).await;
3686
3687 match &outcome {
3688 Outcome::Emit(label, _) => {
3689 assert_eq!(label, "iam.verification_failed");
3690 }
3691 other => panic!("Expected Emit(iam.verification_failed), got {:?}", other),
3692 }
3693 }
3694
3695 #[tokio::test]
3696 async fn iam_require_role_passes_with_matching_role() {
3697 let verifier = MockVerifier {
3698 identity: IamIdentity::new("bob").with_role("admin").with_role("user"),
3699 should_fail: false,
3700 };
3701
3702 let axon = Axon::<i32, i32, TestInfallible>::new("IamRole")
3703 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3704 .then(AddOne);
3705
3706 let mut bus = Bus::new();
3707 bus.insert(IamToken("token".to_string()));
3708 let outcome = axon.execute(5, &(), &mut bus).await;
3709
3710 assert!(matches!(outcome, Outcome::Next(6)));
3711 }
3712
3713 #[tokio::test]
3714 async fn iam_require_role_denies_without_role() {
3715 let verifier = MockVerifier {
3716 identity: IamIdentity::new("carol").with_role("user"),
3717 should_fail: false,
3718 };
3719
3720 let axon = Axon::<i32, i32, TestInfallible>::new("IamRoleDeny")
3721 .with_iam(IamPolicy::RequireRole("admin".into()), verifier)
3722 .then(AddOne);
3723
3724 let mut bus = Bus::new();
3725 bus.insert(IamToken("token".to_string()));
3726 let outcome = axon.execute(5, &(), &mut bus).await;
3727
3728 match &outcome {
3729 Outcome::Emit(label, _) => {
3730 assert_eq!(label, "iam.policy_denied");
3731 }
3732 other => panic!("Expected Emit(iam.policy_denied), got {:?}", other),
3733 }
3734 }
3735
3736 #[tokio::test]
3737 async fn iam_policy_none_skips_verification() {
3738 let verifier = MockVerifier {
3739 identity: IamIdentity::new("ignored"),
3740 should_fail: true, };
3742
3743 let axon = Axon::<i32, i32, TestInfallible>::new("IamNone")
3744 .with_iam(IamPolicy::None, verifier)
3745 .then(AddOne);
3746
3747 let mut bus = Bus::new();
3748 let outcome = axon.execute(10, &(), &mut bus).await;
3750
3751 assert!(matches!(outcome, Outcome::Next(11)));
3752 }
3753 }
3754}