foxtive_worker/
message.rs1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::MessageProperties;
8use crate::error::WorkerResult;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct Message<T> {
17 pub id: String,
19 pub payload: T,
21 pub metadata: MessageMetadata,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct MessageMetadata {
28 pub received_at: DateTime<Utc>,
30 pub attempt: u32,
32 pub source: String,
34 pub correlation_id: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub routing_key: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub properties: Option<MessageProperties>,
42}
43
44impl MessageMetadata {
45 pub fn new(source: impl Into<String>) -> Self {
47 Self {
48 received_at: Utc::now(),
49 attempt: 0, source: source.into(),
51 correlation_id: None,
52 routing_key: None,
53 properties: None,
54 }
55 }
56
57 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
59 self.correlation_id = Some(correlation_id.into());
60 self
61 }
62
63 pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
65 self.routing_key = Some(routing_key.into());
66 self
67 }
68
69 pub fn with_properties(mut self, properties: MessageProperties) -> Self {
71 self.properties = Some(properties);
72 self
73 }
74
75 pub fn increment_attempt(&mut self) {
77 self.attempt += 1;
78 }
79}
80
81#[async_trait]
87pub trait AckHandle: Send + Sync + Debug {
88 async fn ack(&self) -> WorkerResult<()>;
90
91 async fn nack(&self, requeue: bool) -> WorkerResult<()>;
97
98 async fn retry_with_delay(
111 &self,
112 _message: &Message<serde_json::Value>,
113 _delay_ms: u64,
114 ) -> WorkerResult<()> {
115 self.nack(true).await
117 }
118
119 async fn send_to_dlq(
132 &self,
133 _message: &Message<serde_json::Value>,
134 _error_message: &str,
135 ) -> WorkerResult<()> {
136 Ok(())
138 }
139}
140
141pub struct ReceivedMessage<T> {
164 pub message: Message<T>,
166 pub ack_handle: Arc<dyn AckHandle>,
168}
169
170impl<T: Send + Sync> ReceivedMessage<T> {
171 pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
173 Self {
174 message,
175 ack_handle,
176 }
177 }
178
179 pub async fn ack(&self) -> WorkerResult<()> {
181 self.ack_handle.ack().await
182 }
183
184 pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
186 self.ack_handle.nack(requeue).await
187 }
188
189 pub fn into_message(self) -> Message<T> {
191 self.message
192 }
193}
194
195impl ReceivedMessage<serde_json::Value> {
197 pub async fn retry_with_delay(
199 &self,
200 delay_ms: u64,
201 ) -> WorkerResult<()> {
202 self.ack_handle.retry_with_delay(&self.message, delay_ms).await
203 }
204
205 pub async fn send_to_dlq(&self, error_message: &str) -> WorkerResult<()> {
207 self.ack_handle.send_to_dlq(&self.message, error_message).await
208 }
209}
210
211impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
212 fn clone(&self) -> Self {
213 Self {
214 message: self.message.clone(),
215 ack_handle: self.ack_handle.clone(),
216 }
217 }
218}
219
220impl<T: Debug> Debug for ReceivedMessage<T> {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("ReceivedMessage")
223 .field("message", &self.message)
224 .field("ack_handle", &"<AckHandle>")
225 .finish()
226 }
227}
228
229pub type JsonMessage = Message<serde_json::Value>;
231
232pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
234
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records whether `ack` / `nack` were invoked.
    #[derive(Debug)]
    struct MockAckHandle {
        acked: Arc<AtomicBool>,
        nacked: Arc<AtomicBool>,
    }

    impl MockAckHandle {
        /// Returns the mock together with the two flags it flips, so a test
        /// can still observe them after the mock has been moved.
        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
            let acked = Arc::new(AtomicBool::new(false));
            let nacked = Arc::new(AtomicBool::new(false));
            let handle = Self {
                acked: Arc::clone(&acked),
                nacked: Arc::clone(&nacked),
            };
            (handle, acked, nacked)
        }
    }

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            self.acked.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            self.nacked.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds the message fixture shared by the tests below.
    fn sample_message() -> Message<&'static str> {
        Message {
            id: String::from("test-1"),
            payload: "test data",
            metadata: MessageMetadata::new("test-queue"),
        }
    }

    #[tokio::test]
    async fn test_message_creation() {
        let message = sample_message();

        assert_eq!(message.id, "test-1");
        assert_eq!(message.payload, "test data");
        // Freshly created metadata has not been attempted yet.
        assert_eq!(message.metadata.attempt, 0);
    }

    #[tokio::test]
    async fn test_received_message_ack() {
        let (mock, acked, _nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(mock));

        received.ack().await.unwrap();

        assert!(acked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_nack() {
        let (mock, _acked, nacked) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(mock));

        received.nack(true).await.unwrap();

        assert!(nacked.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_metadata_with_correlation_id() {
        let metadata = MessageMetadata::new("test-queue").with_correlation_id("corr-123");

        assert_eq!(metadata.correlation_id.as_deref(), Some("corr-123"));
    }

    #[tokio::test]
    async fn test_metadata_increment_attempt() {
        let mut metadata = MessageMetadata::new("test-queue");
        assert_eq!(metadata.attempt, 0);

        metadata.increment_attempt();

        assert_eq!(metadata.attempt, 1);
    }
}