1use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31use uuid::Uuid;
32
33#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "UPPERCASE")]
36pub enum TaskStatus {
37 #[default]
39 Pending,
40 Received,
42 Started,
44 Success,
46 Failure,
48 Retry,
50 Revoked,
52}
53
54impl TaskStatus {
55 #[inline]
57 pub fn is_terminal(&self) -> bool {
58 matches!(
59 self,
60 TaskStatus::Success | TaskStatus::Failure | TaskStatus::Revoked
61 )
62 }
63
64 #[inline]
66 pub fn is_success(&self) -> bool {
67 matches!(self, TaskStatus::Success)
68 }
69
70 #[inline]
72 pub fn is_failure(&self) -> bool {
73 matches!(self, TaskStatus::Failure)
74 }
75
76 #[inline]
78 pub fn is_ready(&self) -> bool {
79 self.is_terminal()
80 }
81
82 #[inline]
84 pub fn as_str(&self) -> &'static str {
85 match self {
86 TaskStatus::Pending => "PENDING",
87 TaskStatus::Received => "RECEIVED",
88 TaskStatus::Started => "STARTED",
89 TaskStatus::Success => "SUCCESS",
90 TaskStatus::Failure => "FAILURE",
91 TaskStatus::Retry => "RETRY",
92 TaskStatus::Revoked => "REVOKED",
93 }
94 }
95}
96
97impl std::fmt::Display for TaskStatus {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 write!(f, "{}", self.as_str())
100 }
101}
102
103impl std::str::FromStr for TaskStatus {
104 type Err = String;
105
106 fn from_str(s: &str) -> Result<Self, Self::Err> {
107 match s.to_uppercase().as_str() {
108 "PENDING" => Ok(TaskStatus::Pending),
109 "RECEIVED" => Ok(TaskStatus::Received),
110 "STARTED" => Ok(TaskStatus::Started),
111 "SUCCESS" => Ok(TaskStatus::Success),
112 "FAILURE" => Ok(TaskStatus::Failure),
113 "RETRY" => Ok(TaskStatus::Retry),
114 "REVOKED" => Ok(TaskStatus::Revoked),
115 _ => Err(format!("Invalid task status: {}", s)),
116 }
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
122pub struct ExceptionInfo {
123 #[serde(rename = "exc_type")]
125 pub exc_type: String,
126
127 #[serde(rename = "exc_message")]
129 pub exc_message: String,
130
131 #[serde(skip_serializing_if = "Option::is_none")]
133 pub traceback: Option<String>,
134}
135
136impl ExceptionInfo {
137 pub fn new(exc_type: impl Into<String>, exc_message: impl Into<String>) -> Self {
139 Self {
140 exc_type: exc_type.into(),
141 exc_message: exc_message.into(),
142 traceback: None,
143 }
144 }
145
146 #[must_use]
148 pub fn with_traceback(mut self, traceback: impl Into<String>) -> Self {
149 self.traceback = Some(traceback.into());
150 self
151 }
152}
153
154#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
156pub struct ResultMessage {
157 pub task_id: Uuid,
159
160 pub status: TaskStatus,
162
163 #[serde(skip_serializing_if = "Option::is_none")]
165 pub result: Option<serde_json::Value>,
166
167 #[serde(skip_serializing_if = "Option::is_none")]
169 pub traceback: Option<String>,
170
171 #[serde(skip_serializing_if = "Option::is_none")]
173 pub exception: Option<ExceptionInfo>,
174
175 #[serde(skip_serializing_if = "Option::is_none")]
177 pub date_done: Option<DateTime<Utc>>,
178
179 #[serde(skip_serializing_if = "Option::is_none")]
181 pub task: Option<String>,
182
183 #[serde(skip_serializing_if = "Option::is_none")]
185 pub worker: Option<String>,
186
187 #[serde(skip_serializing_if = "Option::is_none")]
189 pub retries: Option<u32>,
190
191 #[serde(skip_serializing_if = "Option::is_none")]
193 pub parent_id: Option<Uuid>,
194
195 #[serde(skip_serializing_if = "Option::is_none")]
197 pub root_id: Option<Uuid>,
198
199 #[serde(skip_serializing_if = "Option::is_none")]
201 pub group_id: Option<Uuid>,
202
203 #[serde(default, skip_serializing_if = "Vec::is_empty")]
205 pub children: Vec<Uuid>,
206
207 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
209 pub meta: HashMap<String, serde_json::Value>,
210}
211
212impl ResultMessage {
213 pub fn new(task_id: Uuid, status: TaskStatus) -> Self {
215 Self {
216 task_id,
217 status,
218 result: None,
219 traceback: None,
220 exception: None,
221 date_done: None,
222 task: None,
223 worker: None,
224 retries: None,
225 parent_id: None,
226 root_id: None,
227 group_id: None,
228 children: Vec::new(),
229 meta: HashMap::new(),
230 }
231 }
232
233 pub fn pending(task_id: Uuid) -> Self {
235 Self::new(task_id, TaskStatus::Pending)
236 }
237
238 pub fn success(task_id: Uuid, result: serde_json::Value) -> Self {
240 Self {
241 result: Some(result),
242 date_done: Some(Utc::now()),
243 ..Self::new(task_id, TaskStatus::Success)
244 }
245 }
246
247 pub fn failure(task_id: Uuid, exc_type: &str, exc_message: &str) -> Self {
249 Self {
250 exception: Some(ExceptionInfo::new(exc_type, exc_message)),
251 date_done: Some(Utc::now()),
252 ..Self::new(task_id, TaskStatus::Failure)
253 }
254 }
255
256 pub fn failure_with_traceback(
258 task_id: Uuid,
259 exc_type: &str,
260 exc_message: &str,
261 traceback: &str,
262 ) -> Self {
263 Self {
264 exception: Some(ExceptionInfo::new(exc_type, exc_message).with_traceback(traceback)),
265 traceback: Some(traceback.to_string()),
266 date_done: Some(Utc::now()),
267 ..Self::new(task_id, TaskStatus::Failure)
268 }
269 }
270
271 pub fn retry(task_id: Uuid, retries: u32) -> Self {
273 Self {
274 retries: Some(retries),
275 ..Self::new(task_id, TaskStatus::Retry)
276 }
277 }
278
279 pub fn revoked(task_id: Uuid) -> Self {
281 Self {
282 date_done: Some(Utc::now()),
283 ..Self::new(task_id, TaskStatus::Revoked)
284 }
285 }
286
287 pub fn started(task_id: Uuid) -> Self {
289 Self::new(task_id, TaskStatus::Started)
290 }
291
292 pub fn received(task_id: Uuid) -> Self {
294 Self::new(task_id, TaskStatus::Received)
295 }
296
297 #[must_use]
299 pub fn with_task(mut self, task: impl Into<String>) -> Self {
300 self.task = Some(task.into());
301 self
302 }
303
304 #[must_use]
306 pub fn with_worker(mut self, worker: impl Into<String>) -> Self {
307 self.worker = Some(worker.into());
308 self
309 }
310
311 #[must_use]
313 pub fn with_parent(mut self, parent_id: Uuid) -> Self {
314 self.parent_id = Some(parent_id);
315 self
316 }
317
318 #[must_use]
320 pub fn with_root(mut self, root_id: Uuid) -> Self {
321 self.root_id = Some(root_id);
322 self
323 }
324
325 #[must_use]
327 pub fn with_group(mut self, group_id: Uuid) -> Self {
328 self.group_id = Some(group_id);
329 self
330 }
331
332 #[must_use]
334 pub fn with_child(mut self, child_id: Uuid) -> Self {
335 self.children.push(child_id);
336 self
337 }
338
339 #[must_use]
341 pub fn with_children(mut self, children: Vec<Uuid>) -> Self {
342 self.children = children;
343 self
344 }
345
346 #[must_use]
348 pub fn with_meta(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
349 self.meta.insert(key.into(), value);
350 self
351 }
352
353 #[must_use]
355 pub fn with_retries(mut self, retries: u32) -> Self {
356 self.retries = Some(retries);
357 self
358 }
359
360 #[must_use]
362 pub fn with_date_done(mut self, date_done: DateTime<Utc>) -> Self {
363 self.date_done = Some(date_done);
364 self
365 }
366
367 pub fn add_meta(&mut self, key: impl Into<String>, value: serde_json::Value) {
369 self.meta.insert(key.into(), value);
370 }
371
372 #[inline]
374 pub fn get_meta(&self, key: &str) -> Option<&serde_json::Value> {
375 self.meta.get(key)
376 }
377
378 #[inline]
380 pub fn has_meta(&self, key: &str) -> bool {
381 self.meta.contains_key(key)
382 }
383
384 #[inline]
386 pub fn meta_len(&self) -> usize {
387 self.meta.len()
388 }
389
390 #[inline]
392 pub fn retry_count(&self) -> u32 {
393 self.retries.unwrap_or(0)
394 }
395
396 #[inline]
398 pub fn is_ready(&self) -> bool {
399 self.status.is_ready()
400 }
401
402 #[inline]
404 pub fn is_success(&self) -> bool {
405 self.status.is_success()
406 }
407
408 #[inline]
410 pub fn is_failure(&self) -> bool {
411 self.status.is_failure()
412 }
413
414 #[inline]
416 pub fn get_result(&self) -> Option<&serde_json::Value> {
417 if self.is_success() {
418 self.result.as_ref()
419 } else {
420 None
421 }
422 }
423
424 #[inline]
426 pub fn get_exception(&self) -> Option<&ExceptionInfo> {
427 if self.is_failure() {
428 self.exception.as_ref()
429 } else {
430 None
431 }
432 }
433
434 pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
436 serde_json::to_vec(self)
437 }
438
439 pub fn from_json(bytes: &[u8]) -> Result<Self, serde_json::Error> {
441 serde_json::from_slice(bytes)
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use serde_json::json;
449
450 #[test]
451 fn test_task_status_is_terminal() {
452 assert!(!TaskStatus::Pending.is_terminal());
453 assert!(!TaskStatus::Received.is_terminal());
454 assert!(!TaskStatus::Started.is_terminal());
455 assert!(TaskStatus::Success.is_terminal());
456 assert!(TaskStatus::Failure.is_terminal());
457 assert!(!TaskStatus::Retry.is_terminal());
458 assert!(TaskStatus::Revoked.is_terminal());
459 }
460
461 #[test]
462 fn test_task_status_as_str() {
463 assert_eq!(TaskStatus::Pending.as_str(), "PENDING");
464 assert_eq!(TaskStatus::Success.as_str(), "SUCCESS");
465 assert_eq!(TaskStatus::Failure.as_str(), "FAILURE");
466 }
467
468 #[test]
469 fn test_task_status_display() {
470 assert_eq!(TaskStatus::Success.to_string(), "SUCCESS");
471 assert_eq!(TaskStatus::Failure.to_string(), "FAILURE");
472 }
473
474 #[test]
475 fn test_task_status_default() {
476 assert_eq!(TaskStatus::default(), TaskStatus::Pending);
477 }
478
479 #[test]
480 fn test_task_status_from_str() {
481 use std::str::FromStr;
482
483 assert_eq!(
484 TaskStatus::from_str("PENDING").unwrap(),
485 TaskStatus::Pending
486 );
487 assert_eq!(
488 TaskStatus::from_str("pending").unwrap(),
489 TaskStatus::Pending
490 );
491 assert_eq!(
492 TaskStatus::from_str("RECEIVED").unwrap(),
493 TaskStatus::Received
494 );
495 assert_eq!(
496 TaskStatus::from_str("STARTED").unwrap(),
497 TaskStatus::Started
498 );
499 assert_eq!(
500 TaskStatus::from_str("SUCCESS").unwrap(),
501 TaskStatus::Success
502 );
503 assert_eq!(
504 TaskStatus::from_str("success").unwrap(),
505 TaskStatus::Success
506 );
507 assert_eq!(
508 TaskStatus::from_str("FAILURE").unwrap(),
509 TaskStatus::Failure
510 );
511 assert_eq!(TaskStatus::from_str("RETRY").unwrap(), TaskStatus::Retry);
512 assert_eq!(
513 TaskStatus::from_str("REVOKED").unwrap(),
514 TaskStatus::Revoked
515 );
516
517 assert!(TaskStatus::from_str("INVALID").is_err());
518 assert!(TaskStatus::from_str("").is_err());
519 }
520
521 #[test]
522 fn test_result_message_success() {
523 let task_id = Uuid::new_v4();
524 let result = ResultMessage::success(task_id, json!({"answer": 42}));
525
526 assert_eq!(result.task_id, task_id);
527 assert!(result.is_success());
528 assert!(result.is_ready());
529 assert!(!result.is_failure());
530 assert!(result.date_done.is_some());
531 assert_eq!(result.get_result(), Some(&json!({"answer": 42})));
532 }
533
534 #[test]
535 fn test_result_message_failure() {
536 let task_id = Uuid::new_v4();
537 let result = ResultMessage::failure(task_id, "ValueError", "Invalid input");
538
539 assert_eq!(result.task_id, task_id);
540 assert!(result.is_failure());
541 assert!(result.is_ready());
542 assert!(!result.is_success());
543 assert!(result.date_done.is_some());
544
545 let exc = result.get_exception().unwrap();
546 assert_eq!(exc.exc_type, "ValueError");
547 assert_eq!(exc.exc_message, "Invalid input");
548 }
549
550 #[test]
551 fn test_result_message_failure_with_traceback() {
552 let task_id = Uuid::new_v4();
553 let traceback = "Traceback (most recent call last):\n File \"test.py\"...";
554 let result = ResultMessage::failure_with_traceback(
555 task_id,
556 "RuntimeError",
557 "Test failed",
558 traceback,
559 );
560
561 assert!(result.is_failure());
562 assert_eq!(result.traceback, Some(traceback.to_string()));
563 assert_eq!(
564 result.exception.as_ref().unwrap().traceback,
565 Some(traceback.to_string())
566 );
567 }
568
569 #[test]
570 fn test_result_message_pending() {
571 let task_id = Uuid::new_v4();
572 let result = ResultMessage::pending(task_id);
573
574 assert_eq!(result.status, TaskStatus::Pending);
575 assert!(!result.is_ready());
576 }
577
578 #[test]
579 fn test_result_message_retry() {
580 let task_id = Uuid::new_v4();
581 let result = ResultMessage::retry(task_id, 3);
582
583 assert_eq!(result.status, TaskStatus::Retry);
584 assert_eq!(result.retries, Some(3));
585 assert!(!result.is_ready());
586 }
587
588 #[test]
589 fn test_result_message_revoked() {
590 let task_id = Uuid::new_v4();
591 let result = ResultMessage::revoked(task_id);
592
593 assert_eq!(result.status, TaskStatus::Revoked);
594 assert!(result.is_ready());
595 assert!(result.date_done.is_some());
596 }
597
598 #[test]
599 fn test_result_message_builders() {
600 let task_id = Uuid::new_v4();
601 let parent_id = Uuid::new_v4();
602 let root_id = Uuid::new_v4();
603 let group_id = Uuid::new_v4();
604 let child_id = Uuid::new_v4();
605
606 let result = ResultMessage::success(task_id, json!(100))
607 .with_task("tasks.add")
608 .with_worker("worker-1")
609 .with_parent(parent_id)
610 .with_root(root_id)
611 .with_group(group_id)
612 .with_child(child_id)
613 .with_meta("custom", json!("value"));
614
615 assert_eq!(result.task, Some("tasks.add".to_string()));
616 assert_eq!(result.worker, Some("worker-1".to_string()));
617 assert_eq!(result.parent_id, Some(parent_id));
618 assert_eq!(result.root_id, Some(root_id));
619 assert_eq!(result.group_id, Some(group_id));
620 assert_eq!(result.children, vec![child_id]);
621 assert_eq!(result.meta.get("custom"), Some(&json!("value")));
622 }
623
624 #[test]
625 fn test_result_message_json_round_trip() {
626 let task_id = Uuid::new_v4();
627 let result = ResultMessage::success(task_id, json!({"data": [1, 2, 3]}))
628 .with_task("tasks.process")
629 .with_worker("worker-2");
630
631 let json_bytes = result.to_json().unwrap();
632 let decoded = ResultMessage::from_json(&json_bytes).unwrap();
633
634 assert_eq!(decoded.task_id, task_id);
635 assert_eq!(decoded.status, TaskStatus::Success);
636 assert_eq!(decoded.task, Some("tasks.process".to_string()));
637 assert_eq!(decoded.worker, Some("worker-2".to_string()));
638 }
639
640 #[test]
641 fn test_result_message_serialization_format() {
642 let task_id = Uuid::new_v4();
643 let result = ResultMessage::success(task_id, json!(42));
644
645 let json_str = serde_json::to_string(&result).unwrap();
646 let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();
647
648 assert!(value.get("task_id").is_some());
650 assert!(value.get("status").is_some());
651 assert!(value.get("result").is_some());
652 assert_eq!(value["status"], "SUCCESS");
653 }
654
655 #[test]
656 fn test_exception_info() {
657 let exc =
658 ExceptionInfo::new("TypeError", "Expected int, got str").with_traceback("at line 42");
659
660 assert_eq!(exc.exc_type, "TypeError");
661 assert_eq!(exc.exc_message, "Expected int, got str");
662 assert_eq!(exc.traceback, Some("at line 42".to_string()));
663 }
664
665 #[test]
666 fn test_exception_info_default() {
667 let exc = ExceptionInfo::default();
668
669 assert_eq!(exc.exc_type, "");
670 assert_eq!(exc.exc_message, "");
671 assert_eq!(exc.traceback, None);
672
673 let exc_builder = ExceptionInfo::default().with_traceback("some traceback");
675
676 assert_eq!(exc_builder.traceback, Some("some traceback".to_string()));
677 }
678
679 #[test]
680 fn test_with_children() {
681 let task_id = Uuid::new_v4();
682 let children = vec![Uuid::new_v4(), Uuid::new_v4()];
683
684 let result = ResultMessage::success(task_id, json!(null)).with_children(children.clone());
685
686 assert_eq!(result.children, children);
687 }
688
689 #[test]
690 fn test_result_message_with_retries() {
691 let result = ResultMessage::new(Uuid::new_v4(), TaskStatus::Retry).with_retries(5);
692
693 assert_eq!(result.retries, Some(5));
694 assert_eq!(result.retry_count(), 5);
695 }
696
697 #[test]
698 fn test_result_message_retry_count_default() {
699 let result = ResultMessage::new(Uuid::new_v4(), TaskStatus::Success);
700
701 assert_eq!(result.retries, None);
702 assert_eq!(result.retry_count(), 0); }
704
705 #[test]
706 fn test_result_message_with_date_done() {
707 let now = chrono::Utc::now();
708 let result = ResultMessage::new(Uuid::new_v4(), TaskStatus::Success).with_date_done(now);
709
710 assert_eq!(result.date_done, Some(now));
711 }
712
713 #[test]
714 fn test_result_message_metadata() {
715 let mut result = ResultMessage::new(Uuid::new_v4(), TaskStatus::Success);
716
717 result.add_meta("key1", json!("value1"));
719 result.add_meta("key2", json!(42));
720
721 assert_eq!(result.meta_len(), 2);
722 assert!(result.has_meta("key1"));
723 assert!(result.has_meta("key2"));
724 assert!(!result.has_meta("key3"));
725
726 assert_eq!(result.get_meta("key1"), Some(&json!("value1")));
727 assert_eq!(result.get_meta("key2"), Some(&json!(42)));
728 assert_eq!(result.get_meta("key3"), None);
729 }
730
731 #[test]
732 fn test_result_message_with_meta_builder() {
733 let result = ResultMessage::new(Uuid::new_v4(), TaskStatus::Success)
734 .with_meta("version", json!("1.0.0"))
735 .with_meta("region", json!("us-west-2"));
736
737 assert_eq!(result.meta_len(), 2);
738 assert_eq!(result.get_meta("version"), Some(&json!("1.0.0")));
739 assert_eq!(result.get_meta("region"), Some(&json!("us-west-2")));
740 }
741
742 #[test]
743 fn test_result_message_builder_chaining() {
744 let task_id = Uuid::new_v4();
745 let parent_id = Uuid::new_v4();
746 let root_id = Uuid::new_v4();
747 let now = chrono::Utc::now();
748
749 let result = ResultMessage::success(task_id, json!({"data": 42}))
750 .with_task("my.task")
751 .with_worker("worker-1")
752 .with_parent(parent_id)
753 .with_root(root_id)
754 .with_retries(3)
755 .with_date_done(now)
756 .with_meta("source", json!("api"));
757
758 assert_eq!(result.task_id, task_id);
759 assert_eq!(result.status, TaskStatus::Success);
760 assert_eq!(result.task, Some("my.task".to_string()));
761 assert_eq!(result.worker, Some("worker-1".to_string()));
762 assert_eq!(result.parent_id, Some(parent_id));
763 assert_eq!(result.root_id, Some(root_id));
764 assert_eq!(result.retry_count(), 3);
765 assert_eq!(result.date_done, Some(now));
766 assert_eq!(result.get_meta("source"), Some(&json!("api")));
767 }
768}