use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;
/// Error type used by this module's `Result` alias.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum BackendError {
/// Wraps an error from the `redis` crate; `#[from]` lets `?` convert it automatically.
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
/// A serialization problem, described by the contained message.
#[error("Serialization error: {0}")]
Serialization(String),
/// No result exists for the contained UUID.
#[error("Result not found: {0}")]
NotFound(Uuid),
/// A connection problem, described by the contained message.
#[error("Connection error: {0}")]
Connection(String),
}
impl BackendError {
pub fn is_redis(&self) -> bool {
matches!(self, BackendError::Redis(_))
}
pub fn is_serialization(&self) -> bool {
matches!(self, BackendError::Serialization(_))
}
pub fn is_not_found(&self) -> bool {
matches!(self, BackendError::NotFound(_))
}
pub fn is_connection(&self) -> bool {
matches!(self, BackendError::Connection(_))
}
pub fn is_retryable(&self) -> bool {
matches!(self, BackendError::Redis(_) | BackendError::Connection(_))
}
pub fn category(&self) -> &'static str {
match self {
BackendError::Redis(_) => "redis",
BackendError::Serialization(_) => "serialization",
BackendError::NotFound(_) => "not_found",
BackendError::Connection(_) => "connection",
}
}
}
/// Shorthand for `std::result::Result` with the error type fixed to `BackendError`.
pub type Result<T> = std::result::Result<T, BackendError>;
/// The recorded state of a task, with a payload on the variants that need one.
///
/// `Success`, `Failure` and `Revoked` are the terminal variants (see
/// `TaskResult::is_terminal`); the remaining variants count as active.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskResult {
/// No payload; displayed as `PENDING`.
Pending,
/// No payload; displayed as `STARTED`.
Started,
/// Terminal; carries the task's JSON value.
Success(serde_json::Value),
/// Terminal; carries the failure message.
Failure(String),
/// Terminal; no payload.
Revoked,
/// Carries a retry count.
Retry(u32),
}
impl TaskResult {
    /// Returns `true` for `Pending`.
    pub fn is_pending(&self) -> bool {
        matches!(self, Self::Pending)
    }

    /// Returns `true` for `Started`.
    pub fn is_started(&self) -> bool {
        matches!(self, Self::Started)
    }

    /// Returns `true` for `Success`, whatever the payload.
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Success(..))
    }

    /// Returns `true` for `Failure`, whatever the message.
    pub fn is_failure(&self) -> bool {
        matches!(self, Self::Failure(..))
    }

    /// Returns `true` for `Revoked`.
    pub fn is_revoked(&self) -> bool {
        matches!(self, Self::Revoked)
    }

    /// Returns `true` for `Retry`, whatever the count.
    pub fn is_retry(&self) -> bool {
        matches!(self, Self::Retry(..))
    }

    /// Returns `true` for the terminal variants: `Success`, `Failure` and `Revoked`.
    pub fn is_terminal(&self) -> bool {
        self.is_success() || self.is_failure() || self.is_revoked()
    }

    /// Returns `true` for every variant that is not terminal.
    pub fn is_active(&self) -> bool {
        !self.is_terminal()
    }

    /// Returns `true` when `self` and `other` are the same variant,
    /// ignoring any payload they carry.
    pub fn same_variant(&self, other: &TaskResult) -> bool {
        std::mem::discriminant(self) == std::mem::discriminant(other)
    }

    /// Borrows the JSON payload of a `Success`; `None` for any other variant.
    pub fn success_value(&self) -> Option<&serde_json::Value> {
        if let Self::Success(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Borrows the message of a `Failure`; `None` for any other variant.
    pub fn failure_message(&self) -> Option<&str> {
        if let Self::Failure(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Returns the count carried by a `Retry`; `None` for any other variant.
    pub fn retry_count(&self) -> Option<u32> {
        if let Self::Retry(attempts) = self {
            Some(*attempts)
        } else {
            None
        }
    }
}
impl std::fmt::Display for TaskResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskResult::Pending => write!(f, "PENDING"),
TaskResult::Started => write!(f, "STARTED"),
TaskResult::Success(_) => write!(f, "SUCCESS"),
TaskResult::Failure(err) => write!(f, "FAILURE: {}", err),
TaskResult::Revoked => write!(f, "REVOKED"),
TaskResult::Retry(count) => write!(f, "RETRY({})", count),
}
}
}
/// A progress snapshot: a `current`/`total` counter pair plus a derived percentage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressInfo {
/// Units finished so far.
pub current: u64,
/// Units expected in total.
pub total: u64,
/// Optional free-form message; shown after the counters by `Display`.
pub message: Option<String>,
/// Completion percentage. `ProgressInfo::new` computes it as
/// `current / total * 100`, capped at 100, or `0.0` when `total` is zero.
pub percent: f64,
/// Set to the current UTC time by `ProgressInfo::new`.
pub updated_at: DateTime<Utc>,
}
impl ProgressInfo {
    /// Builds a snapshot from `current` and `total`, stamped with the current UTC time.
    ///
    /// `percent` is `current / total * 100`, capped at `100.0`. A `total` of
    /// zero gives `0.0` rather than dividing by zero. No message is attached.
    pub fn new(current: u64, total: u64) -> Self {
        let percent = match total {
            0 => 0.0,
            _ => {
                let ratio = current as f64 / total as f64;
                (ratio * 100.0).min(100.0)
            }
        };
        Self {
            current,
            total,
            message: None,
            percent,
            updated_at: Utc::now(),
        }
    }

