Skip to main content

celers_backend_db/
result_store.rs

1//! ResultStore implementation for Database backends
2//!
3//! This module provides adapters between the AsyncResult API (ResultStore trait)
4//! and the Database result backend implementations (PostgreSQL and MySQL).
5
6use crate::{MysqlResultBackend, PostgresResultBackend, TaskMeta, TaskResult};
7use async_trait::async_trait;
8use celers_backend_redis::ResultBackend as LocalResultBackend;
9use celers_core::result::{ResultStore, TaskResultValue};
10use celers_core::{TaskId, TaskState};
11
12/// Convert TaskResultValue to TaskResult (for storage)
13fn to_task_result(value: &TaskResultValue) -> TaskResult {
14    match value {
15        TaskResultValue::Pending => TaskResult::Pending,
16        TaskResultValue::Received => TaskResult::Pending,
17        TaskResultValue::Started => TaskResult::Started,
18        TaskResultValue::Success(v) => TaskResult::Success(v.clone()),
19        TaskResultValue::Failure { error, .. } => TaskResult::Failure(error.clone()),
20        TaskResultValue::Revoked => TaskResult::Revoked,
21        TaskResultValue::Retry { attempt, .. } => TaskResult::Retry(*attempt),
22        TaskResultValue::Rejected { reason } => TaskResult::Failure(reason.clone()),
23    }
24}
25
26/// Convert TaskResult to TaskResultValue (for retrieval)
27fn from_task_result(result: &TaskResult) -> TaskResultValue {
28    match result {
29        TaskResult::Pending => TaskResultValue::Pending,
30        TaskResult::Started => TaskResultValue::Started,
31        TaskResult::Success(v) => TaskResultValue::Success(v.clone()),
32        TaskResult::Failure(msg) => TaskResultValue::Failure {
33            error: msg.clone(),
34            traceback: None,
35        },
36        TaskResult::Revoked => TaskResultValue::Revoked,
37        TaskResult::Retry(count) => TaskResultValue::Retry {
38            attempt: *count,
39            max_retries: 3,
40        },
41    }
42}
43
44/// Convert TaskResult to TaskState
45fn task_result_to_state(result: &TaskResult) -> TaskState {
46    match result {
47        TaskResult::Pending => TaskState::Pending,
48        TaskResult::Started => TaskState::Running,
49        TaskResult::Success(v) => {
50            let bytes = serde_json::to_vec(v).unwrap_or_default();
51            TaskState::Succeeded(bytes)
52        }
53        TaskResult::Failure(msg) => TaskState::Failed(msg.clone()),
54        TaskResult::Revoked => TaskState::Failed("Task revoked".to_string()),
55        TaskResult::Retry(_) => TaskState::Running,
56    }
57}
58
59// PostgreSQL ResultStore implementation
60#[async_trait]
61impl ResultStore for PostgresResultBackend {
62    async fn store_result(
63        &self,
64        task_id: TaskId,
65        result: TaskResultValue,
66    ) -> celers_core::Result<()> {
67        let task_result = to_task_result(&result);
68        let mut meta = TaskMeta::new(task_id, String::new());
69        meta.result = task_result;
70
71        let mut backend = self.clone();
72        <PostgresResultBackend as LocalResultBackend>::store_result(&mut backend, task_id, &meta)
73            .await
74            .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
75    }
76
77    async fn get_result(&self, task_id: TaskId) -> celers_core::Result<Option<TaskResultValue>> {
78        let mut backend = self.clone();
79        match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
80        {
81            Ok(Some(meta)) => Ok(Some(from_task_result(&meta.result))),
82            Ok(None) => Ok(None),
83            Err(e) => Err(celers_core::CelersError::Other(format!(
84                "Database error: {}",
85                e
86            ))),
87        }
88    }
89
90    async fn get_state(&self, task_id: TaskId) -> celers_core::Result<TaskState> {
91        let mut backend = self.clone();
92        match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
93        {
94            Ok(Some(meta)) => Ok(task_result_to_state(&meta.result)),
95            Ok(None) => Ok(TaskState::Pending),
96            Err(e) => Err(celers_core::CelersError::Other(format!(
97                "Database error: {}",
98                e
99            ))),
100        }
101    }
102
103    async fn forget(&self, task_id: TaskId) -> celers_core::Result<()> {
104        let mut backend = self.clone();
105        <PostgresResultBackend as LocalResultBackend>::delete_result(&mut backend, task_id)
106            .await
107            .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
108    }
109
110    async fn has_result(&self, task_id: TaskId) -> celers_core::Result<bool> {
111        let mut backend = self.clone();
112        match <PostgresResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await
113        {
114            Ok(Some(_)) => Ok(true),
115            Ok(None) => Ok(false),
116            Err(e) => Err(celers_core::CelersError::Other(format!(
117                "Database error: {}",
118                e
119            ))),
120        }
121    }
122}
123
124// MySQL ResultStore implementation
125#[async_trait]
126impl ResultStore for MysqlResultBackend {
127    async fn store_result(
128        &self,
129        task_id: TaskId,
130        result: TaskResultValue,
131    ) -> celers_core::Result<()> {
132        let task_result = to_task_result(&result);
133        let mut meta = TaskMeta::new(task_id, String::new());
134        meta.result = task_result;
135
136        let mut backend = self.clone();
137        <MysqlResultBackend as LocalResultBackend>::store_result(&mut backend, task_id, &meta)
138            .await
139            .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
140    }
141
142    async fn get_result(&self, task_id: TaskId) -> celers_core::Result<Option<TaskResultValue>> {
143        let mut backend = self.clone();
144        match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
145            Ok(Some(meta)) => Ok(Some(from_task_result(&meta.result))),
146            Ok(None) => Ok(None),
147            Err(e) => Err(celers_core::CelersError::Other(format!(
148                "Database error: {}",
149                e
150            ))),
151        }
152    }
153
154    async fn get_state(&self, task_id: TaskId) -> celers_core::Result<TaskState> {
155        let mut backend = self.clone();
156        match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
157            Ok(Some(meta)) => Ok(task_result_to_state(&meta.result)),
158            Ok(None) => Ok(TaskState::Pending),
159            Err(e) => Err(celers_core::CelersError::Other(format!(
160                "Database error: {}",
161                e
162            ))),
163        }
164    }
165
166    async fn forget(&self, task_id: TaskId) -> celers_core::Result<()> {
167        let mut backend = self.clone();
168        <MysqlResultBackend as LocalResultBackend>::delete_result(&mut backend, task_id)
169            .await
170            .map_err(|e| celers_core::CelersError::Other(format!("Database error: {}", e)))
171    }
172
173    async fn has_result(&self, task_id: TaskId) -> celers_core::Result<bool> {
174        let mut backend = self.clone();
175        match <MysqlResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
176            Ok(Some(_)) => Ok(true),
177            Ok(None) => Ok(false),
178            Err(e) => Err(celers_core::CelersError::Other(format!(
179                "Database error: {}",
180                e
181            ))),
182        }
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189    use serde_json::json;
190
191    #[test]
192    fn test_task_result_conversion() {
193        let value = TaskResultValue::Success(json!({"result": 42}));
194        let result = to_task_result(&value);
195        assert!(matches!(result, TaskResult::Success(_)));
196
197        let back = from_task_result(&result);
198        assert!(matches!(back, TaskResultValue::Success(_)));
199    }
200
201    #[test]
202    fn test_failure_conversion() {
203        let value = TaskResultValue::Failure {
204            error: "Test error".to_string(),
205            traceback: Some("Stack trace".to_string()),
206        };
207        let result = to_task_result(&value);
208        assert!(matches!(result, TaskResult::Failure(_)));
209    }
210
211    #[test]
212    fn test_state_conversion() {
213        let result = TaskResult::Success(json!({"value": 123}));
214        let state = task_result_to_state(&result);
215        assert!(matches!(state, TaskState::Succeeded(_)));
216
217        let result = TaskResult::Failure("error".to_string());
218        let state = task_result_to_state(&result);
219        assert!(matches!(state, TaskState::Failed(_)));
220    }
221}