rust_rabbit/lib.rs
1//! # rust-rabbit 🐰
2//!
3//! A **simple, reliable** RabbitMQ client library for Rust.
4//! Focus on core functionality with minimal configuration.
5//!
6//! ## Features
7//!
8//! - **🚀 Simple API**: Just Publisher and Consumer with essential methods
9//! - **🔄 Flexible Retry**: Exponential, linear, or custom retry mechanisms
10//! - **🛠️ Auto-Setup**: Automatic queue/exchange declaration and binding
11//! - **⚡ Built-in Reliability**: Default ACK behavior with error handling
12//!
13//! ## Quick Start
14//!
15//! ### Publisher
16//!
17//! ```rust,no_run
18//! use rust_rabbit::{Connection, Publisher, PublishOptions};
19//! use serde::Serialize;
20//!
21//! #[derive(Serialize)]
22//! struct Order {
23//! id: u32,
24//! amount: f64,
25//! }
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! let connection = Connection::new("amqp://localhost:5672").await?;
30//! let publisher = Publisher::new(connection);
31//!
32//! let order = Order { id: 123, amount: 99.99 };
33//!
34//! // Publish to exchange
35//! publisher.publish_to_exchange("orders", "new.order", &order, None).await?;
36//!
37//! // Publish directly to queue
38//! publisher.publish_to_queue("order_queue", &order, None).await?;
39//!
40//! Ok(())
41//! }
42//! ```
43//!
44//! ### Consumer with Retry
45//!
46//! ```rust,no_run
47//! use rust_rabbit::{Connection, Consumer, RetryConfig};
48//! use serde::{Deserialize, Serialize};
49//!
50//! #[derive(Serialize, Deserialize, Clone)]
51//! struct Order {
52//! id: u32,
53//! amount: f64,
54//! }
55//!
56//! #[tokio::main]
57//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
58//! let connection = Connection::new("amqp://localhost:5672").await?;
59//!
60//! let consumer = Consumer::builder(connection, "order_queue")
61//! .with_retry(RetryConfig::exponential_default()) // 1s->2s->4s->8s->16s
62//! .bind_to_exchange("orders", "order.*")
63//! .with_prefetch(5)
64//! .build();
65//!
66//! consumer.consume(|msg: rust_rabbit::Message<Order>| async move {
67//! println!("Processing order {}: ${}", msg.data.id, msg.data.amount);
68//! // Your business logic here
69//! Ok(()) // ACK message
70//! }).await?;
71//!
72//! Ok(())
73//! }
74//! ```
75//!
76//! ## Retry Configurations
77//!
78//! ```rust,no_run
79//! use rust_rabbit::RetryConfig;
80//! use std::time::Duration;
81//!
82//! // Exponential: 1s -> 2s -> 4s -> 8s -> 16s (5 retries)
83//! let exponential = RetryConfig::exponential_default();
84//!
85//! // Custom exponential: 2s -> 4s -> 8s -> 16s -> 32s (with cap at 60s)
86//! let custom_exp = RetryConfig::exponential(5, Duration::from_secs(2), Duration::from_secs(60));
87//!
88//! // Linear: 10s -> 10s -> 10s (3 retries)
89//! let linear = RetryConfig::linear(3, Duration::from_secs(10));
90//!
91//! // Custom delays: 1s -> 5s -> 30s
92//! let custom = RetryConfig::custom(vec![
93//! Duration::from_secs(1),
94//! Duration::from_secs(5),
95//! Duration::from_secs(30),
96//! ]);
97//!
98//! // No retries
99//! let no_retry = RetryConfig::no_retry();
100//! ```
101//!
102//! ## MessageEnvelope System
103//!
104//! For advanced retry tracking and error handling, use the MessageEnvelope system:
105//!
106//! ```rust,no_run
107//! use rust_rabbit::{Connection, Publisher, Consumer, MessageEnvelope, RetryConfig};
108//! use serde::{Serialize, Deserialize};
109//!
110//! #[derive(Serialize, Deserialize, Clone)]
111//! struct Order {
112//! id: u32,
113//! amount: f64,
114//! }
115//!
116//! #[tokio::main]
117//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
118//! let connection = Connection::new("amqp://localhost:5672").await?;
119//!
120//! // Publisher with envelope
121//! let publisher = Publisher::new(connection.clone());
122//! let order = Order { id: 123, amount: 99.99 };
123//! let envelope = MessageEnvelope::new(order, "order_queue")
124//! .with_max_retries(3);
125//!
126//! publisher.publish_envelope_to_queue("order_queue", &envelope, None).await?;
127//!
128//! // Consumer with envelope processing
129//! let consumer = Consumer::builder(connection, "order_queue")
130//! .with_retry(RetryConfig::exponential_default())
131//! .build();
132//!
133//! consumer.consume_envelopes(|envelope: MessageEnvelope<Order>| async move {
134//! println!("Processing order {} (attempt {})",
135//! envelope.payload.id,
136//! envelope.metadata.retry_attempt + 1);
137//!
138//! // Access retry metadata
139//! if !envelope.is_first_attempt() {
140//! println!("This is a retry. Last error: {:?}", envelope.last_error());
141//! }
142//!
143//! // Your business logic here
144//! Ok(())
145//! }).await?;
146//!
147//! Ok(())
148//! }
149//! ```
150
151// Re-export main types for easy access
152pub use connection::Connection;
153pub use consumer::{Consumer, ConsumerBuilder};
154pub use error::{Result, RustRabbitError};
155pub use message::{
156 ErrorRecord, ErrorType, MassTransitEnvelope, MessageEnvelope, MessageMetadata, MessageSource,
157 WireMessage,
158};
159pub use publisher::{MassTransitOptions, PublishOptions, Publisher};
160pub use retry::{DelayStrategy, RetryConfig, RetryMechanism};
161
162// Internal modules
163mod connection;
164mod consumer;
165mod error;
166mod message;
167mod publisher;
168mod retry;
169
170/// Prelude module for convenient imports
171pub mod prelude {
172 pub use crate::{
173 Connection, Consumer, ConsumerBuilder, DelayStrategy, ErrorRecord, ErrorType,
174 MassTransitEnvelope, MassTransitOptions, MessageEnvelope, MessageMetadata, MessageSource,
175 PublishOptions, Publisher, Result, RetryConfig, RetryMechanism, RustRabbitError,
176 WireMessage,
177 };
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use serde::{Deserialize, Serialize};
184 use std::time::Duration;
185
186 #[derive(Debug, Serialize, Deserialize, PartialEq)]
187 #[allow(dead_code)]
188 struct TestMessage {
189 id: u32,
190 content: String,
191 }
192
193 #[tokio::test]
194 async fn test_api_compilation() {
195 // This test ensures the API compiles correctly
196 // Real integration tests would require a RabbitMQ instance
197
198 let _connection_result = Connection::new("amqp://localhost:5672").await;
199
200 // Test retry configurations
201 let _exponential = RetryConfig::exponential_default();
202 let _linear = RetryConfig::linear(3, Duration::from_secs(5));
203 let _custom = RetryConfig::custom(vec![Duration::from_secs(1), Duration::from_secs(5)]);
204 let _no_retry = RetryConfig::no_retry();
205 }
206
207 #[test]
208 fn test_basic_api_exists() {
209 // Test that our main types exist and can be referenced
210 use crate::prelude::*;
211
212 // This is a compile-time test - if it compiles, our API is accessible
213 let _: Option<Connection> = None;
214 let _: Option<Publisher> = None;
215 let _: Option<Consumer> = None;
216 let _: Option<RetryConfig> = None;
217
218 // Test that we can create basic configs
219 let _retry = RetryConfig::exponential_default();
220 let _options = PublishOptions::new();
221 }
222
223 #[test]
224 fn test_retry_config_calculations() {
225 let config = RetryConfig::exponential(5, Duration::from_secs(1), Duration::from_secs(30));
226
227 assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(1)));
228 assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(2)));
229 assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(4)));
230 assert_eq!(config.calculate_delay(3), Some(Duration::from_secs(8)));
231 assert_eq!(config.calculate_delay(4), Some(Duration::from_secs(16)));
232 // Attempt 5 exceeds max_retries (5), so should return None
233 assert_eq!(config.calculate_delay(5), None);
234 }
235
236 #[test]
237 fn test_retry_config_linear() {
238 let config = RetryConfig::linear(3, Duration::from_secs(5));
239
240 assert_eq!(config.max_retries, 3);
241 assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(5)));
242 assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(5)));
243 assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(5)));
244 assert_eq!(config.calculate_delay(3), None); // Exceeds max_retries
245 }
246
247 #[test]
248 fn test_retry_config_custom() {
249 let delays = vec![
250 Duration::from_secs(1),
251 Duration::from_secs(3),
252 Duration::from_secs(7),
253 ];
254 let config = RetryConfig::custom(delays.clone());
255
256 assert_eq!(config.max_retries, 3);
257 assert_eq!(config.calculate_delay(0), Some(Duration::from_secs(1)));
258 assert_eq!(config.calculate_delay(1), Some(Duration::from_secs(3)));
259 assert_eq!(config.calculate_delay(2), Some(Duration::from_secs(7)));
260 assert_eq!(config.calculate_delay(3), None); // Exceeds max_retries
261 }
262
263 #[test]
264 fn test_publish_options() {
265 let options = PublishOptions::new()
266 .mandatory()
267 .with_expiration("60000")
268 .with_priority(5);
269
270 assert!(options.mandatory);
271 assert_eq!(options.expiration, Some("60000".to_string()));
272 assert_eq!(options.priority, Some(5));
273 }
274}