capp/
task.rs

1// use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::time::SystemTime;
4use uuid::Uuid;
5
6#[derive(Clone, Debug, Serialize, Deserialize)]
7pub enum TaskStatus {
8    Queued,
9    InProgress,
10    Completed,
11    Failed,
12    DeadLetter,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
16pub struct TaskId(Uuid);
17
18/// A `Task` struct represents a single unit of work that will be processed
19/// by a worker. It contains payload of type `D`, which is used by the worker
20/// during processing. The `Task` struct also includes fields for managing
21/// the task's lifecycle, including the task's UUID, the start and
22/// finish times, the number of retries, and any error messages.
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct Task<D: Clone> {
25    pub task_id: TaskId,
26    pub payload: D,
27    pub status: TaskStatus,
28    pub queued_at: SystemTime,
29    pub started_at: Option<SystemTime>,
30    pub finished_at: Option<SystemTime>,
31    pub retries: u32,
32    pub error_msg: Option<String>,
33}
34
35impl<D: Clone> Task<D> {
36    pub fn new(payload: D) -> Self {
37        Task {
38            task_id: TaskId::new(),
39            payload,
40            status: TaskStatus::Queued,
41            queued_at: SystemTime::now(),
42            started_at: None,
43            finished_at: None,
44            retries: 0,
45            error_msg: None,
46        }
47    }
48
49    pub fn set_in_progress(&mut self) {
50        self.status = TaskStatus::InProgress;
51        self.started_at = Some(SystemTime::now());
52    }
53
54    pub fn set_succeed(&mut self) {
55        self.status = TaskStatus::Completed;
56        self.finished_at = Some(SystemTime::now());
57    }
58
59    pub fn set_retry(&mut self, err_msg: &str) {
60        self.status = TaskStatus::Failed;
61        self.finished_at = Some(SystemTime::now());
62        self.retries += 1;
63        self.error_msg = Some(err_msg.to_string());
64    }
65
66    pub fn set_dlq(&mut self, err_msg: &str) {
67        self.status = TaskStatus::DeadLetter;
68        self.finished_at = Some(SystemTime::now());
69        self.error_msg = Some(err_msg.to_string());
70    }
71
72    pub fn set_status(&mut self, new_status: TaskStatus) {
73        self.status = new_status;
74    }
75
76    pub fn get_payload(&self) -> &D {
77        &self.payload
78    }
79}
80
//*****************************************************************************
// TaskId with ser/de traits implemented (to convert the underlying Uuid)
//*****************************************************************************
84
85impl TaskId {
86    pub fn new() -> Self {
87        Self(Uuid::new_v4())
88    }
89
90    pub fn get(&self) -> Uuid {
91        self.0
92    }
93}
94
95impl Default for TaskId {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101// Custom serialization for TaskId.
102impl serde::Serialize for TaskId {
103    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
104    where
105        S: serde::Serializer,
106    {
107        // Directly serialize the inner Uuid.
108        self.0.serialize(serializer)
109    }
110}
111
112// Custom deserialization for TaskId.
113impl<'de> serde::Deserialize<'de> for TaskId {
114    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
115    where
116        D: serde::Deserializer<'de>,
117    {
118        // Deserialize a Uuid and then wrap it in a TaskId.
119        let uuid = Uuid::deserialize(deserializer)?;
120        Ok(TaskId(uuid))
121    }
122}
123
124impl std::fmt::Display for TaskId {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        write!(f, "TaskId({})", self.0)
127    }
128}
129
130//*****************************************************************************
131// Tests
132//*****************************************************************************
133
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::time::{Duration, SystemTime};

    /// Pause inserted between lifecycle transitions so their timestamps differ.
    const STEP_DELAY: Duration = Duration::from_millis(5);

    /// Minimal serializable payload used to exercise `Task<D>`.
    #[derive(Clone, Serialize, Deserialize, Default)]
    struct TaskData {
        value: u32,
    }

    /// Drives `task` through a failed first attempt and into a second one,
    /// bumping the payload once per step and sleeping `STEP_DELAY` after each
    /// `set_in_progress`.
    ///
    /// Returns the `started_at` values observed for the first and the second
    /// attempt, in that order.
    fn fail_once_then_restart(task: &mut Task<TaskData>) -> (SystemTime, SystemTime) {
        task.set_in_progress();
        let first_started_at = task
            .started_at
            .expect("set_in_progress must record started_at");
        task.payload.value += 1;

        std::thread::sleep(STEP_DELAY);

        task.set_retry("Wrong task value");
        task.payload.value += 1;

        task.set_in_progress();
        let second_started_at = task
            .started_at
            .expect("set_in_progress must record started_at");
        std::thread::sleep(STEP_DELAY);

        (first_started_at, second_started_at)
    }

    /// Checks the bookkeeping of a task that went through
    /// `fail_once_then_restart` and was then finished.
    ///
    /// Only lower bounds and orderings are asserted: `thread::sleep` may
    /// sleep longer than requested, so an upper bound on elapsed time would
    /// make the test flaky on a loaded machine.
    fn assert_two_attempt_bookkeeping(
        task: &Task<TaskData>,
        first_started_at: SystemTime,
        second_started_at: SystemTime,
    ) {
        assert_eq!(task.retries, 1);
        assert_eq!(task.get_payload().value, 2);

        // started_at must describe the second attempt, not the first one.
        assert!(second_started_at > first_started_at);
        assert_eq!(task.started_at, Some(second_started_at));

        let finished_at = task.finished_at.expect("finished_at must be recorded");

        // finished_at - started_at
        assert!(finished_at >= second_started_at);
        // finished_at - queued_at: covers both sleeps.
        assert!(finished_at.duration_since(task.queued_at).unwrap() >= STEP_DELAY * 2);
    }

    #[test]
    fn task_id_serde() {
        let task = Task::new(TaskData { value: 1 });
        // `TaskId` is `Copy`; no clone needed.
        let task_id = task.task_id;
        let serialized_task_value = serde_json::to_value(task).unwrap();
        let serialized_task_json = serialized_task_value.to_string();
        let deserialized_task: Task<TaskData> =
            serde_json::from_str(&serialized_task_json).unwrap();
        assert_eq!(task_id, deserialized_task.task_id);
    }

    #[test]
    fn test_task_creation() {
        let task = Task::new(TaskData::default());
        assert!(task.started_at.is_none());
        assert!(task.finished_at.is_none());
        assert_eq!(task.retries, 0);
        assert_eq!(task.payload.value, 0);
        assert!(matches!(task.status, TaskStatus::Queued));
    }

    #[test]
    fn test_in_progress() {
        let mut task = Task::new(TaskData::default());

        task.set_in_progress();
        assert!(matches!(task.status, TaskStatus::InProgress));
        assert!(task.started_at.is_some());
        assert!(task.finished_at.is_none());
    }

    #[test]
    fn test_succeed() {
        let mut task = Task::new(TaskData::default());

        task.set_succeed();
        assert!(matches!(task.status, TaskStatus::Completed));
        assert!(task.finished_at.is_some());
        assert!(task.started_at.is_none());
    }

    #[test]
    fn test_set_retry() {
        let mut task = Task::new(TaskData::default());

        task.set_retry("Wrong task value");
        assert!(matches!(task.status, TaskStatus::Failed));
        assert!(task.finished_at.is_some());
        assert_eq!(task.retries, 1);
        assert!(task.error_msg.is_some());
        assert!(task.started_at.is_none());
    }

    #[test]
    fn test_set_dlq() {
        let mut task = Task::new(TaskData::default());

        task.set_dlq("Wrong task value");
        assert!(matches!(task.status, TaskStatus::DeadLetter));
        assert!(task.finished_at.is_some());
        assert!(task.started_at.is_none());
        assert!(task.error_msg.is_some());
    }

    #[test]
    fn task_flow_succeed() {
        let mut task = Task::new(TaskData::default());

        let (first_started_at, second_started_at) = fail_once_then_restart(&mut task);
        task.set_succeed();

        assert!(matches!(task.status, TaskStatus::Completed));
        assert_two_attempt_bookkeeping(&task, first_started_at, second_started_at);
    }

    #[test]
    fn test_flow() {
        let mut task = Task::new(TaskData::default());

        let (first_started_at, second_started_at) = fail_once_then_restart(&mut task);
        task.set_dlq("Failed to complete task");

        assert!(matches!(task.status, TaskStatus::DeadLetter));
        assert_two_attempt_bookkeeping(&task, first_started_at, second_started_at);
    }
}