1use crate::queue::Notifier;
3use crate::NotificationId;
4use crossbeam_queue::SegQueue;
5use std::cmp::Ordering as CmpOrdering;
6use std::collections::{BTreeSet, HashSet};
7use std::fmt::Debug;
8use std::hash::{Hash, Hasher};
9use std::ops::{Add, Sub};
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::{Arc, RwLock};
12use std::thread::JoinHandle;
13use std::{
14 fmt, thread,
15 time::{Duration, Instant},
16};
17
18#[derive(Debug)]
20pub struct NotificationScheduler {
21 notifier: Arc<dyn Notifier>,
22 scheduler: Arc<Scheduler>,
23}
24
25impl NotificationScheduler {
26 pub fn new(notifier: Arc<dyn Notifier>, scheduler: Arc<Scheduler>) -> NotificationScheduler {
28 NotificationScheduler {
29 notifier,
30 scheduler,
31 }
32 }
33
34 pub fn notify_with_fixed_interval<I: Into<Option<Duration>>>(
36 &self,
37 id: NotificationId,
38 interval: Duration,
39 initial_delay: I,
40 name: Option<String>,
41 ) -> ScheduleEntryId
42 where
43 I: Into<Option<Duration>>,
44 {
45 let notifier = Arc::clone(&self.notifier);
46 let entry = ScheduleEntry::with_interval(interval, initial_delay, name, move || {
47 let _ = notifier.notify(id);
48 });
49 let id = entry.id;
50 self.scheduler.schedule(entry);
51 id
52 }
53
54 pub fn notify_once_after_delay(
56 &self,
57 id: NotificationId,
58 delay: Duration,
59 name: Option<String>,
60 ) -> ScheduleEntryId {
61 let notifier = Arc::clone(&self.notifier);
62 let entry = ScheduleEntry::one_time(delay, name, move || {
63 let _ = notifier.notify(id);
64 });
65 let id = entry.id;
66 self.scheduler.schedule(entry);
67 id
68 }
69
70 pub fn cancel(&self, id: ScheduleEntryId) {
72 self.scheduler.cancel(id);
73 }
74}
75
76type Callback = dyn Fn() + Send + Sync + 'static;
77
78#[derive(Clone)]
80pub struct ScheduleEntry {
81 start: Instant,
82 interval: Option<Duration>,
84 callback: Arc<Callback>,
85 pub name: Option<String>,
87 pub id: ScheduleEntryId,
89}
90
91impl ScheduleEntry {
92 pub fn with_interval<I, F>(
94 interval: Duration,
95 initial_delay: I,
96 name: Option<String>,
97 callback: F,
98 ) -> ScheduleEntry
99 where
100 I: Into<Option<Duration>>,
101 F: Fn() + Send + Sync + 'static,
102 {
103 let now = Instant::now();
104 ScheduleEntry {
105 start: initial_delay.into().map(|d| now.add(d)).unwrap_or(now),
106 interval: Some(interval),
107 callback: Arc::new(callback),
108 name,
109 id: ScheduleEntryId::gen_next(),
110 }
111 }
112
113 pub fn one_time<F>(delay: Duration, name: Option<String>, callback: F) -> ScheduleEntry
115 where
116 F: Fn() + Send + Sync + 'static,
117 {
118 ScheduleEntry {
119 start: Instant::now().add(delay),
120 interval: None,
121 callback: Arc::new(callback),
122 name,
123 id: ScheduleEntryId::gen_next(),
124 }
125 }
126}
127
128impl fmt::Debug for ScheduleEntry {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("ScheduleEntry")
131 .field("start", &self.start)
132 .field("interval", &self.interval)
133 .field("name", &self.name)
134 .field("id", &self.id)
135 .finish()
136 }
137}
138
139impl Hash for ScheduleEntry {
140 fn hash<H: Hasher>(&self, hasher: &mut H) {
141 self.start.hash(hasher);
142 self.id.hash(hasher);
143 }
144}
145
146impl Eq for ScheduleEntry {}
147
148impl PartialEq for ScheduleEntry {
149 fn eq(&self, other: &Self) -> bool {
150 self.start == other.start && self.id == other.id
151 }
152}
153
154impl Ord for ScheduleEntry {
155 fn cmp(&self, other: &Self) -> CmpOrdering {
156 self.start.cmp(&other.start).then(self.id.cmp(&other.id))
157 }
158}
159
160impl PartialOrd for ScheduleEntry {
161 fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
162 Some(self.cmp(other))
163 }
164}
165
/// Process-wide counter that supplies the next schedule entry id; starts at 1.
static NEXT_SCHEDULE_ENTRY_ID: AtomicU32 = AtomicU32::new(1);

/// Identifier of a schedule entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ScheduleEntryId(u32);

impl ScheduleEntryId {
    /// Takes the next value from the process-wide counter and wraps it in an id.
    pub fn gen_next() -> ScheduleEntryId {
        ScheduleEntryId(NEXT_SCHEDULE_ENTRY_ID.fetch_add(1, Ordering::SeqCst))
    }

