kayrx_timer/delay_queue.rs
1//! A queue of delayed elements.
2//!
3//! See [`DelayQueue`] for more details.
4//!
5//! [`DelayQueue`]: struct.DelayQueue.html
6
7use crate::wheel::{self, Wheel};
8use crate::{delay_until, Delay, Duration, Error, Instant};
9
10use slab::Slab;
11use std::cmp;
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{self, Poll};
16
17
18macro_rules! ready {
19 ($e:expr $(,)?) => {
20 match $e {
21 std::task::Poll::Ready(t) => t,
22 std::task::Poll::Pending => return std::task::Poll::Pending,
23 }
24 };
25}
26
27/// A queue of delayed elements.
28///
29/// Once an element is inserted into the `DelayQueue`, it is yielded once the
30/// specified deadline has been reached.
31///
32/// # Usage
33///
34/// Elements are inserted into `DelayQueue` using the [`insert`] or
35/// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is
36/// returned. The key is used to remove the entry or to change the deadline at
37/// which it should be yielded back.
38///
39/// Once delays have been configured, the `DelayQueue` is used via its
40/// [`Stream`] implementation. [`poll`] is called. If an entry has reached its
41/// deadline, it is returned. If not, `Async::NotReady` indicating that the
42/// current task will be notified once the deadline has been reached.
43///
44/// # `Stream` implementation
45///
46/// Items are retrieved from the queue via [`Stream::poll`]. If no delays have
47/// expired, no items are returned. In this case, `NotReady` is returned and the
48/// current task is registered to be notified once the next item's delay has
49/// expired.
50///
51/// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll`
52/// returns `Ready(None)`. This indicates that the stream has reached an end.
53/// However, if a new item is inserted *after*, `poll` will once again start
54/// returning items or `NotReady.
55///
56/// Items are returned ordered by their expirations. Items that are configured
57/// to expire first will be returned first. There are no ordering guarantees
58/// for items configured to expire the same instant. Also note that delays are
59/// rounded to the closest millisecond.
60///
61/// # Implementation
62///
63/// The `DelayQueue` is backed by the same hashed timing wheel implementation as
64/// [`Timer`] as such, it offers the same performance benefits. See [`Timer`]
65/// for further implementation notes.
66///
67/// State associated with each entry is stored in a [`slab`]. This allows
68/// amortizing the cost of allocation. Space created for expired entries is
69/// reused when inserting new entries.
70///
71/// Capacity can be checked using [`capacity`] and allocated preemptively by using
72/// the [`reserve`] method.
73///
74/// # Usage
75///
76/// Using `DelayQueue` to manage cache entries.
77///
78/// ```rust,no_run
79/// use kayrx_timer::{delay_queue, DelayQueue, Error};
80///
81/// use futures::ready;
82/// use std::collections::HashMap;
83/// use std::task::{Context, Poll};
84/// use std::time::Duration;
85///
86/// type CacheKey = String;
87/// type Value = String;
88///
89/// struct Cache {
90/// entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
91/// expirations: DelayQueue<CacheKey>,
92/// }
93///
94/// const TTL_SECS: u64 = 30;
95///
96/// impl Cache {
97/// fn insert(&mut self, key: CacheKey, value: Value) {
98/// let delay = self.expirations
99/// .insert(key.clone(), Duration::from_secs(TTL_SECS));
100///
101/// self.entries.insert(key, (value, delay));
102/// }
103///
104/// fn get(&self, key: &CacheKey) -> Option<&Value> {
105/// self.entries.get(key)
106/// .map(|&(ref v, _)| v)
107/// }
108///
109/// fn remove(&mut self, key: &CacheKey) {
110/// if let Some((_, cache_key)) = self.entries.remove(key) {
111/// self.expirations.remove(&cache_key);
112/// }
113/// }
114///
115/// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
116/// while let Some(res) = ready!(self.expirations.poll_expired(cx)) {
117/// let entry = res?;
118/// self.entries.remove(entry.get_ref());
119/// }
120///
121/// Poll::Ready(Ok(()))
122/// }
123/// }
124/// ```
125///
126/// [`insert`]: #method.insert
127/// [`insert_at`]: #method.insert_at
128/// [`Key`]: struct.Key.html
129/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
130/// [`poll`]: #method.poll
131/// [`Stream::poll`]: #method.poll
132/// [`Timer`]: ../struct.Timer.html
133/// [`slab`]: https://docs.rs/slab
134/// [`capacity`]: #method.capacity
135/// [`reserve`]: #method.reserve
136#[derive(Debug)]
137pub struct DelayQueue<T> {
138 /// Stores data associated with entries
139 slab: Slab<Data<T>>,
140
141 /// Lookup structure tracking all delays in the queue
142 wheel: Wheel<Stack<T>>,
143
144 /// Delays that were inserted when already expired. These cannot be stored
145 /// in the wheel
146 expired: Stack<T>,
147
148 /// Delay expiring when the *first* item in the queue expires
149 delay: Option<Delay>,
150
151 /// Wheel polling state
152 poll: wheel::Poll,
153
154 /// Instant at which the timer starts
155 start: Instant,
156}
157
158/// An entry in `DelayQueue` that has expired and removed.
159///
160/// Values are returned by [`DelayQueue::poll`].
161///
162/// [`DelayQueue::poll`]: struct.DelayQueue.html#method.poll
163#[derive(Debug)]
164pub struct Expired<T> {
165 /// The data stored in the queue
166 data: T,
167
168 /// The expiration time
169 deadline: Instant,
170
171 /// The key associated with the entry
172 key: Key,
173}
174
175/// Token to a value stored in a `DelayQueue`.
176///
177/// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`]
178/// documentation for more details.
179///
180/// [`DelayQueue`]: struct.DelayQueue.html
181/// [`DelayQueue::insert`]: struct.DelayQueue.html#method.insert
182#[derive(Debug, Clone)]
183pub struct Key {
184 index: usize,
185}
186
187#[derive(Debug)]
188struct Stack<T> {
189 /// Head of the stack
190 head: Option<usize>,
191 _p: PhantomData<fn() -> T>,
192}
193
194#[derive(Debug)]
195struct Data<T> {
196 /// The data being stored in the queue and will be returned at the requested
197 /// instant.
198 inner: T,
199
200 /// The instant at which the item is returned.
201 when: u64,
202
203 /// Set to true when stored in the `expired` queue
204 expired: bool,
205
206 /// Next entry in the stack
207 next: Option<usize>,
208
209 /// Previous entry in the stack
210 prev: Option<usize>,
211}
212
213/// Maximum number of entries the queue can handle
214const MAX_ENTRIES: usize = (1 << 30) - 1;
215
216impl<T> DelayQueue<T> {
217 /// Create a new, empty, `DelayQueue`
218 ///
219 /// The queue will not allocate storage until items are inserted into it.
220 ///
221 /// # Examples
222 ///
223 /// ```rust
224 /// use kayrx_timer::DelayQueue;
225 ///
226 /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
227 /// ```
228 pub fn new() -> DelayQueue<T> {
229 DelayQueue::with_capacity(0)
230 }
231
232 /// Create a new, empty, `DelayQueue` with the specified capacity.
233 ///
234 /// The queue will be able to hold at least `capacity` elements without
235 /// reallocating. If `capacity` is 0, the queue will not allocate for
236 /// storage.
237 ///
238 /// # Examples
239 ///
240 /// ```rust
241 /// use kayrx_timer::DelayQueue;
242 /// use std::time::Duration;
243 /// use kayrx_karx;
244 ///
245 /// fn main() {
246 /// kayrx_karx::exec(async {
247 ///
248 /// let mut delay_queue = DelayQueue::with_capacity(10);
249 ///
250 /// // These insertions are done without further allocation
251 /// for i in 0..10 {
252 /// delay_queue.insert(i, Duration::from_secs(i));
253 /// }
254 ///
255 /// // This will make the queue allocate additional storage
256 /// delay_queue.insert(11, Duration::from_secs(11));
257 ///
258 /// });
259 /// }
260 ///
261 /// ```
262 pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
263 DelayQueue {
264 wheel: Wheel::new(),
265 slab: Slab::with_capacity(capacity),
266 expired: Stack::default(),
267 delay: None,
268 poll: wheel::Poll::new(0),
269 start: Instant::now(),
270 }
271 }
272
273 /// Insert `value` into the queue set to expire at a specific instant in
274 /// time.
275 ///
276 /// This function is identical to `insert`, but takes an `Instant` instead
277 /// of a `Duration`.
278 ///
279 /// `value` is stored in the queue until `when` is reached. At which point,
280 /// `value` will be returned from [`poll`]. If `when` has already been
281 /// reached, then `value` is immediately made available to poll.
282 ///
283 /// The return value represents the insertion and is used at an argument to
284 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once
285 /// `value` is removed from the queue either by calling [`poll`] after
286 /// `when` is reached or by calling [`remove`]. At this point, the caller
287 /// must take care to not use the returned [`Key`] again as it may reference
288 /// a different item in the queue.
289 ///
290 /// See [type] level documentation for more details.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if `when` is too far in the future.
295 ///
296 /// # Examples
297 ///
298 /// Basic usage
299 ///
300 /// ```rust
301 /// use kayrx_timer::{DelayQueue, Duration, Instant};
302 ///
303 /// use kayrx_karx;
304 ///
305 /// fn main() {
306 /// kayrx_karx::exec(async {
307 /// let mut delay_queue = DelayQueue::new();
308 /// let key = delay_queue.insert_at(
309 /// "foo", Instant::now() + Duration::from_secs(5));
310 ///
311 /// // Remove the entry
312 /// let item = delay_queue.remove(&key);
313 /// assert_eq!(*item.get_ref(), "foo");
314 /// });
315 /// }
316 /// ```
317 ///
318 /// [`poll`]: #method.poll
319 /// [`remove`]: #method.remove
320 /// [`reset`]: #method.reset
321 /// [`Key`]: struct.Key.html
322 /// [type]: #
323 pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
324 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
325
326 // Normalize the deadline. Values cannot be set to expire in the past.
327 let when = self.normalize_deadline(when);
328
329 // Insert the value in the store
330 let key = self.slab.insert(Data {
331 inner: value,
332 when,
333 expired: false,
334 next: None,
335 prev: None,
336 });
337
338 self.insert_idx(when, key);
339
340 // Set a new delay if the current's deadline is later than the one of the new item
341 let should_set_delay = if let Some(ref delay) = self.delay {
342 let current_exp = self.normalize_deadline(delay.deadline());
343 current_exp > when
344 } else {
345 true
346 };
347
348 if should_set_delay {
349 self.delay = Some(delay_until(self.start + Duration::from_millis(when)));
350 }
351
352 Key::new(key)
353 }
354
355 /// Attempt to pull out the next value of the delay queue, registering the
356 /// current task for wakeup if the value is not yet available, and returning
357 /// None if the queue is exhausted.
358 pub fn poll_expired(
359 &mut self,
360 cx: &mut task::Context<'_>,
361 ) -> Poll<Option<Result<Expired<T>, Error>>> {
362 let item = ready!(self.poll_idx(cx));
363 Poll::Ready(item.map(|result| {
364 result.map(|idx| {
365 let data = self.slab.remove(idx);
366 debug_assert!(data.next.is_none());
367 debug_assert!(data.prev.is_none());
368
369 Expired {
370 key: Key::new(idx),
371 data: data.inner,
372 deadline: self.start + Duration::from_millis(data.when),
373 }
374 })
375 }))
376 }
377
378 /// Insert `value` into the queue set to expire after the requested duration
379 /// elapses.
380 ///
381 /// This function is identical to `insert_at`, but takes a `Duration`
382 /// instead of an `Instant`.
383 ///
384 /// `value` is stored in the queue until `when` is reached. At which point,
385 /// `value` will be returned from [`poll`]. If `when` has already been
386 /// reached, then `value` is immediately made available to poll.
387 ///
388 /// The return value represents the insertion and is used at an argument to
389 /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once
390 /// `value` is removed from the queue either by calling [`poll`] after
391 /// `when` is reached or by calling [`remove`]. At this point, the caller
392 /// must take care to not use the returned [`Key`] again as it may reference
393 /// a different item in the queue.
394 ///
395 /// See [type] level documentation for more details.
396 ///
397 /// # Panics
398 ///
399 /// This function panics if `timeout` is greater than the maximum supported
400 /// duration.
401 ///
402 /// # Examples
403 ///
404 /// Basic usage
405 ///
406 /// ```rust
407 /// use kayrx_timer::DelayQueue;
408 /// use std::time::Duration;
409 /// use kayrx_karx;
410 ///
411 /// fn main() {
412 /// kayrx_karx::exec(async {
413 /// let mut delay_queue = DelayQueue::new();
414 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
415 ///
416 /// // Remove the entry
417 /// let item = delay_queue.remove(&key);
418 /// assert_eq!(*item.get_ref(), "foo");
419 /// });
420 /// }
421 /// ```
422 ///
423 /// [`poll`]: #method.poll
424 /// [`remove`]: #method.remove
425 /// [`reset`]: #method.reset
426 /// [`Key`]: struct.Key.html
427 /// [type]: #
428 pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
429 self.insert_at(value, Instant::now() + timeout)
430 }
431
432 fn insert_idx(&mut self, when: u64, key: usize) {
433 use self::wheel::{InsertError, Stack};
434
435 // Register the deadline with the timer wheel
436 match self.wheel.insert(when, key, &mut self.slab) {
437 Ok(_) => {}
438 Err((_, InsertError::Elapsed)) => {
439 self.slab[key].expired = true;
440 // The delay is already expired, store it in the expired queue
441 self.expired.push(key, &mut self.slab);
442 }
443 Err((_, err)) => panic!("invalid deadline; err={:?}", err),
444 }
445 }
446
447 /// Remove the item associated with `key` from the queue.
448 ///
449 /// There must be an item associated with `key`. The function returns the
450 /// removed item as well as the `Instant` at which it will the delay will
451 /// have expired.
452 ///
453 /// # Panics
454 ///
455 /// The function panics if `key` is not contained by the queue.
456 ///
457 /// # Examples
458 ///
459 /// Basic usage
460 ///
461 /// ```rust
462 /// use kayrx_timer::DelayQueue;
463 /// use std::time::Duration;
464 /// use kayrx_karx;
465 ///
466 /// fn main() {
467 /// kayrx_karx::exec(async {
468 /// let mut delay_queue = DelayQueue::new();
469 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
470 ///
471 /// // Remove the entry
472 /// let item = delay_queue.remove(&key);
473 /// assert_eq!(*item.get_ref(), "foo");
474 /// });
475 /// }
476 /// ```
477 pub fn remove(&mut self, key: &Key) -> Expired<T> {
478 use crate::wheel::Stack;
479
480 // Special case the `expired` queue
481 if self.slab[key.index].expired {
482 self.expired.remove(&key.index, &mut self.slab);
483 } else {
484 self.wheel.remove(&key.index, &mut self.slab);
485 }
486
487 let data = self.slab.remove(key.index);
488
489 Expired {
490 key: Key::new(key.index),
491 data: data.inner,
492 deadline: self.start + Duration::from_millis(data.when),
493 }
494 }
495
496 /// Sets the delay of the item associated with `key` to expire at `when`.
497 ///
498 /// This function is identical to `reset` but takes an `Instant` instead of
499 /// a `Duration`.
500 ///
501 /// The item remains in the queue but the delay is set to expire at `when`.
502 /// If `when` is in the past, then the item is immediately made available to
503 /// the caller.
504 ///
505 /// # Panics
506 ///
507 /// This function panics if `when` is too far in the future or if `key` is
508 /// not contained by the queue.
509 ///
510 /// # Examples
511 ///
512 /// Basic usage
513 ///
514 /// ```rust
515 /// use kayrx_timer::{DelayQueue, Duration, Instant};
516 /// use kayrx_karx;
517 ///
518 /// fn main() {
519 /// kayrx_karx::exec(async {
520 /// let mut delay_queue = DelayQueue::new();
521 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
522 ///
523 /// // "foo" is scheduled to be returned in 5 seconds
524 ///
525 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
526 ///
527 /// // "foo"is now scheduled to be returned in 10 seconds
528 /// });
529 /// }
530 /// ```
531 pub fn reset_at(&mut self, key: &Key, when: Instant) {
532 self.wheel.remove(&key.index, &mut self.slab);
533
534 // Normalize the deadline. Values cannot be set to expire in the past.
535 let when = self.normalize_deadline(when);
536
537 self.slab[key.index].when = when;
538 self.insert_idx(when, key.index);
539
540 let next_deadline = self.next_deadline();
541 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
542 delay.reset(deadline);
543 }
544 }
545
546 /// Returns the next time poll as determined by the wheel
547 fn next_deadline(&mut self) -> Option<Instant> {
548 self.wheel
549 .poll_at()
550 .map(|poll_at| self.start + Duration::from_millis(poll_at))
551 }
552
553 /// Sets the delay of the item associated with `key` to expire after
554 /// `timeout`.
555 ///
556 /// This function is identical to `reset_at` but takes a `Duration` instead
557 /// of an `Instant`.
558 ///
559 /// The item remains in the queue but the delay is set to expire after
560 /// `timeout`. If `timeout` is zero, then the item is immediately made
561 /// available to the caller.
562 ///
563 /// # Panics
564 ///
565 /// This function panics if `timeout` is greater than the maximum supported
566 /// duration or if `key` is not contained by the queue.
567 ///
568 /// # Examples
569 ///
570 /// Basic usage
571 ///
572 /// ```rust
573 /// use kayrx_timer::DelayQueue;
574 /// use std::time::Duration;
575 /// use kayrx_karx;
576 ///
577 /// fn main() {
578 /// kayrx_karx::exec(async {
579 /// let mut delay_queue = DelayQueue::new();
580 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
581 ///
582 /// // "foo" is scheduled to be returned in 5 seconds
583 ///
584 /// delay_queue.reset(&key, Duration::from_secs(10));
585 ///
586 /// // "foo"is now scheduled to be returned in 10 seconds
587 /// });
588 /// }
589 /// ```
590 pub fn reset(&mut self, key: &Key, timeout: Duration) {
591 self.reset_at(key, Instant::now() + timeout);
592 }
593
594 /// Clears the queue, removing all items.
595 ///
596 /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`.
597 ///
598 /// Note that this method has no effect on the allocated capacity.
599 ///
600 /// [`poll`]: #method.poll
601 ///
602 /// # Examples
603 ///
604 /// ```rust
605 /// use kayrx_timer::DelayQueue;
606 /// use std::time::Duration;
607 /// use kayrx_karx;
608 ///
609 /// fn main() {
610 /// kayrx_karx::exec(async {
611 /// let mut delay_queue = DelayQueue::new();
612 ///
613 /// delay_queue.insert("foo", Duration::from_secs(5));
614 ///
615 /// assert!(!delay_queue.is_empty());
616 ///
617 /// delay_queue.clear();
618 ///
619 /// assert!(delay_queue.is_empty());
620 /// });
621 /// }
622 /// ```
623 pub fn clear(&mut self) {
624 self.slab.clear();
625 self.expired = Stack::default();
626 self.wheel = Wheel::new();
627 self.delay = None;
628 }
629
630 /// Returns the number of elements the queue can hold without reallocating.
631 ///
632 /// # Examples
633 ///
634 /// ```rust
635 /// use kayrx_timer::DelayQueue;
636 ///
637 /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
638 /// assert_eq!(delay_queue.capacity(), 10);
639 /// ```
640 pub fn capacity(&self) -> usize {
641 self.slab.capacity()
642 }
643
644 /// Returns the number of elements currently in the queue.
645 ///
646 /// # Examples
647 ///
648 /// ```rust
649 /// use kayrx_timer::DelayQueue;
650 /// use std::time::Duration;
651 /// use kayrx_karx;
652 ///
653 /// fn main() {
654 /// kayrx_karx::exec(async {
655 /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
656 /// assert_eq!(delay_queue.len(), 0);
657 /// delay_queue.insert(3, Duration::from_secs(5));
658 /// assert_eq!(delay_queue.len(), 1);
659 /// });
660 /// }
661 /// ```
662 pub fn len(&self) -> usize {
663 self.slab.len()
664 }
665
666 /// Reserve capacity for at least `additional` more items to be queued
667 /// without allocating.
668 ///
669 /// `reserve` does nothing if the queue already has sufficient capacity for
670 /// `additional` more values. If more capacity is required, a new segment of
671 /// memory will be allocated and all existing values will be copied into it.
672 /// As such, if the queue is already very large, a call to `reserve` can end
673 /// up being expensive.
674 ///
675 /// The queue may reserve more than `additional` extra space in order to
676 /// avoid frequent reallocations.
677 ///
678 /// # Panics
679 ///
680 /// Panics if the new capacity exceeds the maximum number of entries the
681 /// queue can contain.
682 ///
683 /// # Examples
684 ///
685 /// ```
686 /// use kayrx_timer::DelayQueue;
687 /// use std::time::Duration;
688 /// use kayrx_karx;
689 ///
690 /// fn main() {
691 /// kayrx_karx::exec(async {
692 /// let mut delay_queue = DelayQueue::new();
693 ///
694 /// delay_queue.insert("hello", Duration::from_secs(10));
695 /// delay_queue.reserve(10);
696 ///
697 /// assert!(delay_queue.capacity() >= 11);
698 /// });
699 /// }
700 /// ```
701 pub fn reserve(&mut self, additional: usize) {
702 self.slab.reserve(additional);
703 }
704
705 /// Returns `true` if there are no items in the queue.
706 ///
707 /// Note that this function returns `false` even if all items have not yet
708 /// expired and a call to `poll` will return `NotReady`.
709 ///
710 /// # Examples
711 ///
712 /// ```
713 /// use kayrx_timer::DelayQueue;
714 /// use std::time::Duration;
715 /// use kayrx_karx;
716 ///
717 /// fn main() {
718 /// kayrx_karx::exec(async {
719 /// let mut delay_queue = DelayQueue::new();
720 /// assert!(delay_queue.is_empty());
721 ///
722 /// delay_queue.insert("hello", Duration::from_secs(5));
723 /// assert!(!delay_queue.is_empty());
724 /// });
725 /// }
726 /// ```
727 pub fn is_empty(&self) -> bool {
728 self.slab.is_empty()
729 }
730
731 /// Polls the queue, returning the index of the next slot in the slab that
732 /// should be returned.
733 ///
734 /// A slot should be returned when the associated deadline has been reached.
735 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> {
736 use self::wheel::Stack;
737
738 let expired = self.expired.pop(&mut self.slab);
739
740 if expired.is_some() {
741 return Poll::Ready(expired.map(Ok));
742 }
743
744 loop {
745 if let Some(ref mut delay) = self.delay {
746 if !delay.is_elapsed() {
747 ready!(Pin::new(&mut *delay).poll(cx));
748 }
749
750 let now = crate::ms(delay.deadline() - self.start, crate::Round::Down);
751
752 self.poll = wheel::Poll::new(now);
753 }
754
755 self.delay = None;
756
757 if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) {
758 return Poll::Ready(Some(Ok(idx)));
759 }
760
761 if let Some(deadline) = self.next_deadline() {
762 self.delay = Some(delay_until(deadline));
763 } else {
764 return Poll::Ready(None);
765 }
766 }
767 }
768
769 fn normalize_deadline(&self, when: Instant) -> u64 {
770 let when = if when < self.start {
771 0
772 } else {
773 crate::ms(when - self.start, crate::Round::Up)
774 };
775
776 cmp::max(when, self.wheel.elapsed())
777 }
778}
779
780// We never put `T` in a `Pin`...
781impl<T> Unpin for DelayQueue<T> {}
782
783impl<T> Default for DelayQueue<T> {
784 fn default() -> DelayQueue<T> {
785 DelayQueue::new()
786 }
787}
788
789impl<T> futures_core::Stream for DelayQueue<T> {
790 // DelayQueue seems much more specific, where a user may care that it
791 // has reached capacity, so return those errors instead of panicking.
792 type Item = Result<Expired<T>, Error>;
793
794 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
795 DelayQueue::poll_expired(self.get_mut(), cx)
796 }
797}
798
799impl<T> wheel::Stack for Stack<T> {
800 type Owned = usize;
801 type Borrowed = usize;
802 type Store = Slab<Data<T>>;
803
804 fn is_empty(&self) -> bool {
805 self.head.is_none()
806 }
807
808 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
809 // Ensure the entry is not already in a stack.
810 debug_assert!(store[item].next.is_none());
811 debug_assert!(store[item].prev.is_none());
812
813 // Remove the old head entry
814 let old = self.head.take();
815
816 if let Some(idx) = old {
817 store[idx].prev = Some(item);
818 }
819
820 store[item].next = old;
821 self.head = Some(item)
822 }
823
824 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
825 if let Some(idx) = self.head {
826 self.head = store[idx].next;
827
828 if let Some(idx) = self.head {
829 store[idx].prev = None;
830 }
831
832 store[idx].next = None;
833 debug_assert!(store[idx].prev.is_none());
834
835 Some(idx)
836 } else {
837 None
838 }
839 }
840
841 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
842 assert!(store.contains(*item));
843
844 // Ensure that the entry is in fact contained by the stack
845 debug_assert!({
846 // This walks the full linked list even if an entry is found.
847 let mut next = self.head;
848 let mut contains = false;
849
850 while let Some(idx) = next {
851 if idx == *item {
852 debug_assert!(!contains);
853 contains = true;
854 }
855
856 next = store[idx].next;
857 }
858
859 contains
860 });
861
862 if let Some(next) = store[*item].next {
863 store[next].prev = store[*item].prev;
864 }
865
866 if let Some(prev) = store[*item].prev {
867 store[prev].next = store[*item].next;
868 } else {
869 self.head = store[*item].next;
870 }
871
872 store[*item].next = None;
873 store[*item].prev = None;
874 }
875
876 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
877 store[*item].when
878 }
879}
880
881impl<T> Default for Stack<T> {
882 fn default() -> Stack<T> {
883 Stack {
884 head: None,
885 _p: PhantomData,
886 }
887 }
888}
889
890impl Key {
891 pub(crate) fn new(index: usize) -> Key {
892 Key { index }
893 }
894}
895
896impl<T> Expired<T> {
897 /// Returns a reference to the inner value.
898 pub fn get_ref(&self) -> &T {
899 &self.data
900 }
901
902 /// Returns a mutable reference to the inner value.
903 pub fn get_mut(&mut self) -> &mut T {
904 &mut self.data
905 }
906
907 /// Consumes `self` and returns the inner value.
908 pub fn into_inner(self) -> T {
909 self.data
910 }
911}