Skip to main content

ts_time/
lib.rs

1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3
4pub use std::time::{Duration, Instant};
5use std::{
6    cmp::min,
7    sync::{Arc, Mutex, Weak},
8};
9
10/// A range of time between two [`Instant`]s.
11#[derive(Copy, Clone, Debug, Eq, PartialEq)]
12pub struct TimeRange {
13    start: Instant,
14    end: Instant,
15}
16
17impl TimeRange {
18    /// Return a time range spanning from `start` to `end` inclusive.
19    ///
20    /// # Panics
21    ///
22    /// If `start > end`.
23    pub fn new(start: Instant, end: Instant) -> Self {
24        assert!(start <= end);
25        Self { start, end }
26    }
27
28    /// Return a time range centered on `t`, with `plus_minus` time on either side.
29    ///
30    /// If `t` can't add or subtract `plus_minus`, the respective end of the range is
31    /// clamped to `t` instead.
32    pub fn new_around(t: Instant, plus_minus: Duration) -> Self {
33        Self::new(
34            t.checked_sub(plus_minus).unwrap_or(t),
35            t.checked_add(plus_minus).unwrap_or(t),
36        )
37    }
38
39    /// The first [`Instant`] that the interval covers.
40    pub fn start(&self) -> Instant {
41        self.start
42    }
43
44    /// The last [`Instant`] that the interval covers.
45    pub fn end(&self) -> Instant {
46        self.end
47    }
48
49    /// Reports whether the time range contains `t`.
50    ///
51    /// A time range contains `t` if `self.start() <= t <= self.end()`.
52    pub fn contains(&self, t: Instant) -> bool {
53        t >= self.start && t <= self.end
54    }
55}
56
57impl From<TimeRange> for Duration {
58    fn from(t: TimeRange) -> Duration {
59        t.end - t.start
60    }
61}
62
63#[derive(Debug)]
64struct FutureEvent<E> {
65    when: TimeRange,
66    what: E,
67}
68
69/// A scheduler for future events.
70///
71/// The scheduler does not dispatch events itself, rather it provides the facilities needed for the
72/// caller to efficiently dispatch events in as few wakeups as possible.
73#[derive(Debug)]
74pub struct Scheduler<E> {
75    // Currently scheduled timers, sorted by descending start of their time range.
76    //
77    // The ordering is the reverse of the intuitive one so that dispatching of events can be
78    // implemented by truncating the Vec's tail.
79    //
80    // Invariant: each FutureEvent is referenced from a few places only: one Arc in this Vec,
81    // one Weak in the Handle for the event, and a temporary upgraded Arc during the execution of
82    // Handle's methods. This invariant is relied upon by SchedulerInner, which accounts for all
83    // these potential references prior to unwrapping Arc::get_mut and Arc::into_inner.
84    // Additional rogue references would invalidate this accounting and cause runtime panics.
85    events: Arc<Mutex<Vec<Arc<FutureEvent<E>>>>>,
86}
87
88impl<E> Default for Scheduler<E> {
89    fn default() -> Self {
90        Self {
91            events: Default::default(),
92        }
93    }
94}
95
96impl<E> Scheduler<E> {
97    /// Returns the index of the first element of events whose start time is less than or
98    /// equal to `t`, or events.len() if no such element exists.
99    ///
100    /// `events` must be sorted by descending event start time.
101    fn partition_point(events: &[Arc<FutureEvent<E>>], t: Instant) -> usize {
102        events.partition_point(|e| e.when.start > t)
103    }
104
105    fn find(events: &[Arc<FutureEvent<E>>], event: &Arc<FutureEvent<E>>) -> Option<usize> {
106        let idx = Scheduler::partition_point(events, event.when.start);
107        for (i, other) in events[idx..].iter().enumerate() {
108            if other.when.start != event.when.start {
109                return None;
110            }
111            if Arc::ptr_eq(event, other) {
112                return Some(idx + i);
113            }
114        }
115        None
116    }
117
118    /// Schedule an event to occur at a future point in time.
119    ///
120    /// Returns a [`Handle`] which may be used to cancel or reschedule the event. The caller need
121    /// not retain the Handle if cancellation and rescheduling are not required.
122    pub fn add(&mut self, when: TimeRange, what: E) -> Handle<E> {
123        let event = Arc::new(FutureEvent { when, what });
124        let weak_event = Arc::downgrade(&event);
125        let mut events = self.events.lock().unwrap();
126        let idx = Scheduler::partition_point(&events, when.start);
127        events.insert(idx, event);
128        Handle {
129            events: Arc::downgrade(&self.events),
130            event: weak_event,
131        }
132    }
133
134    /// Cancel all pending events, leaving the scheduler idle.
135    pub fn clear(&mut self) {
136        self.events.lock().unwrap().clear();
137    }
138
139    /// Removes events that are due to happen at or before `now` from the scheduler's queue,
140    /// returning an iterator over the removed events.
141    ///
142    /// If the iterator is dropped before being fully consumed, it drops the remaining removed
143    /// events.
144    pub fn dispatch(&mut self, now: Instant) -> impl Iterator<Item = E> + use<E> {
145        let mut events = self.events.lock().unwrap();
146        let idx = Scheduler::partition_point(&events, now);
147        let to_dispatch = events.split_off(idx);
148
149        // Invariant: at most 3 refs to the event exist (see doc on SchedulerInner struct).
150        // We haven't upgraded the Handle's Weak, so that Arc doesn't exist. The iterator owns the
151        // Arc that was formerly in self.events, and into_inner is not blocked by the existence of
152        // the Handle's Weak. Thus, into_inner always succeeds.
153        to_dispatch
154            .into_iter()
155            .rev()
156            .map(|e| Arc::into_inner(e).unwrap().what)
157    }
158
159    /// Returns the next time range in which [`Scheduler::dispatch`] should next be called to
160    /// dispatch events.
161    ///
162    /// [`Scheduler::dispatch`] should be called at some point in the returned [`TimeRange`] to
163    /// dispatch events on time.
164    ///
165    /// Calling `dispatch` closer to the end of the range is more efficient and results in more
166    /// events being available. Calling `dispatch` before the returned range is inefficient
167    /// but otherwise harmless.
168    ///
169    /// The returned range may lie entirely in the past, if overdue events exists. The caller is
170    /// expected to call [`Scheduler::dispatch`] as soon as possible in that case.
171    ///
172    /// This method is intended to be used to plumb this Scheduler's event dispatch into another
173    /// Scheduler.
174    pub fn next_dispatch_range(&self) -> Option<TimeRange> {
175        let events = self.events.lock().unwrap();
176        let start = events.last()?.when.start;
177        let mut end = events.last()?.when.end;
178
179        for e in events.iter().rev().skip(1) {
180            if e.when.start > end {
181                break;
182            }
183            end = min(end, e.when.end);
184        }
185
186        Some(TimeRange::new(start, end))
187    }
188
189    /// Returns the next time at which [`Scheduler::dispatch`] should next be called to dispatch
190    /// events.
191    ///
192    /// This is the same as [`Scheduler::next_dispatch_range`], but only returns the [`Instant`]
193    /// corresponding to the end of the feasible time range.
194    ///
195    /// Calling `dispatch` before the returned time is inefficient but otherwise harmless.
196    ///
197    /// The returned time may be in the past, if overdue events exists. The caller is
198    /// expected to call [`Scheduler::dispatch`] as soon as possible in that case.
199    ///
200    /// This method is intended to be used to plumb this Scheduler into an external timer facility
201    /// (e.g. `std::thread::sleep`, `tokio::time::sleep`), to trigger event dispatching at the
202    /// appropriate time.
203    pub fn next_dispatch(&self) -> Option<Instant> {
204        Some(self.next_dispatch_range()?.end)
205    }
206
207    /// Assert that the scheduler's internal state is consistent.
208    ///
209    /// # Panics
210    ///
211    /// If the scheduler's internal invariants are violated.
212    #[cfg(test)]
213    fn assert_consistent(&self) {
214        assert!(
215            self.events
216                .lock()
217                .unwrap()
218                .is_sorted_by(|a, b| a.when.start >= b.when.start)
219        );
220    }
221}
222
223/// A handle for a scheduled future event, allowing the holder to reschedule or cancel the event.
224///
225/// The handle may outlive the event it relates to. Calling methods on such a lapsed Handle is safe.
226///
227/// Handles that aren't needed for cancellation or rescheduling can be dropped without impacting
228/// the related event.
229pub struct Handle<E> {
230    events: Weak<Mutex<Vec<Arc<FutureEvent<E>>>>>,
231    event: Weak<FutureEvent<E>>,
232}
233
234impl<E> Handle<E> {
235    /// Attempts to cancel the event.
236    ///
237    /// If the event hasn't yet occurred when cancel is called, it is canceled and will not be
238    /// returned by [`Scheduler::dispatch`]. Cancelling an event that has already been dispatched
239    /// is a no-op.
240    pub fn cancel(self) {
241        let Some(events) = self.events.upgrade() else {
242            return;
243        };
244        let Some(event) = self.event.upgrade() else {
245            return;
246        };
247        let mut events = events.lock().unwrap();
248        let Some(idx) = Scheduler::find(&events, &event) else {
249            return;
250        };
251        events.remove(idx);
252    }
253
254    /// Attempts to reschedule the event to a new time range.
255    ///
256    /// Returns an updated Handle if rescheduling succeeds, or None if the event has already
257    /// been dispatched.
258    pub fn reschedule(self, when: TimeRange) -> Option<Handle<E>> {
259        let events = self.events.upgrade()?;
260        let mut events = events.lock().unwrap();
261        let mut event = self.event.upgrade()?;
262        drop(self.event);
263        let idx = Scheduler::find(&events, &event)?;
264        drop(events.remove(idx));
265        // Invariant: At most 3 refs to the event exist (see doc on SchedulerInner struct).
266        // We dropped the Handle's Weak and events's Arc above, leaving `event` as the sole Arc
267        // for this event. Thus, get_mut always succeeds.
268        Arc::get_mut(&mut event).unwrap().when = when;
269        let weak = Arc::downgrade(&event);
270
271        let idx = Scheduler::partition_point(&events, when.start);
272        events.insert(idx, event);
273        drop(events);
274
275        Some(Handle {
276            events: self.events,
277            event: weak,
278        })
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use std::{collections::HashSet, fmt::Debug, hash::Hash};
285
286    use super::*;
287
288    #[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
289    enum Event {
290        Foo,
291        Bar(usize),
292    }
293
294    fn check_next<E: Debug + Eq + Hash>(
295        sched: &mut Scheduler<E>,
296        want_next: TimeRange,
297        want_events: Vec<E>,
298    ) {
299        // Pending events are only sorted according to their range's start time, so when multiple
300        // events have the same start time the exact order in which they're returned is dependent
301        // on insertion order. We don't care about the relative ordering of such events, so just
302        // toss everything into a set and check that the right set of events was dispatched.
303        let want_events: HashSet<E> = HashSet::from_iter(want_events);
304
305        let next = sched.next_dispatch_range();
306        assert_eq!(next, Some(want_next));
307        let next = sched.next_dispatch();
308        assert_eq!(next, Some(want_next.end));
309        let next = next.unwrap();
310
311        let events = sched.dispatch(next).collect::<HashSet<E>>();
312        assert_eq!(events, want_events);
313    }
314
315    fn check_empty<E>(sched: &mut Scheduler<E>) {
316        assert_eq!(sched.next_dispatch(), None);
317        assert_eq!(sched.next_dispatch_range(), None);
318    }
319
320    #[test]
321    fn test_basic() {
322        let datum = Instant::now();
323        let mut sched = Scheduler::default();
324        sched.add(TimeRange::new(datum, datum), Event::Foo);
325        check_next(&mut sched, TimeRange::new(datum, datum), vec![Event::Foo]);
326        check_empty(&mut sched);
327    }
328
329    #[test]
330    fn test_many() {
331        let datum = Instant::now();
332        let mut sched = Scheduler::default();
333        let ranges: Vec<(u64, u64)> = vec![(1, 9), (2, 8), (3, 7), (4, 6), (5, 5), (2, 4)];
334        // Event ranges:
335        //
336        // <-------> (1,9)
337        //  <----->  (2,8)
338        //   <--->   (3,7)
339        //    <->    (4,6)
340        //     x     (5,5)
341        //  <->      (2,4)
342        //
343        for (i, (start, end)) in ranges.iter().enumerate() {
344            let start = datum + Duration::from_secs(*start);
345            let end = datum + Duration::from_secs(*end);
346            let range = TimeRange::new(start, end);
347            sched.add(range, Event::Bar(i));
348        }
349        // First wakeup at 4, all events except (5,5).
350        check_next(
351            &mut sched,
352            TimeRange::new(
353                datum + Duration::from_secs(1),
354                datum + Duration::from_secs(4),
355            ),
356            vec![
357                Event::Bar(0),
358                Event::Bar(1),
359                Event::Bar(2),
360                Event::Bar(3),
361                Event::Bar(5),
362            ],
363        );
364        // Final dispatch at 5, only (5,5).
365        check_next(
366            &mut sched,
367            TimeRange::new(
368                datum + Duration::from_secs(5),
369                datum + Duration::from_secs(5),
370            ),
371            vec![Event::Bar(4)],
372        );
373        check_empty(&mut sched);
374    }
375}
376
377#[cfg(test)]
378mod proptests {
379    use std::{cmp::max, fmt::Debug, sync::LazyLock};
380
381    use proptest::{collection::vec, prelude::*};
382
383    use super::*;
384
385    static DATUM: LazyLock<Instant> = LazyLock::new(Instant::now);
386
387    prop_compose! {
388        fn arb_timerange()(start in 1..u16::MAX, duration in any::<u16>()) -> (Duration, Duration) {
389            let start = Duration::from_millis(start as u64);
390            let end = start+Duration::from_millis(duration as u64);
391            (start, end)
392        }
393    }
394
395    #[derive(Copy, Clone, Debug)]
396    enum Action {
397        /// Run a dispatch cycle, checking invariants on produced events
398        Dispatch,
399        /// Schedule a new event for the given time range.
400        Add((Duration, Duration)),
401        /// Cancel a previously added event. The f64 must be in 0..1, and is rescaled to the total
402        /// number of scheduled events so far in the run, so will try to cancel a uniformly sampled
403        /// prior event (which may have already been canceled).
404        Cancel(f64),
405        /// Reschedule a previously added event. The f64 is rescaled as with Cancel.
406        Reschedule((f64, (Duration, Duration))),
407    }
408
409    /// Convert a random 0-1 float value into an index in the range 0..max.
410    ///
411    /// Used below to distribute cancellations and reschedules over previously created events.
412    fn sample(v: f64, max: usize) -> usize {
413        (max as f64 * v.clamp(0f64, 1f64)).floor() as usize
414    }
415
416    fn arb_scheduler_action() -> impl Strategy<Value = Action> {
417        prop_oneof![
418            Just(Action::Dispatch),
419            arb_timerange().prop_map(Action::Add),
420            (0f64..1f64).prop_map(Action::Cancel),
421            ((0f64..1f64), arb_timerange()).prop_map(Action::Reschedule),
422        ]
423    }
424
425    struct Event {
426        id: usize,
427        range: Mutex<TimeRange>,
428    }
429
430    impl Event {
431        fn new(id: usize, start: Duration, end: Duration) -> Self {
432            let range = Mutex::new(TimeRange::new(*DATUM + start, *DATUM + end));
433            Self { id, range }
434        }
435
436        fn update(&mut self, start: Duration, end: Duration) {
437            let mut range = self.range.lock().unwrap();
438            *range = TimeRange::new(*DATUM + start, *DATUM + end);
439        }
440    }
441
442    impl Debug for Event {
443        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
444            let (start, end) = {
445                let range = self.range.lock().unwrap();
446                (range.start, range.end)
447            };
448            f.debug_struct("ReschedulableEvent")
449                .field("id", &self.id)
450                .field("start", &(start - *DATUM))
451                .field("end", &(end - *DATUM))
452                .finish()
453        }
454    }
455
456    proptest! {
457        #[test]
458        fn test_events(times in vec(arb_timerange(), 1..100)) {
459            let mut sched = Scheduler::default();
460            for (start, end) in &times {
461                let tr = TimeRange::new(*DATUM+*start, *DATUM+*end);
462                sched.add(tr, (start, end, tr));
463                sched.assert_consistent();
464            }
465
466            let mut total_seen = 0;
467            let mut last_time = *DATUM;
468            while let Some(next) = sched.next_dispatch() {
469                // Invariant: dispatch time always moves forward.
470                assert!(next > last_time, "next={:?}, last={:?}", next-*DATUM, last_time-*DATUM);
471                last_time = next;
472
473                for (start, end, tr) in sched.dispatch(next) {
474                    total_seen += 1;
475                    // Invariant: all events dispatch within their requested time range.
476                    // Note this is only true in this test because we schedule all events upfront,
477                    // all the events are scheduled for a future time.
478                    assert!(tr.contains(next), "range=({:?}, {:?}), next={:?}", start, end, next-*DATUM);
479                }
480                sched.assert_consistent();
481            }
482
483            // Invariant: the scheduler doesn't forget events.
484            assert_eq!(total_seen, times.len());
485        }
486    }
487
488    proptest! {
489        #![proptest_config(ProptestConfig::with_cases(1000))]
490        #[test]
491        fn test_event_handles(actions in vec(arb_scheduler_action(), 1..100)) {
492            let mut sched: Scheduler<usize> = Scheduler::default();
493            let mut events: Vec<Option<Event >> = Vec::new();
494            let mut handles: Vec<Option<Handle<usize>>> = Vec::new();
495            let mut now = *DATUM;
496            let mut total_scheduled = 0;
497            let mut total_canceled = 0;
498            let mut total_dispatched = 0;
499            println!("\nSTART, now=0s");
500            for action in actions {
501                sched.assert_consistent();
502                match action {
503                    Action::Dispatch => {
504                        let Some(next) = sched.next_dispatch() else {
505                            println!("Dispatch (no scheduled events)");
506                            continue;
507                        };
508                        // Due to reschedules, next may be in the past.
509                        now = max(next, now);
510                        println!("Dispatch, now={:?}", now-*DATUM);
511
512                        for idx in sched.dispatch(now) {
513                            if let Some(event) = events[idx].take() {
514                                println!("  {:?}", event);
515                                total_dispatched += 1;
516                                let tr = event.range.lock().unwrap();
517                                // Invariant: events dispatch within their requested time range, or
518                                // are being dispatched late in the case of events (re)scheduled
519                                // in the past.
520                                assert!(tr.contains(now) || now > tr.end());
521                            } else {
522                                panic!("dispatched canceled event {}", idx);
523                            }
524                        }
525                    }
526                    Action::Add((start, end)) => {
527                        let val = Event::new(events.len(), start, end);
528                        println!("Add {:?}", val);
529                        let tr = {
530                            let range = val.range.lock().unwrap();
531                            *range
532                        };
533                        let handle = sched.add(tr, val.id);
534                        events.push(Some(val));
535                        handles.push(Some(handle));
536                        total_scheduled += 1;
537                    }
538                    Action::Cancel(idx) => {
539                        if events.is_empty() {
540                            println!("Cancel() (no events yet)");
541                            continue;
542                        }
543                        let idx = sample(idx, events.len());
544                        if let Some(handle) = handles[idx].take() {
545                            println!("Cancel({})", idx);
546                            handle.cancel();
547                            events[idx] = None;
548                            total_canceled += 1;
549                        } else {
550                            println!("Cancel({}) (already canceled)", idx);
551                        };
552                    }
553                    Action::Reschedule((idx, (start, end))) => {
554                        if events.is_empty() {
555                            println!("Reschedule() (no events yet)");
556                            continue;
557                        }
558                        let idx = sample(idx, events.len());
559                        if let Some(event) = &mut events[idx] {
560                            event.update(start, end);
561                            let tr = {
562                                *event.range.lock().unwrap()
563                            };
564                            println!("Reschedule({}) event={:?}", idx, event);
565                            handles[idx] = handles[idx].take().and_then(|handle| handle.reschedule(tr));
566                        } else {
567                            println!("Reschedule({}) (no such event)", idx);
568                        }
569                    }
570                }
571            }
572            let total_pending: usize = events.iter().map(|x| if x.is_some() { 1 } else {0}).sum();
573            assert_eq!(total_scheduled, events.len());
574            assert!(total_dispatched <= total_scheduled);
575            assert!(total_canceled <= total_scheduled);
576            assert!(total_pending <= total_scheduled);
577            // Cancellations can cause double-counting, when cancelling an already dispatched event.
578            // So, best we can do is bracket the values.
579            let definitely_alive = total_pending+total_dispatched;
580            assert!(definitely_alive <= total_scheduled);
581            assert!(definitely_alive+total_canceled >= total_scheduled);
582        }
583    }
584}