ts_time/lib.rs
1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3
4pub use std::time::{Duration, Instant};
5use std::{
6 cmp::min,
7 sync::{Arc, Mutex, Weak},
8};
9
10/// A range of time between two [`Instant`]s.
11#[derive(Copy, Clone, Debug, Eq, PartialEq)]
12pub struct TimeRange {
13 start: Instant,
14 end: Instant,
15}
16
17impl TimeRange {
18 /// Return a time range spanning from `start` to `end` inclusive.
19 ///
20 /// # Panics
21 ///
22 /// If `start > end`.
23 pub fn new(start: Instant, end: Instant) -> Self {
24 assert!(start <= end);
25 Self { start, end }
26 }
27
28 /// Return a time range centered on `t`, with `plus_minus` time on either side.
29 ///
30 /// If `t` can't add or subtract `plus_minus`, the respective end of the range is
31 /// clamped to `t` instead.
32 pub fn new_around(t: Instant, plus_minus: Duration) -> Self {
33 Self::new(
34 t.checked_sub(plus_minus).unwrap_or(t),
35 t.checked_add(plus_minus).unwrap_or(t),
36 )
37 }
38
39 /// The first [`Instant`] that the interval covers.
40 pub fn start(&self) -> Instant {
41 self.start
42 }
43
44 /// The last [`Instant`] that the interval covers.
45 pub fn end(&self) -> Instant {
46 self.end
47 }
48
49 /// Reports whether the time range contains `t`.
50 ///
51 /// A time range contains `t` if `self.start() <= t <= self.end()`.
52 pub fn contains(&self, t: Instant) -> bool {
53 t >= self.start && t <= self.end
54 }
55}
56
57impl From<TimeRange> for Duration {
58 fn from(t: TimeRange) -> Duration {
59 t.end - t.start
60 }
61}
62
63#[derive(Debug)]
64struct FutureEvent<E> {
65 when: TimeRange,
66 what: E,
67}
68
69/// A scheduler for future events.
70///
71/// The scheduler does not dispatch events itself, rather it provides the facilities needed for the
72/// caller to efficiently dispatch events in as few wakeups as possible.
73#[derive(Debug)]
74pub struct Scheduler<E> {
75 // Currently scheduled timers, sorted by descending start of their time range.
76 //
77 // The ordering is the reverse of the intuitive one so that dispatching of events can be
78 // implemented by truncating the Vec's tail.
79 //
80 // Invariant: each FutureEvent is referenced from a few places only: one Arc in this Vec,
81 // one Weak in the Handle for the event, and a temporary upgraded Arc during the execution of
82 // Handle's methods. This invariant is relied upon by SchedulerInner, which accounts for all
83 // these potential references prior to unwrapping Arc::get_mut and Arc::into_inner.
84 // Additional rogue references would invalidate this accounting and cause runtime panics.
85 events: Arc<Mutex<Vec<Arc<FutureEvent<E>>>>>,
86}
87
88impl<E> Default for Scheduler<E> {
89 fn default() -> Self {
90 Self {
91 events: Default::default(),
92 }
93 }
94}
95
96impl<E> Scheduler<E> {
97 /// Returns the index of the first element of events whose start time is less than or
98 /// equal to `t`, or events.len() if no such element exists.
99 ///
100 /// `events` must be sorted by descending event start time.
101 fn partition_point(events: &[Arc<FutureEvent<E>>], t: Instant) -> usize {
102 events.partition_point(|e| e.when.start > t)
103 }
104
105 fn find(events: &[Arc<FutureEvent<E>>], event: &Arc<FutureEvent<E>>) -> Option<usize> {
106 let idx = Scheduler::partition_point(events, event.when.start);
107 for (i, other) in events[idx..].iter().enumerate() {
108 if other.when.start != event.when.start {
109 return None;
110 }
111 if Arc::ptr_eq(event, other) {
112 return Some(idx + i);
113 }
114 }
115 None
116 }
117
118 /// Schedule an event to occur at a future point in time.
119 ///
120 /// Returns a [`Handle`] which may be used to cancel or reschedule the event. The caller need
121 /// not retain the Handle if cancellation and rescheduling are not required.
122 pub fn add(&mut self, when: TimeRange, what: E) -> Handle<E> {
123 let event = Arc::new(FutureEvent { when, what });
124 let weak_event = Arc::downgrade(&event);
125 let mut events = self.events.lock().unwrap();
126 let idx = Scheduler::partition_point(&events, when.start);
127 events.insert(idx, event);
128 Handle {
129 events: Arc::downgrade(&self.events),
130 event: weak_event,
131 }
132 }
133
134 /// Cancel all pending events, leaving the scheduler idle.
135 pub fn clear(&mut self) {
136 self.events.lock().unwrap().clear();
137 }
138
139 /// Removes events that are due to happen at or before `now` from the scheduler's queue,
140 /// returning an iterator over the removed events.
141 ///
142 /// If the iterator is dropped before being fully consumed, it drops the remaining removed
143 /// events.
144 pub fn dispatch(&mut self, now: Instant) -> impl Iterator<Item = E> + use<E> {
145 let mut events = self.events.lock().unwrap();
146 let idx = Scheduler::partition_point(&events, now);
147 let to_dispatch = events.split_off(idx);
148
149 // Invariant: at most 3 refs to the event exist (see doc on SchedulerInner struct).
150 // We haven't upgraded the Handle's Weak, so that Arc doesn't exist. The iterator owns the
151 // Arc that was formerly in self.events, and into_inner is not blocked by the existence of
152 // the Handle's Weak. Thus, into_inner always succeeds.
153 to_dispatch
154 .into_iter()
155 .rev()
156 .map(|e| Arc::into_inner(e).unwrap().what)
157 }
158
159 /// Returns the next time range in which [`Scheduler::dispatch`] should next be called to
160 /// dispatch events.
161 ///
162 /// [`Scheduler::dispatch`] should be called at some point in the returned [`TimeRange`] to
163 /// dispatch events on time.
164 ///
165 /// Calling `dispatch` closer to the end of the range is more efficient and results in more
166 /// events being available. Calling `dispatch` before the returned range is inefficient
167 /// but otherwise harmless.
168 ///
169 /// The returned range may lie entirely in the past, if overdue events exists. The caller is
170 /// expected to call [`Scheduler::dispatch`] as soon as possible in that case.
171 ///
172 /// This method is intended to be used to plumb this Scheduler's event dispatch into another
173 /// Scheduler.
174 pub fn next_dispatch_range(&self) -> Option<TimeRange> {
175 let events = self.events.lock().unwrap();
176 let start = events.last()?.when.start;
177 let mut end = events.last()?.when.end;
178
179 for e in events.iter().rev().skip(1) {
180 if e.when.start > end {
181 break;
182 }
183 end = min(end, e.when.end);
184 }
185
186 Some(TimeRange::new(start, end))
187 }
188
189 /// Returns the next time at which [`Scheduler::dispatch`] should next be called to dispatch
190 /// events.
191 ///
192 /// This is the same as [`Scheduler::next_dispatch_range`], but only returns the [`Instant`]
193 /// corresponding to the end of the feasible time range.
194 ///
195 /// Calling `dispatch` before the returned time is inefficient but otherwise harmless.
196 ///
197 /// The returned time may be in the past, if overdue events exists. The caller is
198 /// expected to call [`Scheduler::dispatch`] as soon as possible in that case.
199 ///
200 /// This method is intended to be used to plumb this Scheduler into an external timer facility
201 /// (e.g. `std::thread::sleep`, `tokio::time::sleep`), to trigger event dispatching at the
202 /// appropriate time.
203 pub fn next_dispatch(&self) -> Option<Instant> {
204 Some(self.next_dispatch_range()?.end)
205 }
206
207 /// Assert that the scheduler's internal state is consistent.
208 ///
209 /// # Panics
210 ///
211 /// If the scheduler's internal invariants are violated.
212 #[cfg(test)]
213 fn assert_consistent(&self) {
214 assert!(
215 self.events
216 .lock()
217 .unwrap()
218 .is_sorted_by(|a, b| a.when.start >= b.when.start)
219 );
220 }
221}
222
223/// A handle for a scheduled future event, allowing the holder to reschedule or cancel the event.
224///
225/// The handle may outlive the event it relates to. Calling methods on such a lapsed Handle is safe.
226///
227/// Handles that aren't needed for cancellation or rescheduling can be dropped without impacting
228/// the related event.
229pub struct Handle<E> {
230 events: Weak<Mutex<Vec<Arc<FutureEvent<E>>>>>,
231 event: Weak<FutureEvent<E>>,
232}
233
234impl<E> Handle<E> {
235 /// Attempts to cancel the event.
236 ///
237 /// If the event hasn't yet occurred when cancel is called, it is canceled and will not be
238 /// returned by [`Scheduler::dispatch`]. Cancelling an event that has already been dispatched
239 /// is a no-op.
240 pub fn cancel(self) {
241 let Some(events) = self.events.upgrade() else {
242 return;
243 };
244 let Some(event) = self.event.upgrade() else {
245 return;
246 };
247 let mut events = events.lock().unwrap();
248 let Some(idx) = Scheduler::find(&events, &event) else {
249 return;
250 };
251 events.remove(idx);
252 }
253
254 /// Attempts to reschedule the event to a new time range.
255 ///
256 /// Returns an updated Handle if rescheduling succeeds, or None if the event has already
257 /// been dispatched.
258 pub fn reschedule(self, when: TimeRange) -> Option<Handle<E>> {
259 let events = self.events.upgrade()?;
260 let mut events = events.lock().unwrap();
261 let mut event = self.event.upgrade()?;
262 drop(self.event);
263 let idx = Scheduler::find(&events, &event)?;
264 drop(events.remove(idx));
265 // Invariant: At most 3 refs to the event exist (see doc on SchedulerInner struct).
266 // We dropped the Handle's Weak and events's Arc above, leaving `event` as the sole Arc
267 // for this event. Thus, get_mut always succeeds.
268 Arc::get_mut(&mut event).unwrap().when = when;
269 let weak = Arc::downgrade(&event);
270
271 let idx = Scheduler::partition_point(&events, when.start);
272 events.insert(idx, event);
273 drop(events);
274
275 Some(Handle {
276 events: self.events,
277 event: weak,
278 })
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use std::{collections::HashSet, fmt::Debug, hash::Hash};
285
286 use super::*;
287
288 #[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
289 enum Event {
290 Foo,
291 Bar(usize),
292 }
293
294 fn check_next<E: Debug + Eq + Hash>(
295 sched: &mut Scheduler<E>,
296 want_next: TimeRange,
297 want_events: Vec<E>,
298 ) {
299 // Pending events are only sorted according to their range's start time, so when multiple
300 // events have the same start time the exact order in which they're returned is dependent
301 // on insertion order. We don't care about the relative ordering of such events, so just
302 // toss everything into a set and check that the right set of events was dispatched.
303 let want_events: HashSet<E> = HashSet::from_iter(want_events);
304
305 let next = sched.next_dispatch_range();
306 assert_eq!(next, Some(want_next));
307 let next = sched.next_dispatch();
308 assert_eq!(next, Some(want_next.end));
309 let next = next.unwrap();
310
311 let events = sched.dispatch(next).collect::<HashSet<E>>();
312 assert_eq!(events, want_events);
313 }
314
315 fn check_empty<E>(sched: &mut Scheduler<E>) {
316 assert_eq!(sched.next_dispatch(), None);
317 assert_eq!(sched.next_dispatch_range(), None);
318 }
319
320 #[test]
321 fn test_basic() {
322 let datum = Instant::now();
323 let mut sched = Scheduler::default();
324 sched.add(TimeRange::new(datum, datum), Event::Foo);
325 check_next(&mut sched, TimeRange::new(datum, datum), vec![Event::Foo]);
326 check_empty(&mut sched);
327 }
328
329 #[test]
330 fn test_many() {
331 let datum = Instant::now();
332 let mut sched = Scheduler::default();
333 let ranges: Vec<(u64, u64)> = vec![(1, 9), (2, 8), (3, 7), (4, 6), (5, 5), (2, 4)];
334 // Event ranges:
335 //
336 // <-------> (1,9)
337 // <-----> (2,8)
338 // <---> (3,7)
339 // <-> (4,6)
340 // x (5,5)
341 // <-> (2,4)
342 //
343 for (i, (start, end)) in ranges.iter().enumerate() {
344 let start = datum + Duration::from_secs(*start);
345 let end = datum + Duration::from_secs(*end);
346 let range = TimeRange::new(start, end);
347 sched.add(range, Event::Bar(i));
348 }
349 // First wakeup at 4, all events except (5,5).
350 check_next(
351 &mut sched,
352 TimeRange::new(
353 datum + Duration::from_secs(1),
354 datum + Duration::from_secs(4),
355 ),
356 vec![
357 Event::Bar(0),
358 Event::Bar(1),
359 Event::Bar(2),
360 Event::Bar(3),
361 Event::Bar(5),
362 ],
363 );
364 // Final dispatch at 5, only (5,5).
365 check_next(
366 &mut sched,
367 TimeRange::new(
368 datum + Duration::from_secs(5),
369 datum + Duration::from_secs(5),
370 ),
371 vec![Event::Bar(4)],
372 );
373 check_empty(&mut sched);
374 }
375}
376
377#[cfg(test)]
378mod proptests {
379 use std::{cmp::max, fmt::Debug, sync::LazyLock};
380
381 use proptest::{collection::vec, prelude::*};
382
383 use super::*;
384
385 static DATUM: LazyLock<Instant> = LazyLock::new(Instant::now);
386
387 prop_compose! {
388 fn arb_timerange()(start in 1..u16::MAX, duration in any::<u16>()) -> (Duration, Duration) {
389 let start = Duration::from_millis(start as u64);
390 let end = start+Duration::from_millis(duration as u64);
391 (start, end)
392 }
393 }
394
395 #[derive(Copy, Clone, Debug)]
396 enum Action {
397 /// Run a dispatch cycle, checking invariants on produced events
398 Dispatch,
399 /// Schedule a new event for the given time range.
400 Add((Duration, Duration)),
401 /// Cancel a previously added event. The f64 must be in 0..1, and is rescaled to the total
402 /// number of scheduled events so far in the run, so will try to cancel a uniformly sampled
403 /// prior event (which may have already been canceled).
404 Cancel(f64),
405 /// Reschedule a previously added event. The f64 is rescaled as with Cancel.
406 Reschedule((f64, (Duration, Duration))),
407 }
408
409 /// Convert a random 0-1 float value into an index in the range 0..max.
410 ///
411 /// Used below to distribute cancellations and reschedules over previously created events.
412 fn sample(v: f64, max: usize) -> usize {
413 (max as f64 * v.clamp(0f64, 1f64)).floor() as usize
414 }
415
416 fn arb_scheduler_action() -> impl Strategy<Value = Action> {
417 prop_oneof![
418 Just(Action::Dispatch),
419 arb_timerange().prop_map(Action::Add),
420 (0f64..1f64).prop_map(Action::Cancel),
421 ((0f64..1f64), arb_timerange()).prop_map(Action::Reschedule),
422 ]
423 }
424
425 struct Event {
426 id: usize,
427 range: Mutex<TimeRange>,
428 }
429
430 impl Event {
431 fn new(id: usize, start: Duration, end: Duration) -> Self {
432 let range = Mutex::new(TimeRange::new(*DATUM + start, *DATUM + end));
433 Self { id, range }
434 }
435
436 fn update(&mut self, start: Duration, end: Duration) {
437 let mut range = self.range.lock().unwrap();
438 *range = TimeRange::new(*DATUM + start, *DATUM + end);
439 }
440 }
441
442 impl Debug for Event {
443 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
444 let (start, end) = {
445 let range = self.range.lock().unwrap();
446 (range.start, range.end)
447 };
448 f.debug_struct("ReschedulableEvent")
449 .field("id", &self.id)
450 .field("start", &(start - *DATUM))
451 .field("end", &(end - *DATUM))
452 .finish()
453 }
454 }
455
456 proptest! {
457 #[test]
458 fn test_events(times in vec(arb_timerange(), 1..100)) {
459 let mut sched = Scheduler::default();
460 for (start, end) in × {
461 let tr = TimeRange::new(*DATUM+*start, *DATUM+*end);
462 sched.add(tr, (start, end, tr));
463 sched.assert_consistent();
464 }
465
466 let mut total_seen = 0;
467 let mut last_time = *DATUM;
468 while let Some(next) = sched.next_dispatch() {
469 // Invariant: dispatch time always moves forward.
470 assert!(next > last_time, "next={:?}, last={:?}", next-*DATUM, last_time-*DATUM);
471 last_time = next;
472
473 for (start, end, tr) in sched.dispatch(next) {
474 total_seen += 1;
475 // Invariant: all events dispatch within their requested time range.
476 // Note this is only true in this test because we schedule all events upfront,
477 // all the events are scheduled for a future time.
478 assert!(tr.contains(next), "range=({:?}, {:?}), next={:?}", start, end, next-*DATUM);
479 }
480 sched.assert_consistent();
481 }
482
483 // Invariant: the scheduler doesn't forget events.
484 assert_eq!(total_seen, times.len());
485 }
486 }
487
488 proptest! {
489 #![proptest_config(ProptestConfig::with_cases(1000))]
490 #[test]
491 fn test_event_handles(actions in vec(arb_scheduler_action(), 1..100)) {
492 let mut sched: Scheduler<usize> = Scheduler::default();
493 let mut events: Vec<Option<Event >> = Vec::new();
494 let mut handles: Vec<Option<Handle<usize>>> = Vec::new();
495 let mut now = *DATUM;
496 let mut total_scheduled = 0;
497 let mut total_canceled = 0;
498 let mut total_dispatched = 0;
499 println!("\nSTART, now=0s");
500 for action in actions {
501 sched.assert_consistent();
502 match action {
503 Action::Dispatch => {
504 let Some(next) = sched.next_dispatch() else {
505 println!("Dispatch (no scheduled events)");
506 continue;
507 };
508 // Due to reschedules, next may be in the past.
509 now = max(next, now);
510 println!("Dispatch, now={:?}", now-*DATUM);
511
512 for idx in sched.dispatch(now) {
513 if let Some(event) = events[idx].take() {
514 println!(" {:?}", event);
515 total_dispatched += 1;
516 let tr = event.range.lock().unwrap();
517 // Invariant: events dispatch within their requested time range, or
518 // are being dispatched late in the case of events (re)scheduled
519 // in the past.
520 assert!(tr.contains(now) || now > tr.end());
521 } else {
522 panic!("dispatched canceled event {}", idx);
523 }
524 }
525 }
526 Action::Add((start, end)) => {
527 let val = Event::new(events.len(), start, end);
528 println!("Add {:?}", val);
529 let tr = {
530 let range = val.range.lock().unwrap();
531 *range
532 };
533 let handle = sched.add(tr, val.id);
534 events.push(Some(val));
535 handles.push(Some(handle));
536 total_scheduled += 1;
537 }
538 Action::Cancel(idx) => {
539 if events.is_empty() {
540 println!("Cancel() (no events yet)");
541 continue;
542 }
543 let idx = sample(idx, events.len());
544 if let Some(handle) = handles[idx].take() {
545 println!("Cancel({})", idx);
546 handle.cancel();
547 events[idx] = None;
548 total_canceled += 1;
549 } else {
550 println!("Cancel({}) (already canceled)", idx);
551 };
552 }
553 Action::Reschedule((idx, (start, end))) => {
554 if events.is_empty() {
555 println!("Reschedule() (no events yet)");
556 continue;
557 }
558 let idx = sample(idx, events.len());
559 if let Some(event) = &mut events[idx] {
560 event.update(start, end);
561 let tr = {
562 *event.range.lock().unwrap()
563 };
564 println!("Reschedule({}) event={:?}", idx, event);
565 handles[idx] = handles[idx].take().and_then(|handle| handle.reschedule(tr));
566 } else {
567 println!("Reschedule({}) (no such event)", idx);
568 }
569 }
570 }
571 }
572 let total_pending: usize = events.iter().map(|x| if x.is_some() { 1 } else {0}).sum();
573 assert_eq!(total_scheduled, events.len());
574 assert!(total_dispatched <= total_scheduled);
575 assert!(total_canceled <= total_scheduled);
576 assert!(total_pending <= total_scheduled);
577 // Cancellations can cause double-counting, when cancelling an already dispatched event.
578 // So, best we can do is bracket the values.
579 let definitely_alive = total_pending+total_dispatched;
580 assert!(definitely_alive <= total_scheduled);
581 assert!(definitely_alive+total_canceled >= total_scheduled);
582 }
583 }
584}