//! Message scheduling: the `Timer` trait, the `Job` requests sent over a
//! `TimerRef` channel, and `BasicTimer`, the default thread-backed implementation.

use std::{
    sync::mpsc,
    thread,
    time::{Duration, SystemTime},
};

use chrono::{DateTime, Utc};
use config::Config;
use uuid::Uuid;

use crate::{
    actor::{ActorRef, BasicActorRef, Sender},
    AnyMessage, Message,
};

/// Sending half of the channel over which [`Job`]s are submitted to a timer.
///
/// `BasicTimer::start` returns a value of this type.
pub type TimerRef = mpsc::Sender<Job>;

/// Scheduling interface for delivering a message to an actor at a later time.
///
/// Only signatures are declared here and no implementor is visible in this
/// file, so the per-method notes below are inferred from the parameter names
/// and from how `BasicTimer` handles the matching [`Job`] variants — confirm
/// against the implementing type.
pub trait Timer {
    /// Schedules `msg` for repeated delivery to `receiver`.
    ///
    /// Presumably the first delivery happens after `initial_delay` and later
    /// ones every `interval` (compare `RepeatJob`); `sender` is the sender
    /// passed along with the message. The returned `Uuid` is presumably the id
    /// accepted by [`Timer::cancel_schedule`] — TODO confirm.
    fn schedule<T, M>(
        &self,
        initial_delay: Duration,
        interval: Duration,
        receiver: ActorRef<M>,
        sender: Sender,
        msg: T,
    ) -> Uuid
    where
        T: Message + Into<M>,
        M: Message;

    /// Schedules `msg` for a single delivery to `receiver`.
    ///
    /// Presumably the delivery happens once `delay` has elapsed (compare
    /// `OnceJob`). The returned `Uuid` is presumably the id accepted by
    /// [`Timer::cancel_schedule`] — TODO confirm.
    fn schedule_once<T, M>(
        &self,
        delay: Duration,
        receiver: ActorRef<M>,
        sender: Sender,
        msg: T,
    ) -> Uuid
    where
        T: Message + Into<M>,
        M: Message;

    /// Schedules `msg` for a single delivery to `receiver` at the UTC
    /// timestamp `time`.
    ///
    /// The returned `Uuid` is presumably the id accepted by
    /// [`Timer::cancel_schedule`] — TODO confirm.
    fn schedule_at_time<T, M>(
        &self,
        time: DateTime<Utc>,
        receiver: ActorRef<M>,
        sender: Sender,
        msg: T,
    ) -> Uuid
    where
        T: Message + Into<M>,
        M: Message;

    /// Cancels the schedule identified by `id`.
    ///
    /// Presumably this results in a `Job::Cancel(id)` being sent to the timer
    /// — verify against the implementor.
    fn cancel_schedule(&self, id: Uuid);
}

/// A request sent to the timer thread over a [`TimerRef`].
pub enum Job {
    /// Register a one-off delivery; handled by `BasicTimer::schedule_once`.
    Once(OnceJob),
    /// Register a repeating delivery; handled by `BasicTimer::schedule_repeat`.
    Repeat(RepeatJob),
    /// Remove the job with this id; handled by `BasicTimer::cancel`.
    Cancel(Uuid),
}

/// A message to be delivered a single time.
pub struct OnceJob {
    /// Identifier matched by `BasicTimer::cancel`.
    pub id: Uuid,
    /// Wall-clock time at or after which the message is sent.
    pub send_at: SystemTime,
    /// Actor the message is delivered to.
    pub receiver: BasicActorRef,
    /// Sender value passed along with the message on delivery.
    pub sender: Sender,
    /// The message payload.
    pub msg: AnyMessage,
}

impl OnceJob {
    /// Consumes the job and hands its message to the receiver.
    ///
    /// The result of `try_tell_any` is discarded, so a failed delivery is
    /// silently ignored.
    pub fn send(mut self) {
        let origin = self.sender;
        let payload = &mut self.msg;
        let _ = self.receiver.try_tell_any(payload, origin);
    }
}

/// A message to be delivered repeatedly.
pub struct RepeatJob {
    /// Identifier matched by `BasicTimer::cancel`.
    pub id: Uuid,
    /// Wall-clock time at or after which the next delivery happens.
    pub send_at: SystemTime,
    /// Added to the current time to compute the next `send_at` after a delivery.
    pub interval: Duration,
    /// Actor the message is delivered to.
    pub receiver: BasicActorRef,
    /// Sender value passed along with the message; cloned for every delivery.
    pub sender: Sender,
    /// The message payload.
    pub msg: AnyMessage,
}

impl RepeatJob {
    /// Hands the job's message to the receiver, leaving the job reusable.
    ///
    /// The sender is cloned for each call. The result of `try_tell_any` is
    /// discarded, so a failed delivery is silently ignored.
    pub fn send(&mut self) {
        let origin = self.sender.clone();
        let _ = self.receiver.try_tell_any(&mut self.msg, origin);
    }
}

// Default timer implementation

/// Timer state owned by the thread spawned in `BasicTimer::start`.
pub struct BasicTimer {
    // one-off jobs that are not yet due
    once_jobs: Vec<OnceJob>,
    // repeating jobs; they stay here until cancelled
    repeat_jobs: Vec<RepeatJob>,
}

