armature_messaging/
lib.rs

1//! # Armature Messaging
2//!
3//! Message broker integrations for the Armature framework.
4//!
5//! This crate provides a unified interface for working with various message brokers:
6//! - **RabbitMQ** - AMQP message broker
7//! - **Kafka** - Distributed event streaming
8//! - **NATS** - Cloud-native messaging
9//! - **AWS SQS/SNS** - AWS messaging services
10//!
11//! ## Features
12//!
13//! Enable specific backends via Cargo features:
14//! - `rabbitmq` - RabbitMQ/AMQP support
15//! - `kafka` - Apache Kafka support
16//! - `nats` - NATS support
17//! - `aws` - AWS SQS/SNS support
18//! - `full` - All backends
19//!
20//! ## Example
21//!
22//! ```rust,ignore
23//! use armature_messaging::{Message, MessageBroker, MessageHandler};
24//!
25//! // Define a message handler
26//! struct MyHandler;
27//!
28//! #[async_trait::async_trait]
29//! impl MessageHandler for MyHandler {
30//!     async fn handle(&self, message: Message) -> Result<(), MessagingError> {
31//!         println!("Received: {:?}", message.payload);
32//!         Ok(())
33//!     }
34//! }
35//!
36//! // Connect and subscribe
37//! #[cfg(feature = "rabbitmq")]
38//! async fn example() -> Result<(), MessagingError> {
39//!     let broker = RabbitMqBroker::connect("amqp://localhost:5672").await?;
40//!     broker.subscribe("my-queue", MyHandler).await?;
41//!     Ok(())
42//! }
43//! ```
44
45use std::collections::HashMap;
46use std::fmt;
47use std::sync::Arc;
48use std::time::Duration;
49
50use async_trait::async_trait;
51use chrono::{DateTime, Utc};
52use serde::{Deserialize, Serialize};
53use uuid::Uuid;
54
55pub mod config;
56pub mod error;
57
58#[cfg(feature = "rabbitmq")]
59pub mod rabbitmq;
60
61#[cfg(feature = "kafka")]
62pub mod kafka;
63
64#[cfg(feature = "nats")]
65pub mod nats;
66
67#[cfg(feature = "aws")]
68pub mod aws;
69
70pub use config::*;
71pub use error::*;
72
73/// A message to be sent or received from a message broker
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct Message {
76    /// Unique message identifier
77    pub id: String,
78    /// Message payload as bytes
79    pub payload: Vec<u8>,
80    /// Message headers/properties
81    pub headers: HashMap<String, String>,
82    /// Topic/queue/subject the message belongs to
83    pub topic: String,
84    /// Timestamp when the message was created
85    pub timestamp: DateTime<Utc>,
86    /// Optional correlation ID for request-response patterns
87    pub correlation_id: Option<String>,
88    /// Optional reply-to address
89    pub reply_to: Option<String>,
90    /// Message content type (e.g., "application/json")
91    pub content_type: Option<String>,
92    /// Message priority (0-9, where 9 is highest)
93    pub priority: Option<u8>,
94    /// Time-to-live in milliseconds
95    pub ttl: Option<u64>,
96}
97
98impl Message {
99    /// Create a new message with the given payload
100    pub fn new<T: Into<Vec<u8>>>(topic: impl Into<String>, payload: T) -> Self {
101        Self {
102            id: Uuid::new_v4().to_string(),
103            payload: payload.into(),
104            headers: HashMap::new(),
105            topic: topic.into(),
106            timestamp: Utc::now(),
107            correlation_id: None,
108            reply_to: None,
109            content_type: None,
110            priority: None,
111            ttl: None,
112        }
113    }
114
115    /// Create a message from a JSON-serializable value
116    pub fn json<T: Serialize>(topic: impl Into<String>, value: &T) -> Result<Self, MessagingError> {
117        let payload =
118            serde_json::to_vec(value).map_err(|e| MessagingError::Serialization(e.to_string()))?;
119        let mut msg = Self::new(topic, payload);
120        msg.content_type = Some("application/json".to_string());
121        Ok(msg)
122    }
123
124    /// Parse the payload as JSON
125    pub fn parse_json<T: for<'de> Deserialize<'de>>(&self) -> Result<T, MessagingError> {
126        serde_json::from_slice(&self.payload)
127            .map_err(|e| MessagingError::Deserialization(e.to_string()))
128    }
129
130    /// Get the payload as a UTF-8 string
131    pub fn payload_str(&self) -> Result<&str, MessagingError> {
132        std::str::from_utf8(&self.payload)
133            .map_err(|e| MessagingError::Deserialization(e.to_string()))
134    }
135
136    /// Add a header to the message
137    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
138        self.headers.insert(key.into(), value.into());
139        self
140    }
141
142    /// Set the correlation ID
143    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
144        self.correlation_id = Some(id.into());
145        self
146    }
147
148    /// Set the reply-to address
149    pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
150        self.reply_to = Some(reply_to.into());
151        self
152    }
153
154    /// Set the content type
155    pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
156        self.content_type = Some(content_type.into());
157        self
158    }
159
160    /// Set the priority (0-9)
161    pub fn with_priority(mut self, priority: u8) -> Self {
162        self.priority = Some(priority.min(9));
163        self
164    }
165
166    /// Set the time-to-live in milliseconds
167    pub fn with_ttl(mut self, ttl_ms: u64) -> Self {
168        self.ttl = Some(ttl_ms);
169        self
170    }
171
172    /// Set the time-to-live from a Duration
173    pub fn with_ttl_duration(mut self, ttl: Duration) -> Self {
174        self.ttl = Some(ttl.as_millis() as u64);
175        self
176    }
177}
178
179impl fmt::Display for Message {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        write!(
182            f,
183            "Message {{ id: {}, topic: {}, size: {} bytes }}",
184            self.id,
185            self.topic,
186            self.payload.len()
187        )
188    }
189}
190
191/// Acknowledgment behavior for received messages
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
193pub enum AckMode {
194    /// Automatically acknowledge messages after successful processing
195    #[default]
196    Auto,
197    /// Manually acknowledge messages
198    Manual,
199    /// No acknowledgment required
200    None,
201}
202
203/// Result of processing a message
204#[derive(Debug, Clone)]
205pub enum ProcessingResult {
206    /// Message was processed successfully
207    Success,
208    /// Message processing failed, should be retried
209    Retry,
210    /// Message processing failed, should be dead-lettered
211    DeadLetter,
212    /// Message should be rejected and discarded
213    Reject,
214}
215
216/// Trait for handling received messages
217#[async_trait]
218pub trait MessageHandler: Send + Sync + 'static {
219    /// Handle a received message
220    async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError>;
221
222    /// Called when a message cannot be deserialized
223    async fn on_deserialize_error(&self, _error: &MessagingError) -> ProcessingResult {
224        ProcessingResult::DeadLetter
225    }
226}
227
228/// Function-based message handler
229pub struct FnHandler<F>(pub F);
230
231#[async_trait]
232impl<F, Fut> MessageHandler for FnHandler<F>
233where
234    F: Fn(Message) -> Fut + Send + Sync + 'static,
235    Fut: std::future::Future<Output = Result<ProcessingResult, MessagingError>> + Send,
236{
237    async fn handle(&self, message: Message) -> Result<ProcessingResult, MessagingError> {
238        (self.0)(message).await
239    }
240}
241
242/// Options for publishing a message
243#[derive(Debug, Clone, Default)]
244pub struct PublishOptions {
245    /// Whether to wait for confirmation from the broker
246    pub confirm: bool,
247    /// Timeout for confirmation
248    pub timeout: Option<Duration>,
249    /// Delivery mode (persistent or transient)
250    pub persistent: bool,
251    /// Routing key (for topic-based routing)
252    pub routing_key: Option<String>,
253    /// Exchange name (for RabbitMQ)
254    pub exchange: Option<String>,
255    /// Partition key (for Kafka)
256    pub partition_key: Option<String>,
257}
258
259impl PublishOptions {
260    /// Create options for persistent delivery
261    pub fn persistent() -> Self {
262        Self {
263            persistent: true,
264            confirm: true,
265            ..Default::default()
266        }
267    }
268
269    /// Set the routing key
270    pub fn with_routing_key(mut self, key: impl Into<String>) -> Self {
271        self.routing_key = Some(key.into());
272        self
273    }
274
275    /// Set the exchange
276    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
277        self.exchange = Some(exchange.into());
278        self
279    }
280
281    /// Set the partition key
282    pub fn with_partition_key(mut self, key: impl Into<String>) -> Self {
283        self.partition_key = Some(key.into());
284        self
285    }
286
287    /// Enable confirmation
288    pub fn with_confirm(mut self, timeout: Duration) -> Self {
289        self.confirm = true;
290        self.timeout = Some(timeout);
291        self
292    }
293}
294
295/// Options for subscribing to messages
296#[derive(Debug, Clone, Default)]
297pub struct SubscribeOptions {
298    /// Consumer group/tag
299    pub consumer_group: Option<String>,
300    /// Prefetch count (how many messages to buffer)
301    pub prefetch_count: Option<u16>,
302    /// Acknowledgment mode
303    pub ack_mode: AckMode,
304    /// Whether to start from the beginning (for Kafka)
305    pub from_beginning: bool,
306    /// Filter expression (for some brokers)
307    pub filter: Option<String>,
308    /// Maximum concurrent handlers
309    pub concurrency: Option<usize>,
310}
311
312impl SubscribeOptions {
313    /// Set the consumer group
314    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
315        self.consumer_group = Some(group.into());
316        self
317    }
318
319    /// Set the prefetch count
320    pub fn with_prefetch(mut self, count: u16) -> Self {
321        self.prefetch_count = Some(count);
322        self
323    }
324
325    /// Set the acknowledgment mode
326    pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
327        self.ack_mode = mode;
328        self
329    }
330
331    /// Start from the beginning (Kafka)
332    pub fn from_beginning(mut self) -> Self {
333        self.from_beginning = true;
334        self
335    }
336
337    /// Set the concurrency level
338    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
339        self.concurrency = Some(concurrency);
340        self
341    }
342}
343
344/// Subscription handle for managing a subscription
345#[async_trait]
346pub trait Subscription: Send + Sync {
347    /// Stop the subscription
348    async fn unsubscribe(&self) -> Result<(), MessagingError>;
349
350    /// Check if the subscription is active
351    fn is_active(&self) -> bool;
352
353    /// Get the topic/queue name
354    fn topic(&self) -> &str;
355}
356
357/// Core trait for message brokers
358#[async_trait]
359pub trait MessageBroker: Send + Sync {
360    /// The subscription handle type
361    type Subscription: Subscription;
362
363    /// Publish a message
364    async fn publish(&self, message: Message) -> Result<(), MessagingError>;
365
366    /// Publish a message with options
367    async fn publish_with_options(
368        &self,
369        message: Message,
370        options: PublishOptions,
371    ) -> Result<(), MessagingError>;
372
373    /// Subscribe to a topic/queue
374    async fn subscribe(
375        &self,
376        topic: &str,
377        handler: Arc<dyn MessageHandler>,
378    ) -> Result<Self::Subscription, MessagingError>;
379
380    /// Subscribe with options
381    async fn subscribe_with_options(
382        &self,
383        topic: &str,
384        handler: Arc<dyn MessageHandler>,
385        options: SubscribeOptions,
386    ) -> Result<Self::Subscription, MessagingError>;
387
388    /// Check if connected to the broker
389    fn is_connected(&self) -> bool;
390
391    /// Close the connection
392    async fn close(&self) -> Result<(), MessagingError>;
393}
394
395/// Builder for creating message broker connections
396pub struct MessagingBuilder {
397    /// Configuration for the message broker
398    pub config: MessagingConfig,
399}
400
401impl MessagingBuilder {
402    /// Create a new builder with the given configuration
403    pub fn new(config: MessagingConfig) -> Self {
404        Self { config }
405    }
406
407    /// Build a RabbitMQ broker
408    #[cfg(feature = "rabbitmq")]
409    pub async fn build_rabbitmq(self) -> Result<rabbitmq::RabbitMqBroker, MessagingError> {
410        rabbitmq::RabbitMqBroker::connect(&self.config).await
411    }
412
413    /// Build a Kafka broker
414    #[cfg(feature = "kafka")]
415    pub async fn build_kafka(self) -> Result<kafka::KafkaBroker, MessagingError> {
416        kafka::KafkaBroker::connect(&self.config).await
417    }
418
419    /// Build a NATS broker
420    #[cfg(feature = "nats")]
421    pub async fn build_nats(self) -> Result<nats::NatsBroker, MessagingError> {
422        nats::NatsBroker::connect(&self.config).await
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use super::*;
429
430    #[test]
431    fn test_message_creation() {
432        let msg = Message::new("test-topic", b"hello world".to_vec());
433        assert_eq!(msg.topic, "test-topic");
434        assert_eq!(msg.payload, b"hello world");
435        assert!(!msg.id.is_empty());
436    }
437
438    #[test]
439    fn test_message_json() {
440        #[derive(Serialize, Deserialize, Debug, PartialEq)]
441        struct TestData {
442            name: String,
443            value: i32,
444        }
445
446        let data = TestData {
447            name: "test".to_string(),
448            value: 42,
449        };
450
451        let msg = Message::json("test-topic", &data).unwrap();
452        assert_eq!(msg.content_type, Some("application/json".to_string()));
453
454        let parsed: TestData = msg.parse_json().unwrap();
455        assert_eq!(parsed, data);
456    }
457
458    #[test]
459    fn test_message_builder() {
460        let msg = Message::new("topic", b"data".to_vec())
461            .with_header("key", "value")
462            .with_correlation_id("corr-123")
463            .with_reply_to("reply-queue")
464            .with_priority(5)
465            .with_ttl(60000);
466
467        assert_eq!(msg.headers.get("key"), Some(&"value".to_string()));
468        assert_eq!(msg.correlation_id, Some("corr-123".to_string()));
469        assert_eq!(msg.reply_to, Some("reply-queue".to_string()));
470        assert_eq!(msg.priority, Some(5));
471        assert_eq!(msg.ttl, Some(60000));
472    }
473
474    #[test]
475    fn test_publish_options() {
476        let opts = PublishOptions::persistent()
477            .with_routing_key("my.routing.key")
478            .with_exchange("my-exchange")
479            .with_confirm(Duration::from_secs(5));
480
481        assert!(opts.persistent);
482        assert!(opts.confirm);
483        assert_eq!(opts.routing_key, Some("my.routing.key".to_string()));
484        assert_eq!(opts.exchange, Some("my-exchange".to_string()));
485        assert_eq!(opts.timeout, Some(Duration::from_secs(5)));
486    }
487
488    #[test]
489    fn test_subscribe_options() {
490        let opts = SubscribeOptions::default()
491            .with_consumer_group("my-group")
492            .with_prefetch(10)
493            .with_ack_mode(AckMode::Manual)
494            .with_concurrency(4);
495
496        assert_eq!(opts.consumer_group, Some("my-group".to_string()));
497        assert_eq!(opts.prefetch_count, Some(10));
498        assert_eq!(opts.ack_mode, AckMode::Manual);
499        assert_eq!(opts.concurrency, Some(4));
500    }
501}