Expand description
§RustRabbit 🐰
A high-performance, production-ready RabbitMQ client library for Rust with zero-configuration simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
§Features
- 🚀 Smart Automation: One-line setup with `RetryPolicy::fast()` configures everything
- 🔄 Advanced Retry System: Multiple presets, exponential backoff, dead letter integration
- 🏗️ Enterprise Patterns: Request-Response, Saga, Event Sourcing, Priority Queues
- 🔍 Production Observability: Prometheus metrics, health monitoring, circuit breaker
- 🛡️ Reliability: Connection pooling, graceful shutdown, error recovery
§Quick Start
use rust_rabbit::{
config::RabbitConfig,
connection::ConnectionManager,
consumer::{Consumer, ConsumerOptions},
retry::RetryPolicy,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. Connection
let config = RabbitConfig::builder()
.connection_string("amqp://user:pass@localhost:5672/vhost")
.build();
let connection = ConnectionManager::new(config).await?;
// 2. Consumer with retry (1 line!)
let options = ConsumerOptions {
auto_ack: false,
retry_policy: Some(RetryPolicy::fast()),
..Default::default()
};
// 3. Create consumer (ready to use)
let _consumer = Consumer::new(connection, options).await?;
// Consumer is ready! See examples/ for usage patterns
Ok(())
}
What `RetryPolicy::fast()` creates automatically:
- ✅ 5 retries: 200ms → 300ms → 450ms → 675ms → 1s (capped at 10s)
- ✅ Dead Letter Queue: Automatic DLX/DLQ setup for failed messages
- ✅ Backoff + Jitter: Intelligent delay with randomization
- ✅ Production Ready: Optimal settings for most use cases
§Retry Patterns
use rust_rabbit::retry::RetryPolicy;
use std::time::Duration;
// Quick presets for common scenarios
let fast = RetryPolicy::fast(); // 5 retries, 200ms→10s, 1.5x backoff
let slow = RetryPolicy::slow(); // 3 retries, 1s→1min, 2.0x backoff
let linear = RetryPolicy::linear(Duration::from_millis(500), 3); // Fixed 500ms intervals
// Custom with builder
let custom = RetryPolicy::builder()
.max_retries(5)
.initial_delay(Duration::from_millis(100))
.backoff_multiplier(2.0)
.jitter(0.1)
.dead_letter_exchange("my.dlx")
.build();
§Advanced Patterns
§Request-Response (RPC)
use rust_rabbit::patterns::request_response::*;
use std::time::Duration;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Simple example - actual usage requires proper message types
let client = RequestResponseClient::new(Duration::from_secs(30));
// In real usage, you would send actual request messages
// let response = client.send_request("queue", request_data, None).await?;
Ok(())
}
§Event Sourcing (CQRS)
use rust_rabbit::patterns::event_sourcing::*;
use std::sync::Arc;
async fn example() -> Result<(), Box<dyn std::error::Error>> {
let event_store = Arc::new(InMemoryEventStore::new());
// Example - actual usage requires implementing AggregateRoot trait
// let repository = EventSourcingRepository::<MyAggregate>::new(event_store);
Ok(())
}
§Production Features
§Health Monitoring
use rust_rabbit::{health::HealthChecker, connection::ConnectionManager, config::RabbitConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = RabbitConfig::builder()
.connection_string("amqp://localhost:5672")
.build();
let connection_manager = ConnectionManager::new(config).await?;
let health_checker = HealthChecker::new(connection_manager.clone());
match health_checker.check_health().await {
Ok(status) => println!("Connection healthy: {:?}", status),
Err(e) => println!("Connection issues: {}", e),
}
Ok(())
}
§Prometheus Metrics
use rust_rabbit::metrics::RustRabbitMetrics;
let metrics = RustRabbitMetrics::new();
// Metrics automatically collected:
// - rust_rabbit_messages_published_total
// - rust_rabbit_messages_consumed_total
// - rust_rabbit_message_processing_duration_seconds
// - rust_rabbit_connection_health
Re-exports§
pub use batching::BatchConfig;
pub use batching::BatchConfigBuilder;
pub use batching::MessageBatcher;
pub use circuit_breaker::CircuitBreaker;
pub use circuit_breaker::CircuitBreakerConfig;
pub use circuit_breaker::CircuitBreakerStats;
pub use circuit_breaker::CircuitState;
pub use config::HealthCheckConfig;
pub use config::HealthCheckConfigBuilder;
pub use config::PoolConfig;
pub use config::PoolConfigBuilder;
pub use config::RabbitConfig;
pub use config::RabbitConfigBuilder;
pub use config::RetryConfig;
pub use config::RetryConfigBuilder;
pub use connection::Connection;
pub use connection::ConnectionManager;
pub use connection::ConnectionStats;
pub use consumer::Consumer;
pub use consumer::ConsumerOptions;
pub use consumer::ConsumerOptionsBuilder;
pub use consumer::MessageHandler;
pub use error::RabbitError;
pub use error::Result;
pub use error::RustRabbitError;
pub use health::ConnectionStatus;
pub use health::HealthChecker;
pub use metrics::MetricsTimer;
pub use metrics::RustRabbitMetrics;
pub use patterns::deduplication::ContentHash;
pub use patterns::deduplication::DeduplicatedMessage;
pub use patterns::deduplication::DeduplicationConfig;
pub use patterns::deduplication::DeduplicationManager;
pub use patterns::deduplication::DeduplicationResult;
pub use patterns::deduplication::DeduplicationStrategy;
pub use patterns::deduplication::DuplicateInfo;
pub use patterns::deduplication::MessageId;
pub use patterns::event_sourcing::AggregateId;
pub use patterns::event_sourcing::AggregateRoot;
pub use patterns::event_sourcing::AggregateSnapshot;
pub use patterns::event_sourcing::DomainEvent;
pub use patterns::event_sourcing::EventReplayService;
pub use patterns::event_sourcing::EventSequence;
pub use patterns::event_sourcing::EventSourcingRepository;
pub use patterns::event_sourcing::EventStore;
pub use patterns::event_sourcing::InMemoryEventStore;
pub use patterns::priority::Priority;
pub use patterns::priority::PriorityConsumer;
pub use patterns::priority::PriorityMessage;
pub use patterns::priority::PriorityQueue;
pub use patterns::priority::PriorityQueueConfig;
pub use patterns::priority::PriorityRouter;
pub use patterns::request_response::CorrelationId;
pub use patterns::request_response::RequestHandler;
pub use patterns::request_response::RequestMessage;
pub use patterns::request_response::RequestResponseClient;
pub use patterns::request_response::RequestResponseServer;
pub use patterns::request_response::ResponseMessage;
pub use patterns::saga::SagaAction;
pub use patterns::saga::SagaCoordinator;
pub use patterns::saga::SagaId;
pub use patterns::saga::SagaInstance;
pub use patterns::saga::SagaStatus;
pub use patterns::saga::SagaStep;
pub use patterns::saga::SagaStepExecutor;
pub use patterns::saga::StepResult;
pub use patterns::saga::StepStatus;
pub use publisher::CustomExchangeDeclareOptions;
pub use publisher::CustomQueueDeclareOptions;
pub use publisher::PublishOptions;
pub use publisher::PublishOptionsBuilder;
pub use publisher::Publisher;
pub use retry::DelayedMessageExchange;
pub use retry::RetryPolicy;
pub use shutdown::setup_signal_handling;
pub use shutdown::ShutdownConfig;
pub use shutdown::ShutdownHandler;
pub use shutdown::ShutdownManager;
pub use shutdown::ShutdownSignal;
Modules§
- batching
- Message batching implementation for high-throughput scenarios
- circuit_breaker
- Circuit breaker implementation for connection resilience
- config
- connection
- consumer
- error
- health
- metrics
- Metrics and observability module for RustRabbit
- patterns
- publisher
- retry
- shutdown
- Graceful shutdown handling for RustRabbit
Structs§
- RustRabbit
- Main facade for the rust-rabbit library