RustRabbit
A high-performance, production-ready RabbitMQ client library for Rust with advanced observability, resilience, and performance features. Inspired by MassTransit for .NET, RustRabbit provides enterprise-grade messaging capabilities.
✨ Features
🚀 Performance & Scalability
- Message Batching: High-throughput batch publishing for optimal performance
- Connection Pooling: Automatic connection management with health monitoring
- Async/Await: Full tokio async runtime integration
🔍 Observability & Monitoring
- Prometheus Metrics: Comprehensive metrics for throughput, latency, and errors
- Health Checks: Real-time connection health monitoring with detailed status
- Structured Logging: Integrated tracing with correlation IDs
🛡️ Resilience & Reliability
- Circuit Breaker: Automatic failure detection and recovery for connections
- Advanced Retry: Built-in exponential backoff with jitter
- Graceful Shutdown: Multi-phase shutdown with signal handling
⚙️ Developer Experience
- Builder Pattern: Fluent, type-safe configuration API
- Type Safety: Strongly typed message handling with serde integration
- Comprehensive Testing: Extensive test coverage with integration tests
🚀 Quick Start
Add RustRabbit to your Cargo.toml:
[]
= "0.2.0"
= { = "1.0", = ["full"] }
= { = "1.0", = ["derive"] }
Basic Usage
use ;
use ;
async
🔍 Advanced Features
Prometheus Metrics
Enable comprehensive metrics collection:
use ;
async
Message Batching
For high-throughput scenarios:
use ;
async
Graceful Shutdown
Handle shutdown signals properly:
use ;
use Arc;
async
Circuit Breaker
Automatic connection resilience:
use ;
use Duration;
async
📊 Performance Benchmarks
RustRabbit v0.2.0 Performance Results:
| Metric | Value |
|---|---|
| Throughput | 50,000+ msgs/sec |
| Latency (P99) | < 10ms |
| Memory Usage | < 50MB baseline |
| Connection Pool | 10-100 connections |
Benchmarks run on: Intel i7-10700K, 32GB RAM, RabbitMQ 3.12
🏗️ Architecture
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Application │ │ RustRabbit │ │ RabbitMQ │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Publishers │◄───┼────┼──► Message │ │ │ │ Exchanges │ │
│ └─────────────┘ │ │ │ Batching │◄───┼────┼──► & Queues │ │
│ │ │ └─────────────┘ │ │ └─────────────┘ │
│ ┌─────────────┐ │ │ │ │ │
│ │ Consumers │◄───┼────┼──► Circuit │ │ │ ┌─────────────┐ │
│ └─────────────┘ │ │ │ Breaker │◄───┼────┼──► Health │ │
│ │ │ └─────────────┘ │ │ │ Monitoring │ │
│ ┌─────────────┐ │ │ │ │ └─────────────┘ │
│ │ Metrics │◄───┼────┼──► Prometheus │ │ │ │
│ │ Dashboard │ │ │ │ Metrics │ │ │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
async fn main() -> Result<(), Box> { let config = RabbitConfig::builder() .connection_string("amqp://localhost:5672") .build();
let rabbit = RustRabbit::new(config).await?;
// Use builder for consumer options
let consumer_options = ConsumerOptions::builder("orders")
.consumer_tag("order-processor")
.concurrency(5)
.auto_declare_queue()
.reliable()
.build();
let consumer = rabbit.consumer(consumer_options).await?;
let handler = Arc::new(OrderHandler);
🛠️ Configuration
RustRabbit offers extensive configuration options:
use ;
use Duration;
let config = builder
.connection_string
.virtual_host
.pool
.retry
.health_check
.build;
📈 Metrics & Monitoring
Available Metrics
RustRabbit exposes comprehensive Prometheus metrics:
Message Metrics:
rustrabbit_messages_published_total- Total messages publishedrustrabbit_messages_consumed_total- Total messages consumedrustrabbit_messages_failed_total- Total failed messagesrustrabbit_message_processing_duration_seconds- Message processing latency
Connection Metrics:
rustrabbit_connections_total- Total connections in poolrustrabbit_connections_healthy- Healthy connectionsrustrabbit_connection_failures_total- Connection failures
Queue Metrics:
rustrabbit_queue_depth- Messages waiting in queuerustrabbit_consumer_count- Active consumers per queue
Grafana Dashboard
Import our pre-built Grafana dashboard for instant visibility:
# Download dashboard JSON
# Import into Grafana via UI or API
🚀 Production Deployment
Docker Compose Example
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
rabbitmq_data:
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-rustrabbit-app
spec:
replicas: 3
selector:
matchLabels:
app: my-rustrabbit-app
template:
metadata:
labels:
app: my-rustrabbit-app
spec:
containers:
- name: app
image: my-rustrabbit-app:latest
ports:
- containerPort: 8080
env:
- name: RABBITMQ_URL
value: "amqp://rabbitmq:5672"
- name: RUST_LOG
value: "info"
🔧 Development
Prerequisites
- Rust 1.70+
- RabbitMQ 3.9+
- Docker (for integration tests)
Building
Testing
# Unit tests
# Integration tests (requires RabbitMQ)
# Run with coverage
Examples
Run the examples to see RustRabbit in action:
# Basic publisher/consumer
# Advanced features
🗺️ Roadmap
Phase 1 (v0.2.0) ✅ COMPLETED
- Prometheus metrics integration
- Circuit breaker pattern
- Message batching
- Graceful shutdown handling
Phase 2 (v0.3.0) - Advanced Messaging Patterns
- Request-Response pattern with correlation IDs
- Saga pattern for distributed transactions
- Event sourcing support
- Message deduplication
- Priority queues
Phase 3 (v0.4.0) - Enterprise Features
- Multi-broker support with failover
- Message encryption at rest
- Schema registry integration
- Advanced routing patterns
- Performance optimization
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
Areas for Contribution
- 🐛 Bug fixes and improvements
- 📚 Documentation enhancements
- ✨ New features from roadmap
- 🧪 Additional test coverage
- 📊 Performance optimizations
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
- Inspired by MassTransit for .NET
- Built on lapin for RabbitMQ protocol
- Metrics powered by Prometheus
🆘 Support
- 📖 Documentation
- 💬 GitHub Discussions
- 🐛 Issue Tracker
- 📧 Email: nghiaphamln3@gmail.com
⭐ Star us on GitHub if RustRabbit helps your project! ⭐
Health Monitoring
Monitor your RabbitMQ connections with built-in health checking:
use ;
// Configure health checking
let mut config = default;
config.health_check = aggressive;
let rabbit = new.await?;
let health_checker = rabbit.health_checker;
// Start monitoring
health_checker.start_monitoring.await?;
// Check health status
let is_healthy = health_checker.is_healthy.await;
let summary = health_checker.get_health_summary.await;
// Wait for healthy connection
health_checker.wait_for_healthy.await?;
Connection Pooling
RustRabbit automatically manages connection pools with builder pattern:
use RabbitConfig;
let config = builder
.connection_string
.retry
.pool
.build;
Message Options
Customize message publishing with builder pattern:
use PublishOptions;
let options = builder
.durable
.message_id
.correlation_id
.ttl
.priority
.header_string
.header_int
.auto_declare_exchange
.development
.build;
publisher.publish_to_exchange.await?;
Configuration with Builder Pattern
Environment-Specific Configurations
// Development configuration
let dev_config = builder
.connection_string
.retry
.health
.pool
.build;
// Production configuration
let prod_config = builder
.connection_string
.connection_timeout
.retry
.health
.pool
.build;
Consumer Configuration
// High throughput consumer
let high_throughput_options = builder
.consumer_tag
.high_throughput
.auto_declare_queue
.dead_letter_exchange
.build;
// Reliable consumer
let reliable_options = builder
.consumer_tag
.reliable
.manual_ack
.prefetch_count
.build;
Configuration
RabbitConfig
The main configuration struct with builder pattern:
use RabbitConfig;
let config = builder
.connection_string
.virtual_host
.connection_timeout
.heartbeat
.retry
.health
.pool
.build;
Consumer Options
Configure consumer behavior with builder:
use ConsumerOptions;
let consumer_options = builder
.consumer_tag
.concurrency
.prefetch_count
.auto_declare_queue
.dead_letter_exchange
.manual_ack
.build;
Error Handling
RustRabbit provides comprehensive error handling:
use ;
match publisher.publish_to_queue.await
Testing
RustRabbit includes comprehensive test suites to ensure reliability and performance.
Unit Tests
Run unit tests with:
# or
Integration Tests with Real RabbitMQ
RustRabbit supports integration testing with real RabbitMQ instances using Docker:
# Start RabbitMQ and run integration tests
# Or manually:
Quick Development Setup
# Setup development environment
# Start RabbitMQ for development
# Run tests with file watching
# Run integration tests with file watching
Test Coverage
The integration tests cover:
- ✅ End-to-end message publishing and consumption
- ✅ Retry mechanisms with delayed message exchange
- ✅ Health monitoring and connection management
- ✅ Performance benchmarking
- ✅ Error handling and recovery scenarios
- ✅ Builder pattern configuration
- ✅ Concurrent message processing
See INTEGRATION_TESTING.md for detailed testing documentation.
Requirements
- RabbitMQ: Version 3.8 or higher
- Rust: Version 1.70 or higher
- Optional: RabbitMQ delayed message exchange plugin for advanced retry features
Installing RabbitMQ Delayed Message Exchange Plugin
For advanced retry functionality, install the delayed message exchange plugin:
# Download and enable the plugin
Examples
Check out the examples/ directory for more comprehensive examples:
builder_pattern_example.rs- Comprehensive builder pattern usagepublisher_example.rs- Various publishing patternsconsumer_example.rs- Consumer with retry and error handlingretry_example.rs- Advanced retry mechanismshealth_monitoring_example.rs- Health monitoring and connection management
Run examples with:
Performance
RustRabbit is designed for high performance:
- Connection Pooling: Efficient connection reuse
- Async/Await: Non-blocking I/O operations
- Concurrent Processing: Configurable message processing concurrency
- Memory Efficient: Minimal allocations and zero-copy where possible
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Inspired by MassTransit for .NET
- Built on top of the excellent lapin RabbitMQ client
- Thanks to the Rust async ecosystem (tokio, futures, etc.)