// rustapi_jobs/queue.rs

1use crate::backend::{JobBackend, JobRequest};
2use crate::error::Result;
3use crate::job::{Job, JobContext, JobHandler};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9/// Main job queue manager
10#[derive(Clone)]
11pub struct JobQueue {
12    backend: Arc<dyn JobBackend>,
13    handlers: Arc<RwLock<HashMap<String, Box<dyn JobHandler>>>>,
14}
15
16impl JobQueue {
17    /// Create a new job queue with a backend
18    pub fn new<B: JobBackend + 'static>(backend: B) -> Self {
19        Self {
20            backend: Arc::new(backend),
21            handlers: Arc::new(RwLock::new(HashMap::new())),
22        }
23    }
24
25    /// Register a job handler
26    pub async fn register_job<J: Job + Clone>(&self, job: J) {
27        let mut handlers = self.handlers.write().await;
28        handlers.insert(J::NAME.to_string(), Box::new(job));
29    }
30
31    /// Enqueue a job
32    pub async fn enqueue<J: Job>(&self, data: J::Data) -> Result<String> {
33        self.enqueue_opts::<J>(data, EnqueueOptions::default())
34            .await
35    }
36
37    /// Enqueue a job with options
38    pub async fn enqueue_opts<J: Job>(
39        &self,
40        data: J::Data,
41        opts: EnqueueOptions,
42    ) -> Result<String> {
43        let payload = serde_json::to_value(data)?;
44        let id = Uuid::new_v4().to_string();
45
46        let request = JobRequest {
47            id: id.clone(),
48            name: J::NAME.to_string(),
49            payload,
50            created_at: chrono::Utc::now(),
51            attempts: 0,
52            max_attempts: opts.max_attempts,
53            last_error: None,
54            run_at: opts.run_at,
55        };
56
57        self.backend.push(request).await?;
58        Ok(id)
59    }
60
61    /// Process a single job (for testing or manual control)
62    pub async fn process_one(&self) -> Result<bool> {
63        if let Some(req) = self.backend.pop().await? {
64            let handlers = self.handlers.read().await;
65            if let Some(handler) = handlers.get(&req.name) {
66                let ctx = JobContext {
67                    job_id: req.id.clone(),
68                    attempt: req.attempts + 1,
69                    created_at: req.created_at,
70                };
71
72                match handler.handle(ctx, req.payload.clone()).await {
73                    Ok(_) => {
74                        self.backend.complete(&req.id).await?;
75                        Ok(true)
76                    }
77                    Err(e) => {
78                        let mut new_req = req.clone();
79                        new_req.attempts += 1;
80                        new_req.last_error = Some(e.to_string());
81
82                        if new_req.attempts < new_req.max_attempts {
83                            // Exponential backoff: 2^attempts seconds (e.g. 2, 4, 8, 16...)
84                            // Limit max backoff to some reasonable value (e.g. 24 hours)?
85                            // For now basic exponential.
86                            let backoff_secs = 2u64.saturating_pow(new_req.attempts).min(86400);
87                            let retry_delay = chrono::Duration::seconds(backoff_secs as i64);
88                            new_req.run_at = Some(chrono::Utc::now() + retry_delay);
89
90                            // Re-push the job for retry
91                            self.backend.push(new_req).await?;
92                        } else {
93                            // Job failed permanently
94                            self.backend.fail(&req.id, &e.to_string()).await?;
95
96                            // TODO: If we implemented a real DLQ, we would push it there now.
97                            // Currently fail() is where backend would handle that.
98                        }
99                        Ok(true)
100                    }
101                }
102            } else {
103                // Handler not found
104                // For now, treat as permanent failure
105                self.backend
106                    .fail(&req.id, &format!("No handler for job: {}", req.name))
107                    .await?;
108                Ok(true)
109            }
110        } else {
111            Ok(false)
112        }
113    }
114
115    /// Start a worker loop
116    pub async fn start_worker(&self) -> Result<()> {
117        loop {
118            match self.process_one().await {
119                Ok(processed) => {
120                    if !processed {
121                        // Empty queue, sleep a bit
122                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
123                    }
124                }
125                Err(e) => {
126                    tracing::error!("Worker error: {}", e);
127                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
128                }
129            }
130        }
131    }
132}
133
134/// Options for enqueueing a job
135#[derive(Debug, Clone, Default)]
136pub struct EnqueueOptions {
137    pub max_attempts: u32,
138    pub run_at: Option<chrono::DateTime<chrono::Utc>>,
139}
140
141impl EnqueueOptions {
142    pub fn new() -> Self {
143        Self::default()
144    }
145
146    pub fn max_attempts(mut self, n: u32) -> Self {
147        self.max_attempts = n;
148        self
149    }
150
151    pub fn delay(mut self, duration: std::time::Duration) -> Self {
152        self.run_at = Some(chrono::Utc::now() + chrono::Duration::from_std(duration).unwrap());
153        self
154    }
155}
156
#[cfg(test)]
mod property_tests {
    use super::*;
    use crate::backend::memory::InMemoryBackend as MemoryBackend;
    use crate::JobError;
    use async_trait::async_trait;
    use proptest::prelude::*;
    use serde::{Deserialize, Serialize};
    use std::collections::HashSet;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // **Feature: v1-features-roadmap, Properties 21-22: Job persistence and retry**
    // **Validates: Requirements 10.1, 10.2**
    //
    // For background job processing:
    // - Property 21: Jobs SHALL persist across backend operations
    // - Property 22: Retry SHALL use exponential backoff (2^attempts seconds)
    // - Failed jobs SHALL be retried up to max_attempts
    // - Successful jobs SHALL be marked complete and removed
    // - Jobs with run_at in future SHALL not be processed immediately

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestJobData {
        value: i32,
    }

