use crate::{Error, Result};
use crate::config;
use serde::{Deserialize, Serialize};
use std::time::Duration;
pub mod builder;
pub mod progress_ext;
pub use builder::TaskBuilder;
pub use progress_ext::TaskProgressExt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
#[default]
Pending,
Active,
Processed,
Failed,
Retry,
Dead,
}
impl std::fmt::Display for TaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskStatus::Pending => write!(f, "pending"),
TaskStatus::Active => write!(f, "active"),
TaskStatus::Processed => write!(f, "processed"),
TaskStatus::Failed => write!(f, "failed"),
TaskStatus::Retry => write!(f, "retry"),
TaskStatus::Dead => write!(f, "dead"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskOptions {
pub max_retry: u32,
pub timeout: Duration,
pub delay: Option<Duration>,
pub cron: Option<String>,
pub unique_key: Option<String>,
pub priority: i32,
pub depends_on: Option<Vec<String>>,
pub group: Option<String>,
}
impl Default for TaskOptions {
fn default() -> Self {
Self {
max_retry: 3,
timeout: Duration::from_secs(30),
delay: None,
cron: None,
unique_key: None,
priority: 50,
depends_on: None,
group: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
pub id: String,
pub task_type: String,
pub queue: String,
pub payload: Vec<u8>,
pub options: TaskOptions,
pub status: TaskStatus,
pub created_at: i64,
pub enqueued_at: Option<i64>,
pub processed_at: Option<i64>,
pub retry_cnt: u32,
pub last_error: Option<String>,
}
impl Task {
#[must_use]
pub fn builder(task_type: impl Into<String>) -> TaskBuilder {
TaskBuilder::new(task_type)
}
pub fn validate(&self) -> Result<()> {
if self.task_type.is_empty() {
return Err(Error::Validation("task_type cannot be empty".into()));
}
if self.queue.is_empty() {
return Err(Error::Validation("queue cannot be empty".into()));
}
if self.payload.is_empty() {
return Err(Error::Validation("payload cannot be empty".into()));
}
let max_payload_size = config::get_max_payload_size();
if self.payload.len() > max_payload_size {
return Err(Error::Validation(format!(
"payload exceeds {}KB limit (got {}B)",
max_payload_size / 1024,
self.payload.len()
)));
}
let priority_range = config::get_priority_range();
if self.options.priority < priority_range.0 || self.options.priority > priority_range.1 {
return Err(Error::Validation(format!(
"priority must be between {} and {}, got {}",
priority_range.0, priority_range.1, self.options.priority
)));
}
if self.options.timeout.is_zero() {
return Err(Error::Validation("timeout must be greater than 0".into()));
}
if let Some(delay) = self.options.delay {
const MAX_DELAY_SECS: u64 = 365 * 24 * 60 * 60; if delay.as_secs() > MAX_DELAY_SECS {
return Err(Error::Validation(format!(
"delay cannot exceed {} seconds (1 year), got {} seconds",
MAX_DELAY_SECS,
delay.as_secs()
)));
}
}
if let Some(cron) = &self.options.cron {
if !cron.contains(' ') && cron != "@always" {
return Err(Error::Validation(format!(
"invalid cron expression: {}",
cron
)));
}
}
Ok(())
}
pub fn description(&self) -> String {
format!(
"Task[type={}, queue={}, id={}, status={}]",
self.task_type, self.queue, self.id, self.status
)
}
pub fn can_retry(&self) -> bool {
self.retry_cnt < self.options.max_retry
}
pub fn retry_delay(&self) -> Option<Duration> {
if !self.can_retry() {
return None;
}
let delay_secs = 2u64.pow(self.retry_cnt.saturating_add(1).min(6));
Some(Duration::from_secs(delay_secs))
}
pub fn payload_json<'a, T>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
serde_json::from_slice(&self.payload)
.map_err(|e| Error::Serialization(format!("Failed to deserialize JSON: {}", e)))
}
pub fn payload_msgpack<'a, T>(&'a self) -> Result<T>
where
T: Deserialize<'a>,
{
rmp_serde::from_slice(&self.payload)
.map_err(|e| Error::Serialization(format!("Failed to deserialize MessagePack: {}", e)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use uuid::Uuid;
#[test]
fn test_task_validation() {
let task = Task {
id: Uuid::new_v4().to_string(),
task_type: "test".to_string(),
queue: "default".to_string(),
payload: vec![1, 2, 3],
options: TaskOptions::default(),
status: TaskStatus::Pending,
created_at: Utc::now().timestamp(),
enqueued_at: None,
processed_at: None,
retry_cnt: 0,
last_error: None,
};
assert!(task.validate().is_ok());
}
#[test]
fn test_task_validation_empty_type() {
let task = Task {
id: Uuid::new_v4().to_string(),
task_type: "".to_string(),
queue: "default".to_string(),
payload: vec![1, 2, 3],
options: TaskOptions::default(),
status: TaskStatus::Pending,
created_at: Utc::now().timestamp(),
enqueued_at: None,
processed_at: None,
retry_cnt: 0,
last_error: None,
};
assert!(task.validate().is_err());
}
#[test]
fn test_task_validation_large_payload() {
let task = Task {
id: Uuid::new_v4().to_string(),
task_type: "test".to_string(),
queue: "default".to_string(),
payload: vec![0u8; 600 * 1024], options: TaskOptions::default(),
status: TaskStatus::Pending,
created_at: Utc::now().timestamp(),
enqueued_at: None,
processed_at: None,
retry_cnt: 0,
last_error: None,
};
assert!(task.validate().is_err());
}
#[test]
fn test_retry_delay() {
let task = Task {
id: Uuid::new_v4().to_string(),
task_type: "test".to_string(),
queue: "default".to_string(),
payload: vec![1, 2, 3],
options: TaskOptions {
max_retry: 5,
..Default::default()
},
status: TaskStatus::Pending,
created_at: Utc::now().timestamp(),
enqueued_at: None,
processed_at: None,
retry_cnt: 0,
last_error: None,
};
assert_eq!(task.retry_delay(), Some(Duration::from_secs(2)));
let task = Task {
retry_cnt: 1,
..task
};
assert_eq!(task.retry_delay(), Some(Duration::from_secs(4)));
let task = Task {
retry_cnt: 2,
..task
};
assert_eq!(task.retry_delay(), Some(Duration::from_secs(8)));
}
#[test]
fn test_can_retry() {
let task = Task {
id: Uuid::new_v4().to_string(),
task_type: "test".to_string(),
queue: "default".to_string(),
payload: vec![1, 2, 3],
options: TaskOptions {
max_retry: 3,
..Default::default()
},
status: TaskStatus::Pending,
created_at: Utc::now().timestamp(),
enqueued_at: None,
processed_at: None,
retry_cnt: 0,
last_error: None,
};
assert!(task.can_retry());
let task = Task {
retry_cnt: 3,
..task
};
assert!(!task.can_retry());
}
}