1use std::time::Duration;
64use serde::{Serialize, Deserialize, de::DeserializeOwned};
65use async_trait::async_trait;
66use thiserror::Error;
67use uuid::Uuid;
68use chrono::{DateTime, Utc};
69
70pub mod backends;
71pub mod config;
72pub mod worker;
73pub mod scheduler;
74
75pub use backends::*;
76pub use config::*;
77pub use worker::*;
78pub use scheduler::*;
79
80#[derive(Error, Debug)]
82pub enum QueueError {
83 #[error("Serialization error: {0}")]
84 Serialization(#[from] serde_json::Error),
85
86 #[error("Backend error: {0}")]
87 Backend(String),
88
89 #[error("Job not found: {0}")]
90 JobNotFound(String),
91
92 #[error("Queue configuration error: {0}")]
93 Configuration(String),
94
95 #[error("Network error: {0}")]
96 Network(String),
97
98 #[error("Timeout error")]
99 Timeout,
100
101 #[error("Job execution failed: {0}")]
102 Execution(String),
103}
104
105pub type QueueResult<T> = Result<T, QueueError>;
107
108pub type JobResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
110
111pub type JobId = Uuid;
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
116pub enum Priority {
117 Low = 0,
118 Normal = 1,
119 High = 2,
120 Critical = 3,
121}
122
123impl Default for Priority {
124 fn default() -> Self {
125 Priority::Normal
126 }
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
131pub enum JobState {
132 Pending,
134 Processing,
136 Completed,
138 Failed,
140 Dead,
142}
143
144#[async_trait]
146pub trait Job: Send + Sync + Serialize + DeserializeOwned {
147 async fn execute(&self) -> JobResult<()>;
149
150 fn job_type(&self) -> &'static str;
152
153 fn max_retries(&self) -> u32 {
155 3
156 }
157
158 fn retry_delay(&self, attempt: u32) -> Duration {
160 Duration::from_secs(1 << attempt.min(6)) }
162
163 fn timeout(&self) -> Duration {
165 Duration::from_secs(300)
166 }
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct JobEntry {
172 id: JobId,
174 job_type: String,
176 payload: serde_json::Value,
178 priority: Priority,
180 state: JobState,
182 attempts: u32,
184 max_retries: u32,
186 created_at: DateTime<Utc>,
188 run_at: DateTime<Utc>,
190 processed_at: Option<DateTime<Utc>>,
192 last_error: Option<String>,
194}
195
196impl JobEntry {
197 pub fn new<T: Job>(job: T, priority: Option<Priority>, delay: Option<Duration>) -> QueueResult<Self> {
199 let now = Utc::now();
200 let run_at = match delay {
201 Some(d) => now + chrono::Duration::from_std(d)
202 .map_err(|e| QueueError::Configuration(format!("Invalid delay duration: {}", e)))?,
203 None => now,
204 };
205
206 let job_type = job.job_type().to_string();
207 let max_retries = job.max_retries();
208 let payload = serde_json::to_value(job)?;
209
210 Ok(JobEntry {
211 id: Uuid::new_v4(),
212 job_type,
213 payload,
214 priority: priority.unwrap_or_default(),
215 state: JobState::Pending,
216 attempts: 0,
217 max_retries,
218 created_at: now,
219 run_at,
220 processed_at: None,
221 last_error: None,
222 })
223 }
224
225 pub fn id(&self) -> JobId {
227 self.id
228 }
229
230 pub fn job_type(&self) -> &str {
232 &self.job_type
233 }
234
235 pub fn priority(&self) -> Priority {
237 self.priority
238 }
239
240 pub fn state(&self) -> &JobState {
242 &self.state
243 }
244
245 pub fn attempts(&self) -> u32 {
247 self.attempts
248 }
249
250 pub fn run_at(&self) -> DateTime<Utc> {
252 self.run_at
253 }
254
255 pub fn is_ready(&self) -> bool {
257 matches!(self.state, JobState::Pending | JobState::Failed) && self.run_at <= Utc::now()
258 }
259
260 pub async fn execute<T: Job>(&self) -> JobResult<()> {
262 let job: T = serde_json::from_value(self.payload.clone())?;
263 job.execute().await
264 }
265
266 pub(crate) fn mark_processing(&mut self) {
268 self.state = JobState::Processing;
269 self.processed_at = Some(Utc::now());
270 }
271
272 pub(crate) fn mark_completed(&mut self) {
274 self.state = JobState::Completed;
275 }
276
277 pub(crate) fn mark_failed(&mut self, error: String) {
279 self.attempts += 1;
280 self.last_error = Some(error);
281
282 if self.attempts >= self.max_retries {
283 self.state = JobState::Dead;
284 } else {
285 self.state = JobState::Failed;
286 let delay = Duration::from_secs(1 << self.attempts.min(6));
288 let chrono_delay = chrono::Duration::from_std(delay)
291 .unwrap_or(chrono::Duration::MAX);
292 self.run_at = Utc::now() + chrono_delay;
293 }
294 }
295
296 pub(crate) fn reset_for_retry(&mut self) {
298 self.attempts = 0;
299 self.state = JobState::Pending;
300 self.run_at = Utc::now();
301 self.last_error = None;
302 self.processed_at = None;
303 }
304
305 pub(crate) fn new_with_job_type(
307 job_type: String,
308 payload: serde_json::Value,
309 priority: Option<Priority>,
310 delay: Option<Duration>,
311 max_retries: u32,
312 ) -> QueueResult<Self> {
313 let now = Utc::now();
314 let run_at = delay.map(|d| now + chrono::Duration::from_std(d).unwrap()).unwrap_or(now);
315
316 Ok(JobEntry {
317 id: Uuid::new_v4(),
318 job_type,
319 payload,
320 priority: priority.unwrap_or_default(),
321 state: JobState::Pending,
322 attempts: 0,
323 max_retries,
324 created_at: now,
325 run_at,
326 processed_at: None,
327 last_error: None,
328 })
329 }
330}
331
332#[async_trait]
334pub trait QueueBackend: Send + Sync {
335 async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId>;
337
338 async fn dequeue(&self) -> QueueResult<Option<JobEntry>>;
340
341 async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()>;
343
344 async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>>;
346
347 async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>>;
349
350 async fn remove_job(&self, job_id: JobId) -> QueueResult<bool>;
352
353 async fn clear(&self) -> QueueResult<()>;
355
356 async fn stats(&self) -> QueueResult<QueueStats>;
358
359 async fn requeue_job(&self, job_id: JobId, mut job: JobEntry) -> QueueResult<bool> {
362 if self.remove_job(job_id).await? {
364 job.reset_for_retry();
365 self.enqueue(job).await?;
366 Ok(true)
367 } else {
368 Ok(false)
369 }
370 }
371
372 async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
375 let jobs = self.get_jobs_by_state(state, None).await?;
377 let count = jobs.len() as u64;
378
379 for job in jobs {
380 self.remove_job(job.id()).await?;
381 }
382
383 Ok(count)
384 }
385}
386
387#[derive(Debug, Clone, Default, Serialize, Deserialize)]
389pub struct QueueStats {
390 pub pending_jobs: u64,
391 pub processing_jobs: u64,
392 pub completed_jobs: u64,
393 pub failed_jobs: u64,
394 pub dead_jobs: u64,
395 pub total_jobs: u64,
396}
397
398pub struct Queue<B: QueueBackend> {
400 backend: B,
401}
402
403impl<B: QueueBackend> Queue<B> {
404 pub fn new(backend: B) -> Self {
406 Self { backend }
407 }
408
409 pub async fn enqueue<T: Job>(&self, job: T, priority: Option<Priority>) -> QueueResult<JobId> {
411 let entry = JobEntry::new(job, priority, None)?;
412 self.backend.enqueue(entry).await
413 }
414
415 pub async fn enqueue_delayed<T: Job>(&self, job: T, delay: Duration, priority: Option<Priority>) -> QueueResult<JobId> {
417 let entry = JobEntry::new(job, priority, Some(delay))?;
418 self.backend.enqueue(entry).await
419 }
420
421 pub async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
423 self.backend.dequeue().await
424 }
425
426 pub async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
428 self.backend.complete(job_id, result).await
429 }
430
431 pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
433 self.backend.get_job(job_id).await
434 }
435
436 pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
438 self.backend.get_jobs_by_state(state, limit).await
439 }
440
441 pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
443 self.backend.remove_job(job_id).await
444 }
445
446 pub async fn clear(&self) -> QueueResult<()> {
448 self.backend.clear().await
449 }
450
451 pub async fn stats(&self) -> QueueResult<QueueStats> {
453 self.backend.stats().await
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460 use crate::backends::MemoryBackend;
461
462 #[derive(Debug, Clone, Serialize, Deserialize)]
463 struct TestJob {
464 message: String,
465 }
466
467 #[async_trait]
468 impl Job for TestJob {
469 async fn execute(&self) -> JobResult<()> {
470 println!("Executing job: {}", self.message);
471 Ok(())
472 }
473
474 fn job_type(&self) -> &'static str {
475 "test"
476 }
477 }
478
479 #[tokio::test]
480 async fn test_job_entry_creation() {
481 let job = TestJob {
482 message: "Hello, World!".to_string(),
483 };
484
485 let entry = JobEntry::new(job, Some(Priority::High), None).unwrap();
486 assert_eq!(entry.job_type(), "test");
487 assert_eq!(entry.priority(), Priority::High);
488 assert_eq!(entry.state(), &JobState::Pending);
489 assert_eq!(entry.attempts(), 0);
490 assert!(entry.is_ready());
491 }
492
493 #[tokio::test]
494 async fn test_delayed_job() {
495 let job = TestJob {
496 message: "Delayed job".to_string(),
497 };
498
499 let delay = Duration::from_secs(60);
500 let entry = JobEntry::new(job, None, Some(delay)).unwrap();
501
502 assert!(!entry.is_ready());
504 assert!(entry.run_at() > Utc::now());
505 }
506
507 #[tokio::test]
508 async fn test_duration_conversion_error_handling() {
509 use std::time::Duration as StdDuration;
510
511 let job = TestJob {
512 message: "test".to_string(),
513 };
514
515 let max_delay = StdDuration::MAX;
517 let result = JobEntry::new(job.clone(), None, Some(max_delay));
518
519 assert!(result.is_err());
521 if let Err(QueueError::Configuration(msg)) = result {
522 assert!(msg.contains("Invalid delay duration"));
523 } else {
524 panic!("Expected Configuration error for invalid delay duration");
525 }
526
527 let entry = JobEntry::new(job, None, None).unwrap();
529 let mut job_entry = entry;
530
531 job_entry.attempts = 100; job_entry.mark_failed("test error".to_string());
536
537 assert_eq!(job_entry.state, JobState::Dead); }
540
541 #[tokio::test]
542 async fn test_queue_basic_operations() {
543 let backend = MemoryBackend::new(QueueConfig::default());
544 let queue = Queue::new(backend);
545
546 let job = TestJob {
547 message: "Test job".to_string(),
548 };
549
550 let job_id = queue.enqueue(job, Some(Priority::Normal)).await.unwrap();
552
553 let job_entry = queue.dequeue().await.unwrap().unwrap();
555 assert_eq!(job_entry.id(), job_id);
556 assert_eq!(job_entry.job_type(), "test");
557
558 let result = job_entry.execute::<TestJob>().await;
560 queue.complete(job_id, result).await.unwrap();
561
562 let stats = queue.stats().await.unwrap();
564 assert_eq!(stats.total_jobs, 1);
565 assert_eq!(stats.completed_jobs, 1);
566 }
567}