use crate::{util::std_duration_to_chrono, Error, StepResult};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{types::Uuid, PgExecutor, PgPool};
use std::{fmt, time::Duration};
/// A single unit of work that can be executed against the database and
/// converted into the enclosing `Task` type.
///
/// Implementors must be serializable and deserializable with serde,
/// debuggable, `Send`, and convertible into `Task`.
#[async_trait]
pub trait Step<Task>
where
    Task: Sized,
    Self: fmt::Debug + DeserializeOwned + Serialize + Into<Task> + Send + Sized,
{
    /// Value returned by the default [`Step::retry_limit`] implementation.
    /// Defaults to `0`.
    const RETRY_LIMIT: i32 = 0;

    /// Value returned by the default [`Step::retry_delay`] implementation.
    /// Defaults to one second.
    const RETRY_DELAY: Duration = Duration::from_secs(1);

    /// Consumes the step and runs it with access to the Postgres pool,
    /// producing a [`StepResult`] for `Task`.
    async fn step(self, db: &PgPool) -> StepResult<Task>;

    /// Retry limit for this step instance; [`Self::RETRY_LIMIT`] unless
    /// overridden.
    // NOTE(review): presumably the maximum number of retry attempts made by
    // the worker — confirm against the code that consumes this value.
    fn retry_limit(&self) -> i32 {
        Self::RETRY_LIMIT
    }

    /// Retry delay for this step instance; [`Self::RETRY_DELAY`] unless
    /// overridden.
    // NOTE(review): presumably the pause between retry attempts — confirm
    // against the code that consumes this value.
    fn retry_delay(&self) -> Duration {
        Self::RETRY_DELAY
    }
}
#[async_trait]
pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync {
async fn enqueue<'e>(&self, db: impl PgExecutor<'e>) -> crate::Result<Uuid> {
self.schedule(db, Utc::now()).await
}
async fn delay<'e>(&self, db: impl PgExecutor<'e>, delay: Duration) -> crate::Result<Uuid> {
let delay = std_duration_to_chrono(delay);
self.schedule(db, Utc::now() + delay).await
}
async fn schedule<'e>(
&self,
db: impl PgExecutor<'e>,
at: DateTime<Utc>,
) -> crate::Result<Uuid> {
let step = serde_json::to_string(self)
.map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?;
sqlx::query!(
"INSERT INTO pg_task (step, wakeup_at) VALUES ($1, $2) RETURNING id",
step,
at
)
.map(|r| r.id)
.fetch_one(db)
.await
.map_err(Error::AddTask)
}
}