pg_task/
traits.rs

1use crate::{util::std_duration_to_chrono, Error, StepResult};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use serde::{de::DeserializeOwned, Serialize};
5use sqlx::{types::Uuid, PgExecutor, PgPool};
6use std::{fmt, time::Duration};
7
8/// A tait to implement on each task step
9#[async_trait]
10pub trait Step<Task>
11where
12    Task: Sized,
13    Self: Into<Task> + Send + Sized + fmt::Debug + DeserializeOwned + Serialize,
14{
15    /// How many times retry_limit the step on an error
16    const RETRY_LIMIT: i32 = 0;
17
18    /// The time to wait between retries
19    const RETRY_DELAY: Duration = Duration::from_secs(1);
20
21    /// Processes the current step and returns the next if any
22    async fn step(self, db: &PgPool) -> StepResult<Task>;
23
24    /// Proxies the `RETRY` const, doesn't mean to be changed in impls
25    fn retry_limit(&self) -> i32 {
26        Self::RETRY_LIMIT
27    }
28
29    /// Proxies the `RETRY_DELAY` const, doesn't mean to be changed in impls
30    fn retry_delay(&self) -> Duration {
31        Self::RETRY_DELAY
32    }
33}
34
35/// A tait to implement on the outer enum wrapper containing all the tasks
36#[async_trait]
37pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync {
38    /// Enqueues the task to be run immediately
39    async fn enqueue<'e>(&self, db: impl PgExecutor<'e>) -> crate::Result<Uuid> {
40        self.schedule(db, Utc::now()).await
41    }
42
43    /// Schedules a task to be run after a specified delay
44    async fn delay<'e>(&self, db: impl PgExecutor<'e>, delay: Duration) -> crate::Result<Uuid> {
45        let delay = std_duration_to_chrono(delay);
46        self.schedule(db, Utc::now() + delay).await
47    }
48
49    /// Schedules a task to run at a specified time in the future
50    async fn schedule<'e>(
51        &self,
52        db: impl PgExecutor<'e>,
53        at: DateTime<Utc>,
54    ) -> crate::Result<Uuid> {
55        let step = serde_json::to_string(self)
56            .map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?;
57        sqlx::query!(
58            "INSERT INTO pg_task (step, wakeup_at) VALUES ($1, $2) RETURNING id",
59            step,
60            at
61        )
62        .map(|r| r.id)
63        .fetch_one(db)
64        .await
65        .map_err(Error::AddTask)
66    }
67}