aide_de_camp/core/
queue.rs

1use async_trait::async_trait;
2use bincode::{Decode, Encode};
3use chrono::Utc;
4use thiserror::Error;
5
6use crate::core::job_handle::JobHandle;
7use crate::core::job_processor::JobProcessor;
8use crate::core::{DateTime, Duration, Xid};
9
10/// An interface to queue implementation. Responsible for pushing jobs into the queue and pulling
11/// jobs out of the queue.
12///
13/// ### Priority
14///
15/// When is enqueued one can specify priority. Jobs with higher priority will get polled first even if submitted after lower priority jobs.
16#[async_trait]
17pub trait Queue: Send + Sync {
18    type JobHandle: JobHandle;
19    /// Schedule a job to run at the future time.
20    async fn schedule_at<J>(
21        &self,
22        payload: J::Payload,
23        scheduled_at: DateTime,
24        priority: i8,
25    ) -> Result<Xid, QueueError>
26    where
27        J: JobProcessor + 'static,
28        J::Payload: Encode;
29    /// Schedule a job to run next. Depending on queue backlog this may start running later than you expect.
30    async fn schedule<J>(&self, payload: J::Payload, priority: i8) -> Result<Xid, QueueError>
31    where
32        J: JobProcessor + 'static,
33        J::Payload: Encode,
34    {
35        self.schedule_at::<J>(payload, Utc::now(), priority).await
36    }
37
38    /// Schedule a job to run at the future time relative to now.
39    async fn schedule_in<J>(
40        &self,
41        payload: J::Payload,
42        scheduled_in: Duration,
43        priority: i8,
44    ) -> Result<Xid, QueueError>
45    where
46        J: JobProcessor + 'static,
47        J::Payload: Encode,
48    {
49        let when = Utc::now() + scheduled_in;
50        self.schedule_at::<J>(payload, when, priority).await
51    }
52
53    /// Pool queue, implementation should not wait for next job, if there nothing return `Ok(None)`.
54    async fn poll_next_with_instant(
55        &self,
56        job_types: &[&str],
57        time: DateTime,
58    ) -> Result<Option<Self::JobHandle>, QueueError>;
59
60    /// Pool queue, implementation should not wait for next job, if there nothing return `Ok(None)`.
61    async fn poll_next(&self, job_types: &[&str]) -> Result<Option<Self::JobHandle>, QueueError> {
62        self.poll_next_with_instant(job_types, Utc::now()).await
63    }
64
65    /// Await next job. Default implementation polls the queue with defined interval until there is something.
66    async fn next(
67        &self,
68        job_types: &[&str],
69        interval: Duration,
70    ) -> Result<Self::JobHandle, QueueError> {
71        let duration = interval
72            .to_std()
73            .map_err(|_| QueueError::InvalidInterval(interval))?;
74        let mut interval = tokio::time::interval(duration);
75        loop {
76            interval.tick().await;
77            let job = self.poll_next(job_types).await?;
78            if let Some(job) = job {
79                break Ok(job);
80            }
81        }
82    }
83
84    /// Cancel job that has been scheduled. Right now this will only cancel if the job hasn't started yet.
85    async fn cancel_job(&self, job_id: Xid) -> Result<(), QueueError>;
86
87    /// The same as [`cancel_job`](struct.cancel_job.html), but returns payload of canceled job.
88    /// If deserialization fails, then job won't be cancelled.
89    async fn unschedule_job<J>(&self, job_id: Xid) -> Result<J::Payload, QueueError>
90    where
91        J: JobProcessor + 'static,
92        J::Payload: Decode;
93}
94
95/// Errors related to queue operation.
96#[derive(Error, Debug)]
97#[non_exhaustive]
98pub enum QueueError {
99    /// Encountered an error when tried to serialize Context.
100    #[error("Failed to serialize job context")]
101    EncodeError {
102        #[from]
103        source: bincode::error::EncodeError,
104    },
105    /// Encountered an error when tried to deserialize Context.
106    #[error("Failed to deserialize job context")]
107    DecodeError {
108        #[from]
109        source: bincode::error::DecodeError,
110    },
111    #[error("Interval must be greater than zero: {0:?}")]
112    InvalidInterval(Duration),
113    #[error("Job by that ID does not exist: {0}")]
114    JobNotFound(Xid),
115    #[error(transparent)]
116    Other(#[from] anyhow::Error),
117}