impl BasicTimer {
    /// Spawns the timer thread and returns the channel used to submit jobs to it.
    ///
    /// Each iteration of the thread's loop:
    /// 1. sends every one-off and repeating job that has become due,
    /// 2. handles all requests currently waiting on the channel, and
    /// 3. sleeps for `scheduler.frequency_millis` milliseconds.
    ///
    /// The whole queue is drained on every tick, so a burst of N requests is
    /// registered within one tick instead of taking N ticks.
    ///
    /// A disconnected channel is treated exactly like an empty one: the thread
    /// keeps running, and already-registered jobs keep firing, after every
    /// `TimerRef` has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if `scheduler.frequency_millis` cannot be read from `cfg` as an
    /// integer.
    pub fn start(cfg: &Config) -> TimerRef {
        let cfg = BasicTimerConfig::from(cfg);
        // The tick length never changes, so build the `Duration` once.
        let tick = Duration::from_millis(cfg.frequency_millis);

        let mut process = BasicTimer {
            once_jobs: Vec::new(),
            repeat_jobs: Vec::new(),
        };

        let (tx, rx) = mpsc::channel();
        thread::spawn(move || loop {
            process.execute_once_jobs();
            process.execute_repeat_jobs();

            // `try_iter` never blocks: it yields what is queued right now and
            // stops as soon as the channel is empty (or disconnected).
            for job in rx.try_iter() {
                match job {
                    Job::Cancel(id) => process.cancel(&id),
                    Job::Once(job) => process.schedule_once(job),
                    Job::Repeat(job) => process.schedule_repeat(job),
                }
            }

            thread::sleep(tick);
        });

        tx
    }

    /// Sends every one-off job whose `send_at` has been reached and keeps the
    /// rest for a later pass.
    ///
    /// The clock is read once per call, so every job in the pass is compared
    /// against the same instant and no per-job clock read is needed.
    pub fn execute_once_jobs(&mut self) {
        let now = SystemTime::now();

        // `drain(..)` empties the vec but keeps its allocation, which is reused
        // when the pending jobs are put back below.
        let (due, pending): (Vec<OnceJob>, Vec<OnceJob>) = self
            .once_jobs
            .drain(..)
            .partition(|job| now >= job.send_at);

        // send those messages where the 'send_at' time has been reached or elapsed
        for job in due {
            job.send();
        }

        // jobs that are not due yet go back on the vec in their original order
        self.once_jobs.extend(pending);
    }

    /// Sends every repeating job that is due and moves its `send_at` forward.
    ///
    /// The next `send_at` is the current time plus the job's `interval`, not
    /// the previous `send_at` plus the interval.
    pub fn execute_repeat_jobs(&mut self) {
        self.repeat_jobs
            .iter_mut()
            .filter(|job| SystemTime::now() >= job.send_at)
            .for_each(|job| {
                job.send_at = SystemTime::now() + job.interval;
                job.send();
            });
    }

    /// Removes the first job with the given `id` from each job list.
    ///
    /// The request does not say whether the job is one-off or repeating, so
    /// both lists are searched. An unknown id is a no-op.
    pub fn cancel(&mut self, id: &Uuid) {
        let once_idx = self.once_jobs.iter().position(|job| job.id == *id);
        if let Some(idx) = once_idx {
            self.once_jobs.remove(idx);
        }

        let repeat_idx = self.repeat_jobs.iter().position(|job| job.id == *id);
        if let Some(idx) = repeat_idx {
            self.repeat_jobs.remove(idx);
        }
    }

    /// Registers a one-off job, or sends it straight away when it is already due.
    pub fn schedule_once(&mut self, job: OnceJob) {
        let already_due = SystemTime::now() >= job.send_at;
        if !already_due {
            // not due yet: keep it for a later `execute_once_jobs` pass
            self.once_jobs.push(job);
            return;
        }

        job.send();
    }

    /// Registers a repeating job, sending it immediately when it is already due.
    ///
    /// When the job is sent on registration its `send_at` is advanced by
    /// `interval`, the same way `execute_repeat_jobs` does. Without that the
    /// stored `send_at` would still lie in the past and the next
    /// `execute_repeat_jobs` pass would deliver the message a second time.
    pub fn schedule_repeat(&mut self, mut job: RepeatJob) {
        if SystemTime::now() >= job.send_at {
            job.send_at = SystemTime::now() + job.interval;
            job.send();
        }
        self.repeat_jobs.push(job);
    }
}

/// Settings for `BasicTimer`, read from the `scheduler.*` configuration keys.
struct BasicTimerConfig {
    // length of the sleep between two passes of the timer loop, in milliseconds
    frequency_millis: u64,
}

impl<'a> From<&'a Config> for BasicTimerConfig {
    /// Reads the timer settings from `config`.
    ///
    /// # Panics
    ///
    /// Panics if `scheduler.frequency_millis` is missing, is not an integer,
    /// or is negative.
    fn from(config: &Config) -> Self {
        let millis = config
            .get_int("scheduler.frequency_millis")
            .expect("config value `scheduler.frequency_millis` must be set to an integer");

        // A bare `as u64` would wrap a negative setting into an enormous
        // interval, leaving the timer asleep without any error being reported.
        assert!(
            millis >= 0,
            "config value `scheduler.frequency_millis` must not be negative (got {})",
            millis
        );

        BasicTimerConfig {
            frequency_millis: millis as u64,
        }
    }
}