aide_de_camp_sqlite/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod job_handle;
4pub mod queue;
5pub mod types;
6
7pub use queue::SqliteQueue;
8use sqlx::migrate::Migrator;
9pub static MIGRATOR: Migrator = sqlx::migrate!();
10
11#[cfg(test)]
12mod test {
13    use crate::queue::SqliteQueue;
14    use crate::MIGRATOR;
15    use aide_de_camp::core::bincode::{Decode, Encode};
16    use aide_de_camp::core::job_handle::JobHandle;
17    use aide_de_camp::core::job_processor::JobProcessor;
18    use aide_de_camp::core::queue::Queue;
19    use aide_de_camp::core::{CancellationToken, Duration, Xid};
20    use aide_de_camp::prelude::QueueError;
21    use async_trait::async_trait;
22    use sqlx::types::chrono::Utc;
23    use sqlx::SqlitePool;
24    use std::convert::Infallible;
25
26    #[allow(dead_code)]
27    pub fn setup_logger() {
28        tracing_subscriber::fmt()
29            .with_max_level(tracing::Level::TRACE)
30            .with_test_writer()
31            .init();
32    }
33
34    async fn make_pool(uri: &str) -> SqlitePool {
35        let pool = SqlitePool::connect(uri).await.unwrap();
36        MIGRATOR.run(&pool).await.unwrap();
37        pool
38    }
39
40    #[derive(Encode, Decode, PartialEq, Clone, Debug)]
41    struct TestPayload1 {
42        arg1: i32,
43        arg2: String,
44    }
45
46    impl Default for TestPayload1 {
47        fn default() -> Self {
48            Self {
49                arg1: 1774,
50                arg2: String::from("this is a test"),
51            }
52        }
53    }
54
55    struct TestJob1;
56
57    #[async_trait]
58    impl JobProcessor for TestJob1 {
59        type Payload = TestPayload1;
60        type Error = Infallible;
61
62        async fn handle(
63            &self,
64            _jid: Xid,
65            _payload: Self::Payload,
66            _cancellation_token: CancellationToken,
67        ) -> Result<(), Self::Error> {
68            Ok(())
69        }
70
71        fn name() -> &'static str
72        where
73            Self: Sized,
74        {
75            "test_job_1"
76        }
77    }
78
79    #[derive(Encode, Decode, PartialEq, Clone, Debug)]
80    struct TestPayload2 {
81        arg1: i32,
82        arg2: u64,
83        arg3: String,
84    }
85
86    impl Default for TestPayload2 {
87        fn default() -> Self {
88            Self {
89                arg1: 1774,
90                arg2: 42,
91                arg3: String::from("this is a test"),
92            }
93        }
94    }
95
96    struct TestJob2;
97
98    #[async_trait]
99    impl JobProcessor for TestJob2 {
100        type Payload = TestPayload2;
101        type Error = Infallible;
102
103        async fn handle(
104            &self,
105            _jid: Xid,
106            _payload: Self::Payload,
107            _cancellation_token: CancellationToken,
108        ) -> Result<(), Self::Error> {
109            Ok(())
110        }
111
112        fn name() -> &'static str
113        where
114            Self: Sized,
115        {
116            "test_job_2"
117        }
118    }
119
120    // Job with payload2 but job_type is from TestJob1
121    struct TestJob3;
122
123    #[async_trait]
124    impl JobProcessor for TestJob3 {
125        type Payload = TestPayload2;
126        type Error = Infallible;
127
128        async fn handle(
129            &self,
130            _jid: Xid,
131            _payload: Self::Payload,
132            _cancellation_token: CancellationToken,
133        ) -> Result<(), Self::Error> {
134            Ok(())
135        }
136
137        fn name() -> &'static str
138        where
139            Self: Sized,
140        {
141            "test_job_1"
142        }
143    }
144
145    #[tokio::test]
146    async fn queue_smoke_test() {
147        let pool = make_pool(":memory:").await;
148        let queue = SqliteQueue::with_pool(pool);
149
150        // If there are no jobs, this should return Ok(None);
151        {
152            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
153            assert!(job.is_none());
154        }
155        // Schedule a job to run now
156        let jid1 = queue
157            .schedule::<TestJob1>(TestPayload1::default(), 0)
158            .await
159            .unwrap();
160
161        // Now poll_next should return this job to us
162        let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
163        assert_eq!(jid1, job1.id());
164        // Second time poll should not return anything
165        {
166            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
167            assert!(job.is_none());
168        }
169
170        // Completed jobs should not show up in queue again
171        job1.complete().await.unwrap();
172        {
173            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
174            assert!(job.is_none());
175        }
176    }
177
178    #[tokio::test]
179    async fn failed_jobs() {
180        let pool = make_pool(":memory:").await;
181        let queue = SqliteQueue::with_pool(pool);
182
183        // Schedule a job to run now
184        let _jid1 = queue
185            .schedule::<TestJob1>(TestPayload1::default(), 0)
186            .await
187            .unwrap();
188
189        // Now poll_next should return this job to us
190        let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
191        assert_eq!(job1.retries(), 0);
192        // Fail the job
193        job1.fail().await.unwrap();
194
195        // We should be able to get the same job again, but it should have increased retry count
196
197        let job1 = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
198        assert_eq!(job1.retries(), 1);
199    }
200
201    #[tokio::test]
202    async fn scheduling_future_jobs() {
203        setup_logger();
204        let pool = make_pool(":memory:").await;
205        let queue = SqliteQueue::with_pool(pool);
206
207        // schedule to run job tomorrow
208        // schedule a job to run now
209        let tomorrow_jid = queue
210            .schedule_in::<TestJob1>(TestPayload1::default(), Duration::days(1), 0)
211            .await
212            .unwrap();
213
214        // Should not be polled yet
215        {
216            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
217            assert!(job.is_none());
218        }
219
220        let hour_ago = { Utc::now() - Duration::hours(1) };
221        let hour_ago_jid = queue
222            .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 0)
223            .await
224            .unwrap();
225
226        {
227            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
228            assert_eq!(hour_ago_jid, job.id());
229        }
230
231        let tomorrow = Utc::now() + Duration::days(1) + Duration::minutes(1);
232        {
233            let job = queue
234                .poll_next_with_instant(&[TestJob1::name()], tomorrow)
235                .await
236                .unwrap()
237                .unwrap();
238            assert_eq!(tomorrow_jid, job.id());
239        }
240
241        // Everything should be in-progress, so None
242        {
243            let job = queue
244                .poll_next_with_instant(&[TestJob1::name()], tomorrow)
245                .await
246                .unwrap();
247            assert!(job.is_none());
248        }
249    }
250
251    #[tokio::test]
252    async fn cancel_job_not_started() {
253        let pool = make_pool(":memory:").await;
254        let queue = SqliteQueue::with_pool(pool);
255        let jid = queue
256            .schedule::<TestJob1>(TestPayload1::default(), 0)
257            .await
258            .unwrap();
259        queue.cancel_job(jid).await.unwrap();
260
261        // Should return None
262        {
263            let job = queue.poll_next(&[TestJob1::name()]).await.unwrap();
264            assert!(job.is_none());
265        }
266
267        // Should fail
268        let ret = queue.cancel_job(jid).await;
269        assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
270    }
271
272    #[tokio::test]
273    async fn cancel_job_return_payload() {
274        let pool = make_pool(":memory:").await;
275        let queue = SqliteQueue::with_pool(pool);
276        let payload = TestPayload1::default();
277        let jid = queue
278            .schedule::<TestJob1>(payload.clone(), 0)
279            .await
280            .unwrap();
281
282        let deleted_payload = queue.unschedule_job::<TestJob1>(jid).await.unwrap();
283        assert_eq!(payload, deleted_payload);
284
285        let ret = queue.unschedule_job::<TestJob1>(jid).await;
286        assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
287    }
288
289    #[tokio::test]
290    async fn cancel_wrong_type() {
291        let pool = make_pool(":memory:").await;
292        let queue = SqliteQueue::with_pool(pool);
293        let jid = queue
294            .schedule::<TestJob1>(TestPayload1::default(), 0)
295            .await
296            .unwrap();
297
298        let result = queue.unschedule_job::<TestJob2>(jid).await;
299        assert!(matches!(result, Err(QueueError::JobNotFound(_))));
300
301        let result = queue.unschedule_job::<TestJob3>(jid).await;
302        dbg!(&result);
303        assert!(matches!(result, Err(QueueError::DecodeError { .. })));
304    }
305
306    #[tokio::test]
307    async fn cancel_job_started() {
308        let pool = make_pool(":memory:").await;
309        let queue = SqliteQueue::with_pool(pool);
310        let payload = TestPayload1::default();
311        let jid = queue
312            .schedule::<TestJob1>(payload.clone(), 0)
313            .await
314            .unwrap();
315
316        let _job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
317
318        let ret = queue.cancel_job(jid).await;
319        assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
320
321        let ret = queue.unschedule_job::<TestJob1>(jid).await;
322        assert!(matches!(ret, Err(QueueError::JobNotFound(_))));
323    }
324    #[tokio::test]
325    async fn priority_polling() {
326        let pool = make_pool(":memory:").await;
327        let queue = SqliteQueue::with_pool(pool);
328        let hour_ago = { Utc::now() - Duration::hours(1) };
329        let _hour_ago_jid = queue
330            .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 0)
331            .await
332            .unwrap();
333
334        let higher_priority_jid = queue
335            .schedule_at::<TestJob1>(TestPayload1::default(), hour_ago, 3)
336            .await
337            .unwrap();
338
339        let job = queue.poll_next(&[TestJob1::name()]).await.unwrap().unwrap();
340        assert_eq!(higher_priority_jid, job.id());
341    }
342}