use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors a job can surface.
///
/// The `Display` messages are written in Chinese; an English gloss of each message
/// is given on the corresponding variant. Variants marked `#[from]` get a `From`
/// conversion from the wrapped error type, so `?` can lift those errors directly.
#[derive(Error, Debug)]
pub enum JobError {
/// Execution failure carrying a free-form description.
/// Message: "job execution failed: {0}".
#[error("任务执行失败: {0}")]
ExecutionFailed(String),
/// Message: "job timed out".
#[error("任务超时")]
Timeout,
/// Message: "job was cancelled".
#[error("任务被取消")]
Cancelled,
/// Wraps a `serde_json::Error`. Message: "serialization error: {0}".
#[error("序列化错误: {0}")]
Serialization(#[from] serde_json::Error),
/// Wraps a `redis::RedisError`. Message: "Redis error: {0}".
#[error("Redis 错误: {0}")]
Redis(#[from] redis::RedisError),
/// Catch-all wrapping any `anyhow::Error`. Message: "other error: {0}".
#[error("其他错误: {0}")]
Other(#[from] anyhow::Error),
}
/// Shorthand for a `Result` whose error type is [`JobError`].
pub type JobResult<T> = Result<T, JobError>;
/// Lifecycle state of a [`Job`].
///
/// Serialized by serde as the lowercase variant name (e.g. `"waiting"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
/// Initial state assigned by `Job::new`.
Waiting,
/// Set by `Job::mark_active`.
Active,
/// Set by `Job::mark_completed`.
Completed,
/// Set by `Job::mark_failed`.
Failed,
/// Not assigned anywhere in this file; presumably used for jobs with a delay — confirm against the queue code.
Delayed,
/// Not assigned anywhere in this file; presumably set when a queue is paused — confirm against the queue code.
Paused,
}
/// Per-job settings supplied when a job is created.
///
/// Every field has a serde default, so any subset (including an empty object)
/// deserializes successfully.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobOptions {
/// Optional delay before the job runs. Defaults to `None`.
/// NOTE(review): the unit is not visible in this file — confirm with the scheduler.
#[serde(default)]
pub delay: Option<u64>,
/// Limit compared against `Job::attempts` by `Job::can_retry`.
/// Falls back to `default_retries()` (3) when absent from the input.
#[serde(default = "default_retries")]
pub retries: u32,
/// Execution time limit. Falls back to `default_timeout()` (60) when absent.
/// NOTE(review): presumably seconds — the unit is not visible in this file; confirm.
#[serde(default = "default_timeout")]
pub timeout: u64,
/// Scheduling priority; defaults to 0.
/// NOTE(review): whether larger or smaller values run first is decided elsewhere.
#[serde(default)]
pub priority: i32,
/// Optional schedule string; defaults to `None`.
/// NOTE(review): presumably a cron expression — the parser is not in this file.
#[serde(default)]
pub cron: Option<String>,
/// Caller-chosen job id. When present, `Job::new` uses it as the job's `id`
/// instead of generating a UUID.
#[serde(default)]
pub job_id: Option<String>,
}
/// Fallback value for `JobOptions::retries`.
///
/// Used by `#[serde(default = "default_retries")]` when the field is missing from
/// the input, and by `JobOptions::default()`.
fn default_retries() -> u32 {
    3
}

/// Fallback value for `JobOptions::timeout`.
///
/// Used by `#[serde(default = "default_timeout")]` when the field is missing from
/// the input, and by `JobOptions::default()`.
fn default_timeout() -> u64 {
    60
}

/// In-code defaults for [`JobOptions`].
///
/// `retries` and `timeout` are taken from the same helper functions serde calls,
/// so `JobOptions::default()` and an options object deserialized from `{}` cannot
/// drift apart if a fallback value is changed later.
impl Default for JobOptions {
    fn default() -> Self {
        Self {
            delay: None,
            retries: default_retries(),
            timeout: default_timeout(),
            priority: 0,
            cron: None,
            job_id: None,
        }
    }
}
/// A unit of work together with its bookkeeping state.
///
/// The whole struct is serde-serializable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
/// Unique id: `options.job_id` when supplied, otherwise a generated UUID v4 string.
pub id: String,
/// Name of the queue the job was created for.
pub queue: String,
/// Job name as passed to `Job::new`.
/// NOTE(review): presumably matched against a handler's `name()` — confirm in the dispatcher.
pub name: String,
/// Arbitrary JSON payload supplied by the creator.
pub data: serde_json::Value,
/// Current lifecycle state; starts as `JobStatus::Waiting`.
pub status: JobStatus,
/// Options the job was created with.
pub options: JobOptions,
/// UTC time at which `Job::new` constructed the job.
pub created_at: DateTime<Utc>,
/// UTC time of the most recent `mark_active` call; `None` until then.
pub started_at: Option<DateTime<Utc>>,
/// Set to the current UTC time when the job is marked completed or failed.
pub finished_at: Option<DateTime<Utc>>,
/// Number of times `mark_active` has been called on this job.
pub attempts: u32,
/// Message recorded by `mark_failed`; `None` if it was never called.
pub error: Option<String>,
/// Value recorded by `mark_completed`.
pub result: Option<serde_json::Value>,
}
impl Job {
    /// Creates a job in the [`JobStatus::Waiting`] state with zero attempts.
    ///
    /// The id is `options.job_id` when the caller supplied one; otherwise a freshly
    /// generated UUID v4 string. `created_at` is stamped with the current UTC time,
    /// and all other timestamps, the error and the result start out as `None`.
    pub fn new(queue: &str, name: &str, data: serde_json::Value, options: JobOptions) -> Self {
        // Resolve the id first: `options` is moved into the struct below.
        let id = options
            .job_id
            .clone()
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

        Self {
            id,
            queue: queue.to_owned(),
            name: name.to_owned(),
            data,
            status: JobStatus::Waiting,
            options,
            created_at: Utc::now(),
            started_at: None,
            finished_at: None,
            attempts: 0,
            error: None,
            result: None,
        }
    }

