apalis_sql/
lib.rs

1#![warn(
2    missing_debug_implementations,
3    missing_docs,
4    rust_2018_idioms,
5    unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8
9//! # apalis-sql
//! apalis offers SQLite, MySQL and Postgres storages for its workers.
//! See the relevant modules for examples.
12//!
13//! **Features**
14//!
15//! These features can be used to enable SQLx functionalities:
16//! - `mysql` - MySQL support.
17//! - `postgres` - PostgreSQL support.
18//! - `sqlite` - SQLite support.
19//! - `migrate` - Migrations for Apalis tables.
20//! - `async-std-comp` - Use [async-std](https://async.rs/) runtime and [rustls](https://docs.rs/rustls/latest/rustls/).
21//! - `async-std-comp-native-tls` - Use [async-std](https://async.rs/) runtime and [native-tls](https://docs.rs/native-tls/latest/native_tls/).
22//! - `tokio-comp` - Use [Tokio](https://tokio.rs/) runtime and [rustls](https://docs.rs/rustls/latest/rustls/).
23//! - `tokio-comp-native-tls` - Use [Tokio](https://tokio.rs/) runtime and [native-tls](https://docs.rs/native-tls/latest/native_tls/).
24//!
25//! For more information about the runtime and TLS features, see the [SQLx features documentation](https://docs.rs/sqlx/latest/sqlx/).
26
27use std::{num::TryFromIntError, time::Duration};
28
29use apalis_core::{error::Error, request::State, response::Response};
30
31/// The context of the sql job
32pub mod context;
33/// Util for fetching rows
34pub mod from_row;
35
36/// Postgres storage for apalis. Uses `NOTIFY` and `SKIP LOCKED`
37#[cfg(feature = "postgres")]
38#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
39pub mod postgres;
40
41/// Sqlite Storage for apalis.
42/// Uses a transaction and min(rowid)
43#[cfg(feature = "sqlite")]
44#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
45pub mod sqlite;
46
47/// MySQL storage for apalis. Uses `SKIP LOCKED`
48#[cfg(feature = "mysql")]
49#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
50pub mod mysql;
51
52use context::SqlContext;
53// Re-exports
54pub use sqlx;
55
/// Config for sql storages
///
/// Groups the polling, keep-alive, buffering, orphan re-enqueue and namespace
/// settings of a storage. Fields are private; they are read and changed
/// through the accessor and `set_*` methods on `Config`.
#[derive(Debug, Clone)]
pub struct Config {
    /// Interval between worker keep-alive database updates (default: 30s).
    keep_alive: Duration,
    /// Buffer size to use when querying for jobs (default: 10).
    buffer_size: usize,
    /// Interval between database poll queries (default: 100ms).
    poll_interval: Duration,
    /// Time a task abandoned by a dead or panicked worker takes before it is
    /// back in the queue (default: 5 minutes).
    reenqueue_orphaned_after: Duration,
    /// Namespace to consume and push jobs to (default: `"apalis::sql"`).
    namespace: String,
}
65
/// A general sql error
///
/// Both variants are marked `#[from]`, so the wrapped error types convert
/// into `SqlError` automatically (for example through the `?` operator).
#[derive(Debug, thiserror::Error)]
pub enum SqlError {
    /// Handles sqlx errors
    ///
    /// Displayed as `sqlx::Error: <inner error>`.
    #[error("sqlx::Error: {0}")]
    Sqlx(#[from] sqlx::Error),
    /// Handles int conversion errors
    ///
    /// Displayed as `TryFromIntError: <inner error>`.
    #[error("TryFromIntError: {0}")]
    TryFromInt(#[from] TryFromIntError),
}
76
77impl Default for Config {
78    fn default() -> Self {
79        Self {
80            keep_alive: Duration::from_secs(30),
81            buffer_size: 10,
82            poll_interval: Duration::from_millis(100),
83            reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
84            namespace: String::from("apalis::sql"),
85        }
86    }
87}
88
89impl Config {
90    /// Create a new config with a jobs namespace
91    pub fn new(namespace: &str) -> Self {
92        Config::default().set_namespace(namespace)
93    }
94
95    /// Interval between database poll queries
96    ///
97    /// Defaults to 100ms
98    pub fn set_poll_interval(mut self, interval: Duration) -> Self {
99        self.poll_interval = interval;
100        self
101    }
102
103    /// Interval between worker keep-alive database updates
104    ///
105    /// Defaults to 30s
106    pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
107        self.keep_alive = keep_alive;
108        self
109    }
110
111    /// Buffer size to use when querying for jobs
112    ///
113    /// Defaults to 10
114    pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
115        self.buffer_size = buffer_size;
116        self
117    }
118
119    /// Set the namespace to consume and push jobs to
120    ///
121    /// Defaults to "apalis::sql"
122    pub fn set_namespace(mut self, namespace: &str) -> Self {
123        self.namespace = namespace.to_string();
124        self
125    }
126
127    /// Gets a reference to the keep_alive duration.
128    pub fn keep_alive(&self) -> &Duration {
129        &self.keep_alive
130    }
131
132    /// Gets a mutable reference to the keep_alive duration.
133    pub fn keep_alive_mut(&mut self) -> &mut Duration {
134        &mut self.keep_alive
135    }
136
137    /// Gets the buffer size.
138    pub fn buffer_size(&self) -> usize {
139        self.buffer_size
140    }
141
142    /// Gets a reference to the poll_interval duration.
143    pub fn poll_interval(&self) -> &Duration {
144        &self.poll_interval
145    }
146
147    /// Gets a mutable reference to the poll_interval duration.
148    pub fn poll_interval_mut(&mut self) -> &mut Duration {
149        &mut self.poll_interval
150    }
151
152    /// Gets a reference to the namespace.
153    pub fn namespace(&self) -> &String {
154        &self.namespace
155    }
156
157    /// Gets a mutable reference to the namespace.
158    pub fn namespace_mut(&mut self) -> &mut String {
159        &mut self.namespace
160    }
161
162    /// Gets the reenqueue_orphaned_after duration.
163    pub fn reenqueue_orphaned_after(&self) -> Duration {
164        self.reenqueue_orphaned_after
165    }
166
167    /// Gets a mutable reference to the reenqueue_orphaned_after.
168    pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
169        &mut self.reenqueue_orphaned_after
170    }
171
172    /// Occasionally some workers die, or abandon jobs because of panics.
173    /// This is the time a task takes before its back to the queue
174    ///
175    /// Defaults to 5 minutes
176    pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
177        self.reenqueue_orphaned_after = after;
178        self
179    }
180}
181
182/// Calculates the status from a result
183pub fn calculate_status<Res>(ctx: &SqlContext, res: &Response<Res>) -> State {
184    match &res.inner {
185        Ok(_) => State::Done,
186        Err(e) => match &e {
187            Error::Abort(_) => State::Killed,
188            Error::Failed(_) if ctx.max_attempts() as usize <= res.attempt.current() => {
189                State::Killed
190            }
191            _ => State::Failed,
192        },
193    }
194}
195
/// Standard checks for any sql backend
///
/// Expands to two async helpers (`setup_test_wrapper`, `push_email_priority`),
/// the `WrappedStorage` type alias and a set of `#[tokio::test]` functions
/// that exercise a storage end to end.
///
/// Arguments:
/// - `$setup`: path to a function that is called with no arguments and
///   awaited to obtain the storage under test.
/// - `$storage_type`: the type of that storage.
/// - `$job_type`: the job type carried by the `Request`s of the storage.
///
/// The expansion refers to the following names without qualifying them, so
/// they must be in scope where the macro is invoked: `TestWrapper`, `Request`,
/// `SqlContext`, `Email`, `TaskId`, `State`, `Utc`, `Duration`,
/// `email_service` and `get_job`. It also uses the `tokio`, `chrono` and
/// `apalis_core` crates by path.
#[macro_export]
macro_rules! sql_storage_tests {
    ($setup:path, $storage_type:ty, $job_type:ty) => {
        // Storage wrapped in the test harness used by most of the tests below.
        type WrappedStorage = TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()>;

        // Builds the wrapped storage around `email_service::send_email`,
        // spawns its poller on the tokio runtime and calls `vacuum()` before
        // handing it out.
        // NOTE(review): `vacuum()` presumably clears jobs left over from
        // earlier runs — confirm.
        async fn setup_test_wrapper() -> WrappedStorage {
            let (mut t, poller) = TestWrapper::new_with_service(
                $setup().await,
                apalis_core::service_fn::service_fn(email_service::send_email),
            );
            tokio::spawn(poller);
            t.vacuum().await.unwrap();
            t
        }

        // Pushes `email` with the given `priority` set on its context and
        // returns the id of the created task. Panics if the push fails.
        async fn push_email_priority(
            storage: &mut WrappedStorage,
            email: Email,
            priority: i32,
        ) -> TaskId {
            let mut ctx = SqlContext::new();
            ctx.set_priority(priority);
            storage
                .push_request(Request::new_with_ctx(email, ctx))
                .await
                .expect("failed to push a job")
                .task_id
        }

        // A job whose handler aborts must end up `Killed`, with the abort
        // message stored as its last error.
        #[tokio::test]
        async fn integration_test_kill_job() {
            let mut storage = setup_test_wrapper().await;

            storage
                .push(email_service::example_killed_email())
                .await
                .unwrap();

            let (job_id, res) = storage.execute_next().await.unwrap();
            assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
            // NOTE(review): presumably gives the storage time to persist the
            // result before the job is read back — confirm.
            apalis_core::sleep(Duration::from_secs(1)).await;
            let job = storage
                .fetch_by_id(&job_id)
                .await
                .unwrap()
                .expect("No job found");
            let ctx = job.parts.context;
            assert_eq!(*ctx.status(), State::Killed);
            // assert!(ctx.done_at().is_some());
            assert_eq!(
                ctx.last_error().clone().unwrap(),
                "{\"Err\":\"AbortError: Invalid character.\"}"
            );
        }

        // A job whose handler succeeds must end up `Done` with `done_at` set.
        #[tokio::test]
        async fn integration_test_acknowledge_good_job() {
            let mut storage = setup_test_wrapper().await;
            storage
                .push(email_service::example_good_email())
                .await
                .unwrap();

            let (job_id, res) = storage.execute_next().await.unwrap();
            assert_eq!(res, Ok("()".to_owned()));
            apalis_core::sleep(Duration::from_secs(1)).await;
            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
            let ctx = job.parts.context;
            assert_eq!(*ctx.status(), State::Done);
            assert!(ctx.done_at().is_some());
        }

        // A job whose handler fails with a retryable error must end up
        // `Failed`, with at least one recorded attempt and the failure
        // message stored as its last error.
        #[tokio::test]
        async fn integration_test_acknowledge_failed_job() {
            let mut storage = setup_test_wrapper().await;

            storage
                .push(email_service::example_retry_able_email())
                .await
                .unwrap();

            let (job_id, res) = storage.execute_next().await.unwrap();
            assert_eq!(
                res,
                Err("FailedError: Missing separator character '@'.".to_owned())
            );
            apalis_core::sleep(Duration::from_secs(1)).await;
            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
            let ctx = job.parts.context;
            assert_eq!(*ctx.status(), State::Failed);
            assert!(job.parts.attempt.current() >= 1);
            assert_eq!(
                ctx.last_error().clone().unwrap(),
                "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
            );
        }

        // Runs a real worker against the raw storage (no test wrapper) and
        // checks the bookkeeping fields of the job it consumed.
        #[tokio::test]
        async fn worker_consume() {
            use apalis_core::builder::WorkerBuilder;
            use apalis_core::builder::WorkerFactoryFn;
            let storage = $setup().await;
            // Second handle on the storage, kept for pushing the job and
            // reading it back while the worker owns `storage`.
            let mut handle = storage.clone();

            let parts = handle
                .push(email_service::example_good_email())
                .await
                .unwrap();

            async fn task(_job: Email) -> &'static str {
                tokio::time::sleep(Duration::from_millis(100)).await;
                "Job well done"
            }
            let worker = WorkerBuilder::new("rango-tango").backend(storage);
            let worker = worker.build_fn(task);
            let wkr = worker.run();

            let w = wkr.get_handle();

            // Waits 3 seconds, inspects the job, then stops the worker so that
            // the `join!` below can complete.
            let runner = async move {
                apalis_core::sleep(Duration::from_secs(3)).await;
                let job_id = &parts.task_id;
                let job = get_job(&mut handle, job_id).await;
                let ctx = job.parts.context;

                assert_eq!(*ctx.status(), State::Done);
                assert!(ctx.done_at().is_some());
                assert!(ctx.lock_by().is_some());
                assert!(ctx.lock_at().is_some());
                assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result

                w.stop();
            };

            tokio::join!(runner, wkr);
        }

        // Jobs pushed with priorities 5, 10, -1 and 1 must be executed in
        // descending priority order: 10, 5, 1, -1.
        #[tokio::test]
        async fn test_consume_jobs_with_priority() {
            let mut storage = setup_test_wrapper().await;

            // push several jobs with differing priorities, then ensure they get executed
            // in priority order.
            let job2 =
                push_email_priority(&mut storage, email_service::example_good_email(), 5).await;
            let job1 =
                push_email_priority(&mut storage, email_service::example_good_email(), 10).await;
            let job4 =
                push_email_priority(&mut storage, email_service::example_good_email(), -1).await;
            let job3 =
                push_email_priority(&mut storage, email_service::example_good_email(), 1).await;

            for (job_id, prio) in &[(job1, 10), (job2, 5), (job3, 1), (job4, -1)] {
                let (exec_job_id, res) = storage.execute_next().await.unwrap();
                assert_eq!(job_id, &exec_job_id);
                assert_eq!(res, Ok("()".to_owned()));
                apalis_core::sleep(Duration::from_millis(500)).await;
                let job = storage.fetch_by_id(&exec_job_id).await.unwrap().unwrap();
                let ctx = job.parts.context;

                assert_eq!(*ctx.status(), State::Done);
                assert_eq!(ctx.priority(), prio);
            }
        }

        // Scheduling a job must store it as `Pending`, unlocked, with
        // `run_at` equal to the requested time.
        #[tokio::test]
        async fn test_schedule_request() {
            use chrono::SubsecRound;

            let mut storage = $setup().await;

            let email = Email {
                subject: "Scheduled Email".to_string(),
                to: "scheduled@example.com".to_string(),
                text: "This is a scheduled email.".to_string(),
            };

            // Schedule 1 minute in the future
            let schedule_time = Utc::now() + Duration::from_secs(60);

            let parts = storage
                .schedule(email, schedule_time.timestamp())
                .await
                .expect("Failed to schedule request");

            let job = get_job(&mut storage, &parts.task_id).await;
            let ctx = &job.parts.context;

            assert_eq!(*ctx.status(), State::Pending);
            assert!(ctx.lock_by().is_none());
            assert!(ctx.lock_at().is_none());
            // Only whole seconds were passed to `schedule`, so the sub-second
            // part is dropped before comparing.
            let expected_schedule_time = schedule_time.clone().trunc_subsecs(0);
            assert_eq!(ctx.run_at(), &expected_schedule_time);
        }

        // A job scheduled 15 seconds ahead must complete as `Done` and must
        // not have been locked before its scheduled time.
        // NOTE(review): `shedule` in the test name looks like a typo for
        // `schedule`; renaming would change the generated test's name.
        #[tokio::test]
        async fn integration_test_shedule_and_run_job() {
            let current = Utc::now();
            let dur = Duration::from_secs(15);
            let schedule_time = current + dur;
            let mut storage = setup_test_wrapper().await;
            storage
                .schedule(
                    email_service::example_good_email(),
                    schedule_time.timestamp(),
                )
                .await
                .unwrap();

            let (job_id, res) = storage.execute_next().await.unwrap();
            apalis_core::sleep(Duration::from_secs(1)).await;

            assert_eq!(res, Ok("()".to_owned()));
            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
            let ctx = job.parts.context;
            assert_eq!(*ctx.status(), State::Done);
            assert!(
                ctx.lock_at().unwrap() >= schedule_time.timestamp(),
                "{} should be greater than {}",
                ctx.lock_at().unwrap(),
                schedule_time.timestamp()
            );
        }
    };
}