1use std::collections::VecDeque;
38use std::future::Future;
39use std::pin::Pin;
40use std::sync::Arc;
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::AtomicU64;
43use std::sync::atomic::Ordering;
44use std::time::Duration;
45use std::time::Instant;
46
47use parking_lot::Mutex;
48use scc::HashMap as SccHashMap;
49use tokio::sync::Notify;
50
/// Errors reported by the queue itself and by job handlers.
#[derive(Debug)]
pub enum QueueError {
    /// No handler is registered under the contained job name.
    UnknownJob(String),
    /// The job payload could not be serialized; holds the serializer's message.
    SerializeError(String),
    /// A handler (or payload decoding inside a handler) failed; holds the message.
    HandlerError(String),
    /// The queue has been shut down.
    Shutdown,
}

impl std::fmt::Display for QueueError {
    /// Renders the one-line, human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The only variant without a payload needs no formatting machinery.
            Self::Shutdown => f.write_str("queue has been shut down"),
            Self::HandlerError(e) => write!(f, "job handler error: {e}"),
            Self::SerializeError(e) => write!(f, "failed to serialize job payload: {e}"),
            Self::UnknownJob(name) => write!(f, "no handler registered for job '{name}'"),
        }
    }
}

// No underlying `source`: every variant carries its cause as plain text.
impl std::error::Error for QueueError {}
76
/// Decides whether a failed job is retried and how long to wait before the retry.
///
/// The default policy is [`RetryPolicy::None`].
#[derive(Debug, Clone, Default)]
pub enum RetryPolicy {
    /// Never retry: a job that fails once is not run again.
    #[default]
    None,
    /// Retry up to `max_retries` times, waiting the same `delay` before each retry.
    Fixed {
        /// Maximum number of retries after the first failed run.
        max_retries: u32,
        /// Wait applied before every retry.
        delay: Duration,
    },
    /// Retry up to `max_retries` times, doubling the wait after every failure.
    Exponential {
        /// Maximum number of retries after the first failed run.
        max_retries: u32,
        /// Wait applied before the first retry; later retries wait `base_delay * 2^attempt`.
        base_delay: Duration,
    },
}

impl RetryPolicy {
    /// Builds a [`RetryPolicy::Fixed`] policy.
    pub fn fixed(max_retries: u32, delay: Duration) -> Self {
        Self::Fixed { max_retries, delay }
    }

    /// Builds a [`RetryPolicy::Exponential`] policy.
    pub fn exponential(max_retries: u32, base_delay: Duration) -> Self {
        Self::Exponential {
            max_retries,
            base_delay,
        }
    }

    /// Returns how many retries are allowed; `0` for [`RetryPolicy::None`].
    fn max_retries(&self) -> u32 {
        match self {
            Self::None => 0,
            Self::Fixed { max_retries, .. } | Self::Exponential { max_retries, .. } => *max_retries,
        }
    }

