swap_buffer_queue/
synchronized.rs

1//! Synchronization primitives for [`Queue`].
2//!
//! Both synchronous and asynchronous APIs are supported. [`SynchronizedQueue`] is just an alias
//! for a [`Queue`] using [`SynchronizedNotifier`].
5//!
6//! # Examples
7//! ```rust
8//! # use std::sync::Arc;
9//! # use swap_buffer_queue::SynchronizedQueue;
10//! # use swap_buffer_queue::buffer::VecBuffer;
11//! let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
12//!     Arc::new(SynchronizedQueue::with_capacity(1));
13//! let queue_clone = queue.clone();
14//! std::thread::spawn(move || {
15//!     queue_clone.enqueue([0]).unwrap();
16//!     queue_clone.enqueue([1]).unwrap();
17//! });
18//! assert_eq!(queue.dequeue().unwrap()[0], 0);
19//! assert_eq!(queue.dequeue().unwrap()[0], 1);
20//! ```
21use std::{
22    fmt,
23    future::poll_fn,
24    iter,
25    task::{Context, Poll},
26    time::{Duration, Instant},
27};
28
29use crate::{
30    buffer::{Buffer, BufferSlice, Drain, InsertIntoBuffer},
31    error::{DequeueError, EnqueueError, TryDequeueError, TryEnqueueError},
32    loom::{hint, thread, SPIN_LIMIT},
33    notify::Notify,
34    synchronized::{atomic_waker::AtomicWaker, waker_list::WakerList},
35    Queue,
36};
37
38mod atomic_waker;
39mod waker;
40mod waker_list;
41
42/// [`Queue`] with [`SynchronizedNotifier`]
43pub type SynchronizedQueue<B> = Queue<B, SynchronizedNotifier>;
44
45/// Synchronized (a)synchronous [`Notify`] implementation.
46#[derive(Default)]
47pub struct SynchronizedNotifier {
48    enqueuers: WakerList,
49    dequeuer: AtomicWaker,
50}
51
52impl fmt::Debug for SynchronizedNotifier {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("SynchronizedNotifier").finish()
55    }
56}
57
58impl Notify for SynchronizedNotifier {
59    #[inline]
60    fn notify_dequeue(&self) {
61        self.dequeuer.wake();
62    }
63
64    #[inline]
65    fn notify_enqueue(&self) {
66        self.enqueuers.wake();
67    }
68}
69
70impl<B> SynchronizedQueue<B>
71where
72    B: Buffer,
73{
74    #[inline]
75    fn enqueue_sync<T>(
76        &self,
77        mut value: T,
78        deadline: Option<Instant>,
79    ) -> Result<(), TryEnqueueError<T>>
80    where
81        T: InsertIntoBuffer<B>,
82    {
83        loop {
84            match try_enqueue(self, value, None) {
85                Ok(res) => return res,
86                Err(v) => value = v,
87            };
88            if wait_until(deadline) {
89                return self.try_enqueue(value);
90            }
91        }
92    }
93
94    /// Enqueues the given value inside the queue.
95    ///
96    /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting synchronously
97    /// [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
98    /// insufficient capacity.
99    ///
100    /// # Examples
101    /// ```
102    /// # use std::ops::Deref;
103    /// # use std::sync::Arc;
104    /// # use std::time::Duration;
105    /// # use swap_buffer_queue::SynchronizedQueue;
106    /// # use swap_buffer_queue::buffer::VecBuffer;
107    /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
108    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
109    ///     Arc::new(SynchronizedQueue::with_capacity(1));
110    /// queue.try_enqueue([0]).unwrap();
111    /// assert_eq!(
112    ///     queue.try_enqueue([1]),
113    ///     Err(TryEnqueueError::InsufficientCapacity([1]))
114    /// );
115    /// // queue is full, let's spawn an enqueuing task and dequeue
116    /// let queue_clone = queue.clone();
117    /// let task = std::thread::spawn(move || queue_clone.enqueue([1]));
118    /// std::thread::sleep(Duration::from_millis(1));
119    /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
120    /// // enqueuing task has succeeded
121    /// task.join().unwrap().unwrap();
122    /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
123    /// // let's close the queue
124    /// queue.try_enqueue([2]).unwrap();
125    /// let queue_clone = queue.clone();
126    /// let task = std::thread::spawn(move || queue_clone.enqueue([3]));
127    /// std::thread::sleep(Duration::from_millis(1));
128    /// queue.close();
129    /// assert_eq!(task.join().unwrap(), Err(EnqueueError::Closed([3])));
130    /// ```
131    pub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>
132    where
133        T: InsertIntoBuffer<B>,
134    {
135        self.enqueue_sync(value, None)
136    }
137
138    /// Tries enqueuing the given value inside the queue with a timeout.
139    ///
140    /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting synchronously (with a
141    /// timeout) [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
142    /// insufficient capacity.
143    ///
144    /// # Examples
145    /// ```
146    /// # use std::ops::Deref;
147    /// # use std::sync::Arc;
148    /// # use std::time::Duration;
149    /// # use swap_buffer_queue::SynchronizedQueue;
150    /// # use swap_buffer_queue::buffer::VecBuffer;
151    /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
152    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
153    ///     Arc::new(SynchronizedQueue::with_capacity(1));
154    /// queue.try_enqueue([0]).unwrap();
155    /// assert_eq!(
156    ///     queue.enqueue_timeout([1], Duration::from_millis(1)),
157    ///     Err(TryEnqueueError::InsufficientCapacity([1]))
158    /// );
159    /// let queue_clone = queue.clone();
160    /// let task = std::thread::spawn(move || {
161    ///     std::thread::sleep(Duration::from_millis(1));
162    ///     queue_clone.try_dequeue().unwrap();
163    /// });
164    /// queue.enqueue_timeout([1], Duration::from_secs(1)).unwrap();
165    /// ```
166    pub fn enqueue_timeout<T>(&self, value: T, timeout: Duration) -> Result<(), TryEnqueueError<T>>
167    where
168        T: InsertIntoBuffer<B>,
169    {
170        self.enqueue_sync(value, Some(Instant::now() + timeout))
171    }
172
173    /// Enqueues the given value inside the queue.
174    ///
175    /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting asynchronously
176    /// [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
177    /// insufficient capacity.
178    ///
179    /// # Examples
180    /// ```
181    /// # use std::ops::Deref;
182    /// # use std::sync::Arc;
183    /// # use swap_buffer_queue::SynchronizedQueue;
184    /// # use swap_buffer_queue::buffer::VecBuffer;
185    /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
186    /// # tokio_test::block_on(async {
187    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
188    ///     Arc::new(SynchronizedQueue::with_capacity(1));
189    /// queue.try_enqueue([0]).unwrap();
190    /// assert_eq!(
191    ///     queue.try_enqueue([0]),
192    ///     Err(TryEnqueueError::InsufficientCapacity([0]))
193    /// );
194    /// // queue is full, let's spawn an enqueuing task and dequeue
195    /// let queue_clone = queue.clone();
196    /// let task = tokio::spawn(async move { queue_clone.enqueue_async([1]).await });
197    /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
198    /// // enqueuing task has succeeded
199    /// task.await.unwrap().unwrap();
200    /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
201    /// // let's close the queue
202    /// queue.try_enqueue([2]).unwrap();
203    /// let queue_clone = queue.clone();
204    /// let task = tokio::spawn(async move { queue_clone.enqueue_async([3]).await });
205    /// queue.close();
206    /// assert_eq!(task.await.unwrap(), Err(EnqueueError::Closed([3])));
207    /// # })
208    /// ```
209    pub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>
210    where
211        T: InsertIntoBuffer<B>,
212    {
213        let mut value = Some(value);
214        poll_fn(|cx| {
215            let v = value.take().unwrap();
216            match try_enqueue(self, v, Some(cx)) {
217                Ok(res) => return Poll::Ready(res),
218                Err(v) => value.replace(v),
219            };
220            Poll::Pending
221        })
222        .await
223    }
224
225    fn dequeue_sync(
226        &self,
227        deadline: Option<Instant>,
228    ) -> Result<BufferSlice<B, SynchronizedNotifier>, TryDequeueError> {
229        loop {
230            if let Some(res) = try_dequeue(self, None) {
231                return res;
232            }
233            if wait_until(deadline) {
234                return self.try_dequeue();
235            }
236        }
237    }
238
239    /// Dequeues a buffer with all enqueued values from the queue.
240    ///
241    /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting synchronously
242    /// [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
243    /// empty queue.
244    ///
245    /// # Examples
246    /// ```
247    /// # use std::ops::Deref;
248    /// # use std::sync::Arc;
249    /// # use swap_buffer_queue::SynchronizedQueue;
250    /// # use swap_buffer_queue::buffer::VecBuffer;
251    /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
252    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
253    ///     Arc::new(SynchronizedQueue::with_capacity(1));
254    /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
255    /// // queue is empty, let's spawn a dequeuing task and enqueue
256    /// let queue_clone = queue.clone();
257    /// let task = std::thread::spawn(move || {
258    ///     Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
259    /// });
260    /// queue.try_enqueue([0]).unwrap();
261    /// // dequeuing task has succeeded
262    /// assert_eq!(task.join().unwrap().unwrap().deref(), &[0]);
263    /// // let's close the queue
264    /// let queue_clone = queue.clone();
265    /// let task = std::thread::spawn(move || {
266    ///     Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
267    /// });
268    /// queue.close();
269    /// assert_eq!(task.join().unwrap().unwrap_err(), DequeueError::Closed);
270    /// ```
271    pub fn dequeue(&self) -> Result<BufferSlice<B, SynchronizedNotifier>, DequeueError> {
272        self.dequeue_sync(None).map_err(dequeue_err)
273    }
274
275    /// Tries dequeuing a buffer with all enqueued values from the queue with a timeout.
276    ///
277    /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting synchronously, with a
278    /// timeout, [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
279    /// empty queue.
280    ///
281    /// # Examples
282    /// ```
283    /// # use std::ops::Deref;
284    /// # use std::sync::Arc;
285    /// # use std::time::Duration;
286    /// # use swap_buffer_queue::SynchronizedQueue;
287    /// # use swap_buffer_queue::buffer::VecBuffer;
288    /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
289    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
290    ///     Arc::new(SynchronizedQueue::with_capacity(1));
291    /// assert_eq!(
292    ///     queue.dequeue_timeout(Duration::from_millis(1)).unwrap_err(),
293    ///     TryDequeueError::Empty
294    /// );
295    /// let queue_clone = queue.clone();
296    /// let task = std::thread::spawn(move || {
297    ///     std::thread::sleep(Duration::from_millis(1));
298    ///     queue_clone.try_enqueue([0]).unwrap();
299    /// });
300    /// assert_eq!(
301    ///     queue
302    ///         .dequeue_timeout(Duration::from_secs(1))
303    ///         .unwrap()
304    ///         .deref(),
305    ///     &[0]
306    /// );
307    /// ```
308    pub fn dequeue_timeout(
309        &self,
310        timeout: Duration,
311    ) -> Result<BufferSlice<B, SynchronizedNotifier>, TryDequeueError> {
312        self.dequeue_sync(Some(Instant::now() + timeout))
313    }
314
315    /// Dequeues a buffer with all enqueued values from the queue.
316    ///
317    /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting asynchronously
318    /// [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
319    /// empty queue.
320    ///
321    /// # Examples
322    /// ```
323    /// # use std::ops::Deref;
324    /// # use std::sync::Arc;
325    /// # use swap_buffer_queue::SynchronizedQueue;
326    /// # use swap_buffer_queue::buffer::VecBuffer;
327    /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
328    /// # tokio_test::block_on(async {
329    /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
330    ///     Arc::new(SynchronizedQueue::with_capacity(1));
331    /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
332    /// // queue is empty, let's spawn a dequeuing task and enqueue
333    /// let queue_clone = queue.clone();
334    /// let task = tokio::spawn(async move {
335    ///     Ok::<_, DequeueError>(
336    ///         queue_clone
337    ///             .dequeue_async()
338    ///             .await?
339    ///             .into_iter()
340    ///             .collect::<Vec<_>>(),
341    ///     )
342    /// });
343    /// queue.try_enqueue([0]).unwrap();
344    /// // dequeuing task has succeeded
345    /// assert_eq!(task.await.unwrap().unwrap().deref(), &[0]);
346    /// // let's close the queue
347    /// let queue_clone = queue.clone();
348    /// let task = tokio::spawn(async move {
349    ///     Ok::<_, DequeueError>(
350    ///         queue_clone
351    ///             .dequeue_async()
352    ///             .await?
353    ///             .into_iter()
354    ///             .collect::<Vec<_>>(),
355    ///     )
356    /// });
357    /// queue.close();
358    /// assert_eq!(task.await.unwrap().unwrap_err(), DequeueError::Closed);
359    /// # })
360    /// ```
361    pub async fn dequeue_async(
362        &self,
363    ) -> Result<BufferSlice<B, SynchronizedNotifier>, DequeueError> {
364        poll_fn(|cx| {
365            if let Some(res) = try_dequeue(self, Some(cx)) {
366                return Poll::Ready(res.map_err(dequeue_err));
367            }
368            Poll::Pending
369        })
370        .await
371    }
372}
373
374impl<B> SynchronizedQueue<B>
375where
376    B: Buffer + Drain,
377{
378    /// Returns an iterator over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)).
379    ///
380    /// # Examples
381    /// ```
382    /// # use swap_buffer_queue::SynchronizedQueue;
383    /// # use swap_buffer_queue::buffer::VecBuffer;
384    /// let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
385    /// queue.try_enqueue([0]).unwrap();
386    /// queue.try_enqueue([1]).unwrap();
387    ///
388    /// let mut iter = queue.iter();
389    /// assert_eq!(iter.next(), Some(0));
390    /// drop(iter);
391    /// let mut iter = queue.iter();
392    /// assert_eq!(iter.next(), Some(1));
393    /// queue.close(); // close in order to stop the iterator
394    /// assert_eq!(iter.next(), None);
395    /// ```
396    pub fn iter(&self) -> impl Iterator<Item = B::Value> + '_ {
397        iter::repeat_with(|| self.dequeue())
398            .map_while(|res| res.ok())
399            .flatten()
400    }
401
402    #[cfg(feature = "stream")]
403    /// Returns an stream over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)).
404    ///
405    /// # Examples
406    /// ```
407    /// # use futures_util::StreamExt;
408    /// # use swap_buffer_queue::SynchronizedQueue;
409    /// # use swap_buffer_queue::buffer::VecBuffer;
410    /// # tokio_test::block_on(async {
411    /// let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
412    /// queue.try_enqueue([0]).unwrap();
413    /// queue.try_enqueue([1]).unwrap();
414    ///
415    /// let mut stream = Box::pin(queue.stream());
416    /// assert_eq!(stream.next().await, Some(0));
417    /// drop(stream);
418    /// let mut stream = Box::pin(queue.stream());
419    /// assert_eq!(stream.next().await, Some(1));
420    /// queue.close(); // close in order to stop the stream
421    /// assert_eq!(stream.next().await, None);
422    /// # })
423    /// ```
424    pub fn stream(&self) -> impl futures_core::Stream<Item = B::Value> + '_ {
425        use futures_util::{stream, StreamExt};
426        stream::repeat_with(|| stream::once(self.dequeue_async()))
427            .flatten()
428            .take_while(|res| {
429                let is_ok = res.is_ok();
430                async move { is_ok }
431            })
432            .flat_map(|res| stream::iter(res.unwrap()))
433    }
434}
435
436#[inline]
437fn try_enqueue<B, T>(
438    queue: &SynchronizedQueue<B>,
439    mut value: T,
440    cx: Option<&Context>,
441) -> Result<Result<(), TryEnqueueError<T>>, T>
442where
443    B: Buffer,
444    T: InsertIntoBuffer<B>,
445{
446    for _ in 0..SPIN_LIMIT {
447        match queue.try_enqueue(value) {
448            Err(TryEnqueueError::InsufficientCapacity(v)) if v.size() <= queue.capacity() => {
449                value = v;
450            }
451            res => return Ok(res),
452        };
453        hint::spin_loop();
454    }
455    queue.notify().enqueuers.register(cx);
456    match queue.try_enqueue(value) {
457        Err(TryEnqueueError::InsufficientCapacity(v)) if v.size() <= queue.capacity() => Err(v),
458        res => Ok(res),
459    }
460}
461
462#[inline]
463fn try_dequeue<'a, B>(
464    queue: &'a SynchronizedQueue<B>,
465    cx: Option<&Context>,
466) -> Option<Result<BufferSlice<'a, B, SynchronizedNotifier>, TryDequeueError>>
467where
468    B: Buffer,
469{
470    for _ in 0..SPIN_LIMIT {
471        match queue.try_dequeue() {
472            Err(TryDequeueError::Empty | TryDequeueError::Pending) => {}
473            res => return Some(res),
474        }
475        hint::spin_loop();
476    }
477    queue.notify().dequeuer.register(cx);
478    match queue.try_dequeue() {
479        Err(TryDequeueError::Empty | TryDequeueError::Pending) => None,
480        res => Some(res),
481    }
482}
483
484#[inline]
485fn dequeue_err(error: TryDequeueError) -> DequeueError {
486    match error {
487        TryDequeueError::Closed => DequeueError::Closed,
488        TryDequeueError::Conflict => DequeueError::Conflict,
489        _ => unreachable!(),
490    }
491}
492
/// Parks the current thread until it is unparked or `deadline` is reached.
///
/// Returns `true`, without parking, when `deadline` is already in the past; returns `false`
/// after having parked, whatever the reason of the wake-up. Without deadline, the thread is
/// parked with no time limit.
#[inline]
fn wait_until(deadline: Option<Instant>) -> bool {
    match deadline {
        None => {
            thread::park();
            false
        }
        Some(limit) => match limit.checked_duration_since(Instant::now()) {
            // The deadline has passed: report the timeout instead of parking.
            None => true,
            #[cfg(not(all(loom, test)))]
            Some(remaining) => {
                thread::park_timeout(remaining);
                false
            }
            #[cfg(all(loom, test))]
            Some(_) => panic!("loom doesn't support park_timeout"),
        },
    }
}