1use crate::backend::{JobBackend, JobRequest};
2use crate::error::Result;
3use crate::job::{Job, JobContext, JobHandler};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9#[derive(Clone)]
11pub struct JobQueue {
12 backend: Arc<dyn JobBackend>,
13 handlers: Arc<RwLock<HashMap<String, Box<dyn JobHandler>>>>,
14}
15
16impl JobQueue {
17 pub fn new<B: JobBackend + 'static>(backend: B) -> Self {
19 Self {
20 backend: Arc::new(backend),
21 handlers: Arc::new(RwLock::new(HashMap::new())),
22 }
23 }
24
25 pub async fn register_job<J: Job + Clone>(&self, job: J) {
27 let mut handlers = self.handlers.write().await;
28 handlers.insert(J::NAME.to_string(), Box::new(job));
29 }
30
31 pub async fn enqueue<J: Job>(&self, data: J::Data) -> Result<String> {
33 self.enqueue_opts::<J>(data, EnqueueOptions::default())
34 .await
35 }
36
37 pub async fn enqueue_opts<J: Job>(
39 &self,
40 data: J::Data,
41 opts: EnqueueOptions,
42 ) -> Result<String> {
43 let payload = serde_json::to_value(data)?;
44 let id = Uuid::new_v4().to_string();
45
46 let request = JobRequest {
47 id: id.clone(),
48 name: J::NAME.to_string(),
49 payload,
50 created_at: chrono::Utc::now(),
51 attempts: 0,
52 max_attempts: opts.max_attempts,
53 last_error: None,
54 run_at: opts.run_at,
55 };
56
57 self.backend.push(request).await?;
58 Ok(id)
59 }
60
61 pub async fn process_one(&self) -> Result<bool> {
63 if let Some(req) = self.backend.pop().await? {
64 let handlers = self.handlers.read().await;
65 if let Some(handler) = handlers.get(&req.name) {
66 let ctx = JobContext {
67 job_id: req.id.clone(),
68 attempt: req.attempts + 1,
69 created_at: req.created_at,
70 };
71
72 match handler.handle(ctx, req.payload.clone()).await {
73 Ok(_) => {
74 self.backend.complete(&req.id).await?;
75 Ok(true)
76 }
77 Err(e) => {
78 let mut new_req = req.clone();
79 new_req.attempts += 1;
80 new_req.last_error = Some(e.to_string());
81
82 if new_req.attempts < new_req.max_attempts {
83 let backoff_secs = 2u64.saturating_pow(new_req.attempts).min(86400);
87 let retry_delay = chrono::Duration::seconds(backoff_secs as i64);
88 new_req.run_at = Some(chrono::Utc::now() + retry_delay);
89
90 self.backend.push(new_req).await?;
92 } else {
93 self.backend.fail(&req.id, &e.to_string()).await?;
95
96 }
99 Ok(true)
100 }
101 }
102 } else {
103 self.backend
106 .fail(&req.id, &format!("No handler for job: {}", req.name))
107 .await?;
108 Ok(true)
109 }
110 } else {
111 Ok(false)
112 }
113 }
114
115 pub async fn start_worker(&self) -> Result<()> {
117 loop {
118 match self.process_one().await {
119 Ok(processed) => {
120 if !processed {
121 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
123 }
124 }
125 Err(e) => {
126 tracing::error!("Worker error: {}", e);
127 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
128 }
129 }
130 }
131 }
132}
133
134#[derive(Debug, Clone, Default)]
136pub struct EnqueueOptions {
137 pub max_attempts: u32,
138 pub run_at: Option<chrono::DateTime<chrono::Utc>>,
139}
140
141impl EnqueueOptions {
142 pub fn new() -> Self {
143 Self::default()
144 }
145
146 pub fn max_attempts(mut self, n: u32) -> Self {
147 self.max_attempts = n;
148 self
149 }
150
151 pub fn delay(mut self, duration: std::time::Duration) -> Self {
152 self.run_at = Some(chrono::Utc::now() + chrono::Duration::from_std(duration).unwrap());
153 self
154 }
155}
156
157#[cfg(test)]
158mod property_tests {
159 use super::*;
160 use crate::backend::memory::InMemoryBackend as MemoryBackend;
161 use crate::JobError;
162 use async_trait::async_trait;
163 use proptest::prelude::*;
164 use serde::{Deserialize, Serialize};
165 use std::sync::Arc;
166 use tokio::sync::RwLock;
167
168 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
179 struct TestJobData {
180 value: i32,
181 }
182
183 #[derive(Clone)]
184 struct TestJob {
185 should_fail: Arc<RwLock<bool>>,
186 execution_count: Arc<RwLock<u32>>,
187 }
188
189 #[async_trait]
190 impl Job for TestJob {
191 const NAME: &'static str = "test_job";
192 type Data = TestJobData;
193
194 async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> {
195 let mut count = self.execution_count.write().await;
196 *count += 1;
197
198 let should_fail = *self.should_fail.read().await;
199 if should_fail {
200 return Err(JobError::WorkerError(format!(
201 "Test failure for value {}",
202 data.value
203 )));
204 }
205 Ok(())
206 }
207 }
208
209 proptest! {
210 #![proptest_config(ProptestConfig::with_cases(50))]
211
212 #[test]
214 fn prop_job_persistence(value in -1000i32..1000i32) {
215 tokio::runtime::Runtime::new().unwrap().block_on(async {
216 let backend = MemoryBackend::new();
217 let queue = JobQueue::new(backend);
218
219 let test_job = TestJob {
220 should_fail: Arc::new(RwLock::new(false)),
221 execution_count: Arc::new(RwLock::new(0)),
222 };
223 queue.register_job(test_job.clone()).await;
224
225 let job_id = queue
227 .enqueue::<TestJob>(TestJobData { value })
228 .await
229 .unwrap();
230
231 prop_assert!(!job_id.is_empty());
232
233 let processed = queue.process_one().await.unwrap();
235 prop_assert!(processed);
236
237 let count = *test_job.execution_count.read().await;
239 prop_assert_eq!(count, 1);
240
241 Ok(())
242 })?;
243 }
244
245 #[test]
247 fn prop_exponential_backoff_calculation(attempts in 0u32..10) {
248 let expected_backoff = 2u64.saturating_pow(attempts).min(86400);
249
250 let calculated_backoff = 2u64.saturating_pow(attempts).min(86400);
252
253 prop_assert_eq!(calculated_backoff, expected_backoff);
254
255 if attempts > 0 && expected_backoff < 86400 {
257 let previous = 2u64.saturating_pow(attempts - 1);
258 prop_assert_eq!(expected_backoff, previous * 2);
259 }
260 }
261
262 #[test]
264 #[ignore] fn prop_retry_behavior(value in -1000i32..1000i32, max_attempts in 2u32..5) {
266 tokio::runtime::Runtime::new().unwrap().block_on(async {
267 let backend = MemoryBackend::new();
268 let queue = JobQueue::new(backend);
269
270 let test_job = TestJob {
271 should_fail: Arc::new(RwLock::new(true)), execution_count: Arc::new(RwLock::new(0)),
273 };
274 queue.register_job(test_job.clone()).await;
275
276 let opts = EnqueueOptions::new().max_attempts(max_attempts);
278 let _job_id = queue
279 .enqueue_opts::<TestJob>(TestJobData { value }, opts)
280 .await
281 .unwrap();
282
283 for attempt in 1..=max_attempts {
285 let processed = queue.process_one().await.unwrap();
286 prop_assert!(processed);
287
288 let count = *test_job.execution_count.read().await;
289 prop_assert_eq!(count, attempt);
290 }
291
292 let processed = queue.process_one().await.unwrap();
295 prop_assert!(!processed); Ok(())
298 })?;
299 }
300
301 #[test]
303 fn prop_multiple_jobs_persist(
304 values in prop::collection::vec(-100i32..100, 1..10)
305 ) {
306 tokio::runtime::Runtime::new().unwrap().block_on(async {
307 let backend = MemoryBackend::new();
308 let queue = JobQueue::new(backend);
309
310 let test_job = TestJob {
311 should_fail: Arc::new(RwLock::new(false)),
312 execution_count: Arc::new(RwLock::new(0)),
313 };
314 queue.register_job(test_job.clone()).await;
315
316 let job_count = values.len();
318 for value in values {
319 queue.enqueue::<TestJob>(TestJobData { value }).await.unwrap();
320 }
321
322 for _ in 0..job_count {
324 let processed = queue.process_one().await.unwrap();
325 prop_assert!(processed);
326 }
327
328 let count = *test_job.execution_count.read().await;
330 prop_assert_eq!(count as usize, job_count);
331
332 let processed = queue.process_one().await.unwrap();
334 prop_assert!(!processed);
335
336 Ok(())
337 })?;
338 }
339
340 #[test]
342 fn prop_delayed_jobs_not_immediate(value in -100i32..100) {
343 tokio::runtime::Runtime::new().unwrap().block_on(async {
344 let backend = MemoryBackend::new();
345 let queue = JobQueue::new(backend);
346
347 let test_job = TestJob {
348 should_fail: Arc::new(RwLock::new(false)),
349 execution_count: Arc::new(RwLock::new(0)),
350 };
351 queue.register_job(test_job.clone()).await;
352
353 let opts = EnqueueOptions::new()
355 .delay(std::time::Duration::from_secs(3600)); queue
357 .enqueue_opts::<TestJob>(TestJobData { value }, opts)
358 .await
359 .unwrap();
360
361 let processed = queue.process_one().await.unwrap();
363 prop_assert!(!processed); let count = *test_job.execution_count.read().await;
367 prop_assert_eq!(count, 0);
368
369 Ok(())
370 })?;
371 }
372
373 #[test]
375 fn prop_successful_job_completed(value in -100i32..100) {
376 tokio::runtime::Runtime::new().unwrap().block_on(async {
377 let backend = MemoryBackend::new();
378 let queue = JobQueue::new(backend);
379
380 let test_job = TestJob {
381 should_fail: Arc::new(RwLock::new(false)),
382 execution_count: Arc::new(RwLock::new(0)),
383 };
384 queue.register_job(test_job.clone()).await;
385
386 queue.enqueue::<TestJob>(TestJobData { value }).await.unwrap();
387
388 let processed = queue.process_one().await.unwrap();
390 prop_assert!(processed);
391
392 let count = *test_job.execution_count.read().await;
394 prop_assert_eq!(count, 1);
395
396 let processed_again = queue.process_one().await.unwrap();
398 prop_assert!(!processed_again); let count_after = *test_job.execution_count.read().await;
402 prop_assert_eq!(count_after, 1);
403
404 Ok(())
405 })?;
406 }
407
408 #[test]
410 fn prop_job_ids_unique(count in 2usize..10) {
411 tokio::runtime::Runtime::new().unwrap().block_on(async {
412 let backend = MemoryBackend::new();
413 let queue = JobQueue::new(backend);
414
415 let test_job = TestJob {
416 should_fail: Arc::new(RwLock::new(false)),
417 execution_count: Arc::new(RwLock::new(0)),
418 };
419 queue.register_job(test_job).await;
420
421 let mut job_ids = Vec::new();
423 for i in 0..count {
424 let id = queue
425 .enqueue::<TestJob>(TestJobData { value: i as i32 })
426 .await
427 .unwrap();
428 job_ids.push(id);
429 }
430
431 for i in 0..job_ids.len() {
433 for j in (i + 1)..job_ids.len() {
434 prop_assert_ne!(&job_ids[i], &job_ids[j]);
435 }
436 }
437
438 Ok(())
439 })?;
440 }
441
442 #[test]
444 #[ignore] fn prop_max_attempts_respected(value in -100i32..100, max_attempts in 1u32..5) {
446 tokio::runtime::Runtime::new().unwrap().block_on(async {
447 let backend = MemoryBackend::new();
448 let queue = JobQueue::new(backend);
449
450 let test_job = TestJob {
451 should_fail: Arc::new(RwLock::new(true)),
452 execution_count: Arc::new(RwLock::new(0)),
453 };
454 queue.register_job(test_job.clone()).await;
455
456 let opts = EnqueueOptions::new().max_attempts(max_attempts);
457 queue
458 .enqueue_opts::<TestJob>(TestJobData { value }, opts)
459 .await
460 .unwrap();
461
462 let mut process_count = 0;
464 while queue.process_one().await.unwrap() {
465 process_count += 1;
466 if process_count > max_attempts + 5 {
468 break;
469 }
470 }
471
472 let count = *test_job.execution_count.read().await;
474 prop_assert_eq!(count, max_attempts);
475
476 Ok(())
477 })?;
478 }
479
480 #[test]
482 fn prop_backoff_exponential_not_linear(attempt in 1u32..8) {
483 let backoff_current = 2u64.saturating_pow(attempt);
484 let backoff_previous = 2u64.saturating_pow(attempt - 1);
485
486 prop_assert_eq!(backoff_current, backoff_previous * 2);
488
489 let linear_would_be = backoff_previous + 2; if attempt > 2 {
492 prop_assert_ne!(backoff_current, linear_would_be);
493 }
494 }
495
496 #[test]
498 fn prop_backoff_capped(attempt in 20u32..30) {
499 let backoff = 2u64.saturating_pow(attempt).min(86400);
500
501 prop_assert_eq!(backoff, 86400);
503
504 let uncapped = 2u64.saturating_pow(attempt);
506 prop_assert!(uncapped > 86400);
507 }
508 }
509}