#![allow(
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::missing_fields_in_debug
)]
#![allow(clippy::cast_precision_loss)]
use crate::state::TaskState;
use crate::TaskId;
use async_trait::async_trait;
use serde_json::Value;
use std::time::Duration;
#[async_trait]
pub trait ResultStore: Send + Sync {
async fn store_result(&self, task_id: TaskId, result: TaskResultValue) -> crate::Result<()>;
async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>>;
async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState>;
async fn forget(&self, task_id: TaskId) -> crate::Result<()>;
async fn has_result(&self, task_id: TaskId) -> crate::Result<bool>;
}
#[derive(Debug, Clone)]
pub enum TaskResultValue {
Pending,
Received,
Started,
Success(Value),
Failure {
error: String,
traceback: Option<String>,
},
Revoked,
Retry { attempt: u32, max_retries: u32 },
Rejected { reason: String },
}
impl TaskResultValue {
#[inline]
#[must_use]
pub const fn is_terminal(&self) -> bool {
matches!(
self,
TaskResultValue::Success(_)
| TaskResultValue::Failure { .. }
| TaskResultValue::Revoked
| TaskResultValue::Rejected { .. }
)
}
#[inline]
#[must_use]
pub const fn is_pending(&self) -> bool {
matches!(self, TaskResultValue::Pending)
}
#[inline]
#[must_use]
pub const fn is_ready(&self) -> bool {
self.is_terminal()
}
#[inline]
#[must_use]
pub const fn is_successful(&self) -> bool {
matches!(self, TaskResultValue::Success(_))
}
#[inline]
#[must_use]
pub const fn is_failed(&self) -> bool {
matches!(
self,
TaskResultValue::Failure { .. } | TaskResultValue::Rejected { .. }
)
}
#[inline]
#[must_use]
pub fn success_value(&self) -> Option<&Value> {
match self {
TaskResultValue::Success(v) => Some(v),
_ => None,
}
}
#[inline]
#[must_use]
pub fn error_message(&self) -> Option<&str> {
match self {
TaskResultValue::Failure { error, .. } => Some(error),
TaskResultValue::Rejected { reason } => Some(reason),
_ => None,
}
}
#[inline]
#[must_use]
pub fn traceback(&self) -> Option<&str> {
match self {
TaskResultValue::Failure { traceback, .. } => traceback.as_deref(),
_ => None,
}
}
}
#[derive(Clone)]
pub struct AsyncResult<S: ResultStore> {
task_id: TaskId,
store: S,
parent: Option<Box<AsyncResult<S>>>,
children: Vec<AsyncResult<S>>,
}
impl<S: ResultStore + Clone> AsyncResult<S> {
pub fn new(task_id: TaskId, store: S) -> Self {
Self {
task_id,
store,
parent: None,
children: Vec::new(),
}
}
pub fn with_parent(task_id: TaskId, store: S, parent: AsyncResult<S>) -> Self {
Self {
task_id,
store,
parent: Some(Box::new(parent)),
children: Vec::new(),
}
}
pub fn with_children(task_id: TaskId, store: S, children: Vec<AsyncResult<S>>) -> Self {
Self {
task_id,
store,
parent: None,
children,
}
}
#[inline]
#[must_use]
pub fn task_id(&self) -> TaskId {
self.task_id
}
#[inline]
#[must_use]
pub fn parent(&self) -> Option<&AsyncResult<S>> {
self.parent.as_deref()
}
#[inline]
#[must_use]
pub fn children(&self) -> &[AsyncResult<S>] {
&self.children
}
pub fn add_child(&mut self, child: AsyncResult<S>) {
self.children.push(child);
}
pub async fn children_ready(&self) -> crate::Result<bool> {
for child in &self.children {
if !child.ready().await? {
return Ok(false);
}
}
Ok(true)
}
pub async fn collect_children(
&self,
timeout: Option<Duration>,
) -> crate::Result<Vec<Option<Value>>> {
let mut results = Vec::with_capacity(self.children.len());
for child in &self.children {
results.push(child.get(timeout).await?);
}
Ok(results)
}
pub async fn ready(&self) -> crate::Result<bool> {
let state = self.store.get_state(self.task_id).await?;
Ok(state.is_terminal())
}
pub async fn successful(&self) -> crate::Result<bool> {
match self.store.get_result(self.task_id).await? {
Some(result) => Ok(result.is_successful()),
None => Ok(false),
}
}
pub async fn failed(&self) -> crate::Result<bool> {
match self.store.get_result(self.task_id).await? {
Some(result) => Ok(result.is_failed()),
None => Ok(false),
}
}
pub async fn state(&self) -> crate::Result<TaskState> {
self.store.get_state(self.task_id).await
}
pub async fn info(&self) -> crate::Result<Option<TaskResultValue>> {
self.store.get_result(self.task_id).await
}
pub async fn get(&self, timeout: Option<Duration>) -> crate::Result<Option<Value>> {
let start = std::time::Instant::now();
let poll_interval = Duration::from_millis(100);
loop {
if let Some(timeout_duration) = timeout {
if start.elapsed() > timeout_duration {
return Err(crate::CelersError::Timeout(format!(
"Task {} did not complete within {:?}",
self.task_id, timeout_duration
)));
}
}
if let Some(result) = self.store.get_result(self.task_id).await? {
match result {
TaskResultValue::Success(value) => return Ok(Some(value)),
TaskResultValue::Failure { error, traceback } => {
let msg = if let Some(tb) = traceback {
format!("{error}\n{tb}")
} else {
error
};
return Err(crate::CelersError::TaskExecution(msg));
}
TaskResultValue::Revoked => {
return Err(crate::CelersError::TaskRevoked(self.task_id));
}
TaskResultValue::Rejected { reason } => {
return Err(crate::CelersError::TaskExecution(format!(
"Task rejected: {reason}"
)));
}
_ => {}
}
} else {
}
tokio::time::sleep(poll_interval).await;
}
}
pub async fn result(&self) -> crate::Result<Option<Value>> {
match self.store.get_result(self.task_id).await? {
Some(TaskResultValue::Success(value)) => Ok(Some(value)),
_ => Ok(None),
}
}
pub async fn traceback(&self) -> crate::Result<Option<String>> {
match self.store.get_result(self.task_id).await? {
Some(result) => Ok(result.traceback().map(String::from)),
None => Ok(None),
}
}
pub async fn revoke(&self) -> crate::Result<()> {
self.store
.store_result(self.task_id, TaskResultValue::Revoked)
.await
}
pub async fn forget(&self) -> crate::Result<()> {
self.store.forget(self.task_id).await
}
pub async fn wait(&self, timeout: Option<Duration>) -> crate::Result<Value> {
match self.get(timeout).await? {
Some(value) => Ok(value),
None => Err(crate::CelersError::TaskExecution(
"Task completed but returned no value".to_string(),
)),
}
}
}
impl<S: ResultStore + Clone> std::fmt::Debug for AsyncResult<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncResult")
.field("store", &"<ResultStore>")
.field("task_id", &self.task_id)
.field("has_parent", &self.parent.is_some())
.field("num_children", &self.children.len())
.finish()
}
}
impl<S: ResultStore + Clone> std::fmt::Display for AsyncResult<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AsyncResult[{}]", &self.task_id.to_string()[..8])
}
}
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultMetadata {
pub tags: Vec<String>,
pub custom_fields: HashMap<String, Value>,
pub created_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<DateTime<Utc>>,
pub compressed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub compression_algorithm: Option<String>,
pub chunked: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_chunks: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub original_size: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub compressed_size: Option<usize>,
}
impl ResultMetadata {
#[must_use]
pub fn new() -> Self {
Self {
tags: Vec::new(),
custom_fields: HashMap::new(),
created_at: Utc::now(),
expires_at: None,
compressed: false,
compression_algorithm: None,
chunked: false,
total_chunks: None,
original_size: None,
compressed_size: None,
}
}
#[must_use]
pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
self.tags.push(tag.into());
self
}
#[must_use]
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags.extend(tags);
self
}
#[must_use]
pub fn with_field(mut self, key: impl Into<String>, value: Value) -> Self {
self.custom_fields.insert(key.into(), value);
self
}
#[must_use]
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.expires_at = Some(
Utc::now() + chrono::Duration::from_std(ttl).expect("TTL duration should be valid"),
);
self
}
#[must_use]
pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
self.expires_at = Some(expires_at);
self
}
#[inline]
#[must_use]
pub fn is_expired(&self) -> bool {
self.expires_at.is_some_and(|exp| Utc::now() > exp)
}
#[inline]
#[must_use]
pub fn time_until_expiration(&self) -> Option<Duration> {
self.expires_at.and_then(|exp| {
let diff = exp - Utc::now();
diff.to_std().ok()
})
}
#[must_use]
pub fn with_compression(
mut self,
algorithm: impl Into<String>,
original_size: usize,
compressed_size: usize,
) -> Self {
self.compressed = true;
self.compression_algorithm = Some(algorithm.into());
self.original_size = Some(original_size);
self.compressed_size = Some(compressed_size);
self
}
#[must_use]
pub fn with_chunking(mut self, total_chunks: usize) -> Self {
self.chunked = true;
self.total_chunks = Some(total_chunks);
self
}
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn compression_ratio(&self) -> Option<f64> {
if let (Some(orig), Some(comp)) = (self.original_size, self.compressed_size) {
if orig > 0 {
return Some(comp as f64 / orig as f64);
}
}
None
}
}
impl Default for ResultMetadata {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultChunk {
pub index: usize,
pub total: usize,
pub data: Vec<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum: Option<String>,
}
impl ResultChunk {
#[must_use]
pub fn new(index: usize, total: usize, data: Vec<u8>) -> Self {
Self {
index,
total,
data,
checksum: None,
}
}
#[must_use]
pub fn with_checksum(mut self, checksum: impl Into<String>) -> Self {
self.checksum = Some(checksum.into());
self
}
#[inline]
#[must_use]
pub const fn is_last(&self) -> bool {
self.index == self.total - 1
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultTombstone {
pub task_id: TaskId,
pub deleted_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_by: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tombstone_ttl: Option<Duration>,
}
impl ResultTombstone {
#[must_use]
pub fn new(task_id: TaskId) -> Self {
Self {
task_id,
deleted_at: Utc::now(),
reason: None,
deleted_by: None,
tombstone_ttl: None,
}
}
#[must_use]
pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
self.reason = Some(reason.into());
self
}
#[must_use]
pub fn with_deleted_by(mut self, deleted_by: impl Into<String>) -> Self {
self.deleted_by = Some(deleted_by.into());
self
}
#[must_use]
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.tombstone_ttl = Some(ttl);
self
}
}
#[async_trait]
pub trait ExtendedResultStore: ResultStore {
async fn store_result_with_metadata(
&self,
task_id: TaskId,
result: TaskResultValue,
metadata: ResultMetadata,
) -> crate::Result<()>;
async fn get_metadata(&self, task_id: TaskId) -> crate::Result<Option<ResultMetadata>>;
async fn store_chunk(&self, task_id: TaskId, chunk: ResultChunk) -> crate::Result<()>;
async fn get_chunk(&self, task_id: TaskId, index: usize) -> crate::Result<Option<ResultChunk>>;
async fn get_all_chunks(&self, task_id: TaskId) -> crate::Result<Vec<ResultChunk>>;
async fn store_tombstone(&self, tombstone: ResultTombstone) -> crate::Result<()>;
async fn get_tombstone(&self, task_id: TaskId) -> crate::Result<Option<ResultTombstone>>;
async fn has_tombstone(&self, task_id: TaskId) -> crate::Result<bool> {
Ok(self.get_tombstone(task_id).await?.is_some())
}
async fn cleanup_expired(&self) -> crate::Result<usize>;
async fn query_by_tags(&self, tags: &[String]) -> crate::Result<Vec<TaskId>>;
}
pub struct ResultCompressor {
threshold_bytes: usize,
}
impl ResultCompressor {
#[must_use]
pub fn new(threshold_bytes: usize) -> Self {
Self { threshold_bytes }
}
#[must_use]
pub const fn should_compress(&self, data: &[u8]) -> bool {
data.len() >= self.threshold_bytes
}
pub fn compress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
Err(crate::CelersError::Other(
"Compression not available - use backend-specific implementation".to_string(),
))
}
pub fn decompress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
Err(crate::CelersError::Other(
"Decompression not available - use backend-specific implementation".to_string(),
))
}
}
impl Default for ResultCompressor {
fn default() -> Self {
Self::new(1024 * 1024) }
}
pub struct ResultChunker {
chunk_size: usize,
}
impl ResultChunker {
#[must_use]
pub fn new(chunk_size: usize) -> Self {
Self { chunk_size }
}
#[must_use]
pub fn chunk(&self, data: &[u8]) -> Vec<ResultChunk> {
let total = data.len().div_ceil(self.chunk_size);
data.chunks(self.chunk_size)
.enumerate()
.map(|(index, chunk)| ResultChunk::new(index, total, chunk.to_vec()))
.collect()
}
pub fn reassemble(&self, chunks: &[ResultChunk]) -> crate::Result<Vec<u8>> {
if chunks.is_empty() {
return Ok(Vec::new());
}
let total = chunks[0].total;
if chunks.len() != total {
return Err(crate::CelersError::Other(format!(
"Incomplete chunks: expected {}, got {}",
total,
chunks.len()
)));
}
let mut result = Vec::new();
for (i, chunk) in chunks.iter().enumerate() {
if chunk.index != i {
return Err(crate::CelersError::Other(format!(
"Chunk out of order: expected index {}, got {}",
i, chunk.index
)));
}
result.extend_from_slice(&chunk.data);
}
Ok(result)
}
}
impl Default for ResultChunker {
fn default() -> Self {
Self::new(256 * 1024) }
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
#[derive(Clone)]
struct MockBackend {
results: Arc<Mutex<HashMap<TaskId, TaskResultValue>>>,
states: Arc<Mutex<HashMap<TaskId, TaskState>>>,
}
impl MockBackend {
fn new() -> Self {
Self {
results: Arc::new(Mutex::new(HashMap::new())),
states: Arc::new(Mutex::new(HashMap::new())),
}
}
fn set_result(&self, task_id: TaskId, result: TaskResultValue, state: TaskState) {
self.results.lock().unwrap().insert(task_id, result);
self.states.lock().unwrap().insert(task_id, state);
}
}
#[async_trait]
impl ResultStore for MockBackend {
async fn store_result(
&self,
task_id: TaskId,
result: TaskResultValue,
) -> crate::Result<()> {
self.results.lock().unwrap().insert(task_id, result);
Ok(())
}
async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>> {
Ok(self.results.lock().unwrap().get(&task_id).cloned())
}
async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState> {
Ok(self
.states
.lock()
.unwrap()
.get(&task_id)
.cloned()
.unwrap_or(TaskState::Pending))
}
async fn forget(&self, task_id: TaskId) -> crate::Result<()> {
self.results.lock().unwrap().remove(&task_id);
self.states.lock().unwrap().remove(&task_id);
Ok(())
}
async fn has_result(&self, task_id: TaskId) -> crate::Result<bool> {
Ok(self.results.lock().unwrap().contains_key(&task_id))
}
}
#[tokio::test]
async fn test_async_result_ready() {
let backend = MockBackend::new();
let task_id = Uuid::new_v4();
backend.set_result(
task_id,
TaskResultValue::Success(Value::String("test".to_string())),
TaskState::Succeeded(vec![]),
);
let result = AsyncResult::new(task_id, backend);
assert!(result.ready().await.unwrap());
}
#[tokio::test]
async fn test_async_result_successful() {
let backend = MockBackend::new();
let task_id = Uuid::new_v4();
backend.set_result(
task_id,
TaskResultValue::Success(Value::String("test".to_string())),
TaskState::Succeeded(vec![]),
);
let result = AsyncResult::new(task_id, backend);
assert!(result.successful().await.unwrap());
assert!(!result.failed().await.unwrap());
}
#[tokio::test]
async fn test_async_result_failed() {
let backend = MockBackend::new();
let task_id = Uuid::new_v4();
backend.set_result(
task_id,
TaskResultValue::Failure {
error: "Test error".to_string(),
traceback: None,
},
TaskState::Failed(String::from("Test error")),
);
let result = AsyncResult::new(task_id, backend);
assert!(result.failed().await.unwrap());
assert!(!result.successful().await.unwrap());
}
#[tokio::test]
async fn test_async_result_get_success() {
let backend = MockBackend::new();
let task_id = Uuid::new_v4();
backend.set_result(
task_id,
TaskResultValue::Success(Value::String("success".to_string())),
TaskState::Succeeded(vec![]),
);
let result = AsyncResult::new(task_id, backend);
let value = result.get(Some(Duration::from_secs(1))).await.unwrap();
assert_eq!(value, Some(Value::String("success".to_string())));
}
#[tokio::test]
async fn test_async_result_forget() {
let backend = MockBackend::new();
let task_id = Uuid::new_v4();
backend.set_result(
task_id,
TaskResultValue::Success(Value::String("test".to_string())),
TaskState::Succeeded(vec![]),
);
let result = AsyncResult::new(task_id, backend.clone());
assert!(backend.has_result(task_id).await.unwrap());
result.forget().await.unwrap();
assert!(!backend.has_result(task_id).await.unwrap());
}
#[tokio::test]
async fn test_async_result_children() {
let backend = MockBackend::new();
let parent_id = Uuid::new_v4();
backend.set_result(
parent_id,
TaskResultValue::Success(Value::String("parent".to_string())),
TaskState::Succeeded(vec![]),
);
let child1_id = Uuid::new_v4();
let child2_id = Uuid::new_v4();
backend.set_result(
child1_id,
TaskResultValue::Success(Value::Number(serde_json::Number::from(1))),
TaskState::Succeeded(vec![]),
);
backend.set_result(
child2_id,
TaskResultValue::Success(Value::Number(serde_json::Number::from(2))),
TaskState::Succeeded(vec![]),
);
let child1 = AsyncResult::new(child1_id, backend.clone());
let child2 = AsyncResult::new(child2_id, backend.clone());
let parent = AsyncResult::with_children(parent_id, backend, vec![child1, child2]);
assert_eq!(parent.children().len(), 2);
assert_eq!(parent.children()[0].task_id(), child1_id);
assert_eq!(parent.children()[1].task_id(), child2_id);
assert!(parent.children_ready().await.unwrap());
let results = parent
.collect_children(Some(Duration::from_secs(1)))
.await
.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0], Some(Value::Number(serde_json::Number::from(1))));
assert_eq!(results[1], Some(Value::Number(serde_json::Number::from(2))));
}
#[tokio::test]
async fn test_async_result_add_child() {
let backend = MockBackend::new();
let parent_id = Uuid::new_v4();
let child_id = Uuid::new_v4();
backend.set_result(
child_id,
TaskResultValue::Success(Value::String("child".to_string())),
TaskState::Succeeded(vec![]),
);
let mut parent = AsyncResult::new(parent_id, backend.clone());
assert_eq!(parent.children().len(), 0);
let child = AsyncResult::new(child_id, backend);
parent.add_child(child);
assert_eq!(parent.children().len(), 1);
assert_eq!(parent.children()[0].task_id(), child_id);
}
}