Skip to main content

tako/
queue.rs

1//! In-memory background job queue with named queues, retry policies, and dead letter support.
2//!
3//! Provides a lightweight task queue for deferring work to background workers —
4//! useful for sending emails, webhooks, async processing, etc.
5//!
6//! # Features
7//!
//! - **Named jobs** — handlers are registered under a name (e.g. `"email"`, `"webhook"`) and jobs are routed to them by that name
9//! - **Configurable workers** — per-queue concurrency limit
10//! - **Retry policy** — fixed or exponential backoff with max attempts
11//! - **Delayed jobs** — schedule execution after a duration
12//! - **Dead letter queue** — failed jobs stored for inspection
13//! - **Graceful shutdown** — drain in-flight jobs before exit
14//!
15//! # Examples
16//!
17//! ```rust,no_run
18//! use tako::queue::{Queue, RetryPolicy, Job};
19//! use std::time::Duration;
20//!
21//! # async fn example() {
22//! let queue = Queue::builder()
23//!     .workers(4)
24//!     .retry(RetryPolicy::exponential(3, Duration::from_secs(1)))
25//!     .build();
26//!
27//! queue.register("send_email", |job: Job| async move {
28//!     let to: String = job.deserialize()?;
29//!     println!("Sending email to {to}");
30//!     Ok(())
31//! });
32//!
//! // Workers only exist after `start()`; without it the job would stay pending.
//! queue.start();
//! queue.push("send_email", &"user@example.com").await.unwrap();
34//! # }
35//! ```
36
37use std::collections::VecDeque;
38use std::future::Future;
39use std::pin::Pin;
40use std::sync::Arc;
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::AtomicU64;
43use std::sync::atomic::Ordering;
44use std::time::Duration;
45use std::time::Instant;
46
47use parking_lot::Mutex;
48use scc::HashMap as SccHashMap;
49use tokio::sync::Notify;
50
/// Errors surfaced by queue operations and job handlers.
#[derive(Debug)]
pub enum QueueError {
  /// A job name was used for which no handler is registered.
  UnknownJob(String),
  /// The job payload could not be serialized; carries the serializer's message.
  SerializeError(String),
  /// A job handler reported a failure; carries the failure message.
  HandlerError(String),
  /// The queue has been shut down and no longer accepts work.
  Shutdown,
}

impl std::fmt::Display for QueueError {
  /// Writes a single-line, human-readable description of the error.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      // The only variant without a payload needs no formatting machinery.
      Self::Shutdown => f.write_str("queue has been shut down"),
      Self::UnknownJob(job_name) => write!(f, "no handler registered for job '{job_name}'"),
      Self::SerializeError(cause) => write!(f, "failed to serialize job payload: {cause}"),
      Self::HandlerError(cause) => write!(f, "job handler error: {cause}"),
    }
  }
}

impl std::error::Error for QueueError {}
76
/// Retry policy for failed jobs.
///
/// The default policy is [`RetryPolicy::None`].
#[derive(Debug, Clone, Default)]
pub enum RetryPolicy {
  /// No retries — failed jobs go straight to the dead letter queue.
  #[default]
  None,
  /// Fixed delay between retries.
  Fixed {
    /// Maximum number of retry attempts.
    max_retries: u32,
    /// Delay between each retry.
    delay: Duration,
  },
  /// Exponential backoff between retries.
  Exponential {
    /// Maximum number of retry attempts.
    max_retries: u32,
    /// Initial delay (doubled on each retry).
    base_delay: Duration,
  },
}

impl RetryPolicy {
  /// Create a fixed-delay retry policy.
  pub fn fixed(max_retries: u32, delay: Duration) -> Self {
    Self::Fixed { max_retries, delay }
  }

  /// Create an exponential-backoff retry policy.
  pub fn exponential(max_retries: u32, base_delay: Duration) -> Self {
    Self::Exponential {
      max_retries,
      base_delay,
    }
  }

  /// Maximum number of retries this policy allows (`0` for [`RetryPolicy::None`]).
  fn max_retries(&self) -> u32 {
    match self {
      Self::None => 0,
      Self::Fixed { max_retries, .. } | Self::Exponential { max_retries, .. } => *max_retries,
    }
  }

