aide-de-camp-sqlite 0.2.0

SQLite implementation of Queue trait from aide-de-camp
Documentation
use crate::types::JobRow;
use aide_de_camp::core::job_handle::JobHandle;
use aide_de_camp::core::queue::QueueError;
use aide_de_camp::core::{Bytes, Xid};
use anyhow::Context;
use async_trait::async_trait;
use sqlx::SqlitePool;

pub struct SqliteJobHandle {
    pool: SqlitePool,
    row: JobRow,
}

#[async_trait]
impl JobHandle for SqliteJobHandle {
    fn id(&self) -> Xid {
        self.row.jid
    }

    fn job_type(&self) -> &str {
        &self.row.job_type
    }

    fn payload(&self) -> Bytes {
        self.row.payload.clone()
    }

    fn retries(&self) -> u32 {
        self.row.retries
    }

    async fn complete(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        sqlx::query!("DELETE FROM adc_queue where jid = ?1", jid)
            .execute(&self.pool)
            .await
            .context("Failed to mark job as completed")?;
        Ok(())
    }

    async fn fail(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        sqlx::query!("UPDATE adc_queue SET started_at=null WHERE jid = ?1", jid)
            .execute(&self.pool)
            .await
            .context("Failed to mark job as failed")?;
        Ok(())
    }

    async fn dead_queue(mut self) -> Result<(), QueueError> {
        let jid = self.row.jid.to_string();
        let retries = self.row.retries;
        let job_type = self.row.job_type.clone();
        let payload = self.row.payload.as_ref();
        let scheduled_at = self.row.scheduled_at;
        let enqueued_at = self.row.enqueued_at;

        let mut tx = self
            .pool
            .begin()
            .await
            .context("Failed to start transaction")?;
        sqlx::query!("DELETE FROM adc_queue WHERE jid = ?1", jid)
            .execute(&mut tx)
            .await
            .context("Failed to delete job from the queue")?;

        sqlx::query!("INSERT INTO adc_dead_queue (jid, job_type, payload, retries, scheduled_at, enqueued_at) VALUES (?1, ?2, ?3, ?4,?5,?6)",
            jid,
            job_type,
            payload,
            retries,
            scheduled_at,
            enqueued_at)
            .execute(&mut tx).await
            .context("Failed to move job to dead queue")?;
        Ok(())
    }
}

impl SqliteJobHandle {
    pub(crate) fn new(row: JobRow, pool: SqlitePool) -> Self {
        Self { row, pool }
    }
}