1use futures::{future::FusedFuture, Future};
4use std::{
5 cell::RefCell,
6 collections::VecDeque,
7 pin::Pin,
8 rc::{Rc, Weak},
9 task::{Context, Poll, Waker},
10};
11
12pub struct Queue<T> {
14 state: RefCell<State<T>>,
15 capacity: usize,
16}
17
18struct State<T> {
19 buffer: VecDeque<T>,
20 wakers: VecDeque<Weak<RefCell<PopWaker>>>,
21}
22
23impl<T> State<T> {
24 fn new() -> Self {
25 State {
26 buffer: VecDeque::new(),
27 wakers: VecDeque::new(),
28 }
29 }
30}
31
32impl<T> Queue<T> {
33 pub fn new() -> Self {
35 Queue {
36 state: RefCell::new(State::new()),
37 capacity: 0,
38 }
39 }
40
41 pub fn with_capacity(capacity: usize) -> Self {
45 assert!(capacity > 0, "capacity must be greater than 0");
46 Queue {
47 state: RefCell::new(State::new()),
48 capacity,
49 }
50 }
51
52 pub fn push(&self, element: T) {
57 let mut state = self.state.borrow_mut();
58 state.buffer.push_front(element);
59 if self.capacity > 0 {
60 state.buffer.truncate(self.capacity)
61 }
62 drop(state);
63 self.wake_next();
64 }
65
66 #[must_use]
71 pub fn pop(&self) -> Pop<T> {
72 Pop {
73 queue: self,
74 terminated: false,
75 waker: None,
76 }
77 }
78
79 pub fn try_pop(&self) -> Option<T> {
83 self.state.borrow_mut().buffer.pop_back()
84 }
85
86 pub fn len(&self) -> usize {
88 self.state.borrow_mut().buffer.len()
89 }
90
91 pub fn is_empty(&self) -> bool {
93 self.state.borrow_mut().buffer.is_empty()
94 }
95
96 pub fn is_full(&self) -> bool {
98 if self.capacity == 0 {
99 false
100 } else {
101 self.len() == self.capacity
102 }
103 }
104
105 fn wake_next(&self) {
106 while let Some(waker) = self.state.borrow_mut().wakers.pop_front() {
107 if let Some(waker) = waker.upgrade() {
108 let mut waker = waker.borrow_mut();
109 waker.woken = true;
110 waker.waker.wake_by_ref();
111 break;
112 }
113 }
114 }
115}
116
117impl<T> Default for Queue<T> {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123pub struct Pop<'a, T> {
127 queue: &'a Queue<T>,
128 terminated: bool,
129 waker: Option<Rc<RefCell<PopWaker>>>,
130}
131
132struct PopWaker {
133 waker: Waker,
134 woken: bool,
135}
136
137impl PopWaker {
138 fn new(waker: Waker) -> Self {
139 PopWaker {
140 waker,
141 woken: false,
142 }
143 }
144
145 fn update(&mut self, waker: &Waker) {
146 if !self.waker.will_wake(waker) {
147 self.waker = waker.clone();
148 }
149 }
150}
151
152impl<T> Drop for Pop<'_, T> {
153 fn drop(&mut self) {
154 if self.waker.take().is_some_and(|waker| waker.borrow().woken) {
156 self.queue.wake_next();
157 }
158 }
159}
160
161impl<T> Future for Pop<'_, T> {
162 type Output = T;
163
164 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
165 if self.terminated {
166 Poll::Pending
167 } else {
168 let mut state = self.queue.state.borrow_mut();
169 match state.buffer.pop_back() {
170 Some(value) => {
171 self.terminated = true;
172 self.waker = None;
173 Poll::Ready(value)
174 }
175 None => {
176 if let Some(waker) = &self.waker {
177 let mut waker = waker.borrow_mut();
178 waker.update(cx.waker());
179 waker.woken = false;
180 } else {
181 let waker = Rc::new(RefCell::new(PopWaker::new(cx.waker().clone())));
182 self.waker = Some(waker);
183 }
184 state
185 .wakers
186 .push_front(Rc::downgrade(self.waker.as_ref().unwrap()));
187 Poll::Pending
188 }
189 }
190 }
191 }
192}
193
194impl<T> FusedFuture for Pop<'_, T> {
195 fn is_terminated(&self) -> bool {
196 self.terminated
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use std::{rc::Rc, time::Duration};
203
204 use futures::{join, FutureExt};
205 use wasm_bindgen_test::wasm_bindgen_test;
206
207 use crate::{sleep, spawn, Queue};
208
209 #[wasm_bindgen_test]
210 async fn test_unbounded() {
211 let queue = Queue::new();
212
213 assert_eq!(queue.try_pop(), None);
214
215 assert_eq!(queue.len(), 0);
216 assert!(queue.is_empty());
217 assert!(!queue.is_full());
218
219 queue.push(1);
220 queue.push(2);
221 queue.push(3);
222
223 assert_eq!(queue.len(), 3);
224 assert!(!queue.is_full());
225
226 assert_eq!(queue.try_pop().unwrap(), 1);
227 assert_eq!(queue.pop().await, 2);
228 assert_eq!(queue.len(), 1);
229 assert_eq!(queue.pop().await, 3);
230
231 assert_eq!(queue.len(), 0);
232 assert!((queue.is_empty()));
233
234 let queue = Rc::new(queue);
235 let queue_clone = queue.clone();
236 spawn(async move {
237 sleep(Duration::from_secs(1)).await;
238 queue_clone.push(4);
239 queue_clone.push(5);
240 sleep(Duration::from_secs(1)).await;
241 queue_clone.push(6);
242 });
243
244 assert_eq!(queue.pop().await, 4);
245 assert_eq!(queue.len(), 1);
246 assert_eq!(queue.pop().await, 5);
247 assert_eq!(queue.pop().await, 6);
248
249 assert_eq!(queue.len(), 0);
250 assert!((queue.is_empty()));
251
252 assert_eq!(queue.pop().now_or_never(), None);
253 assert_eq!(queue.pop().now_or_never(), None);
254 assert_eq!(queue.pop().now_or_never(), None);
255 let queue_clone = queue.clone();
256 let task = spawn(async move {
257 assert_eq!(queue_clone.pop().now_or_never(), None);
258 assert_eq!(queue_clone.pop().now_or_never(), None);
259 join![queue_clone.pop(), queue_clone.pop(), queue_clone.pop()]
260 });
261 sleep(Duration::from_secs(1)).await;
262 queue.push(1);
263 queue.push(2);
264 queue.push(3);
265
266 assert_eq!(task.await.unwrap(), (1, 2, 3));
267 }
268
269 #[wasm_bindgen_test]
270 async fn test_bounded() {
271 let queue = Queue::with_capacity(3);
272
273 assert_eq!(queue.try_pop(), None);
274
275 assert_eq!(queue.len(), 0);
276 assert!(queue.is_empty());
277 assert!(!queue.is_full());
278
279 queue.push(1);
280 queue.push(2);
281 queue.push(3);
282
283 assert_eq!(queue.len(), 3);
284 assert!(queue.is_full());
285 }
286}