rust_rabbit/
lib.rs

1//! # RustRabbit 🐰
2//!
3//! A **high-performance, production-ready** RabbitMQ client library for Rust with **zero-configuration**
4//! simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
5//!
6//! ## Features
7//!
8//! - **🚀 Smart Automation**: One-line setup with `RetryPolicy::fast()` configures everything
9//! - **🔄 Advanced Retry System**: Multiple presets, exponential backoff, dead letter integration  
10//! - **🏗️ Enterprise Patterns**: Request-Response, Saga, Event Sourcing, Priority Queues
11//! - **🔍 Production Observability**: Prometheus metrics, health monitoring, circuit breaker
12//! - **🛡️ Reliability**: Connection pooling, graceful shutdown, error recovery
13//!
14//! ## Quick Start
15//!
16//! ```rust,no_run
17//! use rust_rabbit::{
18//!     config::RabbitConfig,
19//!     connection::ConnectionManager,
20//!     consumer::{Consumer, ConsumerOptions},
21//!     retry::RetryPolicy,
22//! };
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//!     // 1. Connection
27//!     let config = RabbitConfig::builder()
28//!         .connection_string("amqp://user:pass@localhost:5672/vhost")
29//!         .build();
30//!     let connection = ConnectionManager::new(config).await?;
31//!
32//!     // 2. Consumer with retry (1 line!)
33//!     let options = ConsumerOptions {
34//!         auto_ack: false,
35//!         retry_policy: Some(RetryPolicy::fast()),
36//!         ..Default::default()
37//!     };
38//!
39//!     // 3. Create consumer (ready to use)
40//!     let _consumer = Consumer::new(connection, options).await?;
41//!     
42//!     // Consumer is ready! See examples/ for usage patterns
43//!     Ok(())
44//! }
45//! ```
46//!
47//! **What `RetryPolicy::fast()` creates automatically:**
48//! - ✅ **5 retries**: 200ms → 300ms → 450ms → 675ms → 1s (capped at 10s)
49//! - ✅ **Dead Letter Queue**: Automatic DLX/DLQ setup for failed messages
50//! - ✅ **Backoff + Jitter**: Intelligent delay with randomization
51//! - ✅ **Production Ready**: Optimal settings for most use cases
52//!
53//! ## Retry Patterns
54//!
55//! ```rust,no_run
56//! use rust_rabbit::retry::RetryPolicy;
57//! use std::time::Duration;
58//!
59//! // Quick presets for common scenarios
60//! let fast = RetryPolicy::fast();               // 5 retries, 200ms→10s, 1.5x backoff
61//! let slow = RetryPolicy::slow();               // 3 retries, 1s→1min, 2.0x backoff
62//! let linear = RetryPolicy::linear(Duration::from_millis(500), 3); // Fixed 500ms intervals
63//!
64//! // Custom with builder
65//! let custom = RetryPolicy::builder()
66//!     .max_retries(5)
67//!     .initial_delay(Duration::from_millis(100))
68//!     .backoff_multiplier(2.0)
69//!     .jitter(0.1)
70//!     .dead_letter_exchange("my.dlx")
71//!     .build();
72//! ```
73//!
74//! ## Advanced Patterns
75//!
76//! ### Request-Response (RPC)
77//!
78//! ```rust,no_run
79//! use rust_rabbit::patterns::request_response::*;
80//! use std::time::Duration;
81//! use std::sync::Arc;
82//!
83//! #[tokio::main]
84//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
85//!     // Simple example - actual usage requires proper message types
86//!     let client = RequestResponseClient::new(Duration::from_secs(30));
87//!     
88//!     // In real usage, you would send actual request messages
89//!     // let response = client.send_request("queue", request_data, None).await?;
90//!     Ok(())
91//! }
92//! ```
93//!
94//! ### Event Sourcing (CQRS)
95//!
96//! ```rust,no_run
97//! use rust_rabbit::patterns::event_sourcing::*;
98//! use std::sync::Arc;
99//!
100//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
101//!     let event_store = Arc::new(InMemoryEventStore::new());
102//!     
103//!     // Example - actual usage requires implementing AggregateRoot trait
104//!     // let repository = EventSourcingRepository::<MyAggregate>::new(event_store);
105//!     Ok(())
106//! }
107//! ```
108//!
109//! ## Production Features
110//!
111//! ### Health Monitoring
112//!
113//! ```rust,no_run
114//! use rust_rabbit::{health::HealthChecker, connection::ConnectionManager, config::RabbitConfig};
115//!
116//! #[tokio::main]
117//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//!     let config = RabbitConfig::builder()
119//!         .connection_string("amqp://localhost:5672")
120//!         .build();
121//!     let connection_manager = ConnectionManager::new(config).await?;
122//!     let health_checker = HealthChecker::new(connection_manager.clone());
123//!     
124//!     match health_checker.check_health().await {
125//!         Ok(status) => println!("Connection healthy: {:?}", status),
126//!         Err(e) => println!("Connection issues: {}", e),
127//!     }
128//!     Ok(())
129//! }
130//! ```
131//!
132//! ### Prometheus Metrics
133//!
134//! ```rust,no_run
135//! use rust_rabbit::metrics::RustRabbitMetrics;
136//!
137//! let metrics = RustRabbitMetrics::new();
138//! // Metrics automatically collected:
139//! // - rust_rabbit_messages_published_total
140//! // - rust_rabbit_messages_consumed_total  
141//! // - rust_rabbit_message_processing_duration_seconds
142//! // - rust_rabbit_connection_health
143//! ```
144
145pub mod batching;
146pub mod circuit_breaker;
147pub mod config;
148pub mod connection;
149pub mod consumer;
150pub mod error;
151pub mod health;
152pub mod metrics;
153pub mod patterns; // Phase 2: Advanced messaging patterns
154pub mod publisher;
155pub mod retry;
156pub mod shutdown;
157
158pub use batching::{BatchConfig, BatchConfigBuilder, MessageBatcher};
159pub use circuit_breaker::{
160    CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
161};
162pub use config::{
163    HealthCheckConfig, HealthCheckConfigBuilder, PoolConfig, PoolConfigBuilder, RabbitConfig,
164    RabbitConfigBuilder, RetryConfig, RetryConfigBuilder,
165};
166pub use connection::{Connection, ConnectionManager, ConnectionStats};
167pub use consumer::{
168    BaseConsumer, Consumer, ConsumerOptions, ConsumerOptionsBuilder, MessageHandler,
169};
170pub use error::{ProcessingError, RabbitError, Result, RustRabbitError};
171pub use health::{ConnectionStatus, HealthChecker};
172pub use metrics::{MetricsTimer, RustRabbitMetrics};
173pub use patterns::{
174    // Message deduplication
175    deduplication::{
176        ContentHash, DeduplicatedMessage, DeduplicationConfig, DeduplicationManager,
177        DeduplicationResult, DeduplicationStrategy, DuplicateInfo, MessageId,
178    },
179    // Event sourcing
180    event_sourcing::{
181        AggregateId, AggregateRoot, AggregateSnapshot, DomainEvent, EventReplayService,
182        EventSequence, EventSourcingRepository, EventStore, InMemoryEventStore,
183    },
184    // Priority queues
185    priority::{
186        Priority, PriorityConsumer, PriorityMessage, PriorityQueue, PriorityQueueConfig,
187        PriorityRouter,
188    },
189    // Request-Response pattern
190    request_response::{
191        CorrelationId, RequestHandler, RequestMessage, RequestResponseClient,
192        RequestResponseServer, ResponseMessage,
193    },
194    // Saga pattern
195    saga::{
196        SagaAction, SagaCoordinator, SagaId, SagaInstance, SagaStatus, SagaStep, SagaStepExecutor,
197        StepResult, StepStatus,
198    },
199};
200pub use publisher::{
201    CustomExchangeDeclareOptions, CustomQueueDeclareOptions, PublishOptions, PublishOptionsBuilder,
202    Publisher,
203};
204pub use retry::{DelayedMessageExchange, RetryPolicy};
205pub use shutdown::{
206    setup_signal_handling, ShutdownConfig, ShutdownHandler, ShutdownManager, ShutdownSignal,
207};
208
209/// Main facade for the rust-rabbit library
210pub struct RustRabbit {
211    connection_manager: ConnectionManager,
212    metrics: Option<RustRabbitMetrics>,
213    shutdown_manager: Option<std::sync::Arc<ShutdownManager>>,
214}
215
216impl RustRabbit {
217    /// Create a new RustRabbit instance with the given configuration
218    pub async fn new(config: RabbitConfig) -> Result<Self> {
219        let connection_manager = ConnectionManager::new(config).await?;
220        Ok(Self {
221            connection_manager,
222            metrics: None,
223            shutdown_manager: None,
224        })
225    }
226
227    /// Create a new RustRabbit instance with metrics enabled
228    pub async fn with_metrics(config: RabbitConfig, metrics: RustRabbitMetrics) -> Result<Self> {
229        let connection_manager = ConnectionManager::new(config).await?;
230        Ok(Self {
231            connection_manager,
232            metrics: Some(metrics),
233            shutdown_manager: None,
234        })
235    }
236
237    /// Get a publisher instance
238    pub fn publisher(&self) -> Publisher {
239        let mut publisher = Publisher::new(self.connection_manager.clone());
240        if let Some(metrics) = &self.metrics {
241            publisher.set_metrics(metrics.clone());
242        }
243        publisher
244    }
245
246    /// Create a consumer with the given options
247    pub async fn consumer(&self, options: ConsumerOptions) -> Result<Consumer> {
248        let mut consumer = Consumer::new(self.connection_manager.clone(), options).await?;
249        if let Some(metrics) = &self.metrics {
250            consumer.set_metrics(metrics.clone());
251        }
252        Ok(consumer)
253    }
254
255    /// Get the health checker
256    pub fn health_checker(&self) -> HealthChecker {
257        let mut health_checker = HealthChecker::new(self.connection_manager.clone());
258        if let Some(metrics) = &self.metrics {
259            health_checker.set_metrics(metrics.clone());
260        }
261        health_checker
262    }
263
264    /// Get the metrics instance if enabled
265    pub fn metrics(&self) -> Option<&RustRabbitMetrics> {
266        self.metrics.as_ref()
267    }
268
269    /// Create a message batcher for high-throughput publishing
270    pub async fn create_batcher(&self, config: BatchConfig) -> Result<MessageBatcher> {
271        let publisher = self.publisher();
272
273        if let Some(metrics) = &self.metrics {
274            MessageBatcher::with_metrics(publisher, config, metrics.clone()).await
275        } else {
276            MessageBatcher::new(publisher, config).await
277        }
278    }
279
280    /// Enable graceful shutdown handling
281    pub fn enable_shutdown_handling(
282        &mut self,
283        config: ShutdownConfig,
284    ) -> std::sync::Arc<ShutdownManager> {
285        let shutdown_manager = std::sync::Arc::new(ShutdownManager::new(config));
286        self.shutdown_manager = Some(shutdown_manager.clone());
287        shutdown_manager
288    }
289
290    /// Get the shutdown manager if enabled
291    pub fn shutdown_manager(&self) -> Option<std::sync::Arc<ShutdownManager>> {
292        self.shutdown_manager.clone()
293    }
294
295    /// Close all connections with optional graceful shutdown
296    pub async fn close(&self) -> Result<()> {
297        if let Some(shutdown_manager) = &self.shutdown_manager {
298            shutdown_manager.shutdown(ShutdownSignal::Graceful).await?;
299        }
300        self.connection_manager.close().await
301    }
302}