    /// Returns the raw numeric value of the id.
    pub fn id(&self) -> u32 {
        let ScheduleEntryId(raw) = *self;
        raw
    }
}
184
/// State of the scheduler worker thread as published by the worker itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchedulerStatus {
    /// The worker is running its loop.
    Active,
    /// The worker has no entries and is parked until it is unparked.
    Parked,
    /// The worker is parked with a timeout, waiting for the next entry to become due.
    ParkedTimeout,
}
195
196#[derive(Debug)]
198pub struct Scheduler {
199 shutdown: Arc<AtomicBool>,
200 thread_handle: JoinHandle<()>,
201 schedule_queue: Arc<SegQueue<ScheduleEntry>>,
202 cancel_queue: Arc<SegQueue<ScheduleEntryId>>,
203 name: String,
204 status: Arc<RwLock<SchedulerStatus>>,
205 entry_count: Arc<AtomicU32>,
206}
207
208impl Default for Scheduler {
209 fn default() -> Scheduler {
210 Scheduler::new(None)
211 }
212}
213
214static SCHEDULER_THREAD_ID: AtomicU32 = AtomicU32::new(1);
216
217impl Scheduler {
218 pub fn name(&self) -> &str {
220 self.name.as_str()
221 }
222
223 pub fn schedule(&self, entry: ScheduleEntry) {
225 self.schedule_queue.push(entry);
226 self.thread_handle.thread().unpark();
227 }
228
229 pub fn cancel(&self, id: ScheduleEntryId) {
231 self.cancel_queue.push(id);
232 self.thread_handle.thread().unpark();
233 }
234
235 pub fn status(&self) -> SchedulerStatus {
237 *(self.status.read().unwrap())
238 }
239
240 pub fn entry_count(&self) -> u32 {
242 self.entry_count.load(Ordering::SeqCst)
243 }
244
245 pub fn new(name: Option<String>) -> Scheduler {
247 let t_id = SCHEDULER_THREAD_ID.fetch_add(1, Ordering::SeqCst);
248 let name_prefix = "mio-misc-scheduler";
249 let name = name
250 .map(|n| format!("{}-{}-{}", name_prefix, n, t_id))
251 .unwrap_or_else(|| format!("{}-{}", name_prefix, t_id));
252 let name_clone = name.clone();
253
254 let shut_down = Arc::new(AtomicBool::new(false));
255 let shutdown_clone = Arc::clone(&shut_down);
256 let entry_count = Arc::new(AtomicU32::new(0));
257 let entry_count_clone = Arc::clone(&entry_count);
258 let schedule_queue = Arc::new(SegQueue::new());
259 let schedule_queue_clone = Arc::clone(&schedule_queue);
260 let cancel_queue = Arc::new(SegQueue::new());
261 let cancel_queue_clone = Arc::clone(&cancel_queue);
262 let status = Arc::new(RwLock::new(SchedulerStatus::Active));
263 let status_clone = Arc::clone(&status);
264 let thread_handle = thread::Builder::new()
265 .name(name.clone())
266 .spawn(move || {
267 let mut entries: BTreeSet<ScheduleEntry> = BTreeSet::new();
268 let mut entries_to_cancel: HashSet<ScheduleEntryId> = HashSet::new();
269 while !shut_down.load(Ordering::SeqCst) {
270 while let Some(entry_id) = cancel_queue.pop() {
272 trace!(
273 "{}: cancelling scheduler entry with id {:?};",
274 name,
275 entry_id
276 );
277 let _ = entries_to_cancel.insert(entry_id);
278 }
279 if let Some(entry) = schedule_queue.pop() {
280 trace!("{}: scheduling entry; {:?};", name, entry);
281 if entries.insert(entry) {
282 entry_count.fetch_add(1, Ordering::SeqCst);
283 }
284 }
285 if let Some(entry) = entries.iter().cloned().next() {
286 let now = Instant::now();
287 if now.ge(&entry.start) {
289 entries.remove(&entry);
290 if !entries_to_cancel.contains(&entry.id) {
292 trace!("{}: executing scheduler entry; {:?}", name, entry);
293 let cb = Arc::clone(&entry.callback);
294 cb();
295 if let Some(interval) = entry.interval {
296 let updated_entry = ScheduleEntry {
298 start: Instant::now().add(interval),
299 interval: entry.interval,
300 callback: entry.callback,
301 name: entry.name,
302 id: entry.id,
303 };
304 entries.insert(updated_entry);
305 }
306 } else {
307 trace!("{}: cancelling scheduler entry; {:?}", name, entry);
309
310 if entries_to_cancel.remove(&entry.id) {
311 entry_count.fetch_sub(1, Ordering::SeqCst);
312 }
313 }
314 } else {
315 let timeout_dur = entry.start.sub(now);
317 trace!("{}: parking scheduler for {:?}", name, timeout_dur);
318 *status.write().unwrap() = SchedulerStatus::ParkedTimeout;
319 thread::park_timeout(timeout_dur);
320 *status.write().unwrap() = SchedulerStatus::Active;
321 }
322 } else {
323 trace!("{}: parking scheduler until being un-parked", name);
325 *status.write().unwrap() = SchedulerStatus::Parked;
326 thread::park();
327 *status.write().unwrap() = SchedulerStatus::Active;
328 }
329 }
330 })
331 .unwrap();
332 Scheduler {
333 shutdown: shutdown_clone,
334 thread_handle,
335 schedule_queue: schedule_queue_clone,
336 cancel_queue: cancel_queue_clone,
337 name: name_clone,
338 status: status_clone,
339 entry_count: entry_count_clone,
340 }
341 }
342}
343
344impl Drop for Scheduler {
345 fn drop(&mut self) {
346 self.shutdown.store(true, Ordering::SeqCst);
347 self.thread_handle.thread().unpark();
348 }
349}