aide_de_camp/core/
queue.rs1use async_trait::async_trait;
2use bincode::{Decode, Encode};
3use chrono::Utc;
4use thiserror::Error;
5
6use crate::core::job_handle::JobHandle;
7use crate::core::job_processor::JobProcessor;
8use crate::core::{DateTime, Duration, Xid};
9
10#[async_trait]
17pub trait Queue: Send + Sync {
18 type JobHandle: JobHandle;
19 async fn schedule_at<J>(
21 &self,
22 payload: J::Payload,
23 scheduled_at: DateTime,
24 priority: i8,
25 ) -> Result<Xid, QueueError>
26 where
27 J: JobProcessor + 'static,
28 J::Payload: Encode;
29 async fn schedule<J>(&self, payload: J::Payload, priority: i8) -> Result<Xid, QueueError>
31 where
32 J: JobProcessor + 'static,
33 J::Payload: Encode,
34 {
35 self.schedule_at::<J>(payload, Utc::now(), priority).await
36 }
37
38 async fn schedule_in<J>(
40 &self,
41 payload: J::Payload,
42 scheduled_in: Duration,
43 priority: i8,
44 ) -> Result<Xid, QueueError>
45 where
46 J: JobProcessor + 'static,
47 J::Payload: Encode,
48 {
49 let when = Utc::now() + scheduled_in;
50 self.schedule_at::<J>(payload, when, priority).await
51 }
52
53 async fn poll_next_with_instant(
55 &self,
56 job_types: &[&str],
57 time: DateTime,
58 ) -> Result<Option<Self::JobHandle>, QueueError>;
59
60 async fn poll_next(&self, job_types: &[&str]) -> Result<Option<Self::JobHandle>, QueueError> {
62 self.poll_next_with_instant(job_types, Utc::now()).await
63 }
64
65 async fn next(
67 &self,
68 job_types: &[&str],
69 interval: Duration,
70 ) -> Result<Self::JobHandle, QueueError> {
71 let duration = interval
72 .to_std()
73 .map_err(|_| QueueError::InvalidInterval(interval))?;
74 let mut interval = tokio::time::interval(duration);
75 loop {
76 interval.tick().await;
77 let job = self.poll_next(job_types).await?;
78 if let Some(job) = job {
79 break Ok(job);
80 }
81 }
82 }
83
84 async fn cancel_job(&self, job_id: Xid) -> Result<(), QueueError>;
86
87 async fn unschedule_job<J>(&self, job_id: Xid) -> Result<J::Payload, QueueError>
90 where
91 J: JobProcessor + 'static,
92 J::Payload: Decode;
93}
94
95#[derive(Error, Debug)]
97#[non_exhaustive]
98pub enum QueueError {
99 #[error("Failed to serialize job context")]
101 EncodeError {
102 #[from]
103 source: bincode::error::EncodeError,
104 },
105 #[error("Failed to deserialize job context")]
107 DecodeError {
108 #[from]
109 source: bincode::error::DecodeError,
110 },
111 #[error("Interval must be greater than zero: {0:?}")]
112 InvalidInterval(Duration),
113 #[error("Job by that ID does not exist: {0}")]
114 JobNotFound(Xid),
115 #[error(transparent)]
116 Other(#[from] anyhow::Error),
117}