1use std::collections::HashMap;
46use std::fmt;
47use std::sync::Arc;
48use std::time::Duration;
49
50use async_trait::async_trait;
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use uuid::Uuid;
54
55pub mod config;
56pub mod error;
57
58#[cfg(feature = "rabbitmq")]
59pub mod rabbitmq;
60
61#[cfg(feature = "kafka")]
62pub mod kafka;
63
64#[cfg(feature = "nats")]
65pub mod nats;
66
67#[cfg(feature = "aws")]
68pub mod aws;
69
70pub use config::*;
71pub use error::*;
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct Message {
76 pub id: String,
78 pub payload: Vec<u8>,
80 pub headers: HashMap<String, String>,
82 pub topic: String,
84 pub timestamp: DateTime<Utc>,
86 pub correlation_id: Option<String>,
88 pub reply_to: Option<String>,
90 pub content_type: Option<String>,
92 pub priority: Option<u8>,
94 pub ttl: Option<u64>,
96}
97
98impl Message {
99 pub fn new<T: Into<Vec<u8>>>(topic: impl Into<String>, payload: T) -> Self {
101 Self {
102 id: Uuid::new_v4().to_string(),
103 payload: payload.into(),
104 headers: HashMap::new(),
105 topic: topic.into(),
106 timestamp: Utc::now(),
107 correlation_id: None,
108 reply_to: None,
109 content_type: None,
110 priority: None,
111 ttl: None,
112 }
113 }
114
115 pub fn json<T: Serialize>(topic: impl Into<String>, value: &T) -> Result<Self, MessagingError> {
117 let payload =
118 serde_json::to_vec(value).map_err(|e| MessagingError::Serialization(e.to_string()))?;
119 let mut msg = Self::new(topic, payload);
120 msg.content_type = Some("application/json".to_string());
121 Ok(msg)
122 }
123
124 pub fn parse_json<T: for<'de> Deserialize<'de>>(&self) -> Result<T, MessagingError> {
126 serde_json::from_slice(&self.payload)
127 .map_err(|e| MessagingError::Deserialization(e.to_string()))
128 }
129
130 pub fn payload_str(&self) -> Result<&str, MessagingError> {
132 std::str::from_utf8(&self.payload)
133 .map_err(|e| MessagingError::Deserialization(e.to_string()))
134 }
135
136 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
138 self.headers.insert(key.into(), value.into());
139 self
140 }
141
142 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
144 self.correlation_id = Some(id.into());
145 self
146 }
147
148 pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
150 self.reply_to = Some(reply_to.into());
151 self
152 }
153
154 pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
156 self.content_type = Some(content_type.into());
157 self
158 }
159
160 pub fn with_priority(mut self, priority: u8) -> Self {
162 self.priority = Some(priority.min(9));
163 self
164 }
165
166 pub fn with_ttl(mut self, ttl_ms: u64) -> Self {
168 self.ttl = Some(ttl_ms);
169 self
170 }
171
172 pub fn with_ttl_duration(mut self, ttl: Duration) -> Self {
174 self.ttl = Some(ttl.as_millis() as u64);
175 self
176 }
177}
178
179impl fmt::Display for Message {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 write!(
182 f,
183 "Message {{ id: {}, topic: {}, size: {} bytes }}",
184 self.id,
185 self.topic,
186 self.payload.len()
187 )
188 }
189}
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
193pub enum AckMode {
194 #[default]
196 Auto,
197 Manual,
199 None,
201}
202
203#[derive(Debug, Clone)]
205pub enum ProcessingResult {
206 Success,
208 Retry,
210 DeadLetter,
212 Reject,
214}
215
216#[async_trait]
218pub trait MessageHandler: Send + Sync + 'static {
219 async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError>;
221
222 async fn on_deserialize_error(&self, _error: &MessagingError) -> ProcessingResult {
224 ProcessingResult::DeadLetter
225 }
226}
227
228pub struct FnHandler<F>(pub F);
230
231#[async_trait]
232impl<F, Fut> MessageHandler for FnHandler<F>
233where
234 F: Fn(Message) -> Fut + Send + Sync + 'static,
235 Fut: std::future::Future<Output = Result<ProcessingResult, MessagingError>> + Send,
236{
237 async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError> {
238 (self.0)(message).await
239 }
240}
241
242#[derive(Debug, Clone, Default)]
244pub struct PublishOptions {
245 pub confirm: bool,
247 pub timeout: Option<Duration>,
249 pub persistent: bool,
251 pub routing_key: Option<String>,
253 pub exchange: Option<String>,
255 pub partition_key: Option<String>,
257}
258
259impl PublishOptions {
260 pub fn persistent() -> Self {
262 Self {
263 persistent: true,
264 confirm: true,
265 ..Default::default()
266 }
267 }
268
269 pub fn with_routing_key(mut self, key: impl Into<String>) -> Self {
271 self.routing_key = Some(key.into());
272 self
273 }
274
275 pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
277 self.exchange = Some(exchange.into());
278 self
279 }
280
281 pub fn with_partition_key(mut self, key: impl Into<String>) -> Self {
283 self.partition_key = Some(key.into());
284 self
285 }
286
287 pub fn with_confirm(mut self, timeout: Duration) -> Self {
289 self.confirm = true;
290 self.timeout = Some(timeout);
291 self
292 }
293}
294
295#[derive(Debug, Clone, Default)]
297pub struct SubscribeOptions {
298 pub consumer_group: Option<String>,
300 pub prefetch_count: Option<u16>,
302 pub ack_mode: AckMode,
304 pub from_beginning: bool,
306 pub filter: Option<String>,
308 pub concurrency: Option<usize>,
310}
311
312impl SubscribeOptions {
313 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
315 self.consumer_group = Some(group.into());
316 self
317 }
318
319 pub fn with_prefetch(mut self, count: u16) -> Self {
321 self.prefetch_count = Some(count);
322 self
323 }
324
325 pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
327 self.ack_mode = mode;
328 self
329 }
330
331 pub fn from_beginning(mut self) -> Self {
333 self.from_beginning = true;
334 self
335 }
336
337 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
339 self.concurrency = Some(concurrency);
340 self
341 }
342}
343
344#[async_trait]
346pub trait Subscription: Send + Sync {
347 async fn unsubscribe(&self) -> Result<(), MessagingError>;
349
350 fn is_active(&self) -> bool;
352
353 fn topic(&self) -> &str;
355}
356
357#[async_trait]
359pub trait MessageBroker: Send + Sync {
360 type Subscription: Subscription;
362
363 async fn publish(&self, message: Message) -> Result<(), MessagingError>;
365
366 async fn publish_with_options(
368 &self,
369 message: Message,
370 options: PublishOptions,
371 ) -> Result<(), MessagingError>;
372
373 async fn subscribe(
375 &self,
376 topic: &str,
377 handler: Arc<dyn MessageHandler>,
378 ) -> Result<Self::Subscription, MessagingError>;
379
380 async fn subscribe_with_options(
382 &self,
383 topic: &str,
384 handler: Arc<dyn MessageHandler>,
385 options: SubscribeOptions,
386 ) -> Result<Self::Subscription, MessagingError>;
387
388 fn is_connected(&self) -> bool;
390
391 async fn close(&self) -> Result<(), MessagingError>;
393}
394
395pub struct MessagingBuilder {
397 pub config: MessagingConfig,
399}
400
401impl MessagingBuilder {
402 pub fn new(config: MessagingConfig) -> Self {
404 Self { config }
405 }
406
407 #[cfg(feature = "rabbitmq")]
409 pub async fn build_rabbitmq(self) -> Result<rabbitmq::RabbitMqBroker, MessagingError> {
410 rabbitmq::RabbitMqBroker::connect(&self.config).await
411 }
412
413 #[cfg(feature = "kafka")]
415 pub async fn build_kafka(self) -> Result<kafka::KafkaBroker, MessagingError> {
416 kafka::KafkaBroker::connect(&self.config).await
417 }
418
419 #[cfg(feature = "nats")]
421 pub async fn build_nats(self) -> Result<nats::NatsBroker, MessagingError> {
422 nats::NatsBroker::connect(&self.config).await
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429
430 #[test]
431 fn test_message_creation() {
432 let msg = Message::new("test-topic", b"hello world".to_vec());
433 assert_eq!(msg.topic, "test-topic");
434 assert_eq!(msg.payload, b"hello world");
435 assert!(!msg.id.is_empty());
436 }
437
438 #[test]
439 fn test_message_json() {
440 #[derive(Serialize, Deserialize, Debug, PartialEq)]
441 struct TestData {
442 name: String,
443 value: i32,
444 }
445
446 let data = TestData {
447 name: "test".to_string(),
448 value: 42,
449 };
450
451 let msg = Message::json("test-topic", &data).unwrap();
452 assert_eq!(msg.content_type, Some("application/json".to_string()));
453
454 let parsed: TestData = msg.parse_json().unwrap();
455 assert_eq!(parsed, data);
456 }
457
458 #[test]
459 fn test_message_builder() {
460 let msg = Message::new("topic", b"data".to_vec())
461 .with_header("key", "value")
462 .with_correlation_id("corr-123")
463 .with_reply_to("reply-queue")
464 .with_priority(5)
465 .with_ttl(60000);
466
467 assert_eq!(msg.headers.get("key"), Some(&"value".to_string()));
468 assert_eq!(msg.correlation_id, Some("corr-123".to_string()));
469 assert_eq!(msg.reply_to, Some("reply-queue".to_string()));
470 assert_eq!(msg.priority, Some(5));
471 assert_eq!(msg.ttl, Some(60000));
472 }
473
474 #[test]
475 fn test_publish_options() {
476 let opts = PublishOptions::persistent()
477 .with_routing_key("my.routing.key")
478 .with_exchange("my-exchange")
479 .with_confirm(Duration::from_secs(5));
480
481 assert!(opts.persistent);
482 assert!(opts.confirm);
483 assert_eq!(opts.routing_key, Some("my.routing.key".to_string()));
484 assert_eq!(opts.exchange, Some("my-exchange".to_string()));
485 assert_eq!(opts.timeout, Some(Duration::from_secs(5)));
486 }
487
488 #[test]
489 fn test_subscribe_options() {
490 let opts = SubscribeOptions::default()
491 .with_consumer_group("my-group")
492 .with_prefetch(10)
493 .with_ack_mode(AckMode::Manual)
494 .with_concurrency(4);
495
496 assert_eq!(opts.consumer_group, Some("my-group".to_string()));
497 assert_eq!(opts.prefetch_count, Some(10));
498 assert_eq!(opts.ack_mode, AckMode::Manual);
499 assert_eq!(opts.concurrency, Some(4));
500 }
501}