Skip to main content

PooledWorker

Struct PooledWorker 

Source
pub struct PooledWorker<B> { /* private fields */ }
Expand description

A pooled worker that claims and executes tasks from a shared backend.

PooledWorker is designed for horizontal scaling: multiple workers can run across different machines/processes, all polling the same backend for tasks. Task claiming with TTL prevents duplicate execution while allowing automatic recovery when workers crash.

§When to Use

  • Horizontal scaling: Multiple workers process tasks concurrently
  • Fault tolerance: Failed workers’ tasks are automatically reclaimed
  • Load balancing: Tasks distributed across available workers

For single-process execution with checkpointing, use CheckpointingRunner.

§Example

let backend = InMemoryBackend::new();
let registry = TaskRegistry::new();
let worker = PooledWorker::new("worker-1", backend, registry);

let ctx = WorkflowContext::new("my-wf", Arc::new(JsonCodec), Arc::new(()));
let workflow = WorkflowBuilder::new(ctx)
    .then("step1", |i: u32| async move { Ok(i + 1) })
    .build()?;
let workflows = vec![(workflow.definition_hash().to_string(), Arc::new(workflow))];

// Spawn the worker and get a handle for lifecycle control
let handle = worker.spawn(Duration::from_secs(1), workflows);
// ... later ...
handle.shutdown();
handle.join().await?;

Implementations§

Source§

impl<B> PooledWorker<B>
where B: PersistentBackend + TaskClaimStore + 'static,

Source

pub fn new( worker_id: impl Into<String>, backend: B, registry: TaskRegistry, ) -> Self

Create a new worker node.

§Parameters
  • worker_id: Unique identifier for this worker node
  • backend: The persistent backend to use
  • registry: Task registry containing all task implementations
§Heartbeat

Heartbeats are derived automatically from claim_ttl (TTL / 2). With the default 5-minute TTL, heartbeats fire every 2.5 minutes.

Source

pub fn with_claim_ttl(self, ttl: Option<Duration>) -> Self

Set the TTL for task claims.

Source

pub fn with_batch_size(self, size: NonZeroUsize) -> Self

Set the number of tasks to fetch per poll (default: 1).

With batch_size=1, the worker fetches one task, executes it, then polls again. Other workers can pick up remaining tasks immediately.

Higher values reduce polling overhead but may cause workers to hold task IDs they won’t process immediately (though other workers can still claim them).

Source

pub async fn cancel_workflow( &self, instance_id: &str, reason: Option<String>, cancelled_by: Option<String>, ) -> Result<(), RuntimeError>

Request cancellation of a workflow.

This requests cancellation of the specified workflow instance. Running tasks will complete, but no new tasks will be started.

§Parameters
  • instance_id: The workflow instance ID to cancel
  • reason: Optional reason for the cancellation
  • cancelled_by: Optional identifier of who requested the cancellation
§Errors

Returns an error if the workflow cannot be cancelled (not found or in terminal state).

Source

pub async fn pause_workflow( &self, instance_id: &str, reason: Option<String>, paused_by: Option<String>, ) -> Result<(), RuntimeError>

Request pausing of a workflow.

This requests pausing of the specified workflow instance. Running tasks will complete, but no new tasks will be started.

§Parameters
  • instance_id: The workflow instance ID to pause
  • reason: Optional reason for the pause
  • paused_by: Optional identifier of who requested the pause
§Errors

Returns an error if the workflow cannot be paused (not found or in terminal/paused state).

Source

pub fn backend(&self) -> &Arc<B>

Get a reference to the backend.

Source

pub fn spawn<C, Input, M>( self, poll_interval: Duration, workflows: WorkflowRegistry<C, Input, M>, ) -> WorkerHandle<B>
where Input: Send + Sync + 'static, M: Send + Sync + 'static, C: Codec + DecodeValue<Input> + EncodeValue<Input> + 'static,

Spawn the worker as a background task and return a handle.

Consumes self, creates an internal command channel, and spawns the actor loop on the Tokio runtime. Returns a cloneable WorkerHandle for lifecycle control — call WorkerHandle::shutdown to request graceful shutdown and [WorkerHandle::wait] to await completion.

The worker runs until:

§Parameters
  • poll_interval: How often to poll for new tasks
  • workflows: Map of workflow definition hash to workflow
Source

pub async fn execute_task<C, Input, M>( &self, workflow: &Workflow<C, Input, M>, available_task: AvailableTask, ) -> Result<WorkflowStatus, RuntimeError>
where Input: Send + 'static, M: Send + Sync + 'static, C: Codec + DecodeValue<Input> + EncodeValue<Input> + 'static,

Execute a single task from an available task.

This claims the task, executes it, updates the snapshot, and releases the claim.

§Errors

Returns an error if:

  • The task cannot be claimed
  • The workflow definition hash doesn’t match
  • Task execution fails
  • Snapshot update fails

Auto Trait Implementations§

§

impl<B> Freeze for PooledWorker<B>

§

impl<B> !RefUnwindSafe for PooledWorker<B>

§

impl<B> Send for PooledWorker<B>
where B: Sync + Send,

§

impl<B> Sync for PooledWorker<B>
where B: Sync + Send,

§

impl<B> Unpin for PooledWorker<B>

§

impl<B> UnsafeUnpin for PooledWorker<B>

§

impl<B> !UnwindSafe for PooledWorker<B>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> ArchivePointee for T

Source§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
Source§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> LayoutRaw for T

Source§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
Source§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

Source§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
Source§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The metadata type for pointers and references to this type.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more