nym_nonexhaustive_delayqueue/
lib.rs

1// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::time::Duration;
7use tokio_stream::Stream;
8
9// this is a copy of tokio-util delay_queue with `Sleep` and `Instant` being replaced with
10// `wasm_timer` equivalents
11
12#[cfg(not(target_arch = "wasm32"))]
13type DelayQueue<T> = tokio_util::time::DelayQueue<T>;
14#[cfg(not(target_arch = "wasm32"))]
15pub use tokio_util::time::delay_queue::Expired;
16#[cfg(not(target_arch = "wasm32"))]
17pub type QueueKey = tokio_util::time::delay_queue::Key;
18#[cfg(not(target_arch = "wasm32"))]
19use tokio::time::Instant;
20
21#[cfg(target_arch = "wasm32")]
22type DelayQueue<T> = wasmtimer::tokio_util::DelayQueue<T>;
23#[cfg(target_arch = "wasm32")]
24pub use wasmtimer::tokio_util::delay_queue::Expired;
25#[cfg(target_arch = "wasm32")]
26pub type QueueKey = wasmtimer::tokio_util::delay_queue::Key;
27#[cfg(target_arch = "wasm32")]
28use wasmtimer::std::Instant;
29
30/// A variant of tokio's `DelayQueue`, such that its `Stream` implementation will never return a 'None'.
31pub struct NonExhaustiveDelayQueue<T> {
32    inner: DelayQueue<T>,
33    waker: Option<Waker>,
34}
35
36// more methods of underlying DelayQueue will get exposed as we need them
37impl<T> NonExhaustiveDelayQueue<T> {
38    pub fn new() -> Self {
39        NonExhaustiveDelayQueue {
40            inner: DelayQueue::new(),
41            waker: None,
42        }
43    }
44
45    pub fn insert(&mut self, value: T, timeout: Duration) -> QueueKey {
46        let key = self.inner.insert(value, timeout);
47        if let Some(waker) = self.waker.take() {
48            // we were waiting for an item - wake the executor!
49            waker.wake()
50        }
51        key
52    }
53
54    pub fn insert_at(&mut self, value: T, when: Instant) -> QueueKey {
55        let key = self.inner.insert_at(value, when);
56        if let Some(waker) = self.waker.take() {
57            // we were waiting for an item - wake the executor!
58            waker.wake()
59        }
60        key
61    }
62
63    // TODO: it seems like this one can cause panic in very rare edge cases, however,
64    // I can't seem to be able to reproduce it at all.
65    pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
66        self.inner.remove(key)
67    }
68
69    pub fn len(&self) -> usize {
70        self.inner.len()
71    }
72
73    pub fn is_empty(&self) -> bool {
74        self.inner.is_empty()
75    }
76}
77
78impl<T> Default for NonExhaustiveDelayQueue<T> {
79    fn default() -> Self {
80        NonExhaustiveDelayQueue::new()
81    }
82}
83
84impl<T> Stream for NonExhaustiveDelayQueue<T> {
85    type Item = <DelayQueue<T> as Stream>::Item;
86
87    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        match Pin::new(&mut self.inner).poll_next(cx) {
89            Poll::Pending => Poll::Pending,
90            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
91            Poll::Ready(None) => {
92                // we'll need to keep the waker to notify the executor once we get new item
93                self.waker = Some(cx.waker().clone());
94                Poll::Pending
95            }
96        }
97    }
98}
99
100// #[cfg(test)]
101// mod tests {
102//     use super::*;
103//
104//
105// }