foxtive-worker 0.4.0

Foxtive Worker - Background worker framework for message processing

🦊 Foxtive Worker

A production-ready background worker framework for Rust.

Process messages from RabbitMQ, Redis Streams, or any queue system with confidence. Built-in retries, circuit breakers, dead letter queues, and observability, so you can focus on your business logic.



Quick Start

Get a worker processing messages in under 5 minutes.

1. Add to Cargo.toml

[dependencies]
foxtive-worker = { version = "0.4", features = ["rabbitmq"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1.0"

2. Create a Worker

use foxtive_worker::{Worker, ReceivedMessage};
use foxtive_worker::error::WorkerResult;
use async_trait::async_trait;

struct MyWorker;

#[async_trait]
impl Worker for MyWorker {
    fn id(&self) -> &str { "my-worker" }
    
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        println!("Got message: {:?}", message.message.payload);
        // Process your message...
        Ok(())  // Return Ok(()) and let middleware handle acknowledgment
    }
}

Note: In production, you'll typically add AckNackMiddleware to automatically acknowledge messages based on success/failure. See Adding Reliability below.

3. Run It

use foxtive_worker::{WorkerPoolBuilder, backends::MemoryBackend};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Use in-memory backend for testing
    let backend = Arc::new(MemoryBackend::new());
    let pool = WorkerPoolBuilder::new("test-pool")
        .add_worker(MyWorker)
        .build()
        .unwrap();
    
    // Send a test message
    backend.add_message(serde_json::json!({"hello": "world"})).await.unwrap();
    
    // Process it
    if let Some(msg) = backend.receive().await.unwrap() {
        pool.dispatch(msg).await.unwrap();
    }
    
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    println!("Done!");
}

That's it! You've got a working message processor. Now let's make it production-ready.


User Guide

Follow this step-by-step guide to go from zero to production.

1. Your First Worker

Let's build something real: an email notification service.

The Problem

You have a queue of emails to send. Each message looks like:

{
  "to": "user@example.com",
  "subject": "Welcome!",
  "body": "Thanks for signing up"
}

The Solution

use foxtive_worker::{Worker, ReceivedMessage};
use foxtive_worker::error::{WorkerError, WorkerResult};
use async_trait::async_trait;

struct EmailWorker;

#[async_trait]
impl Worker for EmailWorker {
    fn id(&self) -> &str { 
        "email-worker" 
    }
    
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        // Extract data from message
        let to = message.message.payload["to"]
            .as_str()
            .ok_or_else(|| WorkerError::ProcessingFailed("Missing 'to' field".into()))?;
        
        let subject = message.message.payload["subject"]
            .as_str()
            .ok_or_else(|| WorkerError::ProcessingFailed("Missing 'subject' field".into()))?;
        
        // Send the email (your implementation here)
        send_email(to, subject).await
            .map_err(|e| WorkerError::ProcessingFailed(e.to_string()))?;
        
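        // Return Ok(()) and let AckNackMiddleware acknowledge the message.
        // Calling message.ack() here as well would double-ack once the middleware is added.
        Ok(())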
    }
}

async fn send_email(to: &str, subject: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Your SMTP logic here
    println!("Sending {} to {}", subject, to);
    Ok(())
}

Key concepts:

  • Return Ok(()) - Signals successful processing; middleware will ack the message
  • Return Err(...) - Signals failure; middleware will nack/retry based on configuration
  • Always validate your input: bad messages happen
  • Don't manually ack/nack when using AckNackMiddleware; let it handle acknowledgment automatically

Try It

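use foxtive_worker::{WorkerPoolBuilder, backends::MemoryBackend};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]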
async fn main() {
    let backend = Arc::new(MemoryBackend::new());
    let pool = WorkerPoolBuilder::new("email-pool")
        .add_worker(EmailWorker)
        .build()
        .unwrap();
    
    // Simulate incoming email
    backend.add_message(serde_json::json!({
        "to": "alice@example.com",
        "subject": "Hello",
        "body": "Hi there!"
    })).await.unwrap();
    
    // Process all messages
    while let Some(msg) = backend.receive().await.unwrap() {
        pool.dispatch(msg).await.unwrap();
    }
    
    tokio::time::sleep(Duration::from_millis(100)).await;
}

2. Adding Reliability

Your first worker works, but what happens when:

  • The SMTP server is down?
  • A message is malformed?
  • Your worker crashes?

Let's add safety nets.

Automatic Retries

Messages fail sometimes. Retry them with exponential backoff:

use foxtive_worker::middleware::{RetryHandler, AckNackMiddleware};
use std::time::Duration;

let retry_handler = RetryHandler::default()
    .with_max_retries(3)                    // Try 3 times total
    .with_initial_backoff(Duration::from_secs(1))   // Wait 1s after first failure
    .with_max_backoff(Duration::from_secs(60));     // Never wait more than 60s

let pool = WorkerPoolBuilder::new("email-pool")
    .add_worker(EmailWorker)
    .with_middleware(AckNackMiddleware::default())  // Auto-ack/nack based on result
    .with_middleware(retry_handler)
    .build()
    .unwrap();

What happens:

  1. First attempt fails → middleware nacks (requeues) → wait about 1 second
  2. Second attempt fails → middleware nacks (requeues) → wait about 2 seconds
  3. Third attempt fails → retries exhausted → send to dead letter queue (if configured)

The backoff doubles each time (exponential) with random jitter to prevent thundering herds.
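The doubling schedule can be sketched as a small pure function. This is a minimal illustration, not foxtive-worker's actual implementation: `backoff_delay` and its parameters are hypothetical names, and jitter is left out so the result is deterministic.

```rust
use std::time::Duration;

/// Delay to wait after the `failed_attempts`-th failure (1-based).
/// Hypothetical helper: multiplies the initial delay on each failure, capped at `max`.
fn backoff_delay(
    initial: Duration,
    max: Duration,
    multiplier: f64,
    failed_attempts: u32,
) -> Duration {
    // First failure waits `initial`, second waits `initial * multiplier`, and so on.
    let exponent = failed_attempts.saturating_sub(1).min(64) as i32;
    let secs = initial.as_secs_f64() * multiplier.powi(exponent);
    // Cap before converting so a large exponent cannot overflow Duration.
    Duration::from_secs_f64(secs.min(max.as_secs_f64()))
}
```

With a 1-second initial delay, a multiplier of 2.0, and a 60-second cap, the first three failures wait 1, 2, and 4 seconds, and later failures level off at 60 seconds.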

Important: With AckNackMiddleware, you don't need to call message.ack() or message.nack() in your worker. Just return Ok(()) for success or Err(...) for failure!

Dead Letter Queues

When retries are exhausted, don't lose the message. Save it for later investigation:

use foxtive_worker::dlq::DeadLetterQueueBackend;

// Create a DLQ backend (in production, use Redis or file-based)
let dlq = Arc::new(DeadLetterQueueBackend::new());

let retry_handler = RetryHandler::default()
    .with_max_retries(3)
    .with_dead_letter_queue(dlq.clone());

// Later, inspect failed messages
let failed_messages = dlq.get_failed_messages().await;
for msg in failed_messages {
    eprintln!("Failed message {}: {}", msg.original_id, msg.error);
}

Circuit Breaker

If your SMTP server is down, stop hammering it:

use foxtive_worker::middleware::{AckNackMiddleware, CircuitBreakerMiddleware, RetryHandler};
use std::time::Duration;

let circuit_breaker = CircuitBreakerMiddleware::new(
    5,                              // Open circuit after 5 failures
    Duration::from_secs(30)         // Try again after 30 seconds
);

let pool = WorkerPoolBuilder::new("email-pool")
    .add_worker(EmailWorker)
    .with_middleware(AckNackMiddleware::default())
    .with_middleware(circuit_breaker)
    .with_middleware(RetryHandler::default())
    .build()
    .unwrap();

How it works:

  • Closed (normal): Messages flow through
  • Open (after 5 failures): Reject immediately, fail fast
  • Half-Open (after 30s): Allow one test message through
    • Success → Close circuit
    • Failure → Reopen circuit
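The three states can be modeled as a small state machine. The sketch below is an assumption-laden illustration rather than the middleware's real code: `CircuitBreaker`, `CircuitState`, and their methods are hypothetical names, and time is passed in explicitly to keep the logic easy to follow.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum CircuitState {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Hypothetical single-threaded circuit breaker.
struct CircuitBreaker {
    state: CircuitState,
    failures: u32,
    failure_threshold: u32,
    reset_timeout: Duration,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, reset_timeout: Duration) -> Self {
        Self { state: CircuitState::Closed, failures: 0, failure_threshold, reset_timeout }
    }

    /// Returns true if a message may be processed at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            // A concurrent implementation would admit only one probe while half-open.
            CircuitState::Closed | CircuitState::HalfOpen => true,
            CircuitState::Open { since } => {
                if now.duration_since(since) >= self.reset_timeout {
                    self.state = CircuitState::HalfOpen; // let a test message through
                    true
                } else {
                    false // fail fast while the circuit is open
                }
            }
        }
    }

    fn record_success(&mut self) {
        self.failures = 0;
        self.state = CircuitState::Closed;
    }

    fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        // A failed probe reopens immediately; otherwise open once the threshold is reached.
        if self.state == CircuitState::HalfOpen || self.failures >= self.failure_threshold {
            self.state = CircuitState::Open { since: now };
        }
    }
}
```

Callers check `allow` before processing and report the result with `record_success` or `record_failure`.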

Tracing

See what's happening in production:

use foxtive_worker::middleware::{AckNackMiddleware, RetryHandler, TracingMiddleware};
use tracing_subscriber;

// Initialize tracing
tracing_subscriber::fmt::init();

let pool = WorkerPoolBuilder::new("email-pool")
    .add_worker(EmailWorker)
    .with_middleware(AckNackMiddleware::default())
    .with_middleware(TracingMiddleware::new())
    .with_middleware(RetryHandler::default())
    .build()
    .unwrap();

Now you get structured logs:

INFO Message msg-123 received from email-queue
DEBUG Processing message msg-123 (attempt 1/3)
INFO Sending Welcome! to alice@example.com
DEBUG Message msg-123 processed successfully
INFO ✓ Message msg-123 acked by middleware

3. Scaling Up

One worker isn't enough. Let's scale.

Multiple Workers

Process messages in parallel:

let pool = WorkerPoolBuilder::new("email-pool")
    .with_strategy(LoadBalancingStrategy::RoundRobin)
    .add_workers(vec![
        Arc::new(EmailWorker),
        Arc::new(EmailWorker),
        Arc::new(EmailWorker),
    ])
    .build()
    .unwrap();

Load balancing strategies:

  • RoundRobin - Distribute evenly (worker 1, 2, 3, 1, 2, 3...)
  • Random - Pick randomly (good enough for most cases)
  • LeastLoaded - Send to worker with fewest active tasks (best for variable workloads)
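To make the difference between the strategies concrete, here is a rough sketch of how round-robin and least-loaded selection could pick a worker index. `RoundRobin` and `pick_least_loaded` are hypothetical names for illustration; the pool's internal selection code may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Round-robin: a shared counter walks through worker indices in order.
struct RoundRobin {
    next: AtomicUsize,
}

impl RoundRobin {
    fn new() -> Self {
        Self { next: AtomicUsize::new(0) }
    }

    /// Returns the index of the next worker. `worker_count` must be non-zero.
    fn pick(&self, worker_count: usize) -> usize {
        self.next.fetch_add(1, Ordering::Relaxed) % worker_count
    }
}

/// Least-loaded: pick the worker with the fewest active tasks.
/// Ties go to the lowest index; an empty slice yields None.
fn pick_least_loaded(active_tasks: &[usize]) -> Option<usize> {
    active_tasks
        .iter()
        .enumerate()
        .min_by_key(|(_, load)| **load)
        .map(|(index, _)| index)
}
```

Round-robin needs only a counter, while least-loaded needs a per-worker count of active tasks, which is why it adapts better when message processing times vary.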

Concurrency Control

Limit how many messages process simultaneously:

let pool = WorkerPoolBuilder::new("email-pool")
    .with_concurrency_limit(50)  // Max 50 concurrent messages
    .add_worker(EmailWorker)
    .build()
    .unwrap();

Why limit concurrency?

  • Prevent overwhelming downstream services (SMTP servers have rate limits)
  • Control memory usage
  • Avoid connection pool exhaustion

How to choose the right number:

  • CPU-bound tasks: Number of CPU cores
  • I/O-bound (HTTP, DB): 50-200
  • Mixed workloads: Start at 50, monitor and adjust
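A concurrency limit is typically enforced with a counting semaphore: each message takes a permit before it runs and returns it when done. The sketch below shows the idea with standard-library threads only. It is an illustration under assumptions (the `Semaphore` type and `run_with_limit` function are hypothetical, and the framework itself runs on Tokio tasks rather than OS threads).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from a mutex and a condition variable.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `jobs` simulated tasks with at most `limit` (non-zero) active at once.
/// Returns the peak number of tasks observed running together.
fn run_with_limit(jobs: usize, limit: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let active = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, active, peak) = (sem.clone(), active.clone(), peak.clone());
            thread::spawn(move || {
                sem.acquire();
                let running = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(running, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for real work
                active.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

However many jobs are submitted, the peak never exceeds the limit, which is the guarantee that protects downstream services and connection pools.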

Connect to RabbitMQ

Time to use a real message broker:

use foxtive_worker::backends::{RabbitMqBackend, RabbitMqConsumerConfig};

let config = RabbitMqConsumerConfig {
    queue_name: "emails".to_string(),
    prefetch_count: 50,  // Fetch 50 messages ahead
    ..Default::default()
};

let backend = Arc::new(
    RabbitMqBackend::new("amqp://localhost", config).await?
);

let pool = WorkerPoolBuilder::new("email-pool")
    .add_worker(EmailWorker)
    .with_middleware(RetryHandler::default())
    .build()
    .unwrap();

// Main loop
loop {
    if let Some(msg) = backend.receive().await? {
        pool.dispatch(msg).await?;
    }
}

Prefetch count matters:

  • Low (1-10): Strict ordering, slower
  • Medium (10-100): Good balance
  • High (100+): Maximum throughput, may reorder

4. Production Ready

Final touches before deploying.

Graceful Shutdown

Handle SIGTERM properly so you don't lose in-flight messages:

use tokio::signal;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let backend = Arc::new(RabbitMqBackend::new("amqp://localhost", config).await?);
    let pool = WorkerPoolBuilder::new("email-pool")
        .add_worker(EmailWorker)
        .build()
        .unwrap();
    
    // Spawn consumer in background
    let backend_clone = backend.clone();
    let pool_clone = pool.clone();
    tokio::spawn(async move {
        loop {
            if let Some(msg) = backend_clone.receive().await.unwrap() {
                pool_clone.dispatch(msg).await.unwrap();
            }
        }
    });
    
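    // Wait for a shutdown signal: Ctrl+C (SIGINT) or SIGTERM (Unix only),
    // which is what Kubernetes sends when it stops a pod
    let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
    tokio::select! {
        _ = signal::ctrl_c() => {}
        _ = sigterm.recv() => {}
    }
    println!("\nShutting down...");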
    
    // This waits for in-flight messages to complete
    pool.shutdown().await?;
    println!("Shutdown complete");
    
    Ok(())
}

Health Checks

Expose health status for Kubernetes/orchestration:

use foxtive_worker::http::HealthEndpoint;
use axum::{Router, routing::get};

let health = HealthEndpoint::new(pool.clone());

let app = Router::new()
    .route("/health", get(|| async { health.check_health() }));

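// axum::Server is the axum 0.6 API; axum 0.7+ replaced it with axum::serve and a tokio TcpListener
axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())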
    .serve(app.into_make_service())
    .await?;

Returns:

{
  "status": "healthy",
  "pool": "email-pool",
  "workers": 3,
  "running": true
}

Metrics

Track performance in production:

[dependencies]
foxtive-worker = { version = "0.4", features = ["metrics"] }
metrics-exporter-prometheus = "0.12"

use metrics_exporter_prometheus::PrometheusBuilder;

// Export metrics to Prometheus
PrometheusBuilder::new()
    .install()
    .unwrap();

// Now your pool automatically tracks:
// - foxtive_worker_messages_received_total
// - foxtive_worker_messages_processed_total
// - foxtive_worker_message_processing_duration_seconds
// - foxtive_worker_active_workers

Scrape at http://localhost:9000/metrics (metrics-exporter-prometheus listens on 0.0.0.0:9000 by default).

Rate Limiting

Don't overwhelm external services:

use foxtive_worker::middleware::RateLimitMiddleware;

// 100 messages per second, burst of 10
let rate_limiter = RateLimitMiddleware::new(100, 10);

let pool = WorkerPoolBuilder::new("email-pool")
    .add_worker(EmailWorker)
    .with_middleware(rate_limiter)
    .build()
    .unwrap();

Powered by the governor crate: efficient, distributed-ready rate limiting.
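To see how a rate and a burst size interact, consider a token bucket. Note the substitution: governor implements the GCRA algorithm, and the sketch below is a simpler token bucket with similar observable behavior, not the middleware's code. `TokenBucket` and `try_acquire` are hypothetical names, and the current time is passed in explicitly.

```rust
use std::time::Instant;

/// Hypothetical token bucket: `capacity` is the burst size,
/// `refill_per_sec` is the sustained rate.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(rate_per_sec: u32, burst: u32, now: Instant) -> Self {
        Self {
            capacity: burst as f64,
            tokens: burst as f64, // start full so an initial burst is allowed
            refill_per_sec: rate_per_sec as f64,
            last_refill: now,
        }
    }

    /// Takes one token if available; returns false when the caller must wait.
    fn try_acquire(&mut self, now: Instant) -> bool {
        // Refill in proportion to elapsed time, never above the burst capacity.
        let elapsed = now.saturating_duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

With a rate of 100 and a burst of 10, ten messages pass immediately, the eleventh is held back, and roughly one more is admitted every 10 ms after that.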


5. Message Properties

Modern microservices architectures need rich metadata for distributed tracing, service identification, and message routing. Foxtive Worker provides standardized MessageProperties that work across all backends.

Why Message Properties?

  • Service Identification: Track which service sent a message
  • Distributed Tracing: Correlate requests across services
  • Priority Processing: Handle urgent messages first
  • TTL Management: Auto-expire stale messages
  • Custom Metadata: Backend-specific headers and properties

Basic Usage

use foxtive_worker::MessageProperties;

// Create properties with builder pattern
let properties = MessageProperties::new()
    .with_content_type("application/json")
    .with_app_id("user-service")
    .with_message_type("user.created")
    .with_priority(5)
    .with_header("correlation_id", "trace-abc-123")
    .with_header("environment", "production");

Accessing Properties in Workers

#[async_trait]
impl Worker for MyWorker {
    fn id(&self) -> &str { "my-worker" }
    
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        // Access message properties
        if let Some(props) = &message.message.metadata.properties {
            // Get standard fields
            if let Some(app_id) = &props.app_id {
                tracing::info!("Message from: {}", app_id);
            }
            
            // Get custom headers for distributed tracing
            if let Some(headers) = &props.headers {
                if let Some(correlation_id) = headers.get("correlation_id") {
                    tracing::Span::current().record("correlation_id", correlation_id);
                }
            }
            
            // Priority-based processing
            if let Some(priority) = props.priority {
                if priority >= 8 {
                    tracing::warn!("High priority message!");
                }
            }
        }
        
        // Process message...
        Ok(())
    }
}

Backend-Specific Behavior

RabbitMQ: Automatically extracts AMQP BasicProperties:

  • Content type, encoding, priority, expiration
  • User ID, app ID, reply-to
  • Custom headers from FieldTable

Redis Streams: Extracts additional stream fields as custom headers (all fields except 'data')

Memory Backend: Use enqueue_with_properties() to set properties:

let backend = MemoryBackend::new();
backend.enqueue_with_properties(
    serde_json::json!({"key": "value"}),
    Some(properties)
);

Distributed Tracing Example

Track a request flowing through multiple services:

// Service 1: API Gateway
let props = MessageProperties::new()
    .with_app_id("api-gateway")
    .with_message_type("request.received")
    .with_header("correlation_id", "trace-xyz-789");

// Service 2: Auth Service
let props = MessageProperties::new()
    .with_app_id("auth-service")
    .with_message_type("auth.validated")
    .with_header("correlation_id", "trace-xyz-789");  // Same ID!

// Service 3: Order Service
let props = MessageProperties::new()
    .with_app_id("order-service")
    .with_message_type("order.created")
    .with_header("correlation_id", "trace-xyz-789");  // Same ID!

All events share the same correlation ID for end-to-end tracing.

Available Properties

Field             Type             Description
content_type      Option<String>   MIME type (e.g., "application/json")
content_encoding  Option<String>   Encoding (e.g., "utf-8", "gzip")
priority          Option<u8>       Priority level (0-255)
expiration        Option<u64>      TTL in milliseconds
message_type      Option<String>   Type identifier for routing
user_id           Option<String>   Associated user ID
app_id            Option<String>   Application/service identifier
cluster_id        Option<String>   Cluster ID for federated systems
reply_to          Option<String>   Reply address for responses
headers           Option<HashMap>  Custom key-value pairs

See the message_properties example for complete usage patterns.


Acknowledgment Patterns

Foxtive Worker provides two ways to handle message acknowledgment:

1. Automatic Acknowledgment (Recommended)

Use AckNackMiddleware to automatically ack/nack based on worker result:

use foxtive_worker::middleware::AckNackMiddleware;

let pool = WorkerPoolBuilder::new("my-pool")
    .add_worker(MyWorker)
    .with_middleware(AckNackMiddleware::default())  // Auto-ack on success, nack on failure
    .build()?;

Your worker just returns results:

async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
    do_work().await?;  // If this fails, middleware will nack
    Ok(())  // Middleware will ack
}

Benefits:

  • ✅ Clean separation of concerns
  • ✅ No forgotten acknowledgments
  • ✅ Consistent error handling
  • ✅ Works with retries, circuit breakers, etc.

2. Manual Acknowledgment

If you need fine-grained control, skip AckNackMiddleware and ack manually:

async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
    match do_work().await {
        Ok(_) => {
            message.ack().await?;  // Explicitly acknowledge
            Ok(())
        }
        Err(e) => {
            if should_retry(&e) {
                message.nack(true).await?;  // Requeue for retry
            } else {
                message.nack(false).await?;  // Don't requeue (send to DLQ)
            }
            Err(e)
        }
    }
}

When to use manual ack:

  • Conditional acknowledgment logic
  • Partial processing scenarios
  • Custom retry strategies per message

⚠️ Warning: Never mix manual ack with AckNackMiddleware, or you'll get double-ack errors!


6. Dead Letter Queues

When messages fail all retry attempts, you don't want to lose them. You want to inspect and debug them later. That's where Dead Letter Queues (DLQ) come in.

What is a DLQ?

A DLQ is a special queue that stores messages that have exhausted all retry attempts. Instead of discarding failed messages, they're moved to the DLQ for:

  • Debugging: Inspect why messages failed
  • Reprocessing: Fix issues and retry manually
  • Monitoring: Track failure rates and patterns
  • Audit trail: Keep record of all failed processing

RabbitMQ DLQ Architecture

Foxtive Worker automatically sets up DLQ infrastructure when you enable delayed retries:

Main Queue → Retry Queue (TTL) → Main Queue → [3 attempts] → DLQ

Infrastructure created automatically:

  • Retry Queue: {queue_name}-retry - Holds messages during TTL delay
  • Retry Exchange: {queue_name}-retry_exchange - Routes retried messages
  • DLQ: {queue_name}-dlq - Permanent storage for exhausted messages

Enabling DLQ

use foxtive_worker::backends::{RabbitMqBackend, RabbitMqConsumerConfig};

let config = RabbitMqConsumerConfig {
    queue_name: "email-notifications".to_string(),
    enable_delayed_retry: true,  // Enables retry queue + DLQ
    ..Default::default()
};

let backend = Arc::new(
    RabbitMqBackend::new("amqp://localhost", config).await?
);

// DLQ is automatically created as "email-notifications-dlq"
println!("DLQ name: {:?}", backend.dlq_name);

How It Works

  1. Message fails → RetryHandler nacks with delay
  2. Published to retry queue with TTL (e.g., 60 seconds)
  3. TTL expires → Message dead-lettered back to main queue
  4. Attempt count preserved via message headers (x-retry-attempt)
  5. After max retries → Published to DLQ with failure metadata
  6. Original message acknowledged (removed from main queue)
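The decision at the heart of this flow (retry again or dead-letter) can be sketched as a function of the attempt header. This is an illustration under assumptions: `FailureAction` and `on_failure` are hypothetical names, and the sketch assumes x-retry-attempt holds the 1-based number of the attempt that just failed, stored as a string.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum FailureAction {
    /// Publish to the retry queue, carrying the incremented attempt count.
    Retry { next_attempt: u32 },
    /// Retries exhausted: publish to the DLQ with failure metadata.
    DeadLetter { final_attempt: u32 },
}

/// Decides what to do with a failed message based on its headers.
fn on_failure(headers: &HashMap<String, String>, max_attempts: u32) -> FailureAction {
    // A message without the header is treated as being on its first attempt.
    let attempt = headers
        .get("x-retry-attempt")
        .and_then(|value| value.parse::<u32>().ok())
        .unwrap_or(1);

    if attempt >= max_attempts {
        FailureAction::DeadLetter { final_attempt: attempt }
    } else {
        FailureAction::Retry { next_attempt: attempt + 1 }
    }
}
```

Because the count travels with the message, any consumer that picks it up after the TTL expires can continue the sequence without shared state.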

DLQ Message Headers

Messages in the DLQ include rich metadata in their headers:

Header                  Type     Description
x-original-routing-key  String   Original message routing key
x-failure-reason        String   Error message explaining failure
x-final-attempt         Integer  Final attempt count before exhaustion
x-failed-at             String   ISO 8601 timestamp of failure

Example DLQ message headers:

{
  "x-original-routing-key": "user.created",
  "x-failure-reason": "Connection timeout after 3 retries",
  "x-final-attempt": 3,
  "x-failed-at": "2026-06-16T14:48:05.214409Z"
}

Monitoring DLQ

Check your DLQ size to detect systemic issues:

// Using the RabbitMQ Management API (rabbitmq_api is a placeholder for your own client)
let dlq_messages = rabbitmq_api.get_queue_messages("email-notifications-dlq").await?;

if dlq_messages > 10 {
    tracing::warn!("High DLQ count: {} messages need attention", dlq_messages);
}

Or use RabbitMQ Management UI:

  1. Navigate to Queues tab
  2. Find {your-queue}-dlq
  3. Monitor Total column
  4. Click queue name to inspect individual messages

Reprocessing DLQ Messages

Manually reprocess failed messages after fixing the issue:

// Consume from DLQ
let dlq_config = RabbitMqConsumerConfig {
    queue_name: "email-notifications-dlq".to_string(),
    ..Default::default()
};

let dlq_backend = Arc::new(
    RabbitMqBackend::new("amqp://localhost", dlq_config).await?
);

// Process failed messages
while let Some(msg) = dlq_backend.receive().await? {
    println!("DLQ message: {}", msg.message.id);
    
    // Inspect failure reason from headers
    if let Some(props) = &msg.message.metadata.properties {
        if let Some(headers) = &props.headers {
            if let Some(reason) = headers.get("x-failure-reason") {
                println!("Failed because: {}", reason);
            }
        }
    }
    
    // After fixing the issue, republish to main queue
    // or handle based on your business logic
    msg.ack().await?;
}

Customizing DLQ Names

By default, DLQ names follow the pattern {queue_name}-dlq. The config below renames only the retry queue; the DLQ name is still derived from queue_name:

let config = RabbitMqConsumerConfig {
    queue_name: "emails".to_string(),
    enable_delayed_retry: true,
    retry_queue_name: Some("emails-delayed-retry".to_string()),
    // DLQ will be "emails-dlq" (auto-generated)
    ..Default::default()
};

Best Practices

  • Monitor DLQ growth - Set up alerts when DLQ exceeds threshold
  • Include error context - Use descriptive error messages in workers
  • Regular cleanup - Archive or delete old DLQ messages
  • Root cause analysis - Investigate patterns in DLQ failures
  • Automated reprocessing - Build DLQ consumers for common failures

  • Don't ignore DLQ - Growing DLQ indicates systemic issues
  • Don't store sensitive data - DLQ messages persist indefinitely
  • Don't disable DLQ in production - You'll lose failed messages

Example: Payment Processing with DLQ

use foxtive_worker::middleware::{RetryHandler, AckNackMiddleware};

let retry_handler = RetryHandler::default()
    .with_max_retries(3)
    .with_initial_backoff(Duration::from_secs(5));

let pool = WorkerPoolBuilder::new("payment-pool")
    .add_worker(PaymentWorker)
    .with_middleware(AckNackMiddleware::default())
    .with_middleware(retry_handler)
    .build()?;

// If payment fails 3 times:
// 1. Message moves to "payments-dlq"
// 2. Headers contain failure reason
// 3. You can inspect and reprocess manually
// 4. No payment is lost!

Examples

Complete, copy-paste ready examples for common scenarios.

Payment Processing

Process payments with timeouts and circuit breakers:

struct PaymentWorker;

#[async_trait]
impl Worker for PaymentWorker {
    fn id(&self) -> &str { "payment-worker" }
    
    fn processing_timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(30))  // Fail fast if payment hangs
    }
    
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
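        // Validate input instead of unwrapping: a malformed message should fail, not panic
        let order_id = message.message.payload["order_id"].as_str()
            .ok_or_else(|| WorkerError::ProcessingFailed("Missing 'order_id' field".into()))?;
        let amount = message.message.payload["amount"].as_f64()
            .ok_or_else(|| WorkerError::ProcessingFailed("Missing 'amount' field".into()))?;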
        
        charge_payment(order_id, amount).await?;
        Ok(())  // AckNackMiddleware will ack automatically
    }
}

let pool = WorkerPoolBuilder::new("payment-pool")
    .with_concurrency_limit(20)  // Conservative: payments are critical
    .add_worker(PaymentWorker)
    .with_middleware(AckNackMiddleware::default())
    .with_middleware(CircuitBreakerMiddleware::new(3, Duration::from_secs(60)))
    .with_middleware(RetryHandler::default().with_max_retries(2))
    .build()?;

Batch Database Updates

Process updates in batches for efficiency:

use foxtive_worker::{BatchHandler, MessageBatch, BatchConfig};
use foxtive_worker::middleware::BatchMiddleware;

struct DatabaseBatchHandler;

#[async_trait]
impl BatchHandler for DatabaseBatchHandler {
    async fn process_batch(&self, batch: MessageBatch<serde_json::Value>) -> WorkerResult<()> {
        let updates: Vec<_> = batch.messages.iter()
            .map(|msg| msg.message.payload.clone())
            .collect();
        
        // Single bulk insert instead of N individual inserts
        bulk_insert(updates).await?;
        Ok(())
    }
}

let config = BatchConfig::default()
    .with_batch_size(100)              // Process 100 at a time
    .with_flush_interval(Duration::from_secs(5));  // Or every 5 seconds

let batch_middleware = BatchMiddleware::new(
    Arc::new(DatabaseBatchHandler),
    config
);

let pool = WorkerPoolBuilder::new("db-pool")
    .add_worker(DbWorker)
    .with_middleware(batch_middleware)
    .build()?;

Image Processing Pipeline

Chain multiple workers together:

// Worker 1: Download image
struct DownloadWorker;

// Worker 2: Resize image  
struct ResizeWorker;

// Worker 3: Upload to CDN
struct UploadWorker;

// Separate pools for each stage
let download_pool = WorkerPoolBuilder::new("download-pool")
    .with_concurrency_limit(10)  // Network-bound
    .add_worker(DownloadWorker)
    .build()?;

let resize_pool = WorkerPoolBuilder::new("resize-pool")
    .with_concurrency_limit(4)  // CPU-bound
    .add_worker(ResizeWorker)
    .build()?;

let upload_pool = WorkerPoolBuilder::new("upload-pool")
    .with_concurrency_limit(20)  // Network-bound
    .add_worker(UploadWorker)
    .build()?;

Each worker publishes to the next queue in the pipeline.


Configuration Reference

RetryHandler

RetryHandler::default()
    .with_max_retries(5)                      // Total attempts
    .with_initial_backoff(Duration::from_secs(1))  // First retry delay
    .with_max_backoff(Duration::from_secs(60))     // Maximum delay
    .with_backoff_multiplier(2.0)             // Exponential factor
    .with_jitter(true)                        // Add randomness
    .with_dead_letter_queue(dlq)              // Where to send exhausted messages
    .with_poison_pill_tracker(tracker)        // Detect always-failing messages

RabbitMQ Backend

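RabbitMqConsumerConfig {
    queue_name: "my-queue".to_string(),
    consumer_tag: "my-consumer".to_string(),
    prefetch_count: 50,        // Messages to fetch ahead
    auto_ack: false,           // Keep broker auto-ack off; ack explicitly (middleware or manual)
    requeue_on_nack: true,     // Put failed messages back
    ..Default::default()       // Remaining fields (e.g., enable_delayed_retry) keep their defaults
}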

Redis Streams Backend

RedisStreamConsumerConfig {
    stream_name: "my-stream".to_string(),
    group_name: "my-group".to_string(),
    consumer_name: "consumer-1".to_string(),
    block_ms: 5000,            // Wait up to 5s for messages
    count: 10,                 // Read 10 messages at once
    auto_ack: false,           // Manual ack
    dlq_stream_name: Some("my-dlq".to_string()),
}

Making Backends "Refuse to Die"

Network failures happen. Brokers restart. Kubernetes reschedules pods. Your workers should survive all of this.

The Problem

By default, if your RabbitMQ or Redis connection drops:

  • receive() returns an error
  • Your worker stops processing
  • You need manual intervention to restart

The Solution: ResilientBackend

Wrap any backend in ResilientBackend and it will automatically retry forever with exponential backoff:

use foxtive_worker::{ResilientBackend, ReconnectStrategy};
use foxtive_worker::backends::RabbitMqBackend;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Create your backend
    let rabbitmq = RabbitMqBackend::with_defaults("amqp://localhost")
        .await
        .unwrap();
    
    // Wrap it - now it won't die!
    let resilient = ResilientBackend::new(Arc::new(rabbitmq));
    
    // This will retry forever if connection drops
    loop {
        match resilient.receive().await {
            Ok(Some(msg)) => {
                println!("Got: {}", msg.message.id);
                msg.ack().await.unwrap();
            }
            Ok(None) => continue,  // No messages
            Err(e) => {
                // This is rarely reached - ResilientBackend retries internally
                tracing::error!("Unexpected: {}", e);
            }
        }
    }
}

How It Works

  1. Detects failures - Catches any error from the wrapped backend
  2. Retries indefinitely - Never gives up (unless you explicitly shutdown)
  3. Exponential backoff - Starts fast (1s), slows down (max 60s)
  4. Jitter - Adds randomness to prevent thundering herd
  5. Observable - Track connection state and retry attempts

Custom Reconnection Strategies

Want more control? Configure your own strategy:

// Fast retries for critical systems
let aggressive = ReconnectStrategy::Exponential {
    initial: Duration::from_millis(200),  // Start at 200ms
    max: Duration::from_secs(10),          // Max 10s
    multiplier: 2.0,
    jitter_factor: 0.1,                    // 10% jitter
};

let resilient = ResilientBackendBuilder::new(Arc::new(redis))
    .with_strategy(aggressive)
    .build();

Or use fixed delays (good for testing):

let fixed = ReconnectStrategy::Fixed(Duration::from_secs(2));
let resilient = ResilientBackend::with_strategy(Arc::new(backend), fixed);
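For intuition about how these settings turn into actual wait times, the following sketch computes the delay for a given reconnect attempt. It is a hypothetical mirror of the two strategies, not the library's code: the `Strategy` enum and `delay` method are illustrative names, and the random sample is supplied by the caller so no RNG crate is needed.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the fixed and exponential strategies.
enum Strategy {
    Fixed(Duration),
    Exponential {
        initial: Duration,
        max: Duration,
        multiplier: f64,
        jitter_factor: f64,
    },
}

impl Strategy {
    /// Delay before reconnect attempt `attempt` (0-based).
    /// `unit_random` is a sample in [-1.0, 1.0]; values outside are clamped.
    fn delay(&self, attempt: u32, unit_random: f64) -> Duration {
        match self {
            Strategy::Fixed(delay) => *delay,
            Strategy::Exponential { initial, max, multiplier, jitter_factor } => {
                let max_secs = max.as_secs_f64();
                // Grow geometrically, then cap at the maximum.
                let exponent = attempt.min(64) as i32;
                let base = (initial.as_secs_f64() * multiplier.powi(exponent)).min(max_secs);
                // Spread the delay by up to +/- jitter_factor, staying within [0, max].
                let jitter = *jitter_factor * unit_random.clamp(-1.0, 1.0);
                let jittered = (base * (1.0 + jitter)).clamp(0.0, max_secs);
                Duration::from_secs_f64(jittered)
            }
        }
    }
}
```

With the aggressive settings above (200 ms initial, 10 s maximum, multiplier 2.0, 10% jitter), the fourth attempt waits about 1.6 s, give or take 160 ms.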

Monitoring Connection Health

Track when things go wrong:

let resilient = ResilientBackend::new(Arc::new(rabbitmq));

// Spawn a monitor task
tokio::spawn({
    let resilient = resilient.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            
            let connected = resilient.is_connected().await;
            let attempts = resilient.reconnect_attempts().await;
            let failures = resilient.consecutive_failures().await;
            
            if !connected || attempts > 0 {
                tracing::warn!(
                    "Connection issues: connected={}, attempts={}, failures={}",
                    connected, attempts, failures
                );
            }
        }
    }
});

When to Use ResilientBackend

Use it when:

  • Running in production (network failures are inevitable)
  • Deployed on Kubernetes (pods get rescheduled)
  • Using managed services (RDS, CloudAMQP, etc.)
  • You want "set it and forget it" reliability

Skip it when:

  • Testing locally (errors help you debug faster)
  • You need immediate failure notification
  • Building CLI tools (users expect fast feedback)

Real-World Example

Here's how to make your email worker truly bulletproof:

use foxtive_worker::{WorkerPoolBuilder, ResilientBackend, middleware::RetryHandler};
use foxtive_worker::backends::RabbitMqBackend;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 1. Create resilient backend
    let rabbitmq = RabbitMqBackend::with_defaults("amqp://rabbitmq.prod")
        .await
        .unwrap();
    let backend = Arc::new(ResilientBackend::new(Arc::new(rabbitmq)));
    
    // 2. Add retry logic for transient failures
    let retry = RetryHandler::default()
        .with_max_retries(5)
        .with_initial_backoff(Duration::from_secs(1));
    
    // 3. Build pool with middleware
    let pool = WorkerPoolBuilder::new("email-workers")
        .add_worker(EmailWorker)
        .with_middleware(retry)
        .with_concurrency_limit(50)
        .build()
        .unwrap();
    
    // 4. Run forever - survives network outages, broker restarts, etc.
    pool.run_with_backend(backend).await.unwrap();
}

Now your worker will:

  • Survive RabbitMQ restarts
  • Handle temporary network partitions
  • Retry failed messages with backoff
  • Move poison pills to DLQ after 5 attempts
  • Keep running for months without intervention

That's production-ready.


Load Balancing Strategies

Strategy     Best For            Notes
RoundRobin   Uniform workloads   Even distribution
Random       Simple setups       Good enough usually
LeastLoaded  Variable workloads  Smartest routing

Troubleshooting

Messages aren't being processed

Check:

  1. Is your worker actually receiving messages?

    tracing_subscriber::fmt::init();  // Enable logs
    
  2. Does the queue/stream name match what you're publishing to?

  3. Are there permission issues on the queue?

  4. Is your worker failing silently? Log errors before returning them:

    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()> {
        match self.do_work(&message).await {
            Ok(_) => Ok(()),  // Middleware will ack
            Err(e) => {
                eprintln!("Worker error: {}", e);
                Err(e)  // Middleware will nack
            }
        }
    }
    
  5. Using manual ack with AckNackMiddleware? This causes PRECONDITION_FAILED errors! Choose one pattern only.

Messages retrying forever

You have a poison pill (malformed message). Fix:

let retry_handler = RetryHandler::default()
    .with_max_retries(5)  // Don't retry forever!
    .with_dead_letter_queue(dlq);

Then inspect the DLQ to see what's failing.

High memory usage

Solutions:

  1. Reduce concurrency:

    .with_concurrency_limit(50)  // Instead of 500
    
  2. Lower prefetch count (RabbitMQ):

    prefetch_count: 10  // Instead of 100
    
  3. Check for memory leaks in your worker code

Slow processing

Debug steps:

  1. Profile your worker: the bottleneck is usually your code, not the framework
  2. Increase concurrency if I/O-bound:
    .with_concurrency_limit(200)
    
  3. Use batch processing for database operations
  4. Check network latency to your broker

Worker keeps crashing

Use Foxtive Supervisor for automatic restarts:

use foxtive_supervisor::Supervisor;

let supervisor = Supervisor::new()
    .add_child(pool.supervise())
    .start()
    .await?;

Architecture

How it all fits together:

Message Broker (RabbitMQ/Redis)
         │
         ▼
   MessageBackend      ← Abstracts broker-specific code
         │
         ▼
   WorkerPool          ← Load balances across workers
         │
         ├──► Middleware Chain  ← Retry, circuit breaker, tracing
         │         │
         │         ▼
         └──► Worker           ← Your business logic
                   │
                   ▼
              Ack/Nack         ← Explicit acknowledgment (AckNackMiddleware or manual)

Design Decisions

Explicit ack/nack via middleware

  • AckNackMiddleware automatically acknowledges based on worker result
  • You control acknowledgment behavior through middleware configuration
  • No accidental message loss, explicit error handling
  • Workers focus on business logic, not infrastructure concerns

Middleware pipeline

  • Composable, reusable components
  • Add/remove functionality without touching worker code
  • Clean separation of concerns
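One way to picture a middleware pipeline is as handlers wrapping handlers, with the worker at the center. The sketch below is conceptual and synchronous, and every name in it (`Handler`, `FlakyWorker`, `Retry`, `AckLog`) is hypothetical; foxtive-worker's real middleware trait is async and retries by requeueing rather than looping in place.

```rust
use std::cell::{Cell, RefCell};

/// Anything that can process a message payload.
trait Handler {
    fn handle(&self, payload: &str) -> Result<(), String>;
}

/// Innermost handler (the "worker"): fails its first `fail_times` calls.
struct FlakyWorker {
    calls: Cell<u32>,
    fail_times: u32,
}

impl Handler for FlakyWorker {
    fn handle(&self, _payload: &str) -> Result<(), String> {
        self.calls.set(self.calls.get() + 1);
        if self.calls.get() <= self.fail_times {
            Err("transient failure".to_string())
        } else {
            Ok(())
        }
    }
}

/// Middleware: calls the wrapped handler up to `max_attempts` times.
struct Retry<H: Handler> {
    inner: H,
    max_attempts: u32,
}

impl<H: Handler> Handler for Retry<H> {
    fn handle(&self, payload: &str) -> Result<(), String> {
        let mut last: Result<(), String> = Err("no attempts made".to_string());
        for _ in 0..self.max_attempts {
            last = self.inner.handle(payload);
            if last.is_ok() {
                break;
            }
        }
        last
    }
}

/// Outermost middleware: records "ack" or "nack" from the final result.
struct AckLog<H: Handler> {
    inner: H,
    log: RefCell<Vec<&'static str>>,
}

impl<H: Handler> Handler for AckLog<H> {
    fn handle(&self, payload: &str) -> Result<(), String> {
        let result = self.inner.handle(payload);
        self.log.borrow_mut().push(if result.is_ok() { "ack" } else { "nack" });
        result
    }
}
```

Composing `AckLog { inner: Retry { inner: worker, .. }, .. }` adds retries and acknowledgment without touching the worker, which is the separation of concerns the design aims for.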

Trait-based backends

  • Swap RabbitMQ for Redis without changing worker logic
  • Easy to add new brokers (Kafka, NATS, etc.)

Semaphore-based concurrency

  • Precise control over parallelism
  • Prevents resource exhaustion
  • Works across all workers in the pool

Spawn-per-message

  • Each message gets its own Tokio task
  • Simple mental model
  • Leverages Tokio's work-stealing scheduler

Contributing

Contributions welcome! Guidelines:

  1. Run tests: cargo test
  2. Format code: cargo fmt
  3. Clippy clean: cargo clippy -- -D warnings
  4. Update docs: If you change public API, update docs

Open an issue first for big changes. Let's discuss before you spend weeks on a PR.


License

MIT License - do whatever you want with this.


Credits

Inspired by Celery, Sidekiq, and Bull, but faster because Rust.


Need help? Open an issue on GitHub. Found a bug? Same thing. Want to chat? Also an issue.

Happy coding!

🦊