celers-kombu
Broker abstraction layer for CeleRS, inspired by Python's Kombu library. Provides unified traits for message broker implementations.
Status: [Stable] — v0.2.0 (2026-03-27) — 323 tests
Overview
Production-ready broker abstraction with:
- ✅ Transport Abstraction: Unified interface for Redis, AMQP, SQS, PostgreSQL
- ✅ Producer/Consumer: Separate publish/consume interfaces
- ✅ Queue Management: Create, delete, purge queues
- ✅ FIFO & Priority Modes: Flexible queue ordering
- ✅ Acknowledgments: Message delivery guarantees
- ✅ Requeue Support: Retry failed messages
- ✅ Message Envelope: Delivery metadata tracking
- ✅ Error Handling: Comprehensive error types
- ✅ Dead Letter Queue (DLQ): Failed message handling with retry tracking
- ✅ Message Transactions: ACID guarantees with isolation levels
- ✅ Message Scheduling: Delayed delivery with absolute/relative timing
- ✅ Consumer Groups: Load-balanced distributed consumption
- ✅ Message Replay: Debugging and recovery with progress tracking
- ✅ Quota Management: Resource limits with enforcement policies
- ✅ Flow Control: Backpressure detection and poison message handling
- ✅ Middleware System: 21 middleware types for transformation, validation, security, and reliability
  - Built-in (18): Validation, Logging, Metrics, Retry Limit, Rate Limiting, Deduplication, Timeout, Filter, Sampling, Transformation, Tracing, Batching, Audit, Deadline, ContentType, RoutingKey, Idempotency, Backoff
  - Feature-gated (3): Compression (Gzip), Signing (HMAC), Encryption (AES-256-GCM)
- ✅ Utilities Module: 47 helper functions for optimization, monitoring, and operational excellence
Architecture
┌─────────────────────────────────────────────────────────┐
│                      Broker Trait                       │
│               (Full producer + consumer)                │
└─────────────────────────────────────────────────────────┘
                             │
             ┌───────────────┴───────────────┐
             │                               │
     ┌───────▼────────┐            ┌─────────▼────────┐
     │    Producer    │            │     Consumer     │
     │  (Publishing)  │            │   (Consuming)    │
     └───────┬────────┘            └─────────┬────────┘
             │                               │
             └───────────────┬───────────────┘
                             │
                     ┌───────▼────────┐
                     │   Transport    │
                     │  (Connection)  │
                     └────────────────┘
                             │
             ┌───────────────┼───────────────┐
             │               │               │
     ┌───────▼──┐     ┌──────▼───┐    ┌──────▼───┐
     │  Redis   │     │   AMQP   │    │   SQS    │
     └──────────┘     └──────────┘    └──────────┘
Quick Start
Implementing a Broker
use ;
use async_trait;
Traits
Transport
Low-level connection management:
Usage:
let mut transport = new;
transport.connect.await?;
assert!;
println!;
transport.disconnect.await?;
Producer
Message publishing interface:
Usage:
use Message;
use Uuid;
let message = new;
// Simple publish
producer.publish.await?;
// Routing (AMQP-style)
producer.publish_with_routing.await?;
Consumer
Message consumption interface:
Usage:
use Duration;
// Consume with timeout
if let Some = consumer.consume.await?
// Check queue size
let size = consumer.queue_size.await?;
println!;
Broker
Full broker interface combining producer and consumer:
Usage:
// Create queues
broker.create_queue.await?;
broker.create_queue.await?;
// List queues
let queues = broker.list_queues.await?;
println!;
// Purge queue
let removed = broker.purge.await?;
println!;
// Delete queue
broker.delete_queue.await?;
Types
Message Envelope
Usage:
if let Some = consumer.consume.await?
Queue Mode
FIFO Mode:
- Tasks processed in order received
- Simpler implementation
- Higher throughput
Priority Mode:
- Tasks with higher priority processed first
- Requires sorted structure (e.g., Redis ZSET)
- Slightly lower throughput
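The difference between the two modes can be sketched with standard-library collections. This is only an illustration: `Task`, `fifo_order`, `priority_order`, and `sample_tasks` are hypothetical names, not part of celers-kombu, and a real broker would typically use a backend structure such as a Redis list or ZSET instead.

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

/// Hypothetical task record for illustration; not a celers-kombu type.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Task {
    name: &'static str,
    priority: u8,
    seq: u64, // arrival order, used to break priority ties
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority wins; among equal priorities the older task wins.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
            .then_with(|| self.name.cmp(other.name))
    }
}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// FIFO mode: tasks leave in the order they arrived.
fn fifo_order(tasks: &[Task]) -> Vec<&'static str> {
    let mut queue: VecDeque<Task> = tasks.iter().cloned().collect();
    let mut out = Vec::new();
    while let Some(task) = queue.pop_front() {
        out.push(task.name);
    }
    out
}

/// Priority mode: a max-heap keeps the highest-priority task on top.
fn priority_order(tasks: &[Task]) -> Vec<&'static str> {
    let mut heap: BinaryHeap<Task> = tasks.iter().cloned().collect();
    let mut out = Vec::new();
    while let Some(task) = heap.pop() {
        out.push(task.name);
    }
    out
}

/// Three tasks in arrival order; "billing" has the highest priority.
fn sample_tasks() -> Vec<Task> {
    vec![
        Task { name: "email", priority: 1, seq: 0 },
        Task { name: "billing", priority: 9, seq: 1 },
        Task { name: "cleanup", priority: 1, seq: 2 },
    ]
}
```

The heap costs O(log n) per push and pop versus O(1) for the deque, which is one reason a priority queue tends to have somewhat lower throughput.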
Usage:
// FIFO queue (default)
broker.create_queue.await?;
// Priority queue
broker.create_queue.await?;
Queue Configuration
Builder pattern:
use Duration;
let config = new
.with_mode
.with_ttl;
assert_eq!;
assert_eq!;
assert!;
Error Types
Usage:
use BrokerError;
match consumer.consume.await
Broker Implementations
Available Implementations
| Broker | Crate | Transport Type | Features |
|---|---|---|---|
| Redis | celers-broker-redis | In-memory | Fast, simple, FIFO/Priority |
| PostgreSQL | celers-broker-postgres | Database | Durable, transactional |
| RabbitMQ | celers-broker-amqp | Message broker | Advanced routing, exchanges |
| AWS SQS | celers-broker-sqs | Cloud queue | Managed, scalable |
| SQL | celers-broker-sql | Database | Generic SQL support |
Example: Using Redis Broker
use RedisBroker;
use ;
async
Producer/Consumer Pattern
Producer (Publisher)
use Producer;
async
Consumer (Worker)
use Consumer;
use Duration;
async
Message Acknowledgment Patterns
Automatic Acknowledgment
if let Some = consumer.consume.await?
Pros: Lower latency, simpler
Cons: Message loss if processing fails
Manual Acknowledgment (Recommended)
if let Some = consumer.consume.await?
Pros: No message loss
Cons: Possible duplicates, higher latency
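The trade-off behind manual acknowledgment can be sketched with a small in-memory queue that keeps each delivered message "in flight" until it is acked. `AckQueue` and all of its methods are hypothetical and synchronous, written only to show the bookkeeping; they are not the crate's Consumer API.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical in-memory queue illustrating manual acknowledgment.
struct AckQueue {
    ready: VecDeque<String>,
    in_flight: HashMap<u64, String>,
    next_tag: u64,
}

impl AckQueue {
    fn new() -> Self {
        Self { ready: VecDeque::new(), in_flight: HashMap::new(), next_tag: 0 }
    }

    fn publish(&mut self, body: &str) {
        self.ready.push_back(body.to_string());
    }

    /// Hands out a message but keeps a copy until it is acked or rejected.
    fn consume(&mut self) -> Option<(u64, String)> {
        let body = self.ready.pop_front()?;
        let tag = self.next_tag;
        self.next_tag += 1;
        self.in_flight.insert(tag, body.clone());
        Some((tag, body))
    }

    /// Confirms successful processing; the message is dropped for good.
    fn ack(&mut self, tag: u64) -> bool {
        self.in_flight.remove(&tag).is_some()
    }

    /// Reports failed processing; with `requeue` the message becomes ready again.
    fn reject(&mut self, tag: u64, requeue: bool) -> bool {
        match self.in_flight.remove(&tag) {
            Some(body) => {
                if requeue {
                    self.ready.push_back(body);
                }
                true
            }
            None => false,
        }
    }

    fn pending(&self) -> usize {
        self.ready.len()
    }
}
```

Because a message survives until `ack`, a worker crash cannot lose it, but a requeued message may be processed more than once. That is the "possible duplicates" cost listed above.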
Dead Letter Queue Pattern
const MAX_RETRIES: u32 = 3;
if let Some = consumer.consume.await?
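The decision at the heart of this pattern is small enough to sketch on its own. The `FailureAction` enum and `on_failure` function below are hypothetical helpers, assuming the retry count travels with the message; only `MAX_RETRIES` comes from the snippet above.

```rust
const MAX_RETRIES: u32 = 3;

/// What to do with a message whose processing just failed (hypothetical type).
#[derive(Debug, PartialEq)]
enum FailureAction {
    /// Put the message back on its queue with an incremented retry count.
    Requeue { next_retry_count: u32 },
    /// Give up and move the message to the dead letter queue.
    DeadLetter,
}

/// `retry_count` is the number of retries already attempted for this message.
fn on_failure(retry_count: u32) -> FailureAction {
    if retry_count >= MAX_RETRIES {
        FailureAction::DeadLetter
    } else {
        FailureAction::Requeue { next_retry_count: retry_count + 1 }
    }
}
```

With `MAX_RETRIES = 3`, a message is requeued after its first three failures and dead-lettered on the fourth.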
Routing Patterns
Direct Routing
// Simple queue name routing
producer.publish.await?;
Topic Routing (AMQP-style)
// Routing key pattern: "tasks.high_priority"
producer.publish_with_routing.await?;
// Routing key pattern: "logs.error"
producer.publish_with_routing.await?;
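For readers unfamiliar with AMQP topic exchanges, binding patterns are matched against dot-separated routing keys: `*` stands for exactly one word and `#` for zero or more words. The matcher below is a standalone sketch of that rule; `topic_matches` and `match_words` are hypothetical helpers, and in practice the matching is done by the AMQP broker rather than by this crate.

```rust
/// Returns true if `routing_key` matches the AMQP-style binding `pattern`.
/// `*` matches exactly one dot-separated word; `#` matches zero or more words.
fn topic_matches(pattern: &str, routing_key: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let key: Vec<&str> = if routing_key.is_empty() {
        Vec::new()
    } else {
        routing_key.split('.').collect()
    };
    match_words(&pat, &key)
}

fn match_words(pat: &[&str], key: &[&str]) -> bool {
    let (first, rest) = match pat.split_first() {
        Some(parts) => parts,
        // Pattern exhausted: it matches only if the key is exhausted too.
        None => return key.is_empty(),
    };
    if *first == "#" {
        // `#` may absorb zero words, one word, and so on up to the whole key.
        return (0..=key.len()).any(|n| match_words(rest, &key[n..]));
    }
    match key.split_first() {
        Some((word, key_rest)) => {
            (*first == "*" || first == word) && match_words(rest, key_rest)
        }
        None => false,
    }
}
```

For example, the pattern `tasks.*` matches `tasks.high_priority` but not `tasks.high.urgent`, while `logs.#` matches `logs`, `logs.error`, and `logs.error.db`.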
Fanout (Broadcast)
// Publish to exchange, all queues receive
producer.publish_with_routing.await?;
Middleware
The crate provides a middleware system for message transformation, validation, security, and reliability. Middleware is composed into a chain that is applied on publish and on consume.
Middleware Chain
use ;
let chain = new
.with_middleware
.with_middleware;
// Use with producer
producer.publish_with_middleware.await?;
// Use with consumer
if let Some = consumer.consume_with_middleware.await?
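The chain idea itself can be sketched without the crate: each stage sees the message in order, and the first error stops the pipeline. Everything below (`Msg`, `Middleware`, `Chain`, `RequireTaskName`, `StampHeader`, the `x-stamped` header) is a hypothetical, synchronous stand-in chosen for illustration; the crate's real trait and method signatures may differ.

```rust
use std::collections::HashMap;

/// Hypothetical message type; not the crate's Message.
#[derive(Debug, Default)]
struct Msg {
    task_name: String,
    headers: HashMap<String, String>,
}

/// Hypothetical middleware trait with a single publish-side hook.
trait Middleware {
    fn before_publish(&self, msg: &mut Msg) -> Result<(), String>;
}

/// Rejects messages without a task name.
struct RequireTaskName;

impl Middleware for RequireTaskName {
    fn before_publish(&self, msg: &mut Msg) -> Result<(), String> {
        if msg.task_name.is_empty() {
            Err("task name is required".to_string())
        } else {
            Ok(())
        }
    }
}

/// Adds a marker header to every message that reaches it.
struct StampHeader;

impl Middleware for StampHeader {
    fn before_publish(&self, msg: &mut Msg) -> Result<(), String> {
        msg.headers.insert("x-stamped".to_string(), "true".to_string());
        Ok(())
    }
}

/// Runs stages in insertion order and stops at the first error.
struct Chain {
    stages: Vec<Box<dyn Middleware>>,
}

impl Chain {
    fn new() -> Self {
        Self { stages: Vec::new() }
    }

    fn with_middleware(mut self, stage: Box<dyn Middleware>) -> Self {
        self.stages.push(stage);
        self
    }

    fn run(&self, msg: &mut Msg) -> Result<(), String> {
        for stage in &self.stages {
            stage.before_publish(msg)?;
        }
        Ok(())
    }
}
```

Ordering matters in such a design: putting validation first means later stages never touch a message that is going to be rejected anyway.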
Built-in Middleware
ValidationMiddleware
Validates message structure and size limits:
use ValidationMiddleware;
let validator = new
.with_max_body_size // 10MB limit
.with_require_task_name;
let chain = new
.with_middleware;
LoggingMiddleware
Logs message events for debugging:
use LoggingMiddleware;
let logger = new
.with_body_logging; // Enable detailed logging
let chain = new
.with_middleware;
MetricsMiddleware
Collects message statistics:
use MetricsMiddleware;
use ;
let metrics = new;
let metrics_mw = new;
let chain = new
.with_middleware;
// Later, get metrics snapshot
let snapshot = metrics.lock.unwrap.clone;
println!;
RetryLimitMiddleware
Enforces maximum retry count:
use RetryLimitMiddleware;
let retry_limiter = new; // Max 3 retries
let chain = new
.with_middleware;
RateLimitingMiddleware
Controls message publishing rate using token bucket algorithm:
use RateLimitingMiddleware;
let rate_limiter = new; // 100 messages/sec
let chain = new
.with_middleware;
// Automatically enforces rate limit on publish
producer.publish_with_middleware.await?;
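A token bucket refills at a fixed rate up to a capacity, and each publish spends one token. The sketch below shows the arithmetic only; `TokenBucket` is a hypothetical type, the capacity is assumed to equal the per-second rate, and time is passed in explicitly (as seconds) so the behaviour is deterministic. The middleware's actual internals are not shown in this README.

```rust
/// Hypothetical token bucket; the caller supplies the current time in seconds.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill_secs: f64,
}

impl TokenBucket {
    /// Starts full, with a burst capacity equal to one second of traffic.
    fn new(rate_per_sec: f64) -> Self {
        Self {
            capacity: rate_per_sec,
            tokens: rate_per_sec,
            refill_per_sec: rate_per_sec,
            last_refill_secs: 0.0,
        }
    }

    /// Refills according to elapsed time, then tries to spend one token.
    fn try_acquire(&mut self, now_secs: f64) -> bool {
        let elapsed = (now_secs - self.last_refill_secs).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill_secs = self.last_refill_secs.max(now_secs);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

Unlike a fixed window counter, the bucket allows short bursts up to its capacity while still bounding the long-run average rate.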
DeduplicationMiddleware
Prevents duplicate message processing:
use DeduplicationMiddleware;
let dedup = new; // Track 10K message IDs
// Or use default: let dedup = DeduplicationMiddleware::with_default_cache();
let chain = new
.with_middleware;
// Rejects duplicate messages based on task_id
consumer.consume_with_middleware.await?;
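A bounded ID cache is one plausible way to implement this kind of deduplication. The sketch below assumes first-in-first-out eviction once the capacity is reached; `DedupCache` and `check_and_record` are hypothetical names, and the crate's actual eviction policy is not stated here.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical bounded cache of recently seen task IDs (FIFO eviction).
struct DedupCache {
    capacity: usize,
    seen: HashSet<String>,
    order: VecDeque<String>,
}

impl DedupCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, seen: HashSet::new(), order: VecDeque::new() }
    }

    /// Returns true the first time an ID is seen, false for a duplicate.
    fn check_and_record(&mut self, task_id: &str) -> bool {
        if self.seen.contains(task_id) {
            return false;
        }
        if self.capacity == 0 {
            // Nothing can be remembered, so every message looks new.
            return true;
        }
        if self.order.len() == self.capacity {
            // Forget the oldest ID to make room for the new one.
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.seen.insert(task_id.to_string());
        self.order.push_back(task_id.to_string());
        true
    }
}
```

A bounded cache keeps memory predictable, at the price that an ID evicted from the cache will be accepted again if it reappears later.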
TimeoutMiddleware
Enforces message processing timeouts:
use TimeoutMiddleware;
use Duration;
let timeout = new;
let chain = new
.with_middleware;
// Timeout metadata injected into message headers
FilterMiddleware
Selectively processes messages based on custom predicates:
use FilterMiddleware;
let filter = new;
let chain = new
.with_middleware;
// Only processes messages matching the predicate
SamplingMiddleware
Statistical message sampling for monitoring/testing:
use SamplingMiddleware;
let sampler = new; // Sample 10% of messages
let chain = new
.with_middleware;
TransformationMiddleware
Custom message content transformation:
use TransformationMiddleware;
let transformer = new;
let chain = new
.with_middleware;
TracingMiddleware
Distributed tracing support with automatic trace ID propagation:
use TracingMiddleware;
let tracer = new;
let chain = new
.with_middleware;
// Injects trace IDs, span IDs, and timestamps for latency analysis
BatchingMiddleware
Automatic message batching hints for batch-aware consumers:
use BatchingMiddleware;
use Duration;
let batcher = new;
let chain = new
.with_middleware;
// Suggests batching metadata (size: 100, timeout: 5s)
AuditMiddleware
Comprehensive audit logging for compliance:
use AuditMiddleware;
let auditor = new
.with_body_logging; // Include message body in audit trail
let chain = new
.with_middleware;
// Generates unique audit IDs and tracks all operations
DeadlineMiddleware
Hard deadline enforcement (absolute time-based):
use DeadlineMiddleware;
use Duration;
let deadline = new; // 5 min deadline
let chain = new
.with_middleware;
// Rejects messages that exceed their absolute deadline
ContentTypeMiddleware
Content type validation and conversion:
use ContentTypeMiddleware;
let validator = new
.with_allowed_types
.with_default_type;
let chain = new
.with_middleware;
// Validates and enforces content types
RoutingKeyMiddleware
Dynamic routing key assignment:
use RoutingKeyMiddleware;
// From task name
let router = from_task_name;
// From task and priority
let router = from_task_and_priority;
// Custom routing logic
let router = new;
let chain = new
.with_middleware;
IdempotencyMiddleware (NEW - v0.4.7)
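Tracks processed message IDs so that redelivered messages can be detected and skipped, giving effectively exactly-once processing for IDs still held in the cache: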
use IdempotencyMiddleware;
let idempotency = new; // Track 10K message IDs
// Or use default: IdempotencyMiddleware::with_default_cache();
let chain = new
.with_middleware;
// Tracks processed messages to prevent duplicate processing on retries
// Sets x-already-processed header to true for duplicates
BackoffMiddleware (NEW - v0.4.7)
Automatic retry backoff calculation with jitter:
use BackoffMiddleware;
use Duration;
let backoff = new;
// Or use defaults: BackoffMiddleware::with_defaults();
let chain = new
.with_middleware;
// Calculates exponential backoff with 0-25% jitter
// Injects x-backoff-delay and x-next-retry headers
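Exponential backoff with bounded jitter can be written as a pure function. The sketch below assumes the delay doubles per attempt, is capped at a maximum, and then has jitter of up to 25% added on top; `backoff_delay` and its parameters are hypothetical, and the caller is expected to supply the random `jitter_fraction`.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based).
/// `jitter_fraction` is clamped to 0.0..=0.25 and would normally come from an RNG.
fn backoff_delay(base: Duration, max: Duration, attempt: u32, jitter_fraction: f64) -> Duration {
    // base * 2^attempt, saturating at `max` instead of overflowing.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    let raw = base.checked_mul(factor).unwrap_or(max).min(max);
    let fraction = if jitter_fraction.is_finite() {
        jitter_fraction.clamp(0.0, 0.25)
    } else {
        0.0
    };
    // Jitter is added after the cap, so the result can exceed `max` by up to 25%.
    raw + raw.mul_f64(fraction)
}
```

Jitter spreads out retries from many consumers that failed at the same moment, which avoids a synchronized burst against the broker.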
Feature-Gated Middleware
The following middleware require enabling feature flags in Cargo.toml:
CompressionMiddleware
Compresses message bodies (requires compression feature):
[dependencies]
celers-kombu = { version = "0.2", features = ["compression"] }
use CompressionMiddleware;
use CompressionType;
SigningMiddleware
Signs messages with HMAC-SHA256 (requires signing feature):
[dependencies]
celers-kombu = { version = "0.2", features = ["signing"] }
use SigningMiddleware;
EncryptionMiddleware
Encrypts messages with AES-256-GCM (requires encryption feature):
[dependencies]
celers-kombu = { version = "0.2", features = ["encryption"] }
use EncryptionMiddleware;
Enable All Features
[dependencies]
celers-kombu = { version = "0.2", features = ["full"] }
Combining Middleware
Create a complete middleware pipeline:
use *;
use ;
let metrics = new;
let chain = new
// Validation first
.with_middleware
// Rate limiting
.with_middleware
// Deduplication
.with_middleware
// Logging
.with_middleware
// Metrics collection
.with_middleware
// Retry limit
.with_middleware;
// Optional: Add compression, signing, encryption (if features enabled)
let chain = chain.with_middleware;
let chain = chain.with_middleware;
let chain = chain.with_middleware;
Dead Letter Queue (DLQ)
Handle failed messages with automatic retry tracking:
use ;
// Configure DLQ
let dlq_config = new
.with_max_retries
.with_ttl // 24 hours
.with_metadata;
// Send to DLQ
broker.send_to_dlq.await?;
// Retrieve from DLQ
if let Some = broker.get_from_dlq.await?
// Retry from DLQ
broker.retry_from_dlq.await?;
// Get DLQ statistics
let stats = broker.dlq_stats.await?;
println!;
// Purge DLQ
broker.purge_dlq.await?;
Message Transactions
ACID guarantees for message operations:
use ;
// Begin transaction with isolation level
let tx_id = broker.begin_transaction.await?;
// Publish within transaction
broker.publish_transactional.await?;
broker.publish_transactional.await?;
// Consume within transaction
if let Some = broker.consume_transactional.await?
// Commit or rollback
if success else
// Check transaction state
let state = broker.transaction_state.await?;
Isolation Levels:
- ReadUncommitted: Dirty reads allowed
- ReadCommitted: Only committed data visible
- RepeatableRead: Consistent snapshot
- Serializable: Full isolation
Message Scheduling
Delay message delivery with flexible timing:
use ;
// Schedule with delay
let schedule = delay; // 1 hour delay
// Schedule at absolute time
let schedule = at;
// Schedule with execution window
let schedule = delay
.with_window; // Allow ±30s variance
// Schedule message
let scheduled_id = broker.schedule_message.await?;
// Cancel scheduled message
broker.cancel_scheduled.await?;
// List scheduled messages
let scheduled = broker.list_scheduled.await?;
for msg in scheduled
// Check if ready for delivery
if schedule.is_ready
Consumer Groups
Load-balanced distributed consumption:
use ;
// Configure consumer group
let config = new
.with_max_consumers
.with_heartbeat_interval
.with_rebalance_timeout;
// Join group
let consumer_id = broker.join_group.await?;
println!;
// Send heartbeat (keep membership alive)
broker.heartbeat.await?;
// Consume with automatic load balancing
if let Some = broker.consume_from_group.await?
// Get group members
let members = broker.group_members.await?;
println!;
// Leave group
broker.leave_group.await?;
Message Replay
Debug and recover with historical message replay:
use ;
// Replay last hour
let config = from_duration;
// Replay from specific timestamp
let config = from_timestamp;
// Replay with limits
let config = from_duration
.with_max_messages
.with_speed; // 2x speed
// Begin replay session
let session_id = broker.begin_replay.await?;
// Replay messages
loop
// Track progress
let progress = broker.replay_progress.await?;
println!;
// Stop replay
broker.stop_replay.await?;
Quota Management
Resource limits with flexible enforcement:
use ;
// Configure quotas
let quota = new
.with_max_messages
.with_max_bytes // 100 MB
.with_max_rate // 100 messages/sec
.with_max_per_consumer
.with_enforcement;
// Set quota
broker.set_quota.await?;
// Check quota before operation
match broker.check_quota.await?
// Get quota usage
let usage = broker.quota_usage.await?;
println!;
println!;
println!;
if usage.is_message_quota_exceeded
// Reset quota
broker.reset_quota.await?;
Enforcement Policies:
- Reject: Reject operations exceeding quota
- Throttle: Slow down operations
- Warn: Log warnings but allow
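How the three policies differ once a limit is reached can be sketched as a single decision function. `Enforcement`, `Decision`, and `decide` are hypothetical names for illustration, assuming a quota counts as exceeded when usage has reached the limit.

```rust
/// Hypothetical mirror of the three enforcement policies.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Enforcement {
    Reject,
    Throttle,
    Warn,
}

/// What the caller should do with the pending operation (hypothetical type).
#[derive(Debug, PartialEq)]
enum Decision {
    Allow,
    AllowWithWarning,
    Delay,
    Deny,
}

/// `current` is the usage so far; the quota is exhausted once it reaches `limit`.
fn decide(current: u64, limit: u64, policy: Enforcement) -> Decision {
    if current < limit {
        return Decision::Allow;
    }
    match policy {
        Enforcement::Reject => Decision::Deny,
        Enforcement::Throttle => Decision::Delay,
        Enforcement::Warn => Decision::AllowWithWarning,
    }
}
```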
Flow Control
Backpressure Detection
Automatic flow control based on queue metrics:
use BackpressureConfig;
let backpressure = new
.with_max_pending
.with_max_queue_size
.with_high_watermark // 80%
.with_low_watermark; // 20%
// Check if backpressure should be applied
if backpressure.should_apply_backpressure
// Check if backpressure can be released
if backpressure.should_release_backpressure
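The two watermarks form a hysteresis band: backpressure switches on at the high mark and stays on until the queue drains to the low mark, which avoids rapid flapping around a single threshold. The `Backpressure` type below is a hypothetical sketch of that state machine based only on queue size; the crate's `BackpressureConfig` may weigh other metrics as well.

```rust
/// Hypothetical watermark-based backpressure state.
struct Backpressure {
    max_queue_size: usize,
    high_watermark: f64, // fraction of max_queue_size, e.g. 0.8
    low_watermark: f64,  // fraction of max_queue_size, e.g. 0.2
    active: bool,
}

impl Backpressure {
    fn new(max_queue_size: usize, high_watermark: f64, low_watermark: f64) -> Self {
        Self { max_queue_size, high_watermark, low_watermark, active: false }
    }

    /// Feeds the current queue size and returns whether backpressure is active.
    fn update(&mut self, queue_size: usize) -> bool {
        // Guard against a zero-sized queue to avoid dividing by zero.
        let fill = queue_size as f64 / self.max_queue_size.max(1) as f64;
        if !self.active && fill >= self.high_watermark {
            self.active = true;
        } else if self.active && fill <= self.low_watermark {
            self.active = false;
        }
        self.active
    }
}
```

Between the two watermarks the previous state is kept, so a queue hovering at 50% does not toggle producers on and off.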
Poison Message Detection
Prevent infinite retry loops:
use PoisonMessageDetector;
let detector = new
.with_max_failures
.with_failure_window; // 5 min window
// Record failure
detector.record_failure;
// Check if poison
if detector.is_poison else
// Clear all tracking
detector.clear_all;
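One way to picture the detector is as a per-message list of failure timestamps, where a message is poison once enough failures fall inside the window. The sketch below is a hypothetical, standalone version with time passed in as seconds; the method names echo the snippet above, but the signatures are assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical sliding-window failure tracker.
struct PoisonDetector {
    max_failures: usize,
    window_secs: u64,
    failures: HashMap<String, Vec<u64>>,
}

impl PoisonDetector {
    fn new(max_failures: usize, window_secs: u64) -> Self {
        Self { max_failures, window_secs, failures: HashMap::new() }
    }

    /// Records a failure at `now` and drops timestamps older than the window.
    fn record_failure(&mut self, message_id: &str, now: u64) {
        let cutoff = now.saturating_sub(self.window_secs);
        let entry = self.failures.entry(message_id.to_string()).or_default();
        entry.retain(|&t| t >= cutoff);
        entry.push(now);
    }

    /// A message is poison if it failed at least `max_failures` times in the window.
    fn is_poison(&self, message_id: &str, now: u64) -> bool {
        let cutoff = now.saturating_sub(self.window_secs);
        self.failures
            .get(message_id)
            .map_or(false, |ts| ts.iter().filter(|&&t| t >= cutoff).count() >= self.max_failures)
    }

    fn clear_all(&mut self) {
        self.failures.clear();
    }
}
```

Using a window rather than a lifetime counter means a message that failed a few times during an outage is not condemned forever.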
Utilities Module
47 helper functions for broker operations, optimization, and monitoring:
Batch Optimization
use utils;
// Calculate optimal batch size
let batch_size = calculate_optimal_batch_size;
// Calculate optimal workers
let workers = calculate_optimal_workers;
Performance Analysis
// Analyze broker performance
let = analyze_broker_performance;
println!;
// Calculate throughput
let throughput = calculate_throughput;
println!;
// Analyze circuit breaker
let action = analyze_circuit_breaker;
println!;
Queue Health Monitoring
// Analyze queue health
let health = analyze_queue_health;
println!; // Healthy, Warning, Critical
// Estimate drain time
let drain_time = estimate_drain_time;
println!;
// Estimate memory usage
let memory_mb = estimate_queue_memory;
println!;
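As an illustration of the kind of arithmetic behind a drain-time estimate, the sketch below divides the backlog by the net consumption rate. `drain_time_sketch` is a hypothetical function written for this example; the formula and signature of the crate's `estimate_drain_time` are not shown in this README and may differ.

```rust
use std::time::Duration;

/// Estimated time to empty a queue, or None if the backlog is not shrinking.
/// Rates are in messages per second.
fn drain_time_sketch(queue_size: u64, consume_rate: f64, publish_rate: f64) -> Option<Duration> {
    if queue_size == 0 {
        return Some(Duration::ZERO);
    }
    let net_rate = consume_rate - publish_rate;
    if !net_rate.is_finite() || net_rate <= 0.0 {
        // Producers keep up with or outpace consumers: the queue never drains.
        return None;
    }
    Some(Duration::from_secs_f64(queue_size as f64 / net_rate))
}
```

For example, a backlog of 1000 messages with 150 msg/s consumed and 50 msg/s published drains in about 10 seconds.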
Operational Excellence (NEW - v0.4.7)
// Anomaly detection
let = detect_anomalies;
// SLA compliance
let = calculate_sla_compliance;
// Error budget tracking
let = calculate_error_budget;
// Cost estimation
let monthly_cost = estimate_infrastructure_cost;
// Queue saturation prediction
let = predict_queue_saturation;
See examples for comprehensive usage: cargo run --example monitoring and cargo run --example operational_excellence.
Best Practices
1. Always Handle Timeouts
use Duration;
let timeout = from_secs;
match consumer.consume.await
2. Implement Graceful Shutdown
use signal;
async
3. Use Connection Pooling
// Create connection pool
let pool = create_broker_pool?;
// Acquire from pool
let mut broker = pool.acquire.await?;
broker.publish.await?;
// Return to pool (automatic)
drop;
4. Monitor Queue Sizes
async
5. Separate Queues by Priority
// High-priority queue
broker.create_queue.await?;
// Normal queue
broker.create_queue.await?;
// Low-priority queue
broker.create_queue.await?;
// Publish to appropriate queue
if is_urgent else
Testing
Examples
The crate includes 11 comprehensive examples demonstrating all features:
Basic Usage
# Complete broker implementation
# Middleware usage patterns
# Batch operations
Advanced Features
# Dead Letter Queue (DLQ)
# Message transactions with ACID guarantees
# Scheduling, consumer groups, replay, quotas
Flow Control & Resilience
# Backpressure, poison detection, timeout, filter
# Circuit breaker, connection pooling, health checks
Monitoring & Operational Excellence
# 47 utility functions showcase
# Production monitoring and observability
# Idempotency, backoff, anomaly detection, SLA tracking, error budgets
Each example includes detailed comments and demonstrates best practices for production use.
See Also
- Protocol: celers-protocol - Message format definitions
- Brokers: celers-broker-redis, celers-broker-postgres, etc.
- Core: celers-core - Task execution
License
Apache-2.0