Skip to main content

ferro_queue/
job.rs

//! The `Job` trait implemented by background jobs and the `JobPayload` envelope used to store them in a queue.
2
3use crate::Error;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9/// A job that can be executed by a queue worker.
10///
11/// Jobs contain the logic that should run in the background.
12/// They must be serializable so they can be stored in the queue.
13///
14/// # Example
15///
16/// ```rust
17/// use ferro_queue::{Job, Error, async_trait};
18/// use serde::{Deserialize, Serialize};
19///
20/// #[derive(Debug, Clone, Serialize, Deserialize)]
21/// struct ProcessImage {
22///     image_id: i64,
23///     operations: Vec<String>,
24/// }
25///
26/// #[async_trait]
27/// impl Job for ProcessImage {
28///     async fn handle(&self) -> Result<(), Error> {
29///         println!("Processing image {} with {:?}", self.image_id, self.operations);
30///         Ok(())
31///     }
32///
33///     fn max_retries(&self) -> u32 {
34///         5
35///     }
36///
37///     fn retry_delay(&self, attempt: u32) -> std::time::Duration {
38///         // Exponential backoff
39///         std::time::Duration::from_secs(2u64.pow(attempt))
40///     }
41/// }
42/// ```
43#[async_trait]
44pub trait Job: Send + Sync + 'static {
45    /// Execute the job logic.
46    async fn handle(&self) -> Result<(), Error>;
47
48    /// The name of the job for logging and identification.
49    fn name(&self) -> &'static str {
50        std::any::type_name::<Self>()
51    }
52
53    /// Maximum number of times to retry the job on failure.
54    fn max_retries(&self) -> u32 {
55        3
56    }
57
58    /// Delay before retrying after a failure.
59    fn retry_delay(&self, _attempt: u32) -> std::time::Duration {
60        std::time::Duration::from_secs(5)
61    }
62
63    /// Called when the job fails after all retries are exhausted.
64    async fn failed(&self, error: &Error) {
65        tracing::error!(job = self.name(), error = %error, "Job failed permanently");
66    }
67
68    /// Timeout for job execution.
69    fn timeout(&self) -> std::time::Duration {
70        std::time::Duration::from_secs(60)
71    }
72}
73
74/// Serialized job payload stored in the queue.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct JobPayload {
77    /// Unique job ID.
78    pub id: Uuid,
79    /// Job type name for deserialization.
80    pub job_type: String,
81    /// Serialized job data.
82    pub data: String,
83    /// Queue name.
84    pub queue: String,
85    /// Number of attempts made.
86    pub attempts: u32,
87    /// Maximum retry attempts.
88    pub max_retries: u32,
89    /// When the job was created.
90    pub created_at: DateTime<Utc>,
91    /// When the job should be available for processing.
92    pub available_at: DateTime<Utc>,
93    /// When the job was reserved by a worker (if any).
94    pub reserved_at: Option<DateTime<Utc>>,
95    /// Tenant ID for tenant-scoped job execution.
96    /// None means the job runs in system scope (no tenant context).
97    /// Old payloads without this field deserialize to None.
98    #[serde(default)]
99    pub tenant_id: Option<i64>,
100}
101
102impl JobPayload {
103    /// Create a new job payload.
104    pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
105        let data =
106            serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
107
108        Ok(Self {
109            id: Uuid::new_v4(),
110            job_type: job.name().to_string(),
111            data,
112            queue: queue.to_string(),
113            attempts: 0,
114            max_retries: job.max_retries(),
115            created_at: Utc::now(),
116            available_at: Utc::now(),
117            reserved_at: None,
118            tenant_id: None,
119        })
120    }
121
122    /// Create a job payload with a delay.
123    pub fn with_delay<J: Job + Serialize>(
124        job: &J,
125        queue: &str,
126        delay: std::time::Duration,
127    ) -> Result<Self, Error> {
128        let mut payload = Self::new(job, queue)?;
129        payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
130        Ok(payload)
131    }
132
133    /// Set the tenant ID for this payload.
134    pub fn with_tenant_id(mut self, id: Option<i64>) -> Self {
135        self.tenant_id = id;
136        self
137    }
138
139    /// Check if the job is available for processing.
140    pub fn is_available(&self) -> bool {
141        Utc::now() >= self.available_at
142    }
143
144    /// Check if the job has exceeded max retries.
145    pub fn has_exceeded_retries(&self) -> bool {
146        self.attempts >= self.max_retries
147    }
148
149    /// Increment the attempt counter.
150    pub fn increment_attempts(&mut self) {
151        self.attempts += 1;
152    }
153
154    /// Mark the job as reserved.
155    pub fn reserve(&mut self) {
156        self.reserved_at = Some(Utc::now());
157    }
158
159    /// Serialize the payload to JSON.
160    pub fn to_json(&self) -> Result<String, Error> {
161        serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
162    }
163
164    /// Deserialize a payload from JSON.
165    pub fn from_json(json: &str) -> Result<Self, Error> {
166        serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
167    }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Queue name shared by every test.
    const QUEUE: &str = "default";

    /// Minimal serializable job whose handler always succeeds.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Builds an undelayed payload for a `TestJob` on [`QUEUE`].
    fn fresh_payload() -> JobPayload {
        let job = TestJob { value: 42 };
        JobPayload::new(&job, QUEUE).unwrap()
    }

    #[test]
    fn test_job_payload_creation() {
        let payload = fresh_payload();

        assert_eq!(payload.queue, "default");
        assert_eq!(payload.attempts, 0);
        assert!(payload.is_available());
    }

    #[test]
    fn test_job_payload_with_delay() {
        let one_minute = std::time::Duration::from_secs(60);
        let delayed = JobPayload::with_delay(&TestJob { value: 42 }, QUEUE, one_minute).unwrap();

        assert!(!delayed.is_available());
    }

    #[test]
    fn test_job_payload_serialization() {
        let original = fresh_payload();

        let restored = JobPayload::from_json(&original.to_json().unwrap()).unwrap();

        assert_eq!(original.id, restored.id);
        assert_eq!(original.queue, restored.queue);
    }

    #[test]
    fn test_tenant_id_none_by_default() {
        assert_eq!(fresh_payload().tenant_id, None);
    }

    #[test]
    fn test_tenant_id_none_serializes_as_null() {
        let json = fresh_payload().to_json().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert!(parsed["tenant_id"].is_null());
    }

    #[test]
    fn test_tenant_id_some_round_trips() {
        let json = fresh_payload().with_tenant_id(Some(42)).to_json().unwrap();
        let restored = JobPayload::from_json(&json).unwrap();
        assert_eq!(restored.tenant_id, Some(42));
    }

    #[test]
    fn test_old_payload_without_tenant_id_deserializes_to_none() {
        // A payload written before `tenant_id` existed has no such key;
        // `#[serde(default)]` must fill in `None`.
        let old_json = r#"{"id":"550e8400-e29b-41d4-a716-446655440000","job_type":"test","data":"{}","queue":"default","attempts":0,"max_retries":3,"created_at":"2024-01-01T00:00:00Z","available_at":"2024-01-01T00:00:00Z","reserved_at":null}"#;
        let payload = JobPayload::from_json(old_json).unwrap();
        assert_eq!(payload.tenant_id, None);
    }

    #[test]
    fn test_with_tenant_id_builder() {
        let with_tenant = fresh_payload().with_tenant_id(Some(99));
        assert_eq!(with_tenant.tenant_id, Some(99));

        let without_tenant = fresh_payload().with_tenant_id(None);
        assert_eq!(without_tenant.tenant_id, None);
    }
}