lilqueue 0.1.0

A small storage-agnostic async job queue runner
Documentation
use crate::{
    ClaimedJob, Job, JobQueue, NewJob, ProcessorOptions, RetryableQueue, RunOutcome,
    queue::BoxError,
};
use std::{
    marker::PhantomData,
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

/// Errors returned while enqueuing or processing jobs.
#[derive(Debug, thiserror::Error)]
pub enum QueueError {
    /// The queue backend reported a failure.
    #[error("queue error: {0}")]
    Queue(#[from] BoxError),

    /// Wraps a [`serde_json::Error`] raised while handling a job's JSON payload.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// The system clock could not be read as a time after the Unix epoch.
    #[error("clock error: {0}")]
    Clock(#[from] std::time::SystemTimeError),
}

/// Enqueues and processes jobs of a single type `J` on top of a queue backend `Q`.
pub struct JobProcessor<J: Job, Q: JobQueue> {
    /// Storage backend holding the jobs.
    queue: Q,
    /// Processing configuration (attempt limit, retry backoff, poll interval).
    options: ProcessorOptions,
    /// Wakes idle workers after an enqueue; shared by every clone of this processor.
    enqueue_notify: Arc<tokio::sync::Notify>,
    /// `J` only appears in method signatures, so it is tracked as a phantom type.
    _marker: PhantomData<J>,
}

/// Controls the background worker tasks started by `JobProcessor::spawn_workers`.
///
/// Dropping the handle signals shutdown and aborts the tasks; use
/// [`WorkerHandle::shutdown_and_wait`] to signal shutdown and then wait for the tasks to exit.
#[must_use = "workers are stopped when the handle is dropped"]
pub struct WorkerHandle {
    /// Shutdown signal shared with every worker task.
    shutdown: Arc<tokio::sync::Notify>,
    /// Join handles of the spawned worker tasks.
    tasks: Vec<tokio::task::JoinHandle<()>>,
}

impl WorkerHandle {
    /// Signals every worker spawned with this handle to stop, without waiting for them.
    ///
    /// Workers observe the signal the next time they check for it, so a job that is already
    /// being processed is not interrupted. Pair with [`WorkerHandle::wait`], or use
    /// [`WorkerHandle::shutdown_and_wait`], to wait until the tasks have exited.
    pub fn shutdown(&self) {
        // `notify_waiters` wakes only the workers already registered on the signal and stores
        // nothing for later. A worker task that has been spawned but not yet polled (for
        // example on a current-thread runtime when shutdown is requested right after
        // spawning) would miss it and never exit. `notify_one` additionally leaves a single
        // permit that the next worker to register consumes.
        self.shutdown.notify_waiters();
        self.shutdown.notify_one();
    }

    /// Waits for every worker task to finish.
    ///
    /// Workers keep running until shutdown is signalled (or their task panics), so call
    /// [`WorkerHandle::shutdown`] first; otherwise this can wait indefinitely. A worker's
    /// panic is not propagated.
    pub async fn wait(mut self) {
        for task in self.tasks.drain(..) {
            // `Err(JoinError)` (panic or cancellation) is deliberately ignored: the worker is
            // gone either way and there is nothing to recover here.
            let _ = task.await;
        }
    }

    /// Signals shutdown and then waits for every worker task to finish.
    pub async fn shutdown_and_wait(self) {
        self.shutdown();
        self.wait().await;
    }
}

impl Drop for WorkerHandle {
    fn drop(&mut self) {
        self.shutdown.notify_waiters();
        for task in &self.tasks {
            task.abort();
        }
    }
}

impl<J, Q> Clone for JobProcessor<J, Q>
where
    J: Job,
    Q: JobQueue,
{
    /// Produces another processor over a clone of the queue and options.
    ///
    /// The enqueue notifier is reference-counted rather than copied, so an enqueue through
    /// any clone can wake workers spawned from any other clone.
    fn clone(&self) -> Self {
        let Self {
            queue,
            options,
            enqueue_notify,
            _marker,
        } = self;

        Self {
            queue: queue.clone(),
            options: options.clone(),
            enqueue_notify: Arc::clone(enqueue_notify),
            _marker: PhantomData,
        }
    }
}

impl<J: Job, Q: JobQueue> JobProcessor<J, Q> {
    /// Creates a processor that stores jobs of type `J` in `queue`, configured by `options`.
    pub fn new(queue: Q, options: ProcessorOptions) -> Self {
        let enqueue_notify = Arc::new(tokio::sync::Notify::new());
        Self {
            queue,
            options,
            enqueue_notify,
            _marker: PhantomData,
        }
    }

    /// Returns the options this processor was created with.
    pub fn options(&self) -> &ProcessorOptions {
        &self.options
    }

    /// Returns the underlying queue backend.
    pub fn queue(&self) -> &Q {
        &self.queue
    }

    /// Wakes every worker that is currently waiting for an enqueue so it re-checks the queue.
    ///
    /// This uses `Notify::notify_waiters`, so workers that are not waiting at this moment are
    /// unaffected and no wakeup is stored for them.
    pub fn wake_workers(&self) {
        self.enqueue_notify.notify_waiters();
    }

    /// Enqueues `job` to be available immediately and returns the id reported by the queue.
    ///
    /// # Errors
    ///
    /// Same as [`JobProcessor::enqueue_with_delay`].
    pub async fn enqueue(&self, job: &J) -> Result<i64, QueueError> {
        self.enqueue_with_delay(job, Duration::ZERO).await
    }

    /// Enqueues `job` so that it becomes available after `delay`, returning the id reported
    /// by the queue.
    ///
    /// Times are stored as whole seconds since the Unix epoch; the sub-second part of `delay`
    /// is discarded. After a successful enqueue one waiting worker is woken.
    ///
    /// # Errors
    ///
    /// Returns [`QueueError`] if the clock cannot be read, the job cannot be serialized to
    /// JSON, or the queue backend rejects the insert.
    pub async fn enqueue_with_delay(&self, job: &J, delay: Duration) -> Result<i64, QueueError> {
        let enqueued_at = now_epoch_seconds()?;
        let payload = serde_json::to_string(job)?;

        let new_job = NewJob {
            job_type: J::job_type().to_string(),
            payload,
            available_at: enqueued_at.saturating_add(duration_to_secs(delay)),
            max_attempts: self.options.max_attempts,
            enqueued_at,
        };
        let id = self.queue.enqueue(new_job).await?;

        // Wake one waiting worker; if none is waiting, Tokio stores a permit for the next one.
        self.enqueue_notify.notify_one();
        Ok(id)
    }
}

impl<J, Q> JobProcessor<J, Q>
where
    J: Job,
    Q: RetryableQueue,
{
    /// Spawns a single background worker; equivalent to `self.spawn_workers(1)`.
    pub fn spawn_worker(&self) -> WorkerHandle {
        self.spawn_workers(1)
    }

    /// Spawns `concurrency` worker tasks (at least one) with [`tokio::spawn`] and returns a
    /// handle that controls them.
    ///
    /// Every worker claims and processes jobs of type `J` until shutdown is signalled through
    /// the returned [`WorkerHandle`]. Shutdown is checked between jobs, so the job in progress
    /// is finished first. If an iteration fails with a [`QueueError`], the worker pauses for
    /// the poll interval (at least 1 ms) and resumes; a shutdown signalled during that pause
    /// ends the worker. Such errors are not reported anywhere else.
    ///
    /// # Panics
    ///
    /// Panics when called outside a Tokio runtime, as [`tokio::spawn`] does.
    pub fn spawn_workers(&self, concurrency: usize) -> WorkerHandle {
        let count = concurrency.max(1);
        let shutdown = Arc::new(tokio::sync::Notify::new());
        // Loop-invariant: every worker backs off by the same amount after an error.
        let retry_delay = non_zero_poll_interval(self.options.poll_interval);

        let tasks = (0..count)
            .map(|_| {
                let processor = self.clone();
                let shutdown_signal = Arc::clone(&shutdown);
                tokio::spawn(async move {
                    // Register for shutdown once and keep the same future for the worker's
                    // whole lifetime. `Notify::notify_waiters` stores no permit, so a future
                    // created only on entering the idle wait would miss a shutdown signalled
                    // while a job was running or during the error back-off.
                    let stop = shutdown_signal.notified();
                    tokio::pin!(stop);

                    loop {
                        match processor.run_until_shutdown(stop.as_mut()).await {
                            Ok(()) => break,
                            Err(_) => {
                                tokio::select! {
                                    biased;
                                    () = stop.as_mut() => break,
                                    () = tokio::time::sleep(retry_delay) => {}
                                }
                            }
                        }
                    }

                    // Pass the signal on: a sibling worker that was spawned but had not yet
                    // registered when shutdown was broadcast would otherwise never see it.
                    // `notify_one` stores a permit that such a late worker consumes (and then
                    // forwards in turn).
                    shutdown_signal.notify_one();
                })
            })
            .collect();

        WorkerHandle { shutdown, tasks }
    }

    /// Claims at most one job of type `J` and processes it.
    ///
    /// Returns [`RunOutcome::Idle`] when nothing could be claimed. A job whose payload cannot
    /// be deserialized into `J` is failed permanently (decoding would fail identically on
    /// every attempt) and reported as [`RunOutcome::Failed`]. Otherwise the job is completed,
    /// retried, or failed according to the result of `process()`.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] when the queue backend or the system clock fails.
    pub(crate) async fn run_once(&self) -> Result<RunOutcome, QueueError> {
        let Some(claimed) = self.queue.claim(J::job_type()).await? else {
            return Ok(RunOutcome::Idle);
        };

        let job_id = claimed.id;
        let attempts = claimed.attempts;
        let max_attempts = claimed.max_attempts;

        let job = match serde_json::from_str::<J>(&claimed.payload) {
            Ok(job) => job,
            Err(decode_error) => {
                // Hand the job to `fail` rather than propagating the error with `?`, which
                // would drop `claimed` without ever completing, retrying or failing it.
                let error = format!("failed to deserialize job payload: {decode_error}");
                self.queue.fail(claimed, error.clone()).await?;
                return Ok(RunOutcome::Failed {
                    job_id,
                    attempts,
                    error,
                });
            }
        };

        match job.process().await {
            Ok(()) => {
                self.queue.complete(claimed).await?;
                Ok(RunOutcome::Completed { job_id, attempts })
            }
            Err(job_error) => {
                self.handle_job_error(claimed, job_id, attempts, max_attempts, job_error)
                    .await
            }
        }
    }

    /// Processes jobs until `shutdown` receives a notification, then returns `Ok(())`.
    ///
    /// Only notifications delivered after this call starts (or a permit already stored in
    /// `shutdown`) are observed.
    ///
    /// # Errors
    ///
    /// Returns the first [`QueueError`] raised while processing or computing a wake-up time.
    // Retained as a crate-internal entry point; workers from `spawn_workers` call
    // `run_until_shutdown` directly so their registration survives error back-offs.
    #[allow(dead_code)]
    pub(crate) async fn run_until_notified(
        &self,
        shutdown: &tokio::sync::Notify,
    ) -> Result<(), QueueError> {
        let stop = shutdown.notified();
        tokio::pin!(stop);
        self.run_until_shutdown(stop).await
    }

    /// Processing loop: checks `shutdown` before each job and, when the queue is idle, sleeps
    /// until an enqueue, the next wake-up time, or shutdown — whichever comes first.
    async fn run_until_shutdown(
        &self,
        mut shutdown: std::pin::Pin<&mut tokio::sync::futures::Notified<'_>>,
    ) -> Result<(), QueueError> {
        loop {
            if Self::shutdown_requested(shutdown.as_mut()).await {
                return Ok(());
            }

            if matches!(self.run_once().await?, RunOutcome::Idle) {
                let wake_delay = self.next_wakeup_delay().await?;
                if self
                    .wait_for_enqueue_or_shutdown_notify(shutdown.as_mut(), wake_delay)
                    .await
                {
                    return Ok(());
                }
            }
        }
    }

    /// Polls `shutdown` exactly once without waiting; `true` means shutdown was signalled.
    ///
    /// Once this returns `true` the future has completed and must not be polled again.
    async fn shutdown_requested(
        mut shutdown: std::pin::Pin<&mut tokio::sync::futures::Notified<'_>>,
    ) -> bool {
        std::future::poll_fn(|cx| {
            std::task::Poll::Ready(
                std::future::Future::poll(shutdown.as_mut(), cx).is_ready(),
            )
        })
        .await
    }

    /// Reports a failed `process()` call to the queue.
    ///
    /// Retryable errors with attempts remaining (`attempts < max_attempts`) are rescheduled
    /// after the configured backoff for `attempts`; everything else is failed permanently.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] when the clock cannot be read or the queue backend fails.
    async fn handle_job_error(
        &self,
        claimed: ClaimedJob<Q::Claim>,
        job_id: i64,
        attempts: u32,
        max_attempts: u32,
        job_error: crate::JobError,
    ) -> Result<RunOutcome, QueueError> {
        let error_message = job_error.to_string();
        let retry = job_error.is_retryable() && attempts < max_attempts;

        if retry {
            let delay = self.options.backoff.delay_for_attempt(attempts);
            let next_run_at = now_epoch_seconds()?.saturating_add(duration_to_secs(delay));
            self.queue
                .retry(claimed, next_run_at, error_message.clone())
                .await?;

            Ok(RunOutcome::Retried {
                job_id,
                attempts,
                next_run_at,
                error: error_message,
            })
        } else {
            self.queue.fail(claimed, error_message.clone()).await?;

            Ok(RunOutcome::Failed {
                job_id,
                attempts,
                error: error_message,
            })
        }
    }

    /// Computes how long an idle worker may sleep before re-checking the queue.
    ///
    /// Returns `None` when the queue reports no upcoming wake-up time (wait for an enqueue
    /// only), the non-zero poll interval when the reported time has already passed, and
    /// otherwise the whole seconds remaining until that time.
    async fn next_wakeup_delay(&self) -> Result<Option<Duration>, QueueError> {
        let Some(wake_at) = self.queue.next_wakeup_at(J::job_type()).await? else {
            return Ok(None);
        };
        let now = now_epoch_seconds()?;

        if wake_at <= now {
            return Ok(Some(non_zero_poll_interval(self.options.poll_interval)));
        }

        let delay_secs = u64::try_from(wake_at.saturating_sub(now)).unwrap_or(u64::MAX);
        Ok(Some(Duration::from_secs(delay_secs)))
    }

    /// Waits for an enqueue notification, the optional `wake_delay`, or `shutdown`.
    ///
    /// Returns `true` only when shutdown was signalled; shutdown wins ties.
    async fn wait_for_enqueue_or_shutdown_notify(
        &self,
        shutdown: std::pin::Pin<&mut tokio::sync::futures::Notified<'_>>,
        wake_delay: Option<Duration>,
    ) -> bool {
        // Without a known wake-up time the timer never fires.
        let timer = async move {
            match wake_delay {
                Some(delay) => tokio::time::sleep(delay).await,
                None => std::future::pending::<()>().await,
            }
        };

        tokio::select! {
            biased;
            () = shutdown => true,
            () = self.enqueue_notify.notified() => false,
            () = timer => false,
        }
    }
}

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Sub-second precision is discarded; a value that does not fit in `i64` saturates to
/// `i64::MAX`.
///
/// # Errors
///
/// Returns the [`std::time::SystemTimeError`] produced when the system clock reads earlier
/// than the Unix epoch.
fn now_epoch_seconds() -> Result<i64, std::time::SystemTimeError> {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX))
}

/// Converts `duration` to whole seconds as `i64`.
///
/// The fractional second is truncated, and durations longer than `i64::MAX` seconds
/// saturate to `i64::MAX`.
fn duration_to_secs(duration: Duration) -> i64 {
    let whole_secs = duration.as_secs();
    match i64::try_from(whole_secs) {
        Ok(secs) => secs,
        Err(_) => i64::MAX,
    }
}

/// Returns `interval`, replacing a zero duration with one millisecond.
///
/// The result is used as a sleep/back-off delay by the workers; presumably the floor exists so
/// a zero interval cannot make those loops spin without pausing.
fn non_zero_poll_interval(interval: Duration) -> Duration {
    const MIN_INTERVAL: Duration = Duration::from_millis(1);
    Some(interval)
        .filter(|candidate| !candidate.is_zero())
        .unwrap_or(MIN_INTERVAL)
}