Skip to main content

PooledWorker

Struct PooledWorker 

Source
pub struct PooledWorker<B> { /* private fields */ }
Expand description

A pooled worker that claims and executes tasks from a shared backend.

PooledWorker is designed for horizontal scaling: multiple workers can run across different machines/processes, all polling the same backend for tasks. Task claiming with TTL prevents duplicate execution while allowing automatic recovery when workers crash.

§When to Use

  • Horizontal scaling: Multiple workers process tasks concurrently
  • Fault tolerance: Failed workers’ tasks are automatically reclaimed
  • Load balancing: Tasks distributed across available workers

For single-process execution with checkpointing, use CheckpointingRunner.

§Example

let backend = InMemoryBackend::new();
let registry = TaskRegistry::new();
let worker = PooledWorker::new("worker-1", backend, registry);

let ctx = WorkflowContext::new("my-wf", Arc::new(JsonCodec), Arc::new(()));
let workflow = WorkflowBuilder::new(ctx)
    .then("step1", |i: u32| async move { Ok(i + 1) })
    .build()?;
let workflows = vec![(workflow.definition_hash().to_string(), Arc::new(workflow))];

// Spawn the worker and get a handle for lifecycle control
let handle = worker.spawn(Duration::from_secs(1), workflows);
// ... later ...
handle.shutdown();
handle.join().await?;

Implementations§

Source§

impl<B> PooledWorker<B>
where B: PersistentBackend + TaskClaimStore + 'static,

Source

pub fn new( worker_id: impl Into<String>, backend: B, registry: TaskRegistry, ) -> Self

Create a new worker node.

§Parameters
  • worker_id: Unique identifier for this worker node
  • backend: The persistent backend to use
  • registry: Task registry containing all task implementations
§Heartbeat

Heartbeats are derived automatically from claim_ttl (TTL / 2). With the default 5-minute TTL, heartbeats fire every 2.5 minutes.

Source

pub fn with_claim_ttl(self, ttl: Option<Duration>) -> Self

Set the TTL for task claims.

Source

pub fn with_aging_interval(self, interval: Duration) -> Self

Set the aging interval for priority-based scheduling.

Lower-priority tasks that have been waiting longer than this interval get their effective priority boosted, preventing starvation. Default: 5 minutes (300 seconds).

§Panics

Panics if interval is zero.

Source

pub fn with_batch_size(self, size: NonZeroUsize) -> Self

Set the number of tasks to fetch per poll (default: 1).

With batch_size=1, the worker fetches one task, executes it, then polls again. Other workers can pick up remaining tasks immediately.

Higher values reduce polling overhead but may cause workers to hold task IDs they won’t process immediately (though other workers can still claim them).

Source

pub fn with_tags(self, tags: Vec<String>) -> Self

Set affinity tags for this worker.

When tags are set, the worker only picks up tasks whose tags are a subset of the worker’s tags (or tasks with no tags). When no tags are set (the default), the worker accepts all tasks.

Source

pub async fn cancel_workflow( &self, instance_id: &str, reason: Option<String>, cancelled_by: Option<String>, ) -> Result<(), RuntimeError>

Request cancellation of a workflow.

This requests cancellation of the specified workflow instance. Running tasks will complete, but no new tasks will be started.

§Parameters
  • instance_id: The workflow instance ID to cancel
  • reason: Optional reason for the cancellation
  • cancelled_by: Optional identifier of who requested the cancellation
§Errors

Returns an error if the workflow cannot be cancelled (not found or in terminal state).

Source

pub async fn pause_workflow( &self, instance_id: &str, reason: Option<String>, paused_by: Option<String>, ) -> Result<(), RuntimeError>

Request pausing of a workflow.

This requests pausing of the specified workflow instance. Running tasks will complete, but no new tasks will be started.

§Parameters
  • instance_id: The workflow instance ID to pause
  • reason: Optional reason for the pause
  • paused_by: Optional identifier of who requested the pause
§Errors

Returns an error if the workflow cannot be paused (not found or in terminal/paused state).

Source

pub fn backend(&self) -> &Arc<B>

Get a reference to the backend.

Source

pub fn spawn<C, Input, M>( self, poll_interval: Duration, workflows: WorkflowRegistry<C, Input, M>, ) -> WorkerHandle<B>
where Input: Send + Sync + 'static, M: Send + Sync + 'static, C: Codec + EnvelopeCodec + DecodeValue<Input> + EncodeValue<Input> + 'static,

Spawn the worker as a background task and return a handle.

Consumes self, creates an internal command channel, and spawns the actor loop on the Tokio runtime. Returns a cloneable WorkerHandle for lifecycle control — call WorkerHandle::shutdown to request graceful shutdown and WorkerHandle::join to await completion.

The worker runs until:

§Parameters
  • poll_interval: How often to poll for new tasks
  • workflows: Map of workflow definition hash to workflow
Source

pub fn spawn_with_executor( self, poll_interval: Duration, workflows: WorkflowIndex, executor: ExternalTaskExecutor, ) -> WorkerHandle<B>

Spawn the worker with an external executor and return a handle.

Like spawn but instead of executing tasks via typed Workflow closures, delegates all task execution to the provided executor. This is used by language bindings (Python, Node.js) where task functions live in the host language.

§Parameters
  • poll_interval: How often to poll for new tasks
  • workflows: Workflow definitions (hash + continuation tree)
  • executor: Closure that executes a task by ID given input bytes
Source

pub async fn execute_task<C, Input, M>( &self, workflow: &Workflow<C, Input, M>, available_task: AvailableTask, ) -> Result<WorkflowStatus, RuntimeError>
where Input: Send + 'static, M: Send + Sync + 'static, C: Codec + EnvelopeCodec + DecodeValue<Input> + EncodeValue<Input> + 'static,

Execute a single task from an available task.

This claims the task, executes it, updates the snapshot, and releases the claim.

§Errors

Returns an error if:

  • The task cannot be claimed
  • The workflow definition hash doesn’t match
  • Task execution fails
  • Snapshot update fails
Source

pub fn builder(backend: B, registry: TaskRegistry) -> PooledWorkerBuilder<B>

Create a builder with sensible defaults.

By default, the worker ID is derived from {hostname}-{pid}. Override with PooledWorkerBuilder::worker_id.

§Example
// Auto-generated worker ID from hostname + PID
let worker = PooledWorker::builder(InMemoryBackend::new(), TaskRegistry::new()).build();

// Or override with explicit ID
let worker = PooledWorker::builder(InMemoryBackend::new(), TaskRegistry::new())
    .worker_id("custom-worker-1")
    .build();

Auto Trait Implementations§

§

impl<B> Freeze for PooledWorker<B>

§

impl<B> !RefUnwindSafe for PooledWorker<B>

§

impl<B> Send for PooledWorker<B>
where B: Sync + Send,

§

impl<B> Sync for PooledWorker<B>
where B: Sync + Send,

§

impl<B> Unpin for PooledWorker<B>

§

impl<B> UnsafeUnpin for PooledWorker<B>

§

impl<B> !UnwindSafe for PooledWorker<B>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> ArchivePointee for T

Source§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
Source§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> LayoutRaw for T

Source§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
Source§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

Source§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
Source§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The metadata type for pointers and references to this type.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more