use crate::{RedisResultBackend, TaskMeta, TaskResult};
use async_trait::async_trait;
use celers_core::result::{ResultStore, TaskResultValue};
use celers_core::{TaskId, TaskState};
use crate::ResultBackend as LocalResultBackend;
fn to_task_result(value: &TaskResultValue) -> TaskResult {
match value {
TaskResultValue::Pending => TaskResult::Pending,
TaskResultValue::Received => TaskResult::Pending, TaskResultValue::Started => TaskResult::Started,
TaskResultValue::Success(v) => TaskResult::Success(v.clone()),
TaskResultValue::Failure { error, .. } => TaskResult::Failure(error.clone()),
TaskResultValue::Revoked => TaskResult::Revoked,
TaskResultValue::Retry { attempt, .. } => TaskResult::Retry(*attempt),
TaskResultValue::Rejected { reason } => TaskResult::Failure(reason.clone()),
}
}
fn from_task_result(result: &TaskResult) -> TaskResultValue {
match result {
TaskResult::Pending => TaskResultValue::Pending,
TaskResult::Started => TaskResultValue::Started,
TaskResult::Success(v) => TaskResultValue::Success(v.clone()),
TaskResult::Failure(msg) => TaskResultValue::Failure {
error: msg.clone(),
traceback: None,
},
TaskResult::Revoked => TaskResultValue::Revoked,
TaskResult::Retry(count) => TaskResultValue::Retry {
attempt: *count,
max_retries: 3, },
}
}
fn task_result_to_state(result: &TaskResult) -> TaskState {
match result {
TaskResult::Pending => TaskState::Pending,
TaskResult::Started => TaskState::Running,
TaskResult::Success(v) => {
let bytes = serde_json::to_vec(v).unwrap_or_default();
TaskState::Succeeded(bytes)
}
TaskResult::Failure(msg) => TaskState::Failed(msg.clone()),
TaskResult::Revoked => TaskState::Failed("Task revoked".to_string()),
TaskResult::Retry(_) => TaskState::Running, }
}
#[async_trait]
impl ResultStore for RedisResultBackend {
async fn store_result(
&self,
task_id: TaskId,
result: TaskResultValue,
) -> celers_core::Result<()> {
let task_result = to_task_result(&result);
let mut meta = TaskMeta::new(task_id, String::new()); meta.result = task_result;
let mut backend = self.clone();
<RedisResultBackend as LocalResultBackend>::store_result(&mut backend, task_id, &meta)
.await
.map_err(|e| celers_core::CelersError::Other(format!("Redis error: {}", e)))
}
async fn get_result(&self, task_id: TaskId) -> celers_core::Result<Option<TaskResultValue>> {
let mut backend = self.clone();
match <RedisResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
Ok(Some(meta)) => Ok(Some(from_task_result(&meta.result))),
Ok(None) => Ok(None),
Err(e) => Err(celers_core::CelersError::Other(format!(
"Redis error: {}",
e
))),
}
}
async fn get_state(&self, task_id: TaskId) -> celers_core::Result<TaskState> {
let mut backend = self.clone();
match <RedisResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
Ok(Some(meta)) => Ok(task_result_to_state(&meta.result)),
Ok(None) => Ok(TaskState::Pending),
Err(e) => Err(celers_core::CelersError::Other(format!(
"Redis error: {}",
e
))),
}
}
async fn forget(&self, task_id: TaskId) -> celers_core::Result<()> {
let mut backend = self.clone();
<RedisResultBackend as LocalResultBackend>::delete_result(&mut backend, task_id)
.await
.map_err(|e| celers_core::CelersError::Other(format!("Redis error: {}", e)))
}
async fn has_result(&self, task_id: TaskId) -> celers_core::Result<bool> {
let mut backend = self.clone();
match <RedisResultBackend as LocalResultBackend>::get_result(&mut backend, task_id).await {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(e) => Err(celers_core::CelersError::Other(format!(
"Redis error: {}",
e
))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_task_result_conversion() {
let value = TaskResultValue::Success(json!({"result": 42}));
let result = to_task_result(&value);
assert!(matches!(result, TaskResult::Success(_)));
let back = from_task_result(&result);
assert!(matches!(back, TaskResultValue::Success(_)));
}
#[test]
fn test_failure_conversion() {
let value = TaskResultValue::Failure {
error: "Test error".to_string(),
traceback: Some("Stack trace".to_string()),
};
let result = to_task_result(&value);
assert!(matches!(result, TaskResult::Failure(_)));
if let TaskResult::Failure(msg) = result {
assert_eq!(msg, "Test error");
}
}
#[test]
fn test_state_conversion() {
let result = TaskResult::Success(json!({"value": 123}));
let state = task_result_to_state(&result);
assert!(matches!(state, TaskState::Succeeded(_)));
let result = TaskResult::Failure("error".to_string());
let state = task_result_to_state(&result);
assert!(matches!(state, TaskState::Failed(_)));
}
#[test]
fn test_retry_conversion() {
let value = TaskResultValue::Retry {
attempt: 2,
max_retries: 5,
};
let result = to_task_result(&value);
assert!(matches!(result, TaskResult::Retry(2)));
}
#[test]
fn test_revoked_conversion() {
let value = TaskResultValue::Revoked;
let result = to_task_result(&value);
assert!(matches!(result, TaskResult::Revoked));
let back = from_task_result(&result);
assert!(matches!(back, TaskResultValue::Revoked));
}
}