mio_misc/
scheduler.rs

1//! Thread safe scheduler
2use crate::queue::Notifier;
3use crate::NotificationId;
4use crossbeam_queue::SegQueue;
5use std::cmp::Ordering as CmpOrdering;
6use std::collections::{BTreeSet, HashSet};
7use std::fmt::Debug;
8use std::hash::{Hash, Hasher};
9use std::ops::{Add, Sub};
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::{Arc, RwLock};
12use std::thread::JoinHandle;
13use std::{
14    fmt, thread,
15    time::{Duration, Instant},
16};
17
18/// Schedules notification deliveries
19#[derive(Debug)]
20pub struct NotificationScheduler {
21    notifier: Arc<dyn Notifier>,
22    scheduler: Arc<Scheduler>,
23}
24
25impl NotificationScheduler {
26    /// Creates a new scheduler that uses the provided notifier to deliver notifications
27    pub fn new(notifier: Arc<dyn Notifier>, scheduler: Arc<Scheduler>) -> NotificationScheduler {
28        NotificationScheduler {
29            notifier,
30            scheduler,
31        }
32    }
33
34    /// Schedules recurring notification deliveries with fixed intervals
35    pub fn notify_with_fixed_interval<I: Into<Option<Duration>>>(
36        &self,
37        id: NotificationId,
38        interval: Duration,
39        initial_delay: I,
40        name: Option<String>,
41    ) -> ScheduleEntryId
42    where
43        I: Into<Option<Duration>>,
44    {
45        let notifier = Arc::clone(&self.notifier);
46        let entry = ScheduleEntry::with_interval(interval, initial_delay, name, move || {
47            let _ = notifier.notify(id);
48        });
49        let id = entry.id;
50        self.scheduler.schedule(entry);
51        id
52    }
53
54    /// Schedules a one-time notification delivery
55    pub fn notify_once_after_delay(
56        &self,
57        id: NotificationId,
58        delay: Duration,
59        name: Option<String>,
60    ) -> ScheduleEntryId {
61        let notifier = Arc::clone(&self.notifier);
62        let entry = ScheduleEntry::one_time(delay, name, move || {
63            let _ = notifier.notify(id);
64        });
65        let id = entry.id;
66        self.scheduler.schedule(entry);
67        id
68    }
69
70    /// Cancels future notification(s)
71    pub fn cancel(&self, id: ScheduleEntryId) {
72        self.scheduler.cancel(id);
73    }
74}
75
76type Callback = dyn Fn() + Send + Sync + 'static;
77
78/// Entry associated with callback
79#[derive(Clone)]
80pub struct ScheduleEntry {
81    start: Instant,
82    /// The interval with which to run the callback. No interval means only one-time run
83    interval: Option<Duration>,
84    callback: Arc<Callback>,
85    /// The assigned name of the entry for debugging purposes
86    pub name: Option<String>,
87    /// Entry Id
88    pub id: ScheduleEntryId,
89}
90
91impl ScheduleEntry {
92    /// Creates an entry to run the callback repeatedly with a fixed delay
93    pub fn with_interval<I, F>(
94        interval: Duration,
95        initial_delay: I,
96        name: Option<String>,
97        callback: F,
98    ) -> ScheduleEntry
99    where
100        I: Into<Option<Duration>>,
101        F: Fn() + Send + Sync + 'static,
102    {
103        let now = Instant::now();
104        ScheduleEntry {
105            start: initial_delay.into().map(|d| now.add(d)).unwrap_or(now),
106            interval: Some(interval),
107            callback: Arc::new(callback),
108            name,
109            id: ScheduleEntryId::gen_next(),
110        }
111    }
112
113    /// Creates an entry to run the callback only once after a given delay
114    pub fn one_time<F>(delay: Duration, name: Option<String>, callback: F) -> ScheduleEntry
115    where
116        F: Fn() + Send + Sync + 'static,
117    {
118        ScheduleEntry {
119            start: Instant::now().add(delay),
120            interval: None,
121            callback: Arc::new(callback),
122            name,
123            id: ScheduleEntryId::gen_next(),
124        }
125    }
126}
127
128impl fmt::Debug for ScheduleEntry {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("ScheduleEntry")
131            .field("start", &self.start)
132            .field("interval", &self.interval)
133            .field("name", &self.name)
134            .field("id", &self.id)
135            .finish()
136    }
137}
138
139impl Hash for ScheduleEntry {
140    fn hash<H: Hasher>(&self, hasher: &mut H) {
141        self.start.hash(hasher);
142        self.id.hash(hasher);
143    }
144}
145
146impl Eq for ScheduleEntry {}
147
148impl PartialEq for ScheduleEntry {
149    fn eq(&self, other: &Self) -> bool {
150        self.start == other.start && self.id == other.id
151    }
152}
153
154impl Ord for ScheduleEntry {
155    fn cmp(&self, other: &Self) -> CmpOrdering {
156        self.start.cmp(&other.start).then(self.id.cmp(&other.id))
157    }
158}
159
160impl PartialOrd for ScheduleEntry {
161    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
162        Some(self.cmp(other))
163    }
164}
165
// Process-wide counter backing `ScheduleEntryId::gen_next`; the first id handed out is 1
static NEXT_SCHEDULE_ENTRY_ID: AtomicU32 = AtomicU32::new(1);