    /// Builder-style setter: returns the snapshot with `message` attached.
    pub fn with_message(self, message: String) -> Self {
        Self {
            message: Some(message),
            ..self
        }
    }

    /// Returns `true` once `percent` is at least 100.
    pub fn is_complete(&self) -> bool {
        self.percent >= 100.0
    }

    /// Returns `true` if a message is attached.
    pub fn has_message(&self) -> bool {
        self.message.is_some()
    }

    /// Units still outstanding (`total - current`), never going below zero.
    pub fn remaining(&self) -> u64 {
        self.total.checked_sub(self.current).unwrap_or(0)
    }

    /// `percent` expressed as a fraction, i.e. divided by 100.
    pub fn fraction(&self) -> f64 {
        let percent = self.percent;
        percent / 100.0
    }
}
/// Formats as `current/total (percent%)` with one decimal place, followed by
/// ` - message` when a message is attached.
impl std::fmt::Display for ProgressInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{} ({:.1}%)", self.current, self.total, self.percent)?;
        match &self.message {
            Some(text) => write!(f, " - {}", text),
            None => Ok(()),
        }
    }
}
/// Metadata record for a single task: identity, current result, timestamps
/// and optional bookkeeping fields.
///
/// Fields marked `#[serde(default)]` may be absent from the input when
/// deserializing; fields with `skip_serializing_if` are left out of the
/// output when empty or `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMeta {
/// Unique id of the task.
pub task_id: Uuid,
/// Name of the task.
pub task_name: String,
/// Current result/state; `TaskMeta::new` starts it at `TaskResult::Pending`.
pub result: TaskResult,
/// Set to the current UTC time by `TaskMeta::new`.
pub created_at: DateTime<Utc>,
/// Start timestamp, if one has been recorded.
pub started_at: Option<DateTime<Utc>>,
/// Completion timestamp, if one has been recorded.
pub completed_at: Option<DateTime<Utc>>,
/// Worker identifier, if any; printed by `Display` as `worker=...`.
pub worker: Option<String>,
/// Latest progress snapshot; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub progress: Option<ProgressInfo>,
/// Version number; falls back to `0` when missing from the input.
#[serde(default)]
pub version: u32,
/// Tags attached to the task; `TaskMeta::add_tag` keeps them free of duplicates.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
/// Arbitrary key/value metadata holding JSON values.
#[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")]
pub metadata: std::collections::HashMap<String, serde_json::Value>,
/// Optional worker hostname.
// NOTE(review): how this relates to `worker` is not visible here — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_hostname: Option<String>,
/// Optional runtime; presumably milliseconds, going by the name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_ms: Option<u64>,
/// Optional memory figure; presumably bytes, going by the name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub memory_bytes: Option<u64>,
/// Optional retry counter.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retries: Option<u32>,
/// Optional queue name.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub queue: Option<String>,
}
impl TaskMeta {
pub fn new(task_id: Uuid, task_name: String) -> Self {
Self {
task_id,
task_name,
result: TaskResult::Pending,
created_at: Utc::now(),
started_at: None,
completed_at: None,
worker: None,
progress: None,
version: 0,
tags: Vec::new(),
metadata: std::collections::HashMap::new(),
worker_hostname: None,
runtime_ms: None,
memory_bytes: None,
retries: None,
queue: None,
}
}
pub fn has_started(&self) -> bool {
self.started_at.is_some()
}
pub fn has_completed(&self) -> bool {
self.completed_at.is_some()
}
pub fn has_progress(&self) -> bool {
self.progress.is_some()
}
pub fn duration(&self) -> Option<chrono::Duration> {
match (self.started_at, self.completed_at) {
(Some(start), Some(end)) => Some(end - start),
_ => None,
}
}
pub fn age(&self) -> chrono::Duration {
Utc::now() - self.created_at
}
pub fn execution_time(&self) -> Option<chrono::Duration> {
self.started_at.map(|start| Utc::now() - start)
}
pub fn is_terminal(&self) -> bool {
self.result.is_terminal()
}
pub fn is_active(&self) -> bool {
self.result.is_active()
}
pub fn add_tag(&mut self, tag: impl Into<String>) {
let tag = tag.into();
if !self.tags.contains(&tag) {
self.tags.push(tag);
}
}
pub fn remove_tag(&mut self, tag: &str) {
self.tags.retain(|t| t != tag);
}
pub fn has_tag(&self, tag: &str) -> bool {
self.tags.iter().any(|t| t == tag)
}
pub fn has_any_tag(&self, tags: &[String]) -> bool {
tags.iter().any(|tag| self.has_tag(tag))
}
pub fn has_all_tags(&self, tags: &[String]) -> bool {
tags.iter().all(|tag| self.has_tag(tag))
}
pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
self.metadata.insert(key.into(), value);
}
pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
self.metadata.get(key)
}
pub fn remove_metadata(&mut self, key: &str) -> Option<serde_json::Value> {
self.metadata.remove(key)
}
pub fn has_metadata(&self, key: &str) -> bool {
self.metadata.contains_key(key)
}
}
/// One-line summary: the first eight characters of the id's string form, the
/// task name and the result, then the worker and progress when present.
impl std::fmt::Display for TaskMeta {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id_text = self.task_id.to_string();
        let short_id = &id_text[..8];
        write!(
            f,
            "Task[{}] name={} result={}",
            short_id, self.task_name, self.result
        )?;