    /// Marks the start of an execution attempt.
    ///
    /// Sets the status to [`JobStatus::Active`], stamps `started_at` with the current
    /// UTC time, clears `finished_at`, and increments `attempts`.
    pub fn mark_active(&mut self) {
        self.status = JobStatus::Active;
        self.started_at = Some(Utc::now());
        // On a retry the job still carries the end time of its previous attempt.
        // Drop it so an active job never reports a finish time, and so `duration_ms`
        // cannot pair the new start with a stale (earlier) finish.
        self.finished_at = None;
        self.attempts += 1;
    }

    /// Records a successful run.
    ///
    /// Sets the status to [`JobStatus::Completed`], stamps `finished_at` with the
    /// current UTC time, and stores `result` (replacing any previous value).
    pub fn mark_completed(&mut self, result: Option<serde_json::Value>) {
        self.status = JobStatus::Completed;
        self.finished_at = Some(Utc::now());
        self.result = result;
    }

    /// Records a failed run.
    ///
    /// Sets the status to [`JobStatus::Failed`], stamps `finished_at` with the
    /// current UTC time, and stores a copy of `error` as the job's error message.
    pub fn mark_failed(&mut self, error: &str) {
        self.status = JobStatus::Failed;
        self.finished_at = Some(Utc::now());
        self.error = Some(error.to_owned());
    }

    /// Returns `true` while `attempts` is still below `options.retries`.
    ///
    /// `mark_active` counts every attempt, including the first one, so
    /// `options.retries` effectively caps the total number of attempts.
    pub fn can_retry(&self) -> bool {
        self.attempts < self.options.retries
    }

    /// Milliseconds between `started_at` and `finished_at`.
    ///
    /// Returns `None` unless both timestamps are set.
    pub fn duration_ms(&self) -> Option<i64> {
        let (started, finished) = self.started_at.zip(self.finished_at)?;
        Some((finished - started).num_milliseconds())
    }
}
/// Executable logic for one job.
///
/// `handle` takes no arguments besides `&self`, so whatever data the job needs has
/// to be captured when the handler is built (see `JobHandlerFactory::create`, which
/// receives the job data). Implementations must be `Send + Sync`.
#[async_trait]
pub trait JobHandler: Send + Sync {
/// Runs the job, returning a JSON value on success or a [`JobError`] on failure.
async fn handle(&self) -> JobResult<serde_json::Value>;
/// Static name identifying this handler.
fn name(&self) -> &'static str;
/// Hook receiving the error of a failed run; does nothing by default.
/// NOTE(review): the caller is not in this file — presumably the worker invokes it after `handle` fails.
async fn on_failed(&self, _error: &JobError) {}
/// Hook receiving the result of a successful run; does nothing by default.
/// NOTE(review): the caller is not in this file — presumably the worker invokes it after `handle` succeeds.
async fn on_completed(&self, _result: &serde_json::Value) {}
}
/// Builds [`JobHandler`] instances from job data.
///
/// Implementations must be `Send + Sync`.
pub trait JobHandlerFactory: Send + Sync {
/// Creates a boxed handler that owns the given JSON `data`.
fn create(&self, data: serde_json::Value) -> Box<dyn JobHandler>;
/// Static name identifying this factory.
/// NOTE(review): presumably the job name used to look the factory up — confirm in the registry code.
fn name(&self) -> &'static str;
}