foxtive_worker/
message.rs1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6
7use crate::error::WorkerResult;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct Message<T> {
16 pub id: String,
18 pub payload: T,
20 pub metadata: MessageMetadata,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct MessageMetadata {
27 pub received_at: DateTime<Utc>,
29 pub attempt: u32,
31 pub source: String,
33 pub correlation_id: Option<String>,
35 #[serde(skip_serializing_if = "Option::is_none")]
37 pub routing_key: Option<String>,
38}
39
40impl MessageMetadata {
41 pub fn new(source: impl Into<String>) -> Self {
43 Self {
44 received_at: Utc::now(),
45 attempt: 0, source: source.into(),
47 correlation_id: None,
48 routing_key: None,
49 }
50 }
51
52 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
54 self.correlation_id = Some(correlation_id.into());
55 self
56 }
57
58 pub fn with_routing_key(mut self, routing_key: impl Into<String>) -> Self {
60 self.routing_key = Some(routing_key.into());
61 self
62 }
63
64 pub fn increment_attempt(&mut self) {
66 self.attempt += 1;
67 }
68}
69
70#[async_trait]
76pub trait AckHandle: Send + Sync + Debug {
77 async fn ack(&self) -> WorkerResult<()>;
79
80 async fn nack(&self, requeue: bool) -> WorkerResult<()>;
86}
87
88pub struct ReceivedMessage<T> {
111 pub message: Message<T>,
113 pub ack_handle: Arc<dyn AckHandle>,
115}
116
117impl<T: Send + Sync> ReceivedMessage<T> {
118 pub fn new(message: Message<T>, ack_handle: Arc<dyn AckHandle>) -> Self {
120 Self {
121 message,
122 ack_handle,
123 }
124 }
125
126 pub async fn ack(&self) -> WorkerResult<()> {
128 self.ack_handle.ack().await
129 }
130
131 pub async fn nack(&self, requeue: bool) -> WorkerResult<()> {
133 self.ack_handle.nack(requeue).await
134 }
135
136 pub fn into_message(self) -> Message<T> {
138 self.message
139 }
140}
141
142impl<T: Clone + Send + Sync> Clone for ReceivedMessage<T> {
143 fn clone(&self) -> Self {
144 Self {
145 message: self.message.clone(),
146 ack_handle: self.ack_handle.clone(),
147 }
148 }
149}
150
151impl<T: Debug> Debug for ReceivedMessage<T> {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("ReceivedMessage")
154 .field("message", &self.message)
155 .field("ack_handle", &"<AckHandle>")
156 .finish()
157 }
158}
159
160pub type JsonMessage = Message<serde_json::Value>;
162
163pub type ReceivedJsonMessage = ReceivedMessage<serde_json::Value>;
165
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Test double that records whether `ack` or `nack` has been called.
    #[derive(Debug)]
    struct MockAckHandle {
        acked: Arc<AtomicBool>,
        nacked: Arc<AtomicBool>,
    }

    impl MockAckHandle {
        /// Builds the mock and returns it together with the shared
        /// `acked` / `nacked` flags so a test can observe them afterwards.
        fn new() -> (Self, Arc<AtomicBool>, Arc<AtomicBool>) {
            let acked = Arc::new(AtomicBool::new(false));
            let nacked = Arc::new(AtomicBool::new(false));
            let handle = Self {
                acked: Arc::clone(&acked),
                nacked: Arc::clone(&nacked),
            };
            (handle, acked, nacked)
        }
    }

    #[async_trait]
    impl AckHandle for MockAckHandle {
        async fn ack(&self) -> WorkerResult<()> {
            self.acked.store(true, Ordering::SeqCst);
            Ok(())
        }

        async fn nack(&self, _requeue: bool) -> WorkerResult<()> {
            self.nacked.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Fixture shared by the tests below.
    fn sample_message() -> Message<&'static str> {
        Message {
            id: String::from("test-1"),
            payload: "test data",
            metadata: MessageMetadata::new("test-queue"),
        }
    }

    #[tokio::test]
    async fn test_message_creation() {
        let msg = sample_message();

        assert_eq!(msg.id, "test-1");
        assert_eq!(msg.payload, "test data");
        // Freshly created metadata has not been attempted yet.
        assert_eq!(msg.metadata.attempt, 0);
    }

    #[tokio::test]
    async fn test_received_message_ack() {
        let (handle, acked_flag, _) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(handle));

        received.ack().await.unwrap();

        assert!(acked_flag.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_received_message_nack() {
        let (handle, _, nacked_flag) = MockAckHandle::new();
        let received = ReceivedMessage::new(sample_message(), Arc::new(handle));

        received.nack(true).await.unwrap();

        assert!(nacked_flag.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_metadata_with_correlation_id() {
        let meta = MessageMetadata::new("test-queue").with_correlation_id("corr-123");

        assert_eq!(meta.correlation_id.as_deref(), Some("corr-123"));
    }

    #[tokio::test]
    async fn test_metadata_increment_attempt() {
        let mut meta = MessageMetadata::new("test-queue");
        assert_eq!(meta.attempt, 0);

        meta.increment_attempt();

        assert_eq!(meta.attempt, 1);
    }
}