    /// Returns the wait before retrying after the failure of attempt number `attempt`.
    ///
    /// `attempt` is zero-based: `0` is the first run of the job. For the exponential
    /// policy the result is `base_delay * 2^attempt`.
    fn delay_for_attempt(&self, attempt: u32) -> Duration {
        match self {
            Self::None => Duration::ZERO,
            Self::Fixed { delay, .. } => *delay,
            Self::Exponential { base_delay, .. } => {
                // `Duration * u32` panics on overflow; saturate both the factor and the
                // product so an extreme configuration yields a huge delay, not a panic.
                base_delay.saturating_mul(2u32.saturating_pow(attempt))
            }
        }
    }
}
133
/// A job as handed to a registered handler.
///
/// Derives `Debug` so handlers can log the job they received, matching the other
/// public types in this file.
#[derive(Debug)]
pub struct Job {
    /// Serialized payload bytes (JSON, as produced when the job was pushed).
    pub(crate) payload: Vec<u8>,
    /// Name the job was pushed under; selects the handler.
    pub name: String,
    /// Zero-based attempt number: `0` on the first run, incremented on each retry.
    pub attempt: u32,
    /// Identifier assigned to the job when it was pushed.
    pub id: u64,
}
147
148impl Job {
149 pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T, QueueError> {
151 serde_json::from_slice(&self.payload).map_err(|e| QueueError::HandlerError(e.to_string()))
152 }
153
154 pub fn raw_payload(&self) -> &[u8] {
156 &self.payload
157 }
158}
159
/// A job that was moved to the dead-letter queue and will not be run again.
#[derive(Debug, Clone)]
pub struct DeadJob {
    /// Identifier the job was given when it was pushed.
    pub id: u64,
    /// Name the job was pushed under.
    pub name: String,
    /// Serialized payload bytes of the job.
    pub payload: Vec<u8>,
    /// Number of runs counted for the job (its last zero-based attempt number plus one).
    pub attempts: u32,
    /// Text of the final error, or a fixed message when no handler was registered.
    pub error: String,
    /// Instant at which the job was moved to the dead-letter queue.
    pub failed_at: Instant,
}
176
/// Internal record of a job waiting in the pending queue.
struct PendingJob {
    /// Identifier assigned when the job was pushed; kept across retries.
    id: u64,
    /// Job name used to look up the handler.
    name: String,
    /// Serialized payload bytes.
    payload: Vec<u8>,
    /// Zero-based attempt number: `0` when first pushed, incremented on each retry.
    attempt: u32,
    /// Earliest instant at which the job may run; `None` means it may run immediately.
    run_after: Option<Instant>,
}
184
/// Type-erased job handler: a shareable, thread-safe closure that takes a [`Job`] and
/// returns a boxed `Send` future resolving to `Result<(), QueueError>`.
type BoxHandler =
    Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send>> + Send + Sync>;
