Skip to main content

forge_core/job/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12/// Trait for FORGE job handlers.
13pub trait ForgeJob: Send + Sync + 'static {
14    /// Input arguments type.
15    type Args: DeserializeOwned + Serialize + Send + Sync;
16    /// Output result type.
17    type Output: Serialize + Send;
18
19    /// Get job metadata.
20    fn info() -> JobInfo;
21
22    /// Execute the job.
23    fn execute(
24        ctx: &JobContext,
25        args: Self::Args,
26    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
27
28    /// Compensate a cancelled job.
29    fn compensate<'a>(
30        _ctx: &'a JobContext,
31        _args: Self::Args,
32        _reason: &'a str,
33    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
34        Box::pin(async { Ok(()) })
35    }
36}
37
38/// Job metadata.
39#[derive(Debug, Clone)]
40pub struct JobInfo {
41    /// Job name (used for routing).
42    pub name: &'static str,
43    /// Job timeout.
44    pub timeout: Duration,
45    /// Default priority.
46    pub priority: JobPriority,
47    /// Retry configuration.
48    pub retry: RetryConfig,
49    /// Required worker capability (e.g., "general", "media", "ml").
50    pub worker_capability: Option<&'static str>,
51    /// Whether to deduplicate by idempotency key.
52    pub idempotent: bool,
53    /// Idempotency key field path.
54    pub idempotency_key: Option<&'static str>,
55    /// Whether the job is public (no auth required).
56    pub is_public: bool,
57    /// Required role for authorization (implies auth required).
58    pub required_role: Option<&'static str>,
59    /// Time-to-live after completion. Job records are cleaned up after this duration.
60    /// None means records are kept indefinitely.
61    pub ttl: Option<Duration>,
62}
63
64impl Default for JobInfo {
65    fn default() -> Self {
66        Self {
67            name: "",
68            timeout: Duration::from_secs(3600), // 1 hour default
69            priority: JobPriority::Normal,
70            retry: RetryConfig::default(),
71            worker_capability: None,
72            idempotent: false,
73            idempotency_key: None,
74            is_public: false,
75            required_role: None,
76            ttl: None,
77        }
78    }
79}
80
81/// Job priority levels.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
83pub enum JobPriority {
84    Background = 0,
85    Low = 25,
86    #[default]
87    Normal = 50,
88    High = 75,
89    Critical = 100,
90}
91
92impl JobPriority {
93    /// Get numeric value for database storage.
94    pub fn as_i32(&self) -> i32 {
95        *self as i32
96    }
97
98    /// Parse from numeric value.
99    pub fn from_i32(value: i32) -> Self {
100        match value {
101            0..=12 => Self::Background,
102            13..=37 => Self::Low,
103            38..=62 => Self::Normal,
104            63..=87 => Self::High,
105            _ => Self::Critical,
106        }
107    }
108}
109
110impl FromStr for JobPriority {
111    type Err = std::convert::Infallible;
112
113    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
114        Ok(match s.to_lowercase().as_str() {
115            "background" => Self::Background,
116            "low" => Self::Low,
117            "normal" => Self::Normal,
118            "high" => Self::High,
119            "critical" => Self::Critical,
120            _ => Self::Normal,
121        })
122    }
123}
124
125/// Job status in the queue.
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum JobStatus {
128    /// Waiting to be claimed.
129    Pending,
130    /// Claimed by a worker.
131    Claimed,
132    /// Currently executing.
133    Running,
134    /// Successfully completed.
135    Completed,
136    /// Failed, will retry.
137    Retry,
138    /// Failed permanently.
139    Failed,
140    /// Moved to dead letter queue.
141    DeadLetter,
142    /// Cancellation requested for a running job.
143    CancelRequested,
144    /// Job cancelled.
145    Cancelled,
146}
147
148impl JobStatus {
149    /// Convert to database string.
150    pub fn as_str(&self) -> &'static str {
151        match self {
152            Self::Pending => "pending",
153            Self::Claimed => "claimed",
154            Self::Running => "running",
155            Self::Completed => "completed",
156            Self::Retry => "retry",
157            Self::Failed => "failed",
158            Self::DeadLetter => "dead_letter",
159            Self::CancelRequested => "cancel_requested",
160            Self::Cancelled => "cancelled",
161        }
162    }
163}
164
165impl FromStr for JobStatus {
166    type Err = std::convert::Infallible;
167
168    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
169        Ok(match s {
170            "pending" => Self::Pending,
171            "claimed" => Self::Claimed,
172            "running" => Self::Running,
173            "completed" => Self::Completed,
174            "retry" => Self::Retry,
175            "failed" => Self::Failed,
176            "dead_letter" => Self::DeadLetter,
177            "cancel_requested" => Self::CancelRequested,
178            "cancelled" => Self::Cancelled,
179            _ => Self::Pending,
180        })
181    }
182}
183
184/// Retry configuration for jobs.
185#[derive(Debug, Clone)]
186pub struct RetryConfig {
187    /// Maximum number of retry attempts.
188    pub max_attempts: u32,
189    /// Backoff strategy.
190    pub backoff: BackoffStrategy,
191    /// Maximum backoff duration.
192    pub max_backoff: Duration,
193    /// Error types to retry on (empty = all errors).
194    pub retry_on: Vec<String>,
195}
196
197impl Default for RetryConfig {
198    fn default() -> Self {
199        Self {
200            max_attempts: 3,
201            backoff: BackoffStrategy::Exponential,
202            max_backoff: Duration::from_secs(300), // 5 minutes
203            retry_on: Vec::new(),                  // Retry on all errors
204        }
205    }
206}
207
208impl RetryConfig {
209    /// Calculate backoff duration for a given attempt.
210    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
211        let base = Duration::from_secs(1);
212        let backoff = match self.backoff {
213            BackoffStrategy::Fixed => base,
214            BackoffStrategy::Linear => base * attempt,
215            BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
216        };
217        backoff.min(self.max_backoff)
218    }
219}
220
221/// Backoff strategy for retries.
222#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
223pub enum BackoffStrategy {
224    /// Same delay each time.
225    Fixed,
226    /// Delay increases linearly.
227    Linear,
228    /// Delay doubles each time.
229    #[default]
230    Exponential,
231}
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236
237    #[test]
238    fn test_priority_ordering() {
239        assert!(JobPriority::Critical > JobPriority::High);
240        assert!(JobPriority::High > JobPriority::Normal);
241        assert!(JobPriority::Normal > JobPriority::Low);
242        assert!(JobPriority::Low > JobPriority::Background);
243    }
244
245    #[test]
246    fn test_priority_conversion() {
247        assert_eq!(JobPriority::Critical.as_i32(), 100);
248        assert_eq!(JobPriority::Normal.as_i32(), 50);
249        assert_eq!(JobPriority::from_i32(100), JobPriority::Critical);
250        assert_eq!(JobPriority::from_i32(50), JobPriority::Normal);
251    }
252
253    #[test]
254    fn test_status_conversion() {
255        assert_eq!(JobStatus::Pending.as_str(), "pending");
256        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
257        assert_eq!(JobStatus::DeadLetter.as_str(), "dead_letter");
258        assert_eq!(
259            "dead_letter".parse::<JobStatus>(),
260            Ok(JobStatus::DeadLetter)
261        );
262        assert_eq!(JobStatus::CancelRequested.as_str(), "cancel_requested");
263        assert_eq!(
264            "cancel_requested".parse::<JobStatus>(),
265            Ok(JobStatus::CancelRequested)
266        );
267        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
268        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
269    }
270
271    #[test]
272    fn test_exponential_backoff() {
273        let config = RetryConfig::default();
274        assert_eq!(config.calculate_backoff(1), Duration::from_secs(1));
275        assert_eq!(config.calculate_backoff(2), Duration::from_secs(2));
276        assert_eq!(config.calculate_backoff(3), Duration::from_secs(4));
277        assert_eq!(config.calculate_backoff(4), Duration::from_secs(8));
278    }
279
280    #[test]
281    fn test_max_backoff_cap() {
282        let config = RetryConfig {
283            max_backoff: Duration::from_secs(10),
284            ..Default::default()
285        };
286        assert_eq!(config.calculate_backoff(10), Duration::from_secs(10));
287    }
288}