1use crate::{JobInfo, NewJobInfo, ReturnJobInfo};
2use std::error::Error;
3use uuid::Uuid;
4
/// Backing store for background jobs.
///
/// Implementors must be `Clone + Send` and report failures through the
/// associated [`Storage::Error`] type.
#[async_trait::async_trait]
pub trait Storage: Clone + Send {
    /// Error produced by this storage's operations.
    type Error: Error + Send + Sync;

    /// Looks up the job identified by `job_id`.
    ///
    /// The in-memory implementation in this file yields `Ok(None)` when it
    /// holds no job with that ID.
    async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error>;

    /// Stores a new job and returns the ID it is stored under.
    async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error>;

    /// Hands a job from `queue` to the runner identified by `runner_id`.
    ///
    /// The return type has no "nothing available" case; the in-memory
    /// implementation in this file keeps waiting until it can return a job.
    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error>;

    /// Records that `runner_id` is still working on `job_id`.
    async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;

    /// Reports the outcome of a job run.
    ///
    /// NOTE(review): the meaning of the returned flag is not spelled out by
    /// the trait itself. The in-memory implementation in this file returns
    /// `true` when the job is not stored afterwards and `false` when it was
    /// put back on its queue — confirm other implementors agree.
    async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error>;
}
33
34pub mod memory_storage {
36 use crate::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
37
38 use event_listener::{Event, EventListener};
39 use std::{
40 collections::{BTreeMap, HashMap},
41 convert::Infallible,
42 future::Future,
43 ops::Bound,
44 sync::Arc,
45 sync::Mutex,
46 time::Duration,
47 };
48 use time::OffsetDateTime;
49 use uuid::{NoContext, Timestamp, Uuid};
50
/// Runtime-supplied timeout facility used by the in-memory storage.
#[async_trait::async_trait]
pub trait Timer {
    /// Drives `future` under a time limit of `duration`.
    ///
    /// Yields `Ok` with the future's output, or `Err(())`.
    /// NOTE(review): presumably `Err(())` means the duration elapsed before
    /// the future finished — the contract is left to implementors; confirm.
    async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
    where
        F: Future + Send + Sync;
}
59
/// Job storage that keeps everything in process memory.
///
/// The state sits behind an `Arc<Mutex<_>>`, so clones of a `Storage` share
/// the same jobs and queues.
#[derive(Clone)]
pub struct Storage<T> {
    /// Timeout provider used while waiting for a job in `pop`.
    timer: T,
    /// Shared, mutex-guarded job and queue state.
    inner: Arc<Mutex<Inner>>,
}
66
/// UUID that positions a job inside its queue.
///
/// It is the second component of `OrderedKey`, so the derived `Ord` decides
/// the order in which a queue's jobs are visited. Values come from
/// `JobInfo::next_queue_id` when a job is inserted.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct QueueTimeId(Uuid);
69
/// Newtype around a job's UUID, used as the key of the by-ID job map so it
/// cannot be mixed up with runner IDs or queue time IDs.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct JobId(Uuid);
72
/// Newtype around the UUID of the runner that currently holds a job.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct RunnerId(Uuid);
75
/// Key of the ordered job index: queue name first, then queue time ID, so a
/// queue's entries are contiguous and sorted by queue time ID.
type OrderedKey = (String, QueueTimeId);
/// Claim on a job: the runner holding it and the time of its latest
/// heartbeat, or `None` while no runner holds it.
type JobState = Option<(RunnerId, OffsetDateTime)>;
/// Value of the ordered job index: job ID, heartbeat interval, claim state.
type JobMeta = (JobId, time::Duration, JobState);
/// Value of the by-ID job map: the job itself and the queue time ID it is
/// indexed under in the ordered job index.
type QueueMeta = (JobInfo, QueueTimeId);
80
/// Mutex-guarded state shared by every clone of the in-memory `Storage`.
struct Inner {
    /// One notification event per queue name; notified when a job is inserted
    /// into that queue and listened on by waiting poppers.
    queues: HashMap<String, Event>,
    /// Every stored job by ID, with the queue time ID it is indexed under.
    jobs: HashMap<JobId, QueueMeta>,
    /// Jobs ordered by `(queue name, queue time ID)`, carrying each job's
    /// heartbeat interval and claim state.
    queue_jobs: BTreeMap<OrderedKey, JobMeta>,
}
86
87 impl<T: Timer> Storage<T> {
88 pub fn new(timer: T) -> Self {
90 Storage {
91 inner: Arc::new(Mutex::new(Inner {
92 queues: HashMap::new(),
93 jobs: HashMap::new(),
94 queue_jobs: BTreeMap::new(),
95 })),
96 timer,
97 }
98 }
99
100 fn get(&self, job_id: Uuid) -> Option<JobInfo> {
101 self.inner
102 .lock()
103 .unwrap()
104 .jobs
105 .get(&JobId(job_id))
106 .map(|(job_info, _)| job_info.clone())
107 }
108
109 fn listener(&self, pop_queue: String) -> (EventListener, Duration) {
110 let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
111 let now = OffsetDateTime::now_utc();
112
113 let mut inner = self.inner.lock().unwrap();
114
115 let listener = inner.queues.entry(pop_queue.clone()).or_default().listen();
116
117 let duration = inner
118 .queue_jobs
119 .range((
120 Bound::Excluded((pop_queue.clone(), lower_bound)),
121 Bound::Unbounded,
122 ))
123 .filter(|(_, (_, _, meta))| meta.is_none())
124 .filter_map(|(_, (id, _, _))| inner.jobs.get(id))
125 .take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str())
126 .map(|(JobInfo { next_queue, .. }, _)| {
127 if *next_queue > now {
128 *next_queue - now
129 } else {
130 time::Duration::seconds(0)
131 }
132 })
133 .find_map(|duration| duration.try_into().ok());
134
135 (listener, duration.unwrap_or(Duration::from_secs(10)))
136 }
137
138 fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
139 let runner_id = RunnerId(runner_id);
140
141 let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
142 let upper_bound = QueueTimeId(Uuid::now_v7());
143 let now = time::OffsetDateTime::now_utc();
144
145 let mut inner = self.inner.lock().unwrap();
146
147 let mut pop_job = None;
148
149 for (_, (job_id, heartbeat_interval, job_meta)) in inner.queue_jobs.range_mut((
150 Bound::Excluded((queue.to_string(), lower_bound)),
151 Bound::Included((queue.to_string(), upper_bound)),
152 )) {
153 if job_meta.is_none()
154 || job_meta.is_some_and(|(_, heartbeat_timestamp)| {
155 heartbeat_timestamp + (5 * *heartbeat_interval) < now
156 })
157 {
158 *job_meta = Some((runner_id, now));
159 pop_job = Some(*job_id);
160 break;
161 }
162 }
163
164 if let Some(job_id) = pop_job {
165 return inner
166 .jobs
167 .get(&job_id)
168 .map(|(job_info, _)| job_info.clone());
169 }
170
171 None
172 }
173
174 fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
175 let job_id = JobId(job_id);
176 let runner_id = RunnerId(runner_id);
177
178 let mut inner = self.inner.lock().unwrap();
179
180 let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) {
181 (job.queue.clone(), *queue_time_id)
182 } else {
183 return;
184 };
185
186 if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) {
187 *found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
188 } else {
189 metrics::counter!("background-jobs.memory.heartbeat.missing-queue-job")
190 .increment(1);
191 tracing::warn!("Missing job meta for {queue_key:?}");
192 }
193 }
194
195 fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
196 let job_id = JobId(job_id);
197
198 let mut inner = self.inner.lock().unwrap();
199
200 let (job, queue_time_id) = inner.jobs.remove(&job_id)?;
201 let queue_key = (job.queue.clone(), queue_time_id);
202
203 if inner.queue_jobs.remove(&queue_key).is_none() {
204 metrics::counter!("background-jobs.memory.remove.missing-queue-job").increment(1);
205 tracing::warn!("failed to remove job meta for {queue_key:?}");
206 }
207
208 Some(job)
209 }
210
211 fn insert(&self, job: JobInfo) -> Uuid {
212 let id = JobId(job.id);
213 let queue = job.queue.clone();
214 let queue_time_id = QueueTimeId(job.next_queue_id());
215 let heartbeat_interval = job.heartbeat_interval;
216
217 let mut inner = self.inner.lock().unwrap();
218
219 inner.jobs.insert(id, (job, queue_time_id));
220
221 inner.queue_jobs.insert(
222 (queue.clone(), queue_time_id),
223 (
224 id,
225 time::Duration::milliseconds(heartbeat_interval as _),
226 None,
227 ),
228 );
229
230 inner.queues.entry(queue).or_default().notify(1);
231
232 metrics::gauge!("background-jobs.memory.insert.queues")
233 .set(recordable(inner.queues.len()));
234 metrics::gauge!("background-jobs.memory.insert.jobs").set(recordable(inner.jobs.len()));
235 metrics::gauge!("background-jobs.memory.insert.queue-jobs")
236 .set(recordable(inner.queue_jobs.len()));
237
238 id.0
239 }
240 }
241
/// Converts a collection length into a `u32` for reporting as a gauge value.
///
/// Lengths that do not fit are clamped to `u32::MAX` rather than wrapped, so
/// an oversized collection never shows up as a small or zero gauge reading.
fn recordable(value: usize) -> u32 {
    u32::try_from(value).unwrap_or(u32::MAX)
}
248
249 #[async_trait::async_trait]
250 impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
251 type Error = Infallible;
252
253 #[tracing::instrument(skip(self))]
254 async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
255 Ok(self.get(job_id))
256 }
257
258 #[tracing::instrument(skip_all)]
260 async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
261 Ok(self.insert(job.build()))
262 }
263
264 #[tracing::instrument(skip(self))]
266 async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
267 loop {
268 let (listener, duration) = self.listener(queue.to_string());
269
270 if let Some(job) = self.try_pop(queue, runner_id) {
271 return Ok(job);
272 }
273
274 match self.timer.timeout(duration, listener).await {
275 Ok(()) => {
276 }
278 Err(()) => {
279 }
281 }
282 }
283 }
284
285 #[tracing::instrument(skip(self))]
287 async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> {
288 self.set_heartbeat(job_id, runner_id);
289 Ok(())
290 }
291
292 #[tracing::instrument(skip(self))]
294 async fn complete(
295 &self,
296 ReturnJobInfo { id, result }: ReturnJobInfo,
297 ) -> Result<bool, Self::Error> {
298 let mut job = if let Some(job) = self.remove_job(id) {
299 job
300 } else {
301 return Ok(true);
302 };
303
304 match result {
305 JobResult::Success => Ok(true),
307 JobResult::Unregistered | JobResult::Unexecuted => {
309 self.insert(job);
310 Ok(false)
311 }
312 JobResult::Failure if job.prepare_retry() => {
314 self.insert(job);
315 Ok(false)
316 }
317 JobResult::Failure => Ok(true),
319 }
320 }
321 }
322}