aide_de_camp_sqlite/
queue.rs

1use crate::job_handle::SqliteJobHandle;
2use crate::types::JobRow;
3use aide_de_camp::core::job_processor::JobProcessor;
4use aide_de_camp::core::queue::{Queue, QueueError};
5use aide_de_camp::core::{bincode::Encode, new_xid, DateTime, Xid};
6use anyhow::Context;
7use async_trait::async_trait;
8use bincode::Decode;
9use sqlx::sqlite::SqliteQueryResult;
10use sqlx::{FromRow, QueryBuilder, SqlitePool};
11use tracing::instrument;
12
13/// An implementation of the Queue backed by SQlite
14#[derive(Clone)]
15pub struct SqliteQueue {
16    pool: SqlitePool,
17    bincode_config: bincode::config::Configuration,
18}
19
20impl SqliteQueue {
21    pub fn with_pool(pool: SqlitePool) -> Self {
22        Self {
23            pool,
24            bincode_config: bincode::config::standard(),
25        }
26    }
27}
28
29#[async_trait]
30impl Queue for SqliteQueue {
31    type JobHandle = SqliteJobHandle;
32
33    #[instrument(skip_all, err, ret, fields(job_type = J::name(), payload_size))]
34    async fn schedule_at<J>(
35        &self,
36        payload: J::Payload,
37        scheduled_at: DateTime,
38        priority: i8,
39    ) -> Result<Xid, QueueError>
40    where
41        J: JobProcessor + 'static,
42        J::Payload: Encode,
43    {
44        let payload = bincode::encode_to_vec(&payload, self.bincode_config)?;
45        let jid = new_xid();
46        let jid_string = jid.to_string();
47        let job_type = J::name();
48
49        tracing::Span::current().record("payload_size", payload.len());
50
51        sqlx::query!(
52            "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at,priority) VALUES (?1,?2,?3,?4,?5)",
53            jid_string,
54            job_type,
55            payload,
56            scheduled_at,
57            priority
58        )
59        .execute(&self.pool)
60        .await
61        .context("Failed to add job to the queue")?;
62        Ok(jid)
63    }
64
65    #[instrument(skip_all, err)]
66    async fn poll_next_with_instant(
67        &self,
68        job_types: &[&str],
69        now: DateTime,
70    ) -> Result<Option<SqliteJobHandle>, QueueError> {
71        let mut builder = QueryBuilder::new(
72            "UPDATE adc_queue SET started_at=(strftime('%s', 'now')),retries=retries+1 ",
73        );
74        let query = {
75            builder.push(
76                "WHERE jid IN (SELECT jid FROM adc_queue WHERE started_at IS NULL AND queue='default' AND scheduled_at <="
77            );
78            builder.push_bind(now);
79            builder.push(" AND job_type IN (");
80            {
81                let mut separated = builder.separated(",");
82                for job_type in job_types {
83                    separated.push_bind(job_type);
84                }
85            }
86            builder.push(") ORDER BY priority DESC LIMIT 1) RETURNING *");
87            builder.build().bind(now)
88        };
89        let row = query
90            .try_map(|row| JobRow::from_row(&row))
91            .fetch_optional(&self.pool)
92            .await
93            .context("Failed to check out a job from the queue")?;
94
95        if let Some(row) = row {
96            Ok(Some(SqliteJobHandle::new(row, self.pool.clone())))
97        } else {
98            Ok(None)
99        }
100    }
101
102    #[instrument(skip_all, err)]
103    async fn cancel_job(&self, job_id: Xid) -> Result<(), QueueError> {
104        let jid = job_id.to_string();
105        let result: SqliteQueryResult = sqlx::query!(
106            "DELETE FROM adc_queue WHERE started_at IS NULL and jid = ?",
107            jid
108        )
109        .execute(&self.pool)
110        .await
111        .context("Failed to remove job from the queue")?;
112        if result.rows_affected() == 0 {
113            Err(QueueError::JobNotFound(job_id))
114        } else {
115            Ok(())
116        }
117    }
118
119    #[allow(clippy::or_fun_call)]
120    #[instrument(skip_all, err)]
121    async fn unschedule_job<J>(&self, job_id: Xid) -> Result<J::Payload, QueueError>
122    where
123        J: JobProcessor + 'static,
124        J::Payload: Decode,
125    {
126        let jid = job_id.to_string();
127        let job_type = J::name();
128        let payload = sqlx::query!(
129            "DELETE FROM adc_queue WHERE started_at IS NULL and jid = ? AND job_type = ? RETURNING *",
130            jid,
131            job_type)
132        .fetch_optional(&self.pool)
133        .await
134        .context("Failed to remove job from the queue")?
135        .map(|row| row.payload.unwrap_or_default())
136        .ok_or(QueueError::JobNotFound(job_id))?;
137        let (decoded, _) = bincode::decode_from_slice(&payload, self.bincode_config)?;
138        Ok(decoded)
139    }
140}