1use crate::{JobEntry, JobId, JobState, QueueBackend, QueueError, QueueResult, QueueStats, JobResult, QueueConfig};
4use async_trait::async_trait;
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use std::collections::BinaryHeap;
8use std::cmp::Ordering;
9use std::sync::Arc;
10use tokio::time::Instant;
11use chrono::Utc;
12
13#[derive(Debug, Clone)]
15struct PriorityJobEntry {
16 entry: JobEntry,
17 enqueue_time: Instant,
18}
19
20impl PartialEq for PriorityJobEntry {
21 fn eq(&self, other: &Self) -> bool {
22 self.entry.priority() == other.entry.priority() &&
23 self.entry.run_at() == other.entry.run_at() &&
24 self.enqueue_time == other.enqueue_time
25 }
26}
27
28impl Eq for PriorityJobEntry {}
29
30impl PartialOrd for PriorityJobEntry {
31 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
32 Some(self.cmp(other))
33 }
34}
35
36impl Ord for PriorityJobEntry {
37 fn cmp(&self, other: &Self) -> Ordering {
38 match self.entry.priority().cmp(&other.entry.priority()) {
40 Ordering::Equal => {
41 match other.entry.run_at().cmp(&self.entry.run_at()) {
42 Ordering::Equal => other.enqueue_time.cmp(&self.enqueue_time),
43 other_ord => other_ord,
44 }
45 }
46 priority_ord => priority_ord,
47 }
48 }
49}
50
51pub struct MemoryBackend {
53 config: QueueConfig,
54 jobs: DashMap<JobId, JobEntry>,
55 pending_queue: Arc<RwLock<BinaryHeap<PriorityJobEntry>>>,
56 stats: Arc<RwLock<QueueStats>>,
57}
58
59impl MemoryBackend {
60 pub fn new(config: QueueConfig) -> Self {
62 Self {
63 config,
64 jobs: DashMap::new(),
65 pending_queue: Arc::new(RwLock::new(BinaryHeap::new())),
66 stats: Arc::new(RwLock::new(QueueStats::default())),
67 }
68 }
69
70 fn update_stats(&self, old_state: Option<JobState>, new_state: JobState) {
72 let mut stats = self.stats.write();
73
74 if let Some(old) = old_state {
76 match old {
77 JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
78 JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
79 JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
80 JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
81 JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
82 }
83 } else {
84 stats.total_jobs += 1;
86 }
87
88 match new_state {
90 JobState::Pending => stats.pending_jobs += 1,
91 JobState::Processing => stats.processing_jobs += 1,
92 JobState::Completed => stats.completed_jobs += 1,
93 JobState::Failed => stats.failed_jobs += 1,
94 JobState::Dead => stats.dead_jobs += 1,
95 }
96 }
97
98 fn get_next_ready_job(&self) -> Option<JobEntry> {
100 let mut queue = self.pending_queue.write();
101 let now = Utc::now();
102
103 while let Some(priority_entry) = queue.peek() {
105 if !self.jobs.contains_key(&priority_entry.entry.id()) {
109 queue.pop();
110 continue;
111 }
112
113 if priority_entry.entry.is_ready() {
114 let priority_entry = queue.pop().unwrap();
115 return Some(priority_entry.entry);
116 } else if priority_entry.entry.run_at() > now {
117 break;
119 } else {
120 queue.pop();
122 }
123 }
124
125 None
126 }
127}
128
129#[async_trait]
130impl QueueBackend for MemoryBackend {
131 async fn enqueue(&self, job: JobEntry) -> QueueResult<JobId> {
132 let job_id = job.id();
133
134 if *self.config.get_max_queue_size() > 0 && self.jobs.len() >= *self.config.get_max_queue_size() {
136 return Err(QueueError::Configuration(
137 format!("Queue size limit exceeded: {}", *self.config.get_max_queue_size())
138 ));
139 }
140
141 self.update_stats(None, job.state().clone());
143
144 if job.state() == &JobState::Pending {
146 let priority_entry = PriorityJobEntry {
147 entry: job.clone(),
148 enqueue_time: Instant::now(),
149 };
150 self.pending_queue.write().push(priority_entry);
151 }
152
153 self.jobs.insert(job_id, job);
155
156 Ok(job_id)
157 }
158
159 async fn dequeue(&self) -> QueueResult<Option<JobEntry>> {
160 if let Some(mut job) = self.get_next_ready_job() {
161 let old_state = job.state().clone();
162 job.mark_processing();
163
164 self.update_stats(Some(old_state), job.state().clone());
166
167 self.jobs.insert(job.id(), job.clone());
169
170 Ok(Some(job))
171 } else {
172 Ok(None)
173 }
174 }
175
176 async fn complete(&self, job_id: JobId, result: JobResult<()>) -> QueueResult<()> {
177 if let Some(mut job_entry) = self.jobs.get_mut(&job_id) {
178 let old_state = job_entry.state().clone();
179
180 match result {
181 Ok(_) => {
182 job_entry.mark_completed();
183 self.update_stats(Some(old_state), job_entry.state().clone());
184 }
185 Err(error) => {
186 let error_message = error.to_string();
187 job_entry.mark_failed(error_message);
188 let new_state = job_entry.state().clone();
189
190 self.update_stats(Some(old_state), new_state.clone());
191
192 if new_state == JobState::Failed {
194 let priority_entry = PriorityJobEntry {
195 entry: job_entry.clone(),
196 enqueue_time: Instant::now(),
197 };
198 self.pending_queue.write().push(priority_entry);
199 }
200 }
201 }
202 Ok(())
203 } else {
204 Err(QueueError::JobNotFound(job_id.to_string()))
205 }
206 }
207
208 async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
209 Ok(self.jobs.get(&job_id).map(|entry| entry.clone()))
210 }
211
212 async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
213 let mut jobs: Vec<JobEntry> = self.jobs
214 .iter()
215 .filter(|entry| entry.state() == &state)
216 .map(|entry| entry.clone())
217 .collect();
218
219 jobs.sort_by(|a, b| a.created_at.cmp(&b.created_at));
221
222 if let Some(limit) = limit {
223 jobs.truncate(limit);
224 }
225
226 Ok(jobs)
227 }
228
229 async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
230 if let Some((_, job)) = self.jobs.remove(&job_id) {
231 let mut stats = self.stats.write();
233 match job.state() {
234 JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
235 JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
236 JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
237 JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
238 JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
239 }
240 stats.total_jobs = stats.total_jobs.saturating_sub(1);
241
242 Ok(true)
243 } else {
244 Ok(false)
245 }
246 }
247
248 async fn clear(&self) -> QueueResult<()> {
249 self.jobs.clear();
250 self.pending_queue.write().clear();
251 *self.stats.write() = QueueStats::default();
252 Ok(())
253 }
254
255 async fn stats(&self) -> QueueResult<QueueStats> {
256 Ok(self.stats.read().clone())
257 }
258
259 async fn requeue_job(&self, job_id: JobId, _job: JobEntry) -> QueueResult<bool> {
261 if let Some(mut existing_job) = self.jobs.get_mut(&job_id) {
263 if existing_job.state() == &JobState::Dead {
264 existing_job.reset_for_retry();
266
267 if existing_job.is_ready() {
269 let priority_entry = PriorityJobEntry {
270 entry: existing_job.clone(),
271 enqueue_time: Instant::now(),
272 };
273 self.pending_queue.write().push(priority_entry);
274 }
275
276 let mut stats = self.stats.write();
278 stats.dead_jobs = stats.dead_jobs.saturating_sub(1);
279 stats.pending_jobs = stats.pending_jobs.saturating_add(1);
280
281 Ok(true)
282 } else {
283 Ok(false)
284 }
285 } else {
286 Ok(false)
287 }
288 }
289
290 async fn clear_jobs_by_state(&self, state: JobState) -> QueueResult<u64> {
292 let mut count = 0u64;
293 let mut stats = self.stats.write();
294
295 self.jobs.retain(|_, job| {
297 if job.state() == &state {
298 count += 1;
299
300 match state {
302 JobState::Pending => stats.pending_jobs = stats.pending_jobs.saturating_sub(1),
303 JobState::Processing => stats.processing_jobs = stats.processing_jobs.saturating_sub(1),
304 JobState::Completed => stats.completed_jobs = stats.completed_jobs.saturating_sub(1),
305 JobState::Failed => stats.failed_jobs = stats.failed_jobs.saturating_sub(1),
306 JobState::Dead => stats.dead_jobs = stats.dead_jobs.saturating_sub(1),
307 }
308
309 false } else {
311 true }
313 });
314
315 if state == JobState::Pending {
317 let mut pending_queue = self.pending_queue.write();
318 pending_queue.retain(|priority_entry| {
319 priority_entry.entry.state() != &JobState::Pending
320 });
321 }
322
323 stats.total_jobs = stats.total_jobs.saturating_sub(count);
324 Ok(count)
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331 use crate::{Job, Priority};
332 use serde::{Deserialize, Serialize};
333 use std::time::Duration;
334
335 #[derive(Debug, Clone, Serialize, Deserialize)]
336 struct TestJob {
337 id: u32,
338 message: String,
339 }
340
341 #[async_trait]
342 impl Job for TestJob {
343 async fn execute(&self) -> JobResult<()> {
344 Ok(())
345 }
346
347 fn job_type(&self) -> &'static str {
348 "test"
349 }
350 }
351
352 #[tokio::test]
353 async fn test_memory_backend_basic_operations() {
354 let backend = MemoryBackend::new(QueueConfig::default());
355
356 let job = TestJob {
357 id: 1,
358 message: "test job".to_string(),
359 };
360 let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
361 let job_id = entry.id();
362
363 backend.enqueue(entry).await.unwrap();
365
366 let stats = backend.stats().await.unwrap();
368 assert_eq!(stats.pending_jobs, 1);
369 assert_eq!(stats.total_jobs, 1);
370
371 let dequeued = backend.dequeue().await.unwrap().unwrap();
373 assert_eq!(dequeued.id(), job_id);
374 assert_eq!(dequeued.state(), &JobState::Processing);
375
376 backend.complete(job_id, Ok(())).await.unwrap();
378
379 let stats = backend.stats().await.unwrap();
381 assert_eq!(stats.completed_jobs, 1);
382 assert_eq!(stats.processing_jobs, 0);
383 assert_eq!(stats.pending_jobs, 0);
384 }
385
386 #[tokio::test]
387 async fn test_priority_ordering() {
388 let backend = MemoryBackend::new(QueueConfig::default());
389
390 let low_job = TestJob { id: 1, message: "low".to_string() };
392 let high_job = TestJob { id: 2, message: "high".to_string() };
393 let normal_job = TestJob { id: 3, message: "normal".to_string() };
394
395 let low_entry = JobEntry::new(low_job, Some(Priority::Low), None).unwrap();
396 let high_entry = JobEntry::new(high_job, Some(Priority::High), None).unwrap();
397 let normal_entry = JobEntry::new(normal_job, Some(Priority::Normal), None).unwrap();
398
399 backend.enqueue(low_entry).await.unwrap();
400 backend.enqueue(high_entry).await.unwrap();
401 backend.enqueue(normal_entry).await.unwrap();
402
403 let first = backend.dequeue().await.unwrap().unwrap();
405 assert_eq!(first.priority(), Priority::High);
406
407 let second = backend.dequeue().await.unwrap().unwrap();
408 assert_eq!(second.priority(), Priority::Normal);
409
410 let third = backend.dequeue().await.unwrap().unwrap();
411 assert_eq!(third.priority(), Priority::Low);
412 }
413
414 #[tokio::test]
415 async fn test_ghost_job_cleanup() {
416 let config = crate::config::QueueConfigBuilder::testing().build().expect("Failed to build config");
417 let backend = MemoryBackend::new(config);
418
419 let job = TestJob { id: 1, message: "ghost test".to_string() };
421 let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
422 let job_id = backend.enqueue(entry).await.unwrap();
423
424 assert!(backend.jobs.contains_key(&job_id));
426 assert_eq!(backend.pending_queue.read().len(), 1);
427
428 backend.jobs.remove(&job_id);
430
431 assert_eq!(backend.pending_queue.read().len(), 1);
433
434 let result = backend.dequeue().await.unwrap();
436 assert!(result.is_none());
437
438 assert_eq!(backend.pending_queue.read().len(), 0);
440 }
441
442 #[tokio::test]
443 async fn test_multiple_ghost_jobs_cleanup() {
444 let config = crate::config::QueueConfigBuilder::testing().build().expect("Failed to build config");
445 let backend = MemoryBackend::new(config);
446
447 let mut job_ids = Vec::new();
449 for i in 1..=5 {
450 let job = TestJob { id: i, message: format!("ghost test {}", i) };
451 let entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
452 let job_id = backend.enqueue(entry).await.unwrap();
453 job_ids.push(job_id);
454 }
455
456 assert_eq!(backend.pending_queue.read().len(), 5);
458 assert_eq!(backend.jobs.len(), 5);
459
460 for &job_id in &job_ids[0..3] {
462 backend.jobs.remove(&job_id);
463 }
464
465 assert_eq!(backend.pending_queue.read().len(), 5);
467 assert_eq!(backend.jobs.len(), 2);
468
469 let result = backend.dequeue().await.unwrap().unwrap();
471 assert_eq!(result.payload.get("id").unwrap().as_u64().unwrap(), 4);
472
473 assert_eq!(backend.pending_queue.read().len(), 1);
475 }
476
477 #[tokio::test]
478 async fn test_delayed_job() {
479 let backend = MemoryBackend::new(QueueConfig::default());
480
481 let job = TestJob {
482 id: 1,
483 message: "delayed job".to_string(),
484 };
485 let delay = Duration::from_millis(100);
486 let entry = JobEntry::new(job, None, Some(delay)).unwrap();
487
488 backend.enqueue(entry).await.unwrap();
489
490 let result = backend.dequeue().await.unwrap();
492 assert!(result.is_none());
493
494 tokio::time::sleep(delay + Duration::from_millis(10)).await;
496
497 let result = backend.dequeue().await.unwrap();
499 assert!(result.is_some());
500 }
501
502 #[tokio::test]
503 async fn test_job_failure_and_retry() {
504 let backend = MemoryBackend::new(QueueConfig::default());
505
506 let job = TestJob {
507 id: 1,
508 message: "failing job".to_string(),
509 };
510 let entry = JobEntry::new(job, None, None).unwrap();
511 let job_id = entry.id();
512
513 backend.enqueue(entry).await.unwrap();
514
515 let _job_entry = backend.dequeue().await.unwrap().unwrap();
517 let error = Box::new(std::io::Error::new(std::io::ErrorKind::Other, "test error"));
518 backend.complete(job_id, Err(error)).await.unwrap();
519
520 let stats = backend.stats().await.unwrap();
522 assert_eq!(stats.failed_jobs, 1);
523 assert_eq!(stats.processing_jobs, 0);
524
525 let failed_jobs = backend.get_jobs_by_state(JobState::Failed, None).await.unwrap();
527 assert_eq!(failed_jobs.len(), 1);
528 assert_eq!(failed_jobs[0].attempts(), 1);
529 }
530
531 #[tokio::test]
532 async fn test_queue_size_limit() {
533 let config = crate::config::QueueConfigBuilder::new()
534 .max_queue_size(2)
535 .build().expect("Failed to build config");
536 let backend = MemoryBackend::new(config);
537
538 for i in 1..=2 {
540 let job = TestJob { id: i, message: format!("job {}", i) };
541 let entry = JobEntry::new(job, None, None).unwrap();
542 backend.enqueue(entry).await.unwrap();
543 }
544
545 let job = TestJob { id: 3, message: "overflow job".to_string() };
547 let entry = JobEntry::new(job, None, None).unwrap();
548 let result = backend.enqueue(entry).await;
549
550 assert!(result.is_err());
551 assert!(matches!(result.unwrap_err(), QueueError::Configuration(_)));
552 }
553}