// memfault_ssf/scheduler.rs
//
// Copyright (c) Memfault, Inc.
// See License.txt for details
mod job;

use job::*;

use std::{
    collections::BinaryHeap,
    thread::{sleep, spawn, JoinHandle},
    time::{Duration, Instant},
};

use crate::{Handler, Mailbox, MailboxError, Message, Service};

/// The `Scheduler` is a tool to schedule sending specific messages at a given
/// interval.  It runs as its own thread.
///
/// Jobs are registered with `schedule_message_subscription`; nothing is sent
/// until `run` is called, which consumes the scheduler and moves it onto a
/// newly spawned thread.
pub struct Scheduler {
    /// Pending jobs. `run` repeatedly pops one job, waits until its `next_run`,
    /// executes it and pushes it back with `next_run` advanced by its `period`.
    /// NOTE(review): the pop order is decided by `Job`'s `Ord` impl in the `job`
    /// module — presumably earliest `next_run` first; confirm there.
    schedule: BinaryHeap<Job>,
}

impl Scheduler {
    /// Creates a scheduler with no jobs registered.
    pub fn new() -> Self {
        Scheduler {
            schedule: BinaryHeap::new(),
        }
    }

    /// Schedule a new subscription. The `message` will be sent to the `mailbox` every `period`.
    /// The first delivery is due one `period` after this call.
    /// The return value (if any) is ignored.
    pub fn schedule_message_subscription<M: Message + Clone, S: Service + Handler<M> + 'static>(
        &mut self,
        message: M,
        mailbox: &Mailbox<S>,
        period: &Duration,
    ) {
        let task = DeliverMessageJobImpl::new(mailbox.clone(), message);
        let job = Job {
            next_run: Instant::now() + *period,
            period: *period,
            task: Box::new(task),
        };

        self.schedule.push(job);
    }

    /// Run the Scheduler on its own thread
    /// `on_error` will be called when one of the messages cannot be delivered to the service.
    ///
    /// The scheduler is consumed, so no job can be added once it is running.
    /// The spawned thread never terminates on its own: it keeps popping the next
    /// job, waits until the job is due, executes it and re-queues it one `period`
    /// after its previous deadline (not after the time it actually ran).
    /// If no job was ever scheduled, the thread stays parked instead of spinning.
    pub fn run(mut self, on_error: Box<dyn Fn(MailboxError) + Send>) -> JoinHandle<()> {
        spawn(move || loop {
            if let Some(mut job) = self.schedule.pop() {
                // Wait until the job is due. The remaining time is computed with a
                // saturating subtraction so that a deadline passing between the
                // check and the sleep can never underflow (which panics on older
                // toolchains). Looping covers sleeps that return early.
                loop {
                    let remaining = job.next_run.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        break;
                    }
                    sleep(remaining);
                }

                if let Err(e) = job.task.execute() {
                    on_error(e);
                }

                // Re-queue the same job for its next period.
                job.next_run += job.period;
                self.schedule.push(job);
            } else {
                // Nothing is scheduled and nothing can be added anymore because the
                // scheduler was moved into this thread. Park instead of busy-looping
                // at 100% CPU; the outer loop absorbs spurious wake-ups.
                std::thread::park();
            }
        })
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Scheduler::new()
    }
}

/// A unit of work that can be executed repeatedly.
///
/// Implementors must be `Send`.
/// NOTE(review): presumably because boxed tasks are stored in `Job` and moved
/// onto the scheduler thread by `Scheduler::run` — confirm against the `job` module.
trait ScheduledTask: Send {
    /// Performs the task once, returning the `MailboxError` on failure.
    fn execute(&self) -> Result<(), MailboxError>;
    /// Returns a new boxed task. The implementation in this file returns a
    /// clone of `self`.
    fn prepare_next(&self) -> Box<dyn ScheduledTask>;
}

/// Task that delivers a clone of a stored message to a service mailbox each
/// time it is executed.
///
/// `S` is the receiving service, which must handle messages of type `M`.
/// `M` must be `Clone` because a fresh copy of the message is sent on every
/// execution.
struct DeliverMessageJobImpl<S, M>
where
    S: Service,
    M: Message + Clone,
    S: Handler<M>,
{
    /// Template message; cloned for each delivery.
    message: M,
    /// Mailbox of the service the message is sent to.
    mailbox: Mailbox<S>,
}

impl<S, M> DeliverMessageJobImpl<S, M>
where
    S: Service,
    M: Message + Clone,
    S: Handler<M>,
{
    fn new(mailbox: Mailbox<S>, message: M) -> Self {
        Self { message, mailbox }
    }
}

impl<S, M> ScheduledTask for DeliverMessageJobImpl<S, M>
where
    S: Service + 'static,
    M: Message + Clone,
    S: Handler<M>,
{
    fn execute(&self) -> Result<(), MailboxError> {
        self.mailbox.send_and_wait_for_reply(self.message.clone())?;
        Ok(())
    }

    fn prepare_next(&self) -> Box<dyn ScheduledTask> {
        Box::new(Self {
            message: self.message.clone(),
            mailbox: self.mailbox.clone(),
        })
    }
}