1use std::time::Duration;
64use serde::{Serialize, Deserialize, de::DeserializeOwned};
65use async_trait::async_trait;
66use thiserror::Error;
67use uuid::Uuid;
68use chrono::{DateTime, Utc};
69
70pub mod backends;
71pub mod config;
72pub mod worker;
73pub mod scheduler;
74
75pub use backends::*;
76pub use config::*;
77pub use worker::*;
78pub use scheduler::*;
79
80#[derive(Error, Debug)]
82pub enum QueueError {
83 #[error("Serialization error: {0}")]
84 Serialization(#[from] serde_json::Error),
85
86 #[error("Backend error: {0}")]
87 Backend(String),
88
89 #[error("Job not found: {0}")]
90 JobNotFound(String),
91
92 #[error("Queue configuration error: {0}")]
93 Configuration(String),
94
95 #[error("Network error: {0}")]
96 Network(String),
97
98 #[error("Timeout error")]
99 Timeout,
100
101 #[error("Job execution failed: {0}")]
102 Execution(String),
103}
104
105pub type QueueResult<T> = Result<T, QueueError>;
107
108pub type JobResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
110
111pub type JobId = Uuid;
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
116pub enum Priority {
117 Low = 0,
118 Normal = 1,
119 High = 2,
120 Critical = 3,
121}
122
123impl Default for Priority {
124 fn default() -> Self {
125 Priority::Normal
126 }
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
131pub enum JobState {
132 Pending,
134 Processing,
136 Completed,
138 Failed,
140 Dead,
142}
143
144#[async_trait]
146pub trait Job: Send + Sync + Serialize + DeserializeOwned {
147 async fn execute(&self) -> JobResult<()>;
149
150 fn job_type(&self) -> &'static str;
152
153 fn max_retries(&self) -> u32 {
155 3
156 }
157
158 fn retry_delay(&self, attempt: u32) -> Duration {
160 Duration::from_secs(1 << attempt.min(6)) }
162
163 fn timeout(&self) -> Duration {
165 Duration::from_secs(300)
166 }
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct JobEntry {
172 id: JobId,
174 job_type: String,
176 payload: serde_json::Value,
178 priority: Priority,
180 state: JobState,
182 attempts: u32,
184 max_retries: u32,
186 created_at: DateTime<Utc>,
188 run_at: DateTime<Utc>,
190 processed_at: Option<DateTime<Utc>>,
192 last_error: Option<String>,
194}
195
196impl JobEntry {
197 pub fn new<T: Job>(job: T, priority: Option<Priority>, delay: Option<Duration>) -> QueueResult<Self> {
199 let now = Utc::now();
200 let run_at = match delay {
201 Some(d) => now + chrono::Duration::from_std(d)
202 .map_err(|e| QueueError::Configuration(format!("Invalid delay duration: {}", e)))?,
203 None => now,
204 };
205
206 let job_type = job.job_type().to_string();
207 let max_retries = job.max_retries();
208 let payload = serde_json::to_value(job)?;
209
210 Ok(JobEntry {
211 id: Uuid::new_v4(),
212 job_type,
213 payload,
214 priority: priority.unwrap_or_default(),
215 state: JobState::Pending,
216 attempts: 0,
217 max_retries,
218 created_at: now,
219 run_at,
220 processed_at: None,
221 last_error: None,
222 })
223 }
224
225 pub fn id(&self) -> JobId {
227 self.id
228 }
229
230 pub fn job_type(&self) -> &str {
232 &self.job_type
233 }
234
235 pub fn priority(&self) -> Priority {
237 self.priority
238 }
239
240 pub fn state(&self) -> &JobState {
242 &self.state
243 }
244
245 pub fn attempts(&self) -> u32 {
247 self.attempts
248 }
249
250 pub fn run_at(&self) -> DateTime<Utc> {
252 self.run_at
253 }
254
255 pub fn payload(&self) -> &serde_json::Value {
257 &self.payload
258 }
259
260 pub fn is_ready(&self) -> bool {
262 matches!(self.state, JobState::Pending | JobState::Failed) && self.run_at <= Utc::now()
263 }
264
265 pub async fn execute<T: Job>(&self) -> JobResult<()> {
267 let job: T = serde_json::from_value(self.payload.clone())?;
268 job.execute().await
269 }
270
271 pub(crate) fn mark_processing(&mut self) {
273 self.state = JobState::Processing;
274 self.processed_at = Some(Utc::now());
275 }
276
277 pub(crate) fn mark_completed(&mut self) {
279 self.state = JobState::Completed;
280 }
281
282 pub(crate) fn mark_failed(&mut self, error: String) {
284 self.attempts += 1;
285 self.last_error = Some(error);
286
287 if self.attempts >= self.max_retries {
288 self.state = JobState::Dead;
289 } else {
290 self.state = JobState::Failed;
291 let delay = Duration::from_secs(1 << self.attempts.min(6));
293 let chrono_delay = chrono::Duration::from_std(delay)
296 .unwrap_or(chrono::Duration::MAX);
297 self.run_at = Utc::now() + chrono_delay;
298 }
299 }
300
301 pub(crate) fn reset_for_retry(&mut self) {
303 self.attempts = 0;
304 self.state = JobState::Pending;
305 self.run_at = Utc::now();
306 self.last_error = None;
307 self.processed_at = None;
308 }
309
310 pub(crate) fn new_with_job_type(
312 job_type: String,
313 payload: serde_json::Value,
314 priority: Option<Priority>,
315 delay: Option<Duration>,
316 max_retries: u32,
317 ) -> QueueResult<Self> {
318 let now = Utc::now();
319 let run_at = delay.map(|d| now + chrono::Duration::from_std(d).unwrap()).unwrap_or(now);
320
321 Ok(JobEntry {
322 id: Uuid::new_v4(),
323 job_type,
324 payload,
325 priority: priority.unwrap_or_default(),
326 state: JobState::Pending,
327 attempts: 0,
328 max_retries,
329 created_at: now,
330 run_at,
331 processed_at: None,
332 last_error: None,
333 })
334 }
335}
336
337#[async_trait]
339pub trait QueueBackend: Send + Sync {
340 async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId>;
342
343 async fn dequeue(&self) -> QueueResult<Option<JobEntry>>;
345
346 async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()>;
348
349 async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>>;
351
352 async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>>;
354
355 async fn remove_job(&self, job_id: JobId) -> QueueResult<bool>;
357
358 async fn clear(&self) -> QueueResult<()>;
360
361 async fn stats(&self) -> QueueResult<QueueStats>;
363
364 async fn requeue_job(&self, job_id: JobId, mut job: JobEntry) -> QueueResult<bool> {
367 if self.remove_job(job_id).await? {
369 job.reset_for_retry();
370 self.enqueue(job).await?;
371 Ok(true)
372 } else {
373 Ok(false)
374 }
375 }
376
377 async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
380 let jobs = self.get_jobs_by_state(state, None).await?;
382 let count = jobs.len() as u64;
383
384 for job in jobs {
385 self.remove_job(job.id()).await?;
386 }
387
388 Ok(count)
389 }
390}
391
392#[derive(Debug, Clone, Default, Serialize, Deserialize)]
394pub struct QueueStats {
395 pub pending_jobs: u64,
396 pub processing_jobs: u64,
397 pub completed_jobs: u64,
398 pub failed_jobs: u64,
399 pub dead_jobs: u64,
400 pub total_jobs: u64,
401}
402
403pub struct Queue<B: QueueBackend> {
405 backend: B,
406}
407
408impl<B: QueueBackend> Queue<B> {
409 pub fn new(backend: B) -> Self {
411 Self { backend }
412 }
413
414 pub async fn enqueue<T: Job>(&self, job: T, priority: Option<Priority>) -> QueueResult<JobId> {
416 let entry = JobEntry::new(job, priority, None)?;
417 self.backend.enqueue(entry).await
418 }
419
420 pub async fn enqueue_delayed<T: Job>(&self, job: T, delay: Duration, priority: Option<Priority>) -> QueueResult<JobId> {
422 let entry = JobEntry::new(job, priority, Some(delay))?;
423 self.backend.enqueue(entry).await
424 }
425
426 pub async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
428 self.backend.dequeue().await
429 }
430
431 pub async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
433 self.backend.complete(job_id, result).await
434 }
435
436 pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
438 self.backend.get_job(job_id).await
439 }
440
441 pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
443 self.backend.get_jobs_by_state(state, limit).await
444 }
445
446 pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
448 self.backend.remove_job(job_id).await
449 }
450
451 pub async fn clear(&self) -> QueueResult<()> {
453 self.backend.clear().await
454 }
455
456 pub async fn stats(&self) -> QueueResult<QueueStats> {
458 self.backend.stats().await
459 }
460}
461
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::MemoryBackend;

    /// Minimal job for the tests below; running it only prints its message.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        message: String,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn execute(&self) -> JobResult<()> {
            println!("Executing job: {}", self.message);
            Ok(())
        }

        fn job_type(&self) -> &'static str {
            "test"
        }
    }

    /// Builds a [`TestJob`] carrying `message`.
    fn test_job(message: &str) -> TestJob {
        TestJob {
            message: message.to_string(),
        }
    }

    /// A new entry reflects the job's type and the requested priority, starts
    /// pending with no attempts, and is immediately ready.
    #[tokio::test]
    async fn test_job_entry_creation() {
        let entry = JobEntry::new(test_job("Hello, World!"), Some(Priority::High), None).unwrap();

        assert_eq!(entry.job_type(), "test");
        assert_eq!(entry.priority(), Priority::High);
        assert_eq!(entry.state(), &JobState::Pending);
        assert_eq!(entry.attempts(), 0);
        assert!(entry.is_ready());
    }

    /// An entry created with a delay is scheduled in the future and not ready yet.
    #[tokio::test]
    async fn test_delayed_job() {
        let one_minute = Duration::from_secs(60);
        let entry = JobEntry::new(test_job("Delayed job"), None, Some(one_minute)).unwrap();

        assert!(!entry.is_ready());
        assert!(entry.run_at() > Utc::now());
    }

    /// An unrepresentable delay is reported as a configuration error, and
    /// failing an entry that is already past its retry limit marks it dead.
    #[tokio::test]
    async fn test_duration_conversion_error_handling() {
        // The largest std duration cannot be converted to a chrono duration.
        let outcome = JobEntry::new(test_job("test"), None, Some(Duration::MAX));

        assert!(outcome.is_err());
        match outcome {
            Err(QueueError::Configuration(msg)) => {
                assert!(msg.contains("Invalid delay duration"));
            }
            _ => panic!("Expected Configuration error for invalid delay duration"),
        }

        let mut exhausted = JobEntry::new(test_job("test"), None, None).unwrap();

        // Far beyond the default retry limit, so the next failure must be final.
        exhausted.attempts = 100;
        exhausted.mark_failed("test error".to_string());

        assert_eq!(exhausted.state, JobState::Dead);
    }

    /// Enqueue, dequeue, execute and complete a single job against the
    /// in-memory backend, then check the resulting counters.
    #[tokio::test]
    async fn test_queue_basic_operations() {
        let queue = Queue::new(MemoryBackend::new(QueueConfig::default()));

        let job_id = queue
            .enqueue(test_job("Test job"), Some(Priority::Normal))
            .await
            .unwrap();

        let dequeued = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(dequeued.id(), job_id);
        assert_eq!(dequeued.job_type(), "test");

        let outcome = dequeued.execute::<TestJob>().await;
        queue.complete(job_id, outcome).await.unwrap();

        let stats = queue.stats().await.unwrap();
        assert_eq!(stats.total_jobs, 1);
        assert_eq!(stats.completed_jobs, 1);
    }
}