Skip to main content

kojin_core/
message.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::state::TaskState;
6use crate::task_id::TaskId;
7
8/// A task message that flows through the broker.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct TaskMessage {
11    /// Unique task identifier.
12    pub id: TaskId,
13    /// Registered task name (e.g., "send_email").
14    pub task_name: String,
15    /// Target queue name.
16    pub queue: String,
17    /// Serialized task payload.
18    pub payload: serde_json::Value,
19    /// Current task state.
20    pub state: TaskState,
21    /// Current retry count.
22    pub retries: u32,
23    /// Maximum allowed retries.
24    pub max_retries: u32,
25    /// When the message was created.
26    pub created_at: DateTime<Utc>,
27    /// When the message was last updated.
28    pub updated_at: DateTime<Utc>,
29    /// Optional ETA — earliest time the task should execute.
30    pub eta: Option<DateTime<Utc>>,
31    /// Arbitrary headers for middleware / tracing propagation.
32    pub headers: HashMap<String, String>,
33
34    // -- Workflow metadata (Phase 2) --
35    /// Parent task ID for workflow tracking.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub parent_id: Option<TaskId>,
38    /// Correlation ID for tracing an entire workflow.
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub correlation_id: Option<String>,
41    /// Group ID this task belongs to.
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub group_id: Option<String>,
44    /// Total number of tasks in the group.
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub group_total: Option<u32>,
47    /// Chord callback to enqueue when all group members complete.
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub chord_callback: Option<Box<TaskMessage>>,
50
51    // -- Priority & deduplication (Phase 4) --
52    /// Task priority (0–9, higher = more urgent). Broker-specific support.
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub priority: Option<u8>,
55    /// Deduplication key. If set, brokers may skip enqueue when a duplicate exists.
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub dedup_key: Option<String>,
58}
59
60impl TaskMessage {
61    /// Create a new task message with defaults.
62    pub fn new(
63        task_name: impl Into<String>,
64        queue: impl Into<String>,
65        payload: serde_json::Value,
66    ) -> Self {
67        let now = Utc::now();
68        Self {
69            id: TaskId::new(),
70            task_name: task_name.into(),
71            queue: queue.into(),
72            payload,
73            state: TaskState::Pending,
74            retries: 0,
75            max_retries: 3,
76            created_at: now,
77            updated_at: now,
78            eta: None,
79            headers: HashMap::new(),
80            parent_id: None,
81            correlation_id: None,
82            group_id: None,
83            group_total: None,
84            chord_callback: None,
85            priority: None,
86            dedup_key: None,
87        }
88    }
89
90    /// Set max retries.
91    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
92        self.max_retries = max_retries;
93        self
94    }
95
96    /// Set ETA.
97    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
98        self.eta = Some(eta);
99        self
100    }
101
102    /// Add a header.
103    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104        self.headers.insert(key.into(), value.into());
105        self
106    }
107
108    /// Set parent task ID for workflow tracking.
109    pub fn with_parent_id(mut self, parent_id: TaskId) -> Self {
110        self.parent_id = Some(parent_id);
111        self
112    }
113
114    /// Set correlation ID for tracing an entire workflow.
115    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
116        self.correlation_id = Some(correlation_id.into());
117        self
118    }
119
120    /// Set group metadata.
121    pub fn with_group(mut self, group_id: impl Into<String>, group_total: u32) -> Self {
122        self.group_id = Some(group_id.into());
123        self.group_total = Some(group_total);
124        self
125    }
126
127    /// Set chord callback.
128    pub fn with_chord_callback(mut self, callback: TaskMessage) -> Self {
129        self.chord_callback = Some(Box::new(callback));
130        self
131    }
132
133    /// Set task priority (clamped to 0–9, higher = more urgent).
134    pub fn with_priority(mut self, priority: u8) -> Self {
135        if priority > 9 {
136            tracing::warn!(
137                requested = priority,
138                clamped = 9,
139                "priority clamped to max value 9"
140            );
141        }
142        self.priority = Some(priority.min(9));
143        self
144    }
145
146    /// Set an explicit deduplication key.
147    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
148        self.dedup_key = Some(key.into());
149        self
150    }
151
152    /// Auto-generate a dedup key by hashing `task_name` + `payload`.
153    ///
154    /// Uses FNV-1a (64-bit) which is deterministic across Rust versions and platforms,
155    /// unlike `DefaultHasher` which may change between releases.
156    pub fn with_content_dedup(mut self) -> Self {
157        let input = format!("{}:{}", self.task_name, self.payload);
158        self.dedup_key = Some(format!("content:{:x}", fnv1a_64(input.as_bytes())));
159        self
160    }
161}
162
163/// FNV-1a 64-bit hash — deterministic across platforms and Rust versions.
164fn fnv1a_64(data: &[u8]) -> u64 {
165    let mut hash: u64 = 0xcbf29ce484222325;
166    for &byte in data {
167        hash ^= byte as u64;
168        hash = hash.wrapping_mul(0x100000001b3);
169    }
170    hash
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176
177    #[test]
178    fn task_message_serde_roundtrip() {
179        let msg = TaskMessage::new(
180            "send_email",
181            "default",
182            serde_json::json!({"to": "a@b.com"}),
183        )
184        .with_max_retries(5)
185        .with_header("trace_id", "abc123");
186
187        let json = serde_json::to_string(&msg).unwrap();
188        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();
189
190        assert_eq!(msg.id, deserialized.id);
191        assert_eq!(msg.task_name, deserialized.task_name);
192        assert_eq!(msg.queue, deserialized.queue);
193        assert_eq!(msg.max_retries, deserialized.max_retries);
194        assert_eq!(msg.headers.get("trace_id"), Some(&"abc123".to_string()));
195    }
196
197    #[test]
198    fn task_message_defaults() {
199        let msg = TaskMessage::new("test", "default", serde_json::Value::Null);
200        assert_eq!(msg.state, TaskState::Pending);
201        assert_eq!(msg.retries, 0);
202        assert_eq!(msg.max_retries, 3);
203        assert!(msg.eta.is_none());
204        assert!(msg.headers.is_empty());
205        assert!(msg.parent_id.is_none());
206        assert!(msg.correlation_id.is_none());
207        assert!(msg.group_id.is_none());
208        assert!(msg.group_total.is_none());
209        assert!(msg.chord_callback.is_none());
210        assert!(msg.priority.is_none());
211        assert!(msg.dedup_key.is_none());
212    }
213
214    #[test]
215    fn backward_compat_deserialization() {
216        // Simulate a v0.1.0 message without workflow fields
217        let old_json = serde_json::json!({
218            "id": "01234567-89ab-cdef-0123-456789abcdef",
219            "task_name": "send_email",
220            "queue": "default",
221            "payload": {"to": "a@b.com"},
222            "state": "pending",
223            "retries": 0,
224            "max_retries": 3,
225            "created_at": "2025-01-01T00:00:00Z",
226            "updated_at": "2025-01-01T00:00:00Z",
227            "eta": null,
228            "headers": {}
229        });
230        let msg: TaskMessage = serde_json::from_value(old_json).unwrap();
231        assert_eq!(msg.task_name, "send_email");
232        assert!(msg.parent_id.is_none());
233        assert!(msg.correlation_id.is_none());
234        assert!(msg.group_id.is_none());
235        assert!(msg.group_total.is_none());
236        assert!(msg.chord_callback.is_none());
237        assert!(msg.priority.is_none());
238        assert!(msg.dedup_key.is_none());
239    }
240
241    #[test]
242    fn priority_and_dedup_roundtrip() {
243        let msg = TaskMessage::new("task", "default", serde_json::json!({"x": 1}))
244            .with_priority(5)
245            .with_dedup_key("my-key");
246
247        let json = serde_json::to_string(&msg).unwrap();
248        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();
249
250        assert_eq!(deserialized.priority, Some(5));
251        assert_eq!(deserialized.dedup_key.as_deref(), Some("my-key"));
252    }
253
254    #[test]
255    fn priority_clamped_to_9() {
256        let msg = TaskMessage::new("task", "default", serde_json::Value::Null).with_priority(20);
257        assert_eq!(msg.priority, Some(9));
258    }
259
260    #[test]
261    fn content_dedup_deterministic() {
262        let msg1 = TaskMessage::new("task", "q", serde_json::json!({"a": 1})).with_content_dedup();
263        let msg2 = TaskMessage::new("task", "q", serde_json::json!({"a": 1})).with_content_dedup();
264        assert_eq!(msg1.dedup_key, msg2.dedup_key);
265
266        let msg3 =
267            TaskMessage::new("other_task", "q", serde_json::json!({"a": 1})).with_content_dedup();
268        assert_ne!(msg1.dedup_key, msg3.dedup_key);
269    }
270
271    #[test]
272    fn workflow_metadata_roundtrip() {
273        let callback = TaskMessage::new("callback", "default", serde_json::json!({}));
274        let msg = TaskMessage::new("task", "default", serde_json::json!({}))
275            .with_parent_id(TaskId::new())
276            .with_correlation_id("corr-123")
277            .with_group("group-1", 5)
278            .with_chord_callback(callback);
279
280        let json = serde_json::to_string(&msg).unwrap();
281        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();
282
283        assert_eq!(msg.parent_id, deserialized.parent_id);
284        assert_eq!(msg.correlation_id, deserialized.correlation_id);
285        assert_eq!(msg.group_id, deserialized.group_id);
286        assert_eq!(msg.group_total, deserialized.group_total);
287        assert!(deserialized.chord_callback.is_some());
288        assert_eq!(deserialized.chord_callback.unwrap().task_name, "callback");
289    }
290}