rust_rabbit/
lib.rs

1//! # rust-rabbit
2//!
//! A **simple, reliable** RabbitMQ client library for Rust.
//! It focuses on core functionality with minimal configuration.
5//!
6//! ## Features
7//!
8//! - **Simple API**: Just Publisher and Consumer with essential methods
9//! - **Flexible Retry**: Exponential, linear, or custom retry mechanisms  
10//! - **Auto-Setup**: Automatic queue/exchange declaration and binding
11//! - **Built-in Reliability**: Default ACK behavior with error handling
12//!
13//! ## Quick Start
14//!
15//! ### Publisher
16//!
17//! ```rust,no_run
18//! use rust_rabbit::{Connection, Publisher, PublishOptions};
19//! use serde::Serialize;
20//!
21//! #[derive(Serialize)]
22//! struct Order {
23//!     id: u32,
24//!     amount: f64,
25//! }
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//!     let connection = Connection::new("amqp://localhost:5672").await?;
30//!     let publisher = Publisher::new(connection);
31//!     
32//!     let order = Order { id: 123, amount: 99.99 };
33//!     
34//!     // Publish to exchange
35//!     publisher.publish_to_exchange("orders", "new.order", &order, None).await?;
36//!     
37//!     // Publish directly to queue
38//!     publisher.publish_to_queue("order_queue", &order, None).await?;
39//!     
40//!     Ok(())
41//! }
42//! ```
43//!
44//! ### Consumer with Retry
45//!
46//! ```rust,no_run
47//! use rust_rabbit::{Connection, Consumer, RetryConfig};
48//! use serde::{Deserialize, Serialize};
49//!
50//! #[derive(Serialize, Deserialize, Clone)]
51//! struct Order {
52//!     id: u32,
53//!     amount: f64,
54//! }
55//!
56//! #[tokio::main]
57//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
58//!     let connection = Connection::new("amqp://localhost:5672").await?;
59//!     
60//!     let consumer = Consumer::builder(connection, "order_queue")
61//!         .with_retry(RetryConfig::exponential_default()) // 1s->2s->4s->8s->16s
62//!         .bind_to_exchange("orders", "order.*")
63//!         .with_prefetch(5)
64//!         .build();
65//!     
66//!     consumer.consume(|msg: rust_rabbit::MessageEnvelope<Order>| async move {
67//!         println!("Processing order {}: ${}", msg.payload.id, msg.payload.amount);
68//!         // Your business logic here
69//!         Ok(()) // ACK message
70//!     }).await?;
71//!     
72//!     Ok(())
73//! }
74//! ```
75//!
76//! ## Retry Configurations
77//!
78//! ```rust,no_run
79//! use rust_rabbit::RetryConfig;
80//! use std::time::Duration;
81//!
82//! // Exponential: 1s -> 2s -> 4s -> 8s -> 16s (5 retries)
83//! let exponential = RetryConfig::exponential_default();
84//!
85//! // Custom exponential: 2s -> 4s -> 8s -> 16s -> 32s (with cap at 60s)
86//! let custom_exp = RetryConfig::exponential(5, Duration::from_secs(2), Duration::from_secs(60));
87//!
88//! // Linear: 10s -> 10s -> 10s (3 retries)  
89//! let linear = RetryConfig::linear(3, Duration::from_secs(10));
90//!
91//! // Custom delays: 1s -> 5s -> 30s
92//! let custom = RetryConfig::custom(vec![
93//!     Duration::from_secs(1),
94//!     Duration::from_secs(5),
95//!     Duration::from_secs(30),
96//! ]);
97//!
98//! // No retries
99//! let no_retry = RetryConfig::no_retry();
100//! ```
101//!
102//! ## MessageEnvelope System
103//!
104//! For advanced retry tracking and error handling, use the MessageEnvelope system:
105//!
106//! ```rust,no_run
107//! use rust_rabbit::{Connection, Publisher, Consumer, MessageEnvelope, RetryConfig};
108//! use serde::{Serialize, Deserialize};
109//!
110//! #[derive(Serialize, Deserialize, Clone)]
111//! struct Order {
112//!     id: u32,
113//!     amount: f64,
114//! }
115//!
116//! #[tokio::main]
117//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//!     let connection = Connection::new("amqp://localhost:5672").await?;
119//!     
120//!     // Publisher with envelope
121//!     let publisher = Publisher::new(connection.clone());
122//!     let order = Order { id: 123, amount: 99.99 };
123//!     let envelope = MessageEnvelope::new(order, "order_queue")
124//!         .with_max_retries(3);
125//!     
126//!     publisher.publish_envelope_to_queue("order_queue", &envelope, None).await?;
127//!     
128//!     // Consumer with envelope processing
129//!     let consumer = Consumer::builder(connection, "order_queue")
130//!         .with_retry(RetryConfig::exponential_default())
131//!         .build();
132//!     
133//!     consumer.consume_envelopes(|envelope: MessageEnvelope<Order>| async move {
134//!         println!("Processing order {} (attempt {})",
135//!                  envelope.payload.id,
136//!                  envelope.metadata.retry_attempt + 1);
137//!         
138//!         // Access retry metadata
139//!         if !envelope.is_first_attempt() {
140//!             println!("This is a retry. Last error: {:?}", envelope.last_error());
141//!         }
142//!         
143//!         // Your business logic here
144//!         Ok(())
145//!     }).await?;
146//!     
147//!     Ok(())
148//! }
149//! ```
150
151// Re-export main types for easy access
152pub use connection::Connection;
153pub use consumer::{Consumer, ConsumerBuilder};
154pub use error::{Result, RustRabbitError};
155pub use message::{
156    ErrorRecord, ErrorType, MassTransitEnvelope, MessageEnvelope, MessageMetadata, MessageSource,
157    WireMessage,
158};
159pub use publisher::{MassTransitOptions, PublishOptions, Publisher};
160pub use retry::{DelayStrategy, RetryConfig, RetryMechanism};
161
162// Internal modules
163mod connection;
164mod consumer;
165mod error;
166mod message;
167mod publisher;
168mod retry;
169
/// Initialize tracing with recommended defaults for rust-rabbit.
///
/// The filter is read from the `RUST_LOG` environment variable. When that variable
/// cannot be turned into a filter (for example because it is unset), the fallback
/// directive string `info,lapin=warn` is used instead:
/// - `info` level for general application logs
/// - `warn` level for lapin (RabbitMQ client), which hides its `info`/`debug`/`trace`
///   output. Note that lapin `WARN` and `ERROR` events still pass this filter; use
///   `RUST_LOG` with `lapin=off` if those should be hidden as well.
///
/// Installation is best-effort: if a global tracing subscriber has already been
/// installed (by an earlier call to this function or by the host application), the
/// existing subscriber is left in place and this function returns without panicking.
///
/// # Example
///
/// ```rust,no_run
/// use rust_rabbit::init_tracing;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Initialize tracing with recommended settings
///     init_tracing();
///
///     // Your application code
///     Ok(())
/// }
/// ```
///
/// # Custom Configuration
///
/// To use custom log levels, set the `RUST_LOG` environment variable:
///
/// ```bash
/// RUST_LOG=debug,lapin=warn cargo run
/// ```
#[cfg(feature = "tracing")]
pub fn init_tracing() {
    use tracing_subscriber::EnvFilter;

    // Prefer the user's RUST_LOG; fall back to the library defaults otherwise.
    let filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info,lapin=warn"));

    // `init()` panics when a global subscriber is already set. `try_init()` reports
    // that situation as an `Err`, which is intentionally discarded: an existing
    // subscriber means logging is already configured and must not be replaced.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(filter)
        .try_init();
}
209
210/// Prelude module for convenient imports
211pub mod prelude {
212    pub use crate::{
213        Connection, Consumer, ConsumerBuilder, DelayStrategy, ErrorRecord, ErrorType,
214        MassTransitEnvelope, MassTransitOptions, MessageEnvelope, MessageMetadata, MessageSource,
215        PublishOptions, Publisher, Result, RetryConfig, RetryMechanism, RustRabbitError,
216        WireMessage,
217    };
218
219    #[cfg(feature = "tracing")]
220    pub use crate::init_tracing;
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::time::Duration;

    /// Sample payload type; it only checks that the serde derives compile.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    #[allow(dead_code)] // never constructed by the tests in this module
    struct TestMessage {
        id: u32,
        content: String,
    }

    /// Smoke test: the connection and retry constructors can be called.
    #[tokio::test]
    async fn test_api_compilation() {
        // This test ensures the API compiles correctly
        // Real integration tests would require a RabbitMQ instance

        // The outcome is deliberately ignored: no broker is guaranteed to be running.
        let _connection_result = Connection::new("amqp://localhost:5672").await;

        // Test retry configurations
        let _exponential = RetryConfig::exponential_default();
        let _linear = RetryConfig::linear(3, Duration::from_secs(5));
        let _custom = RetryConfig::custom(vec![Duration::from_secs(1), Duration::from_secs(5)]);
        let _no_retry = RetryConfig::no_retry();
    }

    /// Compile-time check that the main types are reachable through the prelude.
    #[test]
    fn test_basic_api_exists() {
        // Test that our main types exist and can be referenced
        use crate::prelude::*;

        // This is a compile-time test - if it compiles, our API is accessible
        let _: Option<Connection> = None;
        let _: Option<Publisher> = None;
        let _: Option<Consumer> = None;
        let _: Option<RetryConfig> = None;

        // Test that we can create basic configs
        let _retry = RetryConfig::exponential_default();
        let _options = PublishOptions::new();
    }

    /// Exponential backoff doubles from the base delay for each attempt index.
    #[test]
    fn test_retry_config_calculations() {
        let config = RetryConfig::exponential(5, Duration::from_secs(1), Duration::from_secs(30));

        assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(1)));
        assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(2)));
        assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(4)));
        assert_eq!(config.calculate_delay(3), Some(Duration::from_secs(8)));
        assert_eq!(config.calculate_delay(4), Some(Duration::from_secs(16)));
        // Attempt index 5 would be a sixth retry; only 5 are configured, so no delay.
        assert_eq!(config.calculate_delay(5), None);
    }

    /// Linear retry returns the same delay for every attempt index below `max_retries`.
    #[test]
    fn test_retry_config_linear() {
        let config = RetryConfig::linear(3, Duration::from_secs(5));

        assert_eq!(config.max_retries, 3);
        assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(5)));
        assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(5)));
        assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(5)));
        assert_eq!(config.calculate_delay(3), None); // Index 3 is past the 3 configured retries
    }

    /// Custom retry uses the supplied delays in order; `max_retries` equals their count.
    #[test]
    fn test_retry_config_custom() {
        let delays = vec![
            Duration::from_secs(1),
            Duration::from_secs(3),
            Duration::from_secs(7),
        ];
        // `delays` is not needed afterwards, so it is moved rather than cloned.
        let config = RetryConfig::custom(delays);

        assert_eq!(config.max_retries, 3);
        assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(1)));
        assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(3)));
        assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(7)));
        assert_eq!(config.calculate_delay(3), None); // Index 3 is past the 3 supplied delays
    }

    /// Builder methods on `PublishOptions` set the corresponding fields.
    #[test]
    fn test_publish_options() {
        let options = PublishOptions::new()
            .mandatory()
            .with_expiration("60000")
            .with_priority(5);

        assert!(options.mandatory);
        assert_eq!(options.expiration, Some("60000".to_string()));
        assert_eq!(options.priority, Some(5));
    }
}
315        assert_eq!(options.priority, Some(5));
316    }
317}