1use serde::{Deserialize, Serialize};
3use std::time::SystemTime;
4use uuid::Uuid;
5
6#[derive(Clone, Debug, Serialize, Deserialize)]
7pub enum TaskStatus {
8 Queued,
9 InProgress,
10 Completed,
11 Failed,
12 DeadLetter,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
16pub struct TaskId(Uuid);
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
24pub struct Task<D: Clone> {
25 pub task_id: TaskId,
26 pub payload: D,
27 pub status: TaskStatus,
28 pub queued_at: SystemTime,
29 pub started_at: Option<SystemTime>,
30 pub finished_at: Option<SystemTime>,
31 pub retries: u32,
32 pub error_msg: Option<String>,
33}
34
35impl<D: Clone> Task<D> {
36 pub fn new(payload: D) -> Self {
37 Task {
38 task_id: TaskId::new(),
39 payload,
40 status: TaskStatus::Queued,
41 queued_at: SystemTime::now(),
42 started_at: None,
43 finished_at: None,
44 retries: 0,
45 error_msg: None,
46 }
47 }
48
49 pub fn set_in_progress(&mut self) {
50 self.status = TaskStatus::InProgress;
51 self.started_at = Some(SystemTime::now());
52 }
53
54 pub fn set_succeed(&mut self) {
55 self.status = TaskStatus::Completed;
56 self.finished_at = Some(SystemTime::now());
57 }
58
59 pub fn set_retry(&mut self, err_msg: &str) {
60 self.status = TaskStatus::Failed;
61 self.finished_at = Some(SystemTime::now());
62 self.retries += 1;
63 self.error_msg = Some(err_msg.to_string());
64 }
65
66 pub fn set_dlq(&mut self, err_msg: &str) {
67 self.status = TaskStatus::DeadLetter;
68 self.finished_at = Some(SystemTime::now());
69 self.error_msg = Some(err_msg.to_string());
70 }
71
72 pub fn set_status(&mut self, new_status: TaskStatus) {
73 self.status = new_status;
74 }
75
76 pub fn get_payload(&self) -> &D {
77 &self.payload
78 }
79}
80
81impl TaskId {
86 pub fn new() -> Self {
87 Self(Uuid::new_v4())
88 }
89
90 pub fn get(&self) -> Uuid {
91 self.0
92 }
93}
94
95impl Default for TaskId {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl serde::Serialize for TaskId {
103 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
104 where
105 S: serde::Serializer,
106 {
107 self.0.serialize(serializer)
109 }
110}
111
112impl<'de> serde::Deserialize<'de> for TaskId {
114 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
115 where
116 D: serde::Deserializer<'de>,
117 {
118 let uuid = Uuid::deserialize(deserializer)?;
120 Ok(TaskId(uuid))
121 }
122}
123
124impl std::fmt::Display for TaskId {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 write!(f, "TaskId({})", self.0)
127 }
128}
129
130#[cfg(test)]
135mod tests {
136 use core::panic;
137
138 use super::*;
139 use serde::{Deserialize, Serialize};
140
141 #[derive(Clone, Serialize, Deserialize, Default)]
142 struct TaskData {
143 value: u32,
144 }
145
146 #[test]
147 fn task_id_serde() {
148 let task = Task::new(TaskData { value: 1 });
149 let task_id = task.task_id.clone();
150 let serialized_task_value = serde_json::to_value(task).unwrap();
151 let serialized_task_json = serialized_task_value.to_string();
152 let desrialized_task: Task<TaskData> =
153 serde_json::from_str(&serialized_task_json).unwrap();
154 assert_eq!(task_id, desrialized_task.task_id);
155 }
156
157 #[test]
158 fn test_task_creation() {
159 let task = Task::new(TaskData::default());
160 assert!(task.started_at.is_none());
161 assert!(task.finished_at.is_none());
162 assert_eq!(task.retries, 0);
163 assert_eq!(task.payload.value, 0);
164 match task.status {
165 TaskStatus::Queued => {}
166 _ => panic!("Wrong status (task.status)"),
167 };
168 }
169
170 #[test]
171 fn test_in_progress() {
172 let mut task = Task::new(TaskData::default());
173
174 task.set_in_progress();
175 match task.status {
176 TaskStatus::InProgress => {}
177 _ => panic!("Wrong status (task.status)"),
178 };
179 assert!(task.started_at.is_some());
180 assert!(task.finished_at.is_none());
181 }
182
183 #[test]
184 fn test_succeed() {
185 let mut task = Task::new(TaskData::default());
186
187 task.set_succeed();
188 match task.status {
189 TaskStatus::Completed => {}
190 _ => panic!("Wrong status (task.status)"),
191 };
192 assert!(task.finished_at.is_some());
193 assert!(task.started_at.is_none());
194 }
195
196 #[test]
197 fn test_set_retry() {
198 let mut task = Task::new(TaskData::default());
199
200 task.set_retry("Wrong task value");
201 match task.status {
202 TaskStatus::Failed => {}
203 _ => panic!("Wrong status (task.status)"),
204 };
205 assert!(task.finished_at.is_some());
206 assert_eq!(task.retries, 1);
207 assert!(task.error_msg.is_some());
208 assert!(task.started_at.is_none());
209 }
210
211 #[test]
212 fn test_set_dlq() {
213 let mut task = Task::new(TaskData::default());
214
215 task.set_dlq("Wrong task value");
216 match task.status {
217 TaskStatus::DeadLetter => {}
218 _ => panic!("Wrong status (task.status)"),
219 };
220 assert!(task.finished_at.is_some());
221 assert!(task.started_at.is_none());
222 assert!(task.error_msg.is_some());
223 }
224
225 #[test]
226 fn task_flow_succeed() {
227 let mut task = Task::new(TaskData::default());
228
229 task.set_in_progress();
230 task.payload.value += 1;
231
232 std::thread::sleep(std::time::Duration::from_millis(5));
233
234 task.set_retry("Wrong task value");
235 task.payload.value += 1;
236
237 task.set_in_progress();
238 std::thread::sleep(std::time::Duration::from_millis(5));
239
240 task.set_succeed();
241
242 match task.status {
243 TaskStatus::Completed => {}
244 _ => panic!("Wrong status (task.status)"),
245 };
246 assert_eq!(task.retries, 1);
247 assert_eq!(task.get_payload().value, 2);
248 assert!(task.started_at.is_some());
249 assert!(task.finished_at.is_some());
250
251 assert!(
253 task.finished_at
254 .unwrap()
255 .duration_since(task.started_at.unwrap())
256 .unwrap()
257 < std::time::Duration::from_millis(10)
258 );
259 assert!(
261 task.finished_at
262 .unwrap()
263 .duration_since(task.queued_at)
264 .unwrap()
265 >= std::time::Duration::from_millis(10)
266 );
267 }
268
269 #[test]
270 fn test_flow() {
271 let mut task = Task::new(TaskData::default());
272
273 task.set_in_progress();
274 task.payload.value += 1;
275
276 std::thread::sleep(std::time::Duration::from_millis(5));
277
278 task.set_retry("Wrong task value");
279 task.payload.value += 1;
280
281 task.set_in_progress();
282 std::thread::sleep(std::time::Duration::from_millis(5));
283
284 task.set_dlq("Failed to complete task");
285
286 match task.status {
287 TaskStatus::DeadLetter => {}
288 _ => panic!("Wrong status (task.status)"),
289 };
290 assert_eq!(task.retries, 1);
291 assert_eq!(task.get_payload().value, 2);
292 assert!(task.started_at.is_some());
293 assert!(task.finished_at.is_some());
294
295 assert!(
297 task.finished_at
298 .unwrap()
299 .duration_since(task.started_at.unwrap())
300 .unwrap()
301 < std::time::Duration::from_millis(10)
302 );
303 assert!(
305 task.finished_at
306 .unwrap()
307 .duration_since(task.queued_at)
308 .unwrap()
309 >= std::time::Duration::from_millis(10)
310 );
311 }
312}