/// Id associated with an entry
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ScheduleEntryId(u32);

impl ScheduleEntryId {
    /// Generates next `ScheduleEntryId`, which is guaranteed to be unique
    ///
    /// Ids come from a global atomic counter that is incremented on every call.
    pub fn gen_next() -> ScheduleEntryId {
        Self(NEXT_SCHEDULE_ENTRY_ID.fetch_add(1, Ordering::SeqCst))
    }

    /// Returns id
    pub fn id(&self) -> u32 {
        let ScheduleEntryId(raw) = *self;
        raw
    }
}
184
/// Scheduler Status
///
/// Reports what the scheduler's worker thread is currently doing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchedulerStatus {
    /// Currently executing an entry (the worker thread is running rather than parked)
    Active,
    /// Waiting for new entries to be scheduled
    Parked,
    /// Waiting to execute entries in the queue at the scheduled intervals
    ParkedTimeout,
}
195
196/// Single-threaded scheduler that prioritizes "cancels" over schedule executions, hence multiple queues
197#[derive(Debug)]
198pub struct Scheduler {
199    shutdown: Arc<AtomicBool>,
200    thread_handle: JoinHandle<()>,
201    schedule_queue: Arc<SegQueue<ScheduleEntry>>,
202    cancel_queue: Arc<SegQueue<ScheduleEntryId>>,
203    name: String,
204    status: Arc<RwLock<SchedulerStatus>>,
205    entry_count: Arc<AtomicU32>,
206}
207
208impl Default for Scheduler {
209    fn default() -> Scheduler {
210        Scheduler::new(None)
211    }
212}
213
214// Helps distinguish different scheduler creations
215static SCHEDULER_THREAD_ID: AtomicU32 = AtomicU32::new(1);
216
217impl Scheduler {
218    /// Returns the name of the scheduler
219    pub fn name(&self) -> &str {
220        self.name.as_str()
221    }
222
223    /// Schedules entry for execution(s)
224    pub fn schedule(&self, entry: ScheduleEntry) {
225        self.schedule_queue.push(entry);
226        self.thread_handle.thread().unpark();
227    }
228
229    /// Cancels future execution(s)
230    pub fn cancel(&self, id: ScheduleEntryId) {
231        self.cancel_queue.push(id);
232        self.thread_handle.thread().unpark();
233    }
234
235    /// Returns the scheduler's current status
236    pub fn status(&self) -> SchedulerStatus {
237        *(self.status.read().unwrap())
238    }
239
240    /// Number of current entries
241    pub fn entry_count(&self) -> u32 {
242        self.entry_count.load(Ordering::SeqCst)
243    }
244
245    /// Creates a scheduler
246    pub fn new(name: Option<String>) -> Scheduler {
247        let t_id = SCHEDULER_THREAD_ID.fetch_add(1, Ordering::SeqCst);
248        let name_prefix = "mio-misc-scheduler";
249        let name = name
250            .map(|n| format!("{}-{}-{}", name_prefix, n, t_id))
251            .unwrap_or_else(|| format!("{}-{}", name_prefix, t_id));
252        let name_clone = name.clone();
253
254        let shut_down = Arc::new(AtomicBool::new(false));
255        let shutdown_clone = Arc::clone(&shut_down);
256        let entry_count = Arc::new(AtomicU32::new(0));
257        let entry_count_clone = Arc::clone(&entry_count);
258        let schedule_queue = Arc::new(SegQueue::new());
259        let schedule_queue_clone = Arc::clone(&schedule_queue);
260        let cancel_queue = Arc::new(SegQueue::new());
261        let cancel_queue_clone = Arc::clone(&cancel_queue);
262        let status = Arc::new(RwLock::new(SchedulerStatus::Active));
263        let status_clone = Arc::clone(&status);
264        let thread_handle = thread::Builder::new()
265            .name(name.clone())
266            .spawn(move || {
267                let mut entries: BTreeSet<ScheduleEntry> = BTreeSet::new();
268                let mut entries_to_cancel: HashSet<ScheduleEntryId> = HashSet::new();
269                while !shut_down.load(Ordering::SeqCst) {
270                    // cancel requests take precedence
271                    while let Some(entry_id) = cancel_queue.pop() {
272                        trace!(
273                            "{}: cancelling scheduler entry with id {:?};",
274                            name,
275                            entry_id
276                        );
277                        let _ = entries_to_cancel.insert(entry_id);
278                    }
279                    if let Some(entry) = schedule_queue.pop() {
280                        trace!("{}: scheduling entry; {:?};", name, entry);
281                        if entries.insert(entry) {
282                            entry_count.fetch_add(1, Ordering::SeqCst);
283                        }
284                    }
285                    if let Some(entry) = entries.iter().cloned().next() {
286                        let now = Instant::now();
287                        // time to execute a callback ?
288                        if now.ge(&entry.start) {
289                            entries.remove(&entry);
290                            // entry still relevant ?
291                            if !entries_to_cancel.contains(&entry.id) {
292                                trace!("{}: executing scheduler entry; {:?}", name, entry);
293                                let cb = Arc::clone(&entry.callback);
294                                cb();
295                                if let Some(interval) = entry.interval {
296                                    // add back
297                                    let updated_entry = ScheduleEntry {
298                                        start: Instant::now().add(interval),
299                                        interval: entry.interval,
300                                        callback: entry.callback,
301                                        name: entry.name,
302                                        id: entry.id,
303                                    };
304                                    entries.insert(updated_entry);
305                                }
306                            } else {
307                                // not executing and not scheduling a new entry
308                                trace!("{}: cancelling scheduler entry; {:?}", name, entry);
309
310                                if entries_to_cancel.remove(&entry.id) {
311                                    entry_count.fetch_sub(1, Ordering::SeqCst);
312                                }
313                            }
314                        } else {
315                            // park until the nearest time when we need to execute a function
316                            let timeout_dur = entry.start.sub(now);
317                            trace!("{}: parking scheduler for {:?}", name, timeout_dur);
318                            *status.write().unwrap() = SchedulerStatus::ParkedTimeout;
319                            thread::park_timeout(timeout_dur);
320                            *status.write().unwrap() = SchedulerStatus::Active;
321                        }
322                    } else {
323                        // there's no function to execute, so park indefinitely instead of spinning idly
324                        trace!("{}: parking scheduler until being un-parked", name);
325                        *status.write().unwrap() = SchedulerStatus::Parked;
326                        thread::park();
327                        *status.write().unwrap() = SchedulerStatus::Active;
328                    }
329                }
330            })
331            .unwrap();
332        Scheduler {
333            shutdown: shutdown_clone,
334            thread_handle,
335            schedule_queue: schedule_queue_clone,
336            cancel_queue: cancel_queue_clone,
337            name: name_clone,
338            status: status_clone,
339            entry_count: entry_count_clone,
340        }
341    }
342}
343
344impl Drop for Scheduler {
345    fn drop(&mut self) {
346        self.shutdown.store(true, Ordering::SeqCst);
347        self.thread_handle.thread().unpark();
348    }
349}