swap_buffer_queue/queue.rs
1use core::{fmt, num::NonZeroUsize, ops::Range};
2
3use crossbeam_utils::CachePadded;
4
5use crate::{
6 buffer::{Buffer, BufferSlice, Drain, InsertIntoBuffer, Resize},
7 error::{TryDequeueError, TryEnqueueError},
8 loom::{
9 hint,
10 sync::atomic::{AtomicUsize, Ordering},
11 LoomUnsafeCell, BACKOFF_LIMIT, SPIN_LIMIT,
12 },
13 notify::Notify,
14};
15
16const CLOSED_FLAG: usize = (usize::MAX >> 1) + 1;
17const DEQUEUING_LOCKED: usize = usize::MAX;
18
19/// Atomic usize with the following (64bit) representation
20/// 64------------63---------------------1--------------0
21/// | closed flag | enqueuing capacity | buffer index |
22/// +-------------+----------------------+--------------+
23/// *buffer index* bit is the index (0 or 1) of the enqueuing buffer
24/// *enqueuing capacity* is the remaining enqueuing capacity, starting at the capacity of the
25/// buffer, and decreasing until zero
26/// *closed flag* is a bit flag to mark the queue as closed
27#[derive(Copy, Clone)]
28#[repr(transparent)]
29struct EnqueuingCapacity(usize);
30
31impl EnqueuingCapacity {
32 #[inline]
33 fn new(buffer_index: usize, capacity: usize) -> Self {
34 assert!(capacity << 1 < CLOSED_FLAG);
35 Self(buffer_index | (capacity << 1))
36 }
37
38 #[inline] // I've found compiler not inlining this function
39 fn buffer_index(self) -> usize {
40 self.0 & 1
41 }
42
43 #[inline]
44 fn remaining_capacity(self) -> usize {
45 (self.0 & !CLOSED_FLAG) >> 1
46 }
47
48 #[inline]
49 fn is_closed(self) -> bool {
50 self.0 & CLOSED_FLAG != 0
51 }
52
53 #[inline]
54 fn try_reserve(self, size: NonZeroUsize) -> Option<Self> {
55 self.0.checked_sub(size.get() << 1).map(Self)
56 }
57
58 #[inline]
59 fn with_closed(self, enqueuing: Self) -> Self {
60 Self(self.0 | (enqueuing.0 & CLOSED_FLAG))
61 }
62
63 #[inline]
64 fn from_atomic(atomic: usize) -> Self {
65 Self(atomic)
66 }
67
68 #[inline]
69 fn into_atomic(self) -> usize {
70 self.0
71 }
72
73 #[inline]
74 fn close(atomic: &AtomicUsize, ordering: Ordering) {
75 atomic.fetch_or(CLOSED_FLAG, ordering);
76 }
77
78 #[inline]
79 fn reopen(atomic: &AtomicUsize, ordering: Ordering) {
80 atomic.fetch_and(!CLOSED_FLAG, ordering);
81 }
82
83 #[inline]
84 fn check_overflow(capacity: usize) {
85 assert!(
86 capacity < usize::MAX >> 2,
87 "capacity must be lower than `usize::MAX >> 2`"
88 );
89 }
90}
91
92/// Atomic usize with the following (64bit) representation
93/// 64-------------------1--------------0
94/// | dequeuing length | buffer index |
95/// +--------------------+--------------+
96/// *buffer index* bit is the index (0 or 1) of the dequeuing buffer
97/// *dequeueing length* is the length currently dequeued
98#[derive(Copy, Clone)]
99struct DequeuingLength(usize);
100
101impl DequeuingLength {
102 #[inline]
103 fn new(buffer_index: usize, length: usize) -> Self {
104 Self(buffer_index | length << 1)
105 }
106
107 #[inline]
108 fn buffer_index(self) -> usize {
109 self.0 & 1
110 }
111
112 #[inline]
113 fn buffer_len(self) -> usize {
114 self.0 >> 1
115 }
116
117 #[inline]
118 fn try_from_atomic(atomic: usize) -> Result<Self, TryDequeueError> {
119 if atomic != DEQUEUING_LOCKED {
120 Ok(Self(atomic))
121 } else {
122 Err(TryDequeueError::Conflict)
123 }
124 }
125
126 #[inline]
127 fn into_atomic(self) -> usize {
128 self.0
129 }
130}
131
132/// A buffered MPSC "swap-buffer" queue.
133pub struct Queue<B, N = ()>
134where
135 B: Buffer,
136{
137 enqueuing_capacity: CachePadded<AtomicUsize>,
138 dequeuing_length: CachePadded<AtomicUsize>,
139 buffers: [LoomUnsafeCell<B>; 2],
140 buffers_length: [CachePadded<AtomicUsize>; 2],
141 capacity: AtomicUsize,
142 notify: N,
143}
144
145// Needed for `BufferIter`
146impl<B, N> AsRef<Queue<B, N>> for Queue<B, N>
147where
148 B: Buffer,
149{
150 fn as_ref(&self) -> &Queue<B, N> {
151 self
152 }
153}
154
155// SAFETY: Buffer access is synchronized by the algorithm, but `Send` is required
156// because it is owned by the queue
157unsafe impl<B, N> Send for Queue<B, N>
158where
159 B: Buffer + Send,
160 N: Send,
161{
162}
163// SAFETY: Buffer access is synchronized by the algorithm, but `Send` is required
164// because it is owned by the queue
165unsafe impl<B, N> Sync for Queue<B, N>
166where
167 B: Buffer + Send,
168 N: Sync,
169{
170}
171
172impl<B, N> Queue<B, N>
173where
174 B: Buffer,
175 N: Default,
176{
177 /// Create a new queue using buffer default.
178 ///
179 /// Buffer default may have a non-zero capacity, e.g. array buffer.
180 ///
181 /// # Examples
182 /// ```
183 /// # use swap_buffer_queue::Queue;
184 /// # use swap_buffer_queue::buffer::VecBuffer;
185 /// let queue: Queue<VecBuffer<usize>> = Queue::new();
186 /// ```
187 pub fn new() -> Self {
188 let buffers: [LoomUnsafeCell<B>; 2] = Default::default();
189 // https://github.com/tokio-rs/loom/issues/277#issuecomment-1633262296
190 // SAFETY: exclusive reference to `buffers`
191 let capacity = buffers[0].with_mut(|buf| unsafe { &*buf }.capacity());
192 EnqueuingCapacity::check_overflow(capacity);
193 Self {
194 enqueuing_capacity: AtomicUsize::new(EnqueuingCapacity::new(0, capacity).into_atomic())
195 .into(),
196 dequeuing_length: AtomicUsize::new(DequeuingLength::new(1, 0).into_atomic()).into(),
197 buffers,
198 buffers_length: Default::default(),
199 capacity: AtomicUsize::new(capacity),
200 notify: Default::default(),
201 }
202 }
203}
204
205impl<B, N> Queue<B, N>
206where
207 B: Buffer + Resize,
208 N: Default,
209{
210 /// Creates a new queue with the given capacity.
211 ///
212 /// # Examples
213 /// ```
214 /// # use swap_buffer_queue::Queue;
215 /// # use swap_buffer_queue::buffer::VecBuffer;
216 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
217 /// ```
218 pub fn with_capacity(capacity: usize) -> Self {
219 EnqueuingCapacity::check_overflow(capacity);
220 let buffers: [LoomUnsafeCell<B>; 2] = Default::default();
221 // https://github.com/tokio-rs/loom/issues/277#issuecomment-1633262296
222 // SAFETY: exclusive reference to `buffers`
223 buffers[0].with_mut(|buf| unsafe { &mut *buf }.resize(capacity));
224 // SAFETY: exclusive reference to `buffers`
225 buffers[1].with_mut(|buf| unsafe { &mut *buf }.resize(capacity));
226 Self {
227 enqueuing_capacity: AtomicUsize::new(EnqueuingCapacity::new(0, capacity).into_atomic())
228 .into(),
229 dequeuing_length: AtomicUsize::new(DequeuingLength::new(1, 0).into_atomic()).into(),
230 buffers,
231 buffers_length: Default::default(),
232 capacity: AtomicUsize::new(capacity),
233 notify: Default::default(),
234 }
235 }
236}
237
238impl<B, N> Queue<B, N>
239where
240 B: Buffer,
241{
242 /// Returns queue's [`Notify`] implementor.
243 ///
244 /// # Examples
245 /// ```
246 /// # use swap_buffer_queue::Queue;
247 /// # use swap_buffer_queue::buffer::VecBuffer;
248 /// use swap_buffer_queue::notify::Notify;
249 ///
250 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
251 /// queue.notify().notify_dequeue();
252 /// ```
253 #[inline]
254 pub fn notify(&self) -> &N {
255 &self.notify
256 }
257
258 /// Returns the current enqueuing buffer capacity.
259 ///
260 /// # Examples
261 /// ```
262 /// # use swap_buffer_queue::Queue;
263 /// # use swap_buffer_queue::buffer::VecBuffer;
264 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
265 /// assert_eq!(queue.capacity(), 42);
266 /// ```
267 #[inline]
268 pub fn capacity(&self) -> usize {
269 // cannot use `Buffer::capacity` because of data race
270 self.capacity.load(Ordering::Relaxed)
271 }
272
273 /// Returns the current enqueuing buffer length.
274 ///
275 /// # Examples
276 /// ```
277 /// # use swap_buffer_queue::Queue;
278 /// # use swap_buffer_queue::buffer::VecBuffer;
279 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
280 /// assert_eq!(queue.len(), 0);
281 /// queue.try_enqueue([0]).unwrap();
282 /// assert_eq!(queue.len(), 1);
283 /// ```
284 #[inline]
285 pub fn len(&self) -> usize {
286 let enqueuing =
287 EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed));
288 self.capacity()
289 .saturating_sub(enqueuing.remaining_capacity())
290 }
291
292 /// Returns `true` if the current enqueuing buffer is empty.
293 ///
294 /// # Examples
295 /// ```
296 /// # use swap_buffer_queue::Queue;
297 /// # use swap_buffer_queue::buffer::VecBuffer;
298 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
299 /// assert!(queue.is_empty());
300 /// ```
301 #[inline]
302 pub fn is_empty(&self) -> bool {
303 self.len() == 0
304 }
305
306 /// Returns `true` if the queue is closed.
307 ///
308 /// # Examples
309 /// ```
310 /// # use swap_buffer_queue::Queue;
311 /// # use swap_buffer_queue::buffer::VecBuffer;
312 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
313 /// assert!(!queue.is_closed());
314 /// queue.close();
315 /// assert!(queue.is_closed());
316 /// ```
317 #[inline]
318 pub fn is_closed(&self) -> bool {
319 EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed)).is_closed()
320 }
321
322 /// Reopen a closed queue.
323 ///
324 /// Calling this method when the queue is not closed has no effect.
325 ///
326 /// # Examples
327 /// ```
328 /// # use swap_buffer_queue::Queue;
329 /// # use swap_buffer_queue::buffer::VecBuffer;
330 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
331 /// queue.close();
332 /// assert!(queue.is_closed());
333 /// queue.reopen();
334 /// assert!(!queue.is_closed());
335 /// ```
336 #[inline]
337 pub fn reopen(&self) {
338 EnqueuingCapacity::reopen(&self.enqueuing_capacity, Ordering::AcqRel);
339 }
340
341 #[inline]
342 fn lock_dequeuing(&self) -> Result<DequeuingLength, TryDequeueError> {
343 // Protect from concurrent dequeuing by swapping the dequeuing length with a constant
344 // marking dequeuing conflict.
345 DequeuingLength::try_from_atomic(
346 self.dequeuing_length
347 .swap(DEQUEUING_LOCKED, Ordering::Relaxed),
348 )
349 }
350
351 #[allow(clippy::type_complexity)]
352 const NO_RESIZE: Option<fn(&mut B) -> (bool, usize)> = None;
353
354 #[inline]
355 fn try_dequeue_internal(
356 &self,
357 dequeuing: DequeuingLength,
358 notify_enqueue: impl Fn(),
359 resize: Option<impl FnOnce(&mut B) -> (bool, usize)>,
360 ) -> Result<BufferSlice<B, N>, TryDequeueError> {
361 // If dequeuing length is greater than zero, it means than previous dequeuing is still
362 // ongoing, either because previous `try_dequeue` operation returns pending error,
363 // or because requeuing (after partial draining for example).
364 if let Some(len) = NonZeroUsize::new(dequeuing.buffer_len()) {
365 return self
366 .try_dequeue_spin(dequeuing.buffer_index(), len)
367 .ok_or(TryDequeueError::Pending);
368 }
369 let next_buffer_index = dequeuing.buffer_index();
370 let (resized, inserted_length, next_capa) =
371 self.buffers[next_buffer_index].with_mut(|next_buf| {
372 // SAFETY: Dequeuing buffer can be accessed mutably
373 let next_buffer = unsafe { &mut *next_buf };
374 // Resize buffer if needed.
375 let (resized, inserted_length) = resize.map_or((false, 0), |f| f(next_buffer));
376 (resized, inserted_length, next_buffer.capacity())
377 });
378 if inserted_length > 0 {
379 self.buffers_length[next_buffer_index].fetch_add(inserted_length, Ordering::Relaxed);
380 }
381 let mut enqueuing =
382 EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Acquire));
383 debug_assert_ne!(dequeuing.buffer_index(), enqueuing.buffer_index());
384 let capacity =
385 // SAFETY: Enqueuing buffer can be immutably accessed.
386 self.buffers[enqueuing.buffer_index()].with(|buf| unsafe { &*buf }.capacity());
387 // If buffer is empty and has not be resized, return an error (and store back dequeuing)
388 if enqueuing.remaining_capacity() == capacity && !resized && inserted_length == 0 {
389 self.dequeuing_length
390 .store(dequeuing.into_atomic(), Ordering::Relaxed);
391 return Err(if enqueuing.is_closed() {
392 TryDequeueError::Closed
393 } else {
394 TryDequeueError::Empty
395 });
396 }
397 // Swap buffers: previous dequeuing buffer become the enqueuing one
398 let next_enqueuing = EnqueuingCapacity::new(next_buffer_index, next_capa - inserted_length);
399 let mut backoff = 0;
400 while let Err(enq) = self.enqueuing_capacity.compare_exchange_weak(
401 enqueuing.into_atomic(),
402 next_enqueuing.with_closed(enqueuing).into_atomic(),
403 Ordering::AcqRel,
404 Ordering::Relaxed,
405 ) {
406 enqueuing = EnqueuingCapacity::from_atomic(enq);
407 // Spin in case of concurrent modifications, except when the buffer is full ofc.
408 if enqueuing.remaining_capacity() != 0 {
409 for _ in 0..1 << backoff {
410 hint::spin_loop();
411 }
412 if backoff < BACKOFF_LIMIT {
413 backoff += 1;
414 }
415 }
416 }
417 // Update the queue capacity if needed.
418 if self.capacity() != next_capa {
419 self.capacity.store(next_capa, Ordering::Relaxed);
420 }
421 // Notify enqueuers.
422 notify_enqueue();
423 match NonZeroUsize::new(capacity - enqueuing.remaining_capacity()) {
424 // Try to wait ongoing insertions and take ownership of the buffer, then return the
425 // buffer slice
426 Some(len) => self
427 .try_dequeue_spin(enqueuing.buffer_index(), len)
428 .ok_or(TryDequeueError::Pending),
429 // If the enqueuing buffer was empty, but values has been inserted while resizing,
430 // retry.
431 None if inserted_length > 0 => self.try_dequeue_internal(
432 DequeuingLength::new(enqueuing.buffer_index(), 0),
433 notify_enqueue,
434 Self::NO_RESIZE,
435 ),
436 // Otherwise, (empty enqueuing buffer, resized dequeuing one), acknowledge the swap and
437 // return empty error
438 None => {
439 debug_assert!(resized);
440 self.dequeuing_length.store(
441 DequeuingLength::new(enqueuing.buffer_index(), 0).into_atomic(),
442 Ordering::Relaxed,
443 );
444 Err(TryDequeueError::Empty)
445 }
446 }
447 }
448
449 fn try_dequeue_spin(
450 &self,
451 buffer_index: usize,
452 length: NonZeroUsize,
453 ) -> Option<BufferSlice<B, N>> {
454 for _ in 0..SPIN_LIMIT {
455 // Buffers having been swapped, no more enqueuing can happen, we still need to wait
456 // for ongoing one. They will be finished when the buffer length (updated after
457 // enqueuing) is equal to the expected one.
458 // Also, requeuing with potential draining can lead to have an expected length lower
459 // than the effective buffer length.
460 let buffer_len = self.buffers_length[buffer_index].load(Ordering::Acquire);
461 if buffer_len >= length.get() {
462 // Returns the slice (range can be shortened by draining + requeuing).
463 let range = buffer_len - length.get()..buffer_len;
464 let slice = self.buffers[buffer_index]
465 // SAFETY: All enqueuings are done, and buffers having been swapped, this buffer
466 // can now be accessed mutably.
467 // SAFETY: All enqueuing are done, range has been inserted.
468 .with_mut(|buf| unsafe { (*buf).slice(range.clone()) });
469 return Some(BufferSlice::new(self, buffer_index, range, slice));
470 }
471 hint::spin_loop();
472 }
473 // If the enqueuing are still ongoing, just save the dequeuing state in order to retry.
474 self.dequeuing_length.store(
475 DequeuingLength::new(buffer_index, length.get()).into_atomic(),
476 Ordering::Relaxed,
477 );
478 None
479 }
480
481 pub(crate) fn release(&self, buffer_index: usize, range: Range<usize>) {
482 // Clears the dequeuing buffer and its length, and release the dequeuing "lock".
483 // SAFETY: Dequeued buffer pointed by buffer index can be accessed mutably
484 // (see `Queue::try_dequeue_spin`).
485 // SAFETY: Range comes from the dequeued slice, so it has been previously inserted.
486 self.buffers[buffer_index].with_mut(|buf| unsafe { (*buf).clear(range) });
487 self.buffers_length[buffer_index].store(0, Ordering::Release);
488 self.dequeuing_length.store(
489 DequeuingLength::new(buffer_index, 0).into_atomic(),
490 Ordering::Relaxed,
491 );
492 }
493
494 #[inline]
495 pub(crate) fn get_slice(&self, buffer_index: usize, range: Range<usize>) -> B::Slice<'_> {
496 self.buffers[buffer_index]
497 // SAFETY: Dequeued buffer pointed by buffer index can be accessed mutably
498 // (see `Queue::try_dequeue_spin`).
499 // SAFETY: Range comes from the dequeued slice, so it has been previously inserted.
500 .with_mut(|buf| unsafe { (*buf).slice(range.clone()) })
501 }
502
503 #[inline]
504 pub(crate) fn requeue(&self, buffer_index: usize, range: Range<usize>) {
505 // Requeuing the buffer just means saving the dequeuing state (or release if there is
506 // nothing to requeue).
507 let length = range.end - range.start;
508 if length > 0 {
509 self.dequeuing_length.store(
510 DequeuingLength::new(buffer_index, length).into_atomic(),
511 Ordering::Relaxed,
512 );
513 } else {
514 self.release(buffer_index, range);
515 }
516 }
517}
518
519impl<B, N> Queue<B, N>
520where
521 B: Buffer,
522 N: Notify,
523{
524 /// Tries enqueuing the given value into the queue.
525 ///
526 /// Enqueuing will fail if the queue has insufficient capacity, or if it is closed. In case of
527 /// success, it will notify waiting dequeuing operations using [`Notify::notify_dequeue`].
528 ///
529 /// Enqueuing a zero-sized value is a no-op.
530 ///
531 /// # Examples
532 /// ```
533 /// # use swap_buffer_queue::Queue;
534 /// # use swap_buffer_queue::buffer::VecBuffer;
535 /// # use swap_buffer_queue::error::TryEnqueueError;
536 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
537 /// queue.try_enqueue([0]).unwrap();
538 /// // queue is full
539 /// assert_eq!(
540 /// queue.try_enqueue([0]),
541 /// Err(TryEnqueueError::InsufficientCapacity([0]))
542 /// );
543 /// // let's close the queue
544 /// queue.close();
545 /// assert_eq!(queue.try_enqueue([0]), Err(TryEnqueueError::Closed([0])));
546 /// ```
547 pub fn try_enqueue<T>(&self, value: T) -> Result<(), TryEnqueueError<T>>
548 where
549 T: InsertIntoBuffer<B>,
550 {
551 // Compare-and-swap loop with backoff in order to mitigate contention on the atomic field
552 let Some(value_size) = NonZeroUsize::new(value.size()) else {
553 return Ok(());
554 };
555 let mut enqueuing =
556 EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Acquire));
557 let mut backoff = None;
558 loop {
559 // Check if the queue is not closed and try to reserve a slice of the buffer.
560 if enqueuing.is_closed() {
561 return Err(TryEnqueueError::Closed(value));
562 }
563 let Some(next_enq) = enqueuing.try_reserve(value_size) else {
564 return Err(TryEnqueueError::InsufficientCapacity(value));
565 };
566 if let Some(ref mut backoff) = backoff {
567 for _ in 0..1 << *backoff {
568 hint::spin_loop();
569 }
570 if *backoff < BACKOFF_LIMIT {
571 *backoff += 1;
572 }
573 }
574 match self.enqueuing_capacity.compare_exchange_weak(
575 enqueuing.into_atomic(),
576 next_enq.into_atomic(),
577 Ordering::AcqRel,
578 Ordering::Relaxed,
579 ) {
580 Ok(_) => break,
581 Err(enq) => {
582 enqueuing = EnqueuingCapacity::from_atomic(enq);
583 // Spin in case of concurrent modification, except when the buffer index has
584 // modified, which may mean conflict was due to dequeuing.
585 backoff = (next_enq.buffer_index() == enqueuing.buffer_index())
586 .then(|| backoff.unwrap_or(0));
587 }
588 }
589 }
590 // Insert the value into the buffer at the index given by subtracting the remaining
591 // capacity to the buffer one.
592 self.buffers[enqueuing.buffer_index()].with(|buf| {
593 // SAFETY: As long as enqueuing is ongoing, i.e. a reserved slice has not been acknowledged
594 // in the buffer length (see `BufferWithLength::insert`), buffer cannot be dequeued and can
595 // thus be accessed immutably (see `Queue::try_dequeue_spin`).
596 let buffer = unsafe { &*buf };
597 let index = buffer.capacity() - enqueuing.remaining_capacity();
598 // SAFETY: Compare-and-swap makes indexes not overlap, and the buffer is cleared before
599 // reusing it for enqueuing (see `Queue::release`).
600 unsafe { value.insert_into(buffer, index) };
601 });
602 self.buffers_length[enqueuing.buffer_index()].fetch_add(value_size.get(), Ordering::AcqRel);
603 // Notify dequeuer.
604 self.notify.notify_dequeue();
605 Ok(())
606 }
607
608 /// Tries dequeuing a buffer with all enqueued values from the queue.
609 ///
610 /// This method swaps the current buffer with the other one, which is empty. All concurrent
611 /// enqueuing must end before the the current buffer is really dequeuable, so the queue may
612 /// be in a transitory state where `try_dequeue` must be retried. In this state, after a spin
613 /// loop, this method will return a [`TryDequeueError::Pending`] error.
614 ///
615 /// Dequeuing also fails if the queue is empty, or if it is closed. Moreover, as the algorithm
616 /// is MPSC, dequeuing is protected against concurrent calls, failing with
617 /// [`TryDequeueError::Conflict`] error.
618 ///
619 /// It returns a [`BufferSlice`], which holds, as its name may indicate, a reference to the
620 /// dequeued buffer. That's why, the concurrent dequeuing protection is maintained for the
621 /// lifetime of the buffer slice.
622 ///
623 /// # Examples
624 /// ```
625 /// # use std::ops::Deref;
626 /// # use swap_buffer_queue::Queue;
627 /// # use swap_buffer_queue::buffer::VecBuffer;
628 /// # use swap_buffer_queue::error::TryDequeueError;
629 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
630 /// queue.try_enqueue([0]).unwrap();
631 /// queue.try_enqueue([1]).unwrap();
632 /// {
633 /// let slice = queue.try_dequeue().unwrap();
634 /// assert_eq!(slice.deref(), &[0, 1]);
635 /// // dequeuing cannot be done concurrently (`slice` is still in scope)
636 /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Conflict);
637 /// }
638 /// // let's close the queue
639 /// queue.try_enqueue([2]).unwrap();
640 /// queue.close();
641 /// // queue can be dequeued while closed when not empty
642 /// {
643 /// let slice = queue.try_dequeue().unwrap();
644 /// assert_eq!(slice.deref(), &[2]);
645 /// }
646 /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed)
647 /// ```
648 pub fn try_dequeue(&self) -> Result<BufferSlice<B, N>, TryDequeueError> {
649 self.try_dequeue_internal(
650 self.lock_dequeuing()?,
651 || self.notify.notify_enqueue(),
652 Self::NO_RESIZE,
653 )
654 }
655
656 /// Closes the queue.
657 ///
658 /// Closed queue can no more accept enqueuing, but it can be dequeued while not empty.
659 /// Calling this method on a closed queue has no effect.
660 /// See [`reopen`](Queue::reopen) to reopen a closed queue.
661 /// # Examples
662 /// ```
663 /// # use std::ops::Deref;
664 /// # use swap_buffer_queue::Queue;
665 /// # use swap_buffer_queue::buffer::VecBuffer;
666 /// # use swap_buffer_queue::error::{TryDequeueError, TryEnqueueError};
667 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
668 /// queue.try_enqueue([0]).unwrap();
669 /// queue.close();
670 /// assert!(queue.is_closed());
671 /// assert_eq!(queue.try_enqueue([1]), Err(TryEnqueueError::Closed([1])));
672 /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
673 /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed);
674 /// ```
675 pub fn close(&self) {
676 EnqueuingCapacity::close(&self.enqueuing_capacity, Ordering::AcqRel);
677 self.notify.notify_dequeue();
678 self.notify.notify_enqueue();
679 }
680}
681
682impl<B, N> Queue<B, N>
683where
684 B: Buffer + Resize,
685 N: Notify,
686{
687 /// Tries dequeuing a buffer with all enqueued values from the queue, and resizes the next
688 /// buffer to be used for enqueuing.
689 ///
690 /// This method is an extension of [`try_dequeue`](Queue::try_dequeue) method. In fact,
691 /// before swapping the buffers, next one is empty and protected, so it can be resized, and
692 /// it is also possible to add values in it before making it available for enqueuing.
693 /// This can be used to make the queue [unbounded](Queue#an-amortized-unbounded-recipe).
694 ///
695 /// It is worth to be noted that only one buffer is resized, so it can lead to asymmetric buffers.
696 ///
697 /// # Examples
698 /// ```
699 /// # use std::ops::Deref;
700 /// # use swap_buffer_queue::Queue;
701 /// # use swap_buffer_queue::buffer::VecBuffer;
702 /// # use swap_buffer_queue::error::TryEnqueueError;
703 /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
704 /// queue.try_enqueue([0]).unwrap();
705 /// // queue is full
706 /// assert_eq!(
707 /// queue.try_enqueue([1]),
708 /// Err(TryEnqueueError::InsufficientCapacity([1]))
709 /// );
710 /// // dequeue and resize, inserting elements before the buffer is available
711 /// {
712 /// let slice = queue
713 /// .try_dequeue_and_resize(3, Some(|| std::iter::once([42])))
714 /// .unwrap();
715 /// assert_eq!(slice.deref(), &[0]);
716 /// }
717 /// // capacity has been increased
718 /// queue.try_enqueue([1]).unwrap();
719 /// queue.try_enqueue([2]).unwrap();
720 /// let slice = queue.try_dequeue().unwrap();
721 /// assert_eq!(slice.deref(), &[42, 1, 2]);
722 /// ```
723 ///
724 /// ## An amortized unbounded recipe
725 ///
726 /// ```rust
727 /// # use std::ops::Deref;
728 /// # use std::sync::Mutex;
729 /// # use swap_buffer_queue::Queue;
730 /// # use swap_buffer_queue::buffer::{BufferSlice, InsertIntoBuffer, VecBuffer};
731 /// # use swap_buffer_queue::error::{EnqueueError, TryDequeueError, TryEnqueueError};
732 /// # use swap_buffer_queue::notify::Notify;
733 /// fn enqueue_unbounded<T>(
734 /// queue: &Queue<VecBuffer<T>>,
735 /// overflow: &Mutex<Vec<[T; 1]>>,
736 /// mut value: T,
737 /// ) -> Result<(), EnqueueError<[T; 1]>> {
738 /// // first, try to enqueue normally
739 /// match queue.try_enqueue([value]) {
740 /// Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
741 /// res => return res,
742 /// };
743 /// // if the enqueuing fails, lock the overflow
744 /// let mut guard = overflow.lock().unwrap();
745 /// // retry to enqueue (we never know what happened during lock acquisition)
746 /// match queue.try_enqueue([value]) {
747 /// Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
748 /// res => return res,
749 /// };
750 /// // then push the values to the overflow vector
751 /// guard.push([value]);
752 /// drop(guard);
753 /// // notify possible waiting dequeue
754 /// queue.notify().notify_dequeue();
755 /// Ok(())
756 /// }
757 ///
758 /// fn try_dequeue_unbounded<'a, T>(
759 /// queue: &'a Queue<VecBuffer<T>>,
760 /// overflow: &Mutex<Vec<[T; 1]>>,
761 /// ) -> Result<BufferSlice<'a, VecBuffer<T>, ()>, TryDequeueError> {
762 /// // lock the overflow and use `try_dequeue_and_resize` to drain the overflow into the
763 /// // queue
764 /// let mut guard = overflow.lock().unwrap();
765 /// let vec = &mut guard;
766 /// // `{ vec }` is a trick to get the correct FnOnce inference
767 /// // https://stackoverflow.com/questions/74814588/why-does-rust-infer-fnmut-instead-of-fnonce-for-this-closure-even-though-inferr
768 /// queue.try_dequeue_and_resize(queue.capacity() + vec.len(), Some(|| { vec }.drain(..)))
769 /// }
770 ///
771 /// // queue is initialized with zero capacity
772 /// let queue: Queue<VecBuffer<usize>> = Queue::new();
773 /// let overflow = Mutex::new(Vec::new());
774 /// assert_eq!(queue.capacity(), 0);
775 /// enqueue_unbounded(&queue, &overflow, 0).unwrap();
776 /// assert_eq!(
777 /// try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
778 /// &[0]
779 /// );
780 /// enqueue_unbounded(&queue, &overflow, 1).unwrap();
781 /// enqueue_unbounded(&queue, &overflow, 2).unwrap();
782 /// assert_eq!(
783 /// try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
784 /// &[1, 2]
785 /// );
786 /// ```
787 pub fn try_dequeue_and_resize<I>(
788 &self,
789 capacity: impl Into<Option<usize>>,
790 insert: Option<impl FnOnce() -> I>,
791 ) -> Result<BufferSlice<B, N>, TryDequeueError>
792 where
793 I: IntoIterator,
794 I::Item: InsertIntoBuffer<B>,
795 {
796 self.try_dequeue_internal(
797 self.lock_dequeuing()?,
798 || self.notify.notify_enqueue(),
799 Some(move |buffer_mut: &mut B| {
800 let resized_capa = capacity
801 .into()
802 .filter(|capa| *capa != buffer_mut.capacity());
803 if let Some(capa) = resized_capa {
804 EnqueuingCapacity::check_overflow(capa);
805 buffer_mut.resize(capa);
806 }
807 let mut length = 0;
808 if let Some(insert) = insert {
809 for value in insert() {
810 let Some(value_size) = NonZeroUsize::new(value.size()) else {
811 continue;
812 };
813 if value_size.get() > buffer_mut.capacity() {
814 break;
815 }
816 // SAFETY: Ranges `length..length+value.size()` will obviously not overlap,
817 // and the buffer is cleared before reusing it for enqueuing
818 // (see `Queue::release`)
819 unsafe { value.insert_into(buffer_mut, length) };
820 length += value_size.get();
821 }
822 }
823 (resized_capa.is_some(), length)
824 }),
825 )
826 }
827}
828
829impl<B, N> Queue<B, N>
830where
831 B: Buffer + Drain,
832{
833 pub(crate) fn remove(&self, buffer_index: usize, index: usize) -> B::Value {
834 debug_assert_eq!(
835 self.dequeuing_length.load(Ordering::Relaxed),
836 DEQUEUING_LOCKED
837 );
838 // SAFETY: Dequeued buffer pointed by buffer index can be accessed mutably
839 // (see `Queue::try_dequeue_spin`).
840 // SAFETY: Index comes from an iterator on the dequeued slice, so it has
841 // been previously inserted, and can be removed.
842 self.buffers[buffer_index].with_mut(|buf| unsafe { (*buf).remove(index) })
843 }
844}
845
846impl<B, N> Default for Queue<B, N>
847where
848 B: Buffer,
849 N: Default,
850{
851 fn default() -> Self {
852 Self::new()
853 }
854}
855
856impl<B, N> fmt::Debug for Queue<B, N>
857where
858 B: Buffer,
859 N: fmt::Debug,
860{
861 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
862 f.debug_struct("Queue")
863 .field("capacity", &self.capacity())
864 .field("len", &self.len())
865 .field("notify", &self.notify)
866 .finish()
867 }
868}
869
870impl<B, N> Drop for Queue<B, N>
871where
872 B: Buffer,
873{
874 fn drop(&mut self) {
875 self.lock_dequeuing()
876 .and_then(|deq| self.try_dequeue_internal(deq, || (), Self::NO_RESIZE))
877 .ok();
878 }
879}