waker_queue/
lib.rs

1use atomic_waker::AtomicWaker;
2use concurrent_queue::{ConcurrentQueue, PopError, PushError};
3use pin_project_lite::pin_project;
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{Context, Poll, Waker};
7
8/// A ConcurrentQueue with a Waker attached. Allows rapid
9/// implementation of fast channels where at least one side may not be
10/// cloned. Caution: Last Write wins and you are asking for trouble if
11/// there are races registering Wakers. See `atomic_waker` for details.
12#[derive(Debug)]
13pub struct WakerQueue<T> {
14    queue: ConcurrentQueue<T>,
15    waker: AtomicWaker,
16}
17
18impl<T: 'static + Send> WakerQueue<T> {
19
20    /// Create a WakerQueue with the given capacity.
21    pub fn bounded(size: usize) -> WakerQueue<T> {
22        WakerQueue {
23            queue: ConcurrentQueue::bounded(size),
24            waker: AtomicWaker::new(),
25        }
26    }
27
28    /// Create a WakerQueue which will dynamically resize as required.
29    pub fn unbounded() -> WakerQueue<T> {
30        WakerQueue {
31            queue: ConcurrentQueue::unbounded(),
32            waker: AtomicWaker::new(),
33        }
34    }
35
36    /// true if the WakerQueue has no items.
37    pub fn is_empty(&self) -> bool { self.queue.is_empty() }
38
39    /// true if the WakerQueue has no spare capacity.
40    pub fn is_full(&self) -> bool { self.queue.is_full() }
41
42    /// Count of items in the WakerQueue
43    pub fn len(&self) -> usize { self.queue.len() }
44
45    /// Returns the maximum capacity of the queue. None for unbounded queues.
46    pub fn capacity(&self) -> Option<usize> { self.queue.capacity() }
47
48    /// Closes the queue so that no more items can be pushed. Pops are allowed.
49    pub fn close(&self) { self.queue.close(); }
50
51    /// true if the queue is closed
52    pub fn is_closed(&self) -> bool { self.queue.is_closed() }
53
54    /// Attempt to push an item into the queue. Will fail if the queue is full or closed.
55    pub fn try_push(&self, value: T) -> Result<(), PushError<T>> { self.queue.push(value) }
56
57    /// Attempts a push. If successful and `wake` is true, wakes the
58    /// last registered Waker.
59    pub fn try_push_wake(&self, value: T, wake: bool) -> Result<(), PushError<T>> {
60        let ret = self.try_push(value);
61        self.wake_if(ret.is_ok() && wake);
62        ret
63    }
64
65    /// Attempts a push. If successful and the queue was previously
66    /// empty, wakes the last registered Waker.
67    pub fn try_push_wake_empty(&self, value: T) -> Result<(), PushError<T>> {
68        self.try_push_wake(value, self.is_empty())
69    }
70    
71    /// Attempts a push. If successful and `wake` is true, wakes the
72    /// last registered Waker.
73    pub fn try_push_wake_full(&self, value: T) -> Result<(), PushError<T>> {
74        self.try_push_wake(value, self.is_full())
75    }
76    
77    /// Attempts to pop an item from the queue. Will fail if the queue is empty.
78    pub fn try_pop(&self) -> Result<T, PopError> { self.queue.pop() }
79
80    
81    /// Attempts a pop. If successful and `wake` is true, wakes the
82    /// last registered Waker.
83    pub fn try_pop_wake(&self, wake: bool) -> Result<T, PopError> {
84        let ret = self.try_pop();
85        self.wake_if(ret.is_ok() && wake);
86        ret
87    }
88
89    /// Attempts a pop. If successful and the queue was previously
90    /// empty, wakes the last registered Waker.
91    pub fn try_pop_wake_empty(&self) -> Result<T, PopError> { self.try_pop_wake(self.is_empty()) }
92
93    /// Attempts a pop. If successful and the queue was previously
94    /// full, wakes the last registered Waker.
95    pub fn try_pop_wake_full(&self) -> Result<T, PopError> { self.try_pop_wake(self.is_full()) }
96
97    /// Returns a future which pushes into a WakerQueue
98    pub fn push<'a, F>(&'a self, value: T, wake_if: F) -> Push<'a, T, F>
99    where F: Fn(&'a WakerQueue<T>) -> bool {
100        Push::new(self, value, wake_if)
101    }
102
103    /// Returns a future which pops from a WakerQueue
104    pub fn pop<'a, F>(&'a self, wake_if: F) -> Pop<'a, T, F>
105    where F: Fn(&'a WakerQueue<T>) -> bool {
106        Pop::new(self, wake_if)
107    }
108
109    /// Registers a waker with the WakerQueue.
110    pub fn register(&self, waker: &Waker) { self.waker.register(waker); }
111
112    /// Wakes the last registered Waker, if any.
113    pub fn wake(&self) { self.waker.wake(); }
114
115    /// Wakes the last registered Waker, if any, if wake is true.
116    pub fn wake_if(&self, wake: bool) { if wake { self.wake(); } }
117
118    /// Attempts to pop. If it fails because the queue is empty, it
119    /// registers the Waker from the provided context.
120    pub fn poll_pop(&self, ctx: &Context) -> Result<T, PopError> {
121        match self.try_pop() {
122            Err(PopError::Empty) => {
123                self.register(ctx.waker());
124                self.try_pop() // save us from a race.
125            }
126            other => other,
127        }
128    }
129
130    /// Attempts to push. If it fails because the queue is full, it
131    /// registers the Waker from the provided context.
132    pub fn poll_push(&self, value: T, ctx: &Context) -> Result<(), PushError<T>> {
133        match self.try_push(value) {
134            Err(PushError::Full(value)) => {
135                self.register(ctx.waker());
136                self.try_push(value) // save us from a race.
137            }
138            other => other,
139        }
140    }
141
142}
143
144unsafe impl<T: 'static + Send> Send for WakerQueue<T> {}
145unsafe impl<T: 'static + Send> Sync for WakerQueue<T> {}
146
147pin_project! {
148    /// Future for `WakerQueue.push(value, wake_if)`
149    pub struct Push<'a, T, F> {
150        queue: &'a WakerQueue<T>,
151        value: Option<T>,
152        wake_if: F,
153    }
154}
155
156impl<'a, T, F> Push<'a, T, F>
157where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
158    fn new(queue: &'a WakerQueue<T>, value: T, wake_if: F) -> Push<'a, T, F> {
159        Push { queue, value: Some(value), wake_if }
160    }
161}
162
163impl<'a, T, F> Future for Push<'a, T, F>
164where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
165    type Output = Result<(), PushError<T>>;
166
167    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
168        let this = self.project();
169        let value = this.value.take().expect("Do not poll futures after completion.");
170        let wake_if = (this.wake_if)(&this.queue);
171        match this.queue.poll_push(value, ctx) {
172            Ok(()) => {
173                if wake_if { this.queue.wake(); }
174                Poll::Ready(Ok(()))
175            }
176            Err(PushError::Closed(value)) =>
177                Poll::Ready(Err(PushError::Closed(value))),
178            Err(PushError::Full(value)) => {
179                *this.value = Some(value);
180                Poll::Pending
181            }
182        }
183    }
184}
185
186pin_project! {
187    /// Future for `WakerQueue.pop(wake_if)`
188    pub struct Pop<'a, T, F> {
189        queue: &'a WakerQueue<T>,
190        wake_if: F,
191    }
192}
193
194impl<'a, T, F> Pop<'a, T, F>
195where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool{
196    fn new(queue: &'a WakerQueue<T>, wake_if: F) -> Pop<'a, T, F> {
197        Pop { queue, wake_if }
198    }
199}
200
201impl<'a, T, F> Future for Pop<'a, T, F>
202where T: 'static + Send, F: Fn(&'a WakerQueue<T>) -> bool {
203    type Output = Result<T, PopError>;
204
205    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, PopError>> {
206        let this = self.project();
207        let wake_if = (this.wake_if)(&this.queue);
208        match this.queue.poll_pop(ctx) {
209            Ok(val) => {
210                if wake_if { this.queue.wake(); }
211                Poll::Ready(Ok(val))
212            }
213            Err(PopError::Closed) => Poll::Ready(Err(PopError::Closed)),
214            Err(PopError::Empty) => Poll::Pending,
215        }
216    }
217}