1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::state::TaskState;
6use crate::task_id::TaskId;
7
/// A single task travelling through the queue, together with its retry,
/// scheduling, and workflow metadata.
///
/// The type is (de)serialized with serde. Every field below the core block is
/// an `Option` marked `#[serde(default, skip_serializing_if = "Option::is_none")]`:
/// it is left out of the serialized form when `None` and falls back to `None`
/// when absent from the input, so payloads written before those fields existed
/// still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMessage {
    /// Identifier of this task; [`TaskMessage::new`] generates a fresh one.
    pub id: TaskId,
    /// Name of the task to run.
    pub task_name: String,
    /// Name of the queue the message is addressed to.
    pub queue: String,
    /// Arbitrary JSON payload handed to the task.
    pub payload: serde_json::Value,
    /// Current lifecycle state; starts as `TaskState::Pending` in [`TaskMessage::new`].
    pub state: TaskState,
    /// Retry counter; starts at 0.
    /// NOTE(review): presumably incremented by the worker on each retry — confirm.
    pub retries: u32,
    /// Retry budget; defaults to 3 and can be overridden with `with_max_retries`.
    pub max_retries: u32,
    /// Creation timestamp (UTC), captured once in [`TaskMessage::new`].
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp (UTC); equal to `created_at` on a fresh message.
    pub updated_at: DateTime<Utc>,
    /// Optional timestamp set via `with_eta`.
    /// NOTE(review): presumably the earliest time the task may run — confirm.
    pub eta: Option<DateTime<Utc>>,
    /// Free-form string key/value metadata (e.g. a trace id).
    pub headers: HashMap<String, String>,

    /// Id of the task this one descends from, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<TaskId>,
    /// Caller-supplied correlation identifier, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    /// Identifier of the group this task belongs to; set together with
    /// `group_total` by `with_group`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub group_id: Option<String>,
    /// Value passed as `group_total` to `with_group`.
    /// NOTE(review): presumably the number of tasks in the group — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub group_total: Option<u32>,
    /// Nested message stored by `with_chord_callback`; boxed because the type
    /// is recursive.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub chord_callback: Option<Box<TaskMessage>>,

    /// Optional priority; `with_priority` clamps it to at most 9.
    /// NOTE(review): whether a higher value means "more urgent" is not visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub priority: Option<u8>,
    /// Optional deduplication key, either caller-supplied (`with_dedup_key`)
    /// or derived from the task name and payload (`with_content_dedup`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dedup_key: Option<String>,
}
59
60impl TaskMessage {
61 pub fn new(
63 task_name: impl Into<String>,
64 queue: impl Into<String>,
65 payload: serde_json::Value,
66 ) -> Self {
67 let now = Utc::now();
68 Self {
69 id: TaskId::new(),
70 task_name: task_name.into(),
71 queue: queue.into(),
72 payload,
73 state: TaskState::Pending,
74 retries: 0,
75 max_retries: 3,
76 created_at: now,
77 updated_at: now,
78 eta: None,
79 headers: HashMap::new(),
80 parent_id: None,
81 correlation_id: None,
82 group_id: None,
83 group_total: None,
84 chord_callback: None,
85 priority: None,
86 dedup_key: None,
87 }
88 }
89
90 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
92 self.max_retries = max_retries;
93 self
94 }
95
96 pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
98 self.eta = Some(eta);
99 self
100 }
101
102 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104 self.headers.insert(key.into(), value.into());
105 self
106 }
107
108 pub fn with_parent_id(mut self, parent_id: TaskId) -> Self {
110 self.parent_id = Some(parent_id);
111 self
112 }
113
114 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
116 self.correlation_id = Some(correlation_id.into());
117 self
118 }
119
120 pub fn with_group(mut self, group_id: impl Into<String>, group_total: u32) -> Self {
122 self.group_id = Some(group_id.into());
123 self.group_total = Some(group_total);
124 self
125 }
126
127 pub fn with_chord_callback(mut self, callback: TaskMessage) -> Self {
129 self.chord_callback = Some(Box::new(callback));
130 self
131 }
132
133 pub fn with_priority(mut self, priority: u8) -> Self {
135 if priority > 9 {
136 tracing::warn!(
137 requested = priority,
138 clamped = 9,
139 "priority clamped to max value 9"
140 );
141 }
142 self.priority = Some(priority.min(9));
143 self
144 }
145
146 pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
148 self.dedup_key = Some(key.into());
149 self
150 }
151
152 pub fn with_content_dedup(mut self) -> Self {
157 let input = format!("{}:{}", self.task_name, self.payload);
158 self.dedup_key = Some(format!("content:{:x}", fnv1a_64(input.as_bytes())));
159 self
160 }
161}
162
/// Computes the 64-bit FNV-1a hash of `data`.
///
/// Starting from the offset basis, each input byte is XOR-ed into the running
/// value, which is then multiplied by the FNV prime with wrapping arithmetic.
/// An empty slice therefore hashes to the offset basis itself.
fn fnv1a_64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |acc, &octet| {
        (acc ^ u64::from(octet)).wrapping_mul(PRIME)
    })
}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// A message with core fields and a header survives a JSON round-trip.
    #[test]
    fn task_message_serde_roundtrip() {
        let msg = TaskMessage::new(
            "send_email",
            "default",
            serde_json::json!({"to": "a@b.com"}),
        )
        .with_max_retries(5)
        .with_header("trace_id", "abc123");

        let json = serde_json::to_string(&msg).unwrap();
        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();

        assert_eq!(msg.id, deserialized.id);
        assert_eq!(msg.task_name, deserialized.task_name);
        assert_eq!(msg.queue, deserialized.queue);
        assert_eq!(msg.max_retries, deserialized.max_retries);
        // Check the *deserialized* headers: asserting on `msg` would pass even
        // if headers were lost during the round-trip.
        assert_eq!(
            deserialized.headers.get("trace_id").map(String::as_str),
            Some("abc123")
        );
        assert_eq!(msg.headers, deserialized.headers);
    }

    /// `new` yields a pending message with every optional field unset.
    #[test]
    fn task_message_defaults() {
        let msg = TaskMessage::new("test", "default", serde_json::Value::Null);
        assert_eq!(msg.state, TaskState::Pending);
        assert_eq!(msg.retries, 0);
        assert_eq!(msg.max_retries, 3);
        assert!(msg.eta.is_none());
        assert!(msg.headers.is_empty());
        assert!(msg.parent_id.is_none());
        assert!(msg.correlation_id.is_none());
        assert!(msg.group_id.is_none());
        assert!(msg.group_total.is_none());
        assert!(msg.chord_callback.is_none());
        assert!(msg.priority.is_none());
        assert!(msg.dedup_key.is_none());
    }

    /// JSON that lacks the later-added optional fields still deserializes,
    /// with those fields defaulting to `None`.
    #[test]
    fn backward_compat_deserialization() {
        let old_json = serde_json::json!({
            "id": "01234567-89ab-cdef-0123-456789abcdef",
            "task_name": "send_email",
            "queue": "default",
            "payload": {"to": "a@b.com"},
            "state": "pending",
            "retries": 0,
            "max_retries": 3,
            "created_at": "2025-01-01T00:00:00Z",
            "updated_at": "2025-01-01T00:00:00Z",
            "eta": null,
            "headers": {}
        });
        let msg: TaskMessage = serde_json::from_value(old_json).unwrap();
        assert_eq!(msg.task_name, "send_email");
        assert!(msg.parent_id.is_none());
        assert!(msg.correlation_id.is_none());
        assert!(msg.group_id.is_none());
        assert!(msg.group_total.is_none());
        assert!(msg.chord_callback.is_none());
        assert!(msg.priority.is_none());
        assert!(msg.dedup_key.is_none());
    }

    /// Priority and an explicit dedup key survive a JSON round-trip.
    #[test]
    fn priority_and_dedup_roundtrip() {
        let msg = TaskMessage::new("task", "default", serde_json::json!({"x": 1}))
            .with_priority(5)
            .with_dedup_key("my-key");

        let json = serde_json::to_string(&msg).unwrap();
        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.priority, Some(5));
        assert_eq!(deserialized.dedup_key.as_deref(), Some("my-key"));
    }

    /// Out-of-range priorities are clamped to 9 rather than rejected.
    #[test]
    fn priority_clamped_to_9() {
        let msg = TaskMessage::new("task", "default", serde_json::Value::Null).with_priority(20);
        assert_eq!(msg.priority, Some(9));
    }

    /// Identical name + payload give the same content key; a different task
    /// name gives a different one.
    #[test]
    fn content_dedup_deterministic() {
        let msg1 = TaskMessage::new("task", "q", serde_json::json!({"a": 1})).with_content_dedup();
        let msg2 = TaskMessage::new("task", "q", serde_json::json!({"a": 1})).with_content_dedup();
        assert_eq!(msg1.dedup_key, msg2.dedup_key);

        let msg3 =
            TaskMessage::new("other_task", "q", serde_json::json!({"a": 1})).with_content_dedup();
        assert_ne!(msg1.dedup_key, msg3.dedup_key);
    }

    /// Parent, correlation, group, and chord-callback metadata survive a JSON
    /// round-trip, including the nested callback message.
    #[test]
    fn workflow_metadata_roundtrip() {
        let callback = TaskMessage::new("callback", "default", serde_json::json!({}));
        let msg = TaskMessage::new("task", "default", serde_json::json!({}))
            .with_parent_id(TaskId::new())
            .with_correlation_id("corr-123")
            .with_group("group-1", 5)
            .with_chord_callback(callback);

        let json = serde_json::to_string(&msg).unwrap();
        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();

        assert_eq!(msg.parent_id, deserialized.parent_id);
        assert_eq!(msg.correlation_id, deserialized.correlation_id);
        assert_eq!(msg.group_id, deserialized.group_id);
        assert_eq!(msg.group_total, deserialized.group_total);
        assert!(deserialized.chord_callback.is_some());
        assert_eq!(deserialized.chord_callback.unwrap().task_name, "callback");
    }
}