use std::time::{SystemTime, Duration};
use chrono::Utc;
use crate::error::Error as JobSchedulerError;
use crate::job::{JobBuilder, JobExecutor};
use crate::scheduler::types::{Schedule, ScheduleType, RecurringInterval};
/// Interface for a component that stores jobs and drives their execution.
///
/// Only the method signatures are fixed here; the precise semantics are up to
/// the implementor. [`Scheduler`] in this file exposes inherent methods with
/// the same names and signatures.
pub trait SchedulerRunner {
    /// Hands `job` over to the runner.
    ///
    /// # Errors
    /// Returns a [`JobSchedulerError`] when the runner refuses the job.
    fn add_job(&mut self, job: JobBuilder) -> Result<(), JobSchedulerError>;

    /// Gives the runner a chance to execute whatever is currently pending.
    ///
    /// # Errors
    /// Returns a [`JobSchedulerError`] when the pass fails.
    fn run_pending(&mut self) -> Result<(), JobSchedulerError>;

    /// Reports the next point in time at which the runner has work, if any.
    fn next_run(&self) -> Option<SystemTime>;

    /// Borrows every job known to the runner.
    fn list_all_jobs(&self) -> Vec<&JobBuilder>;
}
/// In-memory job scheduler that owns every job registered with it.
///
/// `Default` yields the same empty scheduler as `Scheduler::new()`, so the
/// type can be used with `..Default::default()` and in generic code that
/// requires `Default`.
#[derive(Default)]
pub struct Scheduler {
    /// Registered jobs, stored in the order they were added.
    jobs: Vec<JobBuilder>,
}
impl Scheduler {
    /// Creates a scheduler that holds no jobs.
    pub fn new() -> Self {
        let jobs = Vec::new();
        Self { jobs }
    }

    /// Validates `job` and, if it is acceptable, appends it to the job list.
    ///
    /// # Errors
    /// - [`JobSchedulerError::MissingSchedule`] when `job.schedules` is empty
    ///   (this is checked first).
    /// - [`JobSchedulerError::HandlerNotBuilt`] when `job.handler` is `None`.
    pub fn add_job(&mut self, job: JobBuilder) -> Result<(), JobSchedulerError> {
        let rejection = if job.schedules.is_empty() {
            Some(JobSchedulerError::MissingSchedule)
        } else if job.handler.is_none() {
            Some(JobSchedulerError::HandlerNotBuilt)
        } else {
            None
        };

        match rejection {
            Some(reason) => Err(reason),
            None => {
                self.jobs.push(job);
                Ok(())
            }
        }
    }

    /// Runs every job whose `next_run` is at or before the current time.
    ///
    /// The clock is read once at the start of the pass and that single instant
    /// is used for all comparisons and for `last_run`. After a job has run,
    /// each of its schedules whose peeked next-run time is due gets its
    /// `run_count` bumped and is advanced, and the job's `next_run` becomes the
    /// earliest peeked time across its schedules (`None` if there is none).
    ///
    /// # Errors
    /// The first error returned by `job.run()` is propagated immediately: jobs
    /// later in the list are not examined in this pass, and the failing job's
    /// `last_run`, `next_run` and schedules are left untouched.
    pub fn run_pending(&mut self) -> Result<(), JobSchedulerError> {
        let now = SystemTime::now();

        for job in self.jobs.iter_mut() {
            let due = matches!(job.next_run, Some(at) if at <= now);
            if !due {
                continue;
            }

            job.run()?;
            job.last_run = Some(now);

            // NOTE(review): for `Cron` schedules `peek_next_run` asks for the
            // upcoming occurrence at call time; if that is always later than
            // `now` (TODO confirm against the cron crate), `run_count` is never
            // incremented for cron schedules and `max_runs` has no effect on
            // them.
            for sched in job.schedules.iter_mut() {
                let fired = matches!(Self::peek_next_run(sched), Some(at) if at <= now);
                if fired {
                    sched.run_count += 1;
                    // Only the advancing side effect is needed here.
                    Self::compute_next_run(sched);
                }
            }

            // `Once` and `Random` schedules never yield a peeked time, so after
            // any run they no longer contribute to the job's `next_run`.
            job.next_run = job
                .schedules
                .iter()
                .filter_map(|sched| Self::peek_next_run(sched))
                .min();
        }

        Ok(())
    }

