use async_trait::async_trait;
use bincode::Encode;
use chrono::Utc;
use thiserror::Error;
use crate::core::job_handle::JobHandle;
use crate::core::job_processor::JobProcessor;
use crate::core::{DateTime, Duration, Xid};
/// Abstraction over a job queue: scheduling jobs and polling for work.
///
/// Implementors supply [`Queue::schedule_at`] and
/// [`Queue::poll_next_with_instant`]; every other method is a provided
/// convenience built on those two using the current UTC time.
#[async_trait]
pub trait Queue: Send + Sync {
    /// Handle type returned when polling yields a job.
    type JobHandle: JobHandle;

    /// Schedules a job for processor `J` carrying `payload`, to run at
    /// `scheduled_at`.
    ///
    /// On success returns an [`Xid`] (presumably the identifier of the newly
    /// scheduled job — confirm against the implementor).
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] if the implementor fails to schedule the job.
    async fn schedule_at<J>(
        &self,
        payload: J::Payload,
        scheduled_at: DateTime,
    ) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode;

    /// Schedules a job for processor `J` at the current UTC time.
    ///
    /// Equivalent to calling [`Queue::schedule_at`] with `Utc::now()`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Queue::schedule_at`].
    async fn schedule<J>(&self, payload: J::Payload) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode,
    {
        self.schedule_at::<J>(payload, Utc::now()).await
    }

    /// Schedules a job for processor `J` at `Utc::now() + scheduled_in`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Queue::schedule_at`].
    async fn schedule_in<J>(
        &self,
        payload: J::Payload,
        scheduled_in: Duration,
    ) -> Result<Xid, QueueError>
    where
        J: JobProcessor + 'static,
        J::Payload: Encode,
    {
        let when = Utc::now() + scheduled_in;
        self.schedule_at::<J>(payload, when).await
    }

    /// Polls once for a job whose type is one of `job_types`, evaluated
    /// against the supplied `time`.
    ///
    /// Returns `Ok(None)` when the poll yields no job. How `time` is compared
    /// with a job's schedule is up to the implementor.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] if the implementor fails to poll.
    async fn poll_next_with_instant(
        &self,
        job_types: &[&str],
        time: DateTime,
    ) -> Result<Option<Self::JobHandle>, QueueError>;

    /// Polls once for a job, using the current UTC time as the instant.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Queue::poll_next_with_instant`].
    async fn poll_next(&self, job_types: &[&str]) -> Result<Option<Self::JobHandle>, QueueError> {
        self.poll_next_with_instant(job_types, Utc::now()).await
    }

    /// Waits until a job is available, polling once per `interval`.
    ///
    /// The first poll happens immediately, because the first tick of a Tokio
    /// interval completes right away. The loop only ends when a poll returns
    /// a job or an error.
    ///
    /// # Errors
    ///
    /// * [`QueueError::InvalidInterval`] if `interval` is negative (it cannot
    ///   be converted to a standard duration) or zero.
    /// * Any error returned by [`Queue::poll_next`].
    async fn next(
        &self,
        job_types: &[&str],
        interval: Duration,
    ) -> Result<Self::JobHandle, QueueError> {
        // `to_std` only rejects negative values; a zero period must be
        // rejected here as well because `tokio::time::interval` panics on it.
        let period = match interval.to_std() {
            Ok(period) if !period.is_zero() => period,
            _ => return Err(QueueError::InvalidInterval(interval)),
        };

        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            if let Some(job) = self.poll_next(job_types).await? {
                break Ok(job);
            }
        }
    }
}
/// Errors returned by the methods of the `Queue` trait.
#[derive(Error, Debug)]
pub enum QueueError {
/// A value could not be encoded with bincode.
///
/// `#[from]` generates the `From<bincode::error::EncodeError>` conversion,
/// so `?` can be applied directly to encoding results.
#[error("Failed to serialize job context")]
EncodeError {
#[from]
source: bincode::error::EncodeError,
},
/// The polling interval passed to `Queue::next` was rejected; the variant
/// carries the offending value.
#[error("Interval must be more than zero: {0:?}")]
InvalidInterval(Duration),
/// Any other failure, wrapped as an `anyhow::Error`.
///
/// `transparent` forwards `Display` and `source()` to the wrapped error
/// instead of adding a message of its own.
#[error(transparent)]
Other(#[from] anyhow::Error),
}