1#![cfg_attr(feature = "docs", feature(doc_cfg))]
65#![warn(missing_docs, missing_debug_implementations)]
66#![doc(test(attr(deny(warnings))))]
67#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
68
69#[cfg(feature = "async-std")]
70use async_std::task;
71use futures_intrusive::{
72 buffer::{FixedHeapBuf, GrowingHeapBuf, RingBuf},
73 channel::shared::{generic_channel, ChannelReceiveFuture, GenericReceiver},
74};
75use futures_timer::Delay;
76use pin_project_lite::pin_project;
77use std::{
78 error::Error,
79 fmt::{self, Display},
80 future::Future,
81 pin::Pin,
82 task::{Context, Poll},
83 time::{Duration, Instant},
84};
85#[cfg(feature = "tokio")]
86use tokio::task;
87
88pub use futures_intrusive::channel::shared::{GenericSender, Receiver};
89
90#[derive(Debug, Clone)]
92pub struct DelayQueue<T: 'static, A: RingBuf<Item = T>> {
93 expired: GenericSender<parking_lot::RawMutex, T, A>,
95}
96
97#[derive(Debug)]
99pub struct DelayHandle {
100 reset: GenericSender<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
102}
103
104enum DelayReset {
105 NewDuration(Duration),
106 Cancel,
107}
108
109#[derive(Debug, Clone, Copy, PartialEq)]
112pub struct ErrorAlreadyExpired {}
113
114impl Error for ErrorAlreadyExpired {
115 fn description(&self) -> &str {
116 "delay already expired"
117 }
118}
119
120impl Display for ErrorAlreadyExpired {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 write!(f, "Delay already expired")
123 }
124}
125
126impl DelayHandle {
127 pub async fn reset_at(self, when: Instant) -> Result<Self, ErrorAlreadyExpired> {
130 let now = Instant::now();
131 let dur = if when <= now {
132 Duration::from_nanos(0)
133 } else {
134 when - now
135 };
136 self.reset(dur).await
137 }
138
139 pub async fn reset(self, dur: Duration) -> Result<Self, ErrorAlreadyExpired> {
142 self.reset
143 .send(DelayReset::NewDuration(dur))
144 .await
145 .map_err(|_| ErrorAlreadyExpired {})?;
146 Ok(self)
147 }
148
149 pub async fn cancel(self) -> Result<(), ErrorAlreadyExpired> {
151 self.reset
152 .send(DelayReset::Cancel)
153 .await
154 .map_err(|_| ErrorAlreadyExpired {})
155 }
156}
157
158pub fn delay_queue<T: 'static + Send>() -> (
178 DelayQueue<T, GrowingHeapBuf<T>>,
179 GenericReceiver<parking_lot::RawMutex, T, GrowingHeapBuf<T>>,
180) {
181 let (tx, rx) = generic_channel(0);
182 (DelayQueue { expired: tx }, rx)
183}
184
185pin_project! {
186 struct DelayedItem<T> {
187 value: Option<T>,
188 delay: Delay,
189 reset_rx: GenericReceiver<parking_lot::RawMutex, DelayReset, FixedHeapBuf<DelayReset>>,
190 reset: ChannelReceiveFuture<parking_lot::RawMutex, DelayReset>,
191 handle_dropped: bool,
192 }
193}
194
195impl<T> Future for DelayedItem<T> {
196 type Output = Option<T>;
197
198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199 if !self.handle_dropped {
200 while let Poll::Ready(v) = unsafe { Pin::new_unchecked(&mut self.reset).poll(cx) } {
206 match v {
207 Some(reset) => match reset {
208 DelayReset::Cancel => return Poll::Ready(None),
209 DelayReset::NewDuration(dur) => self.delay = Delay::new(dur),
210 },
211 None => {
213 self.handle_dropped = true;
214 break;
216 }
217 }
218 self.reset = self.reset_rx.receive();
220 }
221 }
222
223 match Pin::new(&mut self.delay).poll(cx) {
225 Poll::Ready(_) => Poll::Ready(self.value.take()),
226 Poll::Pending => Poll::Pending,
227 }
228 }
229}
230
231impl<T, A> DelayQueue<T, A>
232where
233 T: 'static + Send,
234 A: 'static + RingBuf<Item = T> + Send,
235{
236 pub fn insert(&self, value: T, dur: Duration) -> DelayHandle {
239 self.new_handle_with_future(value, dur)
240 }
241
242 pub fn insert_at(&self, value: T, when: Instant) -> DelayHandle {
244 let now = Instant::now();
245 let dur = if now >= when {
246 Duration::from_nanos(0)
247 } else {
248 when - now
249 };
250 self.new_handle_with_future(value, dur)
251 }
252
253 fn new_handle_with_future(&self, value: T, dur: Duration) -> DelayHandle {
254 let (reset_tx, reset_rx) = generic_channel::<parking_lot::RawMutex, _, FixedHeapBuf<_>>(0);
255 let expired = self.expired.clone();
256 let reset = reset_rx.receive();
257 let delayed_item = DelayedItem {
258 value: Some(value),
259 delay: Delay::new(dur),
260 reset_rx,
261 reset,
262 handle_dropped: false,
263 };
264 task::spawn(async move {
265 if let Some(v) = delayed_item.await {
266 let _ = expired.send(v).await;
267 }
268 });
269 DelayHandle { reset: reset_tx }
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use async_std::future::timeout;
277
278 #[async_std::test]
279 async fn insert() {
280 let (delay_queue, rx) = delay_queue::<i32>();
281 delay_queue.insert(1, Duration::from_millis(10));
282 delay_queue.insert(2, Duration::from_millis(5));
283 assert_eq!(
284 timeout(Duration::from_millis(8), rx.receive()).await,
285 Ok(Some(2))
286 );
287 assert_eq!(
288 timeout(Duration::from_millis(7), rx.receive()).await,
289 Ok(Some(1))
290 );
291 }
292
293 #[async_std::test]
294 async fn reset() {
295 let (delay_queue, rx) = delay_queue::<i32>();
296 let delay_handle = delay_queue.insert(1, Duration::from_millis(100));
297 assert!(delay_handle.reset(Duration::from_millis(20)).await.is_ok());
298
299 assert_eq!(
300 timeout(Duration::from_millis(40), rx.receive()).await,
301 Ok(Some(1))
302 );
303
304 let delay_handle = delay_queue.insert(2, Duration::from_millis(100));
305 assert!(delay_handle
306 .reset_at(Instant::now() + Duration::from_millis(20))
307 .await
308 .is_ok());
309
310 assert_eq!(
311 timeout(Duration::from_millis(40), rx.receive()).await,
312 Ok(Some(2))
313 );
314 }
315
316 #[async_std::test]
317 async fn cancel() {
318 let (delay_queue, rx) = delay_queue::<i32>();
319 let delay_handle = delay_queue.insert(1, Duration::from_millis(200));
320 task::sleep(Duration::from_millis(50)).await;
322 let instant = Instant::now();
323 assert!(delay_handle.cancel().await.is_ok());
324 assert!(instant.elapsed() < Duration::from_millis(10));
325 assert!(timeout(Duration::from_millis(500), rx.receive())
326 .await
327 .is_err());
328 }
329}