#![cfg(feature = "queue-cron")]
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use cron::Schedule;
use super::backend::PushOptions;
use super::backend::QueueBackend;
pub struct CronScheduler {
schedule: Schedule,
queue: String,
payload: Arc<Vec<u8>>,
backend: Arc<dyn QueueBackend>,
}
impl CronScheduler {
pub fn new(
expression: &str,
queue: impl Into<String>,
payload: Vec<u8>,
backend: Arc<dyn QueueBackend>,
) -> Result<Self, cron::error::Error> {
let schedule = Schedule::from_str(expression)?;
Ok(Self {
schedule,
queue: queue.into(),
payload: Arc::new(payload),
backend,
})
}
#[cfg(not(feature = "compio"))]
pub async fn run(self) {
loop {
let Some(next) = self.schedule.upcoming(Utc).next() else {
return;
};
let now = Utc::now();
let wait = (next - now).to_std().unwrap_or(Duration::from_secs(0));
let deadline = tokio::time::Instant::now() + wait;
tokio::time::sleep_until(deadline).await;
let _ = self
.backend
.push(&self.queue, self.payload.as_slice(), PushOptions::default())
.await;
}
}
#[cfg(feature = "compio")]
pub async fn run(self) {
loop {
let Some(next) = self.schedule.upcoming(Utc).next() else {
return;
};
let now = Utc::now();
let wait = (next - now).to_std().unwrap_or(Duration::from_secs(0));
let deadline = std::time::Instant::now() + wait;
compio::time::sleep_until(deadline).await;
let _ = self
.backend
.push(&self.queue, self.payload.as_slice(), PushOptions::default())
.await;
}
}
}