    /// Test job whose outcome is controlled by `should_fail` and which counts
    /// how many times it has been executed.
    #[derive(Clone)]
    struct TestJob {
        should_fail: Arc<RwLock<bool>>,
        execution_count: Arc<RwLock<u32>>,
    }

    impl TestJob {
        /// Creates a job that always fails (`true`) or always succeeds (`false`).
        fn new(should_fail: bool) -> Self {
            Self {
                should_fail: Arc::new(RwLock::new(should_fail)),
                execution_count: Arc::new(RwLock::new(0)),
            }
        }

        /// Number of times `execute` has run so far.
        async fn executions(&self) -> u32 {
            *self.execution_count.read().await
        }
    }

    #[async_trait]
    impl Job for TestJob {
        const NAME: &'static str = "test_job";
        type Data = TestJobData;

        async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> {
            *self.execution_count.write().await += 1;

            if *self.should_fail.read().await {
                return Err(JobError::WorkerError(format!(
                    "Test failure for value {}",
                    data.value
                )));
            }
            Ok(())
        }
    }

    /// Runs `fut` to completion on a fresh Tokio runtime.
    fn block_on<F: std::future::Future>(fut: F) -> F::Output {
        tokio::runtime::Runtime::new()
            .expect("failed to build Tokio runtime")
            .block_on(fut)
    }

    /// Builds a queue over a fresh in-memory backend with `job` registered.
    async fn queue_with(job: &TestJob) -> JobQueue {
        let queue = JobQueue::new(MemoryBackend::new());
        queue.register_job(job.clone()).await;
        queue
    }

    /// Local mirror of the retry delay formula `JobQueue` applies when it
    /// reschedules a failed job. Keep in sync with the queue implementation.
    fn backoff_secs(attempts: u32) -> u64 {
        2u64.saturating_pow(attempts).min(86_400)
    }

