aide_de_camp_sqlite/
lib.rs1#![doc = include_str!("../README.md")]
2
3pub mod job_handle;
4pub mod queue;
5pub mod types;
6
7pub use queue::SqliteQueue;
8use sqlx::migrate::Migrator;
/// Database migrations for this crate, produced by `sqlx::migrate!()`.
///
/// The tests in this file run it against a freshly connected pool
/// (`MIGRATOR.run(&pool).await`) before building a `SqliteQueue` from that pool.
// NOTE(review): with no arguments `sqlx::migrate!()` presumably embeds the crate's
// default `migrations` directory at compile time — confirm against the sqlx docs.
9pub static MIGRATOR: Migrator = sqlx::migrate!();
10
11#[cfg(test)]
12mod test {
13 use crate::queue::SqliteQueue;
14 use crate::MIGRATOR;
15 use aide_de_camp::core::bincode::{Decode, Encode};
16 use aide_de_camp::core::job_handle::JobHandle;
17 use aide_de_camp::core::job_processor::JobProcessor;
18 use aide_de_camp::core::queue::Queue;
19 use aide_de_camp::core::{CancellationToken, Duration, Xid};
20 use aide_de_camp::prelude::QueueError;
21 use async_trait::async_trait;
22 use sqlx::types::chrono::Utc;
23 use sqlx::SqlitePool;
24 use std::convert::Infallible;
25
26 #[allow(dead_code)]
27 pub fn setup_logger() {
28 tracing_subscriber::fmt()
29 .with_max_level(tracing::Level::TRACE)
30 .with_test_writer()
31 .init();
32 }
33
34 async fn make_pool(uri: &str) -> SqlitePool {
35 let pool = SqlitePool::connect(uri).await.unwrap();
36 MIGRATOR.run(&pool).await.unwrap();
37 pool
38 }
39
40 #[derive(Encode, Decode, PartialEq, Clone, Debug)]
41 struct TestPayload1 {
42 arg1: i32,
43 arg2: String,
44 }
45
46 impl Default for TestPayload1 {
47 fn default() -> Self {
48 Self {
49 arg1: 1774,
50 arg2: String::from("this is a test"),
51 }
52 }
53 }
54
55 struct TestJob1;
56
57 #[async_trait]
58 impl JobProcessor for TestJob1 {
59 type Payload = TestPayload1;
60 type Error = Infallible;
61
62 async fn handle(
63 &self,
64 _jid: Xid,
65 _payload: Self::Payload,
66 _cancellation_token: CancellationToken,
67 ) -> Result<(), Self::Error> {
68 Ok(())
69 }
70
71 fn name() -> &'static str
72 where
73 Self: Sized,
74 {
75 "test_job_1"
76 }
77 }
78
79 #[derive(Encode, Decode, PartialEq, Clone, Debug)]
80 struct TestPayload2 {
81 arg1: i32,
82 arg2: u64,
83 arg3: String,
84 }
85
86 impl Default for TestPayload2 {
87 fn default() -> Self {
88 Self {
89 arg1: 1774,
90 arg2: 42,
91 arg3: String::from("this is a test"),
92 }
93 }
94 }
95
96 struct TestJob2;
97
98 #[async_trait]
99 impl JobProcessor for TestJob2 {
100 type Payload = TestPayload2;
101 type Error = Infallible;
102
103 async fn handle(
104 &self,
105 _jid: Xid,
106 _payload: Self::Payload,
107 _cancellation_token: CancellationToken,
108 ) -> Result<(), Self::Error> {
109 Ok(())
110 }
111
112 fn name() -> &'static str
113 where
114 Self: Sized,
115 {
116 "test_job_2"
117 }
118 }
119
120 struct TestJob3;
122
123 #[async_trait]
124 impl JobProcessor for TestJob3 {
125 type Payload = TestPayload2;
126 type Error = Infallible;
127
128 async fn handle(
129 &self,
130 _jid: Xid,
131 _payload: Self::Payload,
132 _cancellation_token: CancellationToken,
133 ) -> Result<(), Self::Error> {
134 Ok(())
135 }
136
137 fn name() -> &'static str
138 where
139 Self: Sized,
140 {
141 "test_job_1"
142 }
143 }
144
145 #[tokio::test]
146 async fn queue_smoke_test() {
147 let pool = make_pool(":memory:").await;
148 let queue = SqliteQueue::with_pool(pool);
149
150 {
152 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
153 assert!(job.is_none());
154 }
155 let jid1 = queue
157 .schedule::<TestJob1>(TestPayload1::default(), 0)
158 .await
159 .unwrap();
160
161 let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
163 assert_eq!(jid1, job1.id());
164 {
166 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
167 assert!(job.is_none());
168 }
169
170 job1.complete().await.unwrap();
172 {
173 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
174 assert!(job.is_none());
175 }
176 }
177
178 #[tokio::test]
179 async fn failed_jobs() {
180 let pool = make_pool(":memory:").await;
181 let queue = SqliteQueue::with_pool(pool);
182
183 let _jid1 = queue
185 .schedule::<TestJob1>(TestPayload1::default(), 0)
186 .await
187 .unwrap();
188
189 let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
191 assert_eq!(job1.retries(), 0);
192 job1.fail().await.unwrap();
194
195 let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
198 assert_eq!(job1.retries(), 1);
199 }
200
201 #[tokio::test]
202 async fn scheduling_future_jobs() {
203 setup_logger();
204 let pool = make_pool(":memory:").await;
205 let queue = SqliteQueue::with_pool(pool);
206
207 let tomorrow_jid = queue
210 .schedule_in::<TestJob1>(TestPayload1::default(), Duration::days(1), 0)
211 .await
212 .unwrap();
213
214 {
216 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
217 assert!(job.is_none());
218 }
219
220 let hour_ago = { Utc::now() - Duration::hours(1) };
221 let hour_ago_jid = queue
222 .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 0)
223 .await
224 .unwrap();
225
226 {
227 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
228 assert_eq!(hour_ago_jid, job.id());
229 }
230
231 let tomorrow = Utc::now() + Duration::days(1) + Duration::minutes(1);
232 {
233 let job = queue
234 .poll_next_with_instant(&[TestJob1::name()], tomorrow)
235 .await
236 .unwrap()
237 .unwrap();
238 assert_eq!(tomorrow_jid, job.id());
239 }
240
241 {
243 let job = queue
244 .poll_next_with_instant(&[TestJob1::name()], tomorrow)
245 .await
246 .unwrap();
247 assert!(job.is_none());
248 }
249 }
250
251 #[tokio::test]
252 async fn cancel_job_not_started() {
253 let pool = make_pool(":memory:").await;
254 let queue = SqliteQueue::with_pool(pool);
255 let jid = queue
256 .schedule::<TestJob1>(TestPayload1::default(), 0)
257 .await
258 .unwrap();
259 queue.cancel_job(jid).await.unwrap();
260
261 {
263 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
264 assert!(job.is_none());
265 }
266
267 let ret = queue.cancel_job(jid).await;
269 assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
270 }
271
272 #[tokio::test]
273 async fn cancel_job_return_payload() {
274 let pool = make_pool(":memory:").await;
275 let queue = SqliteQueue::with_pool(pool);
276 let payload = TestPayload1::default();
277 let jid = queue
278 .schedule::<TestJob1>(payload.clone(), 0)
279 .await
280 .unwrap();
281
282 let deleted_payload = queue.unschedule_job::<TestJob1>(jid).await.unwrap();
283 assert_eq!(payload, deleted_payload);
284
285 let ret = queue.unschedule_job::<TestJob1>(jid).await;
286 assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
287 }
288
289 #[tokio::test]
290 async fn cancel_wrong_type() {
291 let pool = make_pool(":memory:").await;
292 let queue = SqliteQueue::with_pool(pool);
293 let jid = queue
294 .schedule::<TestJob1>(TestPayload1::default(), 0)
295 .await
296 .unwrap();
297
298 let result = queue.unschedule_job::<TestJob2>(jid).await;
299 assert!(matches!(result, Err(QueueError::JobNotFound(_))));
300
301 let result = queue.unschedule_job::<TestJob3>(jid).await;
302 dbg!(&result);
303 assert!(matches!(result, Err(QueueError::DecodeError { .. })));
304 }
305
306 #[tokio::test]
307 async fn cancel_job_started() {
308 let pool = make_pool(":memory:").await;
309 let queue = SqliteQueue::with_pool(pool);
310 let payload = TestPayload1::default();
311 let jid = queue
312 .schedule::<TestJob1>(payload.clone(), 0)
313 .await
314 .unwrap();
315
316 let _job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
317
318 let ret = queue.cancel_job(jid).await;
319 assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
320
321 let ret = queue.unschedule_job::<TestJob1>(jid).await;
322 assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
323 }
324 #[tokio::test]
325 async fn priority_polling() {
326 let pool = make_pool(":memory:").await;
327 let queue = SqliteQueue::with_pool(pool);
328 let hour_ago = { Utc::now() - Duration::hours(1) };
329 let _hour_ago_jid = queue
330 .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 0)
331 .await
332 .unwrap();
333
334 let higher_priority_jid = queue
335 .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 3)
336 .await
337 .unwrap();
338
339 let job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
340 assert_eq!(higher_priority_jid, job.id());
341 }
342}