pub struct Axon<In, Out, E, Res = ()> {
pub schematic: Schematic,
pub execution_mode: ExecutionMode,
pub persistence_store: Option<Arc<dyn PersistenceStore>>,
pub audit_sink: Option<Arc<dyn AuditSink>>,
pub dlq_sink: Option<Arc<dyn DlqSink>>,
pub dlq_policy: DlqPolicy,
pub dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>,
pub saga_policy: SagaPolicy,
pub dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>,
pub saga_compensation_registry: Arc<RwLock<SagaCompensationRegistry<E, Res>>>,
pub iam_handle: Option<IamHandle>,
/* private fields */
}Expand description
The Axon Builder and Runtime.
Axon represents an executable decision tree.
It is reusable and thread-safe.
§Example
use ranvier_core::prelude::*;
// ...
// Start with an identity Axon (In -> In)
let axon = Axon::<(), (), _>::new("My Axon")
.then(StepA)
.then(StepB);
// Execute multiple times
let res1 = axon.execute((), &mut bus1).await;
let res2 = axon.execute((), &mut bus2).await;Fields§
§schematic: SchematicThe static structure (for visualization/analysis)
execution_mode: ExecutionModeHow this Axon is executed across the cluster
persistence_store: Option<Arc<dyn PersistenceStore>>Optional persistence store for state inspection
audit_sink: Option<Arc<dyn AuditSink>>Optional audit sink for tamper-evident logging of interventions
dlq_sink: Option<Arc<dyn DlqSink>>Optional dead-letter queue sink for storing failed events
dlq_policy: DlqPolicyPolicy for handling event failures
dynamic_dlq_policy: Option<DynamicPolicy<DlqPolicy>>Optional dynamic (hot-reloadable) DLQ policy — takes precedence over static dlq_policy
saga_policy: SagaPolicyPolicy for automated saga compensation
dynamic_saga_policy: Option<DynamicPolicy<SagaPolicy>>Optional dynamic (hot-reloadable) Saga policy — takes precedence over static saga_policy
saga_compensation_registry: Arc<RwLock<SagaCompensationRegistry<E, Res>>>Registry for Saga compensation handlers
iam_handle: Option<IamHandle>Optional IAM handle for identity verification at the Schematic boundary
Implementations§
Source§impl<In, E, Res> Axon<In, In, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
impl<In, E, Res> Axon<In, In, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
Source§impl Axon<(), (), (), ()>
impl Axon<(), (), (), ()>
Sourcepub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
pub fn simple<E>(label: &str) -> Axon<(), (), E, ()>
Convenience constructor for simple pipelines with no input state or resources.
Reduces the common Axon::<(), (), E>::new("label") turbofish to
Axon::simple::<E>("label"), requiring only the error type parameter.
§Example
// Before: 3 type parameters, 2 of which are always ()
let axon = Axon::<(), (), String>::new("pipeline");
// After: only the error type
let axon = Axon::simple::<String>("pipeline");Source§impl<In, Out, E, Res> Axon<In, Out, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
Out: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
impl<In, Out, E, Res> Axon<In, Out, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
Out: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
Sourcepub fn with_execution_mode(self, mode: ExecutionMode) -> Self
pub fn with_execution_mode(self, mode: ExecutionMode) -> Self
Update the Execution Mode for this Axon (e.g., Local vs Singleton).
Sourcepub fn with_version(self, version: impl Into<String>) -> Self
pub fn with_version(self, version: impl Into<String>) -> Self
Set the schematic version for this Axon.
Sourcepub fn with_persistence_store<S>(self, store: S) -> Selfwhere
S: PersistenceStore + 'static,
pub fn with_persistence_store<S>(self, store: S) -> Selfwhere
S: PersistenceStore + 'static,
Attach a persistence store to enable state inspection via the Inspector.
Sourcepub fn with_audit_sink<S>(self, sink: S) -> Selfwhere
S: AuditSink + 'static,
pub fn with_audit_sink<S>(self, sink: S) -> Selfwhere
S: AuditSink + 'static,
Attach an audit sink for tamper-evident logging.
Sourcepub fn with_dlq_sink<S>(self, sink: S) -> Selfwhere
S: DlqSink + 'static,
pub fn with_dlq_sink<S>(self, sink: S) -> Selfwhere
S: DlqSink + 'static,
Set the Dead Letter Queue sink for this Axon.
Sourcepub fn with_dlq_policy(self, policy: DlqPolicy) -> Self
pub fn with_dlq_policy(self, policy: DlqPolicy) -> Self
Set the Dead Letter Queue policy for this Axon.
Sourcepub fn with_saga_policy(self, policy: SagaPolicy) -> Self
pub fn with_saga_policy(self, policy: SagaPolicy) -> Self
Set the Saga compensation policy for this Axon.
Sourcepub fn with_dynamic_dlq_policy(self, dynamic: DynamicPolicy<DlqPolicy>) -> Self
pub fn with_dynamic_dlq_policy(self, dynamic: DynamicPolicy<DlqPolicy>) -> Self
Set a dynamic (hot-reloadable) DLQ policy. When set, the dynamic policy’s
current value is read at each execution, overriding the static dlq_policy.
Sourcepub fn with_dynamic_saga_policy(
self,
dynamic: DynamicPolicy<SagaPolicy>,
) -> Self
pub fn with_dynamic_saga_policy( self, dynamic: DynamicPolicy<SagaPolicy>, ) -> Self
Set a dynamic (hot-reloadable) Saga policy. When set, the dynamic policy’s
current value is read at each execution, overriding the static saga_policy.
Sourcepub fn with_iam(
self,
policy: IamPolicy,
verifier: impl IamVerifier + 'static,
) -> Self
pub fn with_iam( self, policy: IamPolicy, verifier: impl IamVerifier + 'static, ) -> Self
Set an IAM policy and verifier for identity verification at the Axon boundary.
When set, execute() will:
- Read
IamTokenfrom the Bus (injected by the HTTP layer or test harness) - Verify the token using the provided verifier
- Enforce the policy against the verified identity
- Insert the resulting
IamIdentityinto the Bus for downstream Transitions
Sourcepub fn with_input_schema_value(self, schema: Value) -> Self
pub fn with_input_schema_value(self, schema: Value) -> Self
Attach a raw JSON Schema value for the last node’s input type in the schematic.
Use this for pre-built schemas without requiring the schema feature.
Sourcepub fn with_output_schema_value(self, schema: Value) -> Self
pub fn with_output_schema_value(self, schema: Value) -> Self
Attach a raw JSON Schema value for the last node’s output type in the schematic.
Source§impl<In, Out, E, Res> Axon<In, Out, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
Out: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
impl<In, Out, E, Res> Axon<In, Out, E, Res>where
In: Send + Sync + Serialize + DeserializeOwned + 'static,
Out: Send + Sync + Serialize + DeserializeOwned + 'static,
E: Send + Sync + Serialize + DeserializeOwned + Debug + 'static,
Res: ResourceRequirement,
Sourcepub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>where
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>where
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
Chain a transition to this Axon.
Requires the transition to use the SAME resource bundle as the previous steps.
Sourcepub fn then_with_retry<Next, Trans>(
self,
transition: Trans,
policy: RetryPolicy,
) -> Axon<In, Next, E, Res>where
Out: Clone,
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
pub fn then_with_retry<Next, Trans>(
self,
transition: Trans,
policy: RetryPolicy,
) -> Axon<In, Next, E, Res>where
Out: Clone,
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
Chain a transition with a retry policy.
If the transition returns Outcome::Fault, it will be retried up to
policy.max_retries times with the configured backoff strategy.
Timeline events are recorded for each retry attempt.
§Example
use ranvier_runtime::{Axon, RetryPolicy};
use std::time::Duration;
let axon = Axon::new("pipeline")
.then_with_retry(my_transition, RetryPolicy::fixed(3, Duration::from_millis(100)));Sourcepub fn then_compensated<Next, Trans, Comp>(
self,
transition: Trans,
compensation: Comp,
) -> Axon<In, Next, E, Res>where
Out: Clone,
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
pub fn then_compensated<Next, Trans, Comp>(
self,
transition: Trans,
compensation: Comp,
) -> Axon<In, Next, E, Res>where
Out: Clone,
Next: Send + Sync + Serialize + DeserializeOwned + 'static,
Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
Comp: Transition<Out, (), Resources = Res, Error = E> + Clone + Send + Sync + 'static,
Chain a transition to this Axon with a Saga compensation step.
If the transition fails, the compensation transition will be executed
automatically if CompensationAutoTrigger is enabled in the Bus.
Sourcepub fn compensate_with<Comp>(self, transition: Comp) -> Self
pub fn compensate_with<Comp>(self, transition: Comp) -> Self
Attach a compensation transition to the previously added node. This establishes a Schematic-level Saga compensation mapping.
Sourcepub fn parallel(
self,
transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
strategy: ParallelStrategy,
) -> Axon<In, Out, E, Res>where
Out: Clone,
pub fn parallel(
self,
transitions: Vec<Arc<dyn Transition<Out, Out, Resources = Res, Error = E> + Send + Sync>>,
strategy: ParallelStrategy,
) -> Axon<In, Out, E, Res>where
Out: Clone,
Run multiple transitions in parallel (fan-out / fan-in).
Each transition receives a clone of the current step’s output and runs
concurrently via futures_util::future::join_all. The strategy controls
how faults are handled:
ParallelStrategy::AllMustSucceed: All branches must produceNext. If any branch returnsFault, the first fault is propagated.ParallelStrategy::AnyCanFail: Branches that fault are ignored as long as at least one succeeds. If all branches fault, the first fault is returned.
The first successful Next value is forwarded to the next step in
the pipeline. A custom merge can be layered on top via a subsequent
.then() step.
Each parallel branch receives its own fresh Bus instance. Resources
should be injected via the shared Res bundle rather than the Bus for
parallel steps.
§Schematic
The method emits a FanOut node, one Atom node per branch (connected
via Parallel edges), and a FanIn join node.
§Example
let axon = Axon::new("Pipeline")
.then(ParseInput)
.parallel(
vec![Arc::new(EnrichA), Arc::new(EnrichB)],
ParallelStrategy::AllMustSucceed,
)
.then(MergeResults);Sourcepub async fn execute(
&self,
input: In,
resources: &Res,
bus: &mut Bus,
) -> Outcome<Out, E>
pub async fn execute( &self, input: In, resources: &Res, bus: &mut Bus, ) -> Outcome<Out, E>
Execute the Axon with the given input and resources.
Sourcepub fn serve_inspector(self, port: u16) -> Self
pub fn serve_inspector(self, port: u16) -> Self
Starts the Ranvier Inspector for this Axon on the specified port. This spawns a background task to serve the Schematic.
Sourcepub fn into_schematic(self) -> Schematic
pub fn into_schematic(self) -> Schematic
Consume and return the Schematic.
Sourcepub fn schematic_export_request(&self) -> Option<SchematicExportRequest>
pub fn schematic_export_request(&self) -> Option<SchematicExportRequest>
Detect schematic export mode from runtime flags.
Supported triggers:
RANVIER_SCHEMATIC=1|true|on|yes--schematic
Optional output path:
RANVIER_SCHEMATIC_OUTPUT=<path>--schematic-output <path>/--schematic-output=<path>--output <path>/--output=<path>(only relevant in schematic mode)
Sourcepub fn maybe_export_and_exit(&self) -> Result<bool>
pub fn maybe_export_and_exit(&self) -> Result<bool>
Export schematic and return true when schematic mode is active.
Use this once after circuit construction and before server/custom loops:
let axon = build_axon();
if axon.maybe_export_and_exit()? {
return Ok(());
}
// Normal runtime path...Sourcepub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> Result<bool>where
F: FnOnce(&SchematicExportRequest),
pub fn maybe_export_and_exit_with<F>(&self, on_before_exit: F) -> Result<bool>where
F: FnOnce(&SchematicExportRequest),
Same as Self::maybe_export_and_exit but allows a custom hook right before export/exit.
This is useful when your app has custom loop/bootstrap behavior and you want to skip or cleanup that logic in schematic mode.
Sourcepub fn export_schematic(&self, request: &SchematicExportRequest) -> Result<()>
pub fn export_schematic(&self, request: &SchematicExportRequest) -> Result<()>
Export schematic according to the provided request.