// forge_core/job/traits.rs

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;

use serde::{Serialize, de::DeserializeOwned};

use crate::error::Result;

use super::context::JobContext;

12/// Trait for FORGE job handlers.
13pub trait ForgeJob: Send + Sync + 'static {
14    /// Input arguments type.
15    type Args: DeserializeOwned + Serialize + Send + Sync;
16    /// Output result type.
17    type Output: Serialize + Send;
18
19    /// Get job metadata.
20    fn info() -> JobInfo;
21
22    /// Execute the job.
23    fn execute(
24        ctx: &JobContext,
25        args: Self::Args,
26    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
27
28    /// Compensate a cancelled job.
29    fn compensate<'a>(
30        _ctx: &'a JobContext,
31        _args: Self::Args,
32        _reason: &'a str,
33    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
34        Box::pin(async { Ok(()) })
35    }
36}
37
38/// Job metadata.
39#[derive(Debug, Clone)]
40pub struct JobInfo {
41    /// Job name (used for routing).
42    pub name: &'static str,
43    /// Job timeout.
44    pub timeout: Duration,
45    /// Default timeout for outbound HTTP requests made by this job.
46    pub http_timeout: Option<Duration>,
47    /// Default priority.
48    pub priority: JobPriority,
49    /// Retry configuration.
50    pub retry: RetryConfig,
51    /// Required worker capability (e.g., "general", "media", "ml").
52    pub worker_capability: Option<&'static str>,
53    /// Whether to deduplicate by idempotency key.
54    pub idempotent: bool,
55    /// Idempotency key field path.
56    pub idempotency_key: Option<&'static str>,
57    /// Whether the job is public (no auth required).
58    pub is_public: bool,
59    /// Required role for authorization (implies auth required).
60    pub required_role: Option<&'static str>,
61    /// Time-to-live after completion. Job records are cleaned up after this duration.
62    /// None means records are kept indefinitely.
63    pub ttl: Option<Duration>,
64}
65
66impl Default for JobInfo {
67    fn default() -> Self {
68        Self {
69            name: "",
70            timeout: Duration::from_secs(3600), // 1 hour default
71            http_timeout: None,
72            priority: JobPriority::Normal,
73            retry: RetryConfig::default(),
74            worker_capability: None,
75            idempotent: false,
76            idempotency_key: None,
77            is_public: false,
78            required_role: None,
79            ttl: None,
80        }
81    }
82}
83
/// Job priority levels.
///
/// Variants are declared in ascending order of urgency, so the derived
/// ordering agrees with the numeric discriminants
/// (`Background < Low < Normal < High < Critical`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum JobPriority {
    Background = 0,
    Low = 25,
    #[default]
    Normal = 50,
    High = 75,
    Critical = 100,
}

impl JobPriority {
    /// Get numeric value for database storage.
    pub const fn as_i32(&self) -> i32 {
        *self as i32
    }

    /// Parse from numeric value.
    ///
    /// Any `i32` is accepted and mapped to the level whose discriminant is
    /// closest: values up to 12 (including all negative values) become
    /// `Background`, and values from 88 upwards become `Critical`.
    pub const fn from_i32(value: i32) -> Self {
        // The ranges are spelled out exhaustively so that out-of-range input
        // on the low side cannot fall through to the highest priority.
        match value {
            i32::MIN..=12 => Self::Background,
            13..=37 => Self::Low,
            38..=62 => Self::Normal,
            63..=87 => Self::High,
            88..=i32::MAX => Self::Critical,
        }
    }
}

/// Error produced when a string cannot be parsed as a job priority.
///
/// The wrapped `String` is the input that was rejected; it is echoed back in
/// the `Display` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobPriorityError(pub String);

impl fmt::Display for ParseJobPriorityError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid job priority: '{}'", self.0)
    }
}

impl std::error::Error for ParseJobPriorityError {}

