1use atomic_waker::AtomicWaker;
2use concurrent_queue::{ConcurrentQueue, PopError, PushError};
3use pin_project_lite::pin_project;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll, Waker};
7
8#[derive(Debug)]
13pub struct WakerQueue<T> {
14 queue: ConcurrentQueue<T>,
15 waker: AtomicWaker,
16}
17
18impl<T: 'static + Send> WakerQueue<T> {
19
20 pub fn bounded(size: usize) -> WakerQueue<T> {
22 WakerQueue {
23 queue: ConcurrentQueue::bounded(size),
24 waker: AtomicWaker::new(),
25 }
26 }
27
28 pub fn unbounded() -> WakerQueue<T> {
30 WakerQueue {
31 queue: ConcurrentQueue::unbounded(),
32 waker: AtomicWaker::new(),
33 }
34 }
35
36 pub fn is_empty(&self) -> bool { self.queue.is_empty() }
38
39 pub fn is_full(&self) -> bool { self.queue.is_full() }
41
42 pub fn len(&self) -> usize { self.queue.len() }
44
45 pub fn capacity(&self) -> Option<usize> { self.queue.capacity() }
47
48 pub fn close(&self) { self.queue.close(); }
50
51 pub fn is_closed(&self) -> bool { self.queue.is_closed() }
53
54 pub fn try_push(&self, value: T) -> Result<(), PushError<T>> { self.queue.push(value) }
56
57 pub fn try_push_wake(&self, value: T, wake: bool) -> Result<(), PushError<T>> {
60 let ret = self.try_push(value);
61 self.wake_if(ret.is_ok() && wake);
62 ret
63 }
64
65 pub fn try_push_wake_empty(&self, value: T) -> Result<(), PushError<T>> {
68 self.try_push_wake(value, self.is_empty())
69 }
70
71 pub fn try_push_wake_full(&self, value: T) -> Result<(), PushError<T>> {
74 self.try_push_wake(value, self.is_full())
75 }
76
77 pub fn try_pop(&self) -> Result<T, PopError> { self.queue.pop() }
79
80
81 pub fn try_pop_wake(&self, wake: bool) -> Result<T, PopError> {
84 let ret = self.try_pop();
85 self.wake_if(ret.is_ok() && wake);
86 ret
87 }
88
89 pub fn try_pop_wake_empty(&self) -> Result<T, PopError> { self.try_pop_wake(self.is_empty()) }
92
93 pub fn try_pop_wake_full(&self) -> Result<T, PopError> { self.try_pop_wake(self.is_full()) }
96
97 pub fn push<'a, F>(&'a self, value: T, wake_if: F) -> Push<'a, T, F>
99 where F: Fn(&'a WakerQueue<T>) -> bool {
100 Push::new(self, value, wake_if)
101 }
102
103 pub fn pop<'a, F>(&'a self, wake_if: F) -> Pop<'a, T, F>
105 where F: Fn(&'a WakerQueue<T>) -> bool {
106 Pop::new(self, wake_if)
107 }
108
109 pub fn register(&self, waker: &Waker) { self.waker.register(waker); }
111
112 pub fn wake(&self) { self.waker.wake(); }
114
115 pub fn wake_if(&self, wake: bool) { if wake { self.wake(); } }
117
118 pub fn poll_pop(&self, ctx: &Context) -> Result<T, PopError> {
121 match self.try_pop() {
122 Err(PopError::Empty) => {
123 self.register(ctx.waker());
124 self.try_pop() }
126 other => other,
127 }
128 }
129
130 pub fn poll_push(&self, value: T, ctx: &Context) -> Result<(), PushError<T>> {
133 match self.try_push(value) {
134 Err(PushError::Full(value)) => {
135 self.register(ctx.waker());
136 self.try_push(value) }
138 other => other,
139 }
140 }
141
142}
143
144unsafe impl<T: 'static + Send> Send for WakerQueue<T> {}
145unsafe impl<T: 'static + Send> Sync for WakerQueue<T> {}
146
147pin_project! {
148 pub struct Push<'a, T, F> {
150 queue: &'a WakerQueue<T>,
151 value: Option<T>,
152 wake_if: F,
153 }
154}
155
156impl<'a, T, F> Push<'a, T, F>
157where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
158 fn new(queue: &'a WakerQueue<T>, value: T, wake_if: F) -> Push<'a, T, F> {
159 Push { queue, value: Some(value), wake_if }
160 }
161}
162
163impl<'a, T, F> Future for Push<'a, T, F>
164where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
165 type Output = Result<(), PushError<T>>;
166
167 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
168 let this = self.project();
169 let value = this.value.take().expect("Do not poll futures after completion.");
170 let wake_if = (this.wake_if)(&this.queue);
171 match this.queue.poll_push(value, ctx) {
172 Ok(()) => {
173 if wake_if { this.queue.wake(); }
174 Poll::Ready(Ok(()))
175 }
176 Err(PushError::Closed(value)) =>
177 Poll::Ready(Err(PushError::Closed(value))),
178 Err(PushError::Full(value)) => {
179 *this.value = Some(value);
180 Poll::Pending
181 }
182 }
183 }
184}
185
186pin_project! {
187 pub struct Pop<'a, T, F> {
189 queue: &'a WakerQueue<T>,
190 wake_if: F,
191 }
192}
193
194impl<'a, T, F> Pop<'a, T, F>
195where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
196 fn new(queue: &'a WakerQueue<T>, wake_if: F) -> Pop<'a, T, F> {
197 Pop { queue, wake_if }
198 }
199}
200
201impl<'a, T, F> Future for Pop<'a, T, F>
202where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool {
203 type Output = Result<T, PopError>;
204
205 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, PopError>> {
206 let this = self.project();
207 let wake_if = (this.wake_if)(&this.queue);
208 match this.queue.poll_pop(ctx) {
209 Ok(val) => {
210 if wake_if { this.queue.wake(); }
211 Poll::Ready(Ok(val))
212 }
213 Err(PopError::Closed) => Poll::Ready(Err(PopError::Closed)),
214 Err(PopError::Empty) => Poll::Pending,
215 }
216 }
217}