forge_core/job/
traits.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
/// Trait for FORGE job handlers.
///
/// A handler is described entirely at the type level: both `info` and
/// `execute` are associated functions without a `self` receiver, so no
/// handler instance is required to call them. Implementing types must be
/// `Send + Sync + 'static`.
pub trait ForgeJob: Send + Sync + 'static {
    /// Input arguments type.
    ///
    /// Must be both deserializable and serializable via serde, and
    /// `Send + Sync`.
    type Args: DeserializeOwned + Serialize + Send + Sync;
    /// Output result type.
    ///
    /// Must be serializable via serde and `Send`.
    type Output: Serialize + Send;

    /// Get job metadata.
    ///
    /// Returns the [`JobInfo`] describing this job.
    fn info() -> JobInfo;

    /// Execute the job.
    ///
    /// Takes the context by reference and the arguments by value, and
    /// returns a boxed, pinned, `Send` future that resolves to
    /// `Result<Self::Output>`. The `'_` bound ties the future to the borrow
    /// of `ctx`: the future may keep using the context, but must not outlive
    /// it.
    fn execute(
        ctx: &JobContext,
        args: Self::Args,
    ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
}
28
/// Job metadata.
///
/// This is the value returned by `ForgeJob::info`. `JobInfo::default()`
/// supplies a baseline, so a job can override just the fields it cares about
/// with struct-update syntax (`JobInfo { name: "...", ..Default::default() }`).
#[derive(Debug, Clone)]
pub struct JobInfo {
    /// Job name (used for routing). Defaults to the empty string.
    pub name: &'static str,
    /// Job timeout. Defaults to one hour.
    pub timeout: Duration,
    /// Default priority. Defaults to `JobPriority::Normal`.
    pub priority: JobPriority,
    /// Retry configuration. Defaults to `RetryConfig::default()`.
    pub retry: RetryConfig,
    /// Required worker capability (e.g., "general", "media", "ml").
    /// `None` by default.
    pub worker_capability: Option<&'static str>,
    /// Whether to deduplicate by idempotency key. `false` by default.
    pub idempotent: bool,
    /// Idempotency key field path. `None` by default.
    ///
    /// NOTE(review): presumably a path into the job's `Args`; how it is
    /// resolved is not visible in this file — confirm against the caller.
    pub idempotency_key: Option<&'static str>,
}
47
48impl Default for JobInfo {
49    fn default() -> Self {
50        Self {
51            name: "",
52            timeout: Duration::from_secs(3600), // 1 hour default
53            priority: JobPriority::Normal,
54            retry: RetryConfig::default(),
55            worker_capability: None,
56            idempotent: false,
57            idempotency_key: None,
58        }
59    }
60}
61
/// Job priority levels.
///
/// The explicit discriminants are the numeric values produced by
/// [`JobPriority::as_i32`]. The derived ordering follows them, so
/// `Background < Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum JobPriority {
    /// Lowest level, numeric value `0`.
    Background = 0,
    /// Below normal, numeric value `25`.
    Low = 25,
    /// The default level, numeric value `50`.
    #[default]
    Normal = 50,
    /// Above normal, numeric value `75`.
    High = 75,
    /// Highest level, numeric value `100`.
    Critical = 100,
}

impl JobPriority {
    /// Get numeric value for database storage.
    ///
    /// Returns the variant's discriminant (`0`, `25`, `50`, `75` or `100`).
    pub fn as_i32(&self) -> i32 {
        *self as i32
    }

    /// Parse from numeric value.
    ///
    /// Every `i32` is accepted and snapped to the nearest level, using the
    /// midpoints between the canonical values as bucket boundaries.
    /// Out-of-range input is clamped rather than rejected: anything below
    /// `0` becomes [`JobPriority::Background`] and anything above `100`
    /// becomes [`JobPriority::Critical`].
    pub fn from_i32(value: i32) -> Self {
        // The ranges are exhaustive on purpose. A catch-all `_` arm here would
        // also absorb negative values and promote them to `Critical`.
        match value {
            i32::MIN..=12 => Self::Background,
            13..=37 => Self::Low,
            38..=62 => Self::Normal,
            63..=87 => Self::High,
            88..=i32::MAX => Self::Critical,
        }
    }
}

/// Lenient, case-insensitive parsing of a priority name.
///
/// Parsing never fails: the input is lowercased before matching, and any
/// unrecognized name falls back to [`JobPriority::Normal`].
impl FromStr for JobPriority {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let priority = match s.to_lowercase().as_str() {
            "background" => Self::Background,
            "low" => Self::Low,
            "normal" => Self::Normal,
            "high" => Self::High,
            "critical" => Self::Critical,
            // Unknown names deliberately degrade to the default level.
            _ => Self::Normal,
        };
        Ok(priority)
    }
}
105
/// Lifecycle state of a job in the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Not yet claimed by any worker.
    Pending,
    /// A worker has claimed the job.
    Claimed,
    /// The job is executing right now.
    Running,
    /// The job finished successfully.
    Completed,
    /// The job failed and is going to be retried.
    Retry,
    /// The job failed for good.
    Failed,
    /// The job was moved to the dead letter queue.
    DeadLetter,
}

