use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, SystemTime};
use super::{Job, Trigger};
/// Thread-safe store of scheduled jobs.
///
/// Pending entries live in a `BTreeSet` behind a mutex, so they are kept in
/// the order defined by `TriggerWrapper`'s `Ord` implementation. The condition
/// variable is what `next_job` blocks on and what `add` and `shutdown` notify.
pub struct JobStore {
    /// Wakes a thread blocked in `next_job`.
    signal: Arc<Condvar>,
    /// Pending entries, shared behind a mutex.
    data: Arc<Mutex<BTreeSet<TriggerWrapper>>>,
}
impl JobStore {
pub fn new() -> Self {
Self {
signal: Arc::new(Default::default()),
data: Arc::new(Mutex::new(Default::default())),
}
}
pub fn next_job(&self, timeout: Duration) -> Option<Arc<Job>> {
let guard = self.data.lock().unwrap();
let timeout = guard
.first()
.map(|wrapper| {
wrapper
.next_fire()
.duration_since(SystemTime::now())
.unwrap_or_default()
})
.unwrap_or(timeout)
.min(timeout);
let (mut guard, _) = self.signal.wait_timeout(guard, timeout).unwrap();
if !guard.is_empty() && guard.first().unwrap().next_fire() <= SystemTime::now() + Duration::from_micros(10) {
guard.pop_first().map(|wrapper| {
let (job, next) = wrapper.compute_next();
if let Some(next) = next {
guard.insert(next);
}
job
})
} else {
None
}
}
pub fn add(&self, job: Job, trigger: Trigger) {
let mut store = self.data.lock().unwrap();
store.insert((job, trigger).into());
self.signal.notify_one()
}
pub fn shutdown(&self) {
self.signal.notify_one();
}
}
impl Default for JobStore {
fn default() -> Self {
JobStore::new()
}
}
impl From<(Job, Trigger)> for TriggerWrapper {
fn from((job, trigger): (Job, Trigger)) -> Self {
Self {
trigger: trigger.into(),
job: job.into(),
repeated: 0,
last_triggered: None,
}
}
}
/// A queued job together with its trigger and the bookkeeping needed to
/// reschedule it.
#[derive(Debug)]
struct TriggerWrapper {
    /// Schedule definition; the same `Arc` is carried over to the follow-up
    /// entry when the job is rescheduled.
    trigger: Arc<Trigger>,
    /// The job handed back to the caller when this entry fires.
    job: Arc<Job>,
    /// Number of reschedules so far; starts at 0 and is incremented by
    /// `compute_next`.
    repeated: u32,
    /// Time at which `compute_next` last rescheduled this entry; `None` for a
    /// freshly added entry.
    last_triggered: Option<SystemTime>,
}
impl TriggerWrapper {
    /// Consumes an entry that has just fired and returns its job together with
    /// the follow-up entry, if the trigger should fire again.
    ///
    /// No follow-up is produced when:
    /// - the trigger has an `end_time` that is not later than now plus one
    ///   interval (a missing interval counts as zero), or
    /// - the trigger has no `repeat_count`, or
    /// - this firing was the last one allowed by `repeat_count`.
    ///
    /// A `repeat_count` of zero is treated like one: the job is returned and
    /// not rescheduled.
    fn compute_next(self) -> (Arc<Job>, Option<Self>) {
        let TriggerWrapper {
            trigger,
            job,
            repeated,
            last_triggered: _,
        } = self;
        let now = SystemTime::now();

        if let Some(end) = trigger.end_time {
            if end <= now + trigger.interval.unwrap_or_default() {
                return (job, None);
            }
        }

        // Written as `repeated + 1 < count` instead of `repeated < count - 1`
        // so that a count of zero cannot underflow.
        let repeats_again = match trigger.repeat_count {
            Some(repeat_count) => repeated.saturating_add(1) < repeat_count,
            None => false,
        };
        if !repeats_again {
            return (job, None);
        }

        let next = TriggerWrapper {
            trigger,
            job: Arc::clone(&job),
            repeated: repeated + 1,
            last_triggered: Some(now),
        };
        (job, Some(next))
    }

    /// Returns the time at which this entry should fire.
    ///
    /// For an entry that has been rescheduled and whose trigger has an
    /// interval, this is the last reschedule time plus that interval. In every
    /// other case the trigger's own `next_fire` is used.
    fn next_fire(&self) -> SystemTime {
        match (self.last_triggered, self.trigger.interval) {
            (Some(last_triggered), Some(interval)) => last_triggered + interval,
            _ => self.trigger.next_fire(),
        }
    }
}
impl Eq for TriggerWrapper {}
impl PartialEq<Self> for TriggerWrapper {
fn eq(&self, other: &Self) -> bool {
self.job.eq(&other.job) && self.trigger.eq(&other.trigger)
}
}
impl PartialOrd<Self> for TriggerWrapper {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for TriggerWrapper {
fn cmp(&self, other: &Self) -> Ordering {
self.trigger.next_fire().cmp(&other.trigger.next_fire())
}
}