Skip to main content

rustapi_jobs/backend/
memory.rs

1use super::{JobBackend, JobRequest};
2use crate::error::{JobError, Result};
3use async_trait::async_trait;
4use std::collections::VecDeque;
5use std::sync::{Arc, Mutex};
6
7/// In-memory job backend (not persistent, for testing/dev)
8#[derive(Debug, Clone, Default)]
9pub struct InMemoryBackend {
10    queue: Arc<Mutex<VecDeque<JobRequest>>>,
11    // In a real system we'd track processing jobs separately for reliability
12}
13
14impl InMemoryBackend {
15    pub fn new() -> Self {
16        Self::default()
17    }
18}
19
20#[async_trait]
21impl JobBackend for InMemoryBackend {
22    async fn push(&self, job: JobRequest) -> Result<()> {
23        let mut q = self
24            .queue
25            .lock()
26            .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?;
27        q.push_back(job);
28        Ok(())
29    }
30
31    async fn pop(&self) -> Result<Option<JobRequest>> {
32        let mut q = self
33            .queue
34            .lock()
35            .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?;
36
37        let now = chrono::Utc::now();
38        let mut index_to_remove = None;
39
40        // Scan the queue for the first ready job
41        for (i, job) in q.iter().enumerate() {
42            if let Some(run_at) = job.run_at {
43                if run_at > now {
44                    continue;
45                }
46            }
47            // Found a ready job (no run_at, or run_at <= now)
48            index_to_remove = Some(i);
49            break;
50        }
51
52        if let Some(i) = index_to_remove {
53            Ok(q.remove(i))
54        } else {
55            Ok(None)
56        }
57    }
58
59    async fn complete(&self, _job_id: &str) -> Result<()> {
60        // No-op for simple in-memory queue that removes on pop
61        Ok(())
62    }
63
64    async fn fail(&self, _job_id: &str, _error: &str) -> Result<()> {
65        // In a real implementation we might move to DLQ or re-queue
66        Ok(())
67    }
68}