124impl FromStr for JobPriority {
125    type Err = ParseJobPriorityError;
126
127    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
128        match s.to_lowercase().as_str() {
129            "background" => Ok(Self::Background),
130            "low" => Ok(Self::Low),
131            "normal" => Ok(Self::Normal),
132            "high" => Ok(Self::High),
133            "critical" => Ok(Self::Critical),
134            _ => Err(ParseJobPriorityError(s.to_string())),
135        }
136    }
137}
138
/// Job status in the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Waiting to be claimed.
    Pending,
    /// Claimed by a worker.
    Claimed,
    /// Currently executing.
    Running,
    /// Successfully completed.
    Completed,
    /// Failed, will retry.
    Retry,
    /// Failed permanently.
    Failed,
    /// Moved to dead letter queue.
    DeadLetter,
    /// Cancellation requested for a running job.
    CancelRequested,
    /// Job cancelled.
    Cancelled,
}

impl JobStatus {
    /// Convert to database string.
    ///
    /// The result is the lowercase `snake_case` form of the variant name.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Claimed => "claimed",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Retry => "retry",
            Self::Failed => "failed",
            Self::DeadLetter => "dead_letter",
            Self::CancelRequested => "cancel_requested",
            Self::Cancelled => "cancelled",
        }
    }
}

/// Error produced when a string cannot be parsed as a job status.
///
/// The wrapped `String` is the input that was rejected; it is echoed back in
/// the `Display` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(pub String);

impl fmt::Display for ParseJobStatusError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid job status: '{}'", self.0)
    }
}

impl std::error::Error for ParseJobStatusError {}

190impl FromStr for JobStatus {
191    type Err = ParseJobStatusError;
192
193    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
194        match s {
195            "pending" => Ok(Self::Pending),
196            "claimed" => Ok(Self::Claimed),
197            "running" => Ok(Self::Running),
198            "completed" => Ok(Self::Completed),
199            "retry" => Ok(Self::Retry),
200            "failed" => Ok(Self::Failed),
201            "dead_letter" => Ok(Self::DeadLetter),
202            "cancel_requested" => Ok(Self::CancelRequested),
203            "cancelled" => Ok(Self::Cancelled),
204            _ => Err(ParseJobStatusError(s.to_string())),
205        }
206    }
207}
208
/// Retry configuration for jobs.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    pub max_attempts: u32,
    /// Backoff strategy.
    pub backoff: BackoffStrategy,
    /// Maximum backoff duration.
    pub max_backoff: Duration,
    /// Error types to retry on (empty = all errors).
    pub retry_on: Vec<String>,
}

impl Default for RetryConfig {
    /// Three attempts, exponential backoff capped at five minutes, retrying
    /// on every error.
    fn default() -> Self {
        Self {
            max_attempts: 3,
            backoff: BackoffStrategy::Exponential,
            max_backoff: Duration::from_secs(300), // 5 minutes
            retry_on: Vec::new(),                  // Retry on all errors
        }
    }
}

impl RetryConfig {
    /// Calculate backoff duration for a given attempt.
    ///
    /// The base delay is one second. `Fixed` always yields the base delay,
    /// `Linear` yields `attempt` seconds (so attempt 0 yields zero), and
    /// `Exponential` yields `2^(attempt - 1)` seconds, with attempt 0 treated
    /// like attempt 1. The result never exceeds `max_backoff`; attempts large
    /// enough to overflow the computation saturate to that cap instead of
    /// panicking or wrapping around to a zero delay.
    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
        let base = Duration::from_secs(1);
        let backoff = match self.backoff {
            BackoffStrategy::Fixed => base,
            BackoffStrategy::Linear => base.saturating_mul(attempt),
            // `2u32.pow(32)` and above do not fit in a `u32`; treat that as
            // "longer than anything representable" and let the cap apply.
            BackoffStrategy::Exponential => 2u32
                .checked_pow(attempt.saturating_sub(1))
                .map_or(Duration::MAX, |factor| base.saturating_mul(factor)),
        };
        backoff.min(self.max_backoff)
    }
}

/// Backoff strategy for retries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackoffStrategy {
    /// Same delay each time.
    Fixed,
    /// Delay increases linearly.
    Linear,
    /// Delay doubles each time.
    #[default]
    Exponential,
}

