1use crate::{MysqlResultBackend, PostgresResultBackend, TaskMeta, TaskResult};
7use async_trait::async_trait;
8use celers_backend_redis::ResultBackend as LocalResultBackend;
9use celers_core::result::{ResultStore, TaskResultValue};
10use celers_core::{TaskId, TaskState};
11
12fn to_task_result(value: &TaskResultValue) -> TaskResult {
14 match value {
15 TaskResultValue::Pending => TaskResult::Pending,
16 TaskResultValue::Received => TaskResult::Pending,
17 TaskResultValue::Started => TaskResult::Started,
18 TaskResultValue::Success(v) => TaskResult::Success(v.clone()),
19 TaskResultValue::Failure { error, .. } => TaskResult::Failure(error.clone()),
20 TaskResultValue::Revoked => TaskResult::Revoked,
21 TaskResultValue::Retry { attempt, .. } => TaskResult::Retry(*attempt),
22 TaskResultValue::Rejected { reason } => TaskResult::Failure(reason.clone()),
23 }
24}
25
26fn from_task_result(result: &TaskResult) -> TaskResultValue {
28 match result {
29 TaskResult::Pending => TaskResultValue::Pending,
30 TaskResult::Started => TaskResultValue::Started,
31 TaskResult::Success(v) => TaskResultValue::Success(v.clone()),
32 TaskResult::Failure(msg) => TaskResultValue::Failure {
33 error: msg.clone(),
34 traceback: None,
35 },
36 TaskResult::Revoked => TaskResultValue::Revoked,
37 TaskResult::Retry(count) => TaskResultValue::Retry {
38 attempt: *count,
39 max_retries: 3,
40 },
41 }
42}
43
44fn task_result_to_state(result: &TaskResult) -> TaskState {
46 match result {
47 TaskResult::Pending => TaskState::Pending,
48 TaskResult::Started => TaskState::Running,
49 TaskResult::Success(v) => {
50 let bytes = serde_json::to_vec(v).unwrap_or_default();
51 TaskState::Succeeded(bytes)
52 }
53 TaskResult::Failure(msg) => TaskState::Failed(msg.clone()),
54 TaskResult::Revoked => TaskState::Failed("Task revoked".to_string()),
55 TaskResult::Retry(_) => TaskState::Running,
56 }
57}
58
59#[async_trait]
61impl ResultStore for PostgresResultBackend {
62 async fn store_result(
63 &self,
64 task_id: TaskId,
65 result: TaskResultValue,
66 ) -> celers_core::Result<()> {
67 let task_result = to_task_result(&result);
68 let mut meta = TaskMeta::new(task_id, String::new());
69 meta.result = task_result;
70
71 let mut backend = self.clone();
72 <PostgresResultBackend as LocalResultBackend>::store_result(&mut backend, task_id, &meta)
73 .await
74 .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
75 }
76
77 async fn get_result(&self, task_id: TaskId) -> celers_core::Result<Option<TaskResultValue>> {
78 let mut backend = self.clone();
79 match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
80 {
81 Ok(Some(meta)) => Ok(Some(from_task_result(&meta.result))),
82 Ok(None) => Ok(None),
83 Err(e) => Err(celers_core::CelersError::Other(format!(
84 "Database error: {}",
85 e
86 ))),
87 }
88 }
89
90 async fn get_state(&self, task_id: TaskId) -> celers_core::Result<TaskState> {
91 let mut backend = self.clone();
92 match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
93 {
94 Ok(Some(meta)) => Ok(task_result_to_state(&meta.result)),
95 Ok(None) => Ok(TaskState::Pending),
96 Err(e) => Err(celers_core::CelersError::Other(format!(
97 "Database error: {}",
98 e
99 ))),
100 }
101 }
102
103 async fn forget(&self, task_id: TaskId) -> celers_core::Result<()> {
104 let mut backend = self.clone();
105 <PostgresResultBackend as LocalResultBackend>::delete_result(&mut backend, task_id)
106 .await
107 .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
108 }
109
110 async fn has_result(&self, task_id: TaskId) -> celers_core::Result<bool> {
111 let mut backend = self.clone();
112 match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
113 {
114 Ok(Some(_)) => Ok(true),
115 Ok(None) => Ok(false),
116 Err(e) => Err(celers_core::CelersError::Other(format!(
117 "Database error: {}",
118 e
119 ))),
120 }
121 }
122}
123
124#[async_trait]
126impl ResultStore for MysqlResultBackend {
127 async fn store_result(
128 &self,
129 task_id: TaskId,
130 result: TaskResultValue,
131 ) -> celers_core::Result<()> {
132 let task_result = to_task_result(&result);
133 let mut meta = TaskMeta::new(task_id, String::new());
134 meta.result = task_result;
135
136 let mut backend = self.clone();
137 <MysqlResultBackend as LocalResultBackend>::store_result(&mut backend, task_id, &meta)
138 .await
139 .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
140 }
141
142 async fn get_result(&self, task_id: TaskId) -> celers_core::Result<Option<TaskResultValue>> {
143 let mut backend = self.clone();
144 match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
145 Ok(Some(meta)) => Ok(Some(from_task_result(&meta.result))),
146 Ok(None) => Ok(None),
147 Err(e) => Err(celers_core::CelersError::Other(format!(
148 "Database error: {}",
149 e
150 ))),
151 }
152 }
153
154 async fn get_state(&self, task_id: TaskId) -> celers_core::Result<TaskState> {
155 let mut backend = self.clone();
156 match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
157 Ok(Some(meta)) => Ok(task_result_to_state(&meta.result)),
158 Ok(None) => Ok(TaskState::Pending),
159 Err(e) => Err(celers_core::CelersError::Other(format!(
160 "Database error: {}",
161 e
162 ))),
163 }
164 }
165
166 async fn forget(&self, task_id: TaskId) -> celers_core::Result<()> {
167 let mut backend = self.clone();
168 <MysqlResultBackend as LocalResultBackend>::delete_result(&mut backend, task_id)
169 .await
170 .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
171 }
172
173 async fn has_result(&self, task_id: TaskId) -> celers_core::Result<bool> {
174 let mut backend = self.clone();
175 match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
176 Ok(Some(_)) => Ok(true),
177 Ok(None) => Ok(false),
178 Err(e) => Err(celers_core::CelersError::Other(format!(
179 "Database error: {}",
180 e
181 ))),
182 }
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A `Success` value keeps its variant through both conversion directions.
    #[test]
    fn test_task_result_conversion() {
        let original = TaskResultValue::Success(json!({"result": 42}));

        let converted = to_task_result(&original);
        assert!(matches!(converted, TaskResult::Success(_)));

        let round_tripped = from_task_result(&converted);
        assert!(matches!(round_tripped, TaskResultValue::Success(_)));
    }

    /// A `Failure` value with a traceback still converts to `TaskResult::Failure`.
    #[test]
    fn test_failure_conversion() {
        let failure = TaskResultValue::Failure {
            error: "Test error".to_string(),
            traceback: Some("Stack trace".to_string()),
        };

        let converted = to_task_result(&failure);
        assert!(matches!(converted, TaskResult::Failure(_)));
    }

    /// Success and failure results map to the `Succeeded` and `Failed` states.
    #[test]
    fn test_state_conversion() {
        let success = TaskResult::Success(json!({"value": 123}));
        let success_state = task_result_to_state(&success);
        assert!(matches!(success_state, TaskState::Succeeded(_)));

        let failure = TaskResult::Failure("error".to_string());
        let failure_state = task_result_to_state(&failure);
        assert!(matches!(failure_state, TaskState::Failed(_)));
    }
}