nym_nonexhaustive_delayqueue/
lib.rs1use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7use tokio_stream::Stream;
8
9#[cfg(not(target_arch = "wasm32"))]
13type DelayQueue<T> = tokio_util::time::DelayQueue<T>;
14#[cfg(not(target_arch = "wasm32"))]
15pub use tokio_util::time::delay_queue::Expired;
16#[cfg(not(target_arch = "wasm32"))]
17pub type QueueKey = tokio_util::time::delay_queue::Key;
18#[cfg(not(target_arch = "wasm32"))]
19use tokio::time::Instant;
20
21#[cfg(target_arch = "wasm32")]
22type DelayQueue<T> = wasmtimer::tokio_util::DelayQueue<T>;
23#[cfg(target_arch = "wasm32")]
24pub use wasmtimer::tokio_util::delay_queue::Expired;
25#[cfg(target_arch = "wasm32")]
26pub type QueueKey = wasmtimer::tokio_util::delay_queue::Key;
27#[cfg(target_arch = "wasm32")]
28use wasmtimer::std::Instant;
29
30pub struct NonExhaustiveDelayQueue<T> {
32 inner: DelayQueue<T>,
33 waker: Option<Waker>,
34}
35
36impl<T> NonExhaustiveDelayQueue<T> {
38 pub fn new() -> Self {
39 NonExhaustiveDelayQueue {
40 inner: DelayQueue::new(),
41 waker: None,
42 }
43 }
44
45 pub fn insert(&mut self, value: T, timeout: Duration) -> QueueKey {
46 let key = self.inner.insert(value, timeout);
47 if let Some(waker) = self.waker.take() {
48 waker.wake()
50 }
51 key
52 }
53
54 pub fn insert_at(&mut self, value: T, when: Instant) -> QueueKey {
55 let key = self.inner.insert_at(value, when);
56 if let Some(waker) = self.waker.take() {
57 waker.wake()
59 }
60 key
61 }
62
63 pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
66 self.inner.remove(key)
67 }
68
69 pub fn len(&self) -> usize {
70 self.inner.len()
71 }
72
73 pub fn is_empty(&self) -> bool {
74 self.inner.is_empty()
75 }
76}
77
78impl<T> Default for NonExhaustiveDelayQueue<T> {
79 fn default() -> Self {
80 NonExhaustiveDelayQueue::new()
81 }
82}
83
84impl<T> Stream for NonExhaustiveDelayQueue<T> {
85 type Item = <DelayQueue<T> as Stream>::Item;
86
87 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 match Pin::new(&mut self.inner).poll_next(cx) {
89 Poll::Pending => Poll::Pending,
90 Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
91 Poll::Ready(None) => {
92 self.waker = Some(cx.waker().clone());
94 Poll::Pending
95 }
96 }
97 }
98}
99
100