// Unit tests for the priority, status and retry helpers defined in this file.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    #[test]
    fn test_priority_ordering() {
        assert!(JobPriority::Critical > JobPriority::High);
        assert!(JobPriority::High > JobPriority::Normal);
        assert!(JobPriority::Normal > JobPriority::Low);
        assert!(JobPriority::Low > JobPriority::Background);
    }

    #[test]
    fn test_priority_conversion() {
        assert_eq!(JobPriority::Critical.as_i32(), 100);
        assert_eq!(JobPriority::Normal.as_i32(), 50);
        assert_eq!(JobPriority::from_i32(100), JobPriority::Critical);
        assert_eq!(JobPriority::from_i32(50), JobPriority::Normal);
    }

    #[test]
    fn test_priority_bucket_edges() {
        // Each pair straddles the boundary between two adjacent levels.
        assert_eq!(JobPriority::from_i32(12), JobPriority::Background);
        assert_eq!(JobPriority::from_i32(13), JobPriority::Low);
        assert_eq!(JobPriority::from_i32(37), JobPriority::Low);
        assert_eq!(JobPriority::from_i32(38), JobPriority::Normal);
        assert_eq!(JobPriority::from_i32(62), JobPriority::Normal);
        assert_eq!(JobPriority::from_i32(63), JobPriority::High);
        assert_eq!(JobPriority::from_i32(87), JobPriority::High);
        assert_eq!(JobPriority::from_i32(88), JobPriority::Critical);
    }

    #[test]
    fn test_priority_parsing() {
        assert_eq!("high".parse::<JobPriority>(), Ok(JobPriority::High));
        assert_eq!("HIGH".parse::<JobPriority>(), Ok(JobPriority::High));
        assert_eq!(
            "Background".parse::<JobPriority>(),
            Ok(JobPriority::Background)
        );
        assert_eq!(
            "urgent".parse::<JobPriority>(),
            Err(ParseJobPriorityError("urgent".to_string()))
        );
    }

    #[test]
    fn test_status_conversion() {
        assert_eq!(JobStatus::Pending.as_str(), "pending");
        assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
        assert_eq!(JobStatus::DeadLetter.as_str(), "dead_letter");
        assert_eq!(
            "dead_letter".parse::<JobStatus>(),
            Ok(JobStatus::DeadLetter)
        );
        assert_eq!(JobStatus::CancelRequested.as_str(), "cancel_requested");
        assert_eq!(
            "cancel_requested".parse::<JobStatus>(),
            Ok(JobStatus::CancelRequested)
        );
        assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
        assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
    }

    #[test]
    fn test_status_rejects_unknown() {
        assert_eq!(
            "bogus".parse::<JobStatus>(),
            Err(ParseJobStatusError("bogus".to_string()))
        );
    }

    #[test]
    fn test_exponential_backoff() {
        let config = RetryConfig::default();
        assert_eq!(config.calculate_backoff(1), Duration::from_secs(1));
        assert_eq!(config.calculate_backoff(2), Duration::from_secs(2));
        assert_eq!(config.calculate_backoff(3), Duration::from_secs(4));
        assert_eq!(config.calculate_backoff(4), Duration::from_secs(8));
    }

    #[test]
    fn test_fixed_and_linear_backoff() {
        let fixed = RetryConfig {
            backoff: BackoffStrategy::Fixed,
            ..Default::default()
        };
        assert_eq!(fixed.calculate_backoff(7), Duration::from_secs(1));

        let linear = RetryConfig {
            backoff: BackoffStrategy::Linear,
            ..Default::default()
        };
        assert_eq!(linear.calculate_backoff(7), Duration::from_secs(7));
    }

    #[test]
    fn test_max_backoff_cap() {
        let config = RetryConfig {
            max_backoff: Duration::from_secs(10),
            ..Default::default()
        };
        assert_eq!(config.calculate_backoff(10), Duration::from_secs(10));
    }
}
314}