pub struct PooledWorker<B> { /* private fields */ }
A pooled worker that claims and executes tasks from a shared backend.
PooledWorker is designed for horizontal scaling: multiple workers can run
across different machines/processes, all polling the same backend for tasks.
Task claiming with TTL prevents duplicate execution while allowing automatic
recovery when workers crash.
§When to Use
- Horizontal scaling: Multiple workers process tasks concurrently
- Fault tolerance: Failed workers’ tasks are automatically reclaimed
- Load balancing: Tasks distributed across available workers
For single-process execution with checkpointing, use
CheckpointingRunner.
§Example
use std::sync::Arc;
use std::time::Duration;
// Crate-specific imports (PooledWorker, InMemoryBackend, TaskRegistry,
// WorkflowContext, WorkflowBuilder, JsonCodec) are elided.

let backend = InMemoryBackend::new();
let registry = TaskRegistry::new();
let worker = PooledWorker::new("worker-1", backend, registry);

// Build a workflow and register it under its definition hash.
let ctx = WorkflowContext::new("my-wf", Arc::new(JsonCodec), Arc::new(()));
let workflow = WorkflowBuilder::new(ctx)
    .then("step1", |i: u32| async move { Ok(i + 1) })
    .build()?;
let workflows = vec![(workflow.definition_hash().to_string(), Arc::new(workflow))];

// Spawn the worker and get a handle for lifecycle control.
let handle = worker.spawn(Duration::from_secs(1), workflows);
// ... later ...
handle.shutdown();
handle.join().await?;
Implementations§
impl<B> PooledWorker<B>
where
    B: PersistentBackend + TaskClaimStore + 'static,
pub fn new(
    worker_id: impl Into<String>,
    backend: B,
    registry: TaskRegistry,
) -> Self
Create a new worker node.
§Parameters
- worker_id: Unique identifier for this worker node
- backend: The persistent backend to use
- registry: Task registry containing all task implementations
§Heartbeat
Heartbeats are derived automatically from claim_ttl (TTL / 2).
With the default 5-minute TTL, heartbeats fire every 2.5 minutes.
pub fn with_claim_ttl(self, ttl: Option<Duration>) -> Self
Set the TTL for task claims.
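A minimal configuration sketch, assuming the crate-specific imports are in scope as in the example above; the two-minute TTL is purely illustrative:

use std::time::Duration;

let backend = InMemoryBackend::new();
let registry = TaskRegistry::new();

// A 2-minute claim TTL implies a 1-minute heartbeat (TTL / 2, per the docs above).
let worker = PooledWorker::new("worker-1", backend, registry)
    .with_claim_ttl(Some(Duration::from_secs(120)));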
pub fn with_batch_size(self, size: NonZeroUsize) -> Self
Set the number of tasks to fetch per poll (default: 1).
With batch_size=1, the worker fetches one task, executes it, then polls again.
Other workers can pick up remaining tasks immediately.
Higher values reduce polling overhead but may cause workers to hold task IDs they won’t process immediately (though other workers can still claim them).
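For example, a sketch (same assumptions as above, with a fresh backend and registry) that fetches up to 10 task IDs per poll; NonZeroUsize comes from std::num:

use std::num::NonZeroUsize;

// Fetch up to 10 available tasks per poll instead of the default 1.
let worker = PooledWorker::new("worker-2", backend, registry)
    .with_batch_size(NonZeroUsize::new(10).expect("10 is non-zero"));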
pub async fn cancel_workflow(
    &self,
    instance_id: &str,
    reason: Option<String>,
    cancelled_by: Option<String>,
) -> Result<(), RuntimeError>
Request cancellation of a workflow.
This requests cancellation of the specified workflow instance. Running tasks will complete, but no new tasks will be started.
§Parameters
- instance_id: The workflow instance ID to cancel
- reason: Optional reason for the cancellation
- cancelled_by: Optional identifier of who requested the cancellation
§Errors
Returns an error if the workflow cannot be cancelled (not found or in terminal state).
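A hedged usage sketch; the instance ID, reason, and caller identifier below are hypothetical:

worker
    .cancel_workflow(
        "instance-123",                                // hypothetical instance ID
        Some("superseded by a newer run".to_string()), // reason
        Some("ops-dashboard".to_string()),             // cancelled_by
    )
    .await?;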
pub async fn pause_workflow(
    &self,
    instance_id: &str,
    reason: Option<String>,
    paused_by: Option<String>,
) -> Result<(), RuntimeError>
Request pausing of a workflow.
This requests pausing of the specified workflow instance. Running tasks will complete, but no new tasks will be started.
§Parameters
- instance_id: The workflow instance ID to pause
- reason: Optional reason for the pause
- paused_by: Optional identifier of who requested the pause
§Errors
Returns an error if the workflow cannot be paused (not found or in terminal/paused state).
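The call mirrors cancel_workflow; the values below are hypothetical:

worker
    .pause_workflow(
        "instance-123",                         // hypothetical instance ID
        Some("maintenance window".to_string()), // reason
        Some("ops-dashboard".to_string()),      // paused_by
    )
    .await?;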
pub fn spawn<C, Input, M>(
    self,
    poll_interval: Duration,
    workflows: WorkflowRegistry<C, Input, M>,
) -> WorkerHandle<B>
where
    Input: Send + Sync + 'static,
    M: Send + Sync + 'static,
    C: Codec + DecodeValue<Input> + EncodeValue<Input> + 'static,
Spawn the worker as a background task and return a handle.
Consumes self, creates an internal command channel, and spawns the
actor loop on the Tokio runtime. Returns a cloneable WorkerHandle
for lifecycle control: call WorkerHandle::shutdown to request
graceful shutdown and WorkerHandle::wait to await completion.
The worker runs until:
- WorkerHandle::shutdown is called, or
- All clones of the handle are dropped, or
- A fatal backend error occurs.
§Parameters
- poll_interval: How often to poll for new tasks
- workflows: Map of workflow definition hash to workflow
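One possible lifecycle pattern, sketched under the assumption that the worker runs inside a Tokio runtime with the signal feature enabled and that workflows was built as in the example at the top of this page:

// Poll the backend for new tasks every second.
let handle = worker.spawn(Duration::from_secs(1), workflows);

// Request a graceful shutdown on Ctrl-C, then wait for in-flight work to finish.
tokio::signal::ctrl_c().await?;
handle.shutdown();
handle.join().await?;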
pub async fn execute_task<C, Input, M>(
    &self,
    workflow: &Workflow<C, Input, M>,
    available_task: AvailableTask,
) -> Result<WorkflowStatus, RuntimeError>
where
    Input: Send + 'static,
    M: Send + Sync + 'static,
    C: Codec + DecodeValue<Input> + EncodeValue<Input> + 'static,
Execute a single task from an available task.
This claims the task, executes it, updates the snapshot, and releases the claim.
§Errors
Returns an error if:
- The task cannot be claimed
- The workflow definition hash doesn’t match
- Task execution fails
- Snapshot update fails
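A hedged sketch of driving one task manually, assuming an AvailableTask has already been fetched from the backend (the fetch API is not shown on this page) and a workflow built as in the example above:

// `available_task: AvailableTask` is assumed to come from the shared backend.
let status = worker.execute_task(&workflow, available_task).await?;
// Inspect the resulting status (assumes WorkflowStatus implements Debug).
println!("status after task: {status:?}");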
Auto Trait Implementations§
impl<B> Freeze for PooledWorker<B>
impl<B> !RefUnwindSafe for PooledWorker<B>
impl<B> Send for PooledWorker<B>
impl<B> Sync for PooledWorker<B>
impl<B> Unpin for PooledWorker<B>
impl<B> UnsafeUnpin for PooledWorker<B>
impl<B> !UnwindSafe for PooledWorker<B>
Blanket Implementations§
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(
    _: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.