background_jobs_core/
storage.rs

1use crate::{JobInfo, NewJobInfo, ReturnJobInfo};
2use std::error::Error;
3use uuid::Uuid;
4
/// Define a storage backend for jobs
///
/// This crate provides a default implementation in the `memory_storage` module, which is backed by
/// HashMaps and uses counting to assign IDs. If jobs must be persistent across application
/// restarts, look into the [`sled-backed`](https://github.com/spacejam/sled) implementation from
/// the `background-jobs-sled-storage` crate.
#[async_trait::async_trait]
pub trait Storage: Clone + Send {
    /// The error type used by the storage mechanism.
    type Error: Error + Send + Sync;

    /// Get the JobInfo for a given job ID, or `None` when the job is unknown
    async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error>;

    /// push a job into the queue, returning the job's assigned ID
    async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error>;

    /// pop a job from the provided queue
    ///
    /// `runner_id` identifies the worker claiming the job (implementations use it to
    /// track heartbeats for the claimed job)
    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error>;

    /// mark a job as being actively worked on by the runner identified by `runner_id`
    async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;

    /// "Return" a job to the database, marking it for retry if needed
    ///
    /// returns `true` if the job has not been requeued
    async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error>;
}
33
34/// A default, in-memory implementation of a storage mechanism
35pub mod memory_storage {
36    use crate::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
37
38    use event_listener::{Event, EventListener};
39    use std::{
40        collections::{BTreeMap, HashMap},
41        convert::Infallible,
42        future::Future,
43        ops::Bound,
44        sync::Arc,
45        sync::Mutex,
46        time::Duration,
47    };
48    use time::OffsetDateTime;
49    use uuid::{NoContext, Timestamp, Uuid};
50
    /// Allows memory storage to set timeouts for when to retry checking a queue for a job
    ///
    /// Implementors race `future` against `duration`: `Ok(output)` when the future
    /// completes first, `Err(())` when the clock wins.
    #[async_trait::async_trait]
    pub trait Timer {
        /// Race a future against the clock, returning an empty tuple if the clock wins
        async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
        where
            F: Future + Send + Sync;
    }
59
    #[derive(Clone)]
    /// An In-Memory store for jobs
    pub struct Storage<T> {
        /// Clock abstraction used by `pop` to bound how long it waits for a wakeup
        timer: T,
        /// All job state; shared so clones of this handle see the same store
        inner: Arc<Mutex<Inner>>,
    }

    /// Ordering key for a job within a queue; a UUIDv7 derived from the job's next
    /// queue time (see `insert`), so `Ord` on the UUID orders jobs by time
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct QueueTimeId(Uuid);

    /// Newtype for a job's unique ID, to avoid mixing it up with other UUIDs
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct JobId(Uuid);

    /// Newtype for the UUID of the worker currently running a job
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct RunnerId(Uuid);

    // Key into `queue_jobs`: queue name first, so jobs group and order by queue
    type OrderedKey = (String, QueueTimeId);
    // `None` while unclaimed; otherwise the claiming runner and its last heartbeat time
    type JobState = Option<(RunnerId, OffsetDateTime)>;
    // Value in `queue_jobs`: job ID, heartbeat interval, and claim state
    type JobMeta = (JobId, time::Duration, JobState);
    // Value in `jobs`: the job itself plus its key back into `queue_jobs`
    type QueueMeta = (JobInfo, QueueTimeId);

    struct Inner {
        /// Per-queue notification events, used to wake `pop` calls blocked on a queue
        queues: HashMap<String, Event>,
        /// Primary job store, keyed by job ID
        jobs: HashMap<JobId, QueueMeta>,
        /// Time-ordered index of jobs, enabling in-order popping per queue
        queue_jobs: BTreeMap<OrderedKey, JobMeta>,
    }
86
87    impl<T: Timer> Storage<T> {
88        /// Create a new, empty job store
89        pub fn new(timer: T) -> Self {
90            Storage {
91                inner: Arc::new(Mutex::new(Inner {
92                    queues: HashMap::new(),
93                    jobs: HashMap::new(),
94                    queue_jobs: BTreeMap::new(),
95                })),
96                timer,
97            }
98        }
99
100        fn get(&self, job_id: Uuid) -> Option<JobInfo> {
101            self.inner
102                .lock()
103                .unwrap()
104                .jobs
105                .get(&JobId(job_id))
106                .map(|(job_info, _)| job_info.clone())
107        }
108
        /// Register a wakeup listener on `pop_queue` and compute how long to wait for it
        ///
        /// Returns the listener plus a timeout: the delay until the earliest unclaimed
        /// job in the queue is due, or a 10-second default when no such job exists.
        fn listener(&self, pop_queue: String) -> (EventListener, Duration) {
            // Smallest possible v7 UUID (timestamp 0): the start of this queue's key range
            let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
            let now = OffsetDateTime::now_utc();

            let mut inner = self.inner.lock().unwrap();

            // Register the listener while holding the lock, before scanning, so an
            // insert that lands after this scan still notifies the returned listener
            let listener = inner.queues.entry(pop_queue.clone()).or_default().listen();

            let duration = inner
                .queue_jobs
                .range((
                    Bound::Excluded((pop_queue.clone(), lower_bound)),
                    Bound::Unbounded,
                ))
                // only unclaimed jobs matter when scheduling the next wakeup
                .filter(|(_, (_, _, meta))| meta.is_none())
                .filter_map(|(_, (id, _, _))| inner.jobs.get(id))
                // the range above is unbounded, so stop at the first job whose queue differs
                .take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str())
                .map(|(JobInfo { next_queue, .. }, _)| {
                    // clamp already-due jobs to a zero delay
                    if *next_queue > now {
                        *next_queue - now
                    } else {
                        time::Duration::seconds(0)
                    }
                })
                // first delay convertible to a std Duration (i.e. non-negative)
                .find_map(|duration| duration.try_into().ok());

            (listener, duration.unwrap_or(Duration::from_secs(10)))
        }
137
138        fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
139            let runner_id = RunnerId(runner_id);
140
141            let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
142            let upper_bound = QueueTimeId(Uuid::now_v7());
143            let now = time::OffsetDateTime::now_utc();
144
145            let mut inner = self.inner.lock().unwrap();
146
147            let mut pop_job = None;
148
149            for (_, (job_id, heartbeat_interval, job_meta)) in inner.queue_jobs.range_mut((
150                Bound::Excluded((queue.to_string(), lower_bound)),
151                Bound::Included((queue.to_string(), upper_bound)),
152            )) {
153                if job_meta.is_none()
154                    || job_meta.is_some_and(|(_, heartbeat_timestamp)| {
155                        heartbeat_timestamp + (5 * *heartbeat_interval) < now
156                    })
157                {
158                    *job_meta = Some((runner_id, now));
159                    pop_job = Some(*job_id);
160                    break;
161                }
162            }
163
164            if let Some(job_id) = pop_job {
165                return inner
166                    .jobs
167                    .get(&job_id)
168                    .map(|(job_info, _)| job_info.clone());
169            }
170
171            None
172        }
173
174        fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
175            let job_id = JobId(job_id);
176            let runner_id = RunnerId(runner_id);
177
178            let mut inner = self.inner.lock().unwrap();
179
180            let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) {
181                (job.queue.clone(), *queue_time_id)
182            } else {
183                return;
184            };
185
186            if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) {
187                *found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
188            } else {
189                metrics::counter!("background-jobs.memory.heartbeat.missing-queue-job")
190                    .increment(1);
191                tracing::warn!("Missing job meta for {queue_key:?}");
192            }
193        }
194
195        fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
196            let job_id = JobId(job_id);
197
198            let mut inner = self.inner.lock().unwrap();
199
200            let (job, queue_time_id) = inner.jobs.remove(&job_id)?;
201            let queue_key = (job.queue.clone(), queue_time_id);
202
203            if inner.queue_jobs.remove(&queue_key).is_none() {
204                metrics::counter!("background-jobs.memory.remove.missing-queue-job").increment(1);
205                tracing::warn!("failed to remove job meta for {queue_key:?}");
206            }
207
208            Some(job)
209        }
210
211        fn insert(&self, job: JobInfo) -> Uuid {
212            let id = JobId(job.id);
213            let queue = job.queue.clone();
214            let queue_time_id = QueueTimeId(job.next_queue_id());
215            let heartbeat_interval = job.heartbeat_interval;
216
217            let mut inner = self.inner.lock().unwrap();
218
219            inner.jobs.insert(id, (job, queue_time_id));
220
221            inner.queue_jobs.insert(
222                (queue.clone(), queue_time_id),
223                (
224                    id,
225                    time::Duration::milliseconds(heartbeat_interval as _),
226                    None,
227                ),
228            );
229
230            inner.queues.entry(queue).or_default().notify(1);
231
232            metrics::gauge!("background-jobs.memory.insert.queues")
233                .set(recordable(inner.queues.len()));
234            metrics::gauge!("background-jobs.memory.insert.jobs").set(recordable(inner.jobs.len()));
235            metrics::gauge!("background-jobs.memory.insert.queue-jobs")
236                .set(recordable(inner.queue_jobs.len()));
237
238            id.0
239        }
240    }
241
242    fn recordable(value: usize) -> u32 {
243        let value = value as u64;
244        let value = value % u64::from(u32::MAX);
245
246        value as _
247    }
248
249    #[async_trait::async_trait]
250    impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
251        type Error = Infallible;
252
253        #[tracing::instrument(skip(self))]
254        async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
255            Ok(self.get(job_id))
256        }
257
258        /// push a job into the queue
259        #[tracing::instrument(skip_all)]
260        async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
261            Ok(self.insert(job.build()))
262        }
263
264        /// pop a job from the provided queue
265        #[tracing::instrument(skip(self))]
266        async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
267            loop {
268                let (listener, duration) = self.listener(queue.to_string());
269
270                if let Some(job) = self.try_pop(queue, runner_id) {
271                    return Ok(job);
272                }
273
274                match self.timer.timeout(duration, listener).await {
275                    Ok(()) => {
276                        // listener wakeup
277                    }
278                    Err(()) => {
279                        // timeout
280                    }
281                }
282            }
283        }
284
285        /// mark a job as being actively worked on
286        #[tracing::instrument(skip(self))]
287        async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> {
288            self.set_heartbeat(job_id, runner_id);
289            Ok(())
290        }
291
292        /// "Return" a job to the database, marking it for retry if needed
293        #[tracing::instrument(skip(self))]
294        async fn complete(
295            &self,
296            ReturnJobInfo { id, result }: ReturnJobInfo,
297        ) -> Result<bool, Self::Error> {
298            let mut job = if let Some(job) = self.remove_job(id) {
299                job
300            } else {
301                return Ok(true);
302            };
303
304            match result {
305                // successful jobs are removed
306                JobResult::Success => Ok(true),
307                // Unregistered or Unexecuted jobs are restored as-is
308                JobResult::Unregistered | JobResult::Unexecuted => {
309                    self.insert(job);
310                    Ok(false)
311                }
312                // retryable failed jobs are restored
313                JobResult::Failure if job.prepare_retry() => {
314                    self.insert(job);
315                    Ok(false)
316                }
317                // dead jobs are removed
318                JobResult::Failure => Ok(true),
319            }
320        }
321    }
322}