        if let Some(worker) = self.worker.as_deref() {
            write!(f, " worker={}", worker)?;
        }

        match &self.progress {
            Some(snapshot) => write!(f, " progress={}", snapshot),
            None => Ok(()),
        }
    }
}
/// Time-to-live settings: an optional default plus per-task-name overrides.
///
/// The derived `Default` is the same empty configuration that
/// `TaskTtlConfig::new` produces.
#[derive(Debug, Clone, Default)]
pub struct TaskTtlConfig {
    /// TTL used for task names that have no override.
    default_ttl: Option<Duration>,
    /// Per-task overrides, keyed by task name.
    task_ttls: HashMap<String, Duration>,
}

impl TaskTtlConfig {
    /// Creates an empty configuration: no default TTL and no overrides.
    pub fn new() -> Self {
        Self::with_parts(None)
    }

    /// Creates a configuration with `ttl` as the default and no overrides.
    pub fn with_default(ttl: Duration) -> Self {
        Self::with_parts(Some(ttl))
    }

    /// Shared constructor: the given default and an empty override table.
    fn with_parts(default_ttl: Option<Duration>) -> Self {
        Self {
            default_ttl,
            task_ttls: HashMap::new(),
        }
    }

    /// Sets the TTL override for `task_name`, replacing any existing one.
    pub fn set_task_ttl(&mut self, task_name: &str, ttl: Duration) {
        self.task_ttls.insert(task_name.to_owned(), ttl);
    }

    /// Resolves the TTL for `task_name`: its override if present, otherwise
    /// the default, otherwise `None`.
    pub fn get_ttl(&self, task_name: &str) -> Option<Duration> {
        match self.task_ttls.get(task_name) {
            Some(&specific) => Some(specific),
            None => self.default_ttl,
        }
    }

    /// Returns `true` when there is neither a default TTL nor any override.
    pub fn is_empty(&self) -> bool {
        self.task_ttls.is_empty() && self.default_ttl.is_none()
    }

