use crate::{Clock, Job, Result, Tag, Timekeeper};
use jiff::{SpanRound, Unit, Zoned};
use tracing::debug;
#[derive(Debug, Default)]
pub struct Scheduler {
jobs: Vec<Job>,
clock: Clock,
}
impl Scheduler {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[cfg(test)]
fn with_mock_time(clock: crate::time::mock::Mock) -> Self {
Self {
clock: Clock::Mock(clock),
..Default::default()
}
}
pub(crate) fn add_job(&mut self, job: Job) {
self.jobs.push(job);
}
pub fn run_pending(&mut self) -> Result<()> {
self.jobs.sort();
let mut to_remove = Vec::new();
let now = self.now();
for (idx, job) in self.jobs.iter_mut().enumerate() {
if job.should_run(&now) {
let keep_going = job.execute(&now)?;
if !keep_going {
debug!("Cancelling job {job}");
to_remove.push(idx);
}
}
}
to_remove.sort_unstable();
to_remove.reverse();
for &idx in &to_remove {
self.jobs.remove(idx);
}
Ok(())
}
pub fn run_all(&mut self, delay_seconds: u64) {
let num_jobs = self.jobs.len();
debug!("Running all {num_jobs} jobs with {delay_seconds}s delay");
let now = self.now();
for job in &mut self.jobs {
if let Err(e) = job.execute(&now) {
eprintln!("Error: {e}");
}
std::thread::sleep(std::time::Duration::from_secs(delay_seconds));
}
}
#[must_use]
pub fn get_jobs(&self, tag: Option<Tag>) -> Vec<&Job> {
if let Some(t) = tag {
self.jobs
.iter()
.filter(|el| el.has_tag(&t))
.collect::<Vec<&Job>>()
} else {
self.jobs.iter().collect::<Vec<&Job>>()
}
}
pub fn clear(&mut self, tag: Option<Tag>) {
if let Some(t) = tag {
debug!("Deleting all jobs tagged {t}");
self.jobs.retain(|el| !el.has_tag(&t));
} else {
debug!("Deleting ALL jobs!!");
drop(self.jobs.drain(..));
}
}
#[must_use]
pub fn next_run(&self) -> Option<Zoned> {
if self.jobs.is_empty() {
None
} else {
self.jobs.iter().min().unwrap().next_run.clone()
}
}
#[must_use]
pub fn idle_seconds(&self) -> Option<i64> {
println!("now: {}", self.now());
println!("next_run: {}", self.next_run().unwrap_or_default());
Some(
self.now()
.until(&self.next_run()?)
.unwrap()
.round(SpanRound::new().largest(Unit::Second))
.unwrap()
.get_seconds(),
)
}
#[cfg(test)]
fn most_recent_job(&self) -> Option<&Job> {
if self.jobs.is_empty() {
return None;
}
Some(&self.jobs[self.jobs.len() - 1])
}
}
impl Timekeeper for Scheduler {
fn now(&self) -> Zoned {
self.clock.now()
}
#[cfg(test)]
fn add_duration(&mut self, duration: impl Into<jiff::ZonedArithmetic>) {
self.clock.add_duration(duration)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::{
error::Result,
every, every_single,
time::mock::{Mock, START},
};
use jiff::{civil, ToSpan};
use pretty_assertions::assert_eq;
fn setup() -> Scheduler {
let clock = Mock::default();
let scheduler = Scheduler::with_mock_time(clock);
scheduler
}
fn job() {}
#[test]
fn test_two_jobs() -> Result<()> {
let mut scheduler = setup();
assert_eq!(scheduler.idle_seconds(), None);
every(17).seconds()?.run(&mut scheduler, job)?;
assert_eq!(scheduler.idle_seconds(), Some(17));
every_single().minute()?.run(&mut scheduler, job)?;
assert_eq!(scheduler.idle_seconds(), Some(17));
assert_eq!(
scheduler.next_run(),
Some(START.checked_add(17.seconds()).unwrap())
);
scheduler.add_duration(17.seconds());
scheduler.run_pending()?;
println!("after one: {}", scheduler.now());
assert_eq!(
scheduler.next_run(),
Some(START.checked_add((17 * 2).seconds()).unwrap())
);
scheduler.add_duration(17.seconds());
scheduler.run_pending()?;
assert_eq!(
scheduler.next_run(),
Some(START.checked_add((17 * 3).seconds()).unwrap())
);
scheduler.add_duration(17.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.idle_seconds(), Some(9));
assert_eq!(
scheduler.next_run(),
Some(START.checked_add(1.minutes()).unwrap())
);
scheduler.add_duration(9.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.idle_seconds(), Some(8));
assert_eq!(
scheduler.next_run(),
Some(START.checked_add((17 * 4).seconds()).unwrap())
);
Ok(())
}
#[test]
fn test_time_range() -> Result<()> {
let mut scheduler = setup();
let num_jobs = 100;
let mut minutes = HashSet::with_capacity(num_jobs);
for _ in 0..num_jobs {
every(5).to(30)?.minutes()?.run(&mut scheduler, job)?;
minutes.insert(
scheduler
.most_recent_job()
.unwrap()
.next_run
.as_ref()
.unwrap()
.minute(),
);
}
assert!(minutes.len() > 1);
assert!(minutes.iter().min().unwrap() >= &5);
assert!(minutes.iter().max().unwrap() <= &30);
Ok(())
}
#[test]
fn test_at_time() -> Result<()> {
let mut scheduler = setup();
every_single()
.day()?
.at("10:30:50")?
.run(&mut scheduler, job)?;
assert_eq!(
scheduler
.most_recent_job()
.unwrap()
.next_run
.as_ref()
.unwrap()
.hour(),
10
);
assert_eq!(
scheduler
.most_recent_job()
.unwrap()
.next_run
.as_ref()
.unwrap()
.minute(),
30
);
assert_eq!(
scheduler
.most_recent_job()
.unwrap()
.next_run
.as_ref()
.unwrap()
.second(),
50
);
Ok(())
}
#[test]
fn test_clear_scheduler() -> Result<()> {
let mut scheduler = setup();
every_single().day()?.run(&mut scheduler, job)?;
every_single().minute()?.run(&mut scheduler, job)?;
assert_eq!(scheduler.jobs.len(), 2);
scheduler.clear(None);
assert_eq!(scheduler.jobs.len(), 0);
Ok(())
}
#[test]
fn test_until_time() -> Result<()> {
let mut scheduler = setup();
let deadline = civil::date(3000, 1, 1)
.at(12, 0, 0, 0)
.intz("America/New_York")
.unwrap();
every_single()
.day()?
.until(deadline.clone())?
.run(&mut scheduler, job)?;
assert_eq!(
scheduler
.most_recent_job()
.unwrap()
.cancel_after
.clone()
.unwrap(),
deadline
);
scheduler.clear(None);
let deadline = civil::date(2024, 1, 1)
.at(7, 0, 10, 0)
.intz("America/New_York")
.unwrap();
every(5)
.seconds()?
.until(deadline)?
.run(&mut scheduler, job)?;
assert_eq!(scheduler.most_recent_job().unwrap().call_count, 0);
scheduler.add_duration(5.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.most_recent_job().unwrap().call_count, 1);
assert_eq!(scheduler.jobs.len(), 1);
scheduler.add_duration(5.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.jobs.len(), 1);
assert_eq!(scheduler.most_recent_job().unwrap().call_count, 2);
scheduler.add_duration(5.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.jobs.len(), 0);
scheduler.clear(None);
let deadline = START.clone();
every(5)
.seconds()?
.until(deadline)?
.run(&mut scheduler, job)?;
scheduler.add_duration(5.seconds());
scheduler.run_pending()?;
assert_eq!(scheduler.jobs.len(), 0);
Ok(())
}
#[test]
fn test_weekday_at_time() -> Result<()> {
let mut scheduler = setup();
every_single()
.wednesday()?
.at("22:38:10")?
.run(&mut scheduler, job)?;
let j = scheduler.most_recent_job().unwrap();
assert_eq!(j.next_run.as_ref().unwrap().year(), 2024);
assert_eq!(j.next_run.as_ref().unwrap().month(), 1);
assert_eq!(j.next_run.as_ref().unwrap().day(), 3);
assert_eq!(j.next_run.as_ref().unwrap().hour(), 22);
assert_eq!(j.next_run.as_ref().unwrap().minute(), 38);
assert_eq!(j.next_run.as_ref().unwrap().second(), 10);
scheduler.clear(None);
every_single()
.wednesday()?
.at("22:39")?
.run(&mut scheduler, job)?;
let j = scheduler.most_recent_job().unwrap();
assert_eq!(j.next_run.as_ref().unwrap().year(), 2024);
assert_eq!(j.next_run.as_ref().unwrap().month(), 1);
assert_eq!(j.next_run.as_ref().unwrap().day(), 3);
assert_eq!(j.next_run.as_ref().unwrap().hour(), 22);
assert_eq!(j.next_run.as_ref().unwrap().minute(), 39);
assert_eq!(j.next_run.as_ref().unwrap().second(), 0);
Ok(())
}
}