apalis_sql/
lib.rs

1#![warn(
2    missing_debug_implementations,
3    missing_docs,
4    rust_2018_idioms,
5    unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8
9//! # apalis-sql
10//! apalis offers Sqlite, Mysql and Postgres storages for its workers.
11//! See relevant modules for examples
12
13use std::{num::TryFromIntError, time::Duration};
14
15use apalis_core::{error::Error, request::State};
16
17/// The context of the sql job
18pub mod context;
19/// Util for fetching rows
20pub mod from_row;
21
22/// Postgres storage for apalis. Uses `NOTIFY` and `SKIP LOCKED`
23#[cfg(feature = "postgres")]
24#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
25pub mod postgres;
26
27/// Sqlite Storage for apalis.
28/// Uses a transaction and min(rowid)
29#[cfg(feature = "sqlite")]
30#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
31pub mod sqlite;
32
33/// MySQL storage for apalis. Uses `SKIP LOCKED`
34#[cfg(feature = "mysql")]
35#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
36pub mod mysql;
37
38// Re-exports
39pub use sqlx;
40
41/// Config for sql storages
42#[derive(Debug, Clone)]
43pub struct Config {
44    keep_alive: Duration,
45    buffer_size: usize,
46    poll_interval: Duration,
47    reenqueue_orphaned_after: Duration,
48    namespace: String,
49}
50
51/// A general sql error
52#[derive(Debug, thiserror::Error)]
53pub enum SqlError {
54    /// Handles sqlx errors
55    #[error("sqlx::Error: {0}")]
56    Sqlx(#[from] sqlx::Error),
57    /// Handles int conversion errors
58    #[error("TryFromIntError: {0}")]
59    TryFromInt(#[from] TryFromIntError),
60}
61
62impl Default for Config {
63    fn default() -> Self {
64        Self {
65            keep_alive: Duration::from_secs(30),
66            buffer_size: 10,
67            poll_interval: Duration::from_millis(100),
68            reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
69            namespace: String::from("apalis::sql"),
70        }
71    }
72}
73
74impl Config {
75    /// Create a new config with a jobs namespace
76    pub fn new(namespace: &str) -> Self {
77        Config::default().set_namespace(namespace)
78    }
79
80    /// Interval between database poll queries
81    ///
82    /// Defaults to 100ms
83    pub fn set_poll_interval(mut self, interval: Duration) -> Self {
84        self.poll_interval = interval;
85        self
86    }
87
88    /// Interval between worker keep-alive database updates
89    ///
90    /// Defaults to 30s
91    pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
92        self.keep_alive = keep_alive;
93        self
94    }
95
96    /// Buffer size to use when querying for jobs
97    ///
98    /// Defaults to 10
99    pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
100        self.buffer_size = buffer_size;
101        self
102    }
103
104    /// Set the namespace to consume and push jobs to
105    ///
106    /// Defaults to "apalis::sql"
107    pub fn set_namespace(mut self, namespace: &str) -> Self {
108        self.namespace = namespace.to_string();
109        self
110    }
111
112    /// Gets a reference to the keep_alive duration.
113    pub fn keep_alive(&self) -> &Duration {
114        &self.keep_alive
115    }
116
117    /// Gets a mutable reference to the keep_alive duration.
118    pub fn keep_alive_mut(&mut self) -> &mut Duration {
119        &mut self.keep_alive
120    }
121
122    /// Gets the buffer size.
123    pub fn buffer_size(&self) -> usize {
124        self.buffer_size
125    }
126
127    /// Gets a reference to the poll_interval duration.
128    pub fn poll_interval(&self) -> &Duration {
129        &self.poll_interval
130    }
131
132    /// Gets a mutable reference to the poll_interval duration.
133    pub fn poll_interval_mut(&mut self) -> &mut Duration {
134        &mut self.poll_interval
135    }
136
137    /// Gets a reference to the namespace.
138    pub fn namespace(&self) -> &String {
139        &self.namespace
140    }
141
142    /// Gets a mutable reference to the namespace.
143    pub fn namespace_mut(&mut self) -> &mut String {
144        &mut self.namespace
145    }
146
147    /// Gets the reenqueue_orphaned_after duration.
148    pub fn reenqueue_orphaned_after(&self) -> Duration {
149        self.reenqueue_orphaned_after
150    }
151
152    /// Gets a mutable reference to the reenqueue_orphaned_after.
153    pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
154        &mut self.reenqueue_orphaned_after
155    }
156
157    /// Occasionally some workers die, or abandon jobs because of panics.
158    /// This is the time a task takes before its back to the queue
159    ///
160    /// Defaults to 5 minutes
161    pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
162        self.reenqueue_orphaned_after = after;
163        self
164    }
165}
166
167/// Calculates the status from a result
168pub fn calculate_status<Res>(res: &Result<Res, Error>) -> State {
169    match res {
170        Ok(_) => State::Done,
171        Err(e) => match &e {
172            Error::Abort(_) => State::Killed,
173            _ => State::Failed,
174        },
175    }
176}
177
178/// Standard checks for any sql backend
179#[macro_export]
180macro_rules! sql_storage_tests {
181    ($setup:path, $storage_type:ty, $job_type:ty) => {
182        async fn setup_test_wrapper(
183        ) -> TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()> {
184            let (mut t, poller) = TestWrapper::new_with_service(
185                $setup().await,
186                apalis_core::service_fn::service_fn(email_service::send_email),
187            );
188            tokio::spawn(poller);
189            t.vacuum().await.unwrap();
190            t
191        }
192
193        #[tokio::test]
194        async fn integration_test_kill_job() {
195            let mut storage = setup_test_wrapper().await;
196
197            storage
198                .push(email_service::example_killed_email())
199                .await
200                .unwrap();
201
202            let (job_id, res) = storage.execute_next().await;
203            assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
204            apalis_core::sleep(Duration::from_secs(1)).await;
205            let job = storage
206                .fetch_by_id(&job_id)
207                .await
208                .unwrap()
209                .expect("No job found");
210            let ctx = job.parts.context;
211            assert_eq!(*ctx.status(), State::Killed);
212            // assert!(ctx.done_at().is_some());
213            assert_eq!(
214                ctx.last_error().clone().unwrap(),
215                "{\"Err\":\"AbortError: Invalid character.\"}"
216            );
217        }
218
219        #[tokio::test]
220        async fn integration_test_acknowledge_good_job() {
221            let mut storage = setup_test_wrapper().await;
222            storage
223                .push(email_service::example_good_email())
224                .await
225                .unwrap();
226
227            let (job_id, res) = storage.execute_next().await;
228            assert_eq!(res, Ok("()".to_owned()));
229            apalis_core::sleep(Duration::from_secs(1)).await;
230            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
231            let ctx = job.parts.context;
232            assert_eq!(*ctx.status(), State::Done);
233            assert!(ctx.done_at().is_some());
234        }
235
236        #[tokio::test]
237        async fn integration_test_acknowledge_failed_job() {
238            let mut storage = setup_test_wrapper().await;
239
240            storage
241                .push(email_service::example_retry_able_email())
242                .await
243                .unwrap();
244
245            let (job_id, res) = storage.execute_next().await;
246            assert_eq!(
247                res,
248                Err("FailedError: Missing separator character '@'.".to_owned())
249            );
250            apalis_core::sleep(Duration::from_secs(1)).await;
251            let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
252            let ctx = job.parts.context;
253            assert_eq!(*ctx.status(), State::Failed);
254            assert!(job.parts.attempt.current() >= 1);
255            assert_eq!(
256                ctx.last_error().clone().unwrap(),
257                "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
258            );
259        }
260
261        #[tokio::test]
262        async fn worker_consume() {
263            use apalis_core::builder::WorkerBuilder;
264            use apalis_core::builder::WorkerFactoryFn;
265            let storage = $setup().await;
266            let mut handle = storage.clone();
267
268            let parts = handle
269                .push(email_service::example_good_email())
270                .await
271                .unwrap();
272
273            async fn task(_job: Email) -> &'static str {
274                tokio::time::sleep(Duration::from_millis(100)).await;
275                "Job well done"
276            }
277            let worker = WorkerBuilder::new("rango-tango").backend(storage);
278            let worker = worker.build_fn(task);
279            let wkr = worker.run();
280
281            let w = wkr.get_handle();
282
283            let runner = async move {
284                apalis_core::sleep(Duration::from_secs(3)).await;
285                let job_id = &parts.task_id;
286                let job = get_job(&mut handle, job_id).await;
287                let ctx = job.parts.context;
288
289                assert_eq!(*ctx.status(), State::Done);
290                assert!(ctx.done_at().is_some());
291                assert!(ctx.lock_by().is_some());
292                assert!(ctx.lock_at().is_some());
293                assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result
294
295                w.stop();
296            };
297
298            tokio::join!(runner, wkr);
299        }
300    };
301}