Skip to main content

foxtive_worker/
message.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::MessageProperties;
8use crate::error::WorkerResult;
9
10/// Pure message data (serializable, no backend references).
11///
12/// This struct contains only the message payload and metadata,
13/// making it safe to serialize, clone, and pass around without
14/// worrying about backend-specific state.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct Message<T> {
17    /// Unique message identifier
18    pub id: String,
19    /// The message payload
20    pub payload: T,
21    /// Message metadata
22    pub metadata: MessageMetadata,
23}
24
25/// Metadata associated with a message.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct MessageMetadata {
28    /// When the message was received
29    pub received_at: DateTime<Utc>,
30    /// Number of processing attempts
31    pub attempt: u32,
32    /// Source of the message (queue name, stream name, etc.)
33    pub source: String,
34    /// Optional correlation ID for distributed tracing
35    pub correlation_id: Option<String>,
36    /// RabbitMQ routing key (if applicable)
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub routing_key: Option<String>,
39    /// Backend-specific message properties
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub properties: Option<MessageProperties>,
42}
43
44impl MessageMetadata {
45    /// Create new message metadata with current timestamp.
46    pub fn new(source: impl Into<String>) -> Self {
47        Self {
48            received_at: Utc::now(),
49            attempt: 0, // Initialize to 0, will be incremented before first processing
50            source: source.into(),
51            correlation_id: None,
52            routing_key: None,
53            properties: None,
54        }
55    }
56
57    /// Set the correlation ID for distributed tracing.
58    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
59        self.correlation_id = Some(correlation_id.into());
60        self
61    }
62
63    /// Set the routing key (for RabbitMQ messages).
64    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
65        self.routing_key = Some(routing_key.into());
66        self
67    }
68
69    /// Set message properties
70    pub fn with_properties(mut self, properties: MessageProperties) -> Self {
71        self.properties = Some(properties);
72        self
73    }
74
75    /// Increment the attempt counter.
76    pub fn increment_attempt(&mut self) {
77        self.attempt += 1;
78    }
79}
80
81/// Trait for backend-specific acknowledgment operations.
82///
83/// Each message backend (RabbitMQ, Redis Streams, etc.) provides
84/// its own implementation of this trait, encapsulating the logic
85/// needed to acknowledge or negative-acknowledge a specific message.
86#[async_trait]
87pub trait AckHandle: Send + Sync + Debug {
88    /// Acknowledge successful message processing.
89    async fn ack(&self) -> WorkerResult<()>;
90
91    /// Negative acknowledge message processing failure.
92    ///
93    /// # Arguments
94    /// * `requeue` - If true, the message should be requeued for redelivery.
95    ///               If false, the message should be discarded or moved to a dead letter queue.
96    async fn nack(&self, requeue: bool) -> WorkerResult<()>;
97}
98
99/// Wrapper combining message data with acknowledgment capability.
100///
101/// This is what workers and middleware actually receive. It separates
102/// the pure message data from the acknowledgment operations, preventing
103/// circular dependencies while providing safe concurrent access.
104///
105/// # Example
106/// ```rust
107/// use foxtive_worker::message::ReceivedMessage;
108///
109/// async fn process_message(received: ReceivedMessage<serde_json::Value>) {
110///     // Access message data
111///     println!("Processing message: {}", received.message.id);
112///     
113///     // Process the message...
114///     
115///     // Acknowledge on success
116///     if let Err(e) = received.ack().await {
117///         eprintln!("Failed to ack: {}", e);
118///     }
119/// }
120/// ```
121pub struct ReceivedMessage<T> {
122    /// The pure message data
123    pub message: Message<T>,
124    /// Backend-specific acknowledgment handle (using Arc for shareability)
125    pub ack_handle: Arc<dyn AckHandle>,
126}
127
128impl<T: Send + Sync> ReceivedMessage<T> {
129    /// Create a new ReceivedMessage.
130    pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
131        Self {
132            message,
133            ack_handle,
134        }
135    }
136
137    /// Acknowledge successful message processing.
138    pub async fn ack(&self) -> WorkerResult<()> {
139        self.ack_handle.ack().await
140    }
141
142    /// Negative acknowledge message processing failure.
143    pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
144        self.ack_handle.nack(requeue).await
145    }
146
147    /// Extract the inner message, discarding the ack handle.
148    pub fn into_message(self) -> Message<T> {
149        self.message
150    }
151}
152
153impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
154    fn clone(&self) -> Self {
155        Self {
156            message: self.message.clone(),
157            ack_handle: self.ack_handle.clone(),
158        }
159    }
160}
161
162impl<T: Debug> Debug for ReceivedMessage<T> {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("ReceivedMessage")
165            .field("message", &self.message)
166            .field("ack_handle", &"<AckHandle>")
167            .finish()
168    }
169}
170
171/// Convenience type for JSON-valued messages.
172pub type JsonMessage = Message<serde_json::Value>;
173
174/// Convenience type for received JSON-valued messages.
175pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
176
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records which acknowledgment method was called.
    #[derive(Debug)]
    struct MockAckHandle {
        acked: Arc<AtomicBool>,
        nacked: Arc<AtomicBool>,
    }

    impl MockAckHandle {
        /// Returns the mock together with the shared `acked` and `nacked`
        /// flags, so a test can inspect them after the mock is moved.
        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
            let acked = Arc::new(AtomicBool::new(false));
            let nacked = Arc::new(AtomicBool::new(false));
            (
                Self {
                    acked: acked.clone(),
                    nacked: nacked.clone(),
                },
                acked,
                nacked,
            )
        }
    }

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            self.acked.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            self.nacked.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Fixture shared by the tests below.
    fn sample_message() -> Message<&'static str> {
        Message {
            id: "test-1".to_string(),
            payload: "test data",
            metadata: MessageMetadata::new("test-queue"),
        }
    }

    // Tests that never await use plain `#[test]`, so no runtime is started.

    #[test]
    fn test_message_creation() {
        let message = sample_message();

        assert_eq!(message.id, "test-1");
        assert_eq!(message.payload, "test data");
        assert_eq!(message.metadata.source, "test-queue");
        assert_eq!(message.metadata.attempt, 0); // Starts at 0, incremented before processing
    }

    #[tokio::test]
    async fn test_received_message_ack() {
        let (ack_handle, acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));

        received.ack().await.unwrap();
        assert!(acked.load(Ordering::SeqCst));
        assert!(!nacked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_nack() {
        let (ack_handle, acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));

        received.nack(true).await.unwrap();
        assert!(nacked.load(Ordering::SeqCst));
        assert!(!acked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_clone_shares_ack_handle() {
        let (ack_handle, acked, _) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));
        let duplicate = received.clone();

        // Acking through the clone must reach the same underlying handle.
        duplicate.ack().await.unwrap();
        assert!(acked.load(Ordering::SeqCst));
        assert_eq!(duplicate.message.id, received.message.id);
    }

    #[test]
    fn test_received_message_into_message() {
        let (ack_handle, _, _) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(ack_handle));

        let inner = received.into_message();
        assert_eq!(inner.id, "test-1");
        assert_eq!(inner.payload, "test data");
    }

    #[test]
    fn test_metadata_with_correlation_id() {
        let metadata = MessageMetadata::new("test-queue").with_correlation_id("corr-123");

        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
    }

    #[test]
    fn test_metadata_with_routing_key() {
        let metadata = MessageMetadata::new("test-queue").with_routing_key("orders.created");

        assert_eq!(metadata.routing_key, Some("orders.created".to_string()));
        assert_eq!(metadata.correlation_id, None);
    }

    #[test]
    fn test_metadata_increment_attempt() {
        let mut metadata = MessageMetadata::new("test-queue");
        assert_eq!(metadata.attempt, 0); // Starts at 0

        metadata.increment_attempt();
        assert_eq!(metadata.attempt, 1); // First increment makes it 1
    }
}