Skip to main content

forge_core/job/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12/// Trait for FORGE job handlers.
13pub trait ForgeJob: Send + Sync + 'static {
14    /// Input arguments type.
15    type Args: DeserializeOwned + Serialize + Send + Sync;
16    /// Output result type.
17    type Output: Serialize + Send;
18
19    /// Get job metadata.
20    fn info() -> JobInfo;
21
22    /// Execute the job.
23    fn execute(
24        ctx: &JobContext,
25        args: Self::Args,
26    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
27}
28
29/// Job metadata.
30#[derive(Debug, Clone)]
31pub struct JobInfo {
32    /// Job name (used for routing).
33    pub name: &'static str,
34    /// Job timeout.
35    pub timeout: Duration,
36    /// Default priority.
37    pub priority: JobPriority,
38    /// Retry configuration.
39    pub retry: RetryConfig,
40    /// Required worker capability (e.g., "general", "media", "ml").
41    pub worker_capability: Option<&'static str>,
42    /// Whether to deduplicate by idempotency key.
43    pub idempotent: bool,
44    /// Idempotency key field path.
45    pub idempotency_key: Option<&'static str>,
46    /// Whether the job is public (no auth required).
47    pub is_public: bool,
48    /// Required role for authorization (implies auth required).
49    pub required_role: Option<&'static str>,
50}
51
52impl Default for JobInfo {
53    fn default() -> Self {
54        Self {
55            name: "",
56            timeout: Duration::from_secs(3600), // 1 hour default
57            priority: JobPriority::Normal,
58            retry: RetryConfig::default(),
59            worker_capability: None,
60            idempotent: false,
61            idempotency_key: None,
62            is_public: false,
63            required_role: None,
64        }
65    }
66}
67
/// Job priority levels.
///
/// Variants are declared from least to most urgent, so the derived `Ord`
/// gives `Background < Low < Normal < High < Critical`. The explicit
/// discriminants are the numeric values returned by [`JobPriority::as_i32`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum JobPriority {
    /// Numeric value 0.
    Background = 0,
    /// Numeric value 25.
    Low = 25,
    /// Numeric value 50; this is the `Default` priority.
    #[default]
    Normal = 50,
    /// Numeric value 75.
    High = 75,
    /// Numeric value 100.
    Critical = 100,
}

impl JobPriority {
    /// Get numeric value for database storage.
    pub fn as_i32(&self) -> i32 {
        *self as i32
    }

    /// Parse from numeric value.
    ///
    /// Any `i32` is accepted and mapped to the nearest level:
    ///
    /// | value      | level        |
    /// |------------|--------------|
    /// | `..=12`    | `Background` |
    /// | `13..=37`  | `Low`        |
    /// | `38..=62`  | `Normal`     |
    /// | `63..=87`  | `High`       |
    /// | `88..`     | `Critical`   |
    ///
    /// Negative values are treated as below `Background` and therefore map
    /// to `Background` rather than falling through to `Critical`.
    pub fn from_i32(value: i32) -> Self {
        // The ranges are exhaustive over i32, so no catch-all arm is needed
        // and an accidental gap becomes a compile error.
        match value {
            i32::MIN..=12 => Self::Background,
            13..=37 => Self::Low,
            38..=62 => Self::Normal,
            63..=87 => Self::High,
            88..=i32::MAX => Self::Critical,
        }
    }
}

impl FromStr for JobPriority {
    type Err = std::convert::Infallible;

    /// Parses a priority name, ignoring letter case.
    ///
    /// Parsing never fails: any string that is not one of `background`,
    /// `low`, `normal`, `high` or `critical` yields [`JobPriority::Normal`].
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        Ok(match s.to_lowercase().as_str() {
            "background" => Self::Background,
            "low" => Self::Low,
            "normal" => Self::Normal,
            "high" => Self::High,
            "critical" => Self::Critical,
            // Unknown names deliberately fall back to the default level.
            _ => Self::Normal,
        })
    }
}
111
/// Lifecycle state of a job in the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Not yet claimed by any worker.
    Pending,
    /// A worker has claimed the job.
    Claimed,
    /// The job is executing right now.
    Running,
    /// The job finished successfully.
    Completed,
    /// The job failed and is going to be retried.
    Retry,
    /// The job failed for good.
    Failed,
    /// The job was moved to the dead letter queue.
    DeadLetter,
}

impl JobStatus {
    /// Returns the string stored in the database for this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            JobStatus::Pending => "pending",
            JobStatus::Claimed => "claimed",
            JobStatus::Running => "running",
            JobStatus::Completed => "completed",
            JobStatus::Retry => "retry",
            JobStatus::Failed => "failed",
            JobStatus::DeadLetter => "dead_letter",
        }
    }
}

