pub trait Worker: Send + Sync {
    // Required methods
    fn id(&self) -> &str;
    fn process<'life0, 'async_trait>(
        &'life0 self,
        message: ReceivedMessage<Value>,
    ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait;

    // Provided methods
    fn name(&self) -> String { ... }
    fn setup<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait { ... }
    fn teardown<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
    where
        Self: 'async_trait,
        'life0: 'async_trait { ... }
    fn concurrency_limit(&self) -> Option<usize> { ... }
    fn restart_backoff_strategy(&self) -> BackoffStrategy { ... }
    fn processing_timeout(&self) -> Option<Duration> { ... }
}
Core worker interface that all workers must implement.
Workers are responsible for processing individual messages from a backend.
They can be supervised by foxtive-supervisor for automatic restarts on failure.
§Two-Level Backoff System
Foxtive Worker uses two independent backoff strategies:
- Worker Restart Backoff (this trait): Controls how quickly a crashed or failed worker is restarted by the supervisor. This handles worker-level failures such as panics, setup failures, or connection losses.
- Message Retry Backoff (RetryHandler middleware): Controls delays between retry attempts for individual failed messages. This is configured separately in the middleware pipeline.
§Example
use foxtive_worker::{Worker, ReceivedMessage};
use foxtive_worker::error::WorkerResult;
use async_trait::async_trait;

struct MyWorker;

#[async_trait]
impl Worker for MyWorker {
    fn id(&self) -> &str { "my-worker" }

    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        println!("Processing message: {}", message.message.id);
        // Your processing logic here
        message.ack().await?;
        Ok(())
    }
}
Required Methods§
fn id(&self) -> &str
Unique worker identifier.
This should be stable across restarts and unique within a worker pool.
fn process<'life0, 'async_trait>(
&'life0 self,
message: ReceivedMessage<Value>,
) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Process a single message.
This is the core method where message processing logic is implemented. The worker should acknowledge the message on success or negative-acknowledge on failure.
§Arguments
- message: The received message with acknowledgment capability
§Returns
- Ok(()) if processing succeeded
- Err(WorkerError) if processing failed
Provided Methods§
fn name(&self) -> String
Human-readable name for the worker.
Used for logging and monitoring. Defaults to the worker ID if not overridden.
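The fallback from name to ID can be pictured with a small stand-in trait. This is only a sketch: `Identified`, `PlainWorker`, and `LabeledWorker` are hypothetical names made up for illustration, and the real `Worker` trait has more methods than shown here.

```rust
// Stand-in trait for illustration only; not the real `Worker` trait.
trait Identified {
    fn id(&self) -> &str;

    // Provided method: falls back to the ID when not overridden.
    fn name(&self) -> String {
        self.id().to_string()
    }
}

// Relies on the default: its name is the same as its ID.
struct PlainWorker;

impl Identified for PlainWorker {
    fn id(&self) -> &str {
        "plain-worker"
    }
}

// Overrides `name` with a friendlier label for logs and dashboards.
struct LabeledWorker;

impl Identified for LabeledWorker {
    fn id(&self) -> &str {
        "invoice-mailer-01"
    }

    fn name(&self) -> String {
        "Invoice Mailer".to_string()
    }
}
```

Keeping the ID machine-oriented and stable while overriding only the name leaves log output readable without affecting how the worker is identified.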
fn setup<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = WorkerResult<()>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Optional setup before worker starts.
This method is called once when the worker is initialized, before any messages are processed. Use it for one-time initialization like establishing connections or loading configuration.
§Returns
- Ok(()) if setup succeeded
- Err(WorkerError) if setup failed (worker will not start)
fn teardown<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
Optional cleanup on shutdown.
This method is called when the worker is being shut down gracefully. Use it to release resources, close connections, or flush buffers.
fn concurrency_limit(&self) -> Option<usize>
Concurrency limit for this worker.
If Some(n), the worker will process at most n messages concurrently.
If None, there is no limit (bounded only by system resources).
This is useful for preventing resource exhaustion when processing expensive messages.
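To make the `Some(n)` / `None` semantics concrete, here is a std-only sketch that caps in-flight jobs by running them in batches of at most `n` threads. It does not show how foxtive-worker enforces the limit, which this page does not describe. `peak_concurrency` is a hypothetical helper, and batching is a deliberately simple substitute for whatever mechanism the crate uses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: runs `job_count` no-op jobs, never more than
/// `limit` at a time, and returns the highest concurrency observed.
/// `None` means "no limit", so all jobs go into a single batch.
fn peak_concurrency(job_count: usize, limit: Option<usize>) -> usize {
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    // Shared references are `Copy`, so each thread gets its own copy.
    let (in_flight_ref, peak_ref) = (&in_flight, &peak);

    // Batch width: the limit if present, otherwise everything at once.
    let width = limit.unwrap_or(job_count).max(1);

    let mut remaining = job_count;
    while remaining > 0 {
        let batch = remaining.min(width);
        // `thread::scope` joins every thread in the batch before returning,
        // so the next batch cannot overlap with this one.
        thread::scope(|s| {
            for _ in 0..batch {
                s.spawn(move || {
                    let now = in_flight_ref.fetch_add(1, Ordering::SeqCst) + 1;
                    peak_ref.fetch_max(now, Ordering::SeqCst);
                    thread::yield_now(); // stand-in for real work
                    in_flight_ref.fetch_sub(1, Ordering::SeqCst);
                });
            }
        });
        remaining -= batch;
    }
    peak.load(Ordering::SeqCst)
}
```

Calling `peak_concurrency(8, Some(2))` never reports more than 2 jobs in flight. A semaphore-based design would keep the pipeline fuller than batching does, at the cost of a little more code.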
fn restart_backoff_strategy(&self) -> BackoffStrategy
Backoff strategy for worker-level restarts.
This controls how quickly a crashed or failed worker is restarted by the supervisor. It is independent of message-level retry backoff (which is handled by the RetryHandler middleware).
§When This Applies
- Worker panics during message processing
- Worker setup fails
- Worker encounters unrecoverable errors
- Supervisor detects worker health check failures
§When This Does NOT Apply
- Individual message processing failures (handled by RetryHandler)
- Graceful worker shutdown
§Default
Exponential backoff starting at 1 second, max 60 seconds, multiplier 2.0
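The default parameters imply a delay sequence of 1, 2, 4, 8, 16, 32 seconds, after which the 60-second cap applies. The sketch below reproduces that arithmetic with the standard library only. `restart_delay` is a hypothetical helper rather than part of the crate's API, and it assumes no jitter is applied, which the documentation above does not specify.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of foxtive-worker): delay before restart
/// number `attempt` (0-based), using the documented defaults of a 1-second
/// initial delay, a 2.0 multiplier, and a 60-second cap. Assumes no jitter.
fn restart_delay(attempt: u32) -> Duration {
    const INITIAL_SECS: f64 = 1.0;
    const MAX_SECS: f64 = 60.0;
    const MULTIPLIER: f64 = 2.0;

    // Clamp the exponent so very large attempt counts stay well-behaved.
    let exponent = attempt.min(32) as i32;
    let secs = (INITIAL_SECS * MULTIPLIER.powi(exponent)).min(MAX_SECS);
    Duration::from_secs_f64(secs)
}
```

Under these assumptions the seventh restart (attempt 6) would wait 64 seconds without the cap, so it and every later restart wait 60 seconds.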
fn processing_timeout(&self) -> Option<Duration>
Optional processing timeout for individual messages.
If Some(duration), each message processed by this worker will have a maximum
processing time enforced. If processing exceeds this timeout, the message will
be negative-acknowledged (nacked) with requeue, preventing RabbitMQ consumer
timeout errors.
This is especially important when:
- Processing times are variable or unpredictable
- External dependencies (APIs, databases) might be slow
- You want to fail fast rather than wait for broker timeouts
§Relationship to Broker Timeouts
Set this to be less than your broker’s consumer timeout to ensure graceful handling before the broker kills the connection.
Example: If RabbitMQ has a 30-second consumer timeout, set this to 25 seconds.
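One way to keep the two values in step is to derive the processing timeout from the broker timeout minus a safety margin, instead of hard-coding both. The sketch below assumes a hypothetical helper named `timeout_below_broker`. It is not part of foxtive-worker, and both the broker timeout and the margin are values you supply.

```rust
use std::time::Duration;

/// Hypothetical helper: returns a processing timeout that sits `margin`
/// below the broker's consumer timeout. Returns `None` when the margin
/// leaves no usable time, so a misconfiguration is visible to the caller.
fn timeout_below_broker(broker_timeout: Duration, margin: Duration) -> Option<Duration> {
    broker_timeout
        .checked_sub(margin) // None if margin > broker_timeout
        .filter(|remaining| !remaining.is_zero()) // reject a zero-length timeout
}
```

With a 30-second broker timeout and a 5-second margin this yields `Some(25s)`, matching the example above. The result has the same type as `processing_timeout`, so it could be returned from that method directly.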
§Default
None - No timeout enforcement (relies on broker timeouts)
§Example
use foxtive_worker::Worker;
use std::time::Duration;

struct SlowWorker;

#[async_trait::async_trait]
impl Worker for SlowWorker {
    fn id(&self) -> &str { "slow-worker" }

    // This worker can take up to 2 minutes per message
    fn processing_timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(120))
    }

    async fn process(&self, message: foxtive_worker::ReceivedMessage<serde_json::Value>)
        -> foxtive_worker::error::WorkerResult<()> {
        // Long-running processing...
        Ok(())
    }
}
Dyn Compatibility§
This trait is dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".