pub struct CheckpointingRunner<B> { /* private fields */ }Expand description
A single-process workflow runner with checkpointing for crash recovery.
CheckpointingRunner executes an entire workflow within one process,
saving snapshots after each task. Fork branches run concurrently as tokio tasks.
If the process crashes, the workflow can be resumed from the last checkpoint.
§When to Use
- Single-node execution: One process runs the entire workflow
- Crash recovery: Resume from the last completed task after restart
- Simple deployment: No coordination between workers needed
For horizontal scaling with multiple workers, use PooledWorker.
§Example
let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);
let ctx = WorkflowContext::new("my-workflow", Arc::new(JsonCodec), Arc::new(()));
let workflow = WorkflowBuilder::new(ctx)
.then("step1", |i: u32| async move { Ok(i + 1) })
.build()?;
// Run workflow - snapshots are saved automatically
let status = runner.run(&workflow, "instance-123", 1u32).await?;
// Resume from checkpoint if needed (e.g., after crash)
let status = runner.resume(&workflow, "instance-123").await?;Implementations§
Source§impl<B> CheckpointingRunner<B>where
B: PersistentBackend,
impl<B> CheckpointingRunner<B>where
B: PersistentBackend,
Create a runner from a shared backend reference.
Useful when the same backend is shared with a WorkflowClient.
Sourcepub fn with_conflict_policy(self, policy: ConflictPolicy) -> Self
pub fn with_conflict_policy(self, policy: ConflictPolicy) -> Self
Set the conflict policy for duplicate instance IDs.
Source§impl<B> CheckpointingRunner<B>where
B: PersistentBackend + 'static,
impl<B> CheckpointingRunner<B>where
B: PersistentBackend + 'static,
Sourcepub async fn run<C, Input, M>(
&self,
workflow: &Workflow<C, Input, M>,
instance_id: impl Into<String>,
input: Input,
) -> Result<WorkflowStatus, RuntimeError>where
Input: Send + 'static,
M: Send + Sync + 'static,
C: Codec + EnvelopeCodec + EncodeValue<Input> + DecodeValue<Input> + 'static,
pub async fn run<C, Input, M>(
&self,
workflow: &Workflow<C, Input, M>,
instance_id: impl Into<String>,
input: Input,
) -> Result<WorkflowStatus, RuntimeError>where
Input: Send + 'static,
M: Send + Sync + 'static,
C: Codec + EnvelopeCodec + EncodeValue<Input> + DecodeValue<Input> + 'static,
Run a workflow from the beginning, saving checkpoints after each task.
The instance_id uniquely identifies this workflow execution instance.
The ConflictPolicy (set via with_conflict_policy)
controls behaviour when a snapshot with this ID already exists.
§Errors
Returns an error if the workflow cannot be executed, if snapshot operations fail, or if the conflict policy rejects a duplicate.
Sourcepub async fn resume<'w, C, Input, M>(
&self,
workflow: &'w Workflow<C, Input, M>,
instance_id: &str,
) -> Result<WorkflowStatus, RuntimeError>where
Input: Send + 'static,
M: Send + Sync + 'static,
C: Codec + EnvelopeCodec + DecodeValue<Input> + EncodeValue<Input> + 'static,
pub async fn resume<'w, C, Input, M>(
&self,
workflow: &'w Workflow<C, Input, M>,
instance_id: &str,
) -> Result<WorkflowStatus, RuntimeError>where
Input: Send + 'static,
M: Send + Sync + 'static,
C: Codec + EnvelopeCodec + DecodeValue<Input> + EncodeValue<Input> + 'static,
Resume a workflow from a saved snapshot.
Loads the snapshot for the given instance ID and continues execution from the last checkpoint.
§Errors
Returns an error if:
- The snapshot is not found
- The workflow definition hash doesn’t match (workflow definition changed)
- The workflow cannot be resumed
Auto Trait Implementations§
impl<B> Freeze for CheckpointingRunner<B>
impl<B> RefUnwindSafe for CheckpointingRunner<B>where
B: RefUnwindSafe,
impl<B> Send for CheckpointingRunner<B>
impl<B> Sync for CheckpointingRunner<B>
impl<B> Unpin for CheckpointingRunner<B>
impl<B> UnsafeUnpin for CheckpointingRunner<B>
impl<B> UnwindSafe for CheckpointingRunner<B>where
B: RefUnwindSafe,
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::RequestSource§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
out indicating that a T is niched.