apalis_sql/
lib.rs

1#![warn(
2    missing_debug_implementations,
3    missing_docs,
4    rust_2018_idioms,
5    unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8
//! # apalis-sql
//! apalis offers SQLite, MySQL and PostgreSQL storages for its workers.
//! See the relevant modules for examples.
//!
//! **Features**
//!
//! These features can be used to enable SQLx functionality:
//! - `mysql` - MySQL support.
//! - `postgres` - PostgreSQL support.
//! - `sqlite` - SQLite support.
//! - `migrate` - Migrations for Apalis tables.
//! - `async-std-comp` - Use [async-std](https://async.rs/) runtime and [rustls](https://docs.rs/rustls/latest/rustls/).
//! - `async-std-comp-native-tls` - Use [async-std](https://async.rs/) runtime and [native-tls](https://docs.rs/native-tls/latest/native_tls/).
//! - `tokio-comp` - Use [Tokio](https://tokio.rs/) runtime and [rustls](https://docs.rs/rustls/latest/rustls/).
//! - `tokio-comp-native-tls` - Use [Tokio](https://tokio.rs/) runtime and [native-tls](https://docs.rs/native-tls/latest/native_tls/).
//!
//! For more information about the runtime and TLS features, see the [SQLx features documentation](https://docs.rs/sqlx/latest/sqlx/).
26
27use std::{num::TryFromIntError, time::Duration};
28
29use apalis_core::{error::Error, request::State, response::Response};
30
31/// The context of the sql job
32pub mod context;
33/// Util for fetching rows
34pub mod from_row;
35
36/// Postgres storage for apalis. Uses `NOTIFY` and `SKIP LOCKED`
37#[cfg(feature = "postgres")]
38#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
39pub mod postgres;
40
41/// Sqlite Storage for apalis.
42/// Uses a transaction and min(rowid)
43#[cfg(feature = "sqlite")]
44#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
45pub mod sqlite;
46
47/// MySQL storage for apalis. Uses `SKIP LOCKED`
48#[cfg(feature = "mysql")]
49#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
50pub mod mysql;
51
52use context::SqlContext;
53// Re-exports
54pub use sqlx;
55
56/// Config for sql storages
57#[derive(Debug, Clone)]
58pub struct Config {
59    keep_alive: Duration,
60    buffer_size: usize,
61    poll_interval: Duration,
62    reenqueue_orphaned_after: Duration,
63    namespace: String,
64}
65
66/// A general sql error
67#[derive(Debug, thiserror::Error)]
68pub enum SqlError {
69    /// Handles sqlx errors
70    #[error("sqlx::Error: {0}")]
71    Sqlx(#[from] sqlx::Error),
72    /// Handles int conversion errors
73    #[error("TryFromIntError: {0}")]
74    TryFromInt(#[from] TryFromIntError),
75}
76
77impl Default for Config {
78    fn default() -> Self {
79        Self {
80            keep_alive: Duration::from_secs(30),
81            buffer_size: 10,
82            poll_interval: Duration::from_millis(100),
83            reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
84            namespace: String::from("apalis::sql"),
85        }
86    }
87}
88
89impl Config {
90    /// Create a new config with a jobs namespace
91    pub fn new(namespace: &str) -> Self {
92        Config::default().set_namespace(namespace)
93    }
94
95    /// Interval between database poll queries
96    ///
97    /// Defaults to 100ms
98    pub fn set_poll_interval(mut self, interval: Duration) -> Self {
99        self.poll_interval = interval;
100        self
101    }
102
103    /// Interval between worker keep-alive database updates
104    ///
105    /// Defaults to 30s
106    pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
107        self.keep_alive = keep_alive;
108        self
109    }
110
111    /// Buffer size to use when querying for jobs
112    ///
113    /// Defaults to 10
114    pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
115        self.buffer_size = buffer_size;
116        self
117    }
118
119    /// Set the namespace to consume and push jobs to
120    ///
121    /// Defaults to "apalis::sql"
122    pub fn set_namespace(mut self, namespace: &str) -> Self {
123        self.namespace = namespace.to_string();
124        self
125    }
126
127    /// Gets a reference to the keep_alive duration.
128    pub fn keep_alive(&self) -> &Duration {
129        &self.keep_alive
130    }
131
132    /// Gets a mutable reference to the keep_alive duration.
133    pub fn keep_alive_mut(&mut self) -> &mut Duration {
134        &mut self.keep_alive
135    }
136
137    /// Gets the buffer size.
138    pub fn buffer_size(&self) -> usize {
139        self.buffer_size
140    }
141
142    /// Gets a reference to the poll_interval duration.
143    pub fn poll_interval(&self) -> &Duration {
144        &self.poll_interval
145    }
146
147    /// Gets a mutable reference to the poll_interval duration.
148    pub fn poll_interval_mut(&mut self) -> &mut Duration {
149        &mut self.poll_interval
150    }
151
152    /// Gets a reference to the namespace.
153    pub fn namespace(&self) -> &String {
154        &self.namespace
155    }
156
157    /// Gets a mutable reference to the namespace.
158    pub fn namespace_mut(&mut self) -> &mut String {
159        &mut self.namespace
160    }
161
162    /// Gets the reenqueue_orphaned_after duration.
163    pub fn reenqueue_orphaned_after(&self) -> Duration {
164        self.reenqueue_orphaned_after
165    }
166
167    /// Gets a mutable reference to the reenqueue_orphaned_after.
168    pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
169        &mut self.reenqueue_orphaned_after
170    }
171
172    /// Occasionally some workers die, or abandon jobs because of panics.
173    /// This is the time a task takes before its back to the queue
174    ///
175    /// Defaults to 5 minutes
176    pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
177        self.reenqueue_orphaned_after = after;
178        self
179    }
180}
181
182/// Calculates the status from a result
183pub fn calculate_status<Res>(ctx: &SqlContext, res: &Response<Res>) -> State {
184    match &res.inner {
185        Ok(_) => State::Done,
186        Err(e) => match &e {
187            Error::Abort(_) => State::Killed,
188            Error::Failed(_) if ctx.max_attempts() as usize <= res.attempt.current() => {
189                State::Killed
190            }
191            _ => State::Failed,
192        },
193    }
194}
195
196/// Standard checks for any sql backend
197#[macro_export]
198macro_rules! sql_storage_tests {
199    ($setup:path, $storage_type:ty, $job_type:ty) => {
200        type WrappedStorage = TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()>;
201
202        async fn setup_test_wrapper() -> WrappedStorage {
203            let (mut t, poller) = TestWrapper::new_with_service(
204                $setup().await,
205                apalis_core::service_fn::service_fn(email_service::send_email),
206            );
207            tokio::spawn(poller);
208            t.vacuum().await.unwrap();
209            t
210        }
211
212        async fn push_email_priority(
213            storage: &mut WrappedStorage,
214            email: Email,
215            priority: i32,
216        ) -> TaskId {
217            let mut ctx = SqlContext::new();
218            ctx.set_priority(priority);
219            storage
220                .push_request(Request::new_with_ctx(email, ctx))
221                .await
222                .expect("failed to push a job")
223                .task_id
224        }
225
226        #[tokio::test]
227        async fn integration_test_kill_job() {
228            let mut storage = setup_test_wrapper().await;
229
230            storage
231                .push(email_service::example_killed_email())
232                .await
233                .unwrap();
234
235            let (job_id, res) = storage.execute_next().await.unwrap();
236            assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
237            apalis_core::sleep(Duration::from_secs(1)).await;
238            let job = storage
239                .fetch_by_id(&job_id)
240                .await
241                .unwrap()
242                .expect("No job found");
243            let ctx = job.parts.context;
244            assert_eq!(*ctx.status(), State::Killed);
245            // assert!(ctx.done_at().is_some());
246            assert_eq!(
247                ctx.last_error().clone().unwrap(),
248                "{\"Err\":\"AbortError: Invalid character.\"}"
249            );
250        }
251
252        #[tokio::test]
253        async fn integration_test_update_job() {
254            let mut storage = setup_test_wrapper().await;
255
256            let task_id = storage
257                .push(Email {
258                    subject: "Test Subject".to_string(),
259                    to: "example@sql".to_string(),
260                    text: "Some Text".to_string(),
261                })
262                .await
263                .expect("failed to push a job")
264                .task_id;
265
266            let mut job = get_job(&mut storage, &task_id).await;
267            job.parts.context.set_status(State::Killed);
268            storage.update(job).await.expect("updating to succeed");
269
270            let job = get_job(&mut storage, &task_id).await;
271            let ctx = job.parts.context;
272            assert_eq!(*ctx.status(), State::Killed);
273        }
274
275        #[tokio::test]
276        async fn integration_test_acknowledge_good_job() {
277            let mut storage = setup_test_wrapper().await;
278            storage
279                .push(email_service::example_good_email())
280                .await
281                .unwrap();
282
283            let (job_id, res) = storage.execute_next().await.unwrap();
284            assert_eq!(res, Ok("()".to_owned()));
285            apalis_core::sleep(Duration::from_secs(1)).await;
286            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
287            let ctx = job.parts.context;
288            assert_eq!(*ctx.status(), State::Done);
289            assert!(ctx.done_at().is_some());
290        }
291
292        #[tokio::test]
293        async fn integration_test_acknowledge_failed_job() {
294            let mut storage = setup_test_wrapper().await;
295
296            storage
297                .push(email_service::example_retry_able_email())
298                .await
299                .unwrap();
300
301            let (job_id, res) = storage.execute_next().await.unwrap();
302            assert_eq!(
303                res,
304                Err("FailedError: Missing separator character '@'.".to_owned())
305            );
306            apalis_core::sleep(Duration::from_secs(1)).await;
307            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
308            let ctx = job.parts.context;
309            assert_eq!(*ctx.status(), State::Failed);
310            assert!(job.parts.attempt.current() >= 1);
311            assert_eq!(
312                ctx.last_error().clone().unwrap(),
313                "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
314            );
315        }
316
317        #[tokio::test]
318        async fn worker_consume() {
319            use apalis_core::builder::WorkerBuilder;
320            use apalis_core::builder::WorkerFactoryFn;
321            let storage = $setup().await;
322            let mut handle = storage.clone();
323
324            let parts = handle
325                .push(email_service::example_good_email())
326                .await
327                .unwrap();
328
329            async fn task(_job: Email) -> &'static str {
330                tokio::time::sleep(Duration::from_millis(100)).await;
331                "Job well done"
332            }
333            let worker = WorkerBuilder::new("rango-tango").backend(storage);
334            let worker = worker.build_fn(task);
335            let wkr = worker.run();
336
337            let w = wkr.get_handle();
338
339            let runner = async move {
340                apalis_core::sleep(Duration::from_secs(3)).await;
341                let job_id = &parts.task_id;
342                let job = get_job(&mut handle, job_id).await;
343                let ctx = job.parts.context;
344
345                assert_eq!(*ctx.status(), State::Done);
346                assert!(ctx.done_at().is_some());
347                assert!(ctx.lock_by().is_some());
348                assert!(ctx.lock_at().is_some());
349                assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result
350
351                w.stop();
352            };
353
354            tokio::join!(runner, wkr);
355        }
356
357        #[tokio::test]
358        async fn test_consume_jobs_with_priority() {
359            let mut storage = setup_test_wrapper().await;
360
361            // push several jobs with differing priorities, then ensure they get executed
362            // in priority order.
363            let job2 =
364                push_email_priority(&mut storage, email_service::example_good_email(), 5).await;
365            let job1 =
366                push_email_priority(&mut storage, email_service::example_good_email(), 10).await;
367            let job4 =
368                push_email_priority(&mut storage, email_service::example_good_email(), -1).await;
369            let job3 =
370                push_email_priority(&mut storage, email_service::example_good_email(), 1).await;
371
372            for (job_id, prio) in &[(job1, 10), (job2, 5), (job3, 1), (job4, -1)] {
373                let (exec_job_id, res) = storage.execute_next().await.unwrap();
374                assert_eq!(job_id, &exec_job_id);
375                assert_eq!(res, Ok("()".to_owned()));
376                apalis_core::sleep(Duration::from_millis(500)).await;
377                let job = storage.fetch_by_id(&exec_job_id).await.unwrap().unwrap();
378                let ctx = job.parts.context;
379
380                assert_eq!(*ctx.status(), State::Done);
381                assert_eq!(ctx.priority(), prio);
382            }
383        }
384
385        #[tokio::test]
386        async fn test_schedule_request() {
387            use chrono::SubsecRound;
388
389            let mut storage = $setup().await;
390
391            let email = Email {
392                subject: "Scheduled Email".to_string(),
393                to: "scheduled@example.com".to_string(),
394                text: "This is a scheduled email.".to_string(),
395            };
396
397            // Schedule 1 minute in the future
398            let schedule_time = Utc::now() + Duration::from_secs(60);
399
400            let parts = storage
401                .schedule(email, schedule_time.timestamp())
402                .await
403                .expect("Failed to schedule request");
404
405            let job = get_job(&mut storage, &parts.task_id).await;
406            let ctx = &job.parts.context;
407
408            assert_eq!(*ctx.status(), State::Pending);
409            assert!(ctx.lock_by().is_none());
410            assert!(ctx.lock_at().is_none());
411            let expected_schedule_time = schedule_time.clone().trunc_subsecs(0);
412            assert_eq!(ctx.run_at(), &expected_schedule_time);
413        }
414
415        #[tokio::test]
416        async fn test_backend_expose_succeeds() {
417            let storage = setup_test_wrapper().await;
418
419            assert!(storage.stats().await.is_ok());
420            assert!(storage.list_jobs(&State::Pending, 1).await.is_ok());
421            assert!(storage.list_workers().await.is_ok());
422        }
423
424        #[tokio::test]
425        async fn integration_test_shedule_and_run_job() {
426            let current = Utc::now();
427            let dur = Duration::from_secs(15);
428            let schedule_time = current + dur;
429            let mut storage = setup_test_wrapper().await;
430            storage
431                .schedule(
432                    email_service::example_good_email(),
433                    schedule_time.timestamp(),
434                )
435                .await
436                .unwrap();
437
438            let (job_id, res) = storage.execute_next().await.unwrap();
439            apalis_core::sleep(Duration::from_secs(1)).await;
440
441            assert_eq!(res, Ok("()".to_owned()));
442            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
443            let ctx = job.parts.context;
444            assert_eq!(*ctx.status(), State::Done);
445            assert!(
446                ctx.lock_at().unwrap() >= schedule_time.timestamp(),
447                "{} should be greater than {}",
448                ctx.lock_at().unwrap(),
449                schedule_time.timestamp()
450            );
451        }
452    };
453}