rust_rabbit/lib.rs
1//! # RustRabbit 🐰
2//!
3//! A **high-performance, production-ready** RabbitMQ client library for Rust with **zero-configuration**
4//! simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
5//!
6//! ## Features
7//!
8//! - **🚀 Smart Automation**: One-line setup with `RetryPolicy::fast()` configures everything
9//! - **🔄 Advanced Retry System**: Multiple presets, exponential backoff, dead letter integration
10//! - **🏗️ Enterprise Patterns**: Request-Response, Saga, Event Sourcing, Priority Queues
11//! - **🔍 Production Observability**: Prometheus metrics, health monitoring, circuit breaker
12//! - **🛡️ Reliability**: Connection pooling, graceful shutdown, error recovery
13//!
14//! ## Quick Start
15//!
16//! ```rust,no_run
17//! use rust_rabbit::{
18//! config::RabbitConfig,
19//! connection::ConnectionManager,
20//! consumer::{Consumer, ConsumerOptions},
21//! retry::RetryPolicy,
22//! };
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//! // 1. Connection
27//! let config = RabbitConfig::builder()
28//! .connection_string("amqp://user:pass@localhost:5672/vhost")
29//! .build();
30//! let connection = ConnectionManager::new(config).await?;
31//!
32//! // 2. Consumer with retry (1 line!)
33//! let options = ConsumerOptions {
34//! auto_ack: false,
35//! retry_policy: Some(RetryPolicy::fast()),
36//! ..Default::default()
37//! };
38//!
39//! // 3. Create consumer (ready to use)
40//! let _consumer = Consumer::new(connection, options).await?;
41//!
42//! // Consumer is ready! See examples/ for usage patterns
43//! Ok(())
44//! }
45//! ```
46//!
47//! **What `RetryPolicy::fast()` creates automatically:**
48//! - ✅ **5 retries**: 200ms → 300ms → 450ms → 675ms → 1s (capped at 10s)
49//! - ✅ **Dead Letter Queue**: Automatic DLX/DLQ setup for failed messages
50//! - ✅ **Backoff + Jitter**: Intelligent delay with randomization
51//! - ✅ **Production Ready**: Optimal settings for most use cases
52//!
53//! ## Retry Patterns
54//!
55//! ```rust,no_run
56//! use rust_rabbit::retry::RetryPolicy;
57//! use std::time::Duration;
58//!
59//! // Quick presets for common scenarios
60//! let fast = RetryPolicy::fast(); // 5 retries, 200ms→10s, 1.5x backoff
61//! let slow = RetryPolicy::slow(); // 3 retries, 1s→1min, 2.0x backoff
62//! let linear = RetryPolicy::linear(Duration::from_millis(500), 3); // Fixed 500ms intervals
63//!
64//! // Custom with builder
65//! let custom = RetryPolicy::builder()
66//! .max_retries(5)
67//! .initial_delay(Duration::from_millis(100))
68//! .backoff_multiplier(2.0)
69//! .jitter(0.1)
70//! .dead_letter_exchange("my.dlx")
71//! .build();
72//! ```
73//!
74//! ## Advanced Patterns
75//!
76//! ### Request-Response (RPC)
77//!
78//! ```rust,no_run
79//! use rust_rabbit::patterns::request_response::*;
80//! use std::time::Duration;
81//! use std::sync::Arc;
82//!
83//! #[tokio::main]
84//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
85//! // Simple example - actual usage requires proper message types
86//! let client = RequestResponseClient::new(Duration::from_secs(30));
87//!
88//! // In real usage, you would send actual request messages
89//! // let response = client.send_request("queue", request_data, None).await?;
90//! Ok(())
91//! }
92//! ```
93//!
94//! ### Event Sourcing (CQRS)
95//!
96//! ```rust,no_run
97//! use rust_rabbit::patterns::event_sourcing::*;
98//! use std::sync::Arc;
99//!
100//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
101//! let event_store = Arc::new(InMemoryEventStore::new());
102//!
103//! // Example - actual usage requires implementing AggregateRoot trait
104//! // let repository = EventSourcingRepository::<MyAggregate>::new(event_store);
105//! Ok(())
106//! }
107//! ```
108//!
109//! ## Production Features
110//!
111//! ### Health Monitoring
112//!
113//! ```rust,no_run
114//! use rust_rabbit::{health::HealthChecker, connection::ConnectionManager, config::RabbitConfig};
115//!
116//! #[tokio::main]
117//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//! let config = RabbitConfig::builder()
119//! .connection_string("amqp://localhost:5672")
120//! .build();
121//! let connection_manager = ConnectionManager::new(config).await?;
122//! let health_checker = HealthChecker::new(connection_manager.clone());
123//!
124//! match health_checker.check_health().await {
125//! Ok(status) => println!("Connection healthy: {:?}", status),
126//! Err(e) => println!("Connection issues: {}", e),
127//! }
128//! Ok(())
129//! }
130//! ```
131//!
132//! ### Prometheus Metrics
133//!
134//! ```rust,no_run
135//! use rust_rabbit::metrics::RustRabbitMetrics;
136//!
137//! let metrics = RustRabbitMetrics::new();
138//! // Metrics automatically collected:
139//! // - rust_rabbit_messages_published_total
140//! // - rust_rabbit_messages_consumed_total
141//! // - rust_rabbit_message_processing_duration_seconds
142//! // - rust_rabbit_connection_health
143//! ```
144
145pub mod batching;
146pub mod circuit_breaker;
147pub mod config;
148pub mod connection;
149pub mod consumer;
150pub mod error;
151pub mod health;
152pub mod metrics;
153pub mod patterns; // Phase 2: Advanced messaging patterns
154pub mod publisher;
155pub mod retry;
156pub mod shutdown;
157
158pub use batching::{BatchConfig, BatchConfigBuilder, MessageBatcher};
159pub use circuit_breaker::{
160 CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
161};
162pub use config::{
163 HealthCheckConfig, HealthCheckConfigBuilder, PoolConfig, PoolConfigBuilder, RabbitConfig,
164 RabbitConfigBuilder, RetryConfig, RetryConfigBuilder,
165};
166pub use connection::{Connection, ConnectionManager, ConnectionStats};
167pub use consumer::{
168 BaseConsumer, Consumer, ConsumerOptions, ConsumerOptionsBuilder, MessageHandler,
169};
170pub use error::{ProcessingError, RabbitError, Result, RustRabbitError};
171pub use health::{ConnectionStatus, HealthChecker};
172pub use metrics::{MetricsTimer, RustRabbitMetrics};
173pub use patterns::{
174 // Message deduplication
175 deduplication::{
176 ContentHash, DeduplicatedMessage, DeduplicationConfig, DeduplicationManager,
177 DeduplicationResult, DeduplicationStrategy, DuplicateInfo, MessageId,
178 },
179 // Event sourcing
180 event_sourcing::{
181 AggregateId, AggregateRoot, AggregateSnapshot, DomainEvent, EventReplayService,
182 EventSequence, EventSourcingRepository, EventStore, InMemoryEventStore,
183 },
184 // Priority queues
185 priority::{
186 Priority, PriorityConsumer, PriorityMessage, PriorityQueue, PriorityQueueConfig,
187 PriorityRouter,
188 },
189 // Request-Response pattern
190 request_response::{
191 CorrelationId, RequestHandler, RequestMessage, RequestResponseClient,
192 RequestResponseServer, ResponseMessage,
193 },
194 // Saga pattern
195 saga::{
196 SagaAction, SagaCoordinator, SagaId, SagaInstance, SagaStatus, SagaStep, SagaStepExecutor,
197 StepResult, StepStatus,
198 },
199};
200pub use publisher::{
201 CustomExchangeDeclareOptions, CustomQueueDeclareOptions, PublishOptions, PublishOptionsBuilder,
202 Publisher,
203};
204pub use retry::{DelayedMessageExchange, RetryPolicy};
205pub use shutdown::{
206 setup_signal_handling, ShutdownConfig, ShutdownHandler, ShutdownManager, ShutdownSignal,
207};
208
209/// Main facade for the rust-rabbit library
210pub struct RustRabbit {
211 connection_manager: ConnectionManager,
212 metrics: Option<RustRabbitMetrics>,
213 shutdown_manager: Option<std::sync::Arc<ShutdownManager>>,
214}
215
216impl RustRabbit {
217 /// Create a new RustRabbit instance with the given configuration
218 pub async fn new(config: RabbitConfig) -> Result<Self> {
219 let connection_manager = ConnectionManager::new(config).await?;
220 Ok(Self {
221 connection_manager,
222 metrics: None,
223 shutdown_manager: None,
224 })
225 }
226
227 /// Create a new RustRabbit instance with metrics enabled
228 pub async fn with_metrics(config: RabbitConfig, metrics: RustRabbitMetrics) -> Result<Self> {
229 let connection_manager = ConnectionManager::new(config).await?;
230 Ok(Self {
231 connection_manager,
232 metrics: Some(metrics),
233 shutdown_manager: None,
234 })
235 }
236
237 /// Get a publisher instance
238 pub fn publisher(&self) -> Publisher {
239 let mut publisher = Publisher::new(self.connection_manager.clone());
240 if let Some(metrics) = &self.metrics {
241 publisher.set_metrics(metrics.clone());
242 }
243 publisher
244 }
245
246 /// Create a consumer with the given options
247 pub async fn consumer(&self, options: ConsumerOptions) -> Result<Consumer> {
248 let mut consumer = Consumer::new(self.connection_manager.clone(), options).await?;
249 if let Some(metrics) = &self.metrics {
250 consumer.set_metrics(metrics.clone());
251 }
252 Ok(consumer)
253 }
254
255 /// Get the health checker
256 pub fn health_checker(&self) -> HealthChecker {
257 let mut health_checker = HealthChecker::new(self.connection_manager.clone());
258 if let Some(metrics) = &self.metrics {
259 health_checker.set_metrics(metrics.clone());
260 }
261 health_checker
262 }
263
264 /// Get the metrics instance if enabled
265 pub fn metrics(&self) -> Option<&RustRabbitMetrics> {
266 self.metrics.as_ref()
267 }
268
269 /// Create a message batcher for high-throughput publishing
270 pub async fn create_batcher(&self, config: BatchConfig) -> Result<MessageBatcher> {
271 let publisher = self.publisher();
272
273 if let Some(metrics) = &self.metrics {
274 MessageBatcher::with_metrics(publisher, config, metrics.clone()).await
275 } else {
276 MessageBatcher::new(publisher, config).await
277 }
278 }
279
280 /// Enable graceful shutdown handling
281 pub fn enable_shutdown_handling(
282 &mut self,
283 config: ShutdownConfig,
284 ) -> std::sync::Arc<ShutdownManager> {
285 let shutdown_manager = std::sync::Arc::new(ShutdownManager::new(config));
286 self.shutdown_manager = Some(shutdown_manager.clone());
287 shutdown_manager
288 }
289
290 /// Get the shutdown manager if enabled
291 pub fn shutdown_manager(&self) -> Option<std::sync::Arc<ShutdownManager>> {
292 self.shutdown_manager.clone()
293 }
294
295 /// Close all connections with optional graceful shutdown
296 pub async fn close(&self) -> Result<()> {
297 if let Some(shutdown_manager) = &self.shutdown_manager {
298 shutdown_manager.shutdown(ShutdownSignal::Graceful).await?;
299 }
300 self.connection_manager.close().await
301 }
302}