1use std::future::Future;
2use std::pin::Pin;
3use std::str::FromStr;
4use std::time::Duration;
5
6use serde::{Serialize, de::DeserializeOwned};
7
8use crate::error::Result;
9
10use super::context::JobContext;
11
12pub trait ForgeJob: Send + Sync + 'static {
14 type Args: DeserializeOwned + Serialize + Send + Sync;
16 type Output: Serialize + Send;
18
19 fn info() -> JobInfo;
21
22 fn execute(
24 ctx: &JobContext,
25 args: Self::Args,
26 ) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
27
28 fn compensate<'a>(
30 _ctx: &'a JobContext,
31 _args: Self::Args,
32 _reason: &'a str,
33 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
34 Box::pin(async { Ok(()) })
35 }
36}
37
38#[derive(Debug, Clone)]
40pub struct JobInfo {
41 pub name: &'static str,
43 pub timeout: Duration,
45 pub priority: JobPriority,
47 pub retry: RetryConfig,
49 pub worker_capability: Option<&'static str>,
51 pub idempotent: bool,
53 pub idempotency_key: Option<&'static str>,
55 pub is_public: bool,
57 pub required_role: Option<&'static str>,
59 pub ttl: Option<Duration>,
62}
63
64impl Default for JobInfo {
65 fn default() -> Self {
66 Self {
67 name: "",
68 timeout: Duration::from_secs(3600), priority: JobPriority::Normal,
70 retry: RetryConfig::default(),
71 worker_capability: None,
72 idempotent: false,
73 idempotency_key: None,
74 is_public: false,
75 required_role: None,
76 ttl: None,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
83pub enum JobPriority {
84 Background = 0,
85 Low = 25,
86 #[default]
87 Normal = 50,
88 High = 75,
89 Critical = 100,
90}
91
92impl JobPriority {
93 pub fn as_i32(&self) -> i32 {
95 *self as i32
96 }
97
98 pub fn from_i32(value: i32) -> Self {
100 match value {
101 0..=12 => Self::Background,
102 13..=37 => Self::Low,
103 38..=62 => Self::Normal,
104 63..=87 => Self::High,
105 _ => Self::Critical,
106 }
107 }
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct ParseJobPriorityError(pub String);
112
113impl std::fmt::Display for ParseJobPriorityError {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 write!(f, "invalid job priority: '{}'", self.0)
116 }
117}
118
119impl std::error::Error for ParseJobPriorityError {}
120
121impl FromStr for JobPriority {
122 type Err = ParseJobPriorityError;
123
124 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
125 match s.to_lowercase().as_str() {
126 "background" => Ok(Self::Background),
127 "low" => Ok(Self::Low),
128 "normal" => Ok(Self::Normal),
129 "high" => Ok(Self::High),
130 "critical" => Ok(Self::Critical),
131 _ => Err(ParseJobPriorityError(s.to_string())),
132 }
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq)]
138pub enum JobStatus {
139 Pending,
141 Claimed,
143 Running,
145 Completed,
147 Retry,
149 Failed,
151 DeadLetter,
153 CancelRequested,
155 Cancelled,
157}
158
159impl JobStatus {
160 pub fn as_str(&self) -> &'static str {
162 match self {
163 Self::Pending => "pending",
164 Self::Claimed => "claimed",
165 Self::Running => "running",
166 Self::Completed => "completed",
167 Self::Retry => "retry",
168 Self::Failed => "failed",
169 Self::DeadLetter => "dead_letter",
170 Self::CancelRequested => "cancel_requested",
171 Self::Cancelled => "cancelled",
172 }
173 }
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct ParseJobStatusError(pub String);
178
179impl std::fmt::Display for ParseJobStatusError {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 write!(f, "invalid job status: '{}'", self.0)
182 }
183}
184
185impl std::error::Error for ParseJobStatusError {}
186
187impl FromStr for JobStatus {
188 type Err = ParseJobStatusError;
189
190 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
191 match s {
192 "pending" => Ok(Self::Pending),
193 "claimed" => Ok(Self::Claimed),
194 "running" => Ok(Self::Running),
195 "completed" => Ok(Self::Completed),
196 "retry" => Ok(Self::Retry),
197 "failed" => Ok(Self::Failed),
198 "dead_letter" => Ok(Self::DeadLetter),
199 "cancel_requested" => Ok(Self::CancelRequested),
200 "cancelled" => Ok(Self::Cancelled),
201 _ => Err(ParseJobStatusError(s.to_string())),
202 }
203 }
204}
205
206#[derive(Debug, Clone)]
208pub struct RetryConfig {
209 pub max_attempts: u32,
211 pub backoff: BackoffStrategy,
213 pub max_backoff: Duration,
215 pub retry_on: Vec<String>,
217}
218
219impl Default for RetryConfig {
220 fn default() -> Self {
221 Self {
222 max_attempts: 3,
223 backoff: BackoffStrategy::Exponential,
224 max_backoff: Duration::from_secs(300), retry_on: Vec::new(), }
227 }
228}
229
230impl RetryConfig {
231 pub fn calculate_backoff(&self, attempt: u32) -> Duration {
233 let base = Duration::from_secs(1);
234 let backoff = match self.backoff {
235 BackoffStrategy::Fixed => base,
236 BackoffStrategy::Linear => base * attempt,
237 BackoffStrategy::Exponential => base * 2u32.pow(attempt.saturating_sub(1)),
238 };
239 backoff.min(self.max_backoff)
240 }
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
245pub enum BackoffStrategy {
246 Fixed,
248 Linear,
250 #[default]
252 Exponential,
253}
254
255#[cfg(test)]
256#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
257mod tests {
258 use super::*;
259
260 #[test]
261 fn test_priority_ordering() {
262 assert!(JobPriority::Critical > JobPriority::High);
263 assert!(JobPriority::High > JobPriority::Normal);
264 assert!(JobPriority::Normal > JobPriority::Low);
265 assert!(JobPriority::Low > JobPriority::Background);
266 }
267
268 #[test]
269 fn test_priority_conversion() {
270 assert_eq!(JobPriority::Critical.as_i32(), 100);
271 assert_eq!(JobPriority::Normal.as_i32(), 50);
272 assert_eq!(JobPriority::from_i32(100), JobPriority::Critical);
273 assert_eq!(JobPriority::from_i32(50), JobPriority::Normal);
274 }
275
276 #[test]
277 fn test_status_conversion() {
278 assert_eq!(JobStatus::Pending.as_str(), "pending");
279 assert_eq!("pending".parse::<JobStatus>(), Ok(JobStatus::Pending));
280 assert_eq!(JobStatus::DeadLetter.as_str(), "dead_letter");
281 assert_eq!(
282 "dead_letter".parse::<JobStatus>(),
283 Ok(JobStatus::DeadLetter)
284 );
285 assert_eq!(JobStatus::CancelRequested.as_str(), "cancel_requested");
286 assert_eq!(
287 "cancel_requested".parse::<JobStatus>(),
288 Ok(JobStatus::CancelRequested)
289 );
290 assert_eq!(JobStatus::Cancelled.as_str(), "cancelled");
291 assert_eq!("cancelled".parse::<JobStatus>(), Ok(JobStatus::Cancelled));
292 }
293
294 #[test]
295 fn test_exponential_backoff() {
296 let config = RetryConfig::default();
297 assert_eq!(config.calculate_backoff(1), Duration::from_secs(1));
298 assert_eq!(config.calculate_backoff(2), Duration::from_secs(2));
299 assert_eq!(config.calculate_backoff(3), Duration::from_secs(4));
300 assert_eq!(config.calculate_backoff(4), Duration::from_secs(8));
301 }
302
303 #[test]
304 fn test_max_backoff_cap() {
305 let config = RetryConfig {
306 max_backoff: Duration::from_secs(10),
307 ..Default::default()
308 };
309 assert_eq!(config.calculate_backoff(10), Duration::from_secs(10));
310 }
311}