1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::fmt;
6
7use crate::error::{BackendError, TaskError};
8use crate::task::ResultValue;
9
10mod redis;
11pub use redis::RedisBackend;
12
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum TaskState {
17 Pending,
18 Started,
19 Retry,
20 Success,
21 Failure,
22}
23
24impl TaskState {
25 pub fn is_ready(self) -> bool {
26 matches!(self, TaskState::Success | TaskState::Failure)
27 }
28}
29
30impl fmt::Display for TaskState {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 let as_str = match self {
33 TaskState::Pending => "PENDING",
34 TaskState::Started => "STARTED",
35 TaskState::Retry => "RETRY",
36 TaskState::Success => "SUCCESS",
37 TaskState::Failure => "FAILURE",
38 };
39 write!(f, "{}", as_str)
40 }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub struct TaskMeta {
46 pub task_id: String,
47 pub status: TaskState,
48 #[serde(default)]
49 pub result: Option<Value>,
50 #[serde(default)]
51 pub traceback: Option<String>,
52 #[serde(default)]
53 pub children: Vec<Value>,
54 #[serde(default)]
55 pub date_done: Option<DateTime<Utc>>,
56 #[serde(default)]
57 pub retries: Option<u32>,
58 #[serde(default)]
59 pub eta: Option<DateTime<Utc>>,
60 #[serde(default)]
61 pub meta: Option<Value>,
62}
63
64impl TaskMeta {
65 pub fn pending(task_id: &str) -> Self {
66 Self {
67 task_id: task_id.into(),
68 status: TaskState::Pending,
69 result: None,
70 traceback: None,
71 children: Vec::new(),
72 date_done: None,
73 retries: None,
74 eta: None,
75 meta: None,
76 }
77 }
78
79 pub fn started(task_id: &str) -> Self {
80 Self {
81 task_id: task_id.into(),
82 status: TaskState::Started,
83 result: None,
84 traceback: None,
85 children: Vec::new(),
86 date_done: None,
87 retries: None,
88 eta: None,
89 meta: None,
90 }
91 }
92
93 pub fn success<R>(task_id: &str, result: &R) -> Result<Self, serde_json::Error>
94 where
95 R: ResultValue,
96 {
97 Ok(Self {
98 task_id: task_id.into(),
99 status: TaskState::Success,
100 result: Some(result.to_json_value()?),
101 traceback: None,
102 children: Vec::new(),
103 date_done: Some(Utc::now()),
104 retries: None,
105 eta: None,
106 meta: None,
107 })
108 }
109
110 pub fn failure(task_id: &str, error: &TaskError) -> Self {
111 let (exc_type, exc_message) = match error {
112 TaskError::ExpectedError(msg) => ("ExpectedError", msg.clone()),
113 TaskError::UnexpectedError(msg) => ("UnexpectedError", msg.clone()),
114 TaskError::TimeoutError => ("TimeoutError", "task timed out".into()),
115 TaskError::Retry(_) => ("Retry", "task retry requested".into()),
116 };
117
118 Self {
119 task_id: task_id.into(),
120 status: TaskState::Failure,
121 result: Some(Value::String(error.to_string())),
122 traceback: None,
123 children: Vec::new(),
124 date_done: Some(Utc::now()),
125 retries: None,
126 eta: None,
127 meta: Some(json!({
128 "exc_type": exc_type,
129 "exc_message": exc_message,
130 })),
131 }
132 }
133
134 pub fn retry(
135 task_id: &str,
136 error: &TaskError,
137 eta: Option<DateTime<Utc>>,
138 retries: u32,
139 ) -> Self {
140 Self {
141 task_id: task_id.into(),
142 status: TaskState::Retry,
143 result: Some(Value::String(error.to_string())),
144 traceback: None,
145 children: Vec::new(),
146 date_done: Some(Utc::now()),
147 retries: Some(retries),
148 eta,
149 meta: None,
150 }
151 }
152}
153
154#[async_trait]
155pub trait ResultBackend: Send + Sync {
156 async fn store_task_meta(&self, meta: TaskMeta) -> Result<(), BackendError>;
157 async fn get_task_meta(&self, task_id: &str) -> Result<Option<TaskMeta>, BackendError>;
158 async fn forget(&self, task_id: &str) -> Result<(), BackendError>;
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::TaskError;

    /// A success record carries the id, the `Success` state, the JSON form of
    /// the value, and a completion timestamp.
    #[test]
    fn success_meta_contains_result() {
        let task_id = "abc";
        let value = 42u32;

        let meta = TaskMeta::success(task_id, &value).unwrap();

        assert_eq!(meta.task_id, task_id);
        assert_eq!(meta.status, TaskState::Success);
        assert_eq!(meta.result, Some(Value::from(value)));
        assert!(meta.date_done.is_some());
    }

    /// A failure record carries the `Failure` state, the error's string form
    /// as its result, and some extra metadata.
    #[test]
    fn failure_meta_has_message() {
        let error = TaskError::ExpectedError("boom".into());
        let expected_result = Value::String(error.to_string());

        let meta = TaskMeta::failure("xyz", &error);

        assert_eq!(meta.status, TaskState::Failure);
        assert_eq!(meta.result, Some(expected_result));
        assert!(meta.meta.is_some());
    }
}