use super::{AsyncScheduler, DEFAULT_TICK_INTERVAL, JobContext};
use crate::{BoxFuture, Uuid, datetime::DateTime, extension::TomlTableExt};
use chrono::Local;
use cron::Schedule;
use std::{io, str::FromStr, time::Duration};
use toml::Table;
/// Function pointer type of an asynchronous cron job.
///
/// The job receives a mutable borrow of its [`JobContext`] and returns a
/// [`BoxFuture`] whose lifetime is tied to that borrow.
pub type AsyncCronJob = for<'a> fn(ctx: &'a mut JobContext) -> BoxFuture<'a>;
/// A cron-scheduled job whose body is an asynchronous function.
pub struct AsyncJob {
/// Context handed to the job function on every run.
context: JobContext,
/// Parsed cron schedule that determines when the job fires.
schedule: Schedule,
/// The function executed on each run.
run: AsyncCronJob,
}
impl AsyncJob {
#[inline]
pub fn new(cron_expr: &str, exec: AsyncCronJob) -> Self {
let schedule = Schedule::from_str(cron_expr)
.unwrap_or_else(|err| panic!("invalid cron expression `{cron_expr}`: {err}"));
let mut context = JobContext::new();
context.set_source(cron_expr);
Self {
context,
schedule,
run: exec,
}
}
pub fn with_config(config: &Table, exec: AsyncCronJob) -> Self {
let cron_expr = config.get_str("cron").unwrap_or_default();
let schedule = Schedule::from_str(cron_expr)
.unwrap_or_else(|err| panic!("invalid cron expression `{cron_expr}`: {err}"));
let mut context = JobContext::new();
if let Some(disabled) = config.get_bool("disable") {
context.set_disabled_status(disabled);
}
if let Some(immediate) = config.get_bool("immediate") {
context.set_immediate_mode(immediate);
}
if let Some(ticks) = config
.get_bool("once")
.and_then(|b| b.then_some(1))
.or_else(|| config.get_usize("max-ticks"))
{
context.set_remaining_ticks(ticks);
}
Self {
context,
schedule,
run: exec,
}
}
#[inline]
pub fn name(mut self, name: &'static str) -> Self {
self.context.set_name(name);
self
}
#[inline]
pub fn data<T: Send + 'static>(mut self, data: T) -> Self {
self.context.set_data(data);
self
}
#[inline]
pub fn max_ticks(mut self, ticks: usize) -> Self {
self.context.set_remaining_ticks(ticks);
self
}
#[inline]
pub fn once(mut self) -> Self {
self.context.set_remaining_ticks(1);
self
}
#[inline]
pub fn disable(mut self, disabled: bool) -> Self {
self.context.set_disabled_status(disabled);
self
}
#[inline]
pub fn immediate(mut self, immediate: bool) -> Self {
self.context.set_immediate_mode(immediate);
self
}
#[inline]
pub fn pause(&mut self) {
self.context.set_disabled_status(true);
}
#[inline]
pub fn resume(&mut self) {
self.context.set_disabled_status(false);
}
pub async fn tick(&mut self) {
let now = Local::now();
let upcoming = self.upcoming();
let ctx = &mut self.context;
let run = self.run;
if ctx.is_immediate() && !ctx.is_disabled() && !ctx.is_fused() {
ctx.start();
ctx.set_next_tick(upcoming);
run(ctx).await;
ctx.finish();
} else if let Some(last_tick) = ctx.last_tick().map(|dt| dt.into()) {
for event in self.schedule.after(&last_tick) {
if event > now || ctx.is_fused() {
break;
}
if !ctx.is_disabled() {
ctx.start();
ctx.set_next_tick(upcoming);
run(ctx).await;
ctx.finish();
}
}
} else {
ctx.set_last_tick(now.into());
}
}
pub async fn execute(&mut self) {
let upcoming = self.upcoming();
let ctx = &mut self.context;
let run = self.run;
ctx.start();
ctx.set_next_tick(upcoming);
run(ctx).await;
ctx.finish();
}
#[inline]
pub fn context(&self) -> &JobContext {
&self.context
}
#[inline]
pub fn context_mut(&mut self) -> &mut JobContext {
&mut self.context
}
#[inline]
pub fn upcoming(&self) -> Option<DateTime> {
self.schedule.upcoming(Local).next().map(|dt| dt.into())
}
}
/// A scheduler that owns a list of [`AsyncJob`]s and ticks them in order.
#[derive(Default)]
pub struct AsyncJobScheduler {
/// Registered jobs, in insertion order.
jobs: Vec<AsyncJob>,
}
impl AsyncJobScheduler {
    /// Creates a new scheduler with no jobs.
    #[inline]
    pub fn new() -> Self {
        Self { jobs: Vec::new() }
    }