impl JobStatus {
    /// Returns the string form used for the database.
    pub fn as_str(&self) -> &'static str {
        match *self {
            JobStatus::DeadLetter => "dead_letter",
            JobStatus::Failed => "failed",
            JobStatus::Retry => "retry",
            JobStatus::Completed => "completed",
            JobStatus::Running => "running",
            JobStatus::Claimed => "claimed",
            JobStatus::Pending => "pending",
        }
    }
}

/// Exact, case-sensitive parsing of the strings produced by `as_str`.
///
/// Parsing never fails: any other input yields [`JobStatus::Pending`].
impl FromStr for JobStatus {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Reverse lookup through `as_str`, so the two directions share one
        // spelling of every status string.
        const KNOWN: [JobStatus; 7] = [
            JobStatus::Pending,
            JobStatus::Claimed,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Retry,
            JobStatus::Failed,
            JobStatus::DeadLetter,
        ];

        let status = KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(JobStatus::Pending);
        Ok(status)
    }
}
156
/// Retry configuration for jobs.
///
/// The defaults are three attempts with exponential backoff capped at five
/// minutes, retrying on every error.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    pub max_attempts: u32,
    /// Backoff strategy.
    pub backoff: BackoffStrategy,
    /// Maximum backoff duration; `calculate_backoff` never returns more.
    pub max_backoff: Duration,
    /// Error types to retry on (empty = all errors).
    pub retry_on: Vec<String>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            backoff: BackoffStrategy::Exponential,
            max_backoff: Duration::from_secs(300), // 5 minutes
            retry_on: Vec::new(),                  // Retry on all errors
        }
    }
}

impl RetryConfig {
    /// Calculate backoff duration for a given attempt.
    ///
    /// Starting from a one-second base delay:
    ///
    /// * `Fixed` always yields 1 s,
    /// * `Linear` yields `attempt` seconds (so attempt `0` yields zero),
    /// * `Exponential` yields `2^(attempt - 1)` seconds; attempts `0` and `1`
    ///   both yield 1 s.
    ///
    /// The result is then capped at `max_backoff`. The arithmetic saturates,
    /// so arbitrarily large attempt numbers return the cap instead of
    /// overflowing.
    pub fn calculate_backoff(&self, attempt: u32) -> Duration {
        let base = Duration::from_secs(1);
        let backoff = match self.backoff {
            BackoffStrategy::Fixed => base,
            BackoffStrategy::Linear => base.saturating_mul(attempt),
            BackoffStrategy::Exponential => {
                // `2u32.pow(32)` and above overflow: a panic in debug builds
                // and a wrap to zero (no delay at all) in release builds.
                // Saturating keeps the value huge so the cap below applies.
                let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
                base.saturating_mul(factor)
            }
        };
        backoff.min(self.max_backoff)
    }
}

/// Backoff strategy for retries.
///
/// The delays below are before `RetryConfig::max_backoff` is applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackoffStrategy {
    /// Same delay each time.
    Fixed,
    /// Delay increases linearly.
    Linear,
    /// Delay doubles each time.
    #[default]
    Exponential,
}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Each priority level must compare strictly greater than the one below.
    #[test]
    fn test_priority_ordering() {
        let ascending = [
            JobPriority::Background,
            JobPriority::Low,
            JobPriority::Normal,
            JobPriority::High,
            JobPriority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// Canonical numeric values survive a trip through `as_i32`/`from_i32`.
    #[test]
    fn test_priority_conversion() {
        let cases = [(JobPriority::Critical, 100), (JobPriority::Normal, 50)];
        for (priority, _) in cases {
            let (_, stored) = cases
                .iter()
                .copied()
                .find(|(p, _)| *p == priority)
                .expect("priority comes from the same table");
            assert_eq!(priority.as_i32(), stored);
        }
        for (priority, stored) in cases {
            assert_eq!(JobPriority::from_i32(stored), priority);
        }
    }

    /// Status strings map to variants and back.
    #[test]
    fn test_status_conversion() {
        let cases = [
            (JobStatus::Pending, "pending"),
            (JobStatus::DeadLetter, "dead_letter"),
        ];
        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<JobStatus>(), Ok(status));
        }
    }

    /// The default configuration doubles the delay on every attempt.
    #[test]
    fn test_exponential_backoff() {
        let config = RetryConfig::default();
        for (attempt, secs) in [(1, 1), (2, 2), (3, 4), (4, 8)] {
            assert_eq!(config.calculate_backoff(attempt), Duration::from_secs(secs));
        }
    }

    /// A delay that would exceed `max_backoff` is clamped to it.
    #[test]
    fn test_max_backoff_cap() {
        let cap = Duration::from_secs(10);
        let config = RetryConfig {
            max_backoff: cap,
            ..RetryConfig::default()
        };
        assert_eq!(config.calculate_backoff(10), cap);
    }
}