1use super::WakerEntry;
20use std::cell::Cell;
21use std::collections::{BTreeSet, HashMap};
22use std::rc::Weak;
23use std::task::Waker;
24use std::time::Instant;
25
/// A queue of wakers, each scheduled to be woken at a particular [`Instant`].
///
/// The same set of entries is stored twice so that it can be looked up both ways:
/// ordered by wake time (to find what is due next) and keyed by waker (to find when a
/// given waker is currently scheduled). The two collections are expected to always
/// describe the same entries.
#[derive(Clone, Debug, Default)]
pub struct ScheduledWakerQueue {
    /// Every scheduled entry as a `(wake time, waker)` pair. Because this is a
    /// `BTreeSet`, its first element is the pair with the earliest wake time.
    wakers_by_time: BTreeSet<(Instant, WakerEntry)>,
    /// Reverse lookup from a scheduled waker to its wake time.
    waker_to_time: HashMap<WakerEntry, Instant>,
}
50
51impl ScheduledWakerQueue {
52 #[inline(always)]
54 #[must_use]
55 pub fn new() -> Self {
56 Self::default()
57 }
58
59 #[inline(always)]
64 #[must_use]
65 pub fn len(&self) -> usize {
66 self.wakers_by_time.len()
67 }
68
69 #[inline(always)]
74 #[must_use]
75 pub fn is_empty(&self) -> bool {
76 self.wakers_by_time.is_empty()
77 }
78
79 pub fn clear(&mut self) {
81 self.wakers_by_time.clear();
82 self.waker_to_time.clear();
83
84 #[cfg(debug_assertions)]
85 self.validate();
86 }
87
88 pub fn push(&mut self, wake_time: Instant, waker: Weak<Cell<Option<Waker>>>) -> bool {
104 let waker_entry = WakerEntry(waker);
105 if !waker_entry.is_alive() {
106 return false;
107 }
108
109 self.trim_to_next_wake_time();
111 if self.len() == self.waker_to_time.capacity() {
112 self.wakers_by_time.retain(|(_wake_time, waker_entry)| {
115 if waker_entry.is_alive() {
116 true
117 } else {
118 self.waker_to_time.remove(waker_entry);
119 false
120 }
121 });
122
123 self.waker_to_time
128 .shrink_to(std::cmp::max(8, self.len() * 2));
129
130 self.waker_to_time.reserve(self.len());
133 }
134
135 let pushed = match self.waker_to_time.get_mut(&waker_entry) {
137 None => {
138 self.waker_to_time.insert(waker_entry.clone(), wake_time);
139 self.wakers_by_time.insert((wake_time, waker_entry))
140 }
141 Some(wake_time_entry) => {
142 if *wake_time_entry <= wake_time {
143 false
146 } else {
147 let old_wake_time = std::mem::replace(wake_time_entry, wake_time);
150 let waker_entry = self
151 .wakers_by_time
152 .take(&(old_wake_time, waker_entry))
153 .unwrap()
154 .1;
155 self.wakers_by_time.insert((wake_time, waker_entry))
156 }
157 }
158 };
159
160 #[cfg(debug_assertions)]
161 self.validate();
162
163 pushed
164 }
165
166 pub fn next_wake_time(&self) -> Option<Instant> {
177 self.wakers_by_time
178 .iter()
179 .find(|&(_, entry)| entry.is_alive())
180 .map(|(wake_time, _)| *wake_time)
181 }
182
183 pub fn trim_to_next_wake_time(&mut self) -> Option<Instant> {
200 let mut next_wake_time = None;
201 while let Some((wake_time, waker_entry)) = self.wakers_by_time.first() {
202 if waker_entry.is_alive() {
203 next_wake_time = Some(*wake_time);
204 break;
205 }
206 self.waker_to_time.remove(waker_entry);
207 self.wakers_by_time.pop_first();
208 }
209
210 #[cfg(debug_assertions)]
211 self.validate();
212
213 next_wake_time
214 }
215
216 pub fn wake(&mut self, now: Instant) {
224 while let Some((wake_time, waker_entry)) = self.wakers_by_time.first() {
225 if *wake_time > now {
226 break;
227 }
228 self.waker_to_time.remove(waker_entry);
229 let waker_entry = self.wakers_by_time.pop_first().unwrap().1;
230 if let Some(waker) = waker_entry.0.upgrade().and_then(|cell| cell.take()) {
231 waker.wake();
232 }
233 }
234
235 #[cfg(debug_assertions)]
236 self.validate();
237 }
238
239 #[cfg(debug_assertions)]
241 fn validate(&self) {
242 assert_eq!(self.wakers_by_time.len(), self.waker_to_time.len());
243 for (wake_time, entry) in &self.wakers_by_time {
244 assert_eq!(*wake_time, self.waker_to_time[entry]);
245 }
246 for (entry, wake_time) in &self.waker_to_time {
247 assert!(self.wakers_by_time.contains(&(*wake_time, entry.clone())));
248 }
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helper::WakeFlag;
    use std::rc::Rc;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a waker cell holding a clone of the no-op waker.
    fn dummy_waker() -> Rc<Cell<Option<Waker>>> {
        let noop = Waker::noop().clone();
        Rc::new(Cell::new(Some(noop)))
    }

    /// Builds a waker cell backed by a fresh `WakeFlag` and returns both.
    ///
    /// The tests use `WakeFlag::is_woken` to observe whether the waker was invoked.
    fn flagged_waker() -> (Arc<WakeFlag>, Rc<Cell<Option<Waker>>>) {
        let flag = Arc::new(WakeFlag::new());
        let waker = Rc::new(Cell::new(Some(Waker::from(Arc::clone(&flag)))));
        (flag, waker)
    }

    /// Shorthand for `base + Duration::from_secs(secs)`.
    fn secs_after(base: Instant, secs: u64) -> Instant {
        base + Duration::from_secs(secs)
    }

    #[test]
    fn queue_is_initially_empty() {
        let queue = ScheduledWakerQueue::new();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn pushed_wakers_are_stored_in_queue() {
        let mut queue = ScheduledWakerQueue::new();
        // Earlier wakers must stay alive while later ones are pushed.
        let mut keep_alive = Vec::new();

        for expected_len in 1..=2 {
            let waker = dummy_waker();

            assert!(queue.push(Instant::now(), Rc::downgrade(&waker)));
            assert!(!queue.is_empty());
            assert_eq!(queue.len(), expected_len);
            // The queue only holds weak references: two per pushed waker.
            assert_eq!(Rc::strong_count(&waker), 1);
            assert_eq!(Rc::weak_count(&waker), 2);

            keep_alive.push(waker);
        }
    }

    #[test]
    fn queue_is_empty_after_cleared() {
        let waker = dummy_waker();
        let mut queue = ScheduledWakerQueue::new();
        queue.push(Instant::now(), Rc::downgrade(&waker));

        queue.clear();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn pushing_existing_waker_with_earlier_wake_time_discards_existing_waker() {
        let now = Instant::now();
        let waker = dummy_waker();
        let mut queue = ScheduledWakerQueue::new();
        queue.push(secs_after(now, 5), Rc::downgrade(&waker));

        // Same waker, earlier time: the entry is rescheduled rather than duplicated.
        assert!(queue.push(secs_after(now, 3), Rc::downgrade(&waker)));
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(secs_after(now, 3)));
    }

    #[test]
    fn pushing_existing_waker_with_later_wake_time_discards_new_waker() {
        let now = Instant::now();
        let waker = dummy_waker();
        let mut queue = ScheduledWakerQueue::new();
        queue.push(secs_after(now, 3), Rc::downgrade(&waker));

        // Same waker, later time: the existing earlier schedule wins.
        assert!(!queue.push(secs_after(now, 5), Rc::downgrade(&waker)));
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(secs_after(now, 3)));
    }

    #[test]
    fn pushing_dead_waker_is_noop() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        // A weak reference that never pointed at anything.
        assert!(!queue.push(now, Weak::new()));
        assert!(queue.is_empty());

        // A cell that still exists but no longer contains a waker.
        let emptied = dummy_waker();
        emptied.take();
        assert!(!queue.push(now, Rc::downgrade(&emptied)));
        assert!(queue.is_empty());
    }

    #[test]
    fn next_wake_time_returns_none_if_empty() {
        assert_eq!(ScheduledWakerQueue::new().next_wake_time(), None);
    }

    #[test]
    fn next_wake_time_returns_earliest_pending_waker_time() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();
        assert_eq!(queue.next_wake_time(), None);

        // (delay of the waker being pushed, earliest delay expected afterwards)
        let steps = [(5, 5), (3, 3), (10, 3)];
        // All wakers must outlive the loop so none of them turns dead.
        let mut keep_alive = Vec::new();

        for (delay, expected) in steps {
            let waker = dummy_waker();
            queue.push(secs_after(now, delay), Rc::downgrade(&waker));
            assert_eq!(queue.next_wake_time(), Some(secs_after(now, expected)));
            keep_alive.push(waker);
        }
    }

    #[test]
    fn next_wake_time_ignores_dead_wakers() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let alive = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(now, Rc::downgrade(&emptied));
        queue.push(secs_after(now, 5), Rc::downgrade(&alive));

        // Kill the two earliest entries in the two possible ways.
        drop(dropped);
        emptied.take();

        assert_eq!(queue.next_wake_time(), Some(secs_after(now, 5)));
    }

    #[test]
    fn trim_to_next_wake_time_removes_leading_dead_wakers() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let alive = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(secs_after(now, 3), Rc::downgrade(&emptied));
        queue.push(secs_after(now, 5), Rc::downgrade(&alive));
        drop(dropped);
        emptied.take();

        queue.trim_to_next_wake_time();

        // Only the alive entry is left behind.
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.next_wake_time(), Some(secs_after(now, 5)));
    }

    #[test]
    fn trim_to_next_wake_time_returns_next_wake_time() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();
        let dropped = dummy_waker();
        let emptied = dummy_waker();
        let alive = dummy_waker();
        queue.push(now, Rc::downgrade(&dropped));
        queue.push(secs_after(now, 3), Rc::downgrade(&emptied));
        queue.push(secs_after(now, 5), Rc::downgrade(&alive));
        drop(dropped);
        emptied.take();

        assert_eq!(
            queue.trim_to_next_wake_time(),
            Some(secs_after(now, 5))
        );

        // Once the last waker dies too, nothing is left to wait for.
        drop(alive);
        assert_eq!(queue.trim_to_next_wake_time(), None);
    }

    #[test]
    fn wake_removes_all_wakers_up_to_given_time() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        let waker_1 = dummy_waker();
        let waker_2 = dummy_waker();
        let waker_3 = dummy_waker();
        queue.push(secs_after(now, 3), Rc::downgrade(&waker_1));
        queue.push(secs_after(now, 5), Rc::downgrade(&waker_2));
        queue.push(secs_after(now, 6), Rc::downgrade(&waker_3));

        queue.wake(secs_after(now, 5));

        // The first two entries were due (inclusive bound) and are gone from the
        // queue; the third is still referenced by it.
        assert_eq!(queue.len(), 1);
        assert_eq!(Rc::weak_count(&waker_1), 0);
        assert_eq!(Rc::weak_count(&waker_2), 0);
        assert_eq!(Rc::weak_count(&waker_3), 2);
    }

    #[test]
    fn wake_activates_all_wakers_up_to_given_time() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        let (flag_1, waker_1) = flagged_waker();
        let (flag_2, waker_2) = flagged_waker();
        let (flag_3, waker_3) = flagged_waker();
        queue.push(secs_after(now, 3), Rc::downgrade(&waker_1));
        queue.push(secs_after(now, 5), Rc::downgrade(&waker_2));
        queue.push(secs_after(now, 6), Rc::downgrade(&waker_3));

        queue.wake(secs_after(now, 5));

        // Only the wakers scheduled at or before the given time were invoked.
        assert!(flag_1.is_woken());
        assert!(flag_2.is_woken());
        assert!(!flag_3.is_woken());
    }

    #[test]
    fn complex_pushes_and_wakes() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        let (flag_1, waker_1) = flagged_waker();
        let (flag_2, waker_2) = flagged_waker();
        let (flag_3, waker_3) = flagged_waker();

        // Pushed out of chronological order on purpose.
        queue.push(secs_after(now, 5), Rc::downgrade(&waker_1));
        queue.push(secs_after(now, 3), Rc::downgrade(&waker_2));
        queue.push(secs_after(now, 10), Rc::downgrade(&waker_3));

        queue.wake(secs_after(now, 5));
        assert!(flag_1.is_woken());
        assert!(flag_2.is_woken());
        assert!(!flag_3.is_woken());

        // Only the latest entry remains scheduled.
        assert_eq!(queue.next_wake_time(), Some(secs_after(now, 10)));

        queue.wake(secs_after(now, 15));
        assert!(flag_3.is_woken());

        // Everything has been woken, so nothing is pending.
        assert_eq!(queue.next_wake_time(), None);
    }

    #[test]
    fn push_trims_earliest_dead_entries() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        let (_flag_1, waker_1) = flagged_waker();
        let (_flag_2, waker_2) = flagged_waker();
        let (flag_3, waker_3) = flagged_waker();
        let (flag_4, waker_4) = flagged_waker();

        queue.push(secs_after(now, 3), Rc::downgrade(&waker_1));
        queue.push(secs_after(now, 5), Rc::downgrade(&waker_2));
        queue.push(secs_after(now, 7), Rc::downgrade(&waker_3));

        // Waking the first two wakers directly empties their cells, which leaves the
        // two earliest queue entries dead.
        waker_1.take().unwrap().wake();
        waker_2.take().unwrap().wake();

        // The next push drops those leading dead entries before inserting.
        queue.push(secs_after(now, 1), Rc::downgrade(&waker_4));
        assert_eq!(queue.len(), 2);

        queue.wake(secs_after(now, 10));
        assert!(flag_3.is_woken());
        assert!(flag_4.is_woken());
    }

    #[test]
    fn push_cleans_up_all_dead_entries_if_full() {
        let now = Instant::now();
        let mut queue = ScheduledWakerQueue::new();

        let waker_1 = dummy_waker();
        let waker_2 = dummy_waker();
        let waker_3 = dummy_waker();
        let waker_4 = dummy_waker();

        queue.waker_to_time.reserve(10);
        queue.push(secs_after(now, 3), Rc::downgrade(&waker_1));

        // Fill the queue up to one below capacity with wakers that die right after
        // being pushed. They are scheduled after the alive `waker_1`, so front
        // trimming cannot reach them.
        while queue.len() + 1 < queue.waker_to_time.capacity() {
            let short_lived = dummy_waker();
            let offset = Duration::new(3, queue.len() as u32);
            queue.push(now + offset, Rc::downgrade(&short_lived));
        }
        queue.push(secs_after(now, 4), Rc::downgrade(&waker_2));
        assert_eq!(queue.len(), queue.waker_to_time.capacity());

        // The queue is full, so this push purges every dead entry first: only
        // `waker_1`, `waker_2` and the newly pushed `waker_3` remain.
        queue.push(secs_after(now, 5), Rc::downgrade(&waker_3));
        assert_eq!(queue.len(), 3);

        waker_3.take().unwrap().wake();

        // Not full any more and the dead entry is not at the front, so it is kept
        // and the new entry is simply added.
        queue.push(secs_after(now, 6), Rc::downgrade(&waker_4));
        assert_eq!(queue.len(), 4);
    }
}