Skip to main content

foxtive_worker/
message.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::error::WorkerResult;
8
9/// Pure message data (serializable, no backend references).
10///
11/// This struct contains only the message payload and metadata,
12/// making it safe to serialize, clone, and pass around without
13/// worrying about backend-specific state.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct Message<T> {
16    /// Unique message identifier
17    pub id: String,
18    /// The message payload
19    pub payload: T,
20    /// Message metadata
21    pub metadata: MessageMetadata,
22}
23
24/// Metadata associated with a message.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct MessageMetadata {
27    /// When the message was received
28    pub received_at: DateTime<Utc>,
29    /// Number of processing attempts
30    pub attempt: u32,
31    /// Source of the message (queue name, stream name, etc.)
32    pub source: String,
33    /// Optional correlation ID for distributed tracing
34    pub correlation_id: Option<String>,
35    /// RabbitMQ routing key (if applicable)
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub routing_key: Option<String>,
38}
39
40impl MessageMetadata {
41    /// Create new message metadata with current timestamp.
42    pub fn new(source: impl Into<String>) -> Self {
43        Self {
44            received_at: Utc::now(),
45            attempt: 0, // Initialize to 0, will be incremented before first processing
46            source: source.into(),
47            correlation_id: None,
48            routing_key: None,
49        }
50    }
51
52    /// Set the correlation ID for distributed tracing.
53    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
54        self.correlation_id = Some(correlation_id.into());
55        self
56    }
57
58    /// Set the routing key (for RabbitMQ messages).
59    pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
60        self.routing_key = Some(routing_key.into());
61        self
62    }
63
64    /// Increment the attempt counter.
65    pub fn increment_attempt(&mut self) {
66        self.attempt += 1;
67    }
68}
69
70/// Trait for backend-specific acknowledgment operations.
71///
72/// Each message backend (RabbitMQ, Redis Streams, etc.) provides
73/// its own implementation of this trait, encapsulating the logic
74/// needed to acknowledge or negative-acknowledge a specific message.
75#[async_trait]
76pub trait AckHandle: Send + Sync + Debug {
77    /// Acknowledge successful message processing.
78    async fn ack(&self) -> WorkerResult<()>;
79
80    /// Negative acknowledge message processing failure.
81    ///
82    /// # Arguments
83    /// * `requeue` - If true, the message should be requeued for redelivery.
84    ///               If false, the message should be discarded or moved to a dead letter queue.
85    async fn nack(&self, requeue: bool) -> WorkerResult<()>;
86}
87
88/// Wrapper combining message data with acknowledgment capability.
89///
90/// This is what workers and middleware actually receive. It separates
91/// the pure message data from the acknowledgment operations, preventing
92/// circular dependencies while providing safe concurrent access.
93///
94/// # Example
95/// ```rust
96/// use foxtive_worker::message::ReceivedMessage;
97///
98/// async fn process_message(received: ReceivedMessage<serde_json::Value>) {
99///     // Access message data
100///     println!("Processing message: {}", received.message.id);
101///     
102///     // Process the message...
103///     
104///     // Acknowledge on success
105///     if let Err(e) = received.ack().await {
106///         eprintln!("Failed to ack: {}", e);
107///     }
108/// }
109/// ```
110pub struct ReceivedMessage<T> {
111    /// The pure message data
112    pub message: Message<T>,
113    /// Backend-specific acknowledgment handle (using Arc for shareability)
114    pub ack_handle: Arc<dyn AckHandle>,
115}
116
117impl<T: Send + Sync> ReceivedMessage<T> {
118    /// Create a new ReceivedMessage.
119    pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
120        Self {
121            message,
122            ack_handle,
123        }
124    }
125
126    /// Acknowledge successful message processing.
127    pub async fn ack(&self) -> WorkerResult<()> {
128        self.ack_handle.ack().await
129    }
130
131    /// Negative acknowledge message processing failure.
132    pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
133        self.ack_handle.nack(requeue).await
134    }
135
136    /// Extract the inner message, discarding the ack handle.
137    pub fn into_message(self) -> Message<T> {
138        self.message
139    }
140}
141
142impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
143    fn clone(&self) -> Self {
144        Self {
145            message: self.message.clone(),
146            ack_handle: self.ack_handle.clone(),
147        }
148    }
149}
150
151impl<T: Debug> Debug for ReceivedMessage<T> {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("ReceivedMessage")
154            .field("message", &self.message)
155            .field("ack_handle", &"<AckHandle>")
156            .finish()
157    }
158}
159
160/// Convenience type for JSON-valued messages.
161pub type JsonMessage = Message<serde_json::Value>;
162
163/// Convenience type for received JSON-valued messages.
164pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use std::sync::atomic::{AtomicBool, Ordering};
170    use std::sync::Arc;
171
172    #[derive(Debug)]
173    struct MockAckHandle {
174        acked: Arc<AtomicBool>,
175        nacked: Arc<AtomicBool>,
176    }
177
178    impl MockAckHandle {
179        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
180            let acked = Arc::new(AtomicBool::new(false));
181            let nacked = Arc::new(AtomicBool::new(false));
182            (
183                Self {
184                    acked: acked.clone(),
185                    nacked: nacked.clone(),
186                },
187                acked,
188                nacked,
189            )
190        }
191    }
192
193    #[async_trait]
194    impl AckHandle for MockAckHandle {
195        async fn ack(&self) -> WorkerResult<()> {
196            self.acked.store(true, Ordering::SeqCst);
197            Ok(())
198        }
199
200        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
201            self.nacked.store(true, Ordering::SeqCst);
202            Ok(())
203        }
204    }
205
206    #[tokio::test]
207    async fn test_message_creation() {
208        let message = Message {
209            id: "test-1".to_string(),
210            payload: "test data",
211            metadata: MessageMetadata::new("test-queue"),
212        };
213
214        assert_eq!(message.id, "test-1");
215        assert_eq!(message.payload, "test data");
216        assert_eq!(message.metadata.attempt, 0); // Starts at 0, incremented before processing
217    }
218
219    #[tokio::test]
220    async fn test_received_message_ack() {
221        let (ack_handle, acked, _) = MockAckHandle::new();
222        let message = Message {
223            id: "test-1".to_string(),
224            payload: "test data",
225            metadata: MessageMetadata::new("test-queue"),
226        };
227        let received = ReceivedMessage::new(message, Arc::new(ack_handle));
228
229        received.ack().await.unwrap();
230        assert!(acked.load(Ordering::SeqCst));
231    }
232
233    #[tokio::test]
234    async fn test_received_message_nack() {
235        let (ack_handle, _, nacked) = MockAckHandle::new();
236        let message = Message {
237            id: "test-1".to_string(),
238            payload: "test data",
239            metadata: MessageMetadata::new("test-queue"),
240        };
241        let received = ReceivedMessage::new(message, Arc::new(ack_handle));
242
243        received.nack(true).await.unwrap();
244        assert!(nacked.load(Ordering::SeqCst));
245    }
246
247    #[tokio::test]
248    async fn test_metadata_with_correlation_id() {
249        let metadata = MessageMetadata::new("test-queue")
250            .with_correlation_id("corr-123");
251
252        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
253    }
254
255    #[tokio::test]
256    async fn test_metadata_increment_attempt() {
257        let mut metadata = MessageMetadata::new("test-queue");
258        assert_eq!(metadata.attempt, 0); // Starts at 0
259
260        metadata.increment_attempt();
261        assert_eq!(metadata.attempt, 1); // First increment makes it 1
262    }
263}