1use crate::queue::Notifier;
3use crate::NotificationId;
4use crossbeam_queue::SegQueue;
5use std::cmp::Ordering as CmpOrdering;
6use std::collections::{BTreeSet, HashSet};
7use std::fmt::Debug;
8use std::hash::{Hash, Hasher};
9use std::ops::{Add, Sub};
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::{Arc, RwLock};
12use std::thread::JoinHandle;
13use std::{
14 fmt, thread,
15 time::{Duration, Instant},
16};
17
18#[derive(Debug)]
20pub struct NotificationScheduler {
21 notifier: Arc<dyn Notifier>,
22 scheduler: Arc<Scheduler>,
23}
24
25impl NotificationScheduler {
26 pub fn new(notifier: Arc<dyn Notifier>, scheduler: Arc<Scheduler>) -> NotificationScheduler {
28 NotificationScheduler {
29 notifier,
30 scheduler,
31 }
32 }
33
34 pub fn notify_with_fixed_interval<I: Into<Option<Duration>>>(
36 &self,
37 id: NotificationId,
38 interval: Duration,
39 initial_delay: I,
40 name: Option<String>,
41 ) -> ScheduleEntryId {
42 let notifier = Arc::clone(&self.notifier);
43 let entry = ScheduleEntry::with_interval(interval, initial_delay, name, move || {
44 let _ = notifier.notify(id);
45 });
46 let id = entry.id;
47 self.scheduler.schedule(entry);
48 id
49 }
50
51 pub fn notify_once_after_delay(
53 &self,
54 id: NotificationId,
55 delay: Duration,
56 name: Option<String>,
57 ) -> ScheduleEntryId {
58 let notifier = Arc::clone(&self.notifier);
59 let entry = ScheduleEntry::one_time(delay, name, move || {
60 let _ = notifier.notify(id);
61 });
62 let id = entry.id;
63 self.scheduler.schedule(entry);
64 id
65 }
66
67 pub fn cancel(&self, id: ScheduleEntryId) {
69 self.scheduler.cancel(id);
70 }
71}
72
73type Callback = dyn Fn() + Send + Sync + 'static;
74
75#[derive(Clone)]
77pub struct ScheduleEntry {
78 start: Instant,
79 interval: Option<Duration>,
81 callback: Arc<Callback>,
82 pub name: Option<String>,
84 pub id: ScheduleEntryId,
86}
87
88impl ScheduleEntry {
89 pub fn with_interval<I, F>(
91 interval: Duration,
92 initial_delay: I,
93 name: Option<String>,
94 callback: F,
95 ) -> ScheduleEntry
96 where
97 I: Into<Option<Duration>>,
98 F: Fn() + Send + Sync + 'static,
99 {
100 let now = Instant::now();
101 ScheduleEntry {
102 start: initial_delay.into().map(|d| now.add(d)).unwrap_or(now),
103 interval: Some(interval),
104 callback: Arc::new(callback),
105 name,
106 id: ScheduleEntryId::gen_next(),
107 }
108 }
109
110 pub fn one_time<F>(delay: Duration, name: Option<String>, callback: F) -> ScheduleEntry
112 where
113 F: Fn() + Send + Sync + 'static,
114 {
115 ScheduleEntry {
116 start: Instant::now().add(delay),
117 interval: None,
118 callback: Arc::new(callback),
119 name,
120 id: ScheduleEntryId::gen_next(),
121 }
122 }
123}
124
125impl fmt::Debug for ScheduleEntry {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_struct("ScheduleEntry")
128 .field("start", &self.start)
129 .field("interval", &self.interval)
130 .field("name", &self.name)
131 .field("id", &self.id)
132 .finish()
133 }
134}
135
136impl Hash for ScheduleEntry {
137 fn hash<H: Hasher>(&self, hasher: &mut H) {
138 self.start.hash(hasher);
139 self.id.hash(hasher);
140 }
141}
142
143impl Eq for ScheduleEntry {}
144
145impl PartialEq for ScheduleEntry {
146 fn eq(&self, other: &Self) -> bool {
147 self.start == other.start && self.id == other.id
148 }
149}
150
151impl Ord for ScheduleEntry {
152 fn cmp(&self, other: &Self) -> CmpOrdering {
153 self.start.cmp(&other.start).then(self.id.cmp(&other.id))
154 }
155}
156
157impl PartialOrd for ScheduleEntry {
158 fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
159 Some(self.cmp(other))
160 }
161}
162
/// Process-wide counter backing [`ScheduleEntryId::gen_next`]; the first id handed out is 1.
static NEXT_SCHEDULE_ENTRY_ID: AtomicU32 = AtomicU32::new(1);

/// Identifier of a schedule entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ScheduleEntryId(u32);

impl ScheduleEntryId {
    /// Returns a new id, taking the current counter value and advancing the
    /// counter by one.
    pub fn gen_next() -> ScheduleEntryId {
        Self(NEXT_SCHEDULE_ENTRY_ID.fetch_add(1, Ordering::SeqCst))
    }

