Skip to main content

ferro_queue/
job.rs

1//! Job trait and payload structures.
2
3use crate::Error;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9/// A job that can be executed by a queue worker.
10///
11/// Jobs contain the logic that should run in the background.
12/// They must be serializable so they can be stored in the queue.
13///
14/// # Example
15///
16/// ```rust
17/// use ferro_queue::{Job, Error, async_trait};
18/// use serde::{Deserialize, Serialize};
19///
20/// #[derive(Debug, Clone, Serialize, Deserialize)]
21/// struct ProcessImage {
22///     image_id: i64,
23///     operations: Vec<String>,
24/// }
25///
26/// #[async_trait]
27/// impl Job for ProcessImage {
28///     async fn handle(&self) -> Result<(), Error> {
29///         println!("Processing image {} with {:?}", self.image_id, self.operations);
30///         Ok(())
31///     }
32///
33///     fn max_retries(&self) -> u32 {
34///         5
35///     }
36///
37///     fn retry_delay(&self, attempt: u32) -> std::time::Duration {
38///         // Exponential backoff
39///         std::time::Duration::from_secs(2u64.pow(attempt))
40///     }
41/// }
42/// ```
43#[async_trait]
44pub trait Job: Send + Sync + 'static {
45    /// Execute the job logic.
46    async fn handle(&self) -> Result<(), Error>;
47
48    /// The name of the job for logging and identification.
49    fn name(&self) -> &'static str {
50        std::any::type_name::<Self>()
51    }
52
53    /// Maximum number of times to retry the job on failure.
54    fn max_retries(&self) -> u32 {
55        3
56    }
57
58    /// Delay before retrying after a failure.
59    ///
60    /// Default: full-jitter exponential backoff — `rand(0..=min(cap, base * 2^attempt))`.
61    /// Base 5 s, cap 15 min. Saturating arithmetic prevents overflow on large `attempt` values.
62    fn retry_delay(&self, attempt: u32) -> std::time::Duration {
63        // Full jitter: rand(0 ..= min(cap, base * 2^attempt)). Base 5s, cap 15min.
64        use rand::Rng;
65        let base_secs: u64 = 5;
66        let cap_secs: u64 = 15 * 60;
67        let max_delay = cap_secs.min(base_secs.saturating_mul(2u64.saturating_pow(attempt)));
68        let jitter = rand::thread_rng().gen_range(0..=max_delay);
69        std::time::Duration::from_secs(jitter)
70    }
71
72    /// Called when the job fails after all retries are exhausted.
73    async fn failed(&self, error: &Error) {
74        tracing::error!(job = self.name(), error = %error, "Job failed permanently");
75    }
76
77    /// Timeout for job execution.
78    fn timeout(&self) -> std::time::Duration {
79        std::time::Duration::from_secs(60)
80    }
81
82    /// Idempotency key for deduplication on enqueue.
83    ///
84    /// When `Some`, enqueue skips insertion if a pending or claimed row with
85    /// the same `(job_type, idempotency_key)` already exists (D-15).
86    fn idempotency_key(&self) -> Option<String> {
87        None
88    }
89}
90
91/// Serialized job payload stored in the queue.
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct JobPayload {
94    /// Unique job ID.
95    pub id: Uuid,
96    /// Job type name for deserialization.
97    pub job_type: String,
98    /// Serialized job data.
99    pub data: String,
100    /// Queue name.
101    pub queue: String,
102    /// Number of attempts made.
103    pub attempts: u32,
104    /// Maximum retry attempts.
105    pub max_retries: u32,
106    /// When the job was created.
107    pub created_at: DateTime<Utc>,
108    /// When the job should be available for processing.
109    pub available_at: DateTime<Utc>,
110    /// When the job was reserved by a worker (if any).
111    pub reserved_at: Option<DateTime<Utc>>,
112    /// Tenant ID for tenant-scoped job execution.
113    /// None means the job runs in system scope (no tenant context).
114    /// Old payloads without this field deserialize to None.
115    #[serde(default)]
116    pub tenant_id: Option<i64>,
117}
118
119impl JobPayload {
120    /// Create a new job payload.
121    pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
122        let data =
123            serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
124
125        Ok(Self {
126            id: Uuid::new_v4(),
127            job_type: job.name().to_string(),
128            data,
129            queue: queue.to_string(),
130            attempts: 0,
131            max_retries: job.max_retries(),
132            created_at: Utc::now(),
133            available_at: Utc::now(),
134            reserved_at: None,
135            tenant_id: None,
136        })
137    }
138
139    /// Create a job payload with a delay.
140    pub fn with_delay<J: Job + Serialize>(
141        job: &J,
142        queue: &str,
143        delay: std::time::Duration,
144    ) -> Result<Self, Error> {
145        let mut payload = Self::new(job, queue)?;
146        payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
147        Ok(payload)
148    }
149
150    /// Set the tenant ID for this payload.
151    pub fn with_tenant_id(mut self, id: Option<i64>) -> Self {
152        self.tenant_id = id;
153        self
154    }
155
156    /// Check if the job is available for processing.
157    pub fn is_available(&self) -> bool {
158        Utc::now() >= self.available_at
159    }
160
161    /// Check if the job has exceeded max retries.
162    pub fn has_exceeded_retries(&self) -> bool {
163        self.attempts >= self.max_retries
164    }
165
166    /// Increment the attempt counter.
167    pub fn increment_attempts(&mut self) {
168        self.attempts += 1;
169    }
170
171    /// Mark the job as reserved.
172    pub fn reserve(&mut self) {
173        self.reserved_at = Some(Utc::now());
174    }
175
176    /// Serialize the payload to JSON.
177    pub fn to_json(&self) -> Result<String, Error> {
178        serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
179    }
180
181    /// Deserialize a payload from JSON.
182    pub fn from_json(json: &str) -> Result<Self, Error> {
183        serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
184    }
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal job relying entirely on the trait's default methods.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Queue name shared by every test.
    const QUEUE: &str = "default";

    /// Fresh, immediately-available payload wrapping a `TestJob`.
    fn sample_payload() -> JobPayload {
        let job = TestJob { value: 42 };
        JobPayload::new(&job, QUEUE).unwrap()
    }

    #[test]
    fn test_job_payload_creation() {
        let created = sample_payload();

        assert_eq!(created.queue, "default");
        assert_eq!(created.attempts, 0);
        assert!(created.is_available());
    }

    #[test]
    fn test_job_payload_with_delay() {
        let one_minute = std::time::Duration::from_secs(60);
        let delayed = JobPayload::with_delay(&TestJob { value: 42 }, QUEUE, one_minute).unwrap();

        assert!(!delayed.is_available());
    }

    #[test]
    fn test_job_payload_serialization() {
        let original = sample_payload();

        let encoded = original.to_json().unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();

        assert_eq!(original.id, decoded.id);
        assert_eq!(original.queue, decoded.queue);
    }

    #[test]
    fn test_tenant_id_none_by_default() {
        assert_eq!(sample_payload().tenant_id, None);
    }

    #[test]
    fn test_tenant_id_none_serializes_as_null() {
        let encoded = sample_payload().to_json().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&encoded).unwrap();
        assert_eq!(parsed["tenant_id"], serde_json::Value::Null);
    }

    #[test]
    fn test_tenant_id_some_round_trips() {
        let encoded = sample_payload().with_tenant_id(Some(42)).to_json().unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();
        assert_eq!(decoded.tenant_id, Some(42));
    }

    #[test]
    fn test_old_payload_without_tenant_id_deserializes_to_none() {
        // Payload written before `tenant_id` existed: the key is absent, so
        // `serde(default)` must fill in `None`.
        let old_json = r#"{"id":"550e8400-e29b-41d4-a716-446655440000","job_type":"test","data":"{}","queue":"default","attempts":0,"max_retries":3,"created_at":"2024-01-01T00:00:00Z","available_at":"2024-01-01T00:00:00Z","reserved_at":null}"#;
        let decoded = JobPayload::from_json(old_json).unwrap();
        assert_eq!(decoded.tenant_id, None);
    }

    #[test]
    fn test_with_tenant_id_builder() {
        let scoped = sample_payload().with_tenant_id(Some(99));
        assert_eq!(scoped.tenant_id, Some(99));

        let unscoped = sample_payload().with_tenant_id(None);
        assert_eq!(unscoped.tenant_id, None);
    }

    #[test]
    fn backoff_delay_range() {
        let job = TestJob { value: 0 };
        // (attempt, largest delay in seconds the default backoff may return)
        let bounds: [(u32, u64); 3] = [(0, 5), (3, 40), (30, 900)];
        for _ in 0..100 {
            for (attempt, max_secs) in bounds {
                assert!(job.retry_delay(attempt).as_secs() <= max_secs);
            }
        }
    }

    #[test]
    fn idempotency_key_defaults_to_none() {
        let job = TestJob { value: 0 };
        assert_eq!(job.idempotency_key(), None);
    }
}