1use std::collections::{HashMap, VecDeque};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::{Arc, Condvar, Mutex};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum Priority {
23 Low = 0,
24 Normal = 1,
25 High = 2,
26 Critical = 3,
27}
28
29impl Priority {
30 pub fn from_str_loose(s: &str) -> Self {
32 match s.to_lowercase().as_str() {
33 "low" => Self::Low,
34 "high" => Self::High,
35 "critical" => Self::Critical,
36 _ => Self::Normal,
37 }
38 }
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "lowercase")]
44pub enum JobStatus {
45 Pending,
46 Running,
47 Completed,
48 Failed,
49 Retrying,
50 Dead,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct Job {
56 pub id: String,
57 pub name: String,
58 pub payload: serde_json::Value,
59 pub priority: Priority,
60 pub status: JobStatus,
61 pub max_retries: u32,
62 pub retry_count: u32,
63 pub created_at: String,
64 pub started_at: Option<String>,
65 pub completed_at: Option<String>,
66 pub error: Option<String>,
67 pub delay_secs: u64,
69 pub queue: String,
71}
72
/// Outcome reported by a job handler.
pub enum JobResult {
    /// The job finished and can be marked completed.
    Success,
    /// The job failed; the string is the error message recorded on the job.
    Failure(String),
    /// The handler asks for another attempt; the string is the reason recorded on the job.
    Retry(String),
}
79
/// Shared, thread-safe callback that processes one job and reports the outcome.
pub type JobHandler = Arc<dyn Fn(&Job) -> JobResult + Send + Sync>;
82
83#[derive(Debug, Clone, Serialize)]
88pub struct QueueStats {
89 pub pending: usize,
90 pub running: usize,
91 pub completed: u64,
92 pub failed: u64,
93 pub dead: usize,
94 pub handlers: Vec<String>,
95}
96
97pub struct JobQueue {
106 pending: Mutex<VecDeque<Job>>,
109 running: Mutex<HashMap<String, Job>>,
111 history: Mutex<VecDeque<Job>>,
113 handlers: Mutex<HashMap<String, JobHandler>>,
115 notify: Condvar,
117 max_history: usize,
119 dead_letters: Mutex<VecDeque<Job>>,
121 completed_count: AtomicU64,
123 failed_count: AtomicU64,
124 next_id: AtomicU64,
126 store: Mutex<Option<std::sync::Arc<crate::job_store::JobStore>>>,
130}
131
132impl JobQueue {
133 pub fn new(max_history: usize) -> Self {
134 Self {
135 pending: Mutex::new(VecDeque::new()),
136 running: Mutex::new(HashMap::new()),
137 history: Mutex::new(VecDeque::new()),
138 handlers: Mutex::new(HashMap::new()),
139 notify: Condvar::new(),
140 max_history,
141 dead_letters: Mutex::new(VecDeque::new()),
142 completed_count: AtomicU64::new(0),
143 failed_count: AtomicU64::new(0),
144 next_id: AtomicU64::new(1),
145 store: Mutex::new(None),
146 }
147 }
148
149 pub fn attach_store(&self, store: std::sync::Arc<crate::job_store::JobStore>) {
152 *self.store.lock().unwrap() = Some(store);
153 }
154
155 fn persist(&self, job: &Job) {
159 if let Some(store) = self.store.lock().unwrap().as_ref() {
160 if let Err(e) = store.save(job) {
161 tracing::warn!("[jobs] failed to persist job {}: {e}", job.id);
162 }
163 }
164 }
165
166 pub fn register(&self, job_name: &str, handler: JobHandler) {
168 self.handlers
169 .lock()
170 .unwrap()
171 .insert(job_name.to_string(), handler);
172 }
173
174 pub fn enqueue(&self, name: &str, payload: serde_json::Value) -> String {
176 self.enqueue_with_options(name, payload, Priority::Normal, 0, 3, "default")
177 }
178
179 pub fn enqueue_with_options(
184 &self,
185 name: &str,
186 payload: serde_json::Value,
187 priority: Priority,
188 delay_secs: u64,
189 max_retries: u32,
190 queue: &str,
191 ) -> String {
192 match self.try_enqueue_with_options(name, payload, priority, delay_secs, max_retries, queue)
193 {
194 Ok(id) => id,
195 Err(e) => {
196 tracing::warn!("[jobs] enqueue rejected: {e}");
197 String::new()
198 }
199 }
200 }
201
202 pub fn try_enqueue_with_options(
206 &self,
207 name: &str,
208 payload: serde_json::Value,
209 priority: Priority,
210 delay_secs: u64,
211 max_retries: u32,
212 queue: &str,
213 ) -> Result<String, String> {
214 let id = format!("job_{}", self.next_id.fetch_add(1, Ordering::Relaxed));
215 let now = now_iso();
216 let job = Job {
217 id: id.clone(),
218 name: name.to_string(),
219 payload,
220 priority,
221 status: JobStatus::Pending,
222 max_retries,
223 retry_count: 0,
224 created_at: now,
225 started_at: None,
226 completed_at: None,
227 error: None,
228 delay_secs,
229 queue: queue.to_string(),
230 };
231 self.try_enqueue_job(job)
232 }
233
234 fn try_enqueue_job(&self, job: Job) -> Result<String, String> {
235 if let Some(store) = self.store.lock().unwrap().as_ref() {
238 if let Err(e) = store.save(&job) {
239 return Err(format!("persist failed for job {}: {e}", job.id));
240 }
241 }
242
243 let id = job.id.clone();
244 let priority = job.priority;
245 {
246 let mut pending = self.pending.lock().unwrap();
247 let pos = pending
249 .iter()
250 .position(|j| (j.priority as u8) < (priority as u8))
251 .unwrap_or(pending.len());
252 pending.insert(pos, job);
253 }
254 self.notify.notify_one();
255 Ok(id)
256 }
257
258 pub fn dequeue(&self, timeout: Duration) -> Option<Job> {
261 let mut pending = self.pending.lock().unwrap();
262 let now = now_secs();
263 if !pending.iter().any(|j| is_ready(j, now)) {
264 let (guard, _) = self.notify.wait_timeout(pending, timeout).unwrap();
265 pending = guard;
266 }
267
268 let now = now_secs();
269 let pos = pending.iter().position(|j| is_ready(j, now));
270 if let Some(idx) = pos {
271 let mut job = pending.remove(idx).unwrap();
272 job.status = JobStatus::Running;
273 job.started_at = Some(now_iso());
274 self.running
275 .lock()
276 .unwrap()
277 .insert(job.id.clone(), job.clone());
278 self.persist(&job);
279 Some(job)
280 } else {
281 None
282 }
283 }
284
285 pub fn dequeue_from(&self, queue: &str, timeout: Duration) -> Option<Job> {
288 let mut pending = self.pending.lock().unwrap();
289 let now = now_secs();
290 if !pending.iter().any(|j| j.queue == queue && is_ready(j, now)) {
291 let (guard, _) = self.notify.wait_timeout(pending, timeout).unwrap();
292 pending = guard;
293 }
294
295 let now = now_secs();
296 let pos = pending
297 .iter()
298 .position(|j| j.queue == queue && is_ready(j, now));
299 if let Some(idx) = pos {
300 let mut job = pending.remove(idx).unwrap();
301 job.status = JobStatus::Running;
302 job.started_at = Some(now_iso());
303 self.running
304 .lock()
305 .unwrap()
306 .insert(job.id.clone(), job.clone());
307 self.persist(&job);
308 Some(job)
309 } else {
310 None
311 }
312 }
313
314 pub fn complete(&self, job_id: &str) {
316 let job = self.running.lock().unwrap().remove(job_id);
317 if let Some(mut job) = job {
318 job.status = JobStatus::Completed;
319 job.completed_at = Some(now_iso());
320 self.completed_count.fetch_add(1, Ordering::Relaxed);
321 self.persist(&job);
322 self.push_history(job);
323 }
324 }
325
326 pub fn fail(&self, job_id: &str, error: &str) {
328 let job = self.running.lock().unwrap().remove(job_id);
329 if let Some(mut job) = job {
330 job.error = Some(error.to_string());
331
332 if job.retry_count < job.max_retries {
333 job.retry_count += 1;
335 job.status = JobStatus::Retrying;
336 job.started_at = None;
337 job.completed_at = None;
338
339 self.persist(&job);
340 let mut pending = self.pending.lock().unwrap();
341 let priority = job.priority as u8;
342 let pos = pending
343 .iter()
344 .position(|j| (j.priority as u8) < priority)
345 .unwrap_or(pending.len());
346 pending.insert(pos, job);
347 drop(pending);
348 self.notify.notify_one();
349 } else {
350 job.status = JobStatus::Dead;
352 job.completed_at = Some(now_iso());
353 self.failed_count.fetch_add(1, Ordering::Relaxed);
354 self.persist(&job);
355 self.dead_letters.lock().unwrap().push_back(job);
356 }
357 }
358 }
359
360 pub fn process_one(&self) -> bool {
363 let job = match self.dequeue(Duration::from_millis(100)) {
364 Some(j) => j,
365 None => return false,
366 };
367
368 let handler = {
369 let handlers = self.handlers.lock().unwrap();
370 handlers.get(&job.name).cloned()
371 };
372
373 match handler {
374 Some(h) => match h(&job) {
375 JobResult::Success => self.complete(&job.id),
376 JobResult::Failure(e) => self.fail(&job.id, &e),
377 JobResult::Retry(reason) => self.fail(&job.id, &reason),
378 },
379 None => {
380 self.fail(
381 &job.id,
382 &format!("No handler registered for '{}'", job.name),
383 );
384 }
385 }
386
387 true
388 }
389
390 pub fn get_job(&self, id: &str) -> Option<Job> {
392 if let Some(j) = self.running.lock().unwrap().get(id) {
394 return Some(j.clone());
395 }
396 if let Some(j) = self.pending.lock().unwrap().iter().find(|j| j.id == id) {
398 return Some(j.clone());
399 }
400 if let Some(j) = self.history.lock().unwrap().iter().find(|j| j.id == id) {
402 return Some(j.clone());
403 }
404 if let Some(j) = self
406 .dead_letters
407 .lock()
408 .unwrap()
409 .iter()
410 .find(|j| j.id == id)
411 {
412 return Some(j.clone());
413 }
414 None
415 }
416
417 pub fn stats(&self) -> QueueStats {
419 let handler_names: Vec<String> = self.handlers.lock().unwrap().keys().cloned().collect();
420 QueueStats {
421 pending: self.pending.lock().unwrap().len(),
422 running: self.running.lock().unwrap().len(),
423 completed: self.completed_count.load(Ordering::Relaxed),
424 failed: self.failed_count.load(Ordering::Relaxed),
425 dead: self.dead_letters.lock().unwrap().len(),
426 handlers: handler_names,
427 }
428 }
429
430 pub fn pending_count(&self) -> usize {
432 self.pending.lock().unwrap().len()
433 }
434
435 pub fn running_count(&self) -> usize {
437 self.running.lock().unwrap().len()
438 }
439
440 pub fn dead_letters(&self) -> Vec<Job> {
442 self.dead_letters.lock().unwrap().iter().cloned().collect()
443 }
444
445 pub fn retry_dead(&self, job_id: &str) -> bool {
447 let mut dead = self.dead_letters.lock().unwrap();
448 let pos = dead.iter().position(|j| j.id == job_id);
449 if let Some(idx) = pos {
450 let mut job = dead.remove(idx).unwrap();
451 job.status = JobStatus::Pending;
452 job.retry_count = 0;
453 job.error = None;
454 job.started_at = None;
455 job.completed_at = None;
456
457 let priority = job.priority as u8;
458 let mut pending = self.pending.lock().unwrap();
459 let insert_pos = pending
460 .iter()
461 .position(|j| (j.priority as u8) < priority)
462 .unwrap_or(pending.len());
463 pending.insert(insert_pos, job);
464 drop(pending);
465 drop(dead);
466 self.notify.notify_one();
467 true
468 } else {
469 false
470 }
471 }
472
473 pub fn recent_history(&self, limit: usize) -> Vec<Job> {
475 let history = self.history.lock().unwrap();
476 history.iter().rev().take(limit).cloned().collect()
477 }
478
479 pub fn list_jobs(&self, status: Option<&str>, queue: Option<&str>, limit: usize) -> Vec<Job> {
481 let mut result = Vec::new();
482
483 let pending = self.pending.lock().unwrap();
485 let running = self.running.lock().unwrap();
486 let history = self.history.lock().unwrap();
487
488 let all_jobs = pending.iter().chain(running.values()).chain(history.iter());
489
490 for job in all_jobs {
491 if let Some(s) = status {
492 let job_status = match &job.status {
493 JobStatus::Pending => "pending",
494 JobStatus::Running => "running",
495 JobStatus::Completed => "completed",
496 JobStatus::Failed => "failed",
497 JobStatus::Retrying => "retrying",
498 JobStatus::Dead => "dead",
499 };
500 if job_status != s {
501 continue;
502 }
503 }
504 if let Some(q) = queue {
505 if job.queue != q {
506 continue;
507 }
508 }
509 result.push(job.clone());
510 if result.len() >= limit {
511 break;
512 }
513 }
514
515 result
516 }
517
518 fn push_history(&self, job: Job) {
523 let mut history = self.history.lock().unwrap();
524 history.push_back(job);
525 while history.len() > self.max_history {
526 history.pop_front();
527 }
528 }
529
530 pub fn restore_from(&self, store: &crate::job_store::JobStore) -> usize {
542 let jobs = match store.load_pending() {
543 Ok(j) => j,
544 Err(_) => return 0,
545 };
546
547 let mut pending = self.pending.lock().unwrap();
548 let count = jobs.len();
549
550 for mut job in jobs {
551 if job.status == JobStatus::Running {
554 job.status = JobStatus::Pending;
555 job.started_at = None;
556 }
557 if job.status == JobStatus::Retrying {
558 job.status = JobStatus::Pending;
559 }
560
561 let priority = job.priority as u8;
563 let pos = pending
564 .iter()
565 .position(|j| (j.priority as u8) < priority)
566 .unwrap_or(pending.len());
567 pending.insert(pos, job);
568 }
569
570 let max_id = pending
573 .iter()
574 .filter_map(|j| {
575 j.id.strip_prefix("job_")
576 .and_then(|n| n.parse::<u64>().ok())
577 })
578 .max()
579 .unwrap_or(0);
580 let current = self.next_id.load(Ordering::Relaxed);
581 if max_id >= current {
582 self.next_id.store(max_id + 1, Ordering::Relaxed);
583 }
584
585 count
586 }
587}
588
589pub struct Worker {
595 queue: Arc<JobQueue>,
596 #[allow(dead_code)]
597 name: String,
598 running: Arc<AtomicBool>,
599}
600
601impl Worker {
602 pub fn new(queue: Arc<JobQueue>, name: &str) -> Self {
603 Self {
604 queue,
605 name: name.to_string(),
606 running: Arc::new(AtomicBool::new(true)),
607 }
608 }
609
610 pub fn start(self) -> WorkerHandle {
612 let running = Arc::clone(&self.running);
613 let handle = std::thread::spawn(move || {
614 while self.running.load(Ordering::Relaxed) {
615 self.queue.process_one();
616 }
617 });
618 WorkerHandle {
619 running,
620 handle: Some(handle),
621 }
622 }
623}
624
/// Handle to a started worker thread.
pub struct WorkerHandle {
    /// Run flag shared with the worker loop.
    running: Arc<AtomicBool>,
    // Kept so the join handle stays owned; nothing in this file joins it.
    #[allow(dead_code)]
    handle: Option<std::thread::JoinHandle<()>>,
}
631
632impl WorkerHandle {
633 pub fn stop(&self) {
635 self.running.store(false, Ordering::Relaxed);
636 }
637}
638
639fn now_iso() -> String {
644 format!("{}Z", now_secs())
645}
646
/// Whole seconds elapsed since the Unix epoch, or 0 if the system clock
/// reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
653
654fn is_ready(job: &Job, now: u64) -> bool {
657 if job.delay_secs == 0 {
658 return true;
659 }
660 let created = job
661 .created_at
662 .trim_end_matches('Z')
663 .parse::<u64>()
664 .unwrap_or(0);
665 now >= created.saturating_add(job.delay_secs)
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// Wait budget for dequeues that are expected to be answered straight away.
    const TICK: Duration = Duration::from_millis(10);

    /// Enqueues an undelayed job with an empty payload and returns its id.
    fn push(q: &JobQueue, name: &str, priority: Priority, max_retries: u32, queue: &str) -> String {
        q.enqueue_with_options(name, serde_json::json!({}), priority, 0, max_retries, queue)
    }

    /// Dequeues the next ready job, panicking if there is none.
    fn pop(q: &JobQueue) -> Job {
        q.dequeue(TICK).unwrap()
    }

    /// A pending, normal-priority job in the default queue, as it would look in a store.
    fn stored_job(id: &str, name: &str, created_at: &str) -> Job {
        Job {
            id: id.into(),
            name: name.into(),
            payload: serde_json::json!({}),
            priority: Priority::Normal,
            status: JobStatus::Pending,
            max_retries: 3,
            retry_count: 0,
            created_at: created_at.into(),
            started_at: None,
            completed_at: None,
            error: None,
            delay_secs: 0,
            queue: "default".into(),
        }
    }

    #[test]
    fn enqueue_and_dequeue() {
        let queue = JobQueue::new(100);
        let id = queue.enqueue("test_job", serde_json::json!({"x": 1}));
        assert!(id.starts_with("job_"));
        assert_eq!(queue.pending_count(), 1);

        let job = pop(&queue);
        assert_eq!(job.name, "test_job");
        assert_eq!(job.status, JobStatus::Running);
        assert_eq!(queue.pending_count(), 0);
        assert_eq!(queue.running_count(), 1);
    }

    #[test]
    fn dequeue_returns_none_on_empty() {
        let queue = JobQueue::new(100);
        assert!(queue.dequeue(TICK).is_none());
    }

    #[test]
    fn priority_ordering() {
        let queue = JobQueue::new(100);
        // Deliberately enqueued out of priority order.
        push(&queue, "low", Priority::Low, 0, "default");
        push(&queue, "high", Priority::High, 0, "default");
        push(&queue, "normal", Priority::Normal, 0, "default");
        push(&queue, "critical", Priority::Critical, 0, "default");

        let first = pop(&queue);
        let second = pop(&queue);
        let third = pop(&queue);
        let fourth = pop(&queue);

        assert_eq!(first.name, "critical");
        assert_eq!(second.name, "high");
        assert_eq!(third.name, "normal");
        assert_eq!(fourth.name, "low");
    }

    #[test]
    fn complete_moves_to_history() {
        let queue = JobQueue::new(100);
        let id = queue.enqueue("test", serde_json::json!({}));
        let _running = pop(&queue);
        queue.complete(&id);

        assert_eq!(queue.running_count(), 0);
        let finished = queue.get_job(&id).unwrap();
        assert_eq!(finished.status, JobStatus::Completed);
    }

    #[test]
    fn fail_retries_when_under_max() {
        let queue = JobQueue::new(100);
        let id = push(&queue, "test", Priority::Normal, 2, "default");

        let _running = pop(&queue);
        queue.fail(&id, "oops");

        // One retry consumed; the job is back in the pending queue.
        let requeued = queue.get_job(&id).unwrap();
        assert_eq!(requeued.retry_count, 1);
        assert_eq!(requeued.status, JobStatus::Retrying);
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn fail_moves_to_dead_after_max_retries() {
        let queue = JobQueue::new(100);
        let id = push(&queue, "test", Priority::Normal, 1, "default");

        // First failure uses the single retry.
        let _first_attempt = pop(&queue);
        queue.fail(&id, "fail 1");

        // Second failure has no retries left.
        let _second_attempt = pop(&queue);
        queue.fail(&id, "fail 2");

        let dead = queue.dead_letters();
        assert_eq!(dead.len(), 1);
        assert_eq!(dead[0].id, id);
        assert_eq!(dead[0].status, JobStatus::Dead);
    }

    #[test]
    fn retry_dead_letter() {
        let queue = JobQueue::new(100);
        let id = push(&queue, "test", Priority::Normal, 0, "default");

        let _running = pop(&queue);
        queue.fail(&id, "dead");
        assert_eq!(queue.dead_letters().len(), 1);

        assert!(queue.retry_dead(&id));
        assert_eq!(queue.dead_letters().len(), 0);
        assert_eq!(queue.pending_count(), 1);

        let revived = queue.get_job(&id).unwrap();
        assert_eq!(revived.status, JobStatus::Pending);
        assert_eq!(revived.retry_count, 0);
    }

    #[test]
    fn retry_dead_returns_false_for_unknown() {
        let queue = JobQueue::new(100);
        assert!(!queue.retry_dead("nonexistent"));
    }

    #[test]
    fn get_job_searches_all_collections() {
        let queue = JobQueue::new(100);

        // Found while pending.
        let first_id = queue.enqueue("pending_job", serde_json::json!({}));
        assert!(queue.get_job(&first_id).is_some());

        // Found while running: both jobs are dequeued here.
        let second_id = queue.enqueue("running_job", serde_json::json!({}));
        let _first_running = pop(&queue);
        let _second_running = pop(&queue);
        assert!(queue.get_job(&second_id).is_some());

        // Found in the history after completion.
        queue.complete(&first_id);
        let found = queue.get_job(&first_id).unwrap();
        assert_eq!(found.status, JobStatus::Completed);
    }

    #[test]
    fn dequeue_from_specific_queue() {
        let queue = JobQueue::new(100);
        push(&queue, "a", Priority::High, 0, "alpha");
        push(&queue, "b", Priority::Critical, 0, "beta");

        let job = queue.dequeue_from("beta", TICK).unwrap();
        assert_eq!(job.name, "b");
        assert_eq!(job.queue, "beta");
    }

    #[test]
    fn process_one_with_handler() {
        let queue = Arc::new(JobQueue::new(100));
        queue.register("echo", Arc::new(|_job| JobResult::Success));
        queue.enqueue("echo", serde_json::json!({"msg": "hello"}));
        assert!(queue.process_one());

        let stats = queue.stats();
        assert_eq!(stats.completed, 1);
        assert_eq!(stats.pending, 0);
    }

    #[test]
    fn process_one_without_handler_fails() {
        let queue = Arc::new(JobQueue::new(100));
        push(&queue, "unhandled", Priority::Normal, 0, "default");
        queue.process_one();

        // No handler and no retries: the job goes straight to the dead letters.
        assert_eq!(queue.dead_letters().len(), 1);
    }

    #[test]
    fn stats_reports_handler_names() {
        let queue = JobQueue::new(100);
        queue.register("alpha", Arc::new(|_| JobResult::Success));
        queue.register("beta", Arc::new(|_| JobResult::Success));

        let stats = queue.stats();
        assert!(stats.handlers.contains(&"alpha".to_string()));
        assert!(stats.handlers.contains(&"beta".to_string()));
    }

    #[test]
    fn history_is_bounded() {
        let queue = JobQueue::new(3);
        for i in 0..5 {
            let id = queue.enqueue(&format!("job_{i}"), serde_json::json!({}));
            let _running = pop(&queue);
            queue.complete(&id);
        }
        let history = queue.recent_history(10);
        assert_eq!(history.len(), 3);
    }

    #[test]
    fn list_jobs_with_filters() {
        let queue = JobQueue::new(100);
        push(&queue, "a", Priority::Normal, 0, "emails");
        push(&queue, "b", Priority::Normal, 0, "default");
        push(&queue, "c", Priority::Normal, 0, "emails");

        let email_jobs = queue.list_jobs(None, Some("emails"), 50);
        assert_eq!(email_jobs.len(), 2);

        let pending_jobs = queue.list_jobs(Some("pending"), None, 50);
        assert_eq!(pending_jobs.len(), 3);
    }

    #[test]
    fn worker_processes_jobs() {
        let queue = Arc::new(JobQueue::new(100));
        queue.register("add", Arc::new(|_job| JobResult::Success));
        queue.enqueue("add", serde_json::json!({"a": 1, "b": 2}));

        let worker = Worker::new(Arc::clone(&queue), "test-worker");
        let handle = worker.start();

        // Give the worker thread time to pick the job up.
        std::thread::sleep(Duration::from_millis(200));
        handle.stop();

        assert_eq!(queue.stats().completed, 1);
    }

    #[test]
    fn priority_from_str_loose() {
        assert_eq!(Priority::from_str_loose("low"), Priority::Low);
        assert_eq!(Priority::from_str_loose("HIGH"), Priority::High);
        assert_eq!(Priority::from_str_loose("critical"), Priority::Critical);
        assert_eq!(Priority::from_str_loose("unknown"), Priority::Normal);
    }

    #[test]
    fn restore_from_store() {
        let store = crate::job_store::JobStore::in_memory().unwrap();

        let pending_job = Job {
            payload: serde_json::json!({"to": "alice"}),
            priority: Priority::High,
            ..stored_job("job_100", "email", "1000Z")
        };
        // Saved mid-run, as if the process had stopped while it was executing.
        let running_job = Job {
            status: JobStatus::Running,
            max_retries: 2,
            retry_count: 1,
            started_at: Some("2001Z".into()),
            ..stored_job("job_200", "process", "2000Z")
        };

        store.save(&pending_job).unwrap();
        store.save(&running_job).unwrap();

        let queue = JobQueue::new(100);
        let restored = queue.restore_from(&store);
        assert_eq!(restored, 2);
        assert_eq!(queue.pending_count(), 2);

        // The interrupted job is reset to pending.
        let reset = queue.get_job("job_200").unwrap();
        assert_eq!(reset.status, JobStatus::Pending);
        assert!(reset.started_at.is_none());

        // New ids continue after the largest restored id.
        let new_id = queue.enqueue("new", serde_json::json!({}));
        let num: u64 = new_id.strip_prefix("job_").unwrap().parse().unwrap();
        assert!(num > 200);
    }
}