1#![deny(missing_docs)]
2
3use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
17use sled::{Db, Tree};
18use std::{
19 collections::HashMap,
20 ops::Bound,
21 sync::{Arc, Mutex},
22 time::Duration,
23};
24use tokio::{
25 sync::Notify,
26 task::{JoinError, JoinHandle},
27};
28use uuid::{NoContext, Timestamp, Uuid};
29
/// Errors produced by the sled-backed job storage
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error returned by the sled database
    #[error("Error in sled extensions")]
    Sled(#[from] sled::Error),

    /// A value could not be encoded to, or decoded from, CBOR
    #[error("Error in cbor")]
    Cbor(#[from] serde_cbor::Error),

    /// A blocking task could not be spawned
    #[error("Failed to spawn blocking task")]
    Spawn(#[from] std::io::Error),

    /// A compare-and-swap on a stored record lost to a concurrent update
    #[error("Conflict while updating record")]
    Conflict,

    /// An expected record was not found in the database
    #[error("Missing record")]
    Missing,

    /// A blocking task did not run to completion (converted from tokio's `JoinError`)
    #[error("Blocking operation was canceled")]
    Canceled,
}
57
/// Per-job bookkeeping stored (CBOR-encoded) as the value of a queue entry
#[derive(serde::Serialize, serde::Deserialize)]
struct JobMeta {
    // ID of the job this entry refers to; used to look the job up in the jobs tree
    id: Uuid,
    // Expected time between heartbeats; a claim is treated as stale once five intervals pass
    // without one
    heartbeat_interval: time::Duration,
    // `None` while the job is unclaimed, `Some` once a runner has claimed it
    state: Option<JobState>,
}
64
/// Claim information for a job that a runner has picked up
#[derive(serde::Serialize, serde::Deserialize)]
struct JobState {
    // ID of the runner that most recently claimed the job or sent a heartbeat for it
    runner_id: Uuid,
    // Time at which the job was claimed or last received a heartbeat
    heartbeat: time::OffsetDateTime,
}
70
/// Logical key of an entry in the queue tree, before it is encoded to bytes
struct JobKey {
    // Name of the queue the job belongs to
    queue: String,
    // UUID that orders the entry within its queue (obtained from `JobInfo::next_queue_id`)
    next_queue_id: Uuid,
}
75
76fn encode_key(key: &JobKey) -> Vec<u8> {
77 let mut v = Vec::with_capacity(key.queue.len() + 17);
78 v.extend_from_slice(key.queue.as_bytes());
79 v.push(b',');
80 v.extend_from_slice(key.next_queue_id.as_bytes());
81 v
82}
83
/// A `Result` whose error type is this crate's [`Error`]
pub type Result<T> = std::result::Result<T, Error>;
86
/// Job storage backed by a sled database
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct Storage {
    // Shared trees and per-queue notifiers
    inner: Arc<Inner>,
}
92
/// State shared between all clones of a [`Storage`]
struct Inner {
    // Job ID bytes -> CBOR-encoded `JobInfo`
    jobs: Tree,
    // Encoded `JobKey` -> CBOR-encoded `JobMeta`
    queue_jobs: Tree,
    // One `Notify` per queue name, used to wake tasks waiting in `pop`
    queues: Mutex<HashMap<String, Arc<Notify>>>,
    // The database handle the trees were opened from; never read
    // NOTE(review): presumably held only to keep the database open - confirm
    _db: Db,
}
99
/// Run `f` as a blocking tokio task labelled with `name`
///
/// This variant is compiled when `tokio_unstable` is set and uses tokio's task builder, which
/// can fail with an I/O error.
#[cfg(tokio_unstable)]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let builder = tokio::task::Builder::new().name(name);

    builder.spawn_blocking(f)
}
108
109#[cfg(not(tokio_unstable))]
110fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
111where
112 F: FnOnce() -> T + Send + 'static,
113 T: Send + 'static,
114{
115 let _ = name;
116 Ok(tokio::task::spawn_blocking(f))
117}
118
119#[async_trait::async_trait]
120impl background_jobs_core::Storage for Storage {
121 type Error = Error;
122
123 #[tracing::instrument(skip(self))]
124 async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
125 let this = self.clone();
126
127 spawn_blocking("jobs-info", move || this.get(job_id))?.await?
128 }
129
130 #[tracing::instrument(skip_all)]
131 async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
132 let this = self.clone();
133
134 spawn_blocking("jobs-push", move || this.insert(job.build()))?.await?
135 }
136
137 #[tracing::instrument(skip(self))]
138 async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo> {
139 loop {
140 let notifier = self.notifier(queue.to_string());
141
142 let this = self.clone();
143 let queue2 = queue.to_string();
144 if let Some(job) =
145 spawn_blocking("jobs-try-pop", move || this.try_pop(queue2, runner_id))?.await??
146 {
147 return Ok(job);
148 }
149
150 let this = self.clone();
151 let queue2 = queue.to_string();
152 let duration = spawn_blocking("jobs-next-duration", move || {
153 this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
154 })?
155 .await?;
156
157 match tokio::time::timeout(duration, notifier.notified()).await {
158 Ok(()) => {
159 }
161 Err(_) => {
162 }
164 }
165 }
166 }
167
168 #[tracing::instrument(skip(self))]
169 async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
170 let this = self.clone();
171
172 spawn_blocking("jobs-heartbeat", move || {
173 this.set_heartbeat(job_id, runner_id)
174 })?
175 .await?
176 }
177
178 #[tracing::instrument(skip(self))]
179 async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<bool> {
180 let this = self.clone();
181 let mut job = if let Some(job) =
182 spawn_blocking("jobs-remove", move || this.remove_job(id))?.await??
183 {
184 job
185 } else {
186 return Ok(true);
187 };
188
189 match result {
190 JobResult::Success => Ok(true),
192 JobResult::Unexecuted | JobResult::Unregistered => {
194 let this = self.clone();
195 spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
196 Ok(false)
197 }
198 JobResult::Failure if job.prepare_retry() => {
200 let this = self.clone();
201 spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
202 Ok(false)
203 }
204 JobResult::Failure => Ok(true),
206 }
207 }
208}
209
210impl Storage {
211 pub fn new(db: Db) -> Result<Self> {
213 Ok(Storage {
214 inner: Arc::new(Inner {
215 jobs: db.open_tree("background-jobs-jobs")?,
216 queue_jobs: db.open_tree("background-jobs-queue-jobs")?,
217 queues: Mutex::new(HashMap::new()),
218 _db: db,
219 }),
220 })
221 }
222
223 fn get(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
224 if let Some(ivec) = self.inner.jobs.get(job_id.as_bytes())? {
225 let job_info = serde_cbor::from_slice(&ivec)?;
226
227 Ok(Some(job_info))
228 } else {
229 Ok(None)
230 }
231 }
232
233 fn notifier(&self, queue: String) -> Arc<Notify> {
234 self.inner
235 .queues
236 .lock()
237 .unwrap()
238 .entry(queue)
239 .or_insert_with(|| Arc::new(Notify::new()))
240 .clone()
241 }
242
243 fn notify(&self, queue: String) {
244 self.inner
245 .queues
246 .lock()
247 .unwrap()
248 .entry(queue)
249 .or_insert_with(|| Arc::new(Notify::new()))
250 .notify_one();
251 }
252
253 fn try_pop(&self, queue: String, runner_id: Uuid) -> Result<Option<JobInfo>> {
254 let lower_bound = encode_key(&JobKey {
255 queue: queue.clone(),
256 next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
257 });
258 let upper_bound = encode_key(&JobKey {
259 queue: queue.clone(),
260 next_queue_id: Uuid::now_v7(),
261 });
262 let now = time::OffsetDateTime::now_utc();
263
264 for res in self
265 .inner
266 .queue_jobs
267 .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
268 {
269 let (key, ivec) = res?;
270
271 if let Ok(JobMeta {
272 id,
273 heartbeat_interval,
274 state,
275 }) = serde_cbor::from_slice(&ivec)
276 {
277 if state.is_none()
278 || state.is_some_and(|JobState { heartbeat, .. }| {
279 heartbeat + (5 * heartbeat_interval) < now
280 })
281 {
282 let new_bytes = serde_cbor::to_vec(&JobMeta {
283 id,
284 heartbeat_interval,
285 state: Some(JobState {
286 runner_id,
287 heartbeat: now,
288 }),
289 })?;
290
291 match self.inner.queue_jobs.compare_and_swap(
292 key,
293 Some(ivec),
294 Some(new_bytes),
295 )? {
296 Ok(()) => {
297 if let Some(job) = self.inner.jobs.get(id.as_bytes())? {
299 return Ok(Some(serde_cbor::from_slice(&job)?));
300 }
301 }
302 Err(_) => {
303 }
305 }
306
307 break;
308 }
309 }
310 }
311
312 Ok(None)
313 }
314
315 fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
316 let queue = if let Some(job) = self.inner.jobs.get(job_id.as_bytes())? {
317 let job: JobInfo = serde_cbor::from_slice(&job)?;
318 job.queue
319 } else {
320 return Ok(());
321 };
322
323 for res in self.inner.queue_jobs.scan_prefix(queue.as_bytes()) {
324 let (key, ivec) = res?;
325
326 if let Ok(JobMeta {
327 id,
328 heartbeat_interval,
329 ..
330 }) = serde_cbor::from_slice(&ivec)
331 {
332 if id == job_id {
333 let new_bytes = serde_cbor::to_vec(&JobMeta {
334 id,
335 heartbeat_interval,
336 state: Some(JobState {
337 runner_id,
338 heartbeat: time::OffsetDateTime::now_utc(),
339 }),
340 })?;
341
342 match self.inner.queue_jobs.compare_and_swap(
343 key,
344 Some(ivec),
345 Some(new_bytes),
346 )? {
347 Ok(()) => {
348 return Ok(());
350 }
351 Err(_) => {
352 return Err(Error::Conflict);
354 }
355 }
356 }
357 }
358 }
359
360 Err(Error::Missing)
361 }
362
363 fn remove_job(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
364 let job: JobInfo = if let Some(job) = self.inner.jobs.remove(job_id.as_bytes())? {
365 serde_cbor::from_slice(&job)?
366 } else {
367 return Ok(None);
368 };
369
370 let lower_bound = encode_key(&JobKey {
371 queue: job.queue.clone(),
372 next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
373 });
374 let upper_bound = encode_key(&JobKey {
375 queue: job.queue.clone(),
376 next_queue_id: Uuid::now_v7(),
377 });
378
379 for res in self
380 .inner
381 .queue_jobs
382 .range((Bound::Excluded(lower_bound), Bound::Included(upper_bound)))
383 {
384 let (key, ivec) = res?;
385
386 if let Ok(JobMeta { id, .. }) = serde_cbor::from_slice(&ivec) {
387 if id == job_id {
388 self.inner.queue_jobs.remove(key)?;
389 return Ok(Some(job));
390 }
391 }
392 }
393
394 Err(Error::Missing)
395 }
396
397 fn next_duration(&self, pop_queue: String) -> Option<Duration> {
398 let lower_bound = encode_key(&JobKey {
399 queue: pop_queue.clone(),
400 next_queue_id: Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)),
401 });
402
403 let now = time::OffsetDateTime::now_utc();
404
405 self.inner
406 .queue_jobs
407 .range((Bound::Excluded(lower_bound), Bound::Unbounded))
408 .values()
409 .filter_map(|res| res.ok())
410 .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok())
411 .filter(|JobMeta { state, .. }| state.is_none())
412 .filter_map(|JobMeta { id, .. }| self.inner.jobs.get(id.as_bytes()).ok()?)
413 .filter_map(|ivec| serde_cbor::from_slice::<JobInfo>(&ivec).ok())
414 .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
415 .map(|JobInfo { next_queue, .. }| {
416 if next_queue > now {
417 next_queue - now
418 } else {
419 time::Duration::seconds(0)
420 }
421 })
422 .find_map(|d| d.try_into().ok())
423 }
424
425 fn insert(&self, job: JobInfo) -> Result<Uuid> {
426 let id = job.id;
427 let queue = job.queue.clone();
428 let next_queue_id = job.next_queue_id();
429 let heartbeat_interval = job.heartbeat_interval;
430
431 let job_bytes = serde_cbor::to_vec(&job)?;
432
433 self.inner.jobs.insert(id.as_bytes(), job_bytes)?;
434
435 let key_bytes = encode_key(&JobKey {
436 queue: queue.clone(),
437 next_queue_id,
438 });
439
440 let job_meta_bytes = serde_cbor::to_vec(&JobMeta {
441 id,
442 heartbeat_interval: time::Duration::milliseconds(heartbeat_interval as _),
443 state: None,
444 })?;
445
446 self.inner.queue_jobs.insert(key_bytes, job_meta_bytes)?;
447
448 self.notify(queue);
449
450 Ok(id)
451 }
452}
453
454impl From<JoinError> for Error {
455 fn from(_: JoinError) -> Self {
456 Error::Canceled
457 }
458}