Skip to main content

forge_core/job/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
/// Trait for FORGE job handlers.
///
/// An implementor supplies static metadata through [`ForgeJob::info`] and the
/// work itself through [`ForgeJob::execute`]. Every method is an associated
/// function (there is no `self` receiver), so a handler is addressed by its
/// type rather than by an instance.
pub trait ForgeJob: Send + Sync + 'static {
    /// Input arguments type.
    ///
    /// Must be serializable in both directions (`Serialize` and
    /// `DeserializeOwned`) as well as `Send + Sync`.
    type Args: DeserializeOwned + Serialize + Send + Sync;
    /// Output result type. Must be serializable and `Send`.
    type Output: Serialize + Send;

    /// Get job metadata.
    fn info() -> JobInfo;

    /// Execute the job.
    ///
    /// Takes ownership of `args` and returns a boxed, pinned, `Send` future.
    /// The elided `'_` lifetime ties that future to the `ctx` borrow, so the
    /// future may hold on to the context while it runs.
    fn execute(
        ctx: &JobContext,
        args: Self::Args,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;

    /// Compensate a cancelled job.
    ///
    /// The default implementation ignores all three arguments and resolves
    /// immediately to `Ok(())`; override it when a job has work to undo.
    ///
    /// NOTE(review): `_reason` is presumably the human-readable cancellation
    /// reason and `_args` the arguments the job was started with; confirm
    /// against the caller.
    fn compensate<'a>(
        _ctx: &'a JobContext,
        _args: Self::Args,
        _reason: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async { Ok(()) })
    }
}
37
/// Job metadata.
///
/// Static description of a job: how it is routed, scheduled, retried,
/// deduplicated and authorized. This is the value returned by
/// `ForgeJob::info`; the `Default` impl provides the baseline values.
#[derive(Debug, Clone)]
pub struct JobInfo {
    /// Job name (used for routing).
    pub name: &'static str,
    /// Job timeout.
    pub timeout: Duration,
    /// Default priority.
    pub priority: JobPriority,
    /// Retry configuration.
    pub retry: RetryConfig,
    /// Required worker capability (e.g., "general", "media", "ml").
    /// `None` means no capability is named.
    pub worker_capability: Option<&'static str>,
    /// Whether to deduplicate by idempotency key.
    pub idempotent: bool,
    /// Idempotency key field path.
    ///
    /// NOTE(review): presumably a path into the job's arguments that selects
    /// the deduplication key; confirm against the code that consumes it.
    pub idempotency_key: Option<&'static str>,
    /// Whether the job is public (no auth required).
    pub is_public: bool,
    /// Required role for authorization (implies auth required).
    ///
    /// NOTE(review): how this combines with `is_public: true` is not defined
    /// in this file; verify against the authorization layer.
    pub required_role: Option<&'static str>,
    /// Time-to-live after completion. Job records are cleaned up after this duration.
    /// None means records are kept indefinitely.
    pub ttl: Option<Duration>,
}
63
64impl Default for JobInfo {
65    fn default() -> Self {
66        Self {
67            name: "",
68            timeout: Duration::from_secs(3600), // 1 hour default
69            priority: JobPriority::Normal,
70            retry: RetryConfig::default(),
71            worker_capability: None,
72            idempotent: false,
73            idempotency_key: None,
74            is_public: false,
75            required_role: None,
76            ttl: None,
77        }
78    }
79}
80
/// Job priority levels.
///
/// Variants are declared in ascending order of urgency, so the derived
/// `Ord` makes `Critical > High > Normal > Low > Background`. Each variant's
/// discriminant is the integer used for database storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum JobPriority {
    /// Lowest priority; stored as `0`.
    Background = 0,
    /// Stored as `25`.
    Low = 25,
    /// The default priority; stored as `50`.
    #[default]
    Normal = 50,
    /// Stored as `75`.
    High = 75,
    /// Highest priority; stored as `100`.
    Critical = 100,
}

