1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::Notify;
use tokio::time::timeout;

/// A queued value together with the instant at which it becomes available.
///
/// The derived ordering compares the fields in declaration order, so the
/// field order below is significant: `expiration` is compared first and
/// `inner` only breaks ties. Because `BinaryHeap` is a max-heap and the
/// expiration is wrapped in `Reverse`, the heap's top element is:
///
/// 1. an item without expiration (`None`), if there is one; otherwise
/// 2. the item with the earliest expiration.
///
/// Among items with equal expiration the greatest `inner` comes first.
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct Item<T>
where
    T: Ord,
{
    expiration: Reverse<Option<Instant>>,
    inner: T,
}

/// State shared by every handle of one queue.
struct SharedInner<T>
where
    T: Ord,
{
    /// Pending items, ordered as described on [`Item`].
    storage: Mutex<BinaryHeap<Item<T>>>,
    /// Signalled on every `enqueue` so that a waiting `dequeue` re-checks the heap.
    notify: Notify,
}

/// A shareable queue whose items only become available once their
/// expiration instant has been reached.
///
/// Cloning produces another handle to the same underlying queue.
pub struct TimedQueue<T>
where
    T: Ord,
{
    inner: Arc<SharedInner<T>>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound although cloning a handle only clones the `Arc`.
impl<T> Clone for TimedQueue<T>
where
    T: Ord,
{
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<T> Default for TimedQueue<T>
where
    T: Ord,
{
    /// Equivalent to [`TimedQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> TimedQueue<T>
where
    T: Ord,
{
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(SharedInner {
                storage: Mutex::new(BinaryHeap::new()),
                notify: Notify::new(),
            }),
        }
    }

    /// Adds `t` to the queue and wakes one pending [`dequeue`](Self::dequeue).
    ///
    /// `expiration` is the instant from which the item may be dequeued;
    /// `None` makes it available immediately.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn enqueue(&self, t: T, expiration: Option<Instant>) {
        self.inner
            .storage
            .lock()
            .expect("timed queue mutex poisoned")
            .push(Item {
                expiration: Reverse(expiration),
                inner: t,
            });
        // The guard above is a temporary and is already released here.
        self.inner.notify.notify_one();
    }

    /// Removes and returns the top item if it is ready.
    ///
    /// Despite the name this pops the item on success. Returns:
    ///
    /// * `Ok((item, expiration))` when the top item has no expiration or its
    ///   expiration is not in the future;
    /// * `Err(Some(remaining))` when the top item is not ready yet, with the
    ///   time left until its expiration;
    /// * `Err(None)` when the queue is empty.
    fn peek_inner(&self) -> Result<(T, Option<Instant>), Option<Duration>> {
        let now = Instant::now();
        let mut heap = self
            .inner
            .storage
            .lock()
            .expect("timed queue mutex poisoned");

        // `Option<Instant>` is `Copy`, so the shared borrow from `peek` ends
        // before `pop` needs the heap mutably.
        match heap.peek().map(|top| top.expiration.0) {
            None => return Err(None),
            // Strictly in the future: report how long is left. A deadline equal
            // to `now` counts as ready, avoiding a zero-length wait.
            Some(Some(deadline)) if deadline > now => return Err(Some(deadline - now)),
            Some(_) => {}
        }

        let Item {
            expiration: Reverse(expiration),
            inner,
        } = heap
            .pop()
            .expect("peeked item must still be present while the lock is held");
        Ok((inner, expiration))
    }

    /// Waits until an item is ready and returns it together with the
    /// expiration it was enqueued with.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub async fn dequeue(&self) -> (T, Option<Instant>) {
        loop {
            match self.peek_inner() {
                Ok(entry) => return entry,
                Err(Some(remaining)) => {
                    // Wake up when the current top item becomes due, or earlier if
                    // something new is enqueued; either way the heap is re-checked,
                    // so the timeout result itself is irrelevant.
                    let _ = timeout(remaining, self.inner.notify.notified()).await;
                }
                // Empty queue: nothing to time out on, wait for the next enqueue.
                Err(None) => self.inner.notify.notified().await,
            }
        }
    }
}