memfault_ssf/
scheduler.rs

1//
2// Copyright (c) Memfault, Inc.
3// See License.txt for details
4mod job;
5
6use job::*;
7
8use std::{
9    collections::BinaryHeap,
10    thread::{sleep, spawn, JoinHandle},
11    time::{Duration, Instant},
12};
13
14use crate::{Handler, Mailbox, MailboxError, Message, Service};
15
/// The `Scheduler` is a tool to schedule sending specific messages at a given
/// interval. It runs as its own thread.
///
/// Subscriptions are registered with
/// [`Scheduler::schedule_message_subscription`]; delivery starts once
/// [`Scheduler::run`] is called, which consumes the scheduler.
pub struct Scheduler {
    // Pending jobs. Presumably `Job`'s ordering (defined in the `job` module)
    // makes the job with the earliest `next_run` pop first - TODO confirm.
    schedule: BinaryHeap<Job>,
}
21
22impl Scheduler {
23    pub fn new() -> Self {
24        Scheduler {
25            schedule: BinaryHeap::new(),
26        }
27    }
28
29    /// Schedule a new subscription. The `message` will be sent to the `mailbox` every  `period`.
30    /// The return value (if any) is ignored.
31    pub fn schedule_message_subscription<M: Message + Clone, S: Service + Handler<M> + 'static>(
32        &mut self,
33        message: M,
34        mailbox: &Mailbox<S>,
35        period: &Duration,
36    ) {
37        let task = DeliverMessageJobImpl::new(mailbox.clone(), message);
38        let job = Job {
39            next_run: Instant::now() + *period,
40            period: *period,
41            task: Box::new(task),
42        };
43
44        self.schedule.push(job);
45    }
46
47    /// Run the Scheduler on its own thread
48    /// `on_error` will be called when one of the messages cannot be delivered to the service.
49    pub fn run(mut self, on_error: Box<dyn Fn(MailboxError) + Send>) -> JoinHandle<()> {
50        spawn(move || loop {
51            if let Some(job) = self.schedule.pop() {
52                while Instant::now() < job.next_run {
53                    sleep(job.next_run - Instant::now());
54                }
55                if let Err(e) = job.task.execute() {
56                    on_error(e);
57                }
58
59                self.schedule.push(Job {
60                    next_run: job.next_run + job.period,
61                    period: job.period,
62                    task: job.task,
63                })
64            }
65        })
66    }
67}
68
69impl Default for Scheduler {
70    fn default() -> Self {
71        Scheduler::new()
72    }
73}
74
/// A unit of work the scheduler can run repeatedly.
///
/// Implementations must be `Send` because the scheduler executes them on the
/// thread spawned by `Scheduler::run`.
trait ScheduledTask: Send {
    /// Run the task once, reporting a `MailboxError` on failure.
    fn execute(&self) -> Result<(), MailboxError>;
}
78
79struct DeliverMessageJobImpl<S, M>
80where
81    S: Service,
82    M: Message + Clone,
83    S: Handler<M>,
84{
85    message: M,
86    mailbox: Mailbox<S>,
87}
88
89impl<S, M> DeliverMessageJobImpl<S, M>
90where
91    S: Service,
92    M: Message + Clone,
93    S: Handler<M>,
94{
95    fn new(mailbox: Mailbox<S>, message: M) -> Self {
96        Self { message, mailbox }
97    }
98}
99
100impl<S, M> ScheduledTask for DeliverMessageJobImpl<S, M>
101where
102    S: Service + 'static,
103    M: Message + Clone,
104    S: Handler<M>,
105{
106    fn execute(&self) -> Result<(), MailboxError> {
107        self.mailbox.send_and_wait_for_reply(self.message.clone())?;
108        Ok(())
109    }
110}