    /// Returns the raw numeric value of the id.
    pub fn id(&self) -> u32 {
        let Self(raw) = *self;
        raw
    }
}
181
/// State of the scheduler's worker thread.
///
/// The derived ordering follows declaration order, so the variants must not
/// be rearranged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchedulerStatus {
    /// The worker thread is running its loop.
    Active,
    /// The worker thread has no entries and is parked until it is un-parked.
    Parked,
    /// The worker thread is parked until its earliest entry becomes due.
    ParkedTimeout,
}
192
193#[derive(Debug)]
195pub struct Scheduler {
196 shutdown: Arc<AtomicBool>,
197 thread_handle: JoinHandle<()>,
198 schedule_queue: Arc<SegQueue<ScheduleEntry>>,
199 cancel_queue: Arc<SegQueue<ScheduleEntryId>>,
200 name: String,
201 status: Arc<RwLock<SchedulerStatus>>,
202 entry_count: Arc<AtomicU32>,
203}
204
205impl Default for Scheduler {
206 fn default() -> Scheduler {
207 Scheduler::new(None)
208 }
209}
210
211static SCHEDULER_THREAD_ID: AtomicU32 = AtomicU32::new(1);
213
214impl Scheduler {
215 pub fn name(&self) -> &str {
217 self.name.as_str()
218 }
219
220 pub fn schedule(&self, entry: ScheduleEntry) {
222 self.schedule_queue.push(entry);
223 self.thread_handle.thread().unpark();
224 }
225
226 pub fn cancel(&self, id: ScheduleEntryId) {
228 self.cancel_queue.push(id);
229 self.thread_handle.thread().unpark();
230 }
231
232 pub fn status(&self) -> SchedulerStatus {
234 *(self.status.read().unwrap())
235 }
236
237 pub fn entry_count(&self) -> u32 {
239 self.entry_count.load(Ordering::SeqCst)
240 }
241
242 pub fn new(name: Option<String>) -> Scheduler {
244 let t_id = SCHEDULER_THREAD_ID.fetch_add(1, Ordering::SeqCst);
245 let name_prefix = "mio-misc-scheduler";
246 let name = name
247 .map(|n| format!("{}-{}-{}", name_prefix, n, t_id))
248 .unwrap_or_else(|| format!("{}-{}", name_prefix, t_id));
249 let name_clone = name.clone();
250
251 let shut_down = Arc::new(AtomicBool::new(false));
252 let shutdown_clone = Arc::clone(&shut_down);
253 let entry_count = Arc::new(AtomicU32::new(0));
254 let entry_count_clone = Arc::clone(&entry_count);
255 let schedule_queue = Arc::new(SegQueue::new());
256 let schedule_queue_clone = Arc::clone(&schedule_queue);
257 let cancel_queue = Arc::new(SegQueue::new());
258 let cancel_queue_clone = Arc::clone(&cancel_queue);
259 let status = Arc::new(RwLock::new(SchedulerStatus::Active));
260 let status_clone = Arc::clone(&status);
261 let thread_handle = thread::Builder::new()
262 .name(name.clone())
263 .spawn(move || {
264 let mut entries: BTreeSet<ScheduleEntry> = BTreeSet::new();
265 let mut entries_to_cancel: HashSet<ScheduleEntryId> = HashSet::new();
266 while !shut_down.load(Ordering::SeqCst) {
267 while let Some(entry_id) = cancel_queue.pop() {
269 trace!(
270 "{}: cancelling scheduler entry with id {:?};",
271 name,
272 entry_id
273 );
274 let _ = entries_to_cancel.insert(entry_id);
275 }
276 if let Some(entry) = schedule_queue.pop() {
277 trace!("{}: scheduling entry; {:?};", name, entry);
278 if entries.insert(entry) {
279 entry_count.fetch_add(1, Ordering::SeqCst);
280 }
281 }
282 if let Some(entry) = entries.iter().next().cloned() {
283 let now = Instant::now();
284 if now.ge(&entry.start) {
286 entries.remove(&entry);
287 if !entries_to_cancel.contains(&entry.id) {
289 trace!("{}: executing scheduler entry; {:?}", name, entry);
290 let cb = Arc::clone(&entry.callback);
291 cb();
292 if let Some(interval) = entry.interval {
293 let updated_entry = ScheduleEntry {
295 start: Instant::now().add(interval),
296 interval: entry.interval,
297 callback: entry.callback,
298 name: entry.name,
299 id: entry.id,
300 };
301 entries.insert(updated_entry);
302 }
303 } else {
304 trace!("{}: cancelling scheduler entry; {:?}", name, entry);
306
307 if entries_to_cancel.remove(&entry.id) {
308 entry_count.fetch_sub(1, Ordering::SeqCst);
309 }
310 }
311 } else {
312 let timeout_dur = entry.start.sub(now);
314 trace!("{}: parking scheduler for {:?}", name, timeout_dur);
315 *status.write().unwrap() = SchedulerStatus::ParkedTimeout;
316 thread::park_timeout(timeout_dur);
317 *status.write().unwrap() = SchedulerStatus::Active;
318 }
319 } else {
320 trace!("{}: parking scheduler until being un-parked", name);
322 *status.write().unwrap() = SchedulerStatus::Parked;
323 thread::park();
324 *status.write().unwrap() = SchedulerStatus::Active;
325 }
326 }
327 })
328 .unwrap();
329 Scheduler {
330 shutdown: shutdown_clone,
331 thread_handle,
332 schedule_queue: schedule_queue_clone,
333 cancel_queue: cancel_queue_clone,
334 name: name_clone,
335 status: status_clone,
336 entry_count: entry_count_clone,
337 }
338 }
339}
340
341impl Drop for Scheduler {
342 fn drop(&mut self) {
343 self.shutdown.store(true, Ordering::SeqCst);
344 self.thread_handle.thread().unpark();
345 }
346}