1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use thiserror::Error;
7
8#[derive(Error, Debug)]
10pub enum JobError {
11 #[error("任务执行失败: {0}")]
12 ExecutionFailed(String),
13 #[error("任务超时")]
14 Timeout,
15 #[error("任务被取消")]
16 Cancelled,
17 #[error("序列化错误: {0}")]
18 Serialization(#[from] serde_json::Error),
19 #[error("Redis 错误: {0}")]
20 Redis(#[from] redis::RedisError),
21 #[error("其他错误: {0}")]
22 Other(#[from] anyhow::Error),
23}
24
25pub type JobResult<T> = Result<T, JobError>;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "lowercase")]
31pub enum JobStatus {
32 Waiting,
34 Active,
36 Completed,
38 Failed,
40 Delayed,
42 Paused,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct JobOptions {
49 #[serde(default)]
51 pub delay: Option<u64>,
52 #[serde(default = "default_retries")]
54 pub retries: u32,
55 #[serde(default = "default_timeout")]
57 pub timeout: u64,
58 #[serde(default)]
60 pub priority: i32,
61 #[serde(default)]
63 pub cron: Option<String>,
64 #[serde(default)]
66 pub job_id: Option<String>,
67}
68
69fn default_retries() -> u32 {
70 3
71}
72
73fn default_timeout() -> u64 {
74 60
75}
76
77impl Default for JobOptions {
78 fn default() -> Self {
79 Self {
80 delay: None,
81 retries: 3,
82 timeout: 60,
83 priority: 0,
84 cron: None,
85 job_id: None,
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct Job {
93 pub id: String,
95 pub queue: String,
97 pub name: String,
99 pub data: serde_json::Value,
101 pub status: JobStatus,
103 pub options: JobOptions,
105 pub created_at: DateTime<Utc>,
107 pub started_at: Option<DateTime<Utc>>,
109 pub finished_at: Option<DateTime<Utc>>,
111 pub attempts: u32,
113 pub error: Option<String>,
115 pub result: Option<serde_json::Value>,
117}
118
119impl Job {
120 pub fn new(queue: &str, name: &str, data: serde_json::Value, options: JobOptions) -> Self {
122 Self {
123 id: options
124 .job_id
125 .clone()
126 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
127 queue: queue.to_string(),
128 name: name.to_string(),
129 data,
130 status: JobStatus::Waiting,
131 options,
132 created_at: Utc::now(),
133 started_at: None,
134 finished_at: None,
135 attempts: 0,
136 error: None,
137 result: None,
138 }
139 }
140
141 pub fn mark_active(&mut self) {
143 self.status = JobStatus::Active;
144 self.started_at = Some(Utc::now());
145 self.attempts += 1;
146 }
147
148 pub fn mark_completed(&mut self, result: Option<serde_json::Value>) {
150 self.status = JobStatus::Completed;
151 self.finished_at = Some(Utc::now());
152 self.result = result;
153 }
154
155 pub fn mark_failed(&mut self, error: &str) {
157 self.status = JobStatus::Failed;
158 self.finished_at = Some(Utc::now());
159 self.error = Some(error.to_string());
160 }
161
162 pub fn can_retry(&self) -> bool {
164 self.attempts < self.options.retries
165 }
166
167 pub fn duration_ms(&self) -> Option<i64> {
169 match (self.started_at, self.finished_at) {
170 (Some(start), Some(end)) => Some((end - start).num_milliseconds()),
171 _ => None,
172 }
173 }
174}
175
176#[async_trait]
178pub trait JobHandler: Send + Sync {
179 async fn handle(&self) -> JobResult<serde_json::Value>;
181
182 fn name(&self) -> &'static str;
184
185 async fn on_failed(&self, _error: &JobError) {}
187
188 async fn on_completed(&self, _result: &serde_json::Value) {}
190}
191
192pub trait JobHandlerFactory: Send + Sync {
194 fn create(&self, data: serde_json::Value) -> Box<dyn JobHandler>;
196
197 fn name(&self) -> &'static str;
199}