basic_scheduler/
lib.rs

1extern crate chrono;
2
3use std::sync::mpsc::{channel, Receiver, Sender};
4use std::cmp::Ordering;
5
6use chrono::prelude::*;
7
8mod events;
9
10pub use self::events::{BasicEvent, Eventer};
11pub use chrono::Duration;
12
13type InternalTime = DateTime<Utc>;
14
15pub struct Scheduler {
16    stuff: Vec<ScheduledEvent>,
17    new_items_rx: Receiver<Box<Eventer + Send>>,
18    new_items_tx: Sender<Box<Eventer + Send>>,
19}
20
21impl Scheduler {
22    /// Create a new Scheduler instance
23    pub fn new() -> Self {
24        let (tx,rx) = channel();
25        Scheduler {
26            stuff: vec!(),
27            new_items_tx: tx,
28            new_items_rx: rx,
29        }
30    }
31
32    /// Obtain a handle to send new tasks to
33    pub fn add_handle(&self) -> Sender<Box<Eventer + Send>> {
34        self.new_items_tx.clone()
35    }
36
37    /// Run the scheduler. Will block forever
38    pub fn run(&mut self) {
39        loop {
40            self.step()
41        }
42    }
43
44    fn step(&mut self) {
45        let time_to_next = self.process_pending();
46
47        match self.new_items_rx.recv_timeout(time_to_next.to_std().unwrap()) {
48            Ok(evt) => {
49                //println!("PING");
50                let mut new_evts: Vec<_> =
51                    self.new_items_rx.try_iter().fold(vec![evt], |mut acc, x| {
52                        acc.push(x);
53                        acc
54                    });
55
56                // Immediately run all new tasks
57                for evt in new_evts.drain(..) {
58                    self.process_single(evt);
59                }
60            }
61            _ => {
62                //println!("PONG");
63                // Timeout, its probably time to run an event
64            }
65        }
66    }
67
68    fn process_single(&mut self, mut evt: Box<Eventer + Send>) {
69        match evt.execute() {
70            Some(d) => {
71                // reschedule
72                self.insert(ScheduledEvent {
73                    when_next: Utc::now() + d,
74                    what: evt,
75                });
76            }
77            None => {} // Nothing to reschedule
78        }
79    }
80
81    fn process_pending(&mut self) -> Duration {
82        // println!("Processing Pending");
83        loop {
84            // Is there a pending item?
85            if self.stuff.len() == 0 {
86                return Duration::hours(24);
87            }
88
89            let now = Utc::now();
90            let next = self.stuff
91                .get(0)
92                .unwrap()
93                .when_next;
94
95            if next <= now {
96                let x = self.stuff.remove(0);
97                self.process_single(x.what);
98            } else {
99                return next.signed_duration_since(now);
100            }
101        }
102    }
103
104    fn insert(&mut self, evt: ScheduledEvent) {
105        let idx = match self.stuff.binary_search(&evt) {
106            Ok(idx) => idx,
107            Err(idx) => idx,
108        };
109
110        self.stuff.insert(idx, evt);
111    }
112}
113
114struct ScheduledEvent {
115    when_next: InternalTime,
116    what: Box<Eventer + Send>,
117}
118
119impl Ord for ScheduledEvent {
120    fn cmp(&self, other: &ScheduledEvent) -> Ordering {
121        self.when_next.cmp(&other.when_next)
122    }
123}
124
125impl PartialOrd for ScheduledEvent {
126    fn partial_cmp(&self, other: &ScheduledEvent) -> Option<Ordering> {
127        Some(self.cmp(other))
128    }
129}
130
131impl PartialEq for ScheduledEvent {
132    fn eq(&self, other: &ScheduledEvent) -> bool {
133        self.when_next == other.when_next
134    }
135}
136
137// This probably shouldn't be a thing
138impl Eq for ScheduledEvent {}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143    #[test]
144    fn it_works() {
145
146    }
147}