use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use chrono::{DateTime, TimeZone, Utc};
use crate::entry::Entry;
use crate::Result;
use crate::MAX_WAIT_SECONDS;
#[derive(Clone, Debug)]
pub struct Cron<Z>
where
Z: TimeZone + Sync + Send + 'static,
Z::Offset: Send,
{
entries: Arc<Mutex<Vec<Entry<Z>>>>,
next_id: Arc<AtomicUsize>,
tz: Z,
add_channel: (
crossbeam_channel::Sender<Entry<Z>>,
crossbeam_channel::Receiver<Entry<Z>>,
),
stop_channel: (
crossbeam_channel::Sender<bool>,
crossbeam_channel::Receiver<bool>,
),
}
impl<Z> Cron<Z>
where
Z: TimeZone + Sync + Send + 'static,
Z::Offset: Send,
{
pub fn new(tz: Z) -> Cron<Z> {
Cron {
entries: Arc::new(Mutex::new(Vec::new())),
next_id: Arc::new(AtomicUsize::new(0)),
tz,
add_channel: crossbeam_channel::unbounded(),
stop_channel: crossbeam_channel::unbounded(),
}
}
pub fn add_fn<T>(&mut self, spec: &str, f: T) -> Result<usize>
where
T: 'static,
T: Fn() + Send + Sync,
{
let schedule = cron::Schedule::from_str(spec)?;
self.schedule(schedule, f)
}
pub fn add_fn_once<T>(&mut self, datetime: DateTime<Z>, f: T) -> Result<usize>
where
T: 'static,
T: Fn() + Send + Sync,
{
let next_id = self.next_id.fetch_add(1, Ordering::SeqCst);
let entry = Entry {
id: next_id,
next: Some(datetime),
run: Arc::new(f),
schedule: None,
};
self.add_channel.0.send(entry).unwrap();
Ok(next_id)
}
pub fn add_fn_after<T>(&mut self, delay: Duration, f: T) -> Result<usize>
where
T: 'static,
T: Fn() + Send + Sync,
{
let chrono_delay = chrono::Duration::from_std(delay)
.map_err(|_| crate::CronError::DurationOutOfRange)?;
let execute_at = self.now() + chrono_delay;
self.add_fn_once(execute_at, f)
}
fn schedule<T>(&mut self, schedule: cron::Schedule, f: T) -> Result<usize>
where
T: Send + Sync + 'static,
T: Fn(),
{
let next_id = self.next_id.fetch_add(1, Ordering::SeqCst);
let mut entry = Entry {
id: next_id,
next: None,
run: Arc::new(f),
schedule: Some(schedule),
};
entry.next = entry.schedule_next(self.get_timezone());
self.add_channel.0.send(entry).unwrap();
Ok(next_id)
}
pub fn set_timezone(&mut self, tz: Z) {
self.tz = tz;
}
fn get_timezone(&self) -> Z {
self.tz.clone()
}
fn now(&self) -> DateTime<Z> {
self.get_timezone()
.from_utc_datetime(&Utc::now().naive_utc())
}
fn remove_entry(&self, id: usize) {
let mut entries = self.entries.lock().unwrap();
if let Some(index) = entries.iter().position(|e| e.id == id) {
entries.remove(index);
}
}
pub fn remove(&self, id: usize) {
self.remove_entry(id)
}
pub fn stop(&self) {
self.stop_channel.0.send(true).unwrap()
}
pub fn start(&mut self) {
let mut cron = self.clone();
thread::spawn(move || {
cron.start_blocking();
});
}
pub fn start_blocking(&mut self) {
for entry in self.entries.lock().unwrap().iter_mut() {
if entry.next.is_none() {
entry.next = entry.schedule_next(self.get_timezone());
}
}
let mut wait_duration = Duration::from_secs(MAX_WAIT_SECONDS);
loop {
let mut entries = self.entries.lock().unwrap();
entries.sort_by(|b, a| b.next.cmp(&a.next));
if let Some(entry) = entries.first() {
let wait_milis = (entry.next.as_ref().unwrap().timestamp_millis() as u64)
.saturating_sub(self.now().timestamp_millis() as u64);
wait_duration = Duration::from_millis(wait_milis);
}
drop(entries);
crossbeam_channel::select! {
recv(crossbeam_channel::after(wait_duration)) -> _ => {
let now = self.now();
let mut entries = self.entries.lock().unwrap();
let mut jobs_to_remove = Vec::new();
for entry in entries.iter_mut() {
if entry.next.as_ref().unwrap().gt(&now) {
break;
}
let run = entry.run.clone();
thread::spawn(move || {
run();
});
if entry.is_once() {
jobs_to_remove.push(entry.id);
} else {
entry.next = entry.schedule_next(self.get_timezone());
}
}
entries.retain(|e| !jobs_to_remove.contains(&e.id));
},
recv(self.add_channel.1) -> new_entry => {
let mut entry = new_entry.unwrap();
if entry.next.is_none() {
entry.next = entry.schedule_next(self.get_timezone());
}
self.entries.lock().unwrap().push(entry);
},
recv(self.stop_channel.1) -> _ => {
return;
},
}
}
}
}