use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use tokio_stream::Stream;
// Native (non-wasm32) targets: the delay queue, its `Expired` / key types and
// `Instant` are taken from `tokio_util` and `tokio`.
#[cfg(not(target_arch = "wasm32"))]
type DelayQueue<T> = tokio_util::time::DelayQueue<T>;
#[cfg(not(target_arch = "wasm32"))]
pub use tokio_util::time::delay_queue::Expired;
#[cfg(not(target_arch = "wasm32"))]
pub type QueueKey = tokio_util::time::delay_queue::Key;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
// wasm32 targets: the same four names are sourced from `wasmtimer` instead, so
// the code below can refer to them without any further `cfg` gating.
#[cfg(target_arch = "wasm32")]
type DelayQueue<T> = wasmtimer::tokio_util::DelayQueue<T>;
#[cfg(target_arch = "wasm32")]
pub use wasmtimer::tokio_util::delay_queue::Expired;
#[cfg(target_arch = "wasm32")]
pub type QueueKey = wasmtimer::tokio_util::delay_queue::Key;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
/// A delay queue whose `Stream` implementation never finishes: when the
/// underlying queue reports that it is exhausted (`Poll::Ready(None)`), the
/// stream answers `Poll::Pending` instead and remembers the polling task so a
/// later insertion can wake it.
pub struct NonExhaustiveDelayQueue<T> {
// Underlying platform-specific delay queue that stores the values and their deadlines.
inner: DelayQueue<T>,
// Waker saved by `poll_next` when the inner queue reported exhaustion;
// taken and woken by `insert` / `insert_at`.
waker: Option<Waker>,
}
impl<T> NonExhaustiveDelayQueue<T> {
    /// Builds an empty queue with no task waiting on it.
    pub fn new() -> Self {
        Self {
            inner: DelayQueue::new(),
            waker: None,
        }
    }

    /// Wakes the task (if any) that was parked while the queue was exhausted.
    ///
    /// The stored waker is consumed, so each parked poll is woken at most once.
    fn notify_parked(&mut self) {
        if let Some(parked) = self.waker.take() {
            parked.wake();
        }
    }

    /// Schedules `value` to expire after `timeout` and returns the key that
    /// identifies the entry.
    ///
    /// A task parked on the exhausted queue is woken so it polls again and
    /// picks up the new entry.
    pub fn insert(&mut self, value: T, timeout: Duration) -> QueueKey {
        let key = self.inner.insert(value, timeout);
        self.notify_parked();
        key
    }

    /// Schedules `value` to expire at the instant `when` and returns the key
    /// that identifies the entry.
    ///
    /// A task parked on the exhausted queue is woken so it polls again and
    /// picks up the new entry.
    pub fn insert_at(&mut self, value: T, when: Instant) -> QueueKey {
        let key = self.inner.insert_at(value, when);
        self.notify_parked();
        key
    }

    /// Removes the entry identified by `key` from the underlying queue and
    /// returns it.
    ///
    /// NOTE(review): the underlying `DelayQueue::remove` is expected to panic
    /// when `key` is not present — confirm against the queue's documentation.
    pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
        self.inner.remove(key)
    }

    /// Number of entries currently held by the underlying queue.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when the underlying queue holds no entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
impl<T> Default for NonExhaustiveDelayQueue<T> {
fn default() -> Self {
NonExhaustiveDelayQueue::new()
}
}
/// Streams expired entries without ever terminating.
///
/// Whatever the inner queue yields is forwarded unchanged, except for the
/// end-of-stream signal (`Poll::Ready(None)`): that is turned into
/// `Poll::Pending`, and the polling task's waker is kept so that `insert` /
/// `insert_at` can wake the task once there is something to wait for again.
impl<T> Stream for NonExhaustiveDelayQueue<T> {
    type Item = <DelayQueue<T> as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(None) => {
                // The inner queue is exhausted, so nothing inside it will wake
                // this task. Park the waker here; the next insertion wakes it.
                // Skip the clone when the stored waker already targets the
                // same task.
                let already_parked =
                    matches!(&this.waker, Some(parked) if parked.will_wake(cx.waker()));
                if !already_parked {
                    this.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
            // `Pending` and `Ready(Some(_))` are passed through untouched.
            forwarded => forwarded,
        }
    }
}