ferro_queue/
job.rs

1//! Job trait and payload structures.
2
3use crate::Error;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9/// A job that can be executed by a queue worker.
10///
11/// Jobs contain the logic that should run in the background.
12/// They must be serializable so they can be stored in the queue.
13///
14/// # Example
15///
16/// ```rust
17/// use cancer_queue::{Job, Error, async_trait};
18/// use serde::{Deserialize, Serialize};
19///
20/// #[derive(Debug, Clone, Serialize, Deserialize)]
21/// struct ProcessImage {
22///     image_id: i64,
23///     operations: Vec<String>,
24/// }
25///
26/// #[async_trait]
27/// impl Job for ProcessImage {
28///     async fn handle(&self) -> Result<(), Error> {
29///         println!("Processing image {} with {:?}", self.image_id, self.operations);
30///         Ok(())
31///     }
32///
33///     fn max_retries(&self) -> u32 {
34///         5
35///     }
36///
37///     fn retry_delay(&self, attempt: u32) -> std::time::Duration {
38///         // Exponential backoff
39///         std::time::Duration::from_secs(2u64.pow(attempt))
40///     }
41/// }
42/// ```
43#[async_trait]
44pub trait Job: Send + Sync + 'static {
45    /// Execute the job logic.
46    async fn handle(&self) -> Result<(), Error>;
47
48    /// The name of the job for logging and identification.
49    fn name(&self) -> &'static str {
50        std::any::type_name::<Self>()
51    }
52
53    /// Maximum number of times to retry the job on failure.
54    fn max_retries(&self) -> u32 {
55        3
56    }
57
58    /// Delay before retrying after a failure.
59    fn retry_delay(&self, _attempt: u32) -> std::time::Duration {
60        std::time::Duration::from_secs(5)
61    }
62
63    /// Called when the job fails after all retries are exhausted.
64    async fn failed(&self, error: &Error) {
65        tracing::error!(job = self.name(), error = %error, "Job failed permanently");
66    }
67
68    /// Timeout for job execution.
69    fn timeout(&self) -> std::time::Duration {
70        std::time::Duration::from_secs(60)
71    }
72}
73
74/// Serialized job payload stored in the queue.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct JobPayload {
77    /// Unique job ID.
78    pub id: Uuid,
79    /// Job type name for deserialization.
80    pub job_type: String,
81    /// Serialized job data.
82    pub data: String,
83    /// Queue name.
84    pub queue: String,
85    /// Number of attempts made.
86    pub attempts: u32,
87    /// Maximum retry attempts.
88    pub max_retries: u32,
89    /// When the job was created.
90    pub created_at: DateTime<Utc>,
91    /// When the job should be available for processing.
92    pub available_at: DateTime<Utc>,
93    /// When the job was reserved by a worker (if any).
94    pub reserved_at: Option<DateTime<Utc>>,
95}
96
97impl JobPayload {
98    /// Create a new job payload.
99    pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
100        let data =
101            serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
102
103        Ok(Self {
104            id: Uuid::new_v4(),
105            job_type: job.name().to_string(),
106            data,
107            queue: queue.to_string(),
108            attempts: 0,
109            max_retries: job.max_retries(),
110            created_at: Utc::now(),
111            available_at: Utc::now(),
112            reserved_at: None,
113        })
114    }
115
116    /// Create a job payload with a delay.
117    pub fn with_delay<J: Job + Serialize>(
118        job: &J,
119        queue: &str,
120        delay: std::time::Duration,
121    ) -> Result<Self, Error> {
122        let mut payload = Self::new(job, queue)?;
123        payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
124        Ok(payload)
125    }
126
127    /// Check if the job is available for processing.
128    pub fn is_available(&self) -> bool {
129        Utc::now() >= self.available_at
130    }
131
132    /// Check if the job has exceeded max retries.
133    pub fn has_exceeded_retries(&self) -> bool {
134        self.attempts >= self.max_retries
135    }
136
137    /// Increment the attempt counter.
138    pub fn increment_attempts(&mut self) {
139        self.attempts += 1;
140    }
141
142    /// Mark the job as reserved.
143    pub fn reserve(&mut self) {
144        self.reserved_at = Some(Utc::now());
145    }
146
147    /// Serialize the payload to JSON.
148    pub fn to_json(&self) -> Result<String, Error> {
149        serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
150    }
151
152    /// Deserialize a payload from JSON.
153    pub fn from_json(json: &str) -> Result<Self, Error> {
154        serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal job whose handler always succeeds; every other trait method keeps
    /// its default.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    #[test]
    fn test_job_payload_creation() {
        let job = TestJob { value: 42 };
        let payload = JobPayload::new(&job, "default").unwrap();

        assert_eq!(payload.queue, "default");
        assert_eq!(payload.job_type, job.name());
        assert_eq!(payload.data, r#"{"value":42}"#);
        assert_eq!(payload.attempts, 0);
        assert_eq!(payload.max_retries, job.max_retries());
        assert!(payload.reserved_at.is_none());
        assert!(payload.created_at <= payload.available_at);
        assert!(payload.is_available());
    }

    #[test]
    fn test_job_payload_with_delay() {
        let job = TestJob { value: 42 };
        let payload =
            JobPayload::with_delay(&job, "default", std::time::Duration::from_secs(60)).unwrap();

        assert!(payload.available_at > payload.created_at);
        assert!(!payload.is_available());
    }

    #[test]
    fn test_job_payload_serialization() {
        let job = TestJob { value: 42 };
        let payload = JobPayload::new(&job, "default").unwrap();

        let json = payload.to_json().unwrap();
        let restored = JobPayload::from_json(&json).unwrap();

        // Compare field by field so every part of the envelope is covered.
        assert_eq!(payload.id, restored.id);
        assert_eq!(payload.job_type, restored.job_type);
        assert_eq!(payload.data, restored.data);
        assert_eq!(payload.queue, restored.queue);
        assert_eq!(payload.attempts, restored.attempts);
        assert_eq!(payload.max_retries, restored.max_retries);
        assert_eq!(payload.created_at, restored.created_at);
        assert_eq!(payload.available_at, restored.available_at);
        assert_eq!(payload.reserved_at, restored.reserved_at);
    }

    #[test]
    fn test_job_payload_from_json_rejects_invalid_input() {
        assert!(JobPayload::from_json("not json").is_err());
        assert!(JobPayload::from_json("{}").is_err());
    }

    #[test]
    fn test_job_payload_retry_accounting() {
        let job = TestJob { value: 1 };
        let mut payload = JobPayload::new(&job, "default").unwrap();

        assert!(!payload.has_exceeded_retries());

        for _ in 0..payload.max_retries {
            payload.increment_attempts();
        }

        assert_eq!(payload.attempts, payload.max_retries);
        assert!(payload.has_exceeded_retries());
    }

    #[test]
    fn test_job_payload_reserve() {
        let job = TestJob { value: 1 };
        let mut payload = JobPayload::new(&job, "default").unwrap();

        assert!(payload.reserved_at.is_none());
        payload.reserve();
        assert!(payload.reserved_at.is_some());
    }
}