1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12pub trait ForgeJob: Send + Sync + 'static {
14 type Args: DeserializeOwned + Serialize + Send + Sync;
16 type Output: Serialize + Send;
18
19 fn info() -> JobInfo;
21
22 fn execute(
24 ctx: &JobContext,
25 args: Self::Args,
26 ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
27
28 fn compensate<'a>(
30 _ctx: &'a JobContext,
31 _args: Self::Args,
32 _reason: &'a str,
33 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
34 Box::pin(async { Ok(()) })
35 }
36}
37
38#[derive(Debug, Clone)]
40pub struct JobInfo {
41 pub name: &'static str,
43 pub timeout: Duration,
45 pub http_timeout: Option<Duration>,
47 pub priority: JobPriority,
49 pub retry: RetryConfig,
51 pub worker_capability: Option<&'static str>,
53 pub idempotent: bool,
55 pub idempotency_key: Option<&'static str>,
57 pub is_public: bool,
59 pub required_role: Option<&'static str>,
61 pub ttl: Option<Duration>,
64}
65
66impl Default for JobInfo {
67 fn default() -> Self {
68 Self {
69 name: "",
70 timeout: Duration::from_secs(3600), http_timeout: None,
72 priority: JobPriority::Normal,
73 retry: RetryConfig::default(),
74 worker_capability: None,
75 idempotent: false,
76 idempotency_key: None,
77 is_public: false,
78 required_role: None,
79 ttl: None,
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
86pub enum JobPriority {
87 Background = 0,
88 Low = 25,
89 #[default]
90 Normal = 50,
91 High = 75,
92 Critical = 100,
93}
94
95impl JobPriority {
96 pub fn as_i32(&self) -> i32 {
98 *self as i32
99 }
100
101 pub fn from_i32(value: i32) -> Self {
103 match value {
104 0..=12 => Self::Background,
105 13..=37 => Self::Low,
106 38..=62 => Self::Normal,
107 63..=87 => Self::High,
108 _ => Self::Critical,
109 }
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct ParseJobPriorityError(pub String);
115
116impl std::fmt::Display for ParseJobPriorityError {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 write!(f, "invalid job priority: '{}'", self.0)
119 }
120}
121
122impl std::error::Error for ParseJobPriorityError {}
123
124impl FromStr for JobPriority {
125 type Err = ParseJobPriorityError;
126
127 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
128 match s.to_lowercase().as_str() {
129 "background" => Ok(Self::Background),
130 "low" => Ok(Self::Low),
131 "normal" => Ok(Self::Normal),
132 "high" => Ok(Self::High),
133 "critical" => Ok(Self::Critical),
134 _ => Err(ParseJobPriorityError(s.to_string())),
135 }
136 }
137}
138
139#[derive(Debug, Clone, Copy, PartialEq, Eq)]
141pub enum JobStatus {
142 Pending,
144 Claimed,
146 Running,
148 Completed,
150 Retry,
152 Failed,
154 DeadLetter,
156 CancelRequested,
158 Cancelled,
160}
161
162impl JobStatus {
163 pub fn as_str(&self) -> &'static str {
165 match self {
166 Self::Pending => "pending",
167 Self::Claimed => "claimed",
168 Self::Running => "running",
169 Self::Completed => "completed",
170 Self::Retry => "retry",
171 Self::Failed => "failed",
172 Self::DeadLetter => "dead_letter",
173 Self::CancelRequested => "cancel_requested",
174 Self::Cancelled => "cancelled",
175 }
176 }
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180pub struct ParseJobStatusError(pub String);
181
182impl std::fmt::Display for ParseJobStatusError {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 write!(f, "invalid job status: '{}'", self.0)
185 }
186}
187
188impl std::error::Error for ParseJobStatusError {}
189
190impl FromStr for JobStatus {
191 type Err = ParseJobStatusError;
192
193 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
194 match s {
195 "pending" => Ok(Self::Pending),
196 "claimed" => Ok(Self::Claimed),
197 "running" => Ok(Self::Running),
198 "completed" => Ok(Self::Completed),
199 "retry" => Ok(Self::Retry),
200 "failed" => Ok(Self::Failed),
201 "dead_letter" => Ok(Self::DeadLetter),
202 "cancel_requested" => Ok(Self::CancelRequested),
203 "cancelled" => Ok(Self::Cancelled),
204 _ => Err(ParseJobStatusError(s.to_string())),
205 }
206 }
207}
208
209#[derive(Debug, Clone)]
211pub struct RetryConfig {
212 pub max_attempts: u32,
214 pub backoff: BackoffStrategy,
216 pub max_backoff: Duration,
218 pub retry_on: Vec<String>,
220}
221
222impl Default for RetryConfig {
223 fn default() -> Self {
224 Self {
225 max_attempts: 3,
226 backoff: BackoffStrategy::Exponential,
227 max_backoff: Duration::from_secs(300), retry_on: Vec::new(), }
230 }
231}
232
233impl RetryConfig {
234 pub fn calculate_backoff(&self, attempt: u32) -> Duration {
236 let base = Duration::from_secs(1);
237 let backoff = match self.backoff {
238 BackoffStrategy::Fixed => base,
239 BackoffStrategy::Linear => base * attempt,
240 BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
241 };
242 backoff.min(self.max_backoff)
243 }
244}
245
246#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
248pub enum BackoffStrategy {
249 Fixed,
251 Linear,
253 #[default]
255 Exponential,
256}
257
258#[cfg(test)]
259#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
260mod tests {
261 use super::*;
262
263 #[test]
264 fn test_priority_ordering() {
265 assert!(JobPriority::Critical > JobPriority::High);
266 assert!(JobPriority::High > JobPriority::Normal);
267 assert!(JobPriority::Normal > JobPriority::Low);
268 assert!(JobPriority::Low > JobPriority::Background);
269 }
270
271 #[test]
272 fn test_priority_conversion() {
273 assert_eq!(JobPriority::Critical.as_i32(), 100);
274 assert_eq!(JobPriority::Normal.as_i32(), 50);
275 assert_eq!(JobPriority::from_i32(100), JobPriority::Critical);
276 assert_eq!(JobPriority::from_i32(50), JobPriority::Normal);
277 }
278
279 #[test]
280 fn test_status_conversion() {
281 assert_eq!(JobStatus::Pending.as_str(), "pending");
282 assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
283 assert_eq!(JobStatus::DeadLetter.as_str(), "dead_letter");
284 assert_eq!(
285 "dead_letter".parse::<JobStatus>(),
286 Ok(JobStatus::DeadLetter)
287 );
288 assert_eq!(JobStatus::CancelRequested.as_str(), "cancel_requested");
289 assert_eq!(
290 "cancel_requested".parse::<JobStatus>(),
291 Ok(JobStatus::CancelRequested)
292 );
293 assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
294 assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
295 }
296
297 #[test]
298 fn test_exponential_backoff() {
299 let config = RetryConfig::default();
300 assert_eq!(config.calculate_backoff(1), Duration::from_secs(1));
301 assert_eq!(config.calculate_backoff(2), Duration::from_secs(2));
302 assert_eq!(config.calculate_backoff(3), Duration::from_secs(4));
303 assert_eq!(config.calculate_backoff(4), Duration::from_secs(8));
304 }
305
306 #[test]
307 fn test_max_backoff_cap() {
308 let config = RetryConfig {
309 max_backoff: Duration::from_secs(10),
310 ..Default::default()
311 };
312 assert_eq!(config.calculate_backoff(10), Duration::from_secs(10));
313 }
314}