    /// Property 22: the 24 h cap kicks in between 2^16 and 2^17, and huge
    /// attempt counts saturate instead of overflowing.
    #[test]
    fn backoff_cap_boundary() {
        assert_eq!(backoff_secs(0), 1);
        assert_eq!(backoff_secs(16), 65_536);
        assert_eq!(backoff_secs(17), 86_400);
        assert_eq!(backoff_secs(u32::MAX), 86_400);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(50))]

        /// Property 21: Jobs persist through push/pop cycle
        #[test]
        fn prop_job_persistence(value in -1000i32..1000i32) {
            block_on(async {
                let job = TestJob::new(false);
                let queue = queue_with(&job).await;

                let job_id = queue
                    .enqueue::<TestJob>(TestJobData { value })
                    .await
                    .unwrap();
                prop_assert!(!job_id.is_empty());

                // Job MUST be retrievable
                let processed = queue.process_one().await.unwrap();
                prop_assert!(processed);

                // Job MUST have been executed
                prop_assert_eq!(job.executions().await, 1);

                Ok(())
            })?;
        }

        /// Property 22: Backoff is exactly 2^attempts seconds below the cap
        #[test]
        fn prop_exponential_backoff_calculation(attempts in 0u32..10) {
            let backoff = backoff_secs(attempts);

            // 2^9 = 512 < 86_400, so nothing in this range is capped.
            prop_assert_eq!(backoff, 1u64 << attempts);

            // Each step doubles the previous delay.
            if attempts > 0 {
                prop_assert_eq!(backoff, backoff_secs(attempts - 1) * 2);
            }
        }

        /// Property 22: Failed jobs are retried with exponential backoff
        #[test]
        #[ignore] // TODO: Requires time mocking
        fn prop_retry_behavior(value in -1000i32..1000i32, max_attempts in 2u32..5) {
            block_on(async {
                let job = TestJob::new(true); // Always fail
                let queue = queue_with(&job).await;

                let opts = EnqueueOptions::new().max_attempts(max_attempts);
                queue
                    .enqueue_opts::<TestJob>(TestJobData { value }, opts)
                    .await
                    .unwrap();

                // Process job multiple times (it will fail and retry)
                for attempt in 1..=max_attempts {
                    let processed = queue.process_one().await.unwrap();
                    prop_assert!(processed);
                    prop_assert_eq!(job.executions().await, attempt);
                }

                // After max_attempts the job is failed permanently: nothing left.
                let processed = queue.process_one().await.unwrap();
                prop_assert!(!processed);

                Ok(())
            })?;
        }

        /// Property 21: Multiple jobs persist independently
        #[test]
        fn prop_multiple_jobs_persist(
            values in prop::collection::vec(-100i32..100, 1..10)
        ) {
            block_on(async {
                let job = TestJob::new(false);
                let queue = queue_with(&job).await;

                let job_count = values.len();
                for value in values {
                    queue.enqueue::<TestJob>(TestJobData { value }).await.unwrap();
                }

                for _ in 0..job_count {
                    let processed = queue.process_one().await.unwrap();
                    prop_assert!(processed);
                }

                // All jobs MUST have been executed
                prop_assert_eq!(job.executions().await as usize, job_count);

                // Queue MUST be empty
                let processed = queue.process_one().await.unwrap();
                prop_assert!(!processed);

                Ok(())
            })?;
        }

        /// Property 22: Jobs with run_at in future are not processed
        #[test]
        fn prop_delayed_jobs_not_immediate(value in -100i32..100) {
            block_on(async {
                let job = TestJob::new(false);
                let queue = queue_with(&job).await;

                let opts = EnqueueOptions::new()
                    .delay(std::time::Duration::from_secs(3600)); // 1 hour delay
                queue
                    .enqueue_opts::<TestJob>(TestJobData { value }, opts)
                    .await
                    .unwrap();

                // An immediate attempt MUST NOT pick up the delayed job
                let processed = queue.process_one().await.unwrap();
                prop_assert!(!processed);
                prop_assert_eq!(job.executions().await, 0);

                Ok(())
            })?;
        }

        /// Property 22: Successful job is completed and removed
        #[test]
        fn prop_successful_job_completed(value in -100i32..100) {
            block_on(async {
                let job = TestJob::new(false);
                let queue = queue_with(&job).await;

                queue.enqueue::<TestJob>(TestJobData { value }).await.unwrap();

                // Process once - should succeed
                let processed = queue.process_one().await.unwrap();
                prop_assert!(processed);
                prop_assert_eq!(job.executions().await, 1);

                // Job MUST be removed from queue (completed)
                let processed_again = queue.process_one().await.unwrap();
                prop_assert!(!processed_again);

                // Execution count MUST not increase
                prop_assert_eq!(job.executions().await, 1);

                Ok(())
            })?;
        }

        /// Property 21: Job IDs are unique
        #[test]
        fn prop_job_ids_unique(count in 2usize..10) {
            block_on(async {
                let queue = queue_with(&TestJob::new(false)).await;

                let mut ids = HashSet::with_capacity(count);
                for i in 0..count {
                    let id = queue
                        .enqueue::<TestJob>(TestJobData { value: i as i32 })
                        .await
                        .unwrap();
                    ids.insert(id);
                }

                // All IDs MUST be unique: no insert collapsed onto an earlier id.
                prop_assert_eq!(ids.len(), count);

                Ok(())
            })?;
        }

        /// Property 22: Max attempts limit is respected
        #[test]
        #[ignore] // TODO: Requires time mocking
        fn prop_max_attempts_respected(value in -100i32..100, max_attempts in 1u32..5) {
            block_on(async {
                let job = TestJob::new(true);
                let queue = queue_with(&job).await;

                let opts = EnqueueOptions::new().max_attempts(max_attempts);
                queue
                    .enqueue_opts::<TestJob>(TestJobData { value }, opts)
                    .await
                    .unwrap();

                // Process until queue is empty, with a safety limit
                let mut process_count = 0;
                while queue.process_one().await.unwrap() {
                    process_count += 1;
                    if process_count > max_attempts + 5 {
                        break;
                    }
                }

                // Job MUST have been executed exactly max_attempts times
                prop_assert_eq!(job.executions().await, max_attempts);

                Ok(())
            })?;
        }

        /// Property 22: Backoff increases exponentially, not linearly
        #[test]
        fn prop_backoff_exponential_not_linear(attempt in 1u32..8) {
            let current = backoff_secs(attempt);
            let previous = backoff_secs(attempt - 1);

            // Exponential: current = 2 * previous
            prop_assert_eq!(current, previous * 2);

            // NOT linear: a constant +2 step only coincides at attempt 2 (2 -> 4)
            if attempt > 2 {
                prop_assert_ne!(current, previous + 2);
            }
        }

        /// Property 22: Backoff is capped at maximum (86400 seconds = 24 hours)
        #[test]
        fn prop_backoff_capped(attempt in 20u32..30) {
            // MUST be capped at 86400
            prop_assert_eq!(backoff_secs(attempt), 86_400);

            // Without cap, would be much larger
            prop_assert!(2u64.saturating_pow(attempt) > 86_400);
        }
    }
}