Skip to main content

Worker

Trait Worker 

Source
pub trait Worker: Send + Sync {
    // Required methods
    fn id(&self) -> &str;
    fn process<'life0, 'async_trait>(
        &'life0 self,
        message: ReceivedMessage<Value>,
    ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn name(&self) -> String { ... }
    fn setup<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn teardown<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn concurrency_limit(&self) -> Option<usize> { ... }
    fn restart_backoff_strategy(&self) -> BackoffStrategy { ... }
    fn processing_timeout(&self) -> Option<Duration> { ... }
}
Expand description

Core worker interface that all workers must implement.

Workers are responsible for processing individual messages from a backend. They can be supervised by foxtive-supervisor for automatic restarts on failure.

§Two-Level Backoff System

Foxtive Worker uses two independent backoff strategies:

  1. Worker Restart Backoff (this trait): Controls how quickly a crashed/failed worker is restarted by the supervisor. This handles worker-level failures like panics, setup failures, or connection losses.

  2. Message Retry Backoff (RetryHandler middleware): Controls delays between retry attempts for individual failed messages. This is configured separately in the middleware pipeline.

§Example

use foxtive_worker::{Worker, ReceivedMessage};
use foxtive_worker::error::WorkerResult;
use async_trait::async_trait;

struct MyWorker;

#[async_trait]
impl Worker for MyWorker {
    fn id(&self) -> &str { "my-worker" }
     
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        println!("Processing message: {}", message.message.id);
        // Your processing logic here
        message.ack().await?;
        Ok(())
    }
}

Required Methods§

Source

fn id(&self) -> &str

Unique worker identifier.

This should be stable across restarts and unique within a worker pool.

Source

fn process<'life0, 'async_trait>( &'life0 self, message: ReceivedMessage<Value>, ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Process a single message.

This is the core method where message processing logic is implemented. The worker should acknowledge the message on success or negative-acknowledge on failure.

§Arguments
  • message - The received message with acknowledgment capability
§Returns
  • Ok(()) if processing succeeded
  • Err(WorkerError) if processing failed

Provided Methods§

Source

fn name(&self) -> String

Human-readable name for the worker.

Used for logging and monitoring. Defaults to the worker ID if not overridden.

Source

fn setup<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Optional setup before worker starts.

This method is called once when the worker is initialized, before any messages are processed. Use it for one-time initialization like establishing connections or loading configuration.

§Returns
  • Ok(()) if setup succeeded
  • Err(WorkerError) if setup failed (worker will not start)
Source

fn teardown<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Optional cleanup on shutdown.

This method is called when the worker is being shut down gracefully. Use it to release resources, close connections, or flush buffers.

Source

fn concurrency_limit(&self) -> Option<usize>

Concurrency limit for this worker.

If Some(n), the worker will process at most n messages concurrently. If None, there is no limit (bounded only by system resources).

This is useful for preventing resource exhaustion when processing expensive messages.

Source

fn restart_backoff_strategy(&self) -> BackoffStrategy

Backoff strategy for worker-level restarts.

This controls how quickly a crashed or failed worker is restarted by the supervisor. It is independent of message-level retry backoff (which is handled by the RetryHandler middleware).

§When This Applies
  • Worker panics during message processing
  • Worker setup fails
  • Worker encounters unrecoverable errors
  • Supervisor detects worker health check failures
§When This Does NOT Apply
  • Individual message processing failures (handled by RetryHandler)
  • Graceful worker shutdown
§Default

Exponential backoff starting at 1 second, max 60 seconds, multiplier 2.0

Source

fn processing_timeout(&self) -> Option<Duration>

Optional processing timeout for individual messages.

If Some(duration), each message processed by this worker will have a maximum processing time enforced. If processing exceeds this timeout, the message will be negative-acknowledged (nacked) with requeue, preventing RabbitMQ consumer timeout errors.

This is especially important when:

  • Processing times are variable or unpredictable
  • External dependencies (APIs, databases) might be slow
  • You want to fail fast rather than wait for broker timeouts
§Relationship to Broker Timeouts

Set this to be less than your broker’s consumer timeout to ensure graceful handling before the broker kills the connection.

Example: If RabbitMQ has a 30-second consumer timeout, set this to 25 seconds.

§Default

None - No timeout enforcement (relies on broker timeouts)

§Example
use foxtive_worker::Worker;
use std::time::Duration;

struct SlowWorker;

#[async_trait::async_trait]
impl Worker for SlowWorker {
    fn id(&self) -> &str { "slow-worker" }
     
    // This worker can take up to 2 minutes per message
    fn processing_timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(120))
    }
     
    async fn process(&self, message: foxtive_worker::ReceivedMessage<serde_json::Value>)
        -> foxtive_worker::error::WorkerResult<()> {
        // Long-running processing...
        Ok(())
    }
}

Dyn Compatibility§

This trait is dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§