1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12pub trait ForgeJob: crate::__sealed::Sealed + Send + Sync + 'static {
14 type Args: DeserializeOwned + Serialize + Send + Sync;
15 type Output: Serialize + Send;
16
17 fn info() -> JobInfo;
18
19 fn execute(
20 ctx: &JobContext,
21 args: Self::Args,
22 ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
23
24 fn compensate<'a>(
25 _ctx: &'a JobContext,
26 _args: Self::Args,
27 _reason: &'a str,
28 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
29 Box::pin(async { Ok(()) })
30 }
31}
32
33#[derive(Debug, Clone)]
35pub struct JobInfo {
36 pub name: &'static str,
37 pub description: Option<&'static str>,
38 pub timeout: Duration,
39 pub http_timeout: Option<Duration>,
40 pub priority: JobPriority,
41 pub retry: RetryConfig,
42 pub worker_capability: Option<&'static str>,
43 pub idempotent: bool,
44 pub idempotency_key: Option<&'static str>,
45 pub is_public: bool,
46 pub required_role: Option<&'static str>,
47 pub ttl: Option<Duration>,
49}
50
51impl Default for JobInfo {
52 fn default() -> Self {
53 Self {
54 name: "",
55 description: None,
56 timeout: Duration::from_secs(3600),
57 http_timeout: None,
58 priority: JobPriority::Normal,
59 retry: RetryConfig::default(),
60 worker_capability: None,
61 idempotent: false,
62 idempotency_key: None,
63 is_public: false,
64 required_role: None,
65 ttl: None,
66 }
67 }
68}
69
/// Scheduling priority of a job.
///
/// Variants are ordered from least to most urgent, and each carries an explicit
/// integer discriminant (see [`JobPriority::as_i32`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
#[non_exhaustive]
pub enum JobPriority {
    /// Numeric value 0.
    Background = 0,
    /// Numeric value 25.
    Low = 25,
    /// Numeric value 50; this is the default priority.
    #[default]
    Normal = 50,
    /// Numeric value 75.
    High = 75,
    /// Numeric value 100.
    Critical = 100,
}

impl JobPriority {
    /// Returns the numeric value of this priority (0, 25, 50, 75 or 100).
    pub fn as_i32(&self) -> i32 {
        let level = *self;
        level as i32
    }

    /// Maps an arbitrary integer onto the nearest priority bucket.
    ///
    /// Buckets are `0..=12` → `Background`, `13..=37` → `Low`, `38..=62` →
    /// `Normal` and `63..=87` → `High`. Every other value, including negative
    /// numbers, maps to `Critical`.
    pub fn from_i32(value: i32) -> Self {
        // Inclusive upper bound of each bucket, in ascending order.
        const UPPER_BOUNDS: [(i32, JobPriority); 4] = [
            (12, JobPriority::Background),
            (37, JobPriority::Low),
            (62, JobPriority::Normal),
            (87, JobPriority::High),
        ];

        // Negative input is outside every bucket and falls through to `Critical`.
        if value < 0 {
            return Self::Critical;
        }

        UPPER_BOUNDS
            .iter()
            .find(|(upper, _)| value <= *upper)
            .map_or(Self::Critical, |&(_, priority)| priority)
    }
}
96
/// Error returned when a string does not name a known job priority.
///
/// The wrapped `String` is the rejected input, kept exactly as supplied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobPriorityError(pub String);

impl std::fmt::Display for ParseJobPriorityError {
    /// Renders the message `invalid job priority: '<input>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("invalid job priority: '{}'", self.0))
    }
}

// The default `Error` methods are sufficient; there is no underlying source.
impl std::error::Error for ParseJobPriorityError {}
107
108impl FromStr for JobPriority {
109 type Err = ParseJobPriorityError;
110
111 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
112 match s.to_lowercase().as_str() {
113 "background" => Ok(Self::Background),
114 "low" => Ok(Self::Low),
115 "normal" => Ok(Self::Normal),
116 "high" => Ok(Self::High),
117 "critical" => Ok(Self::Critical),
118 _ => Err(ParseJobPriorityError(s.to_string())),
119 }
120 }
121}
122
/// Lifecycle state of a job.
///
/// Each variant has a stable lower-snake-case string form, returned by
/// [`JobStatus::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum JobStatus {
    /// String form: `"pending"`.
    Pending,
    /// String form: `"claimed"`.
    Claimed,
    /// String form: `"running"`.
    Running,
    /// String form: `"completed"`.
    Completed,
    /// String form: `"retry"`.
    Retry,
    /// String form: `"failed"`.
    Failed,
    /// String form: `"dead_letter"`.
    DeadLetter,
    /// String form: `"cancel_requested"`.
    CancelRequested,
    /// String form: `"cancelled"`.
    Cancelled,
}