impl FromStr for JobStatus {
    type Err = std::convert::Infallible;

    /// Parses the database string form of a status.
    ///
    /// The comparison is exact (case-sensitive). Parsing never fails: any
    /// unrecognised string yields [`JobStatus::Pending`].
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Every variant, so parsing is the exact inverse of `as_str`.
        // Keep this list in sync with the enum when adding variants.
        const CANDIDATES: [JobStatus; 7] = [
            JobStatus::Pending,
            JobStatus::Claimed,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Retry,
            JobStatus::Failed,
            JobStatus::DeadLetter,
        ];

        let parsed = CANDIDATES
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(JobStatus::Pending);
        Ok(parsed)
    }
}
162
/// Retry configuration for jobs.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    pub max_attempts: u32,
    /// Backoff strategy.
    pub backoff: BackoffStrategy,
    /// Maximum backoff duration; [`RetryConfig::calculate_backoff`] never
    /// returns more than this.
    pub max_backoff: Duration,
    /// Error types to retry on (empty = all errors).
    pub retry_on: Vec<String>,
}

impl Default for RetryConfig {
    /// Three attempts, exponential backoff capped at five minutes, retrying
    /// on every error.
    fn default() -> Self {
        Self {
            max_attempts: 3,
            backoff: BackoffStrategy::Exponential,
            max_backoff: Duration::from_secs(300), // 5 minutes
            retry_on: Vec::new(),                  // Retry on all errors
        }
    }
}

impl RetryConfig {
    /// Calculate backoff duration for a given attempt.
    ///
    /// With a base delay of one second the uncapped delay is:
    ///
    /// * `Fixed`: 1s for every attempt,
    /// * `Linear`: `attempt` seconds (so attempt 0 yields zero),
    /// * `Exponential`: `2^(attempt - 1)` seconds (attempts 0 and 1 both
    ///   yield 1s).
    ///
    /// The result is then limited to `max_backoff`. The exponential factor
    /// saturates at `u32::MAX` instead of overflowing, so arbitrarily large
    /// attempt numbers neither panic nor wrap around to a zero delay.
    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
        let base = Duration::from_secs(1);
        let backoff = match self.backoff {
            BackoffStrategy::Fixed => base,
            BackoffStrategy::Linear => base * attempt,
            BackoffStrategy::Exponential => {
                let exponent = attempt.saturating_sub(1);
                // 2^32 no longer fits in u32; saturate rather than overflow.
                // One second times u32::MAX still fits in a `Duration`.
                base * 2u32.saturating_pow(exponent)
            }
        };
        backoff.min(self.max_backoff)
    }
}

/// Backoff strategy for retries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackoffStrategy {
    /// Same delay each time.
    Fixed,
    /// Delay increases linearly.
    Linear,
    /// Delay doubles each time. This is the `Default` strategy.
    #[default]
    Exponential,
}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Each priority level must compare greater than the one before it.
    #[test]
    fn test_priority_ordering() {
        let ascending = [
            JobPriority::Background,
            JobPriority::Low,
            JobPriority::Normal,
            JobPriority::High,
            JobPriority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// The exact discriminant values convert in both directions.
    #[test]
    fn test_priority_conversion() {
        let cases = [(JobPriority::Critical, 100), (JobPriority::Normal, 50)];
        for &(priority, _) in &cases {
            let _ = priority;
        }
        for &(priority, number) in &cases {
            assert_eq!(priority.as_i32(), number);
        }
        for &(priority, number) in &cases {
            assert_eq!(JobPriority::from_i32(number), priority);
        }
    }

    /// Status values convert to and from their database strings.
    #[test]
    fn test_status_conversion() {
        let cases = [
            (JobStatus::Pending, "pending"),
            (JobStatus::DeadLetter, "dead_letter"),
        ];
        for &(status, text) in &cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<JobStatus>(), Ok(status));
        }
    }

    /// The default (exponential) policy doubles the delay on each attempt.
    #[test]
    fn test_exponential_backoff() {
        let config = RetryConfig::default();
        let expected = [(1u32, 1u64), (2, 2), (3, 4), (4, 8)];
        for &(attempt, seconds) in &expected {
            assert_eq!(
                config.calculate_backoff(attempt),
                Duration::from_secs(seconds)
            );
        }
    }

    /// A delay that would exceed `max_backoff` is limited to it.
    #[test]
    fn test_max_backoff_cap() {
        let cap = Duration::from_secs(10);
        let config = RetryConfig {
            max_backoff: cap,
            ..Default::default()
        };
        assert_eq!(config.calculate_backoff(10), cap);
    }
}