docbox_core/background/
scheduler.rs

1//! # Scheduler
2//!
3//! Provides background task scheduling to run tasks at specific
4//! fixed intervals independent of when the task started
5
6use chrono::Local;
7use futures::{Stream, future::BoxFuture};
8use std::{
9    collections::BinaryHeap,
10    future::Future,
11    pin::Pin,
12    task::{Poll, ready},
13    time::Duration,
14};
15use tokio::time::{Instant, sleep_until};
16
17pub struct ScheduledEvent<E> {
18    /// Data for the event to execute
19    pub event: E,
20
21    /// Interval the event executes at in seconds
22    /// (For further scheduling)
23    pub interval: u64,
24
25    /// Next instance the
26    pub next_run: Instant,
27}
28
29impl<E> Eq for ScheduledEvent<E> {}
30
31impl<E> PartialEq for ScheduledEvent<E> {
32    fn eq(&self, other: &Self) -> bool {
33        self.next_run.eq(&other.next_run)
34    }
35}
36
37impl<E> PartialOrd for ScheduledEvent<E> {
38    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
39        Some(self.cmp(other))
40    }
41}
42
43impl<E> Ord for ScheduledEvent<E> {
44    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45        // Reverse comparison order for binary heap to sort
46        // closest ones to the top
47        other.next_run.cmp(&self.next_run)
48    }
49}
50
/// Description of an event to hand to the scheduler: the payload to emit
/// and how often it should be emitted.
#[derive(Debug, Clone)]
pub struct SchedulerQueueEvent<E> {
    /// Data for the event
    pub event: E,
    /// Interval the event executes at in seconds
    pub interval: u64,
}
57
58pub struct SchedulerEventStream<E> {
59    /// Heap of scheduled events, ordered by the event which is
60    /// due to come first
61    events: BinaryHeap<ScheduledEvent<E>>,
62
63    /// Current sleep future
64    current_sleep: Option<BoxFuture<'static, ()>>,
65}
66
67impl<E> SchedulerEventStream<E>
68where
69    E: Clone + Unpin + PartialEq + Ord + 'static,
70{
71    pub fn new(events: Vec<SchedulerQueueEvent<E>>) -> SchedulerEventStream<E>
72    where
73        E: Clone + PartialEq + Ord + 'static,
74    {
75        SchedulerEventStream {
76            events: events
77                .into_iter()
78                .map(|event| create_scheduled_event(event.event, event.interval))
79                .collect(),
80            current_sleep: None,
81        }
82    }
83
84    /// Takes the current event pushing its next iteration to the
85    /// event heap then returns the current value
86    fn reschedule_current_event(&mut self) -> Option<E> {
87        let event = self.events.pop()?;
88
89        // Create the next iteration of the event
90        self.events
91            .push(create_scheduled_event(event.event.clone(), event.interval));
92
93        // Emit event
94        Some(event.event)
95    }
96}
97
98impl<E> Stream for SchedulerEventStream<E>
99where
100    E: Clone + Unpin + PartialEq + Ord + 'static,
101{
102    type Item = E;
103
104    fn poll_next(
105        self: Pin<&mut Self>,
106        cx: &mut std::task::Context<'_>,
107    ) -> Poll<Option<Self::Item>> {
108        let this = self.get_mut();
109
110        loop {
111            if let Some(current_sleep) = this.current_sleep.as_mut() {
112                // Poll current sleep
113                if Pin::new(current_sleep).poll(cx).is_pending() {
114                    return Poll::Pending;
115                }
116
117                // Clear current sleep
118                this.current_sleep = None;
119
120                return match this.reschedule_current_event() {
121                    Some(event) => Poll::Ready(Some(event)),
122                    None => Poll::Pending,
123                };
124            }
125
126            // Peek the top event
127            let next_event = match this.events.peek() {
128                Some(value) => value,
129                None => return Poll::Pending,
130            };
131
132            // Check if the event has already passed
133            let now = Instant::now();
134            if next_event.next_run < now {
135                return match this.reschedule_current_event() {
136                    Some(event) => Poll::Ready(Some(event)),
137                    None => Poll::Pending,
138                };
139            }
140
141            // Store and poll new sleep state
142            let sleep = sleep_until(next_event.next_run);
143            let sleep = this.current_sleep.insert(Box::pin(sleep));
144            ready!(Pin::new(sleep).poll(cx));
145        }
146    }
147}
148
149fn create_scheduled_event<E>(event: E, interval: u64) -> ScheduledEvent<E> {
150    let next_run = get_nth_interval_instant(interval, 1);
151    ScheduledEvent {
152        event,
153        interval,
154        next_run,
155    }
156}
157
158/// Gets the next instant for a fixed interval in seconds
159fn get_nth_interval_instant(interval: u64, nth: u64) -> Instant {
160    let now = Local::now();
161    let seconds_since_epoch = now.timestamp() as u64;
162    let next = (seconds_since_epoch / interval + nth) * interval;
163    Instant::now() + Duration::from_secs(next - seconds_since_epoch)
164}
165
#[cfg(test)]
mod test {
    use futures::{FutureExt, StreamExt};
    use std::sync::{Arc, Mutex};
    use tokio::{spawn, time::sleep_until};

    use crate::background::scheduler::{SchedulerEventStream, get_nth_interval_instant};

    use super::SchedulerQueueEvent;

    /// Checks that a one second schedule produces one event per elapsed
    /// interval
    #[tokio::test]
    async fn test_event_production() {
        #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
        struct MyEvent;

        let queue = vec![SchedulerQueueEvent {
            event: MyEvent,
            interval: 1,
        }];

        // Capture the first boundary before the scheduler is created
        let first_boundary = get_nth_interval_instant(1, 1);
        let mut scheduler = SchedulerEventStream::new(queue);

        // Drain anything the scheduler is able to emit right away
        let _ = scheduler.next().now_or_never();

        let collected: Arc<Mutex<Vec<MyEvent>>> = Arc::default();

        // Background task that records every event the scheduler emits
        let sink = Arc::clone(&collected);
        spawn(async move {
            while let Some(event) = scheduler.next().await {
                sink.lock().expect("lock was poisoned").push(event);
            }
        });

        // Nothing should have been recorded by the time the first boundary
        // is reached
        sleep_until(first_boundary).await;
        assert_eq!(collected.lock().unwrap().len(), 0);

        // Run five rounds, each waiting for one more interval than the last
        for nth in 1..=5 {
            // Instant of the nth upcoming boundary measured from now
            let boundary = get_nth_interval_instant(1, nth);

            // The buffer was emptied at the end of the previous round, so
            // exactly nth events should have been recorded by this boundary
            sleep_until(boundary).await;
            assert_eq!(collected.lock().unwrap().len(), nth as usize);

            // Start the next round from an empty buffer
            collected.lock().unwrap().clear();
        }
    }
}
224}