mio_misc/
scheduler.rs

1//! Thread safe scheduler
2use crate::queue::Notifier;
3use crate::NotificationId;
4use crossbeam_queue::SegQueue;
5use std::cmp::Ordering as CmpOrdering;
6use std::collections::{BTreeSet, HashSet};
7use std::fmt::Debug;
8use std::hash::{Hash, Hasher};
9use std::ops::{Add, Sub};
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::{Arc, RwLock};
12use std::thread::JoinHandle;
13use std::{
14    fmt, thread,
15    time::{Duration, Instant},
16};
17
18/// Schedules notification deliveries
19#[derive(Debug)]
20pub struct NotificationScheduler {
21    notifier: Arc<dyn Notifier>,
22    scheduler: Arc<Scheduler>,
23}
24
25impl NotificationScheduler {
26    /// Creates a new scheduler that uses the provided notifier to deliver notifications
27    pub fn new(notifier: Arc<dyn Notifier>, scheduler: Arc<Scheduler>) -> NotificationScheduler {
28        NotificationScheduler {
29            notifier,
30            scheduler,
31        }
32    }
33
34    /// Schedules recurring notification deliveries with fixed intervals
35    pub fn notify_with_fixed_interval<I: Into<Option<Duration>>>(
36        &self,
37        id: NotificationId,
38        interval: Duration,
39        initial_delay: I,
40        name: Option<String>,
41    ) -> ScheduleEntryId {
42        let notifier = Arc::clone(&self.notifier);
43        let entry = ScheduleEntry::with_interval(interval, initial_delay, name, move || {
44            let _ = notifier.notify(id);
45        });
46        let id = entry.id;
47        self.scheduler.schedule(entry);
48        id
49    }
50
51    /// Schedules a one-time notification delivery
52    pub fn notify_once_after_delay(
53        &self,
54        id: NotificationId,
55        delay: Duration,
56        name: Option<String>,
57    ) -> ScheduleEntryId {
58        let notifier = Arc::clone(&self.notifier);
59        let entry = ScheduleEntry::one_time(delay, name, move || {
60            let _ = notifier.notify(id);
61        });
62        let id = entry.id;
63        self.scheduler.schedule(entry);
64        id
65    }
66
67    /// Cancels future notification(s)
68    pub fn cancel(&self, id: ScheduleEntryId) {
69        self.scheduler.cancel(id);
70    }
71}
72
73type Callback = dyn Fn() + Send + Sync + 'static;
74
75/// Entry associated with callback
76#[derive(Clone)]
77pub struct ScheduleEntry {
78    start: Instant,
79    /// The interval with which to run the callback. No interval means only one-time run
80    interval: Option<Duration>,
81    callback: Arc<Callback>,
82    /// The assigned name of the entry for debugging purposes
83    pub name: Option<String>,
84    /// Entry Id
85    pub id: ScheduleEntryId,
86}
87
88impl ScheduleEntry {
89    /// Creates an entry to run the callback repeatedly with a fixed delay
90    pub fn with_interval<I, F>(
91        interval: Duration,
92        initial_delay: I,
93        name: Option<String>,
94        callback: F,
95    ) -> ScheduleEntry
96    where
97        I: Into<Option<Duration>>,
98        F: Fn() + Send + Sync + 'static,
99    {
100        let now = Instant::now();
101        ScheduleEntry {
102            start: initial_delay.into().map(|d| now.add(d)).unwrap_or(now),
103            interval: Some(interval),
104            callback: Arc::new(callback),
105            name,
106            id: ScheduleEntryId::gen_next(),
107        }
108    }
109
110    /// Creates an entry to run the callback only once after a given delay
111    pub fn one_time<F>(delay: Duration, name: Option<String>, callback: F) -> ScheduleEntry
112    where
113        F: Fn() + Send + Sync + 'static,
114    {
115        ScheduleEntry {
116            start: Instant::now().add(delay),
117            interval: None,
118            callback: Arc::new(callback),
119            name,
120            id: ScheduleEntryId::gen_next(),
121        }
122    }
123}
124
125impl fmt::Debug for ScheduleEntry {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("ScheduleEntry")
128            .field("start", &self.start)
129            .field("interval", &self.interval)
130            .field("name", &self.name)
131            .field("id", &self.id)
132            .finish()
133    }
134}
135
136impl Hash for ScheduleEntry {
137    fn hash<H: Hasher>(&self, hasher: &mut H) {
138        self.start.hash(hasher);
139        self.id.hash(hasher);
140    }
141}
142
143impl Eq for ScheduleEntry {}
144
145impl PartialEq for ScheduleEntry {
146    fn eq(&self, other: &Self) -> bool {
147        self.start == other.start && self.id == other.id
148    }
149}
150
151impl Ord for ScheduleEntry {
152    fn cmp(&self, other: &Self) -> CmpOrdering {
153        self.start.cmp(&other.start).then(self.id.cmp(&other.id))
154    }
155}
156
157impl PartialOrd for ScheduleEntry {
158    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
159        Some(self.cmp(other))
160    }
161}
162
163static NEXT_SCHEDULE_ENTRY_ID: AtomicU32 = AtomicU32::new(1);
164
165/// Id associated with an entry
166#[derive(Copy, Clone, Debug, PartialEq, Ord, PartialOrd, Eq, Hash)]
167pub struct ScheduleEntryId(u32);
168
169impl ScheduleEntryId {
170    /// Generates next `ScheduleEntryId`, which is guaranteed to be unique
171    pub fn gen_next() -> ScheduleEntryId {
172        let id = NEXT_SCHEDULE_ENTRY_ID.fetch_add(1, Ordering::SeqCst);
173        ScheduleEntryId(id)
174    }
175
176    /// Returns id
177    pub fn id(&self) -> u32 {
178        self.0
179    }
180}
181
182/// Scheduler Status
183#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
184pub enum SchedulerStatus {
185    /// Currently executing an entry
186    Active,
187    /// Waiting for new entries to be scheduled
188    Parked,
189    /// Waiting to execute entries in the queue at the scheduled intervals
190    ParkedTimeout,
191}
192
193/// Single-threaded scheduler that prioritizes "cancels" over schedule executions, hence multiple queues
194#[derive(Debug)]
195pub struct Scheduler {
196    shutdown: Arc<AtomicBool>,
197    thread_handle: JoinHandle<()>,
198    schedule_queue: Arc<SegQueue<ScheduleEntry>>,
199    cancel_queue: Arc<SegQueue<ScheduleEntryId>>,
200    name: String,
201    status: Arc<RwLock<SchedulerStatus>>,
202    entry_count: Arc<AtomicU32>,
203}
204
205impl Default for Scheduler {
206    fn default() -> Scheduler {
207        Scheduler::new(None)
208    }
209}
210
211// Helps distinguish different scheduler creations
212static SCHEDULER_THREAD_ID: AtomicU32 = AtomicU32::new(1);
213
214impl Scheduler {
215    /// Returns the name of the scheduler
216    pub fn name(&self) -> &str {
217        self.name.as_str()
218    }
219
220    /// Schedules entry for execution(s)
221    pub fn schedule(&self, entry: ScheduleEntry) {
222        self.schedule_queue.push(entry);
223        self.thread_handle.thread().unpark();
224    }
225
226    /// Cancels future execution(s)
227    pub fn cancel(&self, id: ScheduleEntryId) {
228        self.cancel_queue.push(id);
229        self.thread_handle.thread().unpark();
230    }
231
232    /// Returns the scheduler's current status
233    pub fn status(&self) -> SchedulerStatus {
234        *(self.status.read().unwrap())
235    }
236
237    /// Number of current entries
238    pub fn entry_count(&self) -> u32 {
239        self.entry_count.load(Ordering::SeqCst)
240    }
241
242    /// Creates a scheduler
243    pub fn new(name: Option<String>) -> Scheduler {
244        let t_id = SCHEDULER_THREAD_ID.fetch_add(1, Ordering::SeqCst);
245        let name_prefix = "mio-misc-scheduler";
246        let name = name
247            .map(|n| format!("{}-{}-{}", name_prefix, n, t_id))
248            .unwrap_or_else(|| format!("{}-{}", name_prefix, t_id));
249        let name_clone = name.clone();
250
251        let shut_down = Arc::new(AtomicBool::new(false));
252        let shutdown_clone = Arc::clone(&shut_down);
253        let entry_count = Arc::new(AtomicU32::new(0));
254        let entry_count_clone = Arc::clone(&entry_count);
255        let schedule_queue = Arc::new(SegQueue::new());
256        let schedule_queue_clone = Arc::clone(&schedule_queue);
257        let cancel_queue = Arc::new(SegQueue::new());
258        let cancel_queue_clone = Arc::clone(&cancel_queue);
259        let status = Arc::new(RwLock::new(SchedulerStatus::Active));
260        let status_clone = Arc::clone(&status);
261        let thread_handle = thread::Builder::new()
262            .name(name.clone())
263            .spawn(move || {
264                let mut entries: BTreeSet<ScheduleEntry> = BTreeSet::new();
265                let mut entries_to_cancel: HashSet<ScheduleEntryId> = HashSet::new();
266                while !shut_down.load(Ordering::SeqCst) {
267                    // cancel requests take precedence
268                    while let Some(entry_id) = cancel_queue.pop() {
269                        trace!(
270                            "{}: cancelling scheduler entry with id {:?};",
271                            name,
272                            entry_id
273                        );
274                        let _ = entries_to_cancel.insert(entry_id);
275                    }
276                    if let Some(entry) = schedule_queue.pop() {
277                        trace!("{}: scheduling entry; {:?};", name, entry);
278                        if entries.insert(entry) {
279                            entry_count.fetch_add(1, Ordering::SeqCst);
280                        }
281                    }
282                    if let Some(entry) = entries.iter().next().cloned() {
283                        let now = Instant::now();
284                        // time to execute a callback?
285                        if now.ge(&entry.start) {
286                            entries.remove(&entry);
287                            // entry still relevant ?
288                            if !entries_to_cancel.contains(&entry.id) {
289                                trace!("{}: executing scheduler entry; {:?}", name, entry);
290                                let cb = Arc::clone(&entry.callback);
291                                cb();
292                                if let Some(interval) = entry.interval {
293                                    // add back
294                                    let updated_entry = ScheduleEntry {
295                                        start: Instant::now().add(interval),
296                                        interval: entry.interval,
297                                        callback: entry.callback,
298                                        name: entry.name,
299                                        id: entry.id,
300                                    };
301                                    entries.insert(updated_entry);
302                                }
303                            } else {
304                                // not executing and not scheduling a new entry
305                                trace!("{}: cancelling scheduler entry; {:?}", name, entry);
306
307                                if entries_to_cancel.remove(&entry.id) {
308                                    entry_count.fetch_sub(1, Ordering::SeqCst);
309                                }
310                            }
311                        } else {
312                            // park until the nearest time when we need to execute a function
313                            let timeout_dur = entry.start.sub(now);
314                            trace!("{}: parking scheduler for {:?}", name, timeout_dur);
315                            *status.write().unwrap() = SchedulerStatus::ParkedTimeout;
316                            thread::park_timeout(timeout_dur);
317                            *status.write().unwrap() = SchedulerStatus::Active;
318                        }
319                    } else {
320                        // there's no function to execute, so park indefinitely instead of spinning idly
321                        trace!("{}: parking scheduler until being un-parked", name);
322                        *status.write().unwrap() = SchedulerStatus::Parked;
323                        thread::park();
324                        *status.write().unwrap() = SchedulerStatus::Active;
325                    }
326                }
327            })
328            .unwrap();
329        Scheduler {
330            shutdown: shutdown_clone,
331            thread_handle,
332            schedule_queue: schedule_queue_clone,
333            cancel_queue: cancel_queue_clone,
334            name: name_clone,
335            status: status_clone,
336            entry_count: entry_count_clone,
337        }
338    }
339}
340
341impl Drop for Scheduler {
342    fn drop(&mut self) {
343        self.shutdown.store(true, Ordering::SeqCst);
344        self.thread_handle.thread().unpark();
345    }
346}