    /// Returns the default TTL, if one is set.
    pub fn default_ttl(&self) -> Option<Duration> {
        self.default_ttl
    }

    /// Sets the default TTL.
    pub fn set_default_ttl(&mut self, ttl: Duration) {
        self.default_ttl = Some(ttl);
    }

    /// Removes the override for `task_name` and returns it, if it existed.
    pub fn remove_task_ttl(&mut self, task_name: &str) -> Option<Duration> {
        self.task_ttls.remove(task_name)
    }

    /// Number of per-task overrides currently configured.
    pub fn task_ttl_count(&self) -> usize {
        self.task_ttls.len()
    }
}
/// Tracks a chord: a set of tasks counted towards `total`, with an optional
/// callback, timeout, cancellation state and retry budget.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChordState {
/// Unique id of the chord.
pub chord_id: Uuid,
/// Number of tasks that must complete.
pub total: usize,
/// Number of tasks completed so far; `ChordState::retry` resets it to zero.
pub completed: usize,
/// Optional callback name; printed by `Display` as `callback=...`.
pub callback: Option<String>,
/// Ids of the member tasks.
pub task_ids: Vec<Uuid>,
/// Creation time; `ChordState::retry` resets it, and timeouts are measured from it.
pub created_at: DateTime<Utc>,
/// Optional timeout, measured from `created_at`; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<Duration>,
/// Whether the chord has been cancelled; `false` when missing from the input.
#[serde(default)]
pub cancelled: bool,
/// Optional reason recorded by `ChordState::cancel`; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub cancellation_reason: Option<String>,
/// Number of retries performed so far; `0` when missing from the input.
#[serde(default)]
pub retry_count: u32,
/// Retry budget; with `None`, `ChordState::can_retry` is always `false`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_retries: Option<u32>,
}
impl ChordState {
    /// Creates a chord expecting `total` completions for the given member tasks.
    ///
    /// The chord starts with zero completions, no callback, no timeout, not
    /// cancelled and no retry budget; `created_at` is the current UTC time.
    pub fn new(chord_id: Uuid, total: usize, task_ids: Vec<Uuid>) -> Self {
        Self {
            chord_id,
            total,
            completed: 0,
            callback: None,
            task_ids,
            created_at: Utc::now(),
            timeout: None,
            cancelled: false,
            cancellation_reason: None,
            retry_count: 0,
            max_retries: None,
        }
    }

    /// Builder-style setter for the timeout, measured from `created_at`.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Builder-style setter for the callback.
    pub fn with_callback(mut self, callback: String) -> Self {
        self.callback = Some(callback);
        self
    }

    /// Returns `true` when `completed` has reached `total` and the chord is
    /// not cancelled. A chord with `total == 0` is complete immediately.
    pub fn is_complete(&self) -> bool {
        self.completed >= self.total && !self.cancelled
    }

