🦊 Foxtive Worker
A production-ready background worker framework for Rust.
Process messages from RabbitMQ, Redis Streams, or any queue system with confidence. Built-in retries, circuit breakers, dead letter queues, and observability-so you can focus on your business logic.
Table of Contents
- Quick Start - Get running in 5 minutes
- User Guide - Step-by-step learning path
- 1. Your First Worker
- 2. Adding Reliability
- 3. Scaling Up
- 4. Production Ready
- 5. Message Properties - Microservices metadata & distributed tracing
- 6. Dead Letter Queues - Handle exhausted retries
- Examples - Real-world use cases
- Configuration Reference
- Resilient Backends - Survive network failures
- Troubleshooting
- Architecture
Quick Start
Get a worker processing messages in under 5 minutes.
1. Add to Cargo.toml
[]
= { = "0.1", = ["rabbitmq"] }
= { = "1", = ["full"] }
= "0.1"
= "1.0"
2. Create a Worker
use ;
use WorkerResult;
use async_trait;
;
Note: In production, you'll typically add AckNackMiddleware to automatically acknowledge messages based on success/failure. See Adding Reliability below.
3. Run It
use ;
use Arc;
async
That's it! You've got a working message processor. Now let's make it production-ready.
User Guide
Follow this step-by-step guide to go from zero to production.
1. Your First Worker
Let's build something real-an email notification service.
The Problem
You have a queue of emails to send. Each message looks like:
The Solution
use ;
use WorkerResult;
use async_trait;
;
async
Key concepts:
- Return
Ok(())- Signals successful processing; middleware will ack the message - Return
Err(...)- Signals failure; middleware will nack/retry based on configuration - Always validate your input-bad messages happen
- Don't manually ack/nack when using
AckNackMiddleware-let it handle acknowledgment automatically
Try It
async
2. Adding Reliability
Your first worker works, but what happens when:
- The SMTP server is down?
- A message is malformed?
- Your worker crashes?
Let's add safety nets.
Automatic Retries
Messages fail sometimes. Retry them with exponential backoff:
use ;
let retry_handler = default
.with_max_retries // Try 3 times total
.with_initial_backoff // Wait 1s after first failure
.with_max_backoff; // Never wait more than 60s
let pool = new
.add_worker
.with_middleware // Auto-ack/nack based on result
.with_middleware
.build
.unwrap;
What happens:
- First attempt fails → middleware nacks (requeues) → wait 1 second
- Second attempt fails → middleware nacks (requeues) → wait 2 seconds
- Third attempt fails → middleware nacks (requeues) → wait 4 seconds
- All retries exhausted → send to dead letter queue (if configured)
The backoff doubles each time (exponential) with random jitter to prevent thundering herds.
Important: With AckNackMiddleware, you don't need to call message.ack() or message.nack() in your worker-just return Ok(()) for success or Err(...) for failure!
Dead Letter Queues
When retries are exhausted, don't lose the message-save it for later investigation:
use DeadLetterQueueBackend;
// Create a DLQ backend (in production, use Redis or file-based)
let dlq = new;
let retry_handler = default
.with_max_retries
.with_dead_letter_queue;
// Later, inspect failed messages
let failed_messages = dlq.get_failed_messages.await;
for msg in failed_messages
Circuit Breaker
If your SMTP server is down, stop hammering it:
use ;
let circuit_breaker = new;
let pool = new
.add_worker
.with_middleware
.with_middleware
.with_middleware
.build
.unwrap;
How it works:
- Closed (normal): Messages flow through
- Open (after 5 failures): Reject immediately, fail fast
- Half-Open (after 30s): Allow one test message through
- Success → Close circuit
- Failure → Reopen circuit
Tracing
See what's happening in production:
use ;
use tracing_subscriber;
// Initialize tracing
init;
let pool = new
.add_worker
.with_middleware
.with_middleware
.with_middleware
.build
.unwrap;
Now you get structured logs:
INFO Message msg-123 received from email-queue
DEBUG Processing message msg-123 (attempt 1/3)
INFO Sending Welcome! to alice@example.com
DEBUG Message msg-123 processed successfully
INFO ✓ Message msg-123 acked by middleware
3. Scaling Up
One worker isn't enough. Let's scale.
Multiple Workers
Process messages in parallel:
let pool = new
.with_strategy
.add_workers
.build
.unwrap;
Load balancing strategies:
RoundRobin- Distribute evenly (worker 1, 2, 3, 1, 2, 3...)Random- Pick randomly (good enough for most cases)LeastLoaded- Send to worker with fewest active tasks (best for variable workloads)
Concurrency Control
Limit how many messages process simultaneously:
let pool = new
.with_concurrency_limit // Max 50 concurrent messages
.add_worker
.build
.unwrap;
Why limit concurrency?
- Prevent overwhelming downstream services (SMTP servers have rate limits)
- Control memory usage
- Avoid connection pool exhaustion
How to choose the right number:
- CPU-bound tasks: Number of CPU cores
- I/O-bound (HTTP, DB): 50-200
- Mixed workloads: Start at 50, monitor and adjust
Connect to RabbitMQ
Time to use a real message broker:
use RabbitMqBackend;
let config = RabbitMqConsumerConfig ;
let backend = new;
let pool = new
.add_worker
.with_middleware
.build
.unwrap;
// Main loop
loop
Prefetch count matters:
- Low (1-10): Strict ordering, slower
- Medium (10-100): Good balance
- High (100+): Maximum throughput, may reorder
4. Production Ready
Final touches before deploying.
Graceful Shutdown
Handle SIGTERM properly so you don't lose in-flight messages:
use signal;
async
Health Checks
Expose health status for Kubernetes/orchestration:
use HealthEndpoint;
use ;
let health = new;
let app = new
.route;
bind
.serve
.await?;
Returns:
Metrics
Track performance in production:
= { = "0.1", = ["metrics"] }
= "0.12"
use PrometheusBuilder;
// Export metrics to Prometheus
new
.install
.unwrap;
// Now your pool automatically tracks:
// - foxtive_worker_messages_received_total
// - foxtive_worker_messages_processed_total
// - foxtive_worker_message_processing_duration_seconds
// - foxtive_worker_active_workers
Scrape at http://localhost:9090/metrics.
Rate Limiting
Don't overwhelm external services:
use RateLimitMiddleware;
// 100 messages per second, burst of 10
let rate_limiter = new;
let pool = new
.add_worker
.with_middleware
.build
.unwrap;
Powered by the governor crate-efficient, distributed-ready rate limiting.
5. Message Properties
Modern microservices architectures need rich metadata for distributed tracing, service identification, and message routing. Foxtive Worker provides standardized MessageProperties that work across all backends.
Why Message Properties?
- Service Identification: Track which service sent a message
- Distributed Tracing: Correlate requests across services
- Priority Processing: Handle urgent messages first
- TTL Management: Auto-expire stale messages
- Custom Metadata: Backend-specific headers and properties
Basic Usage
use MessageProperties;
// Create properties with builder pattern
let properties = new
.with_content_type
.with_app_id
.with_message_type
.with_priority
.with_header
.with_header;
Accessing Properties in Workers
Backend-Specific Behavior
RabbitMQ: Automatically extracts AMQP BasicProperties:
- Content type, encoding, priority, expiration
- User ID, app ID, reply-to
- Custom headers from FieldTable
Redis Streams: Extracts additional stream fields as custom headers (all fields except 'data')
Memory Backend: Use enqueue_with_properties() to set properties:
let backend = new;
backend.enqueue_with_properties;
Distributed Tracing Example
Track a request flowing through multiple services:
// Service 1: API Gateway
let props = new
.with_app_id
.with_message_type
.with_header;
// Service 2: Auth Service
let props = new
.with_app_id
.with_message_type
.with_header; // Same ID!
// Service 3: Order Service
let props = new
.with_app_id
.with_message_type
.with_header; // Same ID!
All events share the same correlation ID for end-to-end tracing.
Available Properties
| Field | Type | Description |
|---|---|---|
content_type |
Option<String> |
MIME type (e.g., "application/json") |
content_encoding |
Option<String> |
Encoding (e.g., "utf-8", "gzip") |
priority |
Option<u8> |
Priority level (0-255) |
expiration |
Option<u64> |
TTL in milliseconds |
message_type |
Option<String> |
Type identifier for routing |
user_id |
Option<String> |
Associated user ID |
app_id |
Option<String> |
Application/service identifier |
cluster_id |
Option<String> |
Cluster ID for federated systems |
reply_to |
Option<String> |
Reply address for responses |
headers |
Option<HashMap> |
Custom key-value pairs |
See the message_properties example for complete usage patterns.
Acknowledgment Patterns
Foxtive Worker provides two ways to handle message acknowledgment:
1. Automatic Acknowledgment (Recommended)
Use AckNackMiddleware to automatically ack/nack based on worker result:
use AckNackMiddleware;
let pool = new
.add_worker
.with_middleware // Auto-ack on success, nack on failure
.build?;
Your worker just returns results:
async
Benefits:
- ✅ Clean separation of concerns
- ✅ No forgotten acknowledgments
- ✅ Consistent error handling
- ✅ Works with retries, circuit breakers, etc.
2. Manual Acknowledgment
If you need fine-grained control, skip AckNackMiddleware and ack manually:
async
When to use manual ack:
- Conditional acknowledgment logic
- Partial processing scenarios
- Custom retry strategies per message
⚠️ Warning: Never mix manual ack with AckNackMiddleware-you'll get double-ack errors!
6. Dead Letter Queues
When messages fail all retry attempts, you don't want to lose them-you want to inspect and debug them later. That's where Dead Letter Queues (DLQ) come in.
What is a DLQ?
A DLQ is a special queue that stores messages that have exhausted all retry attempts. Instead of discarding failed messages, they're moved to the DLQ for:
- Debugging: Inspect why messages failed
- Reprocessing: Fix issues and retry manually
- Monitoring: Track failure rates and patterns
- Audit trail: Keep record of all failed processing
RabbitMQ DLQ Architecture
Foxtive Worker automatically sets up DLQ infrastructure when you enable delayed retries:
Main Queue → Retry Queue (TTL) → Main Queue → [3 attempts] → DLQ
Infrastructure created automatically:
- Retry Queue:
{queue_name}-retry- Holds messages during TTL delay - Retry Exchange:
{queue_name}-retry_exchange- Routes retried messages - DLQ:
{queue_name}-dlq- Permanent storage for exhausted messages
Enabling DLQ
use ;
let config = RabbitMqConsumerConfig ;
let backend = new;
// DLQ is automatically created as "email-notifications-dlq"
println!;
How It Works
- Message fails → RetryHandler nacks with delay
- Published to retry queue with TTL (e.g., 60 seconds)
- TTL expires → Message dead-lettered back to main queue
- Attempt count preserved via message headers (
x-retry-attempt) - After max retries → Published to DLQ with failure metadata
- Original message acknowledged (removed from main queue)
DLQ Message Headers
Messages in the DLQ include rich metadata in their headers:
| Header | Type | Description |
|---|---|---|
x-original-routing-key |
String | Original message routing key |
x-failure-reason |
String | Error message explaining failure |
x-final-attempt |
Integer | Final attempt count before exhaustion |
x-failed-at |
String | ISO 8601 timestamp of failure |
Example DLQ message headers:
Monitoring DLQ
Check your DLQ size to detect systemic issues:
// Using RabbitMQ Management API
let dlq_messages = rabbitmq_api.get_queue_messages.await?;
if dlq_messages > 10
Or use RabbitMQ Management UI:
- Navigate to Queues tab
- Find
{your-queue}-dlq - Monitor Total column
- Click queue name to inspect individual messages
Reprocessing DLQ Messages
Manually reprocess failed messages after fixing the issue:
// Consume from DLQ
let dlq_config = RabbitMqConsumerConfig ;
let dlq_backend = new;
// Process failed messages
while let Some = dlq_backend.receive.await?
Customizing DLQ Names
By default, DLQ names follow the pattern {queue_name}-dlq. You can customize this:
let config = RabbitMqConsumerConfig ;
Best Practices
✅ Monitor DLQ growth - Set up alerts when DLQ exceeds threshold
✅ Include error context - Use descriptive error messages in workers
✅ Regular cleanup - Archive or delete old DLQ messages
✅ Root cause analysis - Investigate patterns in DLQ failures
✅ Automated reprocessing - Build DLQ consumers for common failures
❌ Don't ignore DLQ - Growing DLQ indicates systemic issues
❌ Don't store sensitive data - DLQ messages persist indefinitely
❌ Don't disable DLQ in production - You'll lose failed messages
Example: Payment Processing with DLQ
use ;
let retry_handler = default
.with_max_retries
.with_initial_backoff;
let pool = new
.add_worker
.with_middleware
.with_middleware
.build?;
// If payment fails 3 times:
// 1. Message moves to "payments-dlq"
// 2. Headers contain failure reason
// 3. You can inspect and reprocess manually
// 4. No payment is lost!
Examples
Complete, copy-paste ready examples for common scenarios.
Payment Processing
Process payments with timeouts and circuit breakers:
;
let pool = new
.with_concurrency_limit // Conservative-payments are critical
.add_worker
.with_middleware
.with_middleware
.with_middleware
.build?;
Batch Database Updates
Process updates in batches for efficiency:
use ;
use BatchMiddleware;
;
let config = default
.with_batch_size // Process 100 at a time
.with_flush_interval; // Or every 5 seconds
let batch_middleware = new;
let pool = new
.add_worker
.with_middleware
.build?;
Image Processing Pipeline
Chain multiple workers together:
// Worker 1: Download image
;
// Worker 2: Resize image
;
// Worker 3: Upload to CDN
;
// Separate pools for each stage
let download_pool = new
.with_concurrency_limit // Network-bound
.add_worker
.build?;
let resize_pool = new
.with_concurrency_limit // CPU-bound
.add_worker
.build?;
let upload_pool = new
.with_concurrency_limit // Network-bound
.add_worker
.build?;
Each worker publishes to the next queue in the pipeline.
Configuration Reference
RetryHandler
default
.with_max_retries // Total attempts
.with_initial_backoff // First retry delay
.with_max_backoff // Maximum delay
.with_backoff_multiplier // Exponential factor
.with_jitter // Add randomness
.with_dead_letter_queue // Where to send exhausted messages
.with_poison_pill_tracker // Detect always-failing messages
RabbitMQ Backend
RabbitMqConsumerConfig
Redis Streams Backend
RedisStreamConsumerConfig
Making Backends "Refuse to Die"
Network failures happen. Brokers restart. Kubernetes reschedules pods. Your workers should survive all of this.
The Problem
By default, if your RabbitMQ or Redis connection drops:
receive()returns an error- Your worker stops processing
- You need manual intervention to restart
The Solution: ResilientBackend
Wrap any backend in ResilientBackend and it will automatically retry forever with exponential backoff:
use ;
use Arc;
use Duration;
async
How It Works
- Detects failures - Catches any error from the wrapped backend
- Retries indefinitely - Never gives up (unless you explicitly shutdown)
- Exponential backoff - Starts fast (1s), slows down (max 60s)
- Jitter - Adds randomness to prevent thundering herd
- Observable - Track connection state and retry attempts
Custom Reconnection Strategies
Want more control? Configure your own strategy:
// Fast retries for critical systems
let aggressive = Exponential ;
let resilient = new
.with_strategy
.build;
Or use fixed delays (good for testing):
let fixed = Fixed;
let resilient = with_strategy;
Monitoring Connection Health
Track when things go wrong:
let resilient = new;
// Spawn a monitor task
spawn;
When to Use ResilientBackend
Use it when:
- Running in production (network failures are inevitable)
- Deployed on Kubernetes (pods get rescheduled)
- Using managed services (RDS, CloudAMQP, etc.)
- You want "set it and forget it" reliability
Skip it when:
- Testing locally (errors help you debug faster)
- You need immediate failure notification
- Building CLI tools (users expect fast feedback)
Real-World Example
Here's how to make your email worker truly bulletproof:
use ;
use Arc;
async
Now your worker will:
- Survive RabbitMQ restarts
- Handle temporary network partitions
- Retry failed messages with backoff
- Move poison pills to DLQ after 5 attempts
- Keep running for months without intervention
That's production-ready.
Load Balancing Strategies
| Strategy | Best For | Notes |
|---|---|---|
| RoundRobin | Uniform workloads | Even distribution |
| Random | Simple setups | Good enough usually |
| LeastLoaded | Variable workloads | Smartest routing |
Troubleshooting
Messages aren't being processed
Check:
-
Is your worker actually receiving messages?
init; // Enable logs -
Does the queue/stream name match what you're publishing to?
-
Are there permission issues on the queue?
-
Is your worker crashing silently? Wrap in try-catch:
async -
Using manual ack with AckNackMiddleware? This causes PRECONDITION_FAILED errors! Choose one pattern only.
Messages retrying forever
You have a poison pill (malformed message). Fix:
let retry_handler = default
.with_max_retries // Don't retry forever!
.with_dead_letter_queue;
Then inspect the DLQ to see what's failing.
High memory usage
Solutions:
-
Reduce concurrency:
.with_concurrency_limit // Instead of 500 -
Lower prefetch count (RabbitMQ):
prefetch_count: 10 // Instead of 100 -
Check for memory leaks in your worker code
Slow processing
Debug steps:
- Profile your worker-the bottleneck is usually your code, not the framework
- Increase concurrency if I/O-bound:
.with_concurrency_limit - Use batch processing for database operations
- Check network latency to your broker
Worker keeps crashing
Use Foxtive Supervisor for automatic restarts:
use Supervisor;
let supervisor = new
.add_child
.start
.await?;
Architecture
How it all fits together:
Message Broker (RabbitMQ/Redis)
│
▼
MessageBackend ← Abstracts broker-specific code
│
▼
WorkerPool ← Load balances across workers
│
├──► Middleware Chain ← Retry, circuit breaker, tracing
│ │
│ ▼
└──► Worker ← Your business logic
│
▼
Ack/Nack ← Manual acknowledgment
Design Decisions
Manual ack/nack via middleware
AckNackMiddlewareautomatically acknowledges based on worker result- You control acknowledgment behavior through middleware configuration
- No accidental message loss, explicit error handling
- Workers focus on business logic, not infrastructure concerns
Middleware pipeline
- Composable, reusable components
- Add/remove functionality without touching worker code
- Clean separation of concerns
Trait-based backends
- Swap RabbitMQ for Redis without changing worker logic
- Easy to add new brokers (Kafka, NATS, etc.)
Semaphore-based concurrency
- Precise control over parallelism
- Prevents resource exhaustion
- Works across all workers in the pool
Spawn-per-message
- Each message gets its own Tokio task
- Simple mental model
- Leverages Tokio's work-stealing scheduler
Contributing
Contributions welcome! Guidelines:
- Run tests:
cargo test - Format code:
cargo fmt - Clippy clean:
cargo clippy -- -D warnings - Update docs: If you change public API, update docs
Open an issue first for big changes. Let's discuss before you spend weeks on a PR.
License
MIT License - do whatever you want with this.
Credits
Built with:
- Tokio - Async runtime
- lapin - RabbitMQ client
- redis - Redis client
- governor - Rate limiting
- tracing - Structured logging
Inspired by Celery, Sidekiq, and Bull-but faster because Rust.
Need help? Open an issue on GitHub. Found a bug? Same thing. Want to chat? Also an issue.
Happy coding!
🦊