pub struct CheckpointingRunner<B> { /* private fields */ }
A single-process workflow runner with checkpointing for crash recovery.
CheckpointingRunner executes an entire workflow within one process,
saving snapshots after each task. Fork branches run concurrently as tokio tasks.
If the process crashes, the workflow can be resumed from the last checkpoint.
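The checkpoint-then-resume cycle described above can be pictured with a small, std-only model. This is a sketch, not the crate's implementation: `Snapshot`, `SnapshotStore`, `run_from`, and the `crash_after` parameter are hypothetical names introduced for illustration, and the real runner is async and persists through a PersistentBackend.

```rust
use std::collections::HashMap;

/// Hypothetical snapshot: how many tasks have completed and the value so far.
#[derive(Clone, Debug, PartialEq)]
struct Snapshot {
    completed: usize,
    value: u32,
}

/// Hypothetical in-memory stand-in for a persistent backend.
#[derive(Default)]
struct SnapshotStore {
    snapshots: HashMap<String, Snapshot>,
}

/// Runs `tasks` in order, starting from the snapshot stored under `id`
/// (or from `input` if there is none) and saving a snapshot after every task.
/// `crash_after` simulates a crash once that many tasks have completed in total.
fn run_from(
    store: &mut SnapshotStore,
    id: &str,
    tasks: &[fn(u32) -> u32],
    input: u32,
    crash_after: Option<usize>,
) -> Option<u32> {
    let mut snap = store
        .snapshots
        .get(id)
        .cloned()
        .unwrap_or(Snapshot { completed: 0, value: input });

    while snap.completed < tasks.len() {
        if crash_after == Some(snap.completed) {
            // Simulated crash: only the last saved snapshot survives.
            return None;
        }
        snap.value = tasks[snap.completed](snap.value);
        snap.completed += 1;
        // Checkpoint after each task.
        store.snapshots.insert(id.to_string(), snap.clone());
    }
    Some(snap.value)
}
```

In this model, calling `run_from` a second time with the same `id` skips the tasks already recorded in the snapshot, which is the behavior the resume path relies on.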
§When to Use
- Single-node execution: One process runs the entire workflow
- Crash recovery: Resume from the last completed task after restart
- Simple deployment: No coordination between workers needed
For horizontal scaling with multiple workers, use PooledWorker.
§Example
let backend = InMemoryBackend::new();
let runner = CheckpointingRunner::new(backend);
let ctx = WorkflowContext::new("my-workflow", Arc::new(JsonCodec), Arc::new(()));
let workflow = WorkflowBuilder::new(ctx)
    .then("step1", |i: u32| async move { Ok(i + 1) })
    .build()?;
// Run workflow - snapshots are saved automatically
let status = runner.run(&workflow, "instance-123", 1u32).await?;
// Resume from checkpoint if needed (e.g., after crash)
let status = runner.resume(&workflow, "instance-123").await?;
Implementations§
impl<B> CheckpointingRunner<B>
where
    B: PersistentBackend,
pub async fn cancel(
    &self,
    instance_id: &str,
    reason: Option<String>,
    cancelled_by: Option<String>,
) -> Result<(), RuntimeError>
Request cancellation of a workflow.
The specified workflow instance is not interrupted mid-task; it is cancelled at the next task boundary.
§Parameters
- instance_id: The workflow instance ID to cancel
- reason: Optional reason for the cancellation
- cancelled_by: Optional identifier of who requested the cancellation
§Errors
Returns an error if the workflow cannot be cancelled (not found or in terminal state).
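"Cancelled at the next task boundary" means the request is only observed between tasks, so a task that has already started runs to completion. A minimal std-only sketch of that idea follows. It assumes a shared flag as the signalling mechanism; `Outcome` and `run_with_cancel` are hypothetical names, and the actual runner records the request through its backend rather than through an `AtomicBool`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical outcome type, used only in this sketch.
#[derive(Debug, PartialEq)]
enum Outcome {
    Completed(u32),
    Cancelled { tasks_completed: usize },
}

/// Runs tasks in order and checks the cancellation flag only between tasks.
fn run_with_cancel(
    tasks: &[Box<dyn Fn(u32) -> u32>],
    input: u32,
    cancel_requested: &AtomicBool,
) -> Outcome {
    let mut value = input;
    for (index, task) in tasks.iter().enumerate() {
        // Task boundary: the only place where the request is observed.
        if cancel_requested.load(Ordering::SeqCst) {
            return Outcome::Cancelled { tasks_completed: index };
        }
        // A started task is never interrupted in this model.
        value = task(value);
    }
    Outcome::Completed(value)
}
```

If the flag is set while the second of three tasks is running, that task still finishes and the third never starts, so the sketch reports two completed tasks.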
pub async fn pause(
    &self,
    instance_id: &str,
    reason: Option<String>,
    paused_by: Option<String>,
) -> Result<(), RuntimeError>
Request pausing of a workflow.
The workflow will be paused at the next task boundary.
§Errors
Returns an error if the backend fails to store the pause request.
pub async fn unpause(
    &self,
    instance_id: &str,
) -> Result<WorkflowSnapshot, RuntimeError>
Unpause a paused workflow and return the updated snapshot.
Transitions the workflow from Paused back to InProgress.
§Errors
Returns an error if the backend fails to unpause the workflow.
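The Paused to InProgress transition can be modelled as a small state machine. The sketch below is illustrative only: `Status`, `Snapshot`, and this free-standing `unpause` function are hypothetical, the `Completed` variant is an assumption (only Paused and InProgress are named above), and rejecting a workflow that is not paused is an assumed policy, not documented behavior.

```rust
/// Hypothetical status enum for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Status {
    InProgress,
    Paused { reason: Option<String> },
    Completed,
}

/// Hypothetical snapshot carrying just the fields the sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    instance_id: String,
    status: Status,
}

/// Moves a paused snapshot back to `InProgress` and returns the updated copy.
/// Assumption: any other starting state is reported as an error.
fn unpause(mut snapshot: Snapshot) -> Result<Snapshot, String> {
    if matches!(snapshot.status, Status::Paused { .. }) {
        snapshot.status = Status::InProgress;
        Ok(snapshot)
    } else {
        Err(format!(
            "instance {} is not paused (status: {:?})",
            snapshot.instance_id, snapshot.status
        ))
    }
}
```

Returning the updated snapshot, as the method above does, lets the caller inspect the new state without a second lookup.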
impl<B> CheckpointingRunner<B>
where
    B: PersistentBackend + 'static,
pub async fn run<C, Input, M>(
    &self,
    workflow: &Workflow<C, Input, M>,
    instance_id: impl Into<String>,
    input: Input,
) -> Result<WorkflowStatus, RuntimeError>
where
    Input: Send + 'static,
    M: Send + Sync + 'static,
    C: Codec + EncodeValue<Input> + DecodeValue<Input> + 'static,
Run a workflow from the beginning, saving checkpoints after each task.
The instance_id uniquely identifies this workflow execution instance.
If a snapshot with this ID already exists, it will be overwritten.
§Errors
Returns an error if the workflow cannot be executed or if snapshot operations fail.
pub async fn resume<'w, C, Input, M>(
    &self,
    workflow: &'w Workflow<C, Input, M>,
    instance_id: &str,
) -> Result<WorkflowStatus, RuntimeError>
where
    Input: Send + 'static,
    M: Send + Sync + 'static,
    C: Codec + DecodeValue<Input> + EncodeValue<Input> + 'static,
Resume a workflow from a saved snapshot.
Loads the snapshot for the given instance ID and continues execution from the last checkpoint.
§Errors
Returns an error if:
- The snapshot is not found
- The workflow definition hash doesn’t match (workflow definition changed)
- The workflow cannot be resumed
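The first two error conditions above can be sketched as a pre-flight check that compares a hash of the current workflow definition with the one stored in the snapshot. Everything here is hypothetical: `ResumeError`, `Snapshot`, `definition_hash`, and `check_resumable` are illustrative names, and hashing the ordered task IDs with the standard library's `DefaultHasher` is an assumption about what a definition hash might cover, not the crate's actual scheme.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical error type mirroring the conditions listed above.
#[derive(Debug, PartialEq)]
enum ResumeError {
    SnapshotNotFound,
    DefinitionMismatch { expected: u64, found: u64 },
}

/// Hypothetical snapshot: the definition hash it was taken under and
/// the number of tasks completed so far.
struct Snapshot {
    definition_hash: u64,
    completed: usize,
}

/// Hashes the ordered task IDs. `DefaultHasher` output is not guaranteed to be
/// stable across Rust releases, so a real system would pick a fixed algorithm.
fn definition_hash(task_ids: &[&str]) -> u64 {
    let mut hasher = DefaultHasher::new();
    task_ids.hash(&mut hasher);
    hasher.finish()
}

/// Returns the index of the next task to run, or the reason resuming is unsafe.
fn check_resumable(
    snapshot: Option<&Snapshot>,
    task_ids: &[&str],
) -> Result<usize, ResumeError> {
    let snapshot = snapshot.ok_or(ResumeError::SnapshotNotFound)?;
    let found = definition_hash(task_ids);
    if snapshot.definition_hash != found {
        return Err(ResumeError::DefinitionMismatch {
            expected: snapshot.definition_hash,
            found,
        });
    }
    Ok(snapshot.completed)
}
```

Refusing to resume on a mismatch avoids replaying a checkpoint against tasks whose order or meaning has changed since the snapshot was written.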
Auto Trait Implementations§
impl<B> Freeze for CheckpointingRunner<B>
impl<B> RefUnwindSafe for CheckpointingRunner<B>
where
    B: RefUnwindSafe,
impl<B> Send for CheckpointingRunner<B>
impl<B> Sync for CheckpointingRunner<B>
impl<B> Unpin for CheckpointingRunner<B>
impl<B> UnsafeUnpin for CheckpointingRunner<B>
impl<B> UnwindSafe for CheckpointingRunner<B>
where
    B: RefUnwindSafe,
Blanket Implementations§
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(
    _: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.