1use crate::{util::std_duration_to_chrono, Error, StepResult};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use serde::{de::DeserializeOwned, Serialize};
5use sqlx::{types::Uuid, PgExecutor, PgPool};
6use std::{fmt, time::Duration};
7
8#[async_trait]
10pub trait Step<Task>
11where
12 Task: Sized,
13 Self: Into<Task> + Send + Sized + fmt::Debug + DeserializeOwned + Serialize,
14{
15 const RETRY_LIMIT: i32 = 0;
17
18 const RETRY_DELAY: Duration = Duration::from_secs(1);
20
21 async fn step(self, db: &PgPool) -> StepResult<Task>;
23
24 fn retry_limit(&self) -> i32 {
26 Self::RETRY_LIMIT
27 }
28
29 fn retry_delay(&self) -> Duration {
31 Self::RETRY_DELAY
32 }
33}
34
35#[async_trait]
37pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync {
38 async fn enqueue<'e>(&self, db: impl PgExecutor<'e>) -> crate::Result<Uuid> {
40 self.schedule(db, Utc::now()).await
41 }
42
43 async fn delay<'e>(&self, db: impl PgExecutor<'e>, delay: Duration) -> crate::Result<Uuid> {
45 let delay = std_duration_to_chrono(delay);
46 self.schedule(db, Utc::now() + delay).await
47 }
48
49 async fn schedule<'e>(
51 &self,
52 db: impl PgExecutor<'e>,
53 at: DateTime<Utc>,
54 ) -> crate::Result<Uuid> {
55 let step = serde_json::to_string(self)
56 .map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?;
57 sqlx::query!(
58 "INSERT INTO pg_task (step, wakeup_at) VALUES ($1, $2) RETURNING id",
59 step,
60 at
61 )
62 .map(|r| r.id)
63 .fetch_one(db)
64 .await
65 .map_err(Error::AddTask)
66 }
67}