Skip to main content

kojin_core/
message.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::state::TaskState;
6use crate::task_id::TaskId;
7
8/// A task message that flows through the broker.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct TaskMessage {
11    /// Unique task identifier.
12    pub id: TaskId,
13    /// Registered task name (e.g., "send_email").
14    pub task_name: String,
15    /// Target queue name.
16    pub queue: String,
17    /// Serialized task payload.
18    pub payload: serde_json::Value,
19    /// Current task state.
20    pub state: TaskState,
21    /// Current retry count.
22    pub retries: u32,
23    /// Maximum allowed retries.
24    pub max_retries: u32,
25    /// When the message was created.
26    pub created_at: DateTime<Utc>,
27    /// When the message was last updated.
28    pub updated_at: DateTime<Utc>,
29    /// Optional ETA — earliest time the task should execute.
30    pub eta: Option<DateTime<Utc>>,
31    /// Arbitrary headers for middleware / tracing propagation.
32    pub headers: HashMap<String, String>,
33}
34
35impl TaskMessage {
36    /// Create a new task message with defaults.
37    pub fn new(
38        task_name: impl Into<String>,
39        queue: impl Into<String>,
40        payload: serde_json::Value,
41    ) -> Self {
42        let now = Utc::now();
43        Self {
44            id: TaskId::new(),
45            task_name: task_name.into(),
46            queue: queue.into(),
47            payload,
48            state: TaskState::Pending,
49            retries: 0,
50            max_retries: 3,
51            created_at: now,
52            updated_at: now,
53            eta: None,
54            headers: HashMap::new(),
55        }
56    }
57
58    /// Set max retries.
59    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
60        self.max_retries = max_retries;
61        self
62    }
63
64    /// Set ETA.
65    pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
66        self.eta = Some(eta);
67        self
68    }
69
70    /// Add a header.
71    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
72        self.headers.insert(key.into(), value.into());
73        self
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    use super::*;
80
81    #[test]
82    fn task_message_serde_roundtrip() {
83        let msg = TaskMessage::new(
84            "send_email",
85            "default",
86            serde_json::json!({"to": "a@b.com"}),
87        )
88        .with_max_retries(5)
89        .with_header("trace_id", "abc123");
90
91        let json = serde_json::to_string(&msg).unwrap();
92        let deserialized: TaskMessage = serde_json::from_str(&json).unwrap();
93
94        assert_eq!(msg.id, deserialized.id);
95        assert_eq!(msg.task_name, deserialized.task_name);
96        assert_eq!(msg.queue, deserialized.queue);
97        assert_eq!(msg.max_retries, deserialized.max_retries);
98        assert_eq!(msg.headers.get("trace_id"), Some(&"abc123".to_string()));
99    }
100
101    #[test]
102    fn task_message_defaults() {
103        let msg = TaskMessage::new("test", "default", serde_json::Value::Null);
104        assert_eq!(msg.state, TaskState::Pending);
105        assert_eq!(msg.retries, 0);
106        assert_eq!(msg.max_retries, 3);
107        assert!(msg.eta.is_none());
108        assert!(msg.headers.is_empty());
109    }
110}