Skip to main content

forge_core/job/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12/// Trait for background job handlers.
13pub trait ForgeJob: crate::__sealed::Sealed + Send + Sync + 'static {
14    type Args: DeserializeOwned + Serialize + Send + Sync;
15    type Output: Serialize + Send;
16
17    fn info() -> JobInfo;
18
19    fn execute(
20        ctx: &JobContext,
21        args: Self::Args,
22    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
23
24    fn compensate<'a>(
25        _ctx: &'a JobContext,
26        _args: Self::Args,
27        _reason: &'a str,
28    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
29        Box::pin(async { Ok(()) })
30    }
31}
32
33/// Metadata for a registered job handler.
34#[derive(Debug, Clone)]
35pub struct JobInfo {
36    pub name: &'static str,
37    pub description: Option<&'static str>,
38    pub timeout: Duration,
39    pub http_timeout: Option<Duration>,
40    pub priority: JobPriority,
41    pub retry: RetryConfig,
42    pub worker_capability: Option<&'static str>,
43    pub idempotent: bool,
44    pub idempotency_key: Option<&'static str>,
45    pub is_public: bool,
46    pub required_role: Option<&'static str>,
47    /// Records are cleaned up after this duration; `None` means kept indefinitely.
48    pub ttl: Option<Duration>,
49}
50
51impl Default for JobInfo {
52    fn default() -> Self {
53        Self {
54            name: "",
55            description: None,
56            timeout: Duration::from_secs(3600),
57            http_timeout: None,
58            priority: JobPriority::Normal,
59            retry: RetryConfig::default(),
60            worker_capability: None,
61            idempotent: false,
62            idempotency_key: None,
63            is_public: false,
64            required_role: None,
65            ttl: None,
66        }
67    }
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
71#[non_exhaustive]
72pub enum JobPriority {
73    Background = 0,
74    Low = 25,
75    #[default]
76    Normal = 50,
77    High = 75,
78    Critical = 100,
79}
80
81impl JobPriority {
82    pub fn as_i32(&self) -> i32 {
83        *self as i32
84    }
85
86    pub fn from_i32(value: i32) -> Self {
87        match value {
88            0..=12 => Self::Background,
89            13..=37 => Self::Low,
90            38..=62 => Self::Normal,
91            63..=87 => Self::High,
92            _ => Self::Critical,
93        }
94    }
95}
96
97#[derive(Debug, Clone, PartialEq, Eq)]
98pub struct ParseJobPriorityError(pub String);
99
100impl std::fmt::Display for ParseJobPriorityError {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "invalid job priority: '{}'", self.0)
103    }
104}
105
106impl std::error::Error for ParseJobPriorityError {}
107
108impl FromStr for JobPriority {
109    type Err = ParseJobPriorityError;
110
111    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
112        match s.to_lowercase().as_str() {
113            "background" => Ok(Self::Background),
114            "low" => Ok(Self::Low),
115            "normal" => Ok(Self::Normal),
116            "high" => Ok(Self::High),
117            "critical" => Ok(Self::Critical),
118            _ => Err(ParseJobPriorityError(s.to_string())),
119        }
120    }
121}
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124#[non_exhaustive]
125pub enum JobStatus {
126    /// Waiting to be claimed.
127    Pending,
128    /// Claimed by a worker.
129    Claimed,
130    /// Currently executing.
131    Running,
132    /// Successfully completed.
133    Completed,
134    /// Failed, will retry.
135    Retry,
136    /// Failed permanently.
137    Failed,
138    /// Moved to dead letter queue.
139    DeadLetter,
140    /// Cancellation requested for a running job.
141    CancelRequested,
142    /// Job cancelled.
143    Cancelled,
144}
145
146impl JobStatus {
147    pub fn as_str(&self) -> &'static str {
148        match self {
149            Self::Pending => "pending",
150            Self::Claimed => "claimed",
151            Self::Running => "running",
152            Self::Completed => "completed",
153            Self::Retry => "retry",
154            Self::Failed => "failed",
155            Self::DeadLetter => "dead_letter",
156            Self::CancelRequested => "cancel_requested",
157            Self::Cancelled => "cancelled",
158        }
159    }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct ParseJobStatusError(pub String);
164
165impl std::fmt::Display for ParseJobStatusError {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        write!(f, "invalid job status: '{}'", self.0)
168    }
169}
170
171impl std::error::Error for ParseJobStatusError {}
172
173impl FromStr for JobStatus {
174    type Err = ParseJobStatusError;
175
176    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
177        match s {
178            "pending" => Ok(Self::Pending),
179            "claimed" => Ok(Self::Claimed),
180            "running" => Ok(Self::Running),
181            "completed" => Ok(Self::Completed),
182            "retry" => Ok(Self::Retry),
183            "failed" => Ok(Self::Failed),
184            "dead_letter" => Ok(Self::DeadLetter),
185            "cancel_requested" => Ok(Self::CancelRequested),
186            "cancelled" => Ok(Self::Cancelled),
187            _ => Err(ParseJobStatusError(s.to_string())),
188        }
189    }
190}
191
192/// Retry configuration for jobs.
193#[derive(Debug, Clone)]
194pub struct RetryConfig {
195    pub max_attempts: u32,
196    pub backoff: BackoffStrategy,
197    pub max_backoff: Duration,
198    /// Empty means retry on all errors.
199    pub retry_on: Vec<String>,
200}
201
202impl Default for RetryConfig {
203    fn default() -> Self {
204        Self {
205            max_attempts: 3,
206            backoff: BackoffStrategy::Exponential,
207            max_backoff: Duration::from_secs(300),
208            retry_on: Vec::new(),
209        }
210    }
211}
212
213impl RetryConfig {
214    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
215        let base = Duration::from_secs(1);
216        let backoff = match self.backoff {
217            BackoffStrategy::Fixed => base,
218            BackoffStrategy::Linear => base * attempt,
219            BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
220        };
221        backoff.min(self.max_backoff)
222    }
223}
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
226#[non_exhaustive]
227pub enum BackoffStrategy {
228    Fixed,
229    Linear,
230    #[default]
231    Exponential,
232}
233
234#[cfg(test)]
235#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
236mod tests {
237    use super::*;
238
239    #[test]
240    fn test_priority_ordering() {
241        assert!(JobPriority::Critical > JobPriority::High);
242        assert!(JobPriority::High > JobPriority::Normal);
243        assert!(JobPriority::Normal > JobPriority::Low);
244        assert!(JobPriority::Low > JobPriority::Background);
245    }
246
247    #[test]
248    fn test_priority_conversion() {
249        assert_eq!(JobPriority::Critical.as_i32(), 100);
250        assert_eq!(JobPriority::Normal.as_i32(), 50);
251        assert_eq!(JobPriority::from_i32(100), JobPriority::Critical);
252        assert_eq!(JobPriority::from_i32(50), JobPriority::Normal);
253    }
254
255    #[test]
256    fn test_status_conversion() {
257        assert_eq!(JobStatus::Pending.as_str(), "pending");
258        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
259        assert_eq!(JobStatus::DeadLetter.as_str(), "dead_letter");
260        assert_eq!(
261            "dead_letter".parse::<JobStatus>(),
262            Ok(JobStatus::DeadLetter)
263        );
264        assert_eq!(JobStatus::CancelRequested.as_str(), "cancel_requested");
265        assert_eq!(
266            "cancel_requested".parse::<JobStatus>(),
267            Ok(JobStatus::CancelRequested)
268        );
269        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
270        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
271    }
272
273    #[test]
274    fn test_exponential_backoff() {
275        let config = RetryConfig::default();
276        assert_eq!(config.calculate_backoff(1), Duration::from_secs(1));
277        assert_eq!(config.calculate_backoff(2), Duration::from_secs(2));
278        assert_eq!(config.calculate_backoff(3), Duration::from_secs(4));
279        assert_eq!(config.calculate_backoff(4), Duration::from_secs(8));
280    }
281
282    #[test]
283    fn test_max_backoff_cap() {
284        let config = RetryConfig {
285            max_backoff: Duration::from_secs(10),
286            ..Default::default()
287        };
288        assert_eq!(config.calculate_backoff(10), Duration::from_secs(10));
289    }
290
291    #[test]
292    fn priority_from_i32_covers_all_buckets() {
293        // Boundaries derived from the impl: 0..=12 Background, 13..=37 Low,
294        // 38..=62 Normal, 63..=87 High, _ Critical (incl. negatives).
295        assert_eq!(JobPriority::from_i32(0), JobPriority::Background);
296        assert_eq!(JobPriority::from_i32(12), JobPriority::Background);
297        assert_eq!(JobPriority::from_i32(13), JobPriority::Low);
298        assert_eq!(JobPriority::from_i32(25), JobPriority::Low);
299        assert_eq!(JobPriority::from_i32(37), JobPriority::Low);
300        assert_eq!(JobPriority::from_i32(38), JobPriority::Normal);
301        assert_eq!(JobPriority::from_i32(62), JobPriority::Normal);
302        assert_eq!(JobPriority::from_i32(63), JobPriority::High);
303        assert_eq!(JobPriority::from_i32(87), JobPriority::High);
304        assert_eq!(JobPriority::from_i32(88), JobPriority::Critical);
305        assert_eq!(JobPriority::from_i32(1_000_000), JobPriority::Critical);
306        // Negatives fall through to Critical via the wildcard arm.
307        assert_eq!(JobPriority::from_i32(-1), JobPriority::Critical);
308    }
309
310    #[test]
311    fn priority_default_is_normal() {
312        assert_eq!(JobPriority::default(), JobPriority::Normal);
313    }
314
315    #[test]
316    fn priority_round_trips_through_i32_buckets() {
317        // The constructed values are at bucket centers, so the round trip is
318        // lossless for the canonical numeric encodings.
319        for variant in [
320            JobPriority::Background,
321            JobPriority::Low,
322            JobPriority::Normal,
323            JobPriority::High,
324            JobPriority::Critical,
325        ] {
326            assert_eq!(JobPriority::from_i32(variant.as_i32()), variant);
327        }
328    }
329
330    #[test]
331    fn priority_from_str_is_case_insensitive_for_all_variants() {
332        assert_eq!(
333            "background".parse::<JobPriority>(),
334            Ok(JobPriority::Background)
335        );
336        assert_eq!("Low".parse::<JobPriority>(), Ok(JobPriority::Low));
337        assert_eq!("NORMAL".parse::<JobPriority>(), Ok(JobPriority::Normal));
338        assert_eq!("HiGh".parse::<JobPriority>(), Ok(JobPriority::High));
339        assert_eq!("critical".parse::<JobPriority>(), Ok(JobPriority::Critical));
340    }
341
342    #[test]
343    fn priority_from_str_reports_unknown_input_verbatim() {
344        let err = "urgent".parse::<JobPriority>().unwrap_err();
345        assert_eq!(err.0, "urgent");
346        // Display surfaces the bad input.
347        assert!(err.to_string().contains("urgent"));
348    }
349
350    #[test]
351    fn status_from_str_rejects_unknown_input() {
352        let err = "pending_review".parse::<JobStatus>().unwrap_err();
353        assert_eq!(err.0, "pending_review");
354        assert!(err.to_string().contains("pending_review"));
355    }
356
357    #[test]
358    fn status_round_trips_for_every_variant() {
359        for status in [
360            JobStatus::Pending,
361            JobStatus::Claimed,
362            JobStatus::Running,
363            JobStatus::Completed,
364            JobStatus::Retry,
365            JobStatus::Failed,
366            JobStatus::DeadLetter,
367            JobStatus::CancelRequested,
368            JobStatus::Cancelled,
369        ] {
370            let s = status.as_str();
371            assert_eq!(s.parse::<JobStatus>().unwrap(), status);
372        }
373    }
374
375    #[test]
376    fn parse_errors_are_error_trait_impls() {
377        // Cheap guard against accidental removal of the Error impl, which
378        // would break `?` propagation in user code that uses these parsers.
379        fn assert_error<E: std::error::Error>() {}
380        assert_error::<ParseJobPriorityError>();
381        assert_error::<ParseJobStatusError>();
382    }
383
384    #[test]
385    fn job_info_default_values_match_doctrine() {
386        let info = JobInfo::default();
387        assert_eq!(info.name, "");
388        assert_eq!(info.timeout, Duration::from_secs(3600));
389        assert_eq!(info.priority, JobPriority::Normal);
390        assert!(!info.is_public);
391        assert!(info.required_role.is_none());
392        assert!(!info.idempotent);
393        assert!(info.ttl.is_none());
394    }
395
396    #[test]
397    fn retry_config_default_retries_on_all_errors() {
398        let cfg = RetryConfig::default();
399        assert_eq!(cfg.max_attempts, 3);
400        assert_eq!(cfg.backoff, BackoffStrategy::Exponential);
401        assert_eq!(cfg.max_backoff, Duration::from_secs(300));
402        assert!(cfg.retry_on.is_empty(), "empty list ⇒ retry on every error");
403    }
404
405    #[test]
406    fn backoff_fixed_returns_base_for_any_attempt() {
407        let cfg = RetryConfig {
408            backoff: BackoffStrategy::Fixed,
409            ..Default::default()
410        };
411        for attempt in [1u32, 2, 5, 100] {
412            assert_eq!(cfg.calculate_backoff(attempt), Duration::from_secs(1));
413        }
414    }
415
416    #[test]
417    fn backoff_linear_multiplies_base_by_attempt() {
418        let cfg = RetryConfig {
419            backoff: BackoffStrategy::Linear,
420            ..Default::default()
421        };
422        assert_eq!(cfg.calculate_backoff(1), Duration::from_secs(1));
423        assert_eq!(cfg.calculate_backoff(5), Duration::from_secs(5));
424        assert_eq!(cfg.calculate_backoff(50), Duration::from_secs(50));
425    }
426
427    #[test]
428    fn backoff_exponential_handles_attempt_zero_without_underflow() {
429        // attempt = 0 ⇒ saturating_sub keeps exponent at 0 ⇒ 2^0 = 1 ⇒ base.
430        let cfg = RetryConfig::default();
431        assert_eq!(cfg.calculate_backoff(0), Duration::from_secs(1));
432    }
433
434    #[test]
435    fn backoff_exponential_caps_at_max_backoff_for_large_attempt() {
436        // attempt = 20 ⇒ 2^19 seconds = ~6 days, must cap to default 5 min.
437        let cfg = RetryConfig::default();
438        assert_eq!(cfg.calculate_backoff(20), Duration::from_secs(300));
439    }
440
441    #[test]
442    fn backoff_strategy_default_is_exponential() {
443        assert_eq!(BackoffStrategy::default(), BackoffStrategy::Exponential);
444    }
445}