    /// Returns `true` if the chord has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled
    }

    /// Marks the chord cancelled and records `reason`, overwriting any earlier reason.
    pub fn cancel(&mut self, reason: Option<String>) {
        self.cancelled = true;
        self.cancellation_reason = reason;
    }

    /// Returns `true` if the chord is complete, cancelled or timed out.
    pub fn is_terminal(&self) -> bool {
        self.is_complete() || self.is_cancelled() || self.is_timed_out()
    }

    /// Converts a timeout to whole milliseconds, saturating at `u64::MAX`.
    ///
    /// `Duration::as_millis` returns a `u128`, so a plain `as` cast would
    /// silently drop the high bits for very large timeouts (for example
    /// `Duration::MAX`) and turn them into short or garbage values.
    fn saturating_timeout_millis(timeout: Duration) -> u64 {
        let capped = timeout.as_millis().min(u128::from(u64::MAX));
        // Lossless: `capped` is at most `u64::MAX` after the clamp above.
        capped as u64
    }

    /// Returns `true` if a timeout is set and more than that many whole
    /// milliseconds have passed since `created_at`.
    ///
    /// A chord with no timeout never times out, and neither does one whose
    /// `created_at` lies in the future (negative age).
    pub fn is_timed_out(&self) -> bool {
        match self.timeout {
            Some(timeout) => {
                let age_ms = self.age().num_milliseconds();
                // Only a strictly positive age can exceed a (non-negative)
                // timeout, and a positive `i64` always fits in `u64`.
                age_ms > 0 && (age_ms as u64) > Self::saturating_timeout_millis(timeout)
            }
            None => false,
        }
    }

    /// Time left before the timeout expires, at millisecond granularity.
    ///
    /// Returns `None` when no timeout is set or when no time is left.
    pub fn remaining_timeout(&self) -> Option<Duration> {
        let timeout_ms = Self::saturating_timeout_millis(self.timeout?);
        // A negative age (created_at in the future) counts as zero elapsed time.
        let age_ms = self.age().num_milliseconds().max(0) as u64;
        timeout_ms
            .checked_sub(age_ms)
            .filter(|&left_ms| left_ms > 0)
            .map(Duration::from_millis)
    }

    /// Number of completions still outstanding, never going below zero.
    pub fn remaining(&self) -> usize {
        self.total.saturating_sub(self.completed)
    }

    /// Completion percentage in `0.0..=100.0`; `0.0` when `total` is zero.
    pub fn percent_complete(&self) -> f64 {
        if self.total == 0 {
            return 0.0;
        }
        (self.completed as f64 / self.total as f64 * 100.0).min(100.0)
    }

    /// Returns `true` if a callback is set.
    pub fn has_callback(&self) -> bool {
        self.callback.is_some()
    }

    /// Returns `true` if a timeout is set.
    pub fn has_timeout(&self) -> bool {
        self.timeout.is_some()
    }

    /// Number of member task ids (independent of `total`).
    pub fn task_count(&self) -> usize {
        self.task_ids.len()
    }

    /// Time elapsed between `created_at` and now.
    pub fn age(&self) -> chrono::Duration {
        Utc::now() - self.created_at
    }

    /// Builder-style setter for the retry budget.
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = Some(max_retries);
        self
    }

    /// Returns `true` if a retry budget is set and not yet used up.
    pub fn can_retry(&self) -> bool {
        self.max_retries
            .map_or(false, |max_retries| self.retry_count < max_retries)
    }

    /// Starts a new attempt if the retry budget allows it.
    ///
    /// On success this increments `retry_count`, clears the completion count
    /// and the cancellation state, and resets `created_at` to now, which also
    /// restarts the timeout window. It then returns `true`. It returns
    /// `false`, changing nothing, when no retry is available.
    pub fn retry(&mut self) -> bool {
        if !self.can_retry() {
            return false;
        }
        // Cannot overflow: `can_retry` guarantees `retry_count < max_retries`.
        self.retry_count += 1;
        self.completed = 0;
        self.cancelled = false;
        self.cancellation_reason = None;
        self.created_at = Utc::now();
        true
    }

    /// Retries left in the budget, or `None` when no budget is set.
    pub fn remaining_retries(&self) -> Option<u32> {
        self.max_retries
            .map(|max| max.saturating_sub(self.retry_count))
    }

    /// Returns `true` if at least one retry has been performed.
    pub fn is_retry(&self) -> bool {
        self.retry_count > 0
    }
}
/// One-line summary: the first eight characters of the id's string form, the
/// completion counters and percentage, and the callback when set. It ends
/// with a `[CANCELLED...]` marker, a `[TIMED OUT]` marker, or the timeout and
/// remaining time, whichever applies.
impl std::fmt::Display for ChordState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id_text = self.chord_id.to_string();
        write!(
            f,
            "Chord[{}] {}/{} tasks ({:.1}%)",
            &id_text[..8],
            self.completed,
            self.total,
            self.percent_complete()
        )?;

        if let Some(callback) = self.callback.as_deref() {
            write!(f, " callback={}", callback)?;
        }

        // Cancellation takes precedence over any timeout information.
        if self.is_cancelled() {
            return match &self.cancellation_reason {
                Some(reason) => write!(f, " [CANCELLED: {}]", reason),
                None => f.write_str(" [CANCELLED]"),
            };
        }

        let timeout = match self.timeout {
            Some(timeout) => timeout,
            None => return Ok(()),
        };

        if self.is_timed_out() {
            return f.write_str(" [TIMED OUT]");
        }

        match self.remaining_timeout() {
            Some(remaining) => write!(f, " timeout={:?} remaining={:?}", timeout, remaining),
            None => Ok(()),
        }
    }
}