    /// Returns the earliest `next_run` among all jobs, or `None` when no job
    /// has one.
    pub fn next_run(&self) -> Option<SystemTime> {
        let mut earliest: Option<SystemTime> = None;
        for job in &self.jobs {
            if let Some(at) = job.next_run {
                earliest = match earliest {
                    Some(current) if current <= at => Some(current),
                    _ => Some(at),
                };
            }
        }
        earliest
    }

    /// Returns references to all jobs, ordered by ascending `next_run`.
    ///
    /// Jobs without a `next_run` are placed after all jobs that have one. The
    /// sort is stable, so ties keep their insertion order.
    pub fn list_all_jobs(&self) -> Vec<&JobBuilder> {
        let mut ordered: Vec<&JobBuilder> = Vec::with_capacity(self.jobs.len());
        ordered.extend(self.jobs.iter());
        // `false < true`, so jobs that have a time come first; among those the
        // second tuple element orders them chronologically.
        ordered.sort_by_key(|job| (job.next_run.is_none(), job.next_run));
        ordered
    }

    /// Tells whether `schedule` has a run limit and has already reached it.
    fn run_budget_spent(schedule: &Schedule) -> bool {
        matches!(schedule.max_runs, Some(limit) if schedule.run_count >= limit)
    }

    /// Advances `schedule` to its following occurrence and returns it.
    ///
    /// Returns `None` when the run limit has been reached and for `Once` and
    /// `Random` schedules. For `Recurring` schedules the stored `next_run` is
    /// moved forward by exactly one interval (measured from the previous
    /// stored value, not from the current time). For `Cron` schedules nothing
    /// is stored; the cron schedule's next upcoming occurrence is returned.
    ///
    /// Interval lengths: a month counts as 30 days. A `Custom` interval maps
    /// the expressions `"daily"`, `"weekly"` and `"monthly"` to 1, 7 and 30
    /// days; any other expression uses `frequency` as the number of days.
    ///
    /// # Panics
    /// Adding the interval uses `SystemTime`'s `+=`, which may panic if the
    /// result cannot be represented.
    fn compute_next_run(schedule: &mut Schedule) -> Option<SystemTime> {
        if Self::run_budget_spent(schedule) {
            return None;
        }

        match &mut schedule.schedule_type {
            ScheduleType::Once(_) | ScheduleType::Random(_) => None,
            ScheduleType::Cron(cron) => cron.upcoming(Utc).next().map(|when| when.into()),
            ScheduleType::Recurring(recurring) => {
                let step_secs: u64 = match &recurring.interval {
                    RecurringInterval::Secondly(secs) => *secs as u64,
                    RecurringInterval::Minutely(mins) => 60 * *mins as u64,
                    RecurringInterval::Hourly(hours) => 3600 * *hours as u64,
                    RecurringInterval::Daily(days) => 86400 * *days as u64,
                    RecurringInterval::Weekly(weeks) => 7 * 86400 * *weeks as u64,
                    RecurringInterval::Monthly(months) => 30 * 86400 * *months as u64,
                    RecurringInterval::Custom { expression, frequency } => {
                        let days = match expression.as_str() {
                            "daily" => 1,
                            "weekly" => 7,
                            "monthly" => 30,
                            _ => *frequency,
                        };
                        days as u64 * 86400
                    }
                };

                recurring.next_run += Duration::from_secs(step_secs);
                Some(recurring.next_run)
            }
        }
    }

