#[macro_export]
macro_rules! task {
($enum:ident { $($variant:ident),* $(,)? }) => {
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum $enum {
$($variant($variant),)*
}
$(
impl From<$variant> for $enum {
fn from(inner: $variant) -> Self {
Self::$variant(inner)
}
}
)*
#[async_trait::async_trait]
impl $crate::Step<$enum> for $enum {
async fn step(self, db: &sqlx::PgPool) -> $crate::StepResult<$enum> {
match self {
$(Self::$variant(inner) => inner.step(db).await.map(|next|
match next {
$crate::NextStep::None => $crate::NextStep::None,
$crate::NextStep::Now(x) => $crate::NextStep::Now(x.into()),
$crate::NextStep::Delayed(x, d) => $crate::NextStep::Delayed(x.into(), d),
}
),)*
}
}
fn retry_limit(&self) -> i32 {
match self {
$(Self::$variant(inner) => inner.retry_limit(),)*
}
}
fn retry_delay(&self) -> std::time::Duration {
match self {
$(Self::$variant(inner) => inner.retry_delay(),)*
}
}
}
}
}
#[macro_export]
macro_rules! scheduler {
($enum:ident { $($variant:ident),* $(,)? }) => {
$crate::task!($enum { $($variant),* });
impl $crate::Scheduler for $enum {}
}
}
#[cfg(test)]
mod tests {
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::time::Duration;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(super) struct First;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(super) struct Second;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(super) struct Third;
crate::task!(MacroTask {
First,
Second,
Third
});
crate::scheduler!(MacroScheduler { MacroTask });
#[async_trait::async_trait]
impl crate::Step<MacroTask> for First {
const RETRY_LIMIT: i32 = 3;
const RETRY_DELAY: Duration = Duration::from_millis(25);
async fn step(self, _db: &PgPool) -> crate::StepResult<MacroTask> {
crate::NextStep::delay(Second, Self::RETRY_DELAY)
}
}
#[async_trait::async_trait]
impl crate::Step<MacroTask> for Second {
async fn step(self, _db: &PgPool) -> crate::StepResult<MacroTask> {
crate::NextStep::none()
}
}
#[async_trait::async_trait]
impl crate::Step<MacroTask> for Third {
async fn step(self, _db: &PgPool) -> crate::StepResult<MacroTask> {
crate::NextStep::now(Second)
}
}
fn assert_scheduler<T: crate::Scheduler>() {}
#[test]
fn task_macro_implements_from_for_each_variant() {
let task: MacroTask = First.into();
assert!(matches!(task, MacroTask::First(First)));
}
#[test]
fn scheduler_macro_implements_the_scheduler_trait() {
assert_scheduler::<MacroScheduler>();
}
#[sqlx::test(migrations = "./migrations")]
async fn scheduler_macro_schedules_wrapped_tasks(pool: PgPool) {
let task = MacroScheduler::MacroTask(MacroTask::First(First));
let id = crate::enqueue(&pool, &task).await.unwrap();
let row = sqlx::query!("SELECT step FROM pg_task WHERE id = $1", id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.step, serde_json::to_string(&task).unwrap());
}
#[tokio::test]
async fn task_macro_forwards_step_and_retry_metadata() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres:///pg_task")
.unwrap();
let task = MacroTask::First(First);
assert_eq!(crate::Step::<MacroTask>::retry_limit(&task), 3);
assert_eq!(
crate::Step::<MacroTask>::retry_delay(&task),
Duration::from_millis(25),
);
match crate::Step::<MacroTask>::step(task, &pool).await.unwrap() {
crate::NextStep::Delayed(MacroTask::Second(Second), delay) => {
assert_eq!(delay, Duration::from_millis(25));
}
_ => panic!("expected the delayed second step"),
}
}
#[tokio::test]
async fn task_macro_forwards_terminal_steps() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres:///pg_task")
.unwrap();
assert!(matches!(
crate::Step::<MacroTask>::step(MacroTask::Second(Second), &pool)
.await
.unwrap(),
crate::NextStep::None
));
}
#[tokio::test]
async fn task_macro_forwards_immediate_steps() {
let pool = PgPoolOptions::new()
.connect_lazy("postgres:///pg_task")
.unwrap();
assert!(matches!(
crate::Step::<MacroTask>::step(MacroTask::Third(Third), &pool)
.await
.unwrap(),
crate::NextStep::Now(MacroTask::Second(Second))
));
}
}