    /// Adds a job to the scheduler and returns the job ID taken from its context.
    #[inline]
    pub fn add(&mut self, job: AsyncJob) -> Uuid {
        let job_id = job.context().job_id();
        self.jobs.push(job);
        job_id
    }

    /// Removes the first job whose ID equals `job_id`.
    ///
    /// Returns `true` if a job was removed and `false` if no job matched.
    pub fn remove(&mut self, job_id: Uuid) -> bool {
        let position = self
            .jobs
            .iter()
            .position(|job| job.context().job_id() == job_id);
        match position {
            Some(index) => {
                self.jobs.remove(index);
                true
            }
            None => false,
        }
    }

    /// Returns a reference to the job with the given ID, if any.
    #[inline]
    pub fn get(&self, job_id: Uuid) -> Option<&AsyncJob> {
        self.jobs
            .iter()
            .find(|job| job.context().job_id() == job_id)
    }

    /// Returns a mutable reference to the job with the given ID, if any.
    #[inline]
    pub fn get_mut(&mut self, job_id: Uuid) -> Option<&mut AsyncJob> {
        self.jobs
            .iter_mut()
            .find(|job| job.context().job_id() == job_id)
    }

    /// Returns how long to wait before the next job is due.
    ///
    /// With no jobs the result is `DEFAULT_TICK_INTERVAL`. Otherwise the result
    /// is the smallest positive interval found among the jobs, looking at both the
    /// next tick stored in each job context and the next schedule event after the
    /// current time. The result is never shorter than `DEFAULT_TICK_INTERVAL`.
    pub fn time_till_next_job(&self) -> Duration {
        if self.jobs.is_empty() {
            return DEFAULT_TICK_INTERVAL;
        }

        // `Duration::ZERO` doubles as the "no candidate found yet" marker.
        let mut duration = Duration::ZERO;
        let now = Local::now();
        for job in &self.jobs {
            if let Some(interval) = job
                .context()
                .next_tick()
                .and_then(|dt| dt.span_after_now())
                .filter(|interval| duration.is_zero() || interval < &duration)
            {
                duration = interval;
            }
            if let Some(event) = job.schedule.after(&now).next() {
                let interval = event - now;
                // `to_std` fails for negative intervals, which are skipped.
                if let Ok(interval) = interval.to_std()
                    && (duration.is_zero() || interval < duration)
                {
                    duration = interval;
                }
            }
        }
        duration.max(DEFAULT_TICK_INTERVAL)
    }

    /// Ticks every job in insertion order, then drops all jobs whose context
    /// reports itself as fused.
    pub async fn tick(&mut self) {
        for job in &mut self.jobs {
            job.tick().await;
        }
        // One in-place pass instead of collecting IDs and doing a linear
        // `remove` per fused job; the relative order of the kept jobs is unchanged.
        self.jobs.retain(|job| !job.context().is_fused());
    }

    /// Runs every job once, in insertion order, regardless of its schedule.
    #[inline]
    pub async fn execute(&mut self) {
        for job in &mut self.jobs {
            job.execute().await;
        }
    }
}
// `AsyncScheduler` implementation that forwards to the inherent methods of
// `AsyncJobScheduler`.
impl AsyncScheduler for AsyncJobScheduler {
// Ready as soon as at least one job has been registered.
#[inline]
fn is_ready(&self) -> bool {
!self.jobs.is_empty()
}
// This scheduler always reports itself as non-blocking.
#[inline]
fn is_blocking(&self) -> bool {
false
}
// Wraps the inherent `time_till_next_job` in `Some`; inherent methods take
// precedence over trait methods in method-call resolution, so this does not recurse.
#[inline]
fn time_till_next_job(&self) -> Option<Duration> {
Some(self.time_till_next_job())
}
// Delegates to the inherent `tick` (same resolution rule as above).
#[inline]
async fn tick(&mut self) {
self.tick().await;
}
// Consumes the scheduler and returns `Ok(())` immediately without running anything.
#[inline]
async fn run(self) -> io::Result<()> {
Ok(())
}
}