swap_buffer_queue/
queue.rs

use core::{fmt, num::NonZeroUsize, ops::Range};

use crossbeam_utils::CachePadded;

use crate::{
    buffer::{Buffer, BufferSlice, Drain, InsertIntoBuffer, Resize},
    error::{TryDequeueError, TryEnqueueError},
    loom::{
        hint,
        sync::atomic::{AtomicUsize, Ordering},
        LoomUnsafeCell, BACKOFF_LIMIT, SPIN_LIMIT,
    },
    notify::Notify,
};

const CLOSED_FLAG: usize = (usize::MAX >> 1) + 1;
const DEQUEUING_LOCKED: usize = usize::MAX;

/// Atomic usize with the following (64-bit) representation
/// 64------------63---------------------1--------------0
/// | closed flag |  enqueuing capacity  | buffer index |
/// +-------------+----------------------+--------------+
/// *buffer index* is the index (0 or 1) of the enqueuing buffer
/// *enqueuing capacity* is the remaining enqueuing capacity, starting at the buffer's
/// capacity and decreasing to zero
/// *closed flag* is a bit flag marking the queue as closed
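///
/// A worked example of the packing (a sketch with illustrative values; this is a
/// private type, so the snippet is not compiled as a doctest):
/// ```ignore
/// // buffer index 1, remaining capacity 5: 5 << 1 = 0b1010, OR'ed with index 1
/// let enqueuing = EnqueuingCapacity::new(1, 5);
/// assert_eq!(enqueuing.into_atomic(), 0b1011);
/// assert_eq!(enqueuing.buffer_index(), 1);
/// assert_eq!(enqueuing.remaining_capacity(), 5);
/// assert!(!enqueuing.is_closed());
/// ```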
#[derive(Copy, Clone)]
#[repr(transparent)]
struct EnqueuingCapacity(usize);

impl EnqueuingCapacity {
    #[inline]
    fn new(buffer_index: usize, capacity: usize) -> Self {
        assert!(capacity << 1 < CLOSED_FLAG);
        Self(buffer_index | (capacity << 1))
    }

    #[inline] // the compiler has been observed not to inline this function
    fn buffer_index(self) -> usize {
        self.0 & 1
    }

    #[inline]
    fn remaining_capacity(self) -> usize {
        (self.0 & !CLOSED_FLAG) >> 1
    }

    #[inline]
    fn is_closed(self) -> bool {
        self.0 & CLOSED_FLAG != 0
    }

    #[inline]
    fn try_reserve(self, size: NonZeroUsize) -> Option<Self> {
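        // The subtrahend is even, so the buffer-index bit is preserved; underflow
        // (`None`) means insufficient remaining capacity. Note that the closed flag is
        // checked, and thus clear, before reservation (see `Queue::try_enqueue`).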
        self.0.checked_sub(size.get() << 1).map(Self)
    }

    #[inline]
    fn with_closed(self, enqueuing: Self) -> Self {
        Self(self.0 | (enqueuing.0 & CLOSED_FLAG))
    }

    #[inline]
    fn from_atomic(atomic: usize) -> Self {
        Self(atomic)
    }

    #[inline]
    fn into_atomic(self) -> usize {
        self.0
    }

    #[inline]
    fn close(atomic: &AtomicUsize, ordering: Ordering) {
        atomic.fetch_or(CLOSED_FLAG, ordering);
    }

    #[inline]
    fn reopen(atomic: &AtomicUsize, ordering: Ordering) {
        atomic.fetch_and(!CLOSED_FLAG, ordering);
    }

    #[inline]
    fn check_overflow(capacity: usize) {
        assert!(
            capacity < usize::MAX >> 2,
            "capacity must be lower than `usize::MAX >> 2`"
        );
    }
}

/// Atomic usize with the following (64-bit) representation
/// 64-------------------1--------------0
/// |  dequeuing length  | buffer index |
/// +--------------------+--------------+
/// *buffer index* is the index (0 or 1) of the dequeuing buffer
/// *dequeuing length* is the length currently being dequeued
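///
/// For example (a sketch with illustrative values; private type, so the snippet is
/// not compiled as a doctest):
/// ```ignore
/// // buffer index 0, dequeuing length 3: 3 << 1 = 0b110, OR'ed with index 0
/// let dequeuing = DequeuingLength::new(0, 3);
/// assert_eq!(dequeuing.buffer_index(), 0);
/// assert_eq!(dequeuing.buffer_len(), 3);
/// ```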
#[derive(Copy, Clone)]
struct DequeuingLength(usize);

impl DequeuingLength {
    #[inline]
    fn new(buffer_index: usize, length: usize) -> Self {
        Self(buffer_index | length << 1)
    }

    #[inline]
    fn buffer_index(self) -> usize {
        self.0 & 1
    }

    #[inline]
    fn buffer_len(self) -> usize {
        self.0 >> 1
    }

    #[inline]
    fn try_from_atomic(atomic: usize) -> Result<Self, TryDequeueError> {
        if atomic != DEQUEUING_LOCKED {
            Ok(Self(atomic))
        } else {
            Err(TryDequeueError::Conflict)
        }
    }

    #[inline]
    fn into_atomic(self) -> usize {
        self.0
    }
}

/// A buffered MPSC "swap-buffer" queue.
pub struct Queue<B, N = ()>
where
    B: Buffer,
{
    enqueuing_capacity: CachePadded<AtomicUsize>,
    dequeuing_length: CachePadded<AtomicUsize>,
    buffers: [LoomUnsafeCell<B>; 2],
    buffers_length: [CachePadded<AtomicUsize>; 2],
    capacity: AtomicUsize,
    notify: N,
}

// Needed for `BufferIter`
impl<B, N> AsRef<Queue<B, N>> for Queue<B, N>
where
    B: Buffer,
{
    fn as_ref(&self) -> &Queue<B, N> {
        self
    }
}

// SAFETY: Buffer access is synchronized by the algorithm; `B: Send` is required
// because the buffers are owned by the queue
unsafe impl<B, N> Send for Queue<B, N>
where
    B: Buffer + Send,
    N: Send,
{
}
// SAFETY: Buffer access is synchronized by the algorithm; `B: Send` is still required
// because buffer contents can be moved out by the dequeuing thread
unsafe impl<B, N> Sync for Queue<B, N>
where
    B: Buffer + Send,
    N: Sync,
{
}

impl<B, N> Queue<B, N>
where
    B: Buffer,
    N: Default,
{
    /// Creates a new queue using the buffer default.
    ///
    /// The default buffer may have a non-zero capacity, e.g. an array buffer.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::new();
    /// ```
    pub fn new() -> Self {
        let buffers: [LoomUnsafeCell<B>; 2] = Default::default();
        // https://github.com/tokio-rs/loom/issues/277#issuecomment-1633262296
        // SAFETY: exclusive reference to `buffers`
        let capacity = buffers[0].with_mut(|buf| unsafe { &*buf }.capacity());
        EnqueuingCapacity::check_overflow(capacity);
        Self {
            enqueuing_capacity: AtomicUsize::new(EnqueuingCapacity::new(0, capacity).into_atomic())
                .into(),
            dequeuing_length: AtomicUsize::new(DequeuingLength::new(1, 0).into_atomic()).into(),
            buffers,
            buffers_length: Default::default(),
            capacity: AtomicUsize::new(capacity),
            notify: Default::default(),
        }
    }
}

impl<B, N> Queue<B, N>
where
    B: Buffer + Resize,
    N: Default,
{
    /// Creates a new queue with the given capacity.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// ```
    pub fn with_capacity(capacity: usize) -> Self {
        EnqueuingCapacity::check_overflow(capacity);
        let buffers: [LoomUnsafeCell<B>; 2] = Default::default();
        // https://github.com/tokio-rs/loom/issues/277#issuecomment-1633262296
        // SAFETY: exclusive reference to `buffers`
        buffers[0].with_mut(|buf| unsafe { &mut *buf }.resize(capacity));
        // SAFETY: exclusive reference to `buffers`
        buffers[1].with_mut(|buf| unsafe { &mut *buf }.resize(capacity));
        Self {
            enqueuing_capacity: AtomicUsize::new(EnqueuingCapacity::new(0, capacity).into_atomic())
                .into(),
            dequeuing_length: AtomicUsize::new(DequeuingLength::new(1, 0).into_atomic()).into(),
            buffers,
            buffers_length: Default::default(),
            capacity: AtomicUsize::new(capacity),
            notify: Default::default(),
        }
    }
}

impl<B, N> Queue<B, N>
where
    B: Buffer,
{
    /// Returns the queue's [`Notify`] implementor.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// use swap_buffer_queue::notify::Notify;
    ///
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// queue.notify().notify_dequeue();
    /// ```
    #[inline]
    pub fn notify(&self) -> &N {
        &self.notify
    }

    /// Returns the current enqueuing buffer capacity.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// assert_eq!(queue.capacity(), 42);
    /// ```
    #[inline]
    pub fn capacity(&self) -> usize {
        // cannot use `Buffer::capacity` because of a data race
        self.capacity.load(Ordering::Relaxed)
    }

    /// Returns the current enqueuing buffer length.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// assert_eq!(queue.len(), 0);
    /// queue.try_enqueue([0]).unwrap();
    /// assert_eq!(queue.len(), 1);
    /// ```
    #[inline]
    pub fn len(&self) -> usize {
        let enqueuing =
            EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed));
        self.capacity()
            .saturating_sub(enqueuing.remaining_capacity())
    }

    /// Returns `true` if the current enqueuing buffer is empty.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// assert!(queue.is_empty());
    /// ```
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the queue is closed.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// assert!(!queue.is_closed());
    /// queue.close();
    /// assert!(queue.is_closed());
    /// ```
    #[inline]
    pub fn is_closed(&self) -> bool {
        EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Relaxed)).is_closed()
    }

    /// Reopens a closed queue.
    ///
    /// Calling this method when the queue is not closed has no effect.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// queue.close();
    /// assert!(queue.is_closed());
    /// queue.reopen();
    /// assert!(!queue.is_closed());
    /// ```
    #[inline]
    pub fn reopen(&self) {
        EnqueuingCapacity::reopen(&self.enqueuing_capacity, Ordering::AcqRel);
    }

    #[inline]
    fn lock_dequeuing(&self) -> Result<DequeuingLength, TryDequeueError> {
        // Protect from concurrent dequeuing by swapping the dequeuing length with a constant
        // marking dequeuing conflict.
        DequeuingLength::try_from_atomic(
            self.dequeuing_length
                .swap(DEQUEUING_LOCKED, Ordering::Relaxed),
        )
    }

    #[allow(clippy::type_complexity)]
    const NO_RESIZE: Option<fn(&mut B) -> (bool, usize)> = None;

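    // Core of the dequeuing algorithm: flush any pending dequeuing state, optionally
    // resize/fill the next buffer, swap the buffers with a compare-and-swap loop on the
    // enqueuing capacity, then wait for ongoing enqueuings before taking the slice.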
    #[inline]
    fn try_dequeue_internal(
        &self,
        dequeuing: DequeuingLength,
        notify_enqueue: impl Fn(),
        resize: Option<impl FnOnce(&mut B) -> (bool, usize)>,
    ) -> Result<BufferSlice<B, N>, TryDequeueError> {
        // If the dequeuing length is greater than zero, it means that a previous dequeuing is
        // still ongoing, either because a previous `try_dequeue` operation returned a pending
        // error, or because of requeuing (after partial draining, for example).
        if let Some(len) = NonZeroUsize::new(dequeuing.buffer_len()) {
            return self
                .try_dequeue_spin(dequeuing.buffer_index(), len)
                .ok_or(TryDequeueError::Pending);
        }
        let next_buffer_index = dequeuing.buffer_index();
        let (resized, inserted_length, next_capa) =
            self.buffers[next_buffer_index].with_mut(|next_buf| {
                // SAFETY: Dequeuing buffer can be accessed mutably
                let next_buffer = unsafe { &mut *next_buf };
                // Resize the buffer if needed.
                let (resized, inserted_length) = resize.map_or((false, 0), |f| f(next_buffer));
                (resized, inserted_length, next_buffer.capacity())
            });
        if inserted_length > 0 {
            self.buffers_length[next_buffer_index].fetch_add(inserted_length, Ordering::Relaxed);
        }
        let mut enqueuing =
            EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Acquire));
        debug_assert_ne!(dequeuing.buffer_index(), enqueuing.buffer_index());
        let capacity =
            // SAFETY: Enqueuing buffer can be immutably accessed.
            self.buffers[enqueuing.buffer_index()].with(|buf| unsafe { &*buf }.capacity());
        // If the buffer is empty and has not been resized, return an error (and store back the
        // dequeuing state)
        if enqueuing.remaining_capacity() == capacity && !resized && inserted_length == 0 {
            self.dequeuing_length
                .store(dequeuing.into_atomic(), Ordering::Relaxed);
            return Err(if enqueuing.is_closed() {
                TryDequeueError::Closed
            } else {
                TryDequeueError::Empty
            });
        }
        // Swap buffers: the previous dequeuing buffer becomes the enqueuing one
        let next_enqueuing = EnqueuingCapacity::new(next_buffer_index, next_capa - inserted_length);
        let mut backoff = 0;
        while let Err(enq) = self.enqueuing_capacity.compare_exchange_weak(
            enqueuing.into_atomic(),
            next_enqueuing.with_closed(enqueuing).into_atomic(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            enqueuing = EnqueuingCapacity::from_atomic(enq);
            // Spin in case of concurrent modifications, except when the buffer is full, of course.
            if enqueuing.remaining_capacity() != 0 {
                for _ in 0..1 << backoff {
                    hint::spin_loop();
                }
                if backoff < BACKOFF_LIMIT {
                    backoff += 1;
                }
            }
        }
        // Update the queue capacity if needed.
        if self.capacity() != next_capa {
            self.capacity.store(next_capa, Ordering::Relaxed);
        }
        // Notify enqueuers.
        notify_enqueue();
        match NonZeroUsize::new(capacity - enqueuing.remaining_capacity()) {
            // Try to wait for ongoing insertions and take ownership of the buffer, then return
            // the buffer slice
            Some(len) => self
                .try_dequeue_spin(enqueuing.buffer_index(), len)
                .ok_or(TryDequeueError::Pending),
            // If the enqueuing buffer was empty, but values have been inserted while resizing,
            // retry.
            None if inserted_length > 0 => self.try_dequeue_internal(
                DequeuingLength::new(enqueuing.buffer_index(), 0),
                notify_enqueue,
                Self::NO_RESIZE,
            ),
            // Otherwise (empty enqueuing buffer, resized dequeuing one), acknowledge the swap and
            // return an empty error
            None => {
                debug_assert!(resized);
                self.dequeuing_length.store(
                    DequeuingLength::new(enqueuing.buffer_index(), 0).into_atomic(),
                    Ordering::Relaxed,
                );
                Err(TryDequeueError::Empty)
            }
        }
    }

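    // Spin-waits for ongoing enqueuings on the swapped buffer; returns the buffer slice
    // once the buffer length matches the expected one, or `None` (saving the dequeuing
    // state for a later retry) if enqueuings are still pending after `SPIN_LIMIT`.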
    fn try_dequeue_spin(
        &self,
        buffer_index: usize,
        length: NonZeroUsize,
    ) -> Option<BufferSlice<B, N>> {
        for _ in 0..SPIN_LIMIT {
            // The buffers having been swapped, no new enqueuing can start, but we still need to
            // wait for ongoing ones. They are finished when the buffer length (updated after
            // each enqueuing) equals the expected one.
            // Also, requeuing with potential draining can lead to an expected length lower than
            // the effective buffer length.
            let buffer_len = self.buffers_length[buffer_index].load(Ordering::Acquire);
            if buffer_len >= length.get() {
                // Returns the slice (range can be shortened by draining + requeuing).
                let range = buffer_len - length.get()..buffer_len;
                let slice = self.buffers[buffer_index]
                    // SAFETY: All enqueuings are done, and the buffers having been swapped, this
                    // buffer can now be accessed mutably.
                    // SAFETY: All enqueuings are done, and the range has been inserted.
                    .with_mut(|buf| unsafe { (*buf).slice(range.clone()) });
                return Some(BufferSlice::new(self, buffer_index, range, slice));
            }
            hint::spin_loop();
        }
        // If enqueuings are still ongoing, just save the dequeuing state in order to retry.
        self.dequeuing_length.store(
            DequeuingLength::new(buffer_index, length.get()).into_atomic(),
            Ordering::Relaxed,
        );
        None
    }

    pub(crate) fn release(&self, buffer_index: usize, range: Range<usize>) {
        // Clears the dequeuing buffer and its length, and releases the dequeuing "lock".
        // SAFETY: The dequeued buffer pointed to by the buffer index can be accessed mutably
        // (see `Queue::try_dequeue_spin`).
        // SAFETY: Range comes from the dequeued slice, so it has been previously inserted.
        self.buffers[buffer_index].with_mut(|buf| unsafe { (*buf).clear(range) });
        self.buffers_length[buffer_index].store(0, Ordering::Release);
        self.dequeuing_length.store(
            DequeuingLength::new(buffer_index, 0).into_atomic(),
            Ordering::Relaxed,
        );
    }

    #[inline]
    pub(crate) fn get_slice(&self, buffer_index: usize, range: Range<usize>) -> B::Slice<'_> {
        self.buffers[buffer_index]
            // SAFETY: The dequeued buffer pointed to by the buffer index can be accessed mutably
            // (see `Queue::try_dequeue_spin`).
            // SAFETY: Range comes from the dequeued slice, so it has been previously inserted.
            .with_mut(|buf| unsafe { (*buf).slice(range.clone()) })
    }

    #[inline]
    pub(crate) fn requeue(&self, buffer_index: usize, range: Range<usize>) {
        // Requeuing the buffer just means saving the dequeuing state (or releasing if there is
        // nothing to requeue).
        let length = range.end - range.start;
        if length > 0 {
            self.dequeuing_length.store(
                DequeuingLength::new(buffer_index, length).into_atomic(),
                Ordering::Relaxed,
            );
        } else {
            self.release(buffer_index, range);
        }
    }
}

impl<B, N> Queue<B, N>
where
    B: Buffer,
    N: Notify,
{
    /// Tries enqueuing the given value into the queue.
    ///
    /// Enqueuing will fail if the queue has insufficient capacity, or if it is closed. In case of
    /// success, it will notify waiting dequeuing operations using [`Notify::notify_dequeue`].
    ///
    /// Enqueuing a zero-sized value is a no-op.
    ///
    /// # Examples
    /// ```
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// # use swap_buffer_queue::error::TryEnqueueError;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
    /// queue.try_enqueue([0]).unwrap();
    /// // queue is full
    /// assert_eq!(
    ///     queue.try_enqueue([0]),
    ///     Err(TryEnqueueError::InsufficientCapacity([0]))
    /// );
    /// // let's close the queue
    /// queue.close();
    /// assert_eq!(queue.try_enqueue([0]), Err(TryEnqueueError::Closed([0])));
    /// ```
    pub fn try_enqueue<T>(&self, value: T) -> Result<(), TryEnqueueError<T>>
    where
        T: InsertIntoBuffer<B>,
    {
        // Compare-and-swap loop with backoff in order to mitigate contention on the atomic field
        let Some(value_size) = NonZeroUsize::new(value.size()) else {
            return Ok(());
        };
        let mut enqueuing =
            EnqueuingCapacity::from_atomic(self.enqueuing_capacity.load(Ordering::Acquire));
        let mut backoff = None;
        loop {
            // Check that the queue is not closed and try to reserve a slice of the buffer.
            if enqueuing.is_closed() {
                return Err(TryEnqueueError::Closed(value));
            }
            let Some(next_enq) = enqueuing.try_reserve(value_size) else {
                return Err(TryEnqueueError::InsufficientCapacity(value));
            };
            if let Some(ref mut backoff) = backoff {
                for _ in 0..1 << *backoff {
                    hint::spin_loop();
                }
                if *backoff < BACKOFF_LIMIT {
                    *backoff += 1;
                }
            }
            match self.enqueuing_capacity.compare_exchange_weak(
                enqueuing.into_atomic(),
                next_enq.into_atomic(),
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(enq) => {
                    enqueuing = EnqueuingCapacity::from_atomic(enq);
                    // Spin in case of concurrent modification, except when the buffer index has
                    // changed, which may mean the conflict was due to dequeuing.
                    backoff = (next_enq.buffer_index() == enqueuing.buffer_index())
                        .then(|| backoff.unwrap_or(0));
                }
            }
        }
        // Insert the value into the buffer at the index given by subtracting the remaining
        // capacity from the buffer's capacity.
        self.buffers[enqueuing.buffer_index()].with(|buf| {
            // SAFETY: As long as enqueuing is ongoing, i.e. a reserved slice has not been acknowledged
            // in the buffer length (see `BufferWithLength::insert`), the buffer cannot be dequeued and
            // can thus be accessed immutably (see `Queue::try_dequeue_spin`).
            let buffer = unsafe { &*buf };
            let index = buffer.capacity() - enqueuing.remaining_capacity();
            // SAFETY: Compare-and-swap makes indexes not overlap, and the buffer is cleared before
            // reusing it for enqueuing (see `Queue::release`).
            unsafe { value.insert_into(buffer, index) };
        });
        self.buffers_length[enqueuing.buffer_index()].fetch_add(value_size.get(), Ordering::AcqRel);
        // Notify the dequeuer.
        self.notify.notify_dequeue();
        Ok(())
    }

    /// Tries dequeuing a buffer with all enqueued values from the queue.
    ///
    /// This method swaps the current buffer with the other one, which is empty. All concurrent
    /// enqueuing must end before the current buffer is really dequeuable, so the queue may
    /// be in a transitory state where `try_dequeue` must be retried. In this state, after a spin
    /// loop, this method will return a [`TryDequeueError::Pending`] error.
    ///
    /// Dequeuing also fails if the queue is empty, or if it is closed. Moreover, as the algorithm
    /// is MPSC, dequeuing is protected against concurrent calls, failing with a
    /// [`TryDequeueError::Conflict`] error.
    ///
    /// It returns a [`BufferSlice`], which holds, as its name indicates, a reference to the
    /// dequeued buffer. That's why the concurrent dequeuing protection is maintained for the
    /// lifetime of the buffer slice.
    ///
    /// # Examples
    /// ```
    /// # use std::ops::Deref;
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// # use swap_buffer_queue::error::TryDequeueError;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// queue.try_enqueue([0]).unwrap();
    /// queue.try_enqueue([1]).unwrap();
    /// {
    ///     let slice = queue.try_dequeue().unwrap();
    ///     assert_eq!(slice.deref(), &[0, 1]);
    ///     // dequeuing cannot be done concurrently (`slice` is still in scope)
    ///     assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Conflict);
    /// }
    /// // let's close the queue
    /// queue.try_enqueue([2]).unwrap();
    /// queue.close();
    /// // queue can be dequeued while closed when not empty
    /// {
    ///     let slice = queue.try_dequeue().unwrap();
    ///     assert_eq!(slice.deref(), &[2]);
    /// }
    /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed)
    /// ```
    pub fn try_dequeue(&self) -> Result<BufferSlice<B, N>, TryDequeueError> {
        self.try_dequeue_internal(
            self.lock_dequeuing()?,
            || self.notify.notify_enqueue(),
            Self::NO_RESIZE,
        )
    }

    /// Closes the queue.
    ///
    /// A closed queue can no longer accept enqueuing, but it can be dequeued while not empty.
    /// Calling this method on a closed queue has no effect.
    /// See [`reopen`](Queue::reopen) to reopen a closed queue.
    /// # Examples
    /// ```
    /// # use std::ops::Deref;
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// # use swap_buffer_queue::error::{TryDequeueError, TryEnqueueError};
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
    /// queue.try_enqueue([0]).unwrap();
    /// queue.close();
    /// assert!(queue.is_closed());
    /// assert_eq!(queue.try_enqueue([1]), Err(TryEnqueueError::Closed([1])));
    /// assert_eq!(queue.try_dequeue().unwrap().deref(), &[0]);
    /// assert_eq!(queue.try_dequeue().unwrap_err(), TryDequeueError::Closed);
    /// ```
    pub fn close(&self) {
        EnqueuingCapacity::close(&self.enqueuing_capacity, Ordering::AcqRel);
        self.notify.notify_dequeue();
        self.notify.notify_enqueue();
    }
}

impl<B, N> Queue<B, N>
where
    B: Buffer + Resize,
    N: Notify,
{
    /// Tries dequeuing a buffer with all enqueued values from the queue, and resizes the next
    /// buffer to be used for enqueuing.
    ///
    /// This method is an extension of the [`try_dequeue`](Queue::try_dequeue) method. In fact,
    /// before swapping the buffers, the next one is empty and protected, so it can be resized,
    /// and it is also possible to insert values into it before making it available for
    /// enqueuing. This can be used to make the queue
    /// [unbounded](Queue#an-amortized-unbounded-recipe).
    ///
    /// It is worth noting that only one buffer is resized, so this can lead to asymmetric
    /// buffers.
    ///
    /// # Examples
    /// ```
    /// # use std::ops::Deref;
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::VecBuffer;
    /// # use swap_buffer_queue::error::TryEnqueueError;
    /// let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(1);
    /// queue.try_enqueue([0]).unwrap();
    /// // queue is full
    /// assert_eq!(
    ///     queue.try_enqueue([1]),
    ///     Err(TryEnqueueError::InsufficientCapacity([1]))
    /// );
    /// // dequeue and resize, inserting elements before the buffer is available
    /// {
    ///     let slice = queue
    ///         .try_dequeue_and_resize(3, Some(|| std::iter::once([42])))
    ///         .unwrap();
    ///     assert_eq!(slice.deref(), &[0]);
    /// }
    /// // capacity has been increased
    /// queue.try_enqueue([1]).unwrap();
    /// queue.try_enqueue([2]).unwrap();
    /// let slice = queue.try_dequeue().unwrap();
    /// assert_eq!(slice.deref(), &[42, 1, 2]);
    /// ```
    ///
    /// ## An amortized unbounded recipe
    ///
    /// ```rust
    /// # use std::ops::Deref;
    /// # use std::sync::Mutex;
    /// # use swap_buffer_queue::Queue;
    /// # use swap_buffer_queue::buffer::{BufferSlice, InsertIntoBuffer, VecBuffer};
    /// # use swap_buffer_queue::error::{EnqueueError, TryDequeueError, TryEnqueueError};
    /// # use swap_buffer_queue::notify::Notify;
    /// fn enqueue_unbounded<T>(
    ///     queue: &Queue<VecBuffer<T>>,
    ///     overflow: &Mutex<Vec<[T; 1]>>,
    ///     mut value: T,
    /// ) -> Result<(), EnqueueError<[T; 1]>> {
    ///     // first, try to enqueue normally
    ///     match queue.try_enqueue([value]) {
    ///         Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
    ///         res => return res,
    ///     };
    ///     // if the enqueuing fails, lock the overflow
    ///     let mut guard = overflow.lock().unwrap();
    ///     // retry to enqueue (we never know what happened during lock acquisition)
    ///     match queue.try_enqueue([value]) {
    ///         Err(TryEnqueueError::InsufficientCapacity([v])) => value = v,
    ///         res => return res,
    ///     };
    ///     // then push the value to the overflow vector
    ///     guard.push([value]);
    ///     drop(guard);
    ///     // notify a possible waiting dequeue
    ///     queue.notify().notify_dequeue();
    ///     Ok(())
    /// }
    ///
    /// fn try_dequeue_unbounded<'a, T>(
    ///     queue: &'a Queue<VecBuffer<T>>,
    ///     overflow: &Mutex<Vec<[T; 1]>>,
    /// ) -> Result<BufferSlice<'a, VecBuffer<T>, ()>, TryDequeueError> {
    ///     // lock the overflow and use `try_dequeue_and_resize` to drain the overflow into the
    ///     // queue
    ///     let mut guard = overflow.lock().unwrap();
    ///     let vec = &mut guard;
    ///     // `{ vec }` is a trick to get the correct FnOnce inference
    ///     // https://stackoverflow.com/questions/74814588/why-does-rust-infer-fnmut-instead-of-fnonce-for-this-closure-even-though-inferr
    ///     queue.try_dequeue_and_resize(queue.capacity() + vec.len(), Some(|| { vec }.drain(..)))
    /// }
    ///
    /// // queue is initialized with zero capacity
    /// let queue: Queue<VecBuffer<usize>> = Queue::new();
    /// let overflow = Mutex::new(Vec::new());
    /// assert_eq!(queue.capacity(), 0);
    /// enqueue_unbounded(&queue, &overflow, 0).unwrap();
    /// assert_eq!(
    ///     try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
    ///     &[0]
    /// );
    /// enqueue_unbounded(&queue, &overflow, 1).unwrap();
    /// enqueue_unbounded(&queue, &overflow, 2).unwrap();
    /// assert_eq!(
    ///     try_dequeue_unbounded(&queue, &overflow).unwrap().deref(),
    ///     &[1, 2]
    /// );
    /// ```
    pub fn try_dequeue_and_resize<I>(
        &self,
        capacity: impl Into<Option<usize>>,
        insert: Option<impl FnOnce() -> I>,
    ) -> Result<BufferSlice<B, N>, TryDequeueError>
    where
        I: IntoIterator,
        I::Item: InsertIntoBuffer<B>,
    {
        self.try_dequeue_internal(
            self.lock_dequeuing()?,
            || self.notify.notify_enqueue(),
            Some(move |buffer_mut: &mut B| {
                let resized_capa = capacity
                    .into()
                    .filter(|capa| *capa != buffer_mut.capacity());
                if let Some(capa) = resized_capa {
                    EnqueuingCapacity::check_overflow(capa);
                    buffer_mut.resize(capa);
                }
                let mut length = 0;
                if let Some(insert) = insert {
                    for value in insert() {
                        let Some(value_size) = NonZeroUsize::new(value.size()) else {
                            continue;
                        };
                        // stop before overflowing the buffer's capacity
                        if length + value_size.get() > buffer_mut.capacity() {
                            break;
                        }
                        // SAFETY: Ranges `length..length+value.size()` will obviously not overlap,
                        // they stay in the buffer's capacity thanks to the check above, and the
                        // buffer is cleared before reusing it for enqueuing (see `Queue::release`)
                        unsafe { value.insert_into(buffer_mut, length) };
                        length += value_size.get();
                    }
                }
                (resized_capa.is_some(), length)
            }),
        )
    }
}

impl<B, N> Queue<B, N>
where
    B: Buffer + Drain,
{
    pub(crate) fn remove(&self, buffer_index: usize, index: usize) -> B::Value {
        debug_assert_eq!(
            self.dequeuing_length.load(Ordering::Relaxed),
            DEQUEUING_LOCKED
        );
        // SAFETY: The dequeued buffer pointed to by the buffer index can be accessed mutably
        // (see `Queue::try_dequeue_spin`).
        // SAFETY: Index comes from an iterator on the dequeued slice, so it has
        // been previously inserted, and can be removed.
        self.buffers[buffer_index].with_mut(|buf| unsafe { (*buf).remove(index) })
    }
}

impl<B, N> Default for Queue<B, N>
where
    B: Buffer,
    N: Default,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<B, N> fmt::Debug for Queue<B, N>
where
    B: Buffer,
    N: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Queue")
            .field("capacity", &self.capacity())
            .field("len", &self.len())
            .field("notify", &self.notify)
            .finish()
    }
}

impl<B, N> Drop for Queue<B, N>
where
    B: Buffer,
{
    fn drop(&mut self) {
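        // Try to dequeue any remaining values; the returned buffer slice is dropped
        // immediately, letting the normal slice cleanup run before the buffers are
        // dropped.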
        self.lock_dequeuing()
            .and_then(|deq| self.try_dequeue_internal(deq, || (), Self::NO_RESIZE))
            .ok();
    }
}