js_utils/
queue.rs

1//! Async queue.
2
3use futures::{future::FusedFuture, Future};
4use std::{
5    cell::RefCell,
6    collections::VecDeque,
7    pin::Pin,
8    rc::{Rc, Weak},
9    task::{Context, Poll, Waker},
10};
11
12/// FIFO queue with async pop.
13pub struct Queue<T> {
14    state: RefCell<State<T>>,
15    capacity: usize,
16}
17
18struct State<T> {
19    buffer: VecDeque<T>,
20    wakers: VecDeque<Weak<RefCell<PopWaker>>>,
21}
22
23impl<T> State<T> {
24    fn new() -> Self {
25        State {
26            buffer: VecDeque::new(),
27            wakers: VecDeque::new(),
28        }
29    }
30}
31
32impl<T> Queue<T> {
33    /// Creates new queue with unbounded capacity.
34    pub fn new() -> Self {
35        Queue {
36            state: RefCell::new(State::new()),
37            capacity: 0,
38        }
39    }
40
41    /// Creates new queue with given `capacity`.
42    ///
43    /// `capacity` must be greater than 0 - it'll panic otherwise.
44    pub fn with_capacity(capacity: usize) -> Self {
45        assert!(capacity > 0, "capacity must be greater than 0");
46        Queue {
47            state: RefCell::new(State::new()),
48            capacity,
49        }
50    }
51
52    /// Pushes `element` into the queue.
53    ///
54    /// If queue is full it will push out the last (oldest) element
55    /// out of the queue.
56    pub fn push(&self, element: T) {
57        let mut state = self.state.borrow_mut();
58        state.buffer.push_front(element);
59        if self.capacity > 0 {
60            state.buffer.truncate(self.capacity)
61        }
62        drop(state);
63        self.wake_next();
64    }
65
66    /// Pops (asynchronously) element off the queue.
67    ///
68    /// It means that if queue is currently empty `await` will
69    /// wait till element is pushed into the queue.
70    #[must_use]
71    pub fn pop(&self) -> Pop<T> {
72        Pop {
73            queue: self,
74            terminated: false,
75            waker: None,
76        }
77    }
78
79    /// Pops element off the queue.
80    ///
81    /// Returns `None` if queue is currently empty.
82    pub fn try_pop(&self) -> Option<T> {
83        self.state.borrow_mut().buffer.pop_back()
84    }
85
86    /// Returns count of elements currently in the queue.
87    pub fn len(&self) -> usize {
88        self.state.borrow_mut().buffer.len()
89    }
90
91    /// Returns `true` if queue is currently empty.
92    pub fn is_empty(&self) -> bool {
93        self.state.borrow_mut().buffer.is_empty()
94    }
95
96    /// Returns `true` if queue is currently full.
97    pub fn is_full(&self) -> bool {
98        if self.capacity == 0 {
99            false
100        } else {
101            self.len() == self.capacity
102        }
103    }
104
105    fn wake_next(&self) {
106        while let Some(waker) = self.state.borrow_mut().wakers.pop_front() {
107            if let Some(waker) = waker.upgrade() {
108                let mut waker = waker.borrow_mut();
109                waker.woken = true;
110                waker.waker.wake_by_ref();
111                break;
112            }
113        }
114    }
115}
116
117impl<T> Default for Queue<T> {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123/// Future returned by [pop] method.
124///
125/// [pop]: Queue::pop
126pub struct Pop<'a, T> {
127    queue: &'a Queue<T>,
128    terminated: bool,
129    waker: Option<Rc<RefCell<PopWaker>>>,
130}
131
/// Registration of a single pending pop.
struct PopWaker {
    /// Waker of the task that polled the pop most recently.
    waker: Waker,
    /// Set when the queue has notified this pop; cleared again when the pop
    /// is polled and still finds the queue empty.
    woken: bool,
}
136
137impl PopWaker {
138    fn new(waker: Waker) -> Self {
139        PopWaker {
140            waker,
141            woken: false,
142        }
143    }
144
145    fn update(&mut self, waker: &Waker) {
146        if !self.waker.will_wake(waker) {
147            self.waker = waker.clone();
148        }
149    }
150}
151
152impl<T> Drop for Pop<'_, T> {
153    fn drop(&mut self) {
154        // We were woken but didn't receive anything, wake up another
155        if self.waker.take().is_some_and(|waker| waker.borrow().woken) {
156            self.queue.wake_next();
157        }
158    }
159}
160
161impl<T> Future for Pop<'_, T> {
162    type Output = T;
163
164    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
165        if self.terminated {
166            Poll::Pending
167        } else {
168            let mut state = self.queue.state.borrow_mut();
169            match state.buffer.pop_back() {
170                Some(value) => {
171                    self.terminated = true;
172                    self.waker = None;
173                    Poll::Ready(value)
174                }
175                None => {
176                    if let Some(waker) = &self.waker {
177                        let mut waker = waker.borrow_mut();
178                        waker.update(cx.waker());
179                        waker.woken = false;
180                    } else {
181                        let waker = Rc::new(RefCell::new(PopWaker::new(cx.waker().clone())));
182                        self.waker = Some(waker);
183                    }
184                    state
185                        .wakers
186                        .push_front(Rc::downgrade(self.waker.as_ref().unwrap()));
187                    Poll::Pending
188                }
189            }
190        }
191    }
192}
193
194impl<T> FusedFuture for Pop<'_, T> {
195    fn is_terminated(&self) -> bool {
196        self.terminated
197    }
198}
199
#[cfg(test)]
mod tests {
    use std::{rc::Rc, time::Duration};

    use futures::{join, FutureExt};
    use wasm_bindgen_test::wasm_bindgen_test;

    use crate::{sleep, spawn, Queue};

    /// Exercises an unbounded queue: synchronous and asynchronous pops,
    /// pops that wait for a producer task, and several pops waiting at once.
    #[wasm_bindgen_test]
    async fn test_unbounded() {
        let queue = Queue::new();

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_full());

        // Elements come out in the order they went in.
        assert_eq!(queue.try_pop().unwrap(), 1);
        assert_eq!(queue.pop().await, 2);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 3);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // Pops that have to wait for a producer running in another task.
        let queue = Rc::new(queue);
        let queue_clone = queue.clone();
        spawn(async move {
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(4);
            queue_clone.push(5);
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(6);
        });

        assert_eq!(queue.pop().await, 4);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 5);
        assert_eq!(queue.pop().await, 6);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // Pops that are polled once and then dropped must not get in the way
        // of the pops that are still waiting.
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        let queue_clone = queue.clone();
        let task = spawn(async move {
            assert_eq!(queue_clone.pop().now_or_never(), None);
            assert_eq!(queue_clone.pop().now_or_never(), None);
            join![queue_clone.pop(), queue_clone.pop(), queue_clone.pop()]
        });
        sleep(Duration::from_secs(1)).await;
        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(task.await.unwrap(), (1, 2, 3));
    }

    /// Exercises a bounded queue: fullness reporting and eviction of the
    /// oldest element when pushing into a full queue.
    #[wasm_bindgen_test]
    async fn test_bounded() {
        let queue = Queue::with_capacity(3);

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        // Pushing into a full queue evicts the oldest element (`1`).
        queue.push(4);

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        assert_eq!(queue.try_pop(), Some(2));
        assert!(!queue.is_full());
        assert_eq!(queue.pop().await, 3);
        assert_eq!(queue.pop().await, 4);

        assert!(queue.is_empty());
        assert_eq!(queue.pop().now_or_never(), None);
    }
}