impl JobPriority {
    /// Get numeric value for database storage.
    pub fn as_i32(&self) -> i32 {
        *self as i32
    }

    /// Parse from numeric value.
    ///
    /// The input does not have to be one of the exact stored values: every
    /// integer is assigned to the level whose stored value is nearest
    /// (`..=12`, `13..=37`, `38..=62`, `63..=87`, `88..`). Out-of-range input
    /// is clamped, so negative numbers become [`JobPriority::Background`] and
    /// anything above 100 becomes [`JobPriority::Critical`].
    pub fn from_i32(value: i32) -> Self {
        // The ranges are exhaustive on purpose (no `_` arm): a negative value
        // must clamp to the lowest level rather than fall through to the
        // highest one.
        match value {
            i32::MIN..=12 => Self::Background,
            13..=37 => Self::Low,
            38..=62 => Self::Normal,
            63..=87 => Self::High,
            88..=i32::MAX => Self::Critical,
        }
    }
}
109
/// Error produced when a string does not name a job priority.
///
/// The wrapped `String` is the text that is echoed back in the error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobPriorityError(pub String);

impl std::error::Error for ParseJobPriorityError {}

impl std::fmt::Display for ParseJobPriorityError {
    /// Renders the message with the wrapped text in single quotes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        f.write_fmt(format_args!("invalid job priority: '{}'", rejected))
    }
}
120
121impl FromStr for JobPriority {
122    type Err = ParseJobPriorityError;
123
124    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
125        match s.to_lowercase().as_str() {
126            "background" => Ok(Self::Background),
127            "low" => Ok(Self::Low),
128            "normal" => Ok(Self::Normal),
129            "high" => Ok(Self::High),
130            "critical" => Ok(Self::Critical),
131            _ => Err(ParseJobPriorityError(s.to_string())),
132        }
133    }
134}
135
/// Lifecycle state of a job in the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Not yet claimed by any worker.
    Pending,
    /// A worker has claimed the job.
    Claimed,
    /// The job is executing right now.
    Running,
    /// The job finished successfully.
    Completed,
    /// The job failed and is going to be retried.
    Retry,
    /// The job failed for good.
    Failed,
    /// The job was moved to the dead letter queue.
    DeadLetter,
    /// Cancellation of a running job has been requested.
    CancelRequested,
    /// The job was cancelled.
    Cancelled,
}

impl JobStatus {
    /// Returns the snake_case string used to store this status in the
    /// database.
    pub fn as_str(&self) -> &'static str {
        let text: &'static str = match *self {
            // Waiting or in flight.
            JobStatus::Pending => "pending",
            JobStatus::Claimed => "claimed",
            JobStatus::Running => "running",
            JobStatus::Retry => "retry",
            JobStatus::CancelRequested => "cancel_requested",
            // Finished, one way or another.
            JobStatus::Completed => "completed",
            JobStatus::Failed => "failed",
            JobStatus::DeadLetter => "dead_letter",
            JobStatus::Cancelled => "cancelled",
        };
        text
    }
}
175
/// Error produced when a string does not name a job status.
///
/// The wrapped `String` is the text that is echoed back in the error message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(pub String);

impl std::error::Error for ParseJobStatusError {}

