background_jobs_sled/
lib.rs

1#![deny(missing_docs)]
2
3//! # Background Jobs Sled Storage
4//! _An implementation of the Background Jobs Storage trait based on the Sled embedded database_
5//!
6//! ### Usage
7//! ```rust,ignore
8//! use background_jobs::{ServerConfig, sled_storage::Storage};
//! use sled::Config;
//!
//! let db = Config::new().temporary(true).open()?;
12//! let storage = Storage::new(db)?;
13//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
14//! ```
15
16use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
17use sled::{Db, Tree};
18use std::{
19    collections::HashMap,
20    ops::Bound,
21    sync::{Arc, Mutex},
22    time::Duration,
23};
24use tokio::{
25    sync::Notify,
26    task::{JoinError, JoinHandle},
27};
28use uuid::{NoContext, Timestamp, Uuid};
29
30/// The error produced by sled storage calls
31#[derive(Debug, thiserror::Error)]
32pub enum Error {
33    /// Error in the database
34    #[error("Error in sled extensions")]
35    Sled(#[from] sled::Error),
36
37    /// Error in cbor
38    #[error("Error in cbor")]
39    Cbor(#[from] serde_cbor::Error),
40
41    /// Error spawning task
42    #[error("Failed to spawn blocking task")]
43    Spawn(#[from] std::io::Error),
44
45    /// Conflict while updating record
46    #[error("Conflict while updating record")]
47    Conflict,
48
49    /// Missing record
50    #[error("Missing record")]
51    Missing,
52
53    /// Error executing db operation
54    #[error("Blocking operation was canceled")]
55    Canceled,
56}
57
58#[derive(serde::Serialize, serde::Deserialize)]
59struct JobMeta {
60    id: Uuid,
61    heartbeat_interval: time::Duration,
62    state: Option<JobState>,
63}
64
65#[derive(serde::Serialize, serde::Deserialize)]
66struct JobState {
67    runner_id: Uuid,
68    heartbeat: time::OffsetDateTime,
69}
70
71struct JobKey {
72    queue: String,
73    next_queue_id: Uuid,
74}
75
76fn encode_key(key: &JobKey) -> Vec<u8> {
77    let mut v = Vec::with_capacity(key.queue.len() + 17);
78    v.extend_from_slice(key.queue.as_bytes());
79    v.push(b',');
80    v.extend_from_slice(key.next_queue_id.as_bytes());
81    v
82}
83
84/// A simple alias for Result<T, Error>
85pub type Result<T> = std::result::Result<T, Error>;
86
87#[derive(Clone)]
88/// The Sled-backed storage implementation
89pub struct Storage {
90    inner: Arc<Inner>,
91}
92
93struct Inner {
94    jobs: Tree,
95    queue_jobs: Tree,
96    queues: Mutex<HashMap<String, Arc<Notify>>>,
97    _db: Db,
98}
99
/// Run `f` on tokio's blocking pool as a task named `name`.
///
/// This variant is compiled only with `--cfg tokio_unstable`, where the task
/// builder is available and spawning itself may fail with an I/O error.
#[cfg(tokio_unstable)]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let named_builder = tokio::task::Builder::new().name(name);
    named_builder.spawn_blocking(f)
}
108
109#[cfg(not(tokio_unstable))]
110fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
111where
112    F: FnOnce() -> T + Send + 'static,
113    T: Send + 'static,
114{
115    let _ = name;
116    Ok(tokio::task::spawn_blocking(f))
117}
118
119#[async_trait::async_trait]
120impl background_jobs_core::Storage for Storage {
121    type Error = Error;
122
123    #[tracing::instrument(skip(self))]
124    async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
125        let this = self.clone();
126
127        spawn_blocking("jobs-info", move || this.get(job_id))?.await?
128    }
129
130    #[tracing::instrument(skip_all)]
131    async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
132        let this = self.clone();
133
134        spawn_blocking("jobs-push", move || this.insert(job.build()))?.await?
135    }
136
137    #[tracing::instrument(skip(self))]
138    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo> {
139        loop {
140            let notifier = self.notifier(queue.to_string());
141
142            let this = self.clone();
143            let queue2 = queue.to_string();
144            if let Some(job) =
145                spawn_blocking("jobs-try-pop", move || this.try_pop(queue2, runner_id))?.await??
146            {
147                return Ok(job);
148            }
149
150            let this = self.clone();
151            let queue2 = queue.to_string();
152            let duration = spawn_blocking("jobs-next-duration", move || {
153                this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
154            })?
155            .await?;
156
157            match tokio::time::timeout(duration, notifier.notified()).await {
158                Ok(()) => {
159                    // notified
160                }
161                Err(_) => {
162                    // timeout
163                }
164            }
165        }
166    }
167
168    #[tracing::instrument(skip(self))]
169    async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
170        let this = self.clone();
171
172        spawn_blocking("jobs-heartbeat", move || {
173            this.set_heartbeat(job_id, runner_id)
174        })?
175        .await?
176    }
177
178    #[tracing::instrument(skip(self))]
179    async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<bool> {
180        let this = self.clone();
181        let mut job = if let Some(job) =
182            spawn_blocking("jobs-remove", move || this.remove_job(id))?.await??
183        {
184            job
185        } else {
186            return Ok(true);
187        };
188
189        match result {
190            // successful jobs are removed
191            JobResult::Success => Ok(true),
192            // Unregistered or Unexecuted jobs are restored as-is
193            JobResult::Unexecuted | JobResult::Unregistered => {
194                let this = self.clone();
195                spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
196                Ok(false)
197            }
198            // retryable failed jobs are restored
199            JobResult::Failure if job.prepare_retry() => {
200                let this = self.clone();
201                spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
202                Ok(false)
203            }
204            // dead jobs are removed
205            JobResult::Failure => Ok(true),
206        }
207    }
208}
209
210impl Storage {
211    /// Create a new Storage struct
212    pub fn new(db: Db) -> Result<Self> {
213        Ok(Storage {
214            inner: Arc::new(Inner {
215                jobs: db.open_tree("background-jobs-jobs")?,
216                queue_jobs: db.open_tree("background-jobs-queue-jobs")?,
217                queues: Mutex::new(HashMap::new()),
218                _db: db,
219            }),
220        })
221    }
222
223    fn get(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
224        if let Some(ivec) = self.inner.jobs.get(job_id.as_bytes())? {
225            let job_info = serde_cbor::from_slice(&ivec)?;
226
227            Ok(Some(job_info))
228        } else {
229            Ok(None)
230        }
231    }
232
233    fn notifier(&self, queue: String) -> Arc<Notify> {
234        self.inner
235            .queues
236            .lock()
237            .unwrap()
238            .entry(queue)
239            .or_insert_with(|| Arc::new(Notify::new()))
240            .clone()
241    }
242
243    fn notify(&self, queue: String) {
244        self.inner
245            .queues
246            .lock()
247            .unwrap()
248            .entry(queue)
249            .or_insert_with(|| Arc::new(Notify::new()))
250            .notify_one();
251    }
252
253    fn try_pop(&self, queue: String, runner_id: Uuid) -> Result<Option<JobInfo>> {
254        let lower_bound = encode_key(&JobKey {
255            queue: queue.clone(),
256            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
257        });
258        let upper_bound = encode_key(&JobKey {
259            queue: queue.clone(),
260            next_queue_id: Uuid::now_v7(),
261        });
262        let now = time::OffsetDateTime::now_utc();
263
264        for res in self
265            .inner
266            .queue_jobs
267            .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
268        {
269            let (key, ivec) = res?;
270
271            if let Ok(JobMeta {
272                id,
273                heartbeat_interval,
274                state,
275            }) = serde_cbor::from_slice(&ivec)
276            {
277                if state.is_none()
278                    || state.is_some_and(|JobState { heartbeat, .. }| {
279                        heartbeat + (5 * heartbeat_interval) < now
280                    })
281                {
282                    let new_bytes = serde_cbor::to_vec(&JobMeta {
283                        id,
284                        heartbeat_interval,
285                        state: Some(JobState {
286                            runner_id,
287                            heartbeat: now,
288                        }),
289                    })?;
290
291                    match self.inner.queue_jobs.compare_and_swap(
292                        key,
293                        Some(ivec),
294                        Some(new_bytes),
295                    )? {
296                        Ok(()) => {
297                            // success
298                            if let Some(job) = self.inner.jobs.get(id.as_bytes())? {
299                                return Ok(Some(serde_cbor::from_slice(&job)?));
300                            }
301                        }
302                        Err(_) => {
303                            // conflict
304                        }
305                    }
306
307                    break;
308                }
309            }
310        }
311
312        Ok(None)
313    }
314
315    fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
316        let queue = if let Some(job) = self.inner.jobs.get(job_id.as_bytes())? {
317            let job: JobInfo = serde_cbor::from_slice(&job)?;
318            job.queue
319        } else {
320            return Ok(());
321        };
322
323        for res in self.inner.queue_jobs.scan_prefix(queue.as_bytes()) {
324            let (key, ivec) = res?;
325
326            if let Ok(JobMeta {
327                id,
328                heartbeat_interval,
329                ..
330            }) = serde_cbor::from_slice(&ivec)
331            {
332                if id == job_id {
333                    let new_bytes = serde_cbor::to_vec(&JobMeta {
334                        id,
335                        heartbeat_interval,
336                        state: Some(JobState {
337                            runner_id,
338                            heartbeat: time::OffsetDateTime::now_utc(),
339                        }),
340                    })?;
341
342                    match self.inner.queue_jobs.compare_and_swap(
343                        key,
344                        Some(ivec),
345                        Some(new_bytes),
346                    )? {
347                        Ok(()) => {
348                            // success
349                            return Ok(());
350                        }
351                        Err(_) => {
352                            // conflict
353                            return Err(Error::Conflict);
354                        }
355                    }
356                }
357            }
358        }
359
360        Err(Error::Missing)
361    }
362
363    fn remove_job(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
364        let job: JobInfo = if let Some(job) = self.inner.jobs.remove(job_id.as_bytes())? {
365            serde_cbor::from_slice(&job)?
366        } else {
367            return Ok(None);
368        };
369
370        let lower_bound = encode_key(&JobKey {
371            queue: job.queue.clone(),
372            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
373        });
374        let upper_bound = encode_key(&JobKey {
375            queue: job.queue.clone(),
376            next_queue_id: Uuid::now_v7(),
377        });
378
379        for res in self
380            .inner
381            .queue_jobs
382            .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
383        {
384            let (key, ivec) = res?;
385
386            if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) {
387                if id == job_id {
388                    self.inner.queue_jobs.remove(key)?;
389                    return Ok(Some(job));
390                }
391            }
392        }
393
394        Err(Error::Missing)
395    }
396
397    fn next_duration(&self, pop_queue: String) -> Option<Duration> {
398        let lower_bound = encode_key(&JobKey {
399            queue: pop_queue.clone(),
400            next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
401        });
402
403        let now = time::OffsetDateTime::now_utc();
404
405        self.inner
406            .queue_jobs
407            .range((Bound::Excluded(lower_bound), Bound::Unbounded))
408            .values()
409            .filter_map(|res| res.ok())
410            .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
411            .filter(|JobMeta { state, .. }| state.is_none())
412            .filter_map(|JobMeta { id, .. }| self.inner.jobs.get(id.as_bytes()).ok()?)
413            .filter_map(|ivec| serde_cbor::from_slice::<JobInfo>(&ivec).ok())
414            .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
415            .map(|JobInfo { next_queue, .. }| {
416                if next_queue > now {
417                    next_queue - now
418                } else {
419                    time::Duration::seconds(0)
420                }
421            })
422            .find_map(|d| d.try_into().ok())
423    }
424
425    fn insert(&self, job: JobInfo) -> Result<Uuid> {
426        let id = job.id;
427        let queue = job.queue.clone();
428        let next_queue_id = job.next_queue_id();
429        let heartbeat_interval = job.heartbeat_interval;
430
431        let job_bytes = serde_cbor::to_vec(&job)?;
432
433        self.inner.jobs.insert(id.as_bytes(), job_bytes)?;
434
435        let key_bytes = encode_key(&JobKey {
436            queue: queue.clone(),
437            next_queue_id,
438        });
439
440        let job_meta_bytes = serde_cbor::to_vec(&JobMeta {
441            id,
442            heartbeat_interval: time::Duration::milliseconds(heartbeat_interval as _),
443            state: None,
444        })?;
445
446        self.inner.queue_jobs.insert(key_bytes, job_meta_bytes)?;
447
448        self.notify(queue);
449
450        Ok(id)
451    }
452}
453
454impl From<JoinError> for Error {
455    fn from(_: JoinError) -> Self {
456        Error::Canceled
457    }
458}