Skip to main content

foxtive_worker/
message.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::MessageProperties;
8use crate::error::WorkerResult;
9
10/// Pure message data (serializable, no backend references).
11///
12/// This struct contains only the message payload and metadata,
13/// making it safe to serialize, clone, and pass around without
14/// worrying about backend-specific state.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct Message<T> {
17    /// Unique message identifier
18    pub id: String,
19    /// The message payload
20    pub payload: T,
21    /// Message metadata
22    pub metadata: MessageMetadata,
23}
24
25/// Metadata associated with a message.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct MessageMetadata {
28    /// When the message was received
29    pub received_at: DateTime<Utc>,
30    /// Number of processing attempts
31    pub attempt: u32,
32    /// Source of the message (queue name, stream name, etc.)
33    pub source: String,
34    /// Optional correlation ID for distributed tracing
35    pub correlation_id: Option<String>,
36    /// RabbitMQ routing key (if applicable)
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub routing_key: Option<String>,
39    /// Backend-specific message properties
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub properties: Option<MessageProperties>,
42}
43
44impl MessageMetadata {
45    /// Create new message metadata with current timestamp.
46    pub fn new(source: impl Into<String>) -> Self {
47        Self {
48            received_at: Utc::now(),
49            attempt: 0, // Initialize to 0, will be incremented before first processing
50            source: source.into(),
51            correlation_id: None,
52            routing_key: None,
53            properties: None,
54        }
55    }
56
57    /// Set the correlation ID for distributed tracing.
58    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
59        self.correlation_id = Some(correlation_id.into());
60        self
61    }
62
63    /// Set the routing key (for RabbitMQ messages).
64    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
65        self.routing_key = Some(routing_key.into());
66        self
67    }
68
69    /// Set message properties
70    pub fn with_properties(mut self, properties: MessageProperties) -> Self {
71        self.properties = Some(properties);
72        self
73    }
74
75    /// Increment the attempt counter.
76    pub fn increment_attempt(&mut self) {
77        self.attempt += 1;
78    }
79}
80
81/// Trait for backend-specific acknowledgment operations.
82///
83/// Each message backend (RabbitMQ, Redis Streams, etc.) provides
84/// its own implementation of this trait, encapsulating the logic
85/// needed to acknowledge or negative-acknowledge a specific message.
86#[async_trait]
87pub trait AckHandle: Send + Sync + Debug {
88    /// Acknowledge successful message processing.
89    async fn ack(&self) -> WorkerResult<()>;
90
91    /// Negative acknowledge message processing failure.
92    ///
93    /// # Arguments
94    /// * `requeue` - If true, the message should be requeued for redelivery.
95    ///               If false, the message should be discarded or moved to a dead letter queue.
96    async fn nack(&self, requeue: bool) -> WorkerResult<()>;
97
98    /// Retry message with a delay (backend-specific implementation).
99    ///
100    /// For backends that support delayed retries (e.g., RabbitMQ with DLX+TTL),
101    /// this publishes the message to a retry queue with the specified delay.
102    /// For other backends, this may fall back to immediate requeue or return an error.
103    ///
104    /// # Arguments
105    /// * `message` - The message to retry
106    /// * `delay_ms` - Delay in milliseconds before redelivery
107    ///
108    /// # Returns
109    /// Ok if retry was scheduled, Err if not supported or failed
110    async fn retry_with_delay(
111        &self,
112        _message: &Message<serde_json::Value>,
113        _delay_ms: u64,
114    ) -> WorkerResult<()> {
115        // Default implementation: fall back to immediate requeue
116        self.nack(true).await
117    }
118
119    /// Send message to Dead Letter Queue after retries are exhausted.
120    ///
121    /// This method publishes the message to a DLQ with failure metadata.
122    /// Only applicable for backends that support DLQ (e.g., RabbitMQ).
123    /// For other backends, this is a no-op.
124    ///
125    /// # Arguments
126    /// * `message` - The message that has exhausted all retries
127    /// * `error_message` - Description of why the message failed
128    ///
129    /// # Returns
130    /// Ok if published to DLQ or not supported, Err if publishing failed
131    async fn send_to_dlq(
132        &self,
133        _message: &Message<serde_json::Value>,
134        _error_message: &str,
135    ) -> WorkerResult<()> {
136        // Default implementation: no-op for backends without DLQ support
137        Ok(())
138    }
139}
140
141/// Wrapper combining message data with acknowledgment capability.
142///
143/// This is what workers and middleware actually receive. It separates
144/// the pure message data from the acknowledgment operations, preventing
145/// circular dependencies while providing safe concurrent access.
146///
147/// # Example
148/// ```rust
149/// use foxtive_worker::message::ReceivedMessage;
150///
151/// async fn process_message(received: ReceivedMessage<serde_json::Value>) {
152///     // Access message data
153///     println!("Processing message: {}", received.message.id);
154///     
155///     // Process the message...
156///     
157///     // Acknowledge on success
158///     if let Err(e) = received.ack().await {
159///         eprintln!("Failed to ack: {}", e);
160///     }
161/// }
162/// ```
163pub struct ReceivedMessage<T> {
164    /// The pure message data
165    pub message: Message<T>,
166    /// Backend-specific acknowledgment handle (using Arc for shareability)
167    pub ack_handle: Arc<dyn AckHandle>,
168}
169
170impl<T: Send + Sync> ReceivedMessage<T> {
171    /// Create a new ReceivedMessage.
172    pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
173        Self {
174            message,
175            ack_handle,
176        }
177    }
178
179    /// Acknowledge successful message processing.
180    pub async fn ack(&self) -> WorkerResult<()> {
181        self.ack_handle.ack().await
182    }
183
184    /// Negative acknowledge message processing failure.
185    pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
186        self.ack_handle.nack(requeue).await
187    }
188
189    /// Extract the inner message, discarding the ack handle.
190    pub fn into_message(self) -> Message<T> {
191        self.message
192    }
193}
194
195/// Specialized implementation for JSON messages
196impl ReceivedMessage<serde_json::Value> {
197    /// Retry message with a delay (if supported by backend).
198    pub async fn retry_with_delay(
199        &self,
200        delay_ms: u64,
201    ) -> WorkerResult<()> {
202        self.ack_handle.retry_with_delay(&self.message, delay_ms).await
203    }
204
205    /// Send message to Dead Letter Queue after retries are exhausted.
206    pub async fn send_to_dlq(&self, error_message: &str) -> WorkerResult<()> {
207        self.ack_handle.send_to_dlq(&self.message, error_message).await
208    }
209}
210
211impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
212    fn clone(&self) -> Self {
213        Self {
214            message: self.message.clone(),
215            ack_handle: self.ack_handle.clone(),
216        }
217    }
218}
219
220impl<T: Debug> Debug for ReceivedMessage<T> {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct("ReceivedMessage")
223            .field("message", &self.message)
224            .field("ack_handle", &"<AckHandle>")
225            .finish()
226    }
227}
228
229/// Convenience type for JSON-valued messages.
230pub type JsonMessage = Message<serde_json::Value>;
231
232/// Convenience type for received JSON-valued messages.
233pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use std::sync::Arc;
239    use std::sync::atomic::{AtomicBool, Ordering};
240
241    #[derive(Debug)]
242    struct MockAckHandle {
243        acked: Arc<AtomicBool>,
244        nacked: Arc<AtomicBool>,
245    }
246
247    impl MockAckHandle {
248        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
249            let acked = Arc::new(AtomicBool::new(false));
250            let nacked = Arc::new(AtomicBool::new(false));
251            (
252                Self {
253                    acked: acked.clone(),
254                    nacked: nacked.clone(),
255                },
256                acked,
257                nacked,
258            )
259        }
260    }
261
262    #[async_trait]
263    impl AckHandle for MockAckHandle {
264        async fn ack(&self) -> WorkerResult<()> {
265            self.acked.store(true, Ordering::SeqCst);
266            Ok(())
267        }
268
269        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
270            self.nacked.store(true, Ordering::SeqCst);
271            Ok(())
272        }
273    }
274
275    #[tokio::test]
276    async fn test_message_creation() {
277        let message = Message {
278            id: "test-1".to_string(),
279            payload: "test data",
280            metadata: MessageMetadata::new("test-queue"),
281        };
282
283        assert_eq!(message.id, "test-1");
284        assert_eq!(message.payload, "test data");
285        assert_eq!(message.metadata.attempt, 0); // Starts at 0, incremented before processing
286    }
287
288    #[tokio::test]
289    async fn test_received_message_ack() {
290        let (ack_handle, acked, _) = MockAckHandle::new();
291        let message = Message {
292            id: "test-1".to_string(),
293            payload: "test data",
294            metadata: MessageMetadata::new("test-queue"),
295        };
296        let received = ReceivedMessage::new(message, Arc::new(ack_handle));
297
298        received.ack().await.unwrap();
299        assert!(acked.load(Ordering::SeqCst));
300    }
301
302    #[tokio::test]
303    async fn test_received_message_nack() {
304        let (ack_handle, _, nacked) = MockAckHandle::new();
305        let message = Message {
306            id: "test-1".to_string(),
307            payload: "test data",
308            metadata: MessageMetadata::new("test-queue"),
309        };
310        let received = ReceivedMessage::new(message, Arc::new(ack_handle));
311
312        received.nack(true).await.unwrap();
313        assert!(nacked.load(Ordering::SeqCst));
314    }
315
316    #[tokio::test]
317    async fn test_metadata_with_correlation_id() {
318        let metadata = MessageMetadata::new("test-queue").with_correlation_id("corr-123");
319
320        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
321    }
322
323    #[tokio::test]
324    async fn test_metadata_increment_attempt() {
325        let mut metadata = MessageMetadata::new("test-queue");
326        assert_eq!(metadata.attempt, 0); // Starts at 0
327
328        metadata.increment_attempt();
329        assert_eq!(metadata.attempt, 1); // First increment makes it 1
330    }
331}