1#![warn(
2 missing_debug_implementations,
3 missing_docs,
4 rust_2018_idioms,
5 unreachable_pub
6)]
7#![cfg_attr(docsrs, feature(doc_cfg))]
8
9use std::{num::TryFromIntError, time::Duration};
14
15use apalis_core::{error::Error, request::State};
16
17pub mod context;
19pub mod from_row;
21
22#[cfg(feature = "postgres")]
24#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
25pub mod postgres;
26
27#[cfg(feature = "sqlite")]
30#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
31pub mod sqlite;
32
33#[cfg(feature = "mysql")]
35#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
36pub mod mysql;
37
38pub use sqlx;
40
41#[derive(Debug, Clone)]
43pub struct Config {
44 keep_alive: Duration,
45 buffer_size: usize,
46 poll_interval: Duration,
47 reenqueue_orphaned_after: Duration,
48 namespace: String,
49}
50
51#[derive(Debug, thiserror::Error)]
53pub enum SqlError {
54 #[error("sqlx::Error: {0}")]
56 Sqlx(#[from] sqlx::Error),
57 #[error("TryFromIntError: {0}")]
59 TryFromInt(#[from] TryFromIntError),
60}
61
62impl Default for Config {
63 fn default() -> Self {
64 Self {
65 keep_alive: Duration::from_secs(30),
66 buffer_size: 10,
67 poll_interval: Duration::from_millis(100),
68 reenqueue_orphaned_after: Duration::from_secs(300), namespace: String::from("apalis::sql"),
70 }
71 }
72}
73
74impl Config {
75 pub fn new(namespace: &str) -> Self {
77 Config::default().set_namespace(namespace)
78 }
79
80 pub fn set_poll_interval(mut self, interval: Duration) -> Self {
84 self.poll_interval = interval;
85 self
86 }
87
88 pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
92 self.keep_alive = keep_alive;
93 self
94 }
95
96 pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
100 self.buffer_size = buffer_size;
101 self
102 }
103
104 pub fn set_namespace(mut self, namespace: &str) -> Self {
108 self.namespace = namespace.to_string();
109 self
110 }
111
112 pub fn keep_alive(&self) -> &Duration {
114 &self.keep_alive
115 }
116
117 pub fn keep_alive_mut(&mut self) -> &mut Duration {
119 &mut self.keep_alive
120 }
121
122 pub fn buffer_size(&self) -> usize {
124 self.buffer_size
125 }
126
127 pub fn poll_interval(&self) -> &Duration {
129 &self.poll_interval
130 }
131
132 pub fn poll_interval_mut(&mut self) -> &mut Duration {
134 &mut self.poll_interval
135 }
136
137 pub fn namespace(&self) -> &String {
139 &self.namespace
140 }
141
142 pub fn namespace_mut(&mut self) -> &mut String {
144 &mut self.namespace
145 }
146
147 pub fn reenqueue_orphaned_after(&self) -> Duration {
149 self.reenqueue_orphaned_after
150 }
151
152 pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
154 &mut self.reenqueue_orphaned_after
155 }
156
157 pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
162 self.reenqueue_orphaned_after = after;
163 self
164 }
165}
166
167pub fn calculate_status<Res>(res: &Result<Res, Error>) -> State {
169 match res {
170 Ok(_) => State::Done,
171 Err(e) => match &e {
172 Error::Abort(_) => State::Killed,
173 _ => State::Failed,
174 },
175 }
176}
177
178#[macro_export]
180macro_rules! sql_storage_tests {
181 ($setup:path, $storage_type:ty, $job_type:ty) => {
182 async fn setup_test_wrapper(
183 ) -> TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()> {
184 let (mut t, poller) = TestWrapper::new_with_service(
185 $setup().await,
186 apalis_core::service_fn::service_fn(email_service::send_email),
187 );
188 tokio::spawn(poller);
189 t.vacuum().await.unwrap();
190 t
191 }
192
193 #[tokio::test]
194 async fn integration_test_kill_job() {
195 let mut storage = setup_test_wrapper().await;
196
197 storage
198 .push(email_service::example_killed_email())
199 .await
200 .unwrap();
201
202 let (job_id, res) = storage.execute_next().await;
203 assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
204 apalis_core::sleep(Duration::from_secs(1)).await;
205 let job = storage
206 .fetch_by_id(&job_id)
207 .await
208 .unwrap()
209 .expect("No job found");
210 let ctx = job.parts.context;
211 assert_eq!(*ctx.status(), State::Killed);
212 assert_eq!(
214 ctx.last_error().clone().unwrap(),
215 "{\"Err\":\"AbortError: Invalid character.\"}"
216 );
217 }
218
219 #[tokio::test]
220 async fn integration_test_acknowledge_good_job() {
221 let mut storage = setup_test_wrapper().await;
222 storage
223 .push(email_service::example_good_email())
224 .await
225 .unwrap();
226
227 let (job_id, res) = storage.execute_next().await;
228 assert_eq!(res, Ok("()".to_owned()));
229 apalis_core::sleep(Duration::from_secs(1)).await;
230 let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
231 let ctx = job.parts.context;
232 assert_eq!(*ctx.status(), State::Done);
233 assert!(ctx.done_at().is_some());
234 }
235
236 #[tokio::test]
237 async fn integration_test_acknowledge_failed_job() {
238 let mut storage = setup_test_wrapper().await;
239
240 storage
241 .push(email_service::example_retry_able_email())
242 .await
243 .unwrap();
244
245 let (job_id, res) = storage.execute_next().await;
246 assert_eq!(
247 res,
248 Err("FailedError: Missing separator character '@'.".to_owned())
249 );
250 apalis_core::sleep(Duration::from_secs(1)).await;
251 let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
252 let ctx = job.parts.context;
253 assert_eq!(*ctx.status(), State::Failed);
254 assert!(job.parts.attempt.current() >= 1);
255 assert_eq!(
256 ctx.last_error().clone().unwrap(),
257 "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
258 );
259 }
260
261 #[tokio::test]
262 async fn worker_consume() {
263 use apalis_core::builder::WorkerBuilder;
264 use apalis_core::builder::WorkerFactoryFn;
265 let storage = $setup().await;
266 let mut handle = storage.clone();
267
268 let parts = handle
269 .push(email_service::example_good_email())
270 .await
271 .unwrap();
272
273 async fn task(_job: Email) -> &'static str {
274 tokio::time::sleep(Duration::from_millis(100)).await;
275 "Job well done"
276 }
277 let worker = WorkerBuilder::new("rango-tango").backend(storage);
278 let worker = worker.build_fn(task);
279 let wkr = worker.run();
280
281 let w = wkr.get_handle();
282
283 let runner = async move {
284 apalis_core::sleep(Duration::from_secs(3)).await;
285 let job_id = &parts.task_id;
286 let job = get_job(&mut handle, job_id).await;
287 let ctx = job.parts.context;
288
289 assert_eq!(*ctx.status(), State::Done);
290 assert!(ctx.done_at().is_some());
291 assert!(ctx.lock_by().is_some());
292 assert!(ctx.lock_at().is_some());
293 assert!(ctx.last_error().is_some()); w.stop();
296 };
297
298 tokio::join!(runner, wkr);
299 }
300 };
301}