js_utils/
queue.rs

1//! Async queue.
2
3use futures::{future::FusedFuture, Future};
4use std::{
5    cell::RefCell,
6    collections::VecDeque,
7    pin::Pin,
8    rc::{Rc, Weak},
9    task::{Context, Poll, Waker},
10};
11
12/// FIFO queue with async pop.
13pub struct Queue<T> {
14    state: RefCell<State<T>>,
15    capacity: usize,
16}
17
18struct State<T> {
19    buffer: VecDeque<T>,
20    wakers: VecDeque<Weak<RefCell<PopWaker>>>,
21}
22
23impl<T> State<T> {
24    fn new() -> Self {
25        State {
26            buffer: VecDeque::new(),
27            wakers: VecDeque::new(),
28        }
29    }
30}
31
32impl<T> Queue<T> {
33    /// Creates new queue with unbounded capacity.
34    pub fn new() -> Self {
35        Queue {
36            state: RefCell::new(State::new()),
37            capacity: 0,
38        }
39    }
40
41    /// Creates new queue with given `capacity`.
42    ///
43    /// `capacity` must be greater than 0 - it'll panic otherwise.
44    pub fn with_capacity(capacity: usize) -> Self {
45        assert!(capacity > 0, "capacity must be greater than 0");
46        Queue {
47            state: RefCell::new(State::new()),
48            capacity,
49        }
50    }
51
52    /// Pushes `element` into the queue.
53    ///
54    /// If queue is full it will push out the last (oldest) element
55    /// out of the queue.
56    pub fn push(&self, element: T) {
57        let mut state = self.state.borrow_mut();
58        state.buffer.push_front(element);
59        if self.capacity > 0 {
60            state.buffer.truncate(self.capacity)
61        }
62        drop(state);
63        self.wake_next();
64    }
65
66    /// Pops (asynchronously) element off the queue.
67    ///
68    /// It means that if queue is currently empty `await` will
69    /// wait till element is pushed into the queue.
70    #[must_use]
71    pub fn pop(&self) -> Pop<T> {
72        Pop {
73            queue: self,
74            terminated: false,
75            waker: None,
76        }
77    }
78
79    /// Pops element off the queue.
80    ///
81    /// Returns `None` if queue is currently empty.
82    pub fn try_pop(&self) -> Option<T> {
83        self.state.borrow_mut().buffer.pop_back()
84    }
85
86    /// Returns count of elements currently in the queue.
87    pub fn len(&self) -> usize {
88        self.state.borrow_mut().buffer.len()
89    }
90
91    /// Returns `true` if queue is currently empty.
92    pub fn is_empty(&self) -> bool {
93        self.state.borrow_mut().buffer.is_empty()
94    }
95
96    /// Returns `true` if queue is currently full.
97    pub fn is_full(&self) -> bool {
98        if self.capacity == 0 {
99            false
100        } else {
101            self.len() == self.capacity
102        }
103    }
104
105    fn wake_next(&self) {
106        while let Some(waker) = self.state.borrow_mut().wakers.pop_front() {
107            if let Some(waker) = waker.upgrade() {
108                let mut waker = waker.borrow_mut();
109                waker.woken = true;
110                waker.waker.wake_by_ref();
111                break;
112            }
113        }
114    }
115}
116
117impl<T> Default for Queue<T> {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123/// Future returned by [pop] method.
124///
125/// [pop]: Queue::pop
126pub struct Pop<'a, T> {
127    queue: &'a Queue<T>,
128    terminated: bool,
129    waker: Option<Rc<RefCell<PopWaker>>>,
130}
131
/// Wake-up registration of a single pending `Pop` future.
struct PopWaker {
    /// Waker of the task that last polled the future.
    waker: Waker,
    /// `true` after the queue has taken this registration and woken the task.
    woken: bool,
}
136
137impl PopWaker {
138    fn new(waker: Waker) -> Self {
139        PopWaker {
140            waker,
141            woken: false,
142        }
143    }
144
145    fn update(&mut self, waker: &Waker) {
146        if !self.waker.will_wake(waker) {
147            self.waker = waker.clone();
148        }
149    }
150}
151
152impl<'a, T> Drop for Pop<'a, T> {
153    fn drop(&mut self) {
154        // We were woken but didn't receive anything, wake up another
155        if self
156            .waker
157            .take()
158            .map_or(false, |waker| waker.borrow().woken)
159        {
160            self.queue.wake_next();
161        }
162    }
163}
164
165impl<'a, T> Future for Pop<'a, T> {
166    type Output = T;
167
168    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169        if self.terminated {
170            Poll::Pending
171        } else {
172            let mut state = self.queue.state.borrow_mut();
173            match state.buffer.pop_back() {
174                Some(value) => {
175                    self.terminated = true;
176                    self.waker = None;
177                    Poll::Ready(value)
178                }
179                None => {
180                    if let Some(waker) = &self.waker {
181                        let mut waker = waker.borrow_mut();
182                        waker.update(cx.waker());
183                        waker.woken = false;
184                    } else {
185                        let waker = Rc::new(RefCell::new(PopWaker::new(cx.waker().clone())));
186                        self.waker = Some(waker);
187                    }
188                    state
189                        .wakers
190                        .push_front(Rc::downgrade(self.waker.as_ref().unwrap()));
191                    Poll::Pending
192                }
193            }
194        }
195    }
196}
197
198impl<'a, T> FusedFuture for Pop<'a, T> {
199    fn is_terminated(&self) -> bool {
200        self.terminated
201    }
202}
203
#[cfg(test)]
mod tests {
    use std::{rc::Rc, time::Duration};

    use futures::{join, FutureExt};
    use wasm_bindgen_test::wasm_bindgen_test;

    use crate::{sleep, spawn, Queue};

    /// Exercises an unbounded queue: synchronous and asynchronous pops,
    /// waiting for elements pushed later, and several concurrent waiters.
    #[wasm_bindgen_test]
    async fn test_unbounded() {
        let queue = Queue::new();

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_full());

        // Elements come out in insertion order.
        assert_eq!(queue.try_pop().unwrap(), 1);
        assert_eq!(queue.pop().await, 2);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 3);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // A pending `pop` is completed by pushes from another task.
        let queue = Rc::new(queue);
        let queue_clone = queue.clone();
        spawn(async move {
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(4);
            queue_clone.push(5);
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(6);
        });

        assert_eq!(queue.pop().await, 4);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 5);
        assert_eq!(queue.pop().await, 6);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // Futures that were polled once and dropped must not swallow
        // wake-ups meant for the waiters that are still alive.
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        let queue_clone = queue.clone();
        let task = spawn(async move {
            assert_eq!(queue_clone.pop().now_or_never(), None);
            assert_eq!(queue_clone.pop().now_or_never(), None);
            join![queue_clone.pop(), queue_clone.pop(), queue_clone.pop()]
        });
        sleep(Duration::from_secs(1)).await;
        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(task.await.unwrap(), (1, 2, 3));
    }

    /// Exercises a bounded queue, including eviction of the oldest element
    /// when pushing into a full queue.
    #[wasm_bindgen_test]
    async fn test_bounded() {
        let queue = Queue::with_capacity(3);

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        // Pushing into a full queue evicts the oldest element (1).
        queue.push(4);
        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        assert_eq!(queue.try_pop().unwrap(), 2);
        assert_eq!(queue.pop().await, 3);
        assert_eq!(queue.pop().await, 4);

        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }
}