    /// Reports when `schedule` would fire next without modifying it.
    ///
    /// Returns `None` when the run limit has been reached and for `Once` and
    /// `Random` schedules. `Recurring` schedules report their stored
    /// `next_run`; `Cron` schedules report the cron schedule's next upcoming
    /// occurrence as evaluated at the time of this call.
    fn peek_next_run(schedule: &Schedule) -> Option<SystemTime> {
        if Self::run_budget_spent(schedule) {
            return None;
        }

        match &schedule.schedule_type {
            ScheduleType::Recurring(recurring) => Some(recurring.next_run),
            ScheduleType::Cron(cron) => cron.upcoming(Utc).next().map(|when| when.into()),
            ScheduleType::Once(_) | ScheduleType::Random(_) => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::types::{RecurringInterval, RecurringSchedule};
    use crate::utils::time::ScheduleTime;
    use std::thread::sleep;

    /// Handler that does nothing and always succeeds.
    fn dummy_handler() -> anyhow::Result<()> {
        Ok(())
    }

    /// Absolute distance between two instants; falls back to zero if
    /// `duration_since` reports an error.
    fn gap(a: SystemTime, b: SystemTime) -> Duration {
        let measured = if a > b {
            a.duration_since(b)
        } else {
            b.duration_since(a)
        };
        measured.unwrap_or_default()
    }

    /// Builds a one-shot job named `name`, due at `at`, using `dummy_handler`.
    fn once_job(name: &'static str, at: SystemTime) -> JobBuilder {
        JobBuilder::new(name)
            .once(ScheduleTime::At(at))
            .add_handler(dummy_handler)
            .build()
    }

    /// Builds an unlimited recurring schedule that has not run yet.
    fn recurring_schedule(interval: RecurringInterval, next_run: SystemTime) -> Schedule {
        Schedule {
            schedule_type: ScheduleType::Recurring(RecurringSchedule { interval, next_run }),
            max_runs: None,
            run_count: 0,
        }
    }

    #[test]
    fn test_new_scheduler_empty() {
        let scheduler = Scheduler::new();

        assert!(scheduler.jobs.is_empty());
        assert!(scheduler.next_run().is_none());
        assert!(scheduler.list_all_jobs().is_empty());
    }

    #[test]
    fn test_add_job() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();

        scheduler.add_job(once_job(
            "test-job",
            SystemTime::now() + Duration::from_secs(60),
        ))?;

        assert_eq!(scheduler.jobs.len(), 1);
        let listed = scheduler.list_all_jobs();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name.as_deref(), Some("test-job"));
        Ok(())
    }

    #[test]
    fn test_add_job_no_schedule() {
        let mut scheduler = Scheduler::new();
        let unscheduled = JobBuilder::new("no-schedule")
            .add_handler(dummy_handler)
            .build();

        let result = scheduler.add_job(unscheduled);

        assert!(result.is_err());
        assert!(
            matches!(result, Err(JobSchedulerError::MissingSchedule)),
            "Expected MissingSchedule error"
        );
    }

    #[test]
    fn test_add_job_no_handler() {
        let mut scheduler = Scheduler::new();
        let handlerless = JobBuilder::new("no-handler")
            .once(ScheduleTime::At(SystemTime::now() + Duration::from_secs(60)))
            .build();

        let result = scheduler.add_job(handlerless);

        assert!(result.is_err());
        assert!(
            matches!(result, Err(JobSchedulerError::HandlerNotBuilt)),
            "Expected HandlerNotBuilt error"
        );
    }

    #[test]
    fn test_next_run_single_job() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let target_time = SystemTime::now() + Duration::from_secs(60);

        scheduler.add_job(once_job("test-job", target_time))?;

