use crate::{datetime::DateTime, BoxFuture, Map, Uuid};
use chrono::Local;
use cron::Schedule;
use std::{str::FromStr, time::Duration};
/// Function pointer type for a synchronous cron job.
///
/// When invoked by a job's `tick`, it receives the job's id, mutable access
/// to the job's data map, and the time of the job's previous tick (not the
/// time of the scheduled event being processed).
pub type CronJob = fn(id: Uuid, data: &mut Map, last_tick: DateTime);
/// Function pointer type for an asynchronous cron job.
///
/// Takes the same arguments as [`CronJob`] and returns a boxed future that may
/// borrow the data map for `'a`; the future is awaited by a job's `tick_async`.
pub type AsyncCronJob =
for<'a> fn(id: Uuid, data: &'a mut Map, last_tick: DateTime) -> BoxFuture<'a>;
/// The callable attached to a job, either synchronous or asynchronous.
enum ExecutableJob {
/// A synchronous job function.
Fn(CronJob),
/// An asynchronous job function returning a boxed future.
AsyncFn(AsyncCronJob),
}
/// A schedulable job: a cron schedule paired with the function to run.
pub struct Job {
/// Identifier of the job; the constructors generate it with `Uuid::new_v4`.
id: Uuid,
/// Data map handed mutably to the job function on every run.
data: Map,
/// Parsed cron schedule that determines when the job is due.
schedule: Schedule,
/// The function executed when the job is due.
run: ExecutableJob,
/// Local time of the previous tick; `None` until the job has been ticked
/// or a value has been set explicitly.
last_tick: Option<chrono::DateTime<Local>>,
}
impl Job {
#[inline]
pub fn new(cron_expr: &str, exec: CronJob) -> Self {
let schedule = Schedule::from_str(cron_expr)
.unwrap_or_else(|err| panic!("invalid cron expression `{cron_expr}`: {err}"));
Job {
id: Uuid::new_v4(),
data: Map::new(),
schedule,
run: ExecutableJob::Fn(exec),
last_tick: None,
}
}
#[inline]
pub fn new_async(cron_expr: &str, exec: AsyncCronJob) -> Self {
let schedule = Schedule::from_str(cron_expr)
.unwrap_or_else(|err| panic!("invalid cron expression `{cron_expr}`: {err}"));
Job {
id: Uuid::new_v4(),
data: Map::new(),
schedule,
run: ExecutableJob::AsyncFn(exec),
last_tick: None,
}
}
#[inline]
pub fn id(&self) -> Uuid {
self.id
}
#[inline]
pub fn data(&self) -> &Map {
&self.data
}
#[inline]
pub fn data_mut(&mut self) -> &mut Map {
&mut self.data
}
#[inline]
pub fn set_last_tick(&mut self, last_tick: Option<DateTime>) {
self.last_tick = last_tick.map(|dt| dt.into());
}
pub fn tick(&mut self) {
let now = Local::now();
if let Some(last_tick) = self.last_tick {
for event in self.schedule.after(&last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(exec) => exec(self.id, &mut self.data, last_tick.into()),
ExecutableJob::AsyncFn(_) => tracing::warn!("job `{}` is async", self.id),
}
}
}
self.last_tick = Some(now);
}
pub async fn tick_async(&mut self) {
let now = Local::now();
if let Some(last_tick) = self.last_tick {
for event in self.schedule.after(&last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(_) => tracing::warn!("job `{}` is not async", self.id),
ExecutableJob::AsyncFn(exec) => {
exec(self.id, &mut self.data, last_tick.into()).await
}
}
}
}
self.last_tick = Some(now);
}
}
/// A scheduler holding a list of jobs that are ticked together.
#[derive(Default)]
pub struct JobScheduler {
/// Registered jobs, in the order they were added.
jobs: Vec<Job>,
}
impl JobScheduler {
    /// Wait time reported when no upcoming event can be determined.
    const DEFAULT_INTERVAL: Duration = Duration::from_millis(500);

    /// Creates an empty scheduler.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a job to the scheduler and returns its id.
    pub fn add(&mut self, job: Job) -> Uuid {
        let job_id = job.id;
        self.jobs.push(job);
        job_id
    }

    /// Removes the first job whose id equals `job_id`.
    ///
    /// Returns `true` if a job was removed and `false` if no job matched.
    pub fn remove(&mut self, job_id: Uuid) -> bool {
        match self.jobs.iter().position(|job| job.id == job_id) {
            Some(index) => {
                self.jobs.remove(index);
                true
            }
            None => false,
        }
    }

    /// Ticks every job in insertion order, running the synchronous ones
    /// that are due.
    pub fn tick(&mut self) {
        for job in &mut self.jobs {
            job.tick();
        }
    }

    /// Ticks every job in insertion order, awaiting the asynchronous ones
    /// that are due. Jobs are awaited one after another.
    pub async fn tick_async(&mut self) {
        for job in &mut self.jobs {
            job.tick_async().await;
        }
    }

    /// Returns how long to wait until the earliest upcoming event across all
    /// jobs.
    ///
    /// Falls back to 500 milliseconds when the scheduler is empty, when no
    /// job has an upcoming event (e.g. a schedule that lies entirely in the
    /// past), or when the interval cannot be represented as a
    /// [`std::time::Duration`]. Returning the fallback instead of a zero
    /// duration keeps callers that sleep on this value from spinning.
    pub fn time_till_next_job(&self) -> Duration {
        let now = Local::now();
        self.jobs
            .iter()
            // Only the very next event of each job matters.
            .filter_map(|job| job.schedule.after(&now).next())
            .map(|event| event - now)
            .min()
            .and_then(|interval| interval.to_std().ok())
            .unwrap_or(Self::DEFAULT_INTERVAL)
    }
}