celery/backend/
mod.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::{json, Value};
5use std::fmt;
6
7use crate::error::{BackendError, TaskError};
8use crate::task::ResultValue;
9
10mod redis;
11pub use redis::RedisBackend;
12
13/// Task states that mirror the canonical Celery state machine.
14#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum TaskState {
17    Pending,
18    Started,
19    Retry,
20    Success,
21    Failure,
22}
23
24impl TaskState {
25    pub fn is_ready(self) -> bool {
26        matches!(self, TaskState::Success | TaskState::Failure)
27    }
28}
29
30impl fmt::Display for TaskState {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        let as_str = match self {
33            TaskState::Pending => "PENDING",
34            TaskState::Started => "STARTED",
35            TaskState::Retry => "RETRY",
36            TaskState::Success => "SUCCESS",
37            TaskState::Failure => "FAILURE",
38        };
39        write!(f, "{}", as_str)
40    }
41}
42
43/// Metadata persisted in a result backend.
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub struct TaskMeta {
46    pub task_id: String,
47    pub status: TaskState,
48    #[serde(default)]
49    pub result: Option<Value>,
50    #[serde(default)]
51    pub traceback: Option<String>,
52    #[serde(default)]
53    pub children: Vec<Value>,
54    #[serde(default)]
55    pub date_done: Option<DateTime<Utc>>,
56    #[serde(default)]
57    pub retries: Option<u32>,
58    #[serde(default)]
59    pub eta: Option<DateTime<Utc>>,
60    #[serde(default)]
61    pub meta: Option<Value>,
62}
63
64impl TaskMeta {
65    pub fn pending(task_id: &str) -> Self {
66        Self {
67            task_id: task_id.into(),
68            status: TaskState::Pending,
69            result: None,
70            traceback: None,
71            children: Vec::new(),
72            date_done: None,
73            retries: None,
74            eta: None,
75            meta: None,
76        }
77    }
78
79    pub fn started(task_id: &str) -> Self {
80        Self {
81            task_id: task_id.into(),
82            status: TaskState::Started,
83            result: None,
84            traceback: None,
85            children: Vec::new(),
86            date_done: None,
87            retries: None,
88            eta: None,
89            meta: None,
90        }
91    }
92
93    pub fn success<R>(task_id: &str, result: &R) -> Result<Self, serde_json::Error>
94    where
95        R: ResultValue,
96    {
97        Ok(Self {
98            task_id: task_id.into(),
99            status: TaskState::Success,
100            result: Some(result.to_json_value()?),
101            traceback: None,
102            children: Vec::new(),
103            date_done: Some(Utc::now()),
104            retries: None,
105            eta: None,
106            meta: None,
107        })
108    }
109
110    pub fn failure(task_id: &str, error: &TaskError) -> Self {
111        let (exc_type, exc_message) = match error {
112            TaskError::ExpectedError(msg) => ("ExpectedError", msg.clone()),
113            TaskError::UnexpectedError(msg) => ("UnexpectedError", msg.clone()),
114            TaskError::TimeoutError => ("TimeoutError", "task timed out".into()),
115            TaskError::Retry(_) => ("Retry", "task retry requested".into()),
116        };
117
118        Self {
119            task_id: task_id.into(),
120            status: TaskState::Failure,
121            result: Some(Value::String(error.to_string())),
122            traceback: None,
123            children: Vec::new(),
124            date_done: Some(Utc::now()),
125            retries: None,
126            eta: None,
127            meta: Some(json!({
128                "exc_type": exc_type,
129                "exc_message": exc_message,
130            })),
131        }
132    }
133
134    pub fn retry(
135        task_id: &str,
136        error: &TaskError,
137        eta: Option<DateTime<Utc>>,
138        retries: u32,
139    ) -> Self {
140        Self {
141            task_id: task_id.into(),
142            status: TaskState::Retry,
143            result: Some(Value::String(error.to_string())),
144            traceback: None,
145            children: Vec::new(),
146            date_done: Some(Utc::now()),
147            retries: Some(retries),
148            eta,
149            meta: None,
150        }
151    }
152}
153
154#[async_trait]
155pub trait ResultBackend: Send + Sync {
156    async fn store_task_meta(&self, meta: TaskMeta) -> Result<(), BackendError>;
157    async fn get_task_meta(&self, task_id: &str) -> Result<Option<TaskMeta>, BackendError>;
158    async fn forget(&self, task_id: &str) -> Result<(), BackendError>;
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164    use crate::error::TaskError;
165
166    #[test]
167    fn success_meta_contains_result() {
168        let meta = TaskMeta::success("abc", &42u32).unwrap();
169        assert_eq!(meta.status, TaskState::Success);
170        assert_eq!(meta.task_id, "abc");
171        assert_eq!(meta.result, Some(Value::from(42u32)));
172        assert!(meta.date_done.is_some());
173    }
174
175    #[test]
176    fn failure_meta_has_message() {
177        let err = TaskError::ExpectedError("boom".into());
178        let meta = TaskMeta::failure("xyz", &err);
179        assert_eq!(meta.status, TaskState::Failure);
180        assert_eq!(meta.result, Some(Value::String(err.to_string())));
181        assert!(meta.meta.is_some());
182    }
183}