1use futures::{future::FusedFuture, Future};
4use std::{
5 cell::RefCell,
6 collections::VecDeque,
7 pin::Pin,
8 rc::{Rc, Weak},
9 task::{Context, Poll, Waker},
10};
11
12pub struct Queue<T> {
14 state: RefCell<State<T>>,
15 capacity: usize,
16}
17
18struct State<T> {
19 buffer: VecDeque<T>,
20 wakers: VecDeque<Weak<RefCell<PopWaker>>>,
21}
22
23impl<T> State<T> {
24 fn new() -> Self {
25 State {
26 buffer: VecDeque::new(),
27 wakers: VecDeque::new(),
28 }
29 }
30}
31
32impl<T> Queue<T> {
33 pub fn new() -> Self {
35 Queue {
36 state: RefCell::new(State::new()),
37 capacity: 0,
38 }
39 }
40
41 pub fn with_capacity(capacity: usize) -> Self {
45 assert!(capacity > 0, "capacity must be greater than 0");
46 Queue {
47 state: RefCell::new(State::new()),
48 capacity,
49 }
50 }
51
52 pub fn push(&self, element: T) {
57 let mut state = self.state.borrow_mut();
58 state.buffer.push_front(element);
59 if self.capacity > 0 {
60 state.buffer.truncate(self.capacity)
61 }
62 drop(state);
63 self.wake_next();
64 }
65
66 #[must_use]
71 pub fn pop(&self) -> Pop<T> {
72 Pop {
73 queue: self,
74 terminated: false,
75 waker: None,
76 }
77 }
78
79 pub fn try_pop(&self) -> Option<T> {
83 self.state.borrow_mut().buffer.pop_back()
84 }
85
86 pub fn len(&self) -> usize {
88 self.state.borrow_mut().buffer.len()
89 }
90
91 pub fn is_empty(&self) -> bool {
93 self.state.borrow_mut().buffer.is_empty()
94 }
95
96 pub fn is_full(&self) -> bool {
98 if self.capacity == 0 {
99 false
100 } else {
101 self.len() == self.capacity
102 }
103 }
104
105 fn wake_next(&self) {
106 while let Some(waker) = self.state.borrow_mut().wakers.pop_front() {
107 if let Some(waker) = waker.upgrade() {
108 let mut waker = waker.borrow_mut();
109 waker.woken = true;
110 waker.waker.wake_by_ref();
111 break;
112 }
113 }
114 }
115}
116
117impl<T> Default for Queue<T> {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123pub struct Pop<'a, T> {
127 queue: &'a Queue<T>,
128 terminated: bool,
129 waker: Option<Rc<RefCell<PopWaker>>>,
130}
131
/// Waker record of a pending [`Pop`].
struct PopWaker {
    /// Waker of the task that last polled the `Pop`.
    waker: Waker,
    /// Set by the queue when it wakes this waiter; cleared when the `Pop`
    /// is polled again without finding an element.
    woken: bool,
}
136
137impl PopWaker {
138 fn new(waker: Waker) -> Self {
139 PopWaker {
140 waker,
141 woken: false,
142 }
143 }
144
145 fn update(&mut self, waker: &Waker) {
146 if !self.waker.will_wake(waker) {
147 self.waker = waker.clone();
148 }
149 }
150}
151
152impl<'a, T> Drop for Pop<'a, T> {
153 fn drop(&mut self) {
154 if self
156 .waker
157 .take()
158 .map_or(false, |waker| waker.borrow().woken)
159 {
160 self.queue.wake_next();
161 }
162 }
163}
164
165impl<'a, T> Future for Pop<'a, T> {
166 type Output = T;
167
168 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
169 if self.terminated {
170 Poll::Pending
171 } else {
172 let mut state = self.queue.state.borrow_mut();
173 match state.buffer.pop_back() {
174 Some(value) => {
175 self.terminated = true;
176 self.waker = None;
177 Poll::Ready(value)
178 }
179 None => {
180 if let Some(waker) = &self.waker {
181 let mut waker = waker.borrow_mut();
182 waker.update(cx.waker());
183 waker.woken = false;
184 } else {
185 let waker = Rc::new(RefCell::new(PopWaker::new(cx.waker().clone())));
186 self.waker = Some(waker);
187 }
188 state
189 .wakers
190 .push_front(Rc::downgrade(self.waker.as_ref().unwrap()));
191 Poll::Pending
192 }
193 }
194 }
195 }
196}
197
198impl<'a, T> FusedFuture for Pop<'a, T> {
199 fn is_terminated(&self) -> bool {
200 self.terminated
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::{rc::Rc, time::Duration};

    use futures::{join, FutureExt};
    use wasm_bindgen_test::wasm_bindgen_test;

    use crate::{sleep, spawn, Queue};

    /// Exercises an unbounded queue: immediate pops, pops that wait for a
    /// producer task, and several pops pending at the same time.
    #[wasm_bindgen_test]
    async fn test_unbounded() {
        let queue = Queue::new();

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(!queue.is_full());

        // Elements come out in the order they were pushed.
        assert_eq!(queue.try_pop().unwrap(), 1);
        assert_eq!(queue.pop().await, 2);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 3);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // A producer task pushes while this task is waiting in `pop`.
        let queue = Rc::new(queue);
        let queue_clone = queue.clone();
        spawn(async move {
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(4);
            queue_clone.push(5);
            sleep(Duration::from_secs(1)).await;
            queue_clone.push(6);
        });

        assert_eq!(queue.pop().await, 4);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().await, 5);
        assert_eq!(queue.pop().await, 6);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());

        // Pops that are polled once and dropped must not disturb later pops.
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        assert_eq!(queue.pop().now_or_never(), None);
        let queue_clone = queue.clone();
        let task = spawn(async move {
            assert_eq!(queue_clone.pop().now_or_never(), None);
            assert_eq!(queue_clone.pop().now_or_never(), None);
            join![queue_clone.pop(), queue_clone.pop(), queue_clone.pop()]
        });
        sleep(Duration::from_secs(1)).await;
        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(task.await.unwrap(), (1, 2, 3));
    }

    /// Exercises a bounded queue, including a push beyond its capacity.
    #[wasm_bindgen_test]
    async fn test_bounded() {
        let queue = Queue::with_capacity(3);

        assert_eq!(queue.try_pop(), None);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.push(1);
        queue.push(2);
        queue.push(3);

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        // Pushing into a full queue discards the oldest element.
        queue.push(4);

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());

        assert_eq!(queue.try_pop().unwrap(), 2);
        assert!(!queue.is_full());
        assert_eq!(queue.pop().await, 3);
        assert_eq!(queue.pop().await, 4);

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }
}