rust_rabbit/
lib.rs

1//! # RustRabbit 🐰
2//!
3//! A **high-performance, production-ready** RabbitMQ client library for Rust with **zero-configuration**
4//! simplicity and enterprise-grade features. Built for reliability, observability, and developer happiness.
5//!
6//! ## Features
7//!
8//! - **🚀 Smart Automation**: One-line setup with `RetryPolicy::fast()` configures everything
9//! - **🔄 Advanced Retry System**: Multiple presets, exponential backoff, dead letter integration  
10//! - **🏗️ Enterprise Patterns**: Request-Response, Saga, Event Sourcing, Priority Queues
11//! - **🔍 Production Observability**: Prometheus metrics, health monitoring, circuit breaker
12//! - **🛡️ Reliability**: Connection pooling, graceful shutdown, error recovery
13//!
14//! ## Quick Start
15//!
16//! ```rust,no_run
17//! use rust_rabbit::{
18//!     config::RabbitConfig,
19//!     connection::ConnectionManager,
20//!     consumer::{Consumer, ConsumerOptions},
21//!     retry::RetryPolicy,
22//! };
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//!     // 1. Connection
27//!     let config = RabbitConfig::builder()
28//!         .connection_string("amqp://user:pass@localhost:5672/vhost")
29//!         .build();
30//!     let connection = ConnectionManager::new(config).await?;
31//!
32//!     // 2. Consumer with retry (1 line!)
33//!     let options = ConsumerOptions {
34//!         auto_ack: false,
35//!         retry_policy: Some(RetryPolicy::fast()),
36//!         ..Default::default()
37//!     };
38//!
39//!     // 3. Create consumer (ready to use)
40//!     let _consumer = Consumer::new(connection, options).await?;
41//!     
42//!     // Consumer is ready! See examples/ for usage patterns
43//!     Ok(())
44//! }
45//! ```
46//!
47//! **What `RetryPolicy::fast()` creates automatically:**
48//! - ✅ **5 retries**: 200ms → 300ms → 450ms → 675ms → 1s (capped at 10s)
49//! - ✅ **Dead Letter Queue**: Automatic DLX/DLQ setup for failed messages
50//! - ✅ **Backoff + Jitter**: Intelligent delay with randomization
51//! - ✅ **Production Ready**: Optimal settings for most use cases
52//!
53//! ## Retry Patterns
54//!
55//! ```rust,no_run
56//! use rust_rabbit::retry::RetryPolicy;
57//! use std::time::Duration;
58//!
59//! // Quick presets for common scenarios
60//! let fast = RetryPolicy::fast();               // 5 retries, 200ms→10s, 1.5x backoff
61//! let slow = RetryPolicy::slow();               // 3 retries, 1s→1min, 2.0x backoff
62//! let linear = RetryPolicy::linear(Duration::from_millis(500), 3); // Fixed 500ms intervals
63//!
64//! // Custom with builder
65//! let custom = RetryPolicy::builder()
66//!     .max_retries(5)
67//!     .initial_delay(Duration::from_millis(100))
68//!     .backoff_multiplier(2.0)
69//!     .jitter(0.1)
70//!     .dead_letter_exchange("my.dlx")
71//!     .build();
72//! ```
73//!
74//! ## Advanced Patterns
75//!
76//! ### Request-Response (RPC)
77//!
78//! ```rust,no_run
79//! use rust_rabbit::patterns::request_response::*;
80//! use std::time::Duration;
81//! use std::sync::Arc;
82//!
83//! #[tokio::main]
84//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
85//!     // Simple example - actual usage requires proper message types
86//!     let client = RequestResponseClient::new(Duration::from_secs(30));
87//!     
88//!     // In real usage, you would send actual request messages
89//!     // let response = client.send_request("queue", request_data, None).await?;
90//!     Ok(())
91//! }
92//! ```
93//!
94//! ### Event Sourcing (CQRS)
95//!
96//! ```rust,no_run
97//! use rust_rabbit::patterns::event_sourcing::*;
98//! use std::sync::Arc;
99//!
100//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
101//!     let event_store = Arc::new(InMemoryEventStore::new());
102//!     
103//!     // Example - actual usage requires implementing AggregateRoot trait
104//!     // let repository = EventSourcingRepository::<MyAggregate>::new(event_store);
105//!     Ok(())
106//! }
107//! ```
108//!
109//! ## Production Features
110//!
111//! ### Health Monitoring
112//!
113//! ```rust,no_run
114//! use rust_rabbit::{health::HealthChecker, connection::ConnectionManager, config::RabbitConfig};
115//!
116//! #[tokio::main]
117//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//!     let config = RabbitConfig::builder()
119//!         .connection_string("amqp://localhost:5672")
120//!         .build();
121//!     let connection_manager = ConnectionManager::new(config).await?;
122//!     let health_checker = HealthChecker::new(connection_manager.clone());
123//!     
124//!     match health_checker.check_health().await {
125//!         Ok(status) => println!("Connection healthy: {:?}", status),
126//!         Err(e) => println!("Connection issues: {}", e),
127//!     }
128//!     Ok(())
129//! }
130//! ```
131//!
132//! ### Prometheus Metrics
133//!
134//! ```rust,no_run
135//! use rust_rabbit::metrics::RustRabbitMetrics;
136//!
137//! let metrics = RustRabbitMetrics::new();
138//! // Metrics automatically collected:
139//! // - rust_rabbit_messages_published_total
140//! // - rust_rabbit_messages_consumed_total  
141//! // - rust_rabbit_message_processing_duration_seconds
142//! // - rust_rabbit_connection_health
143//! ```
144
145pub mod batching;
146pub mod circuit_breaker;
147pub mod config;
148pub mod connection;
149pub mod consumer;
150pub mod error;
151pub mod health;
152pub mod metrics;
153pub mod patterns; // Phase 2: Advanced messaging patterns
154pub mod publisher;
155pub mod retry;
156pub mod shutdown;
157
158pub use batching::{BatchConfig, BatchConfigBuilder, MessageBatcher};
159pub use circuit_breaker::{
160    CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
161};
162pub use config::{
163    HealthCheckConfig, HealthCheckConfigBuilder, PoolConfig, PoolConfigBuilder, RabbitConfig,
164    RabbitConfigBuilder, RetryConfig, RetryConfigBuilder,
165};
166pub use connection::{Connection, ConnectionManager, ConnectionStats};
167pub use consumer::{Consumer, ConsumerOptions, ConsumerOptionsBuilder, MessageHandler};
168pub use error::{RabbitError, Result, RustRabbitError};
169pub use health::{ConnectionStatus, HealthChecker};
170pub use metrics::{MetricsTimer, RustRabbitMetrics};
171pub use patterns::{
172    // Message deduplication
173    deduplication::{
174        ContentHash, DeduplicatedMessage, DeduplicationConfig, DeduplicationManager,
175        DeduplicationResult, DeduplicationStrategy, DuplicateInfo, MessageId,
176    },
177    // Event sourcing
178    event_sourcing::{
179        AggregateId, AggregateRoot, AggregateSnapshot, DomainEvent, EventReplayService,
180        EventSequence, EventSourcingRepository, EventStore, InMemoryEventStore,
181    },
182    // Priority queues
183    priority::{
184        Priority, PriorityConsumer, PriorityMessage, PriorityQueue, PriorityQueueConfig,
185        PriorityRouter,
186    },
187    // Request-Response pattern
188    request_response::{
189        CorrelationId, RequestHandler, RequestMessage, RequestResponseClient,
190        RequestResponseServer, ResponseMessage,
191    },
192    // Saga pattern
193    saga::{
194        SagaAction, SagaCoordinator, SagaId, SagaInstance, SagaStatus, SagaStep, SagaStepExecutor,
195        StepResult, StepStatus,
196    },
197};
198pub use publisher::{
199    CustomExchangeDeclareOptions, CustomQueueDeclareOptions, PublishOptions, PublishOptionsBuilder,
200    Publisher,
201};
202pub use retry::{DelayedMessageExchange, RetryPolicy};
203pub use shutdown::{
204    setup_signal_handling, ShutdownConfig, ShutdownHandler, ShutdownManager, ShutdownSignal,
205};
206
/// Main facade for the rust-rabbit library
///
/// Bundles the pieces the rest of the crate needs (connection manager,
/// optional metrics, optional shutdown manager) and hands out configured
/// publishers, consumers, health checkers and batchers built from them.
pub struct RustRabbit {
    /// Connection manager built from the `RabbitConfig` passed to the
    /// constructor; a clone is given to every publisher, consumer and
    /// health checker created through this facade.
    connection_manager: ConnectionManager,
    /// Metrics handle; `None` unless the instance was created via
    /// `with_metrics`. When present, a clone is attached to each component
    /// created through this facade.
    metrics: Option<RustRabbitMetrics>,
    /// Shutdown manager; `None` until `enable_shutdown_handling` is called.
    /// When present, `close` triggers a graceful shutdown through it.
    shutdown_manager: Option<std::sync::Arc<ShutdownManager>>,
}
213
214impl RustRabbit {
215    /// Create a new RustRabbit instance with the given configuration
216    pub async fn new(config: RabbitConfig) -> Result<Self> {
217        let connection_manager = ConnectionManager::new(config).await?;
218        Ok(Self {
219            connection_manager,
220            metrics: None,
221            shutdown_manager: None,
222        })
223    }
224
225    /// Create a new RustRabbit instance with metrics enabled
226    pub async fn with_metrics(config: RabbitConfig, metrics: RustRabbitMetrics) -> Result<Self> {
227        let connection_manager = ConnectionManager::new(config).await?;
228        Ok(Self {
229            connection_manager,
230            metrics: Some(metrics),
231            shutdown_manager: None,
232        })
233    }
234
235    /// Get a publisher instance
236    pub fn publisher(&self) -> Publisher {
237        let mut publisher = Publisher::new(self.connection_manager.clone());
238        if let Some(metrics) = &self.metrics {
239            publisher.set_metrics(metrics.clone());
240        }
241        publisher
242    }
243
244    /// Create a consumer with the given options
245    pub async fn consumer(&self, options: ConsumerOptions) -> Result<Consumer> {
246        let mut consumer = Consumer::new(self.connection_manager.clone(), options).await?;
247        if let Some(metrics) = &self.metrics {
248            consumer.set_metrics(metrics.clone());
249        }
250        Ok(consumer)
251    }
252
253    /// Get the health checker
254    pub fn health_checker(&self) -> HealthChecker {
255        let mut health_checker = HealthChecker::new(self.connection_manager.clone());
256        if let Some(metrics) = &self.metrics {
257            health_checker.set_metrics(metrics.clone());
258        }
259        health_checker
260    }
261
262    /// Get the metrics instance if enabled
263    pub fn metrics(&self) -> Option<&RustRabbitMetrics> {
264        self.metrics.as_ref()
265    }
266
267    /// Create a message batcher for high-throughput publishing
268    pub async fn create_batcher(&self, config: BatchConfig) -> Result<MessageBatcher> {
269        let publisher = self.publisher();
270
271        if let Some(metrics) = &self.metrics {
272            MessageBatcher::with_metrics(publisher, config, metrics.clone()).await
273        } else {
274            MessageBatcher::new(publisher, config).await
275        }
276    }
277
278    /// Enable graceful shutdown handling
279    pub fn enable_shutdown_handling(
280        &mut self,
281        config: ShutdownConfig,
282    ) -> std::sync::Arc<ShutdownManager> {
283        let shutdown_manager = std::sync::Arc::new(ShutdownManager::new(config));
284        self.shutdown_manager = Some(shutdown_manager.clone());
285        shutdown_manager
286    }
287
288    /// Get the shutdown manager if enabled
289    pub fn shutdown_manager(&self) -> Option<std::sync::Arc<ShutdownManager>> {
290        self.shutdown_manager.clone()
291    }
292
293    /// Close all connections with optional graceful shutdown
294    pub async fn close(&self) -> Result<()> {
295        if let Some(shutdown_manager) = &self.shutdown_manager {
296            shutdown_manager.shutdown(ShutdownSignal::Graceful).await?;
297        }
298        self.connection_manager.close().await
299    }
300}