docbox_core/background/
scheduler.rs1use chrono::Local;
7use futures::{Stream, future::BoxFuture};
8use std::{
9 collections::BinaryHeap,
10 future::Future,
11 pin::Pin,
12 task::{Poll, ready},
13 time::Duration,
14};
15use tokio::time::{Instant, sleep_until};
16
17pub struct ScheduledEvent<E> {
18 pub event: E,
20
21 pub interval: u64,
24
25 pub next_run: Instant,
27}
28
29impl<E> Eq for ScheduledEvent<E> {}
30
31impl<E> PartialEq for ScheduledEvent<E> {
32 fn eq(&self, other: &Self) -> bool {
33 self.next_run.eq(&other.next_run)
34 }
35}
36
37impl<E> PartialOrd for ScheduledEvent<E> {
38 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
39 Some(self.cmp(other))
40 }
41}
42
43impl<E> Ord for ScheduledEvent<E> {
44 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45 other.next_run.cmp(&self.next_run)
48 }
49}
50
51pub struct SchedulerQueueEvent<E> {
52 pub event: E,
54 pub interval: u64,
56}
57
58pub struct SchedulerEventStream<E> {
59 events: BinaryHeap<ScheduledEvent<E>>,
62
63 current_sleep: Option<BoxFuture<'static, ()>>,
65}
66
67impl<E> SchedulerEventStream<E>
68where
69 E: Clone + Unpin + PartialEq + Ord + 'static,
70{
71 pub fn new(events: Vec<SchedulerQueueEvent<E>>) -> SchedulerEventStream<E>
72 where
73 E: Clone + PartialEq + Ord + 'static,
74 {
75 SchedulerEventStream {
76 events: events
77 .into_iter()
78 .map(|event| create_scheduled_event(event.event, event.interval))
79 .collect(),
80 current_sleep: None,
81 }
82 }
83
84 fn reschedule_current_event(&mut self) -> Option<E> {
87 let event = self.events.pop()?;
88
89 self.events
91 .push(create_scheduled_event(event.event.clone(), event.interval));
92
93 Some(event.event)
95 }
96}
97
98impl<E> Stream for SchedulerEventStream<E>
99where
100 E: Clone + Unpin + PartialEq + Ord + 'static,
101{
102 type Item = E;
103
104 fn poll_next(
105 self: Pin<&mut Self>,
106 cx: &mut std::task::Context<'_>,
107 ) -> Poll<Option<Self::Item>> {
108 let this = self.get_mut();
109
110 loop {
111 if let Some(current_sleep) = this.current_sleep.as_mut() {
112 if Pin::new(current_sleep).poll(cx).is_pending() {
114 return Poll::Pending;
115 }
116
117 this.current_sleep = None;
119
120 return match this.reschedule_current_event() {
121 Some(event) => Poll::Ready(Some(event)),
122 None => Poll::Pending,
123 };
124 }
125
126 let next_event = match this.events.peek() {
128 Some(value) => value,
129 None => return Poll::Pending,
130 };
131
132 let now = Instant::now();
134 if next_event.next_run < now {
135 return match this.reschedule_current_event() {
136 Some(event) => Poll::Ready(Some(event)),
137 None => Poll::Pending,
138 };
139 }
140
141 let sleep = sleep_until(next_event.next_run);
143 let sleep = this.current_sleep.insert(Box::pin(sleep));
144 ready!(Pin::new(sleep).poll(cx));
145 }
146 }
147}
148
149fn create_scheduled_event<E>(event: E, interval: u64) -> ScheduledEvent<E> {
150 let next_run = get_nth_interval_instant(interval, 1);
151 ScheduledEvent {
152 event,
153 interval,
154 next_run,
155 }
156}
157
158fn get_nth_interval_instant(interval: u64, nth: u64) -> Instant {
160 let now = Local::now();
161 let seconds_since_epoch = now.timestamp() as u64;
162 let next = (seconds_since_epoch / interval + nth) * interval;
163 Instant::now() + Duration::from_secs(next - seconds_since_epoch)
164}
165
166#[cfg(test)]
167mod test {
168 use futures::{FutureExt, StreamExt};
169 use std::sync::{Arc, Mutex};
170 use tokio::{spawn, time::sleep_until};
171
172 use crate::background::scheduler::{SchedulerEventStream, get_nth_interval_instant};
173
174 use super::SchedulerQueueEvent;
175
176 #[tokio::test]
178 async fn test_event_production() {
179 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
180 struct MyEvent;
181
182 let events = vec![SchedulerQueueEvent {
183 event: MyEvent,
184 interval: 1,
185 }];
186
187 let start_instant = get_nth_interval_instant(1, 1);
188 let mut scheduler = SchedulerEventStream::new(events);
189
190 scheduler.next().now_or_never();
192
193 let events: Arc<Mutex<Vec<MyEvent>>> = Default::default();
194
195 spawn({
196 let events = events.clone();
197 async move {
198 while let Some(event) = scheduler.next().await {
199 events.lock().expect("lock was poisoned").push(event);
200 }
201 }
202 });
203
204 sleep_until(start_instant).await;
206 assert_eq!(events.lock().unwrap().len(), 0);
207
208 for nth in 1..6 {
211 let start_instant = get_nth_interval_instant(1, nth);
213
214 {
215 sleep_until(start_instant).await;
217 assert_eq!(events.lock().unwrap().len(), nth as usize);
218 }
219
220 events.lock().unwrap().clear();
222 }
223 }
224}