1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
extern crate config;
extern crate riker;
extern crate uuid;

use std::time::{Duration, SystemTime};
use std::thread;
use std::sync::mpsc::{channel, Sender};

use config::Config;
use uuid::Uuid;

use riker::protocol::Message;
use riker::system::{TimerFactory, Job, OnceJob, RepeatJob};

impl<Msg: Message> TimerFactory for BasicTimer<Msg> {
    type Msg = Msg;

    fn new(config: &Config, _debug: bool) -> Sender<Job<Msg>> {
        let config = BasicTimerConfig::from(config);
        let tx = BasicTimer::start(config);

        tx
    }
}

pub struct BasicTimer<Msg: Message> {
    once_jobs: Vec<OnceJob<Msg>>,
    repeat_jobs: Vec<RepeatJob<Msg>>,
}

impl<Msg: Message> BasicTimer<Msg> {
    fn start(config: BasicTimerConfig) -> Sender<Job<Msg>> {
        let mut process = BasicTimer {
            once_jobs: Vec::new(),
            repeat_jobs: Vec::new()
        };

        let (tx, rx) = channel();
        thread::spawn(move || {
            loop {
                process.execute_once_jobs();
                process.execute_repeat_jobs();

                if let Ok(job) = rx.try_recv() {
                    match job {
                        Job::Cancel(id) => process.cancel(&id),
                        Job::Once(job) => process.schedule_once(job),
                        Job::Repeat(job) => process.schedule_repeat(job)
                    }
                }

                thread::sleep(Duration::from_millis(config.frequency_millis));
            }
        });

        tx
    }

    fn execute_once_jobs(&mut self) {
        let (send, keep): (Vec<OnceJob<Msg>>, Vec<OnceJob<Msg>>) =
            self.once_jobs.drain(..).partition(|j| SystemTime::now() >= j.send_at);

        // send those messages where the 'send_at' time has been reached or elapsed
        for job in send.into_iter() {
            job.send();
        }

        // for those messages that are not to be sent yet, just put them back on the vec
        for job in keep.into_iter() {
            self.once_jobs.push(job);
        }
    }

    fn execute_repeat_jobs(&mut self) {
        for job in self.repeat_jobs.iter_mut() {
            if SystemTime::now() >= job.send_at {
                job.send_at = SystemTime::now() + job.interval;
                job.send();
            }
        }
    }

    fn cancel(&mut self, id: &Uuid) {
        // slightly sub optimal way of canceling because we don't know the job type
        // so need to do the remove on both collections
        
        if let Some(pos) = self.once_jobs.iter().position(|job| &job.id == id) {
            self.once_jobs.remove(pos);
        }

        if let Some(pos) = self.repeat_jobs.iter().position(|job| &job.id == id) {
            self.repeat_jobs.remove(pos);
        }
    }

    fn schedule_once(&mut self, job: OnceJob<Msg>) {
        if SystemTime::now() >= job.send_at {
            job.send();
        } else {
            self.once_jobs.push(job);
        }
    }

    fn schedule_repeat(&mut self, job: RepeatJob<Msg>) {
        if SystemTime::now() >= job.send_at {
            job.send();
        } else {
            self.repeat_jobs.push(job);
        }
    }
}

struct BasicTimerConfig {
    frequency_millis: u64,
}

impl<'a> From<&'a Config> for BasicTimerConfig {
    fn from(config: &Config) -> Self {
        BasicTimerConfig {
            frequency_millis: config.get_int("scheduler.frequency_millis").unwrap() as u64
        }
    }
}