  /// Delay to wait before re-running a job whose attempt number `attempt`
  /// (0-based) just failed.
  ///
  /// For the exponential policy this is `base_delay * 2^attempt`. Both the
  /// power and the multiplication saturate, so a very large attempt count or
  /// base delay yields `Duration::MAX` instead of panicking on overflow.
  fn delay_for_attempt(&self, attempt: u32) -> Duration {
    match self {
      Self::None => Duration::ZERO,
      Self::Fixed { delay, .. } => *delay,
      Self::Exponential { base_delay, .. } => {
        base_delay.saturating_mul(2u32.saturating_pow(attempt))
      }
    }
  }
}
133
134/// A job passed to a handler function.
135///
136/// Contains the serialized payload and metadata about the job.
137pub struct Job {
138  /// The raw JSON payload.
139  pub(crate) payload: Vec<u8>,
140  /// Job name (the key it was registered under).
141  pub name: String,
142  /// Current attempt number (0-based).
143  pub attempt: u32,
144  /// Unique job ID.
145  pub id: u64,
146}
147
148impl Job {
149  /// Deserialize the job payload into the expected type.
150  pub fn deserialize<T: serde::de::DeserializeOwned>(&self) -> Result<T, QueueError> {
151    serde_json::from_slice(&self.payload).map_err(|e| QueueError::HandlerError(e.to_string()))
152  }
153
154  /// Access the raw payload bytes.
155  pub fn raw_payload(&self) -> &[u8] {
156    &self.payload
157  }
158}
159
/// Record of a job that ended up in the dead letter queue.
#[derive(Debug, Clone)]
pub struct DeadJob {
  /// Unique identifier of the job.
  pub id: u64,
  /// Name the job was pushed under.
  pub name: String,
  /// Payload bytes of the job, unmodified.
  pub payload: Vec<u8>,
  /// How many attempts were made before giving up.
  pub attempts: u32,
  /// Message of the last error.
  pub error: String,
  /// Moment at which the job was placed in the dead letter queue.
  pub failed_at: Instant,
}
176
/// Internal representation of a job waiting in the pending queue.
struct PendingJob {
  /// Unique identifier of the job.
  id: u64,
  /// Name used to look up the handler.
  name: String,
  /// Serialized payload bytes.
  payload: Vec<u8>,
  /// Attempt number the next run will carry (0 for a freshly pushed job).
  attempt: u32,
  /// Earliest instant at which the job may run; `None` means it is ready immediately.
  run_after: Option<Instant>,
}
184
185type BoxHandler =
186  Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send>> + Send + Sync>;
187
188struct QueueInner {
189  /// Pending jobs waiting to be processed.
190  pending: Mutex<VecDeque<PendingJob>>,
191  /// Registered job handlers by name.
192  handlers: SccHashMap<String, BoxHandler>,
193  /// Dead letter queue.
194  dead_letters: Mutex<Vec<DeadJob>>,
195  /// Notify workers when new jobs arrive.
196  notify: Notify,
197  /// Monotonically increasing job ID counter.
198  next_id: AtomicU64,
199  /// Number of worker tasks.
200  num_workers: usize,
201  /// Retry policy.
202  retry_policy: RetryPolicy,
203  /// Whether the queue has been shut down.
204  shutdown: AtomicBool,
205  /// Track in-flight jobs for graceful shutdown.
206  inflight: AtomicU64,
207  /// Notify when inflight reaches 0.
208  drain_notify: Notify,
209}
210
211/// An in-memory background job queue.
212///
213/// Create via [`Queue::builder()`] or [`Queue::new()`].
214/// Register handlers with [`register()`](Queue::register), then push jobs
215/// with [`push()`](Queue::push) or [`push_delayed()`](Queue::push_delayed).
216///
217/// The queue must be started with [`start()`](Queue::start) to spawn
218/// background worker tasks that process jobs.
219#[derive(Clone)]
220pub struct Queue {
221  inner: Arc<QueueInner>,
222}
223
224/// Builder for configuring a [`Queue`].
225pub struct QueueBuilder {
226  workers: usize,
227  retry: RetryPolicy,
228}
229
230impl QueueBuilder {
231  /// Set the number of worker tasks (default: 4).
232  pub fn workers(mut self, n: usize) -> Self {
233    self.workers = n.max(1);
234    self
235  }
236
237  /// Set the retry policy for failed jobs.
238  pub fn retry(mut self, policy: RetryPolicy) -> Self {
239    self.retry = policy;
240    self
241  }
242
243  /// Build the queue. Call [`Queue::start()`] to begin processing.
244  pub fn build(self) -> Queue {
245    Queue {
246      inner: Arc::new(QueueInner {
247        pending: Mutex::new(VecDeque::new()),
248        handlers: SccHashMap::new(),
249        dead_letters: Mutex::new(Vec::new()),
250        notify: Notify::new(),
251        next_id: AtomicU64::new(1),
252        num_workers: self.workers,
253        retry_policy: self.retry,
254        shutdown: AtomicBool::new(false),
255        inflight: AtomicU64::new(0),
256        drain_notify: Notify::new(),
257      }),
258    }
259  }
260}
261
262impl Queue {
263  /// Create a queue with default settings (4 workers, no retries).
264  pub fn new() -> Self {
265    Self::builder().build()
266  }
267
268  /// Create a builder for customizing the queue.
269  pub fn builder() -> QueueBuilder {
270    QueueBuilder {
271      workers: 4,
272      retry: RetryPolicy::default(),
273    }
274  }
275
276  /// Register a named job handler.
277  ///
278  /// The handler receives a [`Job`] and returns `Result<(), QueueError>`.
279  ///
280  /// # Examples
281  ///
282  /// ```rust,ignore
283  /// queue.register("process_order", |job: Job| async move {
284  ///     let order_id: u64 = job.deserialize()?;
285  ///     // process the order ...
286  ///     Ok(())
287  /// });
288  /// ```
289  pub fn register<F, Fut>(&self, name: impl Into<String>, handler: F)
290  where
291    F: Fn(Job) -> Fut + Send + Sync + 'static,
292    Fut: Future<Output = Result<(), QueueError>> + Send + 'static,
293  {
294    let name = name.into();
295    let handler: BoxHandler = Arc::new(move |job| Box::pin(handler(job)));
296    let _ = self.inner.handlers.insert_sync(name, handler);
297  }
298
299  /// Push a job for immediate execution.
300  ///
301  /// The payload is serialized to JSON. Returns the job ID.
302  pub async fn push(
303    &self,
304    name: impl Into<String>,
305    payload: &(impl serde::Serialize + ?Sized),
306  ) -> Result<u64, QueueError> {
307    self.push_inner(name.into(), payload, None)
308  }
309
310  /// Push a job for delayed execution.
311  ///
312  /// The job will not be picked up by a worker until `delay` has elapsed.
313  pub async fn push_delayed(
314    &self,
315    name: impl Into<String>,
316    payload: &(impl serde::Serialize + ?Sized),
317    delay: Duration,
318  ) -> Result<u64, QueueError> {
319    self.push_inner(name.into(), payload, Some(Instant::now() + delay))
320  }
321
322  fn push_inner(
323    &self,
324    name: String,
325    payload: &(impl serde::Serialize + ?Sized),
326    run_after: Option<Instant>,
327  ) -> Result<u64, QueueError> {
328    if self.inner.shutdown.load(Ordering::SeqCst) {
329      return Err(QueueError::Shutdown);
330    }
331
332    let bytes =
333      serde_json::to_vec(payload).map_err(|e| QueueError::SerializeError(e.to_string()))?;
334
335    let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
336
337    self.inner.pending.lock().push_back(PendingJob {
338      id,
339      name,
340      payload: bytes,
341      attempt: 0,
342      run_after,
343    });
344
345    self.inner.notify.notify_one();
346    Ok(id)
347  }
348
349  /// Start background worker tasks.
350  ///
351  /// This spawns `workers` number of tokio tasks that process jobs from the queue.
352  /// Must be called once before pushing jobs.
353  #[cfg(not(feature = "compio"))]
354  pub fn start(&self) {
355    for _ in 0..self.inner.num_workers {
356      let inner = self.inner.clone();
357      tokio::spawn(async move { worker_loop(inner).await });
358    }
359    tracing::debug!("Queue started with {} workers", self.inner.num_workers);
360  }
361
362  /// Start background worker tasks (compio runtime).
363  #[cfg(feature = "compio")]
364  pub fn start(&self) {
365    for _ in 0..self.inner.num_workers {
366      let inner = self.inner.clone();
367      compio::runtime::spawn(async move { worker_loop(inner).await }).detach();
368    }
369    tracing::debug!("Queue started with {} workers", self.inner.num_workers);
370  }
371
372  /// Gracefully shut down the queue.
373  ///
374  /// Stops accepting new jobs and waits for in-flight jobs to complete
375  /// (up to the given timeout).
376  pub async fn shutdown(&self, timeout: Duration) {
377    self.inner.shutdown.store(true, Ordering::SeqCst);
378    // Wake all workers so they see the shutdown flag
379    for _ in 0..self.inner.num_workers {
380      self.inner.notify.notify_one();
381    }
382
383    if self.inner.inflight.load(Ordering::SeqCst) > 0 {
384      #[cfg(not(feature = "compio"))]
385      {
386        let _ = tokio::time::timeout(timeout, self.inner.drain_notify.notified()).await;
387      }
388      #[cfg(feature = "compio")]
389      {
390        let drain = std::pin::pin!(self.inner.drain_notify.notified());
391        let sleep = std::pin::pin!(compio::time::sleep(timeout));
392        let _ = futures_util::future::select(drain, sleep).await;
393      }
394    }
395
396    tracing::debug!("Queue shut down");
397  }
398
399  /// Returns a snapshot of jobs in the dead letter queue.
400  pub fn dead_letters(&self) -> Vec<DeadJob> {
401    self.inner.dead_letters.lock().clone()
402  }
403
404  /// Clear all dead letters.
405  pub fn clear_dead_letters(&self) {
406    self.inner.dead_letters.lock().clear();
407  }
408
409  /// Returns the number of pending jobs.
410  pub fn pending_count(&self) -> usize {
411    self.inner.pending.lock().len()
412  }
413
414  /// Returns the number of currently in-flight jobs.
415  pub fn inflight_count(&self) -> u64 {
416    self.inner.inflight.load(Ordering::SeqCst)
417  }
418}
419
420impl Default for Queue {
421  fn default() -> Self {
422    Self::new()
423  }
424}
425
426async fn worker_loop(inner: Arc<QueueInner>) {
427  loop {
428    // Wait for notification or check periodically for delayed jobs
429    #[cfg(not(feature = "compio"))]
430    {
431      let _ = tokio::time::timeout(Duration::from_millis(100), inner.notify.notified()).await;
432    }
433    #[cfg(feature = "compio")]
434    {
435      let notified = std::pin::pin!(inner.notify.notified());
436      let sleep = std::pin::pin!(compio::time::sleep(Duration::from_millis(100)));
437      let _ = futures_util::future::select(notified, sleep).await;
438    }
439
440    if inner.shutdown.load(Ordering::SeqCst) && inner.pending.lock().is_empty() {
441      break;
442    }
443
444    // Try to pick up a job
445    let job = {
446      let mut pending = inner.pending.lock();
447      let now = Instant::now();
448
449      // Find the first job that's ready to run
450      let pos = pending.iter().position(|j| match j.run_after {
451        Some(t) => now >= t,
452        None => true,
453      });
454
455      pos.and_then(|i| pending.remove(i))
456    };
457
458    let Some(pending_job) = job else {
459      continue;
460    };
461
462    // Look up handler
463    let handler = inner
464      .handlers
465      .get_async(&pending_job.name)
466      .await
467      .map(|e| e.get().clone());
468
469    let Some(handler) = handler else {
470      tracing::warn!("No handler for job '{}', moving to DLQ", pending_job.name);
471      inner.dead_letters.lock().push(DeadJob {
472        id: pending_job.id,
473        name: pending_job.name,
474        payload: pending_job.payload,
475        attempts: pending_job.attempt + 1,
476        error: "no handler registered".into(),
477        failed_at: Instant::now(),
478      });
479      continue;
480    };
481
482    inner.inflight.fetch_add(1, Ordering::SeqCst);
483
484    let job = Job {
485      payload: pending_job.payload.clone(),
486      name: pending_job.name.clone(),
487      attempt: pending_job.attempt,
488      id: pending_job.id,
489    };
490
491    let result = handler(job).await;
492
493    if let Err(e) = result {
494      let max_retries = inner.retry_policy.max_retries();
495
496      if pending_job.attempt < max_retries {
497        let next_attempt = pending_job.attempt + 1;
498        let delay = inner.retry_policy.delay_for_attempt(pending_job.attempt);
499
500        tracing::debug!(
501          "Job '{}' (id={}) failed (attempt {}/{}), retrying in {:?}",
502          pending_job.name,
503          pending_job.id,
504          next_attempt,
505          max_retries,
506          delay
507        );
508
509        inner.pending.lock().push_back(PendingJob {
510          id: pending_job.id,
511          name: pending_job.name,
512          payload: pending_job.payload,
513          attempt: next_attempt,
514          run_after: Some(Instant::now() + delay),
515        });
516
517        inner.notify.notify_one();
518      } else {
519        tracing::warn!(
520          "Job '{}' (id={}) exhausted {} retries, moving to DLQ: {}",
521          pending_job.name,
522          pending_job.id,
523          max_retries,
524          e
525        );
526
527        inner.dead_letters.lock().push(DeadJob {
528          id: pending_job.id,
529          name: pending_job.name,
530          payload: pending_job.payload,
531          attempts: pending_job.attempt + 1,
532          error: e.to_string(),
533          failed_at: Instant::now(),
534        });
535      }
536    }
537
538    let prev = inner.inflight.fetch_sub(1, Ordering::SeqCst);
539    if prev == 1 && inner.shutdown.load(Ordering::SeqCst) {
540      inner.drain_notify.notify_one();
541    }
542  }
543}