aide_de_camp_sqlite/
queue.rs1use crate::job_handle::SqliteJobHandle;
2use crate::types::JobRow;
3use aide_de_camp::core::job_processor::JobProcessor;
4use aide_de_camp::core::queue::{Queue, QueueError};
5use aide_de_camp::core::{bincode::Encode, new_xid, DateTime, Xid};
6use anyhow::Context;
7use async_trait::async_trait;
8use bincode::Decode;
9use sqlx::sqlite::SqliteQueryResult;
10use sqlx::{FromRow, QueryBuilder, SqlitePool};
11use tracing::instrument;
12
13#[derive(Clone)]
15pub struct SqliteQueue {
16 pool: SqlitePool,
17 bincode_config: bincode::config::Configuration,
18}
19
20impl SqliteQueue {
21 pub fn with_pool(pool: SqlitePool) -> Self {
22 Self {
23 pool,
24 bincode_config: bincode::config::standard(),
25 }
26 }
27}
28
29#[async_trait]
30impl Queue for SqliteQueue {
31 type JobHandle = SqliteJobHandle;
32
33 #[instrument(skip_all, err, ret, fields(job_type = J::name(), payload_size))]
34 async fn schedule_at<J>(
35 &self,
36 payload: J::Payload,
37 scheduled_at: DateTime,
38 priority: i8,
39 ) -> Result<Xid, QueueError>
40 where
41 J: JobProcessor + 'static,
42 J::Payload: Encode,
43 {
44 let payload = bincode::encode_to_vec(&payload, self.bincode_config)?;
45 let jid = new_xid();
46 let jid_string = jid.to_string();
47 let job_type = J::name();
48
49 tracing::Span::current().record("payload_size", payload.len());
50
51 sqlx::query!(
52 "INSERT INTO adc_queue (jid,job_type,payload,scheduled_at,priority) VALUES (?1,?2,?3,?4,?5)",
53 jid_string,
54 job_type,
55 payload,
56 scheduled_at,
57 priority
58 )
59 .execute(&self.pool)
60 .await
61 .context("Failed to add job to the queue")?;
62 Ok(jid)
63 }
64
65 #[instrument(skip_all, err)]
66 async fn poll_next_with_instant(
67 &self,
68 job_types: &[&str],
69 now: DateTime,
70 ) -> Result<Option<SqliteJobHandle>, QueueError> {
71 let mut builder = QueryBuilder::new(
72 "UPDATE adc_queue SET started_at=(strftime('%s', 'now')),retries=retries+1 ",
73 );
74 let query = {
75 builder.push(
76 "WHERE jid IN (SELECT jid FROM adc_queue WHERE started_at IS NULL AND queue='default' AND scheduled_at <="
77 );
78 builder.push_bind(now);
79 builder.push(" AND job_type IN (");
80 {
81 let mut separated = builder.separated(",");
82 for job_type in job_types {
83 separated.push_bind(job_type);
84 }
85 }
86 builder.push(") ORDER BY priority DESC LIMIT 1) RETURNING *");
87 builder.build().bind(now)
88 };
89 let row = query
90 .try_map(|row| JobRow::from_row(&row))
91 .fetch_optional(&self.pool)
92 .await
93 .context("Failed to check out a job from the queue")?;
94
95 if let Some(row) = row {
96 Ok(Some(SqliteJobHandle::new(row, self.pool.clone())))
97 } else {
98 Ok(None)
99 }
100 }
101
102 #[instrument(skip_all, err)]
103 async fn cancel_job(&self, job_id: Xid) -> Result<(), QueueError> {
104 let jid = job_id.to_string();
105 let result: SqliteQueryResult = sqlx::query!(
106 "DELETE FROM adc_queue WHERE started_at IS NULL and jid = ?",
107 jid
108 )
109 .execute(&self.pool)
110 .await
111 .context("Failed to remove job from the queue")?;
112 if result.rows_affected() == 0 {
113 Err(QueueError::JobNotFound(job_id))
114 } else {
115 Ok(())
116 }
117 }
118
119 #[allow(clippy::or_fun_call)]
120 #[instrument(skip_all, err)]
121 async fn unschedule_job<J>(&self, job_id: Xid) -> Result<J::Payload, QueueError>
122 where
123 J: JobProcessor + 'static,
124 J::Payload: Decode,
125 {
126 let jid = job_id.to_string();
127 let job_type = J::name();
128 let payload = sqlx::query!(
129 "DELETE FROM adc_queue WHERE started_at IS NULL and jid = ? AND job_type = ? RETURNING *",
130 jid,
131 job_type)
132 .fetch_optional(&self.pool)
133 .await
134 .context("Failed to remove job from the queue")?
135 .map(|row| row.payload.unwrap_or_default())
136 .ok_or(QueueError::JobNotFound(job_id))?;
137 let (decoded, _) = bincode::decode_from_slice(&payload, self.bincode_config)?;
138 Ok(decoded)
139 }
140}