impl std::fmt::Display for ParseJobStatusError {
    /// Renders the message with the wrapped text in single quotes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        f.write_fmt(format_args!("invalid job status: '{}'", rejected))
    }
}
186
187impl FromStr for JobStatus {
188    type Err = ParseJobStatusError;
189
190    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
191        match s {
192            "pending" => Ok(Self::Pending),
193            "claimed" => Ok(Self::Claimed),
194            "running" => Ok(Self::Running),
195            "completed" => Ok(Self::Completed),
196            "retry" => Ok(Self::Retry),
197            "failed" => Ok(Self::Failed),
198            "dead_letter" => Ok(Self::DeadLetter),
199            "cancel_requested" => Ok(Self::CancelRequested),
200            "cancelled" => Ok(Self::Cancelled),
201            _ => Err(ParseJobStatusError(s.to_string())),
202        }
203    }
204}
205
/// Retry configuration for jobs.
///
/// Combines an attempt limit with a backoff policy. The delay before a given
/// attempt is computed by `RetryConfig::calculate_backoff`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    pub max_attempts: u32,
    /// Backoff strategy.
    pub backoff: BackoffStrategy,
    /// Maximum backoff duration. `calculate_backoff` never returns more than
    /// this, whatever the strategy or attempt number.
    pub max_backoff: Duration,
    /// Error types to retry on (empty = all errors).
    ///
    /// NOTE(review): how these strings are matched against an error is not
    /// defined in this file; confirm against the retry logic that reads them.
    pub retry_on: Vec<String>,
}
218
219impl Default for RetryConfig {
220    fn default() -> Self {
221        Self {
222            max_attempts: 3,
223            backoff: BackoffStrategy::Exponential,
224            max_backoff: Duration::from_secs(300), // 5 minutes
225            retry_on: Vec::new(),                  // Retry on all errors
226        }
227    }
228}
229
230impl RetryConfig {
231    /// Calculate backoff duration for a given attempt.
232    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
233        let base = Duration::from_secs(1);
234        let backoff = match self.backoff {
235            BackoffStrategy::Fixed => base,
236            BackoffStrategy::Linear => base * attempt,
237            BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
238        };
239        backoff.min(self.max_backoff)
240    }
241}
242
/// Backoff strategy for retries.
///
/// Selects how `RetryConfig::calculate_backoff` grows the delay from its
/// one-second base before the `max_backoff` cap is applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackoffStrategy {
    /// Same delay each time (the one-second base).
    Fixed,
    /// Delay increases linearly (the base multiplied by the attempt number).
    Linear,
    /// Delay doubles each time (1s, 2s, 4s, ...). This is the default.
    #[default]
    Exponential,
}
254
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Each priority level outranks the one directly below it.
    #[test]
    fn test_priority_ordering() {
        let ascending = [
            JobPriority::Background,
            JobPriority::Low,
            JobPriority::Normal,
            JobPriority::High,
            JobPriority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// Stored integers map to their priority and back.
    #[test]
    fn test_priority_conversion() {
        let cases = [(JobPriority::Critical, 100), (JobPriority::Normal, 50)];
        for (priority, _) in cases {
            let expected = cases
                .iter()
                .find(|(candidate, _)| *candidate == priority)
                .map(|&(_, raw)| raw)
                .unwrap();
            assert_eq!(priority.as_i32(), expected);
        }
        for (priority, raw) in cases {
            assert_eq!(JobPriority::from_i32(raw), priority);
        }
    }

    /// Status strings round-trip through `as_str` and `parse`.
    #[test]
    fn test_status_conversion() {
        let cases = [
            (JobStatus::Pending, "pending"),
            (JobStatus::DeadLetter, "dead_letter"),
            (JobStatus::CancelRequested, "cancel_requested"),
            (JobStatus::Cancelled, "cancelled"),
        ];
        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<JobStatus>(), Ok(status));
        }
    }

    /// With the default (exponential) policy the delay doubles per attempt.
    #[test]
    fn test_exponential_backoff() {
        let config = RetryConfig::default();
        let expected_secs = [(1, 1), (2, 2), (3, 4), (4, 8)];
        for (attempt, secs) in expected_secs {
            assert_eq!(config.calculate_backoff(attempt), Duration::from_secs(secs));
        }
    }

    /// A delay that would exceed `max_backoff` is clamped to it.
    #[test]
    fn test_max_backoff_cap() {
        let cap = Duration::from_secs(10);
        let config = RetryConfig {
            max_backoff: cap,
            ..Default::default()
        };
        assert_eq!(config.calculate_backoff(10), cap);
    }
}