        let next = scheduler.next_run();
        assert!(next.is_some());
        assert!(gap(target_time, next.unwrap()) < Duration::from_millis(10));
        Ok(())
    }

    #[test]
    fn test_next_run_multiple_jobs() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let later = SystemTime::now() + Duration::from_secs(60);
        let sooner = SystemTime::now() + Duration::from_secs(30);

        let job1 = once_job("job1", later);
        let job2 = once_job("job2", sooner);
        scheduler.add_job(job1)?;
        scheduler.add_job(job2)?;

        // The scheduler-wide next run must be the earlier of the two jobs.
        let next = scheduler.next_run();
        assert!(next.is_some());
        assert!(gap(sooner, next.unwrap()) < Duration::from_millis(10));
        Ok(())
    }

    #[test]
    fn test_run_pending_job() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        scheduler.add_job(once_job("immediate", SystemTime::now()))?;
        assert_eq!(scheduler.jobs.len(), 1);

        scheduler.run_pending()?;

        // The job stays registered but has nothing further scheduled.
        assert_eq!(scheduler.jobs.len(), 1);
        let job = &scheduler.jobs[0];
        assert!(job.last_run.is_some());
        assert!(job.next_run.is_none());
        Ok(())
    }

    #[test]
    fn test_run_recurring_jobs() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let recur_time = SystemTime::now();
        let job = JobBuilder::new("recurring")
            .recurring(
                RecurringInterval::Secondly(1),
                Some(ScheduleTime::At(recur_time)),
            )
            .add_handler(dummy_handler)
            .build();
        scheduler.add_job(job)?;
        assert_eq!(scheduler.jobs.len(), 1);
        assert!(scheduler.jobs[0].next_run.is_some());

        scheduler.run_pending()?;

        assert!(scheduler.jobs[0].last_run.is_some());
        let next_run = scheduler.jobs[0].next_run;
        assert!(next_run.is_some());
        let expected_next = recur_time + Duration::from_secs(1);
        assert!(gap(expected_next, next_run.unwrap()) < Duration::from_millis(100));
        Ok(())
    }

    #[test]
    fn test_run_job_with_max_runs() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let recur_time = SystemTime::now();
        let job = JobBuilder::new("limited-runs")
            .recurring(
                RecurringInterval::Secondly(1),
                Some(ScheduleTime::At(recur_time)),
            )
            .repeat(2)
            .add_handler(dummy_handler)
            .build();
        scheduler.add_job(job)?;

        // First pass: one run so far, so another one is still scheduled.
        scheduler.run_pending()?;
        assert!(scheduler.jobs[0].next_run.is_some());

        // Two further passes, each after the one-second interval has elapsed.
        for _ in 0..2 {
            sleep(Duration::from_secs(1));
            scheduler.run_pending()?;
        }

        assert!(scheduler.jobs[0].next_run.is_none());
        Ok(())
    }

    #[test]
    fn test_list_all_jobs() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let job1 = once_job("job1", SystemTime::now() + Duration::from_secs(60));
        let job2 = once_job("job2", SystemTime::now() + Duration::from_secs(30));
        let job3 = once_job("job3", SystemTime::now() + Duration::from_secs(90));
        scheduler.add_job(job1)?;
        scheduler.add_job(job2)?;
        scheduler.add_job(job3)?;

        let all_jobs = scheduler.list_all_jobs();

        // Expected order is by due time: 30s, 60s, 90s.
        assert_eq!(all_jobs.len(), 3);
        let names: Vec<Option<&str>> = all_jobs.iter().map(|job| job.name.as_deref()).collect();
        assert_eq!(names, vec![Some("job2"), Some("job1"), Some("job3")]);
        Ok(())
    }

    #[test]
    fn test_compute_next_run_recurring_intervals() {
        let now = SystemTime::now();
        let cases = vec![
            (RecurringInterval::Secondly(5), 5),
            (RecurringInterval::Hourly(2), 2 * 3600),
            (RecurringInterval::Daily(1), 86400),
            (
                RecurringInterval::Custom {
                    expression: "weekly".to_string(),
                    frequency: 1,
                },
                7 * 86400,
            ),
        ];

        for (interval, expected_secs) in cases {
            let mut sched = recurring_schedule(interval, now);
            let advanced = Scheduler::compute_next_run(&mut sched).unwrap();
            assert_eq!(advanced, now + Duration::from_secs(expected_secs));
        }
    }

    #[test]
    fn test_max_runs_limit() {
        let now = SystemTime::now();
        let mut sched = Schedule {
            max_runs: Some(3),
            run_count: 3,
            ..recurring_schedule(RecurringInterval::Secondly(1), now)
        };

        assert!(Scheduler::compute_next_run(&mut sched).is_none());
    }

    #[test]
    fn test_peek_next_run() {
        let now = SystemTime::now();
        let in_five = now + Duration::from_secs(5);

        let recurring_sched = recurring_schedule(RecurringInterval::Secondly(1), in_five);
        let peeked = Scheduler::peek_next_run(&recurring_sched);
        assert_eq!(peeked.unwrap(), in_five);

        let once_sched = Schedule {
            schedule_type: ScheduleType::Once(now),
            max_runs: Some(1),
            run_count: 0,
        };
        assert!(Scheduler::peek_next_run(&once_sched).is_none());
    }

    #[test]
    fn test_cron_schedule() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let job = JobBuilder::new("cron-job")
            .cron("0 0 * * * *")
            .add_handler(dummy_handler)
            .build();

        scheduler.add_job(job)?;

        assert!(scheduler.next_run().is_some());
        Ok(())
    }

    #[test]
    fn test_random_schedule() -> Result<(), JobSchedulerError> {
        let mut scheduler = Scheduler::new();
        let start = SystemTime::now() + Duration::from_secs(1);
        let end = SystemTime::now() + Duration::from_secs(5);
        let job = JobBuilder::new("random-job")
            .random(ScheduleTime::At(start), ScheduleTime::At(end))
            .add_handler(dummy_handler)
            .build();

        scheduler.add_job(job)?;

        assert_eq!(scheduler.jobs.len(), 1);
        assert!(scheduler.jobs[0].next_run.is_some());
        let next_run = scheduler.jobs[0].next_run.unwrap();
        // The chosen time must fall inside the requested window, inclusive.
        assert!((start..=end).contains(&next_run));
        Ok(())
    }
}