swap_buffer_queue/synchronized.rs
1//! Synchronization primitives for [`Queue`].
2//!
3//! It supports both synchronous and asynchronous API. [`SynchronizedQueue`] is just an alias
4//! for a [`Queue`] using [`SynchronizedNotifier`].
5//!
6//! # Examples
7//! ```rust
8//! # use std::sync::Arc;
9//! # use swap_buffer_queue::SynchronizedQueue;
10//! # use swap_buffer_queue::buffer::VecBuffer;
11//! let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
12//! Arc::new(SynchronizedQueue::with_capacity(1));
13//! let queue_clone = queue.clone();
14//! std::thread::spawn(move || {
15//! queue_clone.enqueue([0]).unwrap();
16//! queue_clone.enqueue([1]).unwrap();
17//! });
18//! assert_eq!(queue.dequeue().unwrap()[0], 0);
19//! assert_eq!(queue.dequeue().unwrap()[0], 1);
20//! ```
21use std::{
22 fmt,
23 future::poll_fn,
24 iter,
25 task::{Context, Poll},
26 time::{Duration, Instant},
27};
28
29use crate::{
30 buffer::{Buffer, BufferSlice, Drain, InsertIntoBuffer},
31 error::{DequeueError, EnqueueError, TryDequeueError, TryEnqueueError},
32 loom::{hint, thread, SPIN_LIMIT},
33 notify::Notify,
34 synchronized::{atomic_waker::AtomicWaker, waker_list::WakerList},
35 Queue,
36};
37
38mod atomic_waker;
39mod waker;
40mod waker_list;
41
42/// [`Queue`] with [`SynchronizedNotifier`]
43pub type SynchronizedQueue<B> = Queue<B, SynchronizedNotifier>;
44
45/// Synchronized (a)synchronous [`Notify`] implementation.
46#[derive(Default)]
47pub struct SynchronizedNotifier {
48 enqueuers: WakerList,
49 dequeuer: AtomicWaker,
50}
51
52impl fmt::Debug for SynchronizedNotifier {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("SynchronizedNotifier").finish()
55 }
56}
57
58impl Notify for SynchronizedNotifier {
59 #[inline]
60 fn notify_dequeue(&self) {
61 self.dequeuer.wake();
62 }
63
64 #[inline]
65 fn notify_enqueue(&self) {
66 self.enqueuers.wake();
67 }
68}
69
70impl<B> SynchronizedQueue<B>
71where
72 B: Buffer,
73{
74 #[inline]
75 fn enqueue_sync<T>(
76 &self,
77 mut value: T,
78 deadline: Option<Instant>,
79 ) -> Result<(), TryEnqueueError<T>>
80 where
81 T: InsertIntoBuffer<B>,
82 {
83 loop {
84 match try_enqueue(self, value, None) {
85 Ok(res) => return res,
86 Err(v) => value = v,
87 };
88 if wait_until(deadline) {
89 return self.try_enqueue(value);
90 }
91 }
92 }
93
94 /// Enqueues the given value inside the queue.
95 ///
96 /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting synchronously
97 /// [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
98 /// insufficient capacity.
99 ///
100 /// # Examples
101 /// ```
102 /// # use std::ops::Deref;
103 /// # use std::sync::Arc;
104 /// # use std::time::Duration;
105 /// # use swap_buffer_queue::SynchronizedQueue;
106 /// # use swap_buffer_queue::buffer::VecBuffer;
107 /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
108 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
109 /// Arc::new(SynchronizedQueue::with_capacity(1));
110 /// queue.try_enqueue([0]).unwrap();
111 /// assert_eq!(
112 /// queue.try_enqueue([1]),
113 /// Err(TryEnqueueError::InsufficientCapacity([1]))
114 /// );
115 /// // queue is full, let's spawn an enqueuing task and dequeue
116 /// let queue_clone = queue.clone();
117 /// let task = std::thread::spawn(move || queue_clone.enqueue([1]));
118 /// std::thread::sleep(Duration::from_millis(1));
119 /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
120 /// // enqueuing task has succeeded
121 /// task.join().unwrap().unwrap();
122 /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
123 /// // let's close the queue
124 /// queue.try_enqueue([2]).unwrap();
125 /// let queue_clone = queue.clone();
126 /// let task = std::thread::spawn(move || queue_clone.enqueue([3]));
127 /// std::thread::sleep(Duration::from_millis(1));
128 /// queue.close();
129 /// assert_eq!(task.join().unwrap(), Err(EnqueueError::Closed([3])));
130 /// ```
131 pub fn enqueue<T>(&self, value: T) -> Result<(), EnqueueError<T>>
132 where
133 T: InsertIntoBuffer<B>,
134 {
135 self.enqueue_sync(value, None)
136 }
137
138 /// Tries enqueuing the given value inside the queue with a timeout.
139 ///
140 /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting synchronously (with a
141 /// timeout) [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
142 /// insufficient capacity.
143 ///
144 /// # Examples
145 /// ```
146 /// # use std::ops::Deref;
147 /// # use std::sync::Arc;
148 /// # use std::time::Duration;
149 /// # use swap_buffer_queue::SynchronizedQueue;
150 /// # use swap_buffer_queue::buffer::VecBuffer;
151 /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
152 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
153 /// Arc::new(SynchronizedQueue::with_capacity(1));
154 /// queue.try_enqueue([0]).unwrap();
155 /// assert_eq!(
156 /// queue.enqueue_timeout([1], Duration::from_millis(1)),
157 /// Err(TryEnqueueError::InsufficientCapacity([1]))
158 /// );
159 /// let queue_clone = queue.clone();
160 /// let task = std::thread::spawn(move || {
161 /// std::thread::sleep(Duration::from_millis(1));
162 /// queue_clone.try_dequeue().unwrap();
163 /// });
164 /// queue.enqueue_timeout([1], Duration::from_secs(1)).unwrap();
165 /// ```
166 pub fn enqueue_timeout<T>(&self, value: T, timeout: Duration) -> Result<(), TryEnqueueError<T>>
167 where
168 T: InsertIntoBuffer<B>,
169 {
170 self.enqueue_sync(value, Some(Instant::now() + timeout))
171 }
172
173 /// Enqueues the given value inside the queue.
174 ///
175 /// This method extends [`try_enqueue`](Queue::try_enqueue) by waiting asynchronously
176 /// [`SynchronizedNotifier::notify_enqueue`] call, i.e. when a buffer is dequeued, in case of
177 /// insufficient capacity.
178 ///
179 /// # Examples
180 /// ```
181 /// # use std::ops::Deref;
182 /// # use std::sync::Arc;
183 /// # use swap_buffer_queue::SynchronizedQueue;
184 /// # use swap_buffer_queue::buffer::VecBuffer;
185 /// # use swap_buffer_queue::error::{EnqueueError, TryEnqueueError};
186 /// # tokio_test::block_on(async {
187 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
188 /// Arc::new(SynchronizedQueue::with_capacity(1));
189 /// queue.try_enqueue([0]).unwrap();
190 /// assert_eq!(
191 /// queue.try_enqueue([0]),
192 /// Err(TryEnqueueError::InsufficientCapacity([0]))
193 /// );
194 /// // queue is full, let's spawn an enqueuing task and dequeue
195 /// let queue_clone = queue.clone();
196 /// let task = tokio::spawn(async move { queue_clone.enqueue_async([1]).await });
197 /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
198 /// // enqueuing task has succeeded
199 /// task.await.unwrap().unwrap();
200 /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[1]);
201 /// // let's close the queue
202 /// queue.try_enqueue([2]).unwrap();
203 /// let queue_clone = queue.clone();
204 /// let task = tokio::spawn(async move { queue_clone.enqueue_async([3]).await });
205 /// queue.close();
206 /// assert_eq!(task.await.unwrap(), Err(EnqueueError::Closed([3])));
207 /// # })
208 /// ```
209 pub async fn enqueue_async<T>(&self, value: T) -> Result<(), EnqueueError<T>>
210 where
211 T: InsertIntoBuffer<B>,
212 {
213 let mut value = Some(value);
214 poll_fn(|cx| {
215 let v = value.take().unwrap();
216 match try_enqueue(self, v, Some(cx)) {
217 Ok(res) => return Poll::Ready(res),
218 Err(v) => value.replace(v),
219 };
220 Poll::Pending
221 })
222 .await
223 }
224
225 fn dequeue_sync(
226 &self,
227 deadline: Option<Instant>,
228 ) -> Result<BufferSlice<B, SynchronizedNotifier>, TryDequeueError> {
229 loop {
230 if let Some(res) = try_dequeue(self, None) {
231 return res;
232 }
233 if wait_until(deadline) {
234 return self.try_dequeue();
235 }
236 }
237 }
238
239 /// Dequeues a buffer with all enqueued values from the queue.
240 ///
241 /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting synchronously
242 /// [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
243 /// empty queue.
244 ///
245 /// # Examples
246 /// ```
247 /// # use std::ops::Deref;
248 /// # use std::sync::Arc;
249 /// # use swap_buffer_queue::SynchronizedQueue;
250 /// # use swap_buffer_queue::buffer::VecBuffer;
251 /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
252 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
253 /// Arc::new(SynchronizedQueue::with_capacity(1));
254 /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
255 /// // queue is empty, let's spawn a dequeuing task and enqueue
256 /// let queue_clone = queue.clone();
257 /// let task = std::thread::spawn(move || {
258 /// Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
259 /// });
260 /// queue.try_enqueue([0]).unwrap();
261 /// // dequeuing task has succeeded
262 /// assert_eq!(task.join().unwrap().unwrap().deref(), &[0]);
263 /// // let's close the queue
264 /// let queue_clone = queue.clone();
265 /// let task = std::thread::spawn(move || {
266 /// Ok::<_, DequeueError>(queue_clone.dequeue()?.into_iter().collect::<Vec<_>>())
267 /// });
268 /// queue.close();
269 /// assert_eq!(task.join().unwrap().unwrap_err(), DequeueError::Closed);
270 /// ```
271 pub fn dequeue(&self) -> Result<BufferSlice<B, SynchronizedNotifier>, DequeueError> {
272 self.dequeue_sync(None).map_err(dequeue_err)
273 }
274
275 /// Tries dequeuing a buffer with all enqueued values from the queue with a timeout.
276 ///
277 /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting synchronously, with a
278 /// timeout, [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
279 /// empty queue.
280 ///
281 /// # Examples
282 /// ```
283 /// # use std::ops::Deref;
284 /// # use std::sync::Arc;
285 /// # use std::time::Duration;
286 /// # use swap_buffer_queue::SynchronizedQueue;
287 /// # use swap_buffer_queue::buffer::VecBuffer;
288 /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
289 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
290 /// Arc::new(SynchronizedQueue::with_capacity(1));
291 /// assert_eq!(
292 /// queue.dequeue_timeout(Duration::from_millis(1)).unwrap_err(),
293 /// TryDequeueError::Empty
294 /// );
295 /// let queue_clone = queue.clone();
296 /// let task = std::thread::spawn(move || {
297 /// std::thread::sleep(Duration::from_millis(1));
298 /// queue_clone.try_enqueue([0]).unwrap();
299 /// });
300 /// assert_eq!(
301 /// queue
302 /// .dequeue_timeout(Duration::from_secs(1))
303 /// .unwrap()
304 /// .deref(),
305 /// &[0]
306 /// );
307 /// ```
308 pub fn dequeue_timeout(
309 &self,
310 timeout: Duration,
311 ) -> Result<BufferSlice<B, SynchronizedNotifier>, TryDequeueError> {
312 self.dequeue_sync(Some(Instant::now() + timeout))
313 }
314
315 /// Dequeues a buffer with all enqueued values from the queue.
316 ///
317 /// This method extends [`try_dequeue`](Queue::try_dequeue) by waiting asynchronously
318 /// [`SynchronizedNotifier::notify_dequeue`] call, i.e. when a value is enqueued, in case of
319 /// empty queue.
320 ///
321 /// # Examples
322 /// ```
323 /// # use std::ops::Deref;
324 /// # use std::sync::Arc;
325 /// # use swap_buffer_queue::SynchronizedQueue;
326 /// # use swap_buffer_queue::buffer::VecBuffer;
327 /// # use swap_buffer_queue::error::{DequeueError, TryDequeueError};
328 /// # tokio_test::block_on(async {
329 /// let queue: Arc<SynchronizedQueue<VecBuffer<usize>>> =
330 /// Arc::new(SynchronizedQueue::with_capacity(1));
331 /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Empty);
332 /// // queue is empty, let's spawn a dequeuing task and enqueue
333 /// let queue_clone = queue.clone();
334 /// let task = tokio::spawn(async move {
335 /// Ok::<_, DequeueError>(
336 /// queue_clone
337 /// .dequeue_async()
338 /// .await?
339 /// .into_iter()
340 /// .collect::<Vec<_>>(),
341 /// )
342 /// });
343 /// queue.try_enqueue([0]).unwrap();
344 /// // dequeuing task has succeeded
345 /// assert_eq!(task.await.unwrap().unwrap().deref(), &[0]);
346 /// // let's close the queue
347 /// let queue_clone = queue.clone();
348 /// let task = tokio::spawn(async move {
349 /// Ok::<_, DequeueError>(
350 /// queue_clone
351 /// .dequeue_async()
352 /// .await?
353 /// .into_iter()
354 /// .collect::<Vec<_>>(),
355 /// )
356 /// });
357 /// queue.close();
358 /// assert_eq!(task.await.unwrap().unwrap_err(), DequeueError::Closed);
359 /// # })
360 /// ```
361 pub async fn dequeue_async(
362 &self,
363 ) -> Result<BufferSlice<B, SynchronizedNotifier>, DequeueError> {
364 poll_fn(|cx| {
365 if let Some(res) = try_dequeue(self, Some(cx)) {
366 return Poll::Ready(res.map_err(dequeue_err));
367 }
368 Poll::Pending
369 })
370 .await
371 }
372}
373
374impl<B> SynchronizedQueue<B>
375where
376 B: Buffer + Drain,
377{
378 /// Returns an iterator over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)).
379 ///
380 /// # Examples
381 /// ```
382 /// # use swap_buffer_queue::SynchronizedQueue;
383 /// # use swap_buffer_queue::buffer::VecBuffer;
384 /// let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
385 /// queue.try_enqueue([0]).unwrap();
386 /// queue.try_enqueue([1]).unwrap();
387 ///
388 /// let mut iter = queue.iter();
389 /// assert_eq!(iter.next(), Some(0));
390 /// drop(iter);
391 /// let mut iter = queue.iter();
392 /// assert_eq!(iter.next(), Some(1));
393 /// queue.close(); // close in order to stop the iterator
394 /// assert_eq!(iter.next(), None);
395 /// ```
396 pub fn iter(&self) -> impl Iterator<Item = B::Value> + '_ {
397 iter::repeat_with(|| self.dequeue())
398 .map_while(|res| res.ok())
399 .flatten()
400 }
401
402 #[cfg(feature = "stream")]
403 /// Returns an stream over the element of the queue (see [`BufferIter`](crate::buffer::BufferIter)).
404 ///
405 /// # Examples
406 /// ```
407 /// # use futures_util::StreamExt;
408 /// # use swap_buffer_queue::SynchronizedQueue;
409 /// # use swap_buffer_queue::buffer::VecBuffer;
410 /// # tokio_test::block_on(async {
411 /// let queue: SynchronizedQueue<VecBuffer<usize>> = SynchronizedQueue::with_capacity(42);
412 /// queue.try_enqueue([0]).unwrap();
413 /// queue.try_enqueue([1]).unwrap();
414 ///
415 /// let mut stream = Box::pin(queue.stream());
416 /// assert_eq!(stream.next().await, Some(0));
417 /// drop(stream);
418 /// let mut stream = Box::pin(queue.stream());
419 /// assert_eq!(stream.next().await, Some(1));
420 /// queue.close(); // close in order to stop the stream
421 /// assert_eq!(stream.next().await, None);
422 /// # })
423 /// ```
424 pub fn stream(&self) -> impl futures_core::Stream<Item = B::Value> + '_ {
425 use futures_util::{stream, StreamExt};
426 stream::repeat_with(|| stream::once(self.dequeue_async()))
427 .flatten()
428 .take_while(|res| {
429 let is_ok = res.is_ok();
430 async move { is_ok }
431 })
432 .flat_map(|res| stream::iter(res.unwrap()))
433 }
434}
435
436#[inline]
437fn try_enqueue<B, T>(
438 queue: &SynchronizedQueue<B>,
439 mut value: T,
440 cx: Option<&Context>,
441) -> Result<Result<(), TryEnqueueError<T>>, T>
442where
443 B: Buffer,
444 T: InsertIntoBuffer<B>,
445{
446 for _ in 0..SPIN_LIMIT {
447 match queue.try_enqueue(value) {
448 Err(TryEnqueueError::InsufficientCapacity(v)) if v.size() <= queue.capacity() => {
449 value = v;
450 }
451 res => return Ok(res),
452 };
453 hint::spin_loop();
454 }
455 queue.notify().enqueuers.register(cx);
456 match queue.try_enqueue(value) {
457 Err(TryEnqueueError::InsufficientCapacity(v)) if v.size() <= queue.capacity() => Err(v),
458 res => Ok(res),
459 }
460}
461
462#[inline]
463fn try_dequeue<'a, B>(
464 queue: &'a SynchronizedQueue<B>,
465 cx: Option<&Context>,
466) -> Option<Result<BufferSlice<'a, B, SynchronizedNotifier>, TryDequeueError>>
467where
468 B: Buffer,
469{
470 for _ in 0..SPIN_LIMIT {
471 match queue.try_dequeue() {
472 Err(TryDequeueError::Empty | TryDequeueError::Pending) => {}
473 res => return Some(res),
474 }
475 hint::spin_loop();
476 }
477 queue.notify().dequeuer.register(cx);
478 match queue.try_dequeue() {
479 Err(TryDequeueError::Empty | TryDequeueError::Pending) => None,
480 res => Some(res),
481 }
482}
483
484#[inline]
485fn dequeue_err(error: TryDequeueError) -> DequeueError {
486 match error {
487 TryDequeueError::Closed => DequeueError::Closed,
488 TryDequeueError::Conflict => DequeueError::Conflict,
489 _ => unreachable!(),
490 }
491}
492
493#[inline]
494fn wait_until(deadline: Option<Instant>) -> bool {
495 match deadline.map(|d| d.checked_duration_since(Instant::now())) {
496 #[cfg(not(all(loom, test)))]
497 Some(Some(timeout)) => thread::park_timeout(timeout),
498 #[cfg(all(loom, test))]
499 Some(Some(_)) => panic!("loom doesn't support park_timeout"),
500 Some(None) => return true,
501 None => thread::park(),
502 }
503 false
504}