impl JobStatus {
    /// Returns the lower-snake-case string form of this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            JobStatus::Pending => "pending",
            JobStatus::Claimed => "claimed",
            JobStatus::Running => "running",
            JobStatus::Completed => "completed",
            JobStatus::Retry => "retry",
            JobStatus::Failed => "failed",
            JobStatus::DeadLetter => "dead_letter",
            JobStatus::CancelRequested => "cancel_requested",
            JobStatus::Cancelled => "cancelled",
        }
    }
}
161
/// Error returned when a string does not name a known job status.
///
/// The wrapped `String` is the rejected input, kept exactly as supplied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(pub String);

impl std::fmt::Display for ParseJobStatusError {
    /// Renders the message `invalid job status: '<input>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("invalid job status: '{}'", self.0))
    }
}

// The default `Error` methods are sufficient; there is no underlying source.
impl std::error::Error for ParseJobStatusError {}
172
173impl FromStr for JobStatus {
174 type Err = ParseJobStatusError;
175
176 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
177 match s {
178 "pending" => Ok(Self::Pending),
179 "claimed" => Ok(Self::Claimed),
180 "running" => Ok(Self::Running),
181 "completed" => Ok(Self::Completed),
182 "retry" => Ok(Self::Retry),
183 "failed" => Ok(Self::Failed),
184 "dead_letter" => Ok(Self::DeadLetter),
185 "cancel_requested" => Ok(Self::CancelRequested),
186 "cancelled" => Ok(Self::Cancelled),
187 _ => Err(ParseJobStatusError(s.to_string())),
188 }
189 }
190}
191
192#[derive(Debug, Clone)]
194pub struct RetryConfig {
195 pub max_attempts: u32,
196 pub backoff: BackoffStrategy,
197 pub max_backoff: Duration,
198 pub retry_on: Vec<String>,
200}
201
202impl Default for RetryConfig {
203 fn default() -> Self {
204 Self {
205 max_attempts: 3,
206 backoff: BackoffStrategy::Exponential,
207 max_backoff: Duration::from_secs(300),
208 retry_on: Vec::new(),
209 }
210 }
211}
212
213impl RetryConfig {
214 pub fn calculate_backoff(&self, attempt: u32) -> Duration {
215 let base = Duration::from_secs(1);
216 let backoff = match self.backoff {
217 BackoffStrategy::Fixed => base,
218 BackoffStrategy::Linear => base * attempt,
219 BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
220 };
221 backoff.min(self.max_backoff)
222 }
223}
224
/// How the delay between retry attempts grows.
///
/// The concrete delays are computed by `RetryConfig::calculate_backoff`, which
/// uses a one-second base delay.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BackoffStrategy {
    /// The same base delay for every attempt.
    Fixed,
    /// The base delay multiplied by the attempt number.
    Linear,
    /// The base delay doubled for each attempt after the first; this is the
    /// default strategy.
    #[default]
    Exponential,
}
233
/// Unit tests for the priority, status and retry types defined in this file.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a whole-second `Duration` for terse expectations.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Default retry policy with only the backoff strategy overridden.
    fn config_with(backoff: BackoffStrategy) -> RetryConfig {
        RetryConfig {
            backoff,
            ..Default::default()
        }
    }

    #[test]
    fn test_priority_ordering() {
        // Listed from least to most urgent; every neighbour must compare greater.
        let ascending = [
            JobPriority::Background,
            JobPriority::Low,
            JobPriority::Normal,
            JobPriority::High,
            JobPriority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    #[test]
    fn test_priority_conversion() {
        for (priority, raw) in [(JobPriority::Critical, 100), (JobPriority::Normal, 50)] {
            assert_eq!(priority.as_i32(), raw);
            assert_eq!(JobPriority::from_i32(raw), priority);
        }
    }

    #[test]
    fn test_status_conversion() {
        let cases = [
            (JobStatus::Pending, "pending"),
            (JobStatus::DeadLetter, "dead_letter"),
            (JobStatus::CancelRequested, "cancel_requested"),
            (JobStatus::Cancelled, "cancelled"),
        ];
        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<JobStatus>(), Ok(status));
        }
    }

    #[test]
    fn test_exponential_backoff() {
        let config = RetryConfig::default();
        for (attempt, expected_secs) in [(1u32, 1u64), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(config.calculate_backoff(attempt), secs(expected_secs));
        }
    }

    #[test]
    fn test_max_backoff_cap() {
        let cap = secs(10);
        let config = RetryConfig {
            max_backoff: cap,
            ..Default::default()
        };
        assert_eq!(config.calculate_backoff(10), cap);
    }

    #[test]
    fn priority_from_i32_covers_all_buckets() {
        let cases = [
            // Both edges of every bucket, plus one interior value.
            (0, JobPriority::Background),
            (12, JobPriority::Background),
            (13, JobPriority::Low),
            (25, JobPriority::Low),
            (37, JobPriority::Low),
            (38, JobPriority::Normal),
            (62, JobPriority::Normal),
            (63, JobPriority::High),
            (87, JobPriority::High),
            (88, JobPriority::Critical),
            (1_000_000, JobPriority::Critical),
            // Negative input falls through to the catch-all bucket.
            (-1, JobPriority::Critical),
        ];
        for (raw, expected) in cases {
            assert_eq!(JobPriority::from_i32(raw), expected);
        }
    }

    #[test]
    fn priority_default_is_normal() {
        let default_priority = JobPriority::default();
        assert_eq!(default_priority, JobPriority::Normal);
    }

    #[test]
    fn priority_round_trips_through_i32_buckets() {
        let every_variant = [
            JobPriority::Background,
            JobPriority::Low,
            JobPriority::Normal,
            JobPriority::High,
            JobPriority::Critical,
        ];
        for variant in every_variant {
            let raw = variant.as_i32();
            assert_eq!(JobPriority::from_i32(raw), variant);
        }
    }

    #[test]
    fn priority_from_str_is_case_insensitive_for_all_variants() {
        let cases = [
            ("background", JobPriority::Background),
            ("Low", JobPriority::Low),
            ("NORMAL", JobPriority::Normal),
            ("HiGh", JobPriority::High),
            ("critical", JobPriority::Critical),
        ];
        for (text, expected) in cases {
            assert_eq!(text.parse::<JobPriority>(), Ok(expected));
        }
    }

    #[test]
    fn priority_from_str_reports_unknown_input_verbatim() {
        let input = "urgent";
        let err = input.parse::<JobPriority>().unwrap_err();
        assert_eq!(err.0, "urgent");
        assert!(err.to_string().contains("urgent"));
    }

    #[test]
    fn status_from_str_rejects_unknown_input() {
        let input = "pending_review";
        let err = input.parse::<JobStatus>().unwrap_err();
        assert_eq!(err.0, "pending_review");
        assert!(err.to_string().contains("pending_review"));
    }

    #[test]
    fn status_round_trips_for_every_variant() {
        let every_status = [
            JobStatus::Pending,
            JobStatus::Claimed,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Retry,
            JobStatus::Failed,
            JobStatus::DeadLetter,
            JobStatus::CancelRequested,
            JobStatus::Cancelled,
        ];
        for status in every_status {
            let reparsed = status.as_str().parse::<JobStatus>().unwrap();
            assert_eq!(reparsed, status);
        }
    }

    #[test]
    fn parse_errors_are_error_trait_impls() {
        // Compile-time check only: both error types must implement `Error`.
        fn assert_error<E: std::error::Error>() {}
        assert_error::<ParseJobPriorityError>();
        assert_error::<ParseJobStatusError>();
    }

    #[test]
    fn job_info_default_values_match_doctrine() {
        let JobInfo {
            name,
            timeout,
            priority,
            is_public,
            required_role,
            idempotent,
            ttl,
            ..
        } = JobInfo::default();

        assert_eq!(name, "");
        assert_eq!(timeout, secs(3600));
        assert_eq!(priority, JobPriority::Normal);
        assert!(!is_public);
        assert!(required_role.is_none());
        assert!(!idempotent);
        assert!(ttl.is_none());
    }

    #[test]
    fn retry_config_default_retries_on_all_errors() {
        let RetryConfig {
            max_attempts,
            backoff,
            max_backoff,
            retry_on,
        } = RetryConfig::default();

        assert_eq!(max_attempts, 3);
        assert_eq!(backoff, BackoffStrategy::Exponential);
        assert_eq!(max_backoff, secs(300));
        assert!(retry_on.is_empty(), "empty list ⇒ retry on every error");
    }

    #[test]
    fn backoff_fixed_returns_base_for_any_attempt() {
        let cfg = config_with(BackoffStrategy::Fixed);
        for attempt in [1u32, 2, 5, 100] {
            assert_eq!(cfg.calculate_backoff(attempt), secs(1));
        }
    }

    #[test]
    fn backoff_linear_multiplies_base_by_attempt() {
        let cfg = config_with(BackoffStrategy::Linear);
        for (attempt, expected_secs) in [(1u32, 1u64), (5, 5), (50, 50)] {
            assert_eq!(cfg.calculate_backoff(attempt), secs(expected_secs));
        }
    }

    #[test]
    fn backoff_exponential_handles_attempt_zero_without_underflow() {
        let cfg = RetryConfig::default();
        // Attempt 0 is treated like attempt 1 (2^0 × base).
        assert_eq!(cfg.calculate_backoff(0), secs(1));
    }

    #[test]
    fn backoff_exponential_caps_at_max_backoff_for_large_attempt() {
        let cfg = RetryConfig::default();
        // 2^19 seconds is far above the default 300-second cap.
        assert_eq!(cfg.calculate_backoff(20), secs(300));
    }

    #[test]
    fn backoff_strategy_default_is_exponential() {
        let default_strategy = BackoffStrategy::default();
        assert_eq!(default_strategy, BackoffStrategy::Exponential);
    }
}