#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{num::TryFromIntError, time::Duration};
use apalis_core::{error::Error, request::State, response::Response};
/// Defines [`SqlContext`], the context type read by [`calculate_status`].
pub mod context;
/// Presumably helpers for mapping database rows into requests — TODO confirm against the module.
pub mod from_row;
/// Postgres backend; only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
pub mod postgres;
/// SQLite backend; only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
pub mod sqlite;
/// MySQL backend; only compiled when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
#[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]
pub mod mysql;
use context::SqlContext;
// Re-export of the `sqlx` crate so downstream code can name it through this crate.
pub use sqlx;
/// Configuration values for an SQL-backed storage.
///
/// Build one with [`Config::new`] or [`Config::default`] and adjust it through
/// the `set_*` builder methods; every field is private and reachable only via
/// the accessors on this type.
#[derive(Debug, Clone)]
pub struct Config {
/// Keep-alive duration (30 seconds by default).
/// NOTE(review): presumably the worker heartbeat interval — confirm in the backends.
keep_alive: Duration,
/// Buffer size (10 by default).
/// NOTE(review): presumably how many jobs are fetched/buffered at once — confirm in the backends.
buffer_size: usize,
/// Poll interval (100 milliseconds by default).
poll_interval: Duration,
/// Duration after which orphaned jobs are re-enqueued (300 seconds by default).
reenqueue_orphaned_after: Duration,
/// Namespace string (`"apalis::sql"` by default).
namespace: String,
}
/// Errors surfaced by this crate.
///
/// Both variants carry `#[from]`, so the wrapped error types convert into
/// `SqlError` automatically when propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum SqlError {
/// An error returned by `sqlx`.
#[error("sqlx::Error: {0}")]
Sqlx(#[from] sqlx::Error),
/// A checked integer conversion failed.
#[error("TryFromIntError: {0}")]
TryFromInt(#[from] TryFromIntError),
}
impl Default for Config {
    /// Returns the stock configuration:
    ///
    /// * namespace: `"apalis::sql"`
    /// * keep-alive: 30 seconds
    /// * buffer size: 10
    /// * poll interval: 100 milliseconds
    /// * re-enqueue orphaned jobs after: 300 seconds
    fn default() -> Self {
        let namespace = "apalis::sql".to_owned();
        let keep_alive = Duration::from_secs(30);
        let poll_interval = Duration::from_millis(100);
        let reenqueue_orphaned_after = Duration::from_secs(300);

        Self {
            namespace,
            keep_alive,
            poll_interval,
            reenqueue_orphaned_after,
            buffer_size: 10,
        }
    }
}
impl Config {
pub fn new(namespace: &str) -> Self {
Config::default().set_namespace(namespace)
}
pub fn set_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
self.keep_alive = keep_alive;
self
}
pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = buffer_size;
self
}
pub fn set_namespace(mut self, namespace: &str) -> Self {
self.namespace = namespace.to_string();
self
}
pub fn keep_alive(&self) -> &Duration {
&self.keep_alive
}
pub fn keep_alive_mut(&mut self) -> &mut Duration {
&mut self.keep_alive
}
pub fn buffer_size(&self) -> usize {
self.buffer_size
}
pub fn poll_interval(&self) -> &Duration {
&self.poll_interval
}
pub fn poll_interval_mut(&mut self) -> &mut Duration {
&mut self.poll_interval
}
pub fn namespace(&self) -> &String {
&self.namespace
}
pub fn namespace_mut(&mut self) -> &mut String {
&mut self.namespace
}
pub fn reenqueue_orphaned_after(&self) -> Duration {
self.reenqueue_orphaned_after
}
pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
&mut self.reenqueue_orphaned_after
}
pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
self.reenqueue_orphaned_after = after;
self
}
}
pub fn calculate_status<Res>(ctx: &SqlContext, res: &Response<Res>) -> State {
match &res.inner {
Ok(_) => State::Done,
Err(e) => match &e {
Error::Abort(_) => State::Killed,
Error::Failed(_) if ctx.max_attempts() as usize <= res.attempt.current() => {
State::Killed
}
_ => State::Failed,
},
}
}
/// Generates a shared suite of `#[tokio::test]` integration tests for an SQL storage.
///
/// Arguments:
/// * `$setup` — path to an async function; the suite calls `$setup().await` to
///   obtain the storage under test.
/// * `$storage_type` — type of the storage, used to name the wrapped test storage.
/// * `$job_type` — job type carried by the requests of the wrapped test storage.
///
/// The expansion uses the following names unqualified, so they are expected to
/// be in scope where the macro is invoked: `TestWrapper`, `Request`,
/// `SqlContext`, `Email`, `email_service`, `TaskId`, `State`, `Duration`,
/// `Utc` and a `get_job` helper. It also refers to the `tokio`, `chrono` and
/// `apalis_core` crates by name.
#[macro_export]
macro_rules! sql_storage_tests {
($setup:path, $storage_type:ty, $job_type:ty) => {
// Storage under test wrapped in the test harness type.
type WrappedStorage = TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()>;
// Builds the wrapped storage around `email_service::send_email`, spawns the
// poller returned alongside it, and vacuums before handing it to a test.
// NOTE(review): `vacuum()` presumably clears leftovers from earlier runs — confirm.
async fn setup_test_wrapper() -> WrappedStorage {
let (mut t, poller) = TestWrapper::new_with_service(
$setup().await,
apalis_core::service_fn::service_fn(email_service::send_email),
);
tokio::spawn(poller);
t.vacuum().await.unwrap();
t
}
// Pushes `email` with the given priority set on its context and returns the
// id of the created task.
async fn push_email_priority(
storage: &mut WrappedStorage,
email: Email,
priority: i32,
) -> TaskId {
let mut ctx = SqlContext::new();
ctx.set_priority(priority);
storage
.push_request(Request::new_with_ctx(email, ctx))
.await
.expect("failed to push a job")
.task_id
}
// A job whose handler aborts must end up `Killed` with the abort recorded
// as its last error.
#[tokio::test]
async fn integration_test_kill_job() {
let mut storage = setup_test_wrapper().await;
storage
.push(email_service::example_killed_email())
.await
.unwrap();
let (job_id, res) = storage.execute_next().await.unwrap();
assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
// Pause before reading the job back.
// NOTE(review): presumably gives the acknowledgement time to be persisted — confirm.
apalis_core::sleep(Duration::from_secs(1)).await;
let job = storage
.fetch_by_id(&job_id)
.await
.unwrap()
.expect("No job found");
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Killed);
assert_eq!(
ctx.last_error().clone().unwrap(),
"{\"Err\":\"AbortError: Invalid character.\"}"
);
}
// `update` must persist a status change made on a fetched job.
#[tokio::test]
async fn integration_test_update_job() {
let mut storage = setup_test_wrapper().await;
let task_id = storage
.push(Email {
subject: "Test Subject".to_string(),
to: "example@sql".to_string(),
text: "Some Text".to_string(),
})
.await
.expect("failed to push a job")
.task_id;
let mut job = get_job(&mut storage, &task_id).await;
job.parts.context.set_status(State::Killed);
storage.update(job).await.expect("updating to succeed");
// Re-fetch to check the stored state rather than the local copy.
let job = get_job(&mut storage, &task_id).await;
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Killed);
}
// A successful job must be marked `Done` and have a completion timestamp.
#[tokio::test]
async fn integration_test_acknowledge_good_job() {
let mut storage = setup_test_wrapper().await;
storage
.push(email_service::example_good_email())
.await
.unwrap();
let (job_id, res) = storage.execute_next().await.unwrap();
assert_eq!(res, Ok("()".to_owned()));
apalis_core::sleep(Duration::from_secs(1)).await;
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());
}
// A retryable failure must leave the job `Failed` with at least one
// attempt counted and the failure recorded as its last error.
#[tokio::test]
async fn integration_test_acknowledge_failed_job() {
let mut storage = setup_test_wrapper().await;
storage
.push(email_service::example_retry_able_email())
.await
.unwrap();
let (job_id, res) = storage.execute_next().await.unwrap();
assert_eq!(
res,
Err("FailedError: Missing separator character '@'.".to_owned())
);
apalis_core::sleep(Duration::from_secs(1)).await;
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Failed);
assert!(job.parts.attempt.current() >= 1);
assert_eq!(
ctx.last_error().clone().unwrap(),
"{\"Err\":\"FailedError: Missing separator character '@'.\"}"
);
}
// Runs a worker built with `WorkerBuilder` against the raw storage (no
// `TestWrapper`) and checks the job it consumed.
#[tokio::test]
async fn worker_consume() {
use apalis_core::builder::WorkerBuilder;
use apalis_core::builder::WorkerFactoryFn;
let storage = $setup().await;
// Separate clone used to push the job and to inspect it afterwards.
let mut handle = storage.clone();
let parts = handle
.push(email_service::example_good_email())
.await
.unwrap();
async fn task(_job: Email) -> &'static str {
tokio::time::sleep(Duration::from_millis(100)).await;
"Job well done"
}
let worker = WorkerBuilder::new("rango-tango").backend(storage);
let worker = worker.build_fn(task);
let wkr = worker.run();
let w = wkr.get_handle();
// Runs concurrently with the worker: waits, inspects the job, then stops
// the worker so `join!` can complete.
let runner = async move {
apalis_core::sleep(Duration::from_secs(3)).await;
let job_id = &parts.task_id;
let job = get_job(&mut handle, job_id).await;
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert!(ctx.done_at().is_some());
assert!(ctx.lock_by().is_some());
assert!(ctx.lock_at().is_some());
// NOTE(review): `last_error` is expected to be set even though the task
// succeeded; presumably it stores the serialized result — confirm.
assert!(ctx.last_error().is_some());
w.stop();
};
tokio::join!(runner, wkr);
}
// Jobs are pushed with priorities 5, 10, -1, 1 and must be executed in
// descending priority order: 10, 5, 1, -1.
#[tokio::test]
async fn test_consume_jobs_with_priority() {
let mut storage = setup_test_wrapper().await;
let job2 =
push_email_priority(&mut storage, email_service::example_good_email(), 5).await;
let job1 =
push_email_priority(&mut storage, email_service::example_good_email(), 10).await;
let job4 =
push_email_priority(&mut storage, email_service::example_good_email(), -1).await;
let job3 =
push_email_priority(&mut storage, email_service::example_good_email(), 1).await;
for (job_id, prio) in &[(job1, 10), (job2, 5), (job3, 1), (job4, -1)] {
let (exec_job_id, res) = storage.execute_next().await.unwrap();
assert_eq!(job_id, &exec_job_id);
assert_eq!(res, Ok("()".to_owned()));
apalis_core::sleep(Duration::from_millis(500)).await;
let job = storage.fetch_by_id(&exec_job_id).await.unwrap().unwrap();
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert_eq!(ctx.priority(), prio);
}
}
// Scheduling a job 60 seconds ahead must store it as `Pending`, unlocked,
// with `run_at` equal to the requested time.
#[tokio::test]
async fn test_schedule_request() {
use chrono::SubsecRound;
let mut storage = $setup().await;
let email = Email {
subject: "Scheduled Email".to_string(),
to: "scheduled@example.com".to_string(),
text: "This is a scheduled email.".to_string(),
};
let schedule_time = Utc::now() + Duration::from_secs(60);
let parts = storage
.schedule(email, schedule_time.timestamp())
.await
.expect("Failed to schedule request");
let job = get_job(&mut storage, &parts.task_id).await;
let ctx = &job.parts.context;
assert_eq!(*ctx.status(), State::Pending);
assert!(ctx.lock_by().is_none());
assert!(ctx.lock_at().is_none());
// `schedule` was given `timestamp()` (whole seconds), so sub-second
// precision is dropped from the expectation as well.
let expected_schedule_time = schedule_time.clone().trunc_subsecs(0);
assert_eq!(ctx.run_at(), &expected_schedule_time);
}
// Smoke test: the stats, job-listing and worker-listing calls must all
// return `Ok`.
#[tokio::test]
async fn test_backend_expose_succeeds() {
let storage = setup_test_wrapper().await;
assert!(storage.stats().await.is_ok());
assert!(storage.list_jobs(&State::Pending, 1).await.is_ok());
assert!(storage.list_workers().await.is_ok());
}
// A job scheduled 15 seconds ahead must complete and must not have been
// locked before its scheduled time.
// NOTE(review): "shedule" in the test name is a typo for "schedule"; it is
// left untouched because renaming changes the generated test identifier.
#[tokio::test]
async fn integration_test_shedule_and_run_job() {
let current = Utc::now();
let dur = Duration::from_secs(15);
let schedule_time = current + dur;
let mut storage = setup_test_wrapper().await;
storage
.schedule(
email_service::example_good_email(),
schedule_time.timestamp(),
)
.await
.unwrap();
// NOTE(review): `execute_next` presumably waits until the job is due — confirm.
let (job_id, res) = storage.execute_next().await.unwrap();
apalis_core::sleep(Duration::from_secs(1)).await;
assert_eq!(res, Ok("()".to_owned()));
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
let ctx = job.parts.context;
assert_eq!(*ctx.status(), State::Done);
assert!(
ctx.lock_at().unwrap() >= schedule_time.timestamp(),
"{} should be greater than {}",
ctx.lock_at().unwrap(),
schedule_time.timestamp()
);
}
};
}