187
/// State shared between every [`Queue`] handle and the worker tasks.
struct QueueInner {
    /// Jobs waiting to run, including retries and delayed jobs that are not due yet.
    pending: Mutex<VecDeque<PendingJob>>,
    /// Registered handlers, keyed by job name.
    handlers: SccHashMap<String, BoxHandler>,
    /// Jobs that failed for good (dead-letter queue).
    dead_letters: Mutex<Vec<DeadJob>>,
    /// Signalled when a job is pushed, re-queued for a retry, or shutdown is requested.
    notify: Notify,
    /// Source of job identifiers; the builder starts it at 1.
    next_id: AtomicU64,
    /// Number of worker tasks spawned by `start`.
    num_workers: usize,
    /// Policy applied when a handler returns an error.
    retry_policy: RetryPolicy,
    /// Set by `shutdown`; once set, pushing new jobs is rejected.
    shutdown: AtomicBool,
    /// Number of jobs whose handler is currently running.
    inflight: AtomicU64,
    /// Signalled by a worker when `inflight` drops to zero while `shutdown` is set.
    drain_notify: Notify,
}
210
/// Handle to a job queue.
///
/// The state lives behind an `Arc`, so cloning a `Queue` yields another handle to the
/// same queue rather than a copy of it.
#[derive(Clone)]
pub struct Queue {
    /// Shared queue state.
    inner: Arc<QueueInner>,
}
223
/// Collects the settings used to construct a [`Queue`].
pub struct QueueBuilder {
    /// Number of worker tasks the queue will spawn.
    workers: usize,
    /// Retry policy the queue will apply to failing jobs.
    retry: RetryPolicy,
}
229
230impl QueueBuilder {
231 pub fn workers(mut self, n: usize) -> Self {
233 self.workers = n.max(1);
234 self
235 }
236
237 pub fn retry(mut self, policy: RetryPolicy) -> Self {
239 self.retry = policy;
240 self
241 }
242
243 pub fn build(self) -> Queue {
245 Queue {
246 inner: Arc::new(QueueInner {
247 pending: Mutex::new(VecDeque::new()),
248 handlers: SccHashMap::new(),
249 dead_letters: Mutex::new(Vec::new()),
250 notify: Notify::new(),
251 next_id: AtomicU64::new(1),
252 num_workers: self.workers,
253 retry_policy: self.retry,
254 shutdown: AtomicBool::new(false),
255 inflight: AtomicU64::new(0),
256 drain_notify: Notify::new(),
257 }),
258 }
259 }
260}
261
262impl Queue {
263 pub fn new() -> Self {
265 Self::builder().build()
266 }
267
268 pub fn builder() -> QueueBuilder {
270 QueueBuilder {
271 workers: 4,
272 retry: RetryPolicy::default(),
273 }
274 }
275
276 pub fn register<F, Fut>(&self, name: impl Into<String>, handler: F)
290 where
291 F: Fn(Job) -> Fut + Send + Sync + 'static,
292 Fut: Future<Output = Result<(), QueueError>> + Send + 'static,
293 {
294 let name = name.into();
295 let handler: BoxHandler = Arc::new(move |job| Box::pin(handler(job)));
296 let _ = self.inner.handlers.insert_sync(name, handler);
297 }
298
299 pub async fn push(
303 &self,
304 name: impl Into<String>,
305 payload: &(impl serde::Serialize + ?Sized),
306 ) -> Result<u64, QueueError> {
307 self.push_inner(name.into(), payload, None)
308 }
309
310 pub async fn push_delayed(
314 &self,
315 name: impl Into<String>,
316 payload: &(impl serde::Serialize + ?Sized),
317 delay: Duration,
318 ) -> Result<u64, QueueError> {
319 self.push_inner(name.into(), payload, Some(Instant::now() + delay))
320 }
321
322 fn push_inner(
323 &self,
324 name: String,
325 payload: &(impl serde::Serialize + ?Sized),
326 run_after: Option<Instant>,
327 ) -> Result<u64, QueueError> {
328 if self.inner.shutdown.load(Ordering::SeqCst) {
329 return Err(QueueError::Shutdown);
330 }
331
332 let bytes =
333 serde_json::to_vec(payload).map_err(|e| QueueError::SerializeError(e.to_string()))?;
334
335 let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
336
337 self.inner.pending.lock().push_back(PendingJob {
338 id,
339 name,
340 payload: bytes,
341 attempt: 0,
342 run_after,
343 });
344
345 self.inner.notify.notify_one();
346 Ok(id)
347 }
348
349 #[cfg(not(feature = "compio"))]
354 pub fn start(&self) {
355 for _ in 0..self.inner.num_workers {
356 let inner = self.inner.clone();
357 tokio::spawn(async move { worker_loop(inner).await });
358 }
359 tracing::debug!("Queue started with {} workers", self.inner.num_workers);
360 }
361
362 #[cfg(feature = "compio")]
364 pub fn start(&self) {
365 for _ in 0..self.inner.num_workers {
366 let inner = self.inner.clone();
367 compio::runtime::spawn(async move { worker_loop(inner).await }).detach();
368 }
369 tracing::debug!("Queue started with {} workers", self.inner.num_workers);
370 }
371
372 pub async fn shutdown(&self, timeout: Duration) {
377 self.inner.shutdown.store(true, Ordering::SeqCst);
378 for _ in 0..self.inner.num_workers {
380 self.inner.notify.notify_one();
381 }
382
383 if self.inner.inflight.load(Ordering::SeqCst) > 0 {
384 #[cfg(not(feature = "compio"))]
385 {
386 let _ = tokio::time::timeout(timeout, self.inner.drain_notify.notified()).await;
387 }
388 #[cfg(feature = "compio")]
389 {
390 let drain = std::pin::pin!(self.inner.drain_notify.notified());
391 let sleep = std::pin::pin!(compio::time::sleep(timeout));
392 let _ = futures_util::future::select(drain, sleep).await;
393 }
394 }
395
396 tracing::debug!("Queue shut down");
397 }
398
399 pub fn dead_letters(&self) -> Vec<DeadJob> {
401 self.inner.dead_letters.lock().clone()
402 }
403
404 pub fn clear_dead_letters(&self) {
406 self.inner.dead_letters.lock().clear();
407 }
408
409 pub fn pending_count(&self) -> usize {
411 self.inner.pending.lock().len()
412 }
413
414 pub fn inflight_count(&self) -> u64 {
416 self.inner.inflight.load(Ordering::SeqCst)
417 }
418}
419
420impl Default for Queue {
421 fn default() -> Self {
422 Self::new()
423 }
424}
425
426async fn worker_loop(inner: Arc<QueueInner>) {
427 loop {
428 #[cfg(not(feature = "compio"))]
430 {
431 let _ = tokio::time::timeout(Duration::from_millis(100), inner.notify.notified()).await;
432 }
433 #[cfg(feature = "compio")]
434 {
435 let notified = std::pin::pin!(inner.notify.notified());
436 let sleep = std::pin::pin!(compio::time::sleep(Duration::from_millis(100)));
437 let _ = futures_util::future::select(notified, sleep).await;
438 }
439
440 if inner.shutdown.load(Ordering::SeqCst) && inner.pending.lock().is_empty() {
441 break;
442 }
443
444 let job = {
446 let mut pending = inner.pending.lock();
447 let now = Instant::now();
448
449 let pos = pending.iter().position(|j| match j.run_after {
451 Some(t) => now >= t,
452 None => true,
453 });
454
455 pos.and_then(|i| pending.remove(i))
456 };
457
458 let Some(pending_job) = job else {
459 continue;
460 };
461
462 let handler = inner
464 .handlers
465 .get_async(&pending_job.name)
466 .await
467 .map(|e| e.get().clone());
468
469 let Some(handler) = handler else {
470 tracing::warn!("No handler for job '{}', moving to DLQ", pending_job.name);
471 inner.dead_letters.lock().push(DeadJob {
472 id: pending_job.id,
473 name: pending_job.name,
474 payload: pending_job.payload,
475 attempts: pending_job.attempt + 1,
476 error: "no handler registered".into(),
477 failed_at: Instant::now(),
478 });
479 continue;
480 };
481
482 inner.inflight.fetch_add(1, Ordering::SeqCst);
483
484 let job = Job {
485 payload: pending_job.payload.clone(),
486 name: pending_job.name.clone(),
487 attempt: pending_job.attempt,
488 id: pending_job.id,
489 };
490
491 let result = handler(job).await;
492
493 if let Err(e) = result {
494 let max_retries = inner.retry_policy.max_retries();
495
496 if pending_job.attempt < max_retries {
497 let next_attempt = pending_job.attempt + 1;
498 let delay = inner.retry_policy.delay_for_attempt(pending_job.attempt);
499
500 tracing::debug!(
501 "Job '{}' (id={}) failed (attempt {}/{}), retrying in {:?}",
502 pending_job.name,
503 pending_job.id,
504 next_attempt,
505 max_retries,
506 delay
507 );
508
509 inner.pending.lock().push_back(PendingJob {
510 id: pending_job.id,
511 name: pending_job.name,
512 payload: pending_job.payload,
513 attempt: next_attempt,
514 run_after: Some(Instant::now() + delay),
515 });
516
517 inner.notify.notify_one();
518 } else {
519 tracing::warn!(
520 "Job '{}' (id={}) exhausted {} retries, moving to DLQ: {}",
521 pending_job.name,
522 pending_job.id,
523 max_retries,
524 e
525 );
526
527 inner.dead_letters.lock().push(DeadJob {
528 id: pending_job.id,
529 name: pending_job.name,
530 payload: pending_job.payload,
531 attempts: pending_job.attempt + 1,
532 error: e.to_string(),
533 failed_at: Instant::now(),
534 });
535 }
536 }
537
538 let prev = inner.inflight.fetch_sub(1, Ordering::SeqCst);
539 if prev == 1 && inner.shutdown.load(Ordering::SeqCst) {
540 inner.drain_notify.notify_one();
541 }
542 }
543}