rtrb_basedrop/
lib.rs

1//! This is a fork of the [`rtrb`] crate that uses [`basedrop`]'s `Shared` pointer in
2//! place of `Arc`. This ensures that when all references to the ring buffer are
3//! dropped, the underlying `Vec` will never potentially get deallocated (a non-realtime
4//! safe operation) in the realtime thread. Instead, all allocations are cleaned up in
5//! whatever thread owns the basedrop `Collector` object.
6//! 
7//! This is especially useful for audio applications.
8//! 
9//! [`rtrb`]: https://github.com/mgeier/rtrb
10//! [`basedrop`]: https://github.com/glowcoil/basedrop
11//! 
12//! -------------------------------------------------------------
13//! 
14//! A realtime-safe single-producer single-consumer (SPSC) ring buffer.
15//!
16//! A [`RingBuffer`] consists of two parts:
17//! a [`Producer`] for writing into the ring buffer and
18//! a [`Consumer`] for reading from the ring buffer.
19//!
20//! A fixed-capacity buffer is allocated on construction.
21//! After that, no more memory is allocated (unless the type `T` does that internally).
22//! Reading from and writing into the ring buffer is *lock-free* and *wait-free*.
23//! All reading and writing functions return immediately.
24//! Attempts to write to a full buffer return an error;
25//! values inside the buffer are *not* overwritten.
26//! Attempts to read from an empty buffer return an error as well.
27//! Only a single thread can write into the ring buffer and a single thread
28//! (typically a different one) can read from the ring buffer.
29//! If the queue is empty, there is no way for the reading thread to wait
30//! for new data, other than trying repeatedly until reading succeeds.
31//! Similarly, if the queue is full, there is no way for the writing thread
32//! to wait for newly available space to write to, other than trying repeatedly.
33//!
34//! # Examples
35//!
36//! Moving single elements into and out of a queue with
37//! [`Producer::push()`] and [`Consumer::pop()`], respectively:
38//!
39//! ```
40//! use rtrb_basedrop::{RingBuffer, PushError, PopError};
41//! use basedrop::Collector;
42//!
43//! let collector = Collector::new();
44//!
45//! let (mut producer, mut consumer) = RingBuffer::new(2, &collector.handle());
46//!
47//! assert_eq!(producer.push(10), Ok(()));
48//! assert_eq!(producer.push(20), Ok(()));
49//! assert_eq!(producer.push(30), Err(PushError::Full(30)));
50//!
51//! std::thread::spawn(move || {
52//!     assert_eq!(consumer.pop(), Ok(10));
53//!     assert_eq!(consumer.pop(), Ok(20));
54//!     assert_eq!(consumer.pop(), Err(PopError::Empty));
55//! }).join().unwrap();
56//! ```
57//!
58//! See the documentation of the [`chunks#examples`] module
59//! for examples that write multiple items at once with
60//! [`Producer::write_chunk_uninit()`] and [`Producer::write_chunk()`]
61//! and read multiple items with [`Consumer::read_chunk()`].
62
63#![cfg_attr(not(feature = "std"), no_std)]
64#![warn(rust_2018_idioms)]
65#![deny(missing_docs, missing_debug_implementations)]
66
67extern crate alloc;
68
69use alloc::vec::Vec;
70use basedrop::{Handle, Shared};
71use core::cell::Cell;
72use core::fmt;
73use core::marker::PhantomData;
74use core::mem::{ManuallyDrop, MaybeUninit};
75use core::sync::atomic::{AtomicUsize, Ordering};
76use std::fmt::Debug;
77
78use cache_padded::CachePadded;
79
80pub mod chunks;
81
82// This is used in the documentation.
83#[allow(unused_imports)]
84use chunks::WriteChunkUninit;
85
86/// A bounded single-producer single-consumer (SPSC) queue.
87///
88/// Elements can be written with a [`Producer`] and read with a [`Consumer`],
89/// both of which can be obtained with [`RingBuffer::new()`].
90///
91/// *See also the [crate-level documentation](crate).*
92pub struct RingBuffer<T: Send + 'static> {
93    /// The head of the queue.
94    ///
95    /// This integer is in range `0 .. 2 * capacity`.
96    head: CachePadded<AtomicUsize>,
97
98    /// The tail of the queue.
99    ///
100    /// This integer is in range `0 .. 2 * capacity`.
101    tail: CachePadded<AtomicUsize>,
102
103    /// The buffer holding slots.
104    data_ptr: *mut T,
105
106    /// The queue capacity.
107    capacity: usize,
108
109    /// Indicates that dropping a `RingBuffer<T>` may drop elements of type `T`.
110    _marker: PhantomData<T>,
111}
112
113impl<T: Send + 'static> Debug for RingBuffer<T> {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        let mut f = f.debug_struct("RingBuffer");
116        f.field("head", &self.head);
117        f.field("tail", &self.tail);
118        f.field("capacity", &self.capacity);
119        f.finish()
120    }
121}
122
123unsafe impl<T: Send + 'static> Send for RingBuffer<T> {}
124
125impl<T: Send + 'static> RingBuffer<T> {
126    /// Creates a `RingBuffer` with the given `capacity` and returns [`Producer`] and [`Consumer`].
127    ///
128    /// # Examples
129    ///
130    /// ```
131    /// use rtrb_basedrop::RingBuffer;
132    /// use basedrop::Collector;
133    ///
134    /// let collector = Collector::new();
135    ///
136    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
137    /// ```
138    ///
139    /// Specifying an explicit type with the [turbofish](https://turbo.fish/)
140    /// is is only necessary if it cannot be deduced by the compiler.
141    ///
142    /// ```
143    /// use rtrb_basedrop::RingBuffer;
144    /// use basedrop::Collector;
145    ///
146    /// let collector = Collector::new();
147    ///
148    /// let (mut producer, consumer) = RingBuffer::new(100, &collector.handle());
149    /// assert_eq!(producer.push(0.0f32), Ok(()));
150    /// ```
151    #[allow(clippy::new_ret_no_self)]
152    #[must_use]
153    pub fn new(capacity: usize, handle: &Handle) -> (Producer<T>, Consumer<T>) {
154        let buffer = Shared::new(
155            handle,
156            RingBuffer {
157                head: CachePadded::new(AtomicUsize::new(0)),
158                tail: CachePadded::new(AtomicUsize::new(0)),
159                data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
160                capacity,
161                _marker: PhantomData,
162            },
163        );
164        let p = Producer {
165            buffer: buffer.clone(),
166            cached_head: Cell::new(0),
167            cached_tail: Cell::new(0),
168        };
169        let c = Consumer {
170            buffer,
171            cached_head: Cell::new(0),
172            cached_tail: Cell::new(0),
173        };
174        (p, c)
175    }
176
177    /// Returns the capacity of the queue.
178    ///
179    /// # Examples
180    ///
181    /// ```
182    /// use rtrb_basedrop::RingBuffer;
183    /// use basedrop::Collector;
184    ///
185    /// let collector = Collector::new();
186    ///
187    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
188    /// assert_eq!(producer.buffer().capacity(), 100);
189    /// assert_eq!(consumer.buffer().capacity(), 100);
190    /// // Both producer and consumer of course refer to the same ring buffer:
191    /// assert_eq!(producer.buffer(), consumer.buffer());
192    /// ```
193    pub fn capacity(&self) -> usize {
194        self.capacity
195    }
196
197    /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`.
198    fn collapse_position(&self, pos: usize) -> usize {
199        debug_assert!(pos == 0 || pos < 2 * self.capacity);
200        if pos < self.capacity {
201            pos
202        } else {
203            pos - self.capacity
204        }
205    }
206
207    /// Returns a pointer to the slot at position `pos`.
208    ///
209    /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
210    unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
211        debug_assert!(pos == 0 || pos < 2 * self.capacity);
212        self.data_ptr.add(self.collapse_position(pos))
213    }
214
215    /// Increments a position by going `n` slots forward.
216    fn increment(&self, pos: usize, n: usize) -> usize {
217        debug_assert!(pos == 0 || pos < 2 * self.capacity);
218        debug_assert!(n <= self.capacity);
219        let threshold = 2 * self.capacity - n;
220        if pos < threshold {
221            pos + n
222        } else {
223            pos - threshold
224        }
225    }
226
227    /// Increments a position by going one slot forward.
228    ///
229    /// This is more efficient than self.increment(..., 1).
230    fn increment1(&self, pos: usize) -> usize {
231        debug_assert_ne!(self.capacity, 0);
232        debug_assert!(pos < 2 * self.capacity);
233        if pos < 2 * self.capacity - 1 {
234            pos + 1
235        } else {
236            0
237        }
238    }
239
240    /// Returns the distance between two positions.
241    fn distance(&self, a: usize, b: usize) -> usize {
242        debug_assert!(a == 0 || a < 2 * self.capacity);
243        debug_assert!(b == 0 || b < 2 * self.capacity);
244        if a <= b {
245            b - a
246        } else {
247            2 * self.capacity - a + b
248        }
249    }
250}
251
252impl<T: Send + 'static> Drop for RingBuffer<T> {
253    /// Drops all non-empty slots.
254    fn drop(&mut self) {
255        let mut head = self.head.load(Ordering::Relaxed);
256        let tail = self.tail.load(Ordering::Relaxed);
257
258        // Loop over all slots that hold a value and drop them.
259        while head != tail {
260            unsafe {
261                self.slot_ptr(head).drop_in_place();
262            }
263            head = self.increment1(head);
264        }
265
266        // Finally, deallocate the buffer, but don't run any destructors.
267        unsafe {
268            Vec::from_raw_parts(self.data_ptr, 0, self.capacity);
269        }
270    }
271}
272
273impl<T: Send + 'static> PartialEq for RingBuffer<T> {
274    /// This method tests for `self` and `other` values to be equal, and is used by `==`.
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use rtrb_basedrop::RingBuffer;
280    /// use basedrop::Collector;
281    ///
282    /// let collector = Collector::new();
283    ///
284    /// let (p1, c1) = RingBuffer::<f32>::new(1000, &collector.handle());
285    /// assert_eq!(p1.buffer(), c1.buffer());
286    ///
287    /// let (p2, c2) = RingBuffer::<f32>::new(1000, &collector.handle());
288    /// assert_ne!(p1.buffer(), p2.buffer());
289    /// ```
290    fn eq(&self, other: &Self) -> bool {
291        core::ptr::eq(self, other)
292    }
293}
294
295impl<T: Send + 'static> Eq for RingBuffer<T> {}
296
297/// The producer side of a [`RingBuffer`].
298///
299/// Can be moved between threads,
300/// but references from different threads are not allowed
301/// (i.e. it is [`Send`] but not [`Sync`]).
302///
303/// Can only be created with [`RingBuffer::new()`]
304/// (together with its counterpart, the [`Consumer`]).
305///
306/// Individual elements can be moved into the ring buffer with [`Producer::push()`],
307/// multiple elements at once can be written with [`Producer::write_chunk()`]
308/// and [`Producer::write_chunk_uninit()`].
309///
310/// The number of free slots currently available for writing can be obtained with
311/// [`Producer::slots()`].
312///
313/// When the `Producer` is dropped, [`Consumer::is_abandoned()`] will return `true`.
314/// This can be used as a crude way to communicate to the receiving thread
315/// that no more data will be produced.
316/// When the `Producer` is dropped after the [`Consumer`] has already been dropped,
317/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
318pub struct Producer<T: Send + 'static> {
319    /// A reference to the ring buffer.
320    buffer: Shared<RingBuffer<T>>,
321
322    /// A copy of `buffer.head` for quick access.
323    ///
324    /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
325    cached_head: Cell<usize>,
326
327    /// A copy of `buffer.tail` for quick access.
328    ///
329    /// This value is always in sync with `buffer.tail`.
330    cached_tail: Cell<usize>,
331}
332
333impl<T: Send + 'static> Debug for Producer<T> {
334    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335        let mut f = f.debug_struct("Producer");
336        f.field("buffer", &*self.buffer);
337        f.field("cached_head", &self.cached_head);
338        f.field("cached_tail", &self.cached_tail);
339        f.finish()
340    }
341}
342
343unsafe impl<T: Send + 'static> Send for Producer<T> {}
344
345impl<T: Send + 'static> Producer<T> {
346    /// Attempts to push an element into the queue.
347    ///
348    /// The element is *moved* into the ring buffer and its slot
349    /// is made available to be read by the [`Consumer`].
350    ///
351    /// # Errors
352    ///
353    /// If the queue is full, the element is returned back as an error.
354    ///
355    /// # Examples
356    ///
357    /// ```
358    /// use rtrb_basedrop::{RingBuffer, PushError};
359    /// use basedrop::Collector;
360    ///
361    /// let collector = Collector::new();
362    ///
363    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
364    ///
365    /// assert_eq!(p.push(10), Ok(()));
366    /// assert_eq!(p.push(20), Err(PushError::Full(20)));
367    /// ```
368    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
369        if let Some(tail) = self.next_tail() {
370            unsafe {
371                self.buffer.slot_ptr(tail).write(value);
372            }
373            let tail = self.buffer.increment1(tail);
374            self.buffer.tail.store(tail, Ordering::Release);
375            self.cached_tail.set(tail);
376            Ok(())
377        } else {
378            Err(PushError::Full(value))
379        }
380    }
381
382    /// Returns the number of slots available for writing.
383    ///
384    /// Since items can be concurrently consumed on another thread, the actual number
385    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
386    ///
387    /// To check for a single available slot,
388    /// using [`Producer::is_full()`] is often quicker
389    /// (because it might not have to check an atomic variable).
390    ///
391    /// # Examples
392    ///
393    /// ```
394    /// use rtrb_basedrop::RingBuffer;
395    /// use basedrop::Collector;
396    ///
397    /// let collector = Collector::new();
398    ///
399    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
400    ///
401    /// assert_eq!(p.slots(), 1024);
402    /// ```
403    pub fn slots(&self) -> usize {
404        let head = self.buffer.head.load(Ordering::Acquire);
405        self.cached_head.set(head);
406        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
407    }
408
409    /// Returns `true` if there are currently no slots available for writing.
410    ///
411    /// A full ring buffer might cease to be full at any time
412    /// if the corresponding [`Consumer`] is consuming items in another thread.
413    ///
414    /// # Examples
415    ///
416    /// ```
417    /// use rtrb_basedrop::RingBuffer;
418    /// use basedrop::Collector;
419    ///
420    /// let collector = Collector::new();
421    ///
422    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
423    ///
424    /// assert!(!p.is_full());
425    /// ```
426    ///
427    /// Since items can be concurrently consumed on another thread, the ring buffer
428    /// might not be full for long:
429    ///
430    /// ```
431    /// # use rtrb_basedrop::RingBuffer;
432    /// # use basedrop::Collector;
433    /// # let collector = Collector::new();
434    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
435    /// if p.is_full() {
436    ///     // The buffer might be full, but it might as well not be
437    ///     // if an item was just consumed on another thread.
438    /// }
439    /// ```
440    ///
441    /// However, if it's not full, another thread cannot change that:
442    ///
443    /// ```
444    /// # use rtrb_basedrop::RingBuffer;
445    /// # use basedrop::Collector;
446    /// # let collector = Collector::new();
447    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
448    /// if !p.is_full() {
449    ///     // At least one slot is guaranteed to be available for writing.
450    /// }
451    /// ```
452    pub fn is_full(&self) -> bool {
453        self.next_tail().is_none()
454    }
455
456    /*
457    /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
458    ///
459    /// # Examples
460    ///
461    /// ```
462    /// use rtrb_basedrop::RingBuffer;
463    /// use basedrop::Collector;
464    ///
465    /// let collector = Collector::new();
466    ///
467    /// let (mut p, c) = RingBuffer::new(7, &collector.handle());
468    /// assert!(!p.is_abandoned());
469    /// assert_eq!(p.push(10), Ok(()));
470    /// drop(c);
471    /// // The items that are still in the ring buffer are not accessible anymore.
472    /// assert!(p.is_abandoned());
473    /// // Even though it's futile, items can still be written:
474    /// assert_eq!(p.push(11), Ok(()));
475    /// ```
476    ///
477    /// Since the consumer can be concurrently dropped on another thread,
478    /// the producer might become abandoned at any time:
479    ///
480    /// ```
481    /// # use rtrb_basedrop::RingBuffer;
482    /// # let (p, c) = RingBuffer::<i32>::new(1);
483    /// if !p.is_abandoned() {
484    ///     // Right now, the consumer might still be alive, but it might as well not be
485    ///     // if another thread has just dropped it.
486    /// }
487    /// ```
488    ///
489    /// However, if it already is abandoned, it will stay that way:
490    ///
491    /// ```
492    /// # use rtrb_basedrop::RingBuffer;
493    /// # let (p, c) = RingBuffer::<i32>::new(1);
494    /// if p.is_abandoned() {
495    ///     // The consumer does definitely not exist anymore.
496    /// }
497    /// ```
498    pub fn is_abandoned(&self) -> bool {
499        Shared::strong_count(&self.buffer) < 2
500    }
501    */
502
503    /// Returns a read-only reference to the ring buffer.
504    pub fn buffer(&self) -> &RingBuffer<T> {
505        &self.buffer
506    }
507
508    /// Get the tail position for writing the next slot, if available.
509    ///
510    /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
511    /// For performance, this special case is immplemented separately.
512    fn next_tail(&self) -> Option<usize> {
513        let tail = self.cached_tail.get();
514
515        // Check if the queue is *possibly* full.
516        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
517            // Refresh the head ...
518            let head = self.buffer.head.load(Ordering::Acquire);
519            self.cached_head.set(head);
520
521            // ... and check if it's *really* full.
522            if self.buffer.distance(head, tail) == self.buffer.capacity {
523                return None;
524            }
525        }
526        Some(tail)
527    }
528}
529
530/// The consumer side of a [`RingBuffer`].
531///
532/// Can be moved between threads,
533/// but references from different threads are not allowed
534/// (i.e. it is [`Send`] but not [`Sync`]).
535///
536/// Can only be created with [`RingBuffer::new()`]
537/// (together with its counterpart, the [`Producer`]).
538///
539/// Individual elements can be moved out of the ring buffer with [`Consumer::pop()`],
540/// multiple elements at once can be read with [`Consumer::read_chunk()`].
541///
542/// The number of slots currently available for reading can be obtained with
543/// [`Consumer::slots()`].
544///
545/// When the `Consumer` is dropped, [`Producer::is_abandoned()`] will return `true`.
546/// This can be used as a crude way to communicate to the sending thread
547/// that no more data will be consumed.
548/// When the `Consumer` is dropped after the [`Producer`] has already been dropped,
549/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
550pub struct Consumer<T: Send + 'static> {
551    /// A reference to the ring buffer.
552    buffer: Shared<RingBuffer<T>>,
553
554    /// A copy of `buffer.head` for quick access.
555    ///
556    /// This value is always in sync with `buffer.head`.
557    cached_head: Cell<usize>,
558
559    /// A copy of `buffer.tail` for quick access.
560    ///
561    /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
562    cached_tail: Cell<usize>,
563}
564
565impl<T: Send + 'static> Debug for Consumer<T> {
566    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
567        let mut f = f.debug_struct("Consumer");
568        f.field("buffer", &*self.buffer);
569        f.field("cached_head", &self.cached_head);
570        f.field("cached_tail", &self.cached_tail);
571        f.finish()
572    }
573}
574
575unsafe impl<T: Send + 'static> Send for Consumer<T> {}
576
577impl<T: Send + 'static> Consumer<T> {
578    /// Attempts to pop an element from the queue.
579    ///
580    /// The element is *moved* out of the ring buffer and its slot
581    /// is made available to be filled by the [`Producer`] again.
582    ///
583    /// # Errors
584    ///
585    /// If the queue is empty, an error is returned.
586    ///
587    /// # Examples
588    ///
589    /// ```
590    /// use rtrb_basedrop::{PopError, RingBuffer};
591    /// use basedrop::Collector;
592    ///
593    /// let collector = Collector::new();
594    ///
595    /// let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
596    ///
597    /// assert_eq!(p.push(10), Ok(()));
598    /// assert_eq!(c.pop(), Ok(10));
599    /// assert_eq!(c.pop(), Err(PopError::Empty));
600    /// ```
601    ///
602    /// To obtain an [`Option<T>`](Option), use [`.ok()`](Result::ok) on the result.
603    ///
604    /// ```
605    /// # use rtrb_basedrop::RingBuffer;
606    /// # use basedrop::Collector;
607    /// # let collector = Collector::new();
608    /// # let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
609    /// assert_eq!(p.push(20), Ok(()));
610    /// assert_eq!(c.pop().ok(), Some(20));
611    /// ```
612    pub fn pop(&mut self) -> Result<T, PopError> {
613        if let Some(head) = self.next_head() {
614            let value = unsafe { self.buffer.slot_ptr(head).read() };
615            let head = self.buffer.increment1(head);
616            self.buffer.head.store(head, Ordering::Release);
617            self.cached_head.set(head);
618            Ok(value)
619        } else {
620            Err(PopError::Empty)
621        }
622    }
623
624    /// Attempts to read an element from the queue without removing it.
625    ///
626    /// # Errors
627    ///
628    /// If the queue is empty, an error is returned.
629    ///
630    /// # Examples
631    ///
632    /// ```
633    /// use rtrb_basedrop::{PeekError, RingBuffer};
634    /// use basedrop::Collector;
635    ///
636    /// let collector = Collector::new();
637    ///
638    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
639    ///
640    /// assert_eq!(c.peek(), Err(PeekError::Empty));
641    /// assert_eq!(p.push(10), Ok(()));
642    /// assert_eq!(c.peek(), Ok(&10));
643    /// assert_eq!(c.peek(), Ok(&10));
644    /// ```
645    pub fn peek(&self) -> Result<&T, PeekError> {
646        if let Some(head) = self.next_head() {
647            Ok(unsafe { &*self.buffer.slot_ptr(head) })
648        } else {
649            Err(PeekError::Empty)
650        }
651    }
652
653    /// Returns the number of slots available for reading.
654    ///
655    /// Since items can be concurrently produced on another thread, the actual number
656    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
657    ///
658    /// To check for a single available slot,
659    /// using [`Consumer::is_empty()`] is often quicker
660    /// (because it might not have to check an atomic variable).
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// use rtrb_basedrop::RingBuffer;
666    /// use basedrop::Collector;
667    ///
668    /// let collector = Collector::new();
669    ///
670    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
671    ///
672    /// assert_eq!(c.slots(), 0);
673    /// ```
674    pub fn slots(&self) -> usize {
675        let tail = self.buffer.tail.load(Ordering::Acquire);
676        self.cached_tail.set(tail);
677        self.buffer.distance(self.cached_head.get(), tail)
678    }
679
680    /// Returns `true` if there are currently no slots available for reading.
681    ///
682    /// An empty ring buffer might cease to be empty at any time
683    /// if the corresponding [`Producer`] is producing items in another thread.
684    ///
685    /// # Examples
686    ///
687    /// ```
688    /// use rtrb_basedrop::RingBuffer;
689    /// use basedrop::Collector;
690    ///
691    /// let collector = Collector::new();
692    ///
693    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
694    ///
695    /// assert!(c.is_empty());
696    /// ```
697    ///
698    /// Since items can be concurrently produced on another thread, the ring buffer
699    /// might not be empty for long:
700    ///
701    /// ```
702    /// # use rtrb_basedrop::RingBuffer;
703    /// # use basedrop::Collector;
704    /// # let collector = Collector::new();
705    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
706    /// if c.is_empty() {
707    ///     // The buffer might be empty, but it might as well not be
708    ///     // if an item was just produced on another thread.
709    /// }
710    /// ```
711    ///
712    /// However, if it's not empty, another thread cannot change that:
713    ///
714    /// ```
715    /// # use rtrb_basedrop::RingBuffer;
716    /// # use basedrop::Collector;
717    /// # let collector = Collector::new();
718    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
719    /// if !c.is_empty() {
720    ///     // At least one slot is guaranteed to be available for reading.
721    /// }
722    /// ```
723    pub fn is_empty(&self) -> bool {
724        self.next_head().is_none()
725    }
726
727    /*
728    /// Returns `true` if the corresponding [`Producer`] has been destroyed.
729    ///
730    /// # Examples
731    ///
732    /// ```
733    /// use rtrb_basedrop::RingBuffer;
734    /// use basedrop::Collector;
735    ///
736    /// let collector = Collector::new();
737    ///
738    /// let (mut p, mut c) = RingBuffer::new(7, &collector.handle());
739    /// assert!(!c.is_abandoned());
740    /// assert_eq!(p.push(10), Ok(()));
741    /// drop(p);
742    /// assert!(c.is_abandoned());
743    /// // The items that are left in the ring buffer can still be consumed:
744    /// assert_eq!(c.pop(), Ok(10));
745    /// ```
746    ///
747    /// Since the producer can be concurrently dropped on another thread,
748    /// the consumer might become abandoned at any time:
749    ///
750    /// ```
751    /// # use rtrb_basedrop::RingBuffer;
752    /// # use basedrop::Collector;
753    /// # let collector = Collector::new();
754    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
755    /// if !c.is_abandoned() {
756    ///     // Right now, the producer might still be alive, but it might as well not be
757    ///     // if another thread has just dropped it.
758    /// }
759    /// ```
760    ///
761    /// However, if it already is abandoned, it will stay that way:
762    ///
763    /// ```
764    /// # use rtrb_basedrop::RingBuffer;
765    /// # use basedrop::Collector;
766    /// # let collector = Collector::new();
767    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
768    /// if c.is_abandoned() {
769    ///     // The producer does definitely not exist anymore.
770    /// }
771    /// ```
772    pub fn is_abandoned(&self) -> bool {
773        Shared::strong_count(&self.buffer) < 2
774    }
775    */
776
777    /// Returns a read-only reference to the ring buffer.
778    pub fn buffer(&self) -> &RingBuffer<T> {
779        &self.buffer
780    }
781
782    /// Get the head position for reading the next slot, if available.
783    ///
784    /// This is a strict subset of the functionality implemented in `read_chunk()`.
785    /// For performance, this special case is immplemented separately.
786    fn next_head(&self) -> Option<usize> {
787        let head = self.cached_head.get();
788
789        // Check if the queue is *possibly* empty.
790        if head == self.cached_tail.get() {
791            // Refresh the tail ...
792            let tail = self.buffer.tail.load(Ordering::Acquire);
793            self.cached_tail.set(tail);
794
795            // ... and check if it's *really* empty.
796            if head == tail {
797                return None;
798            }
799        }
800        Some(head)
801    }
802}
803
804/// Extension trait used to provide a [`copy_to_uninit()`](CopyToUninit::copy_to_uninit)
805/// method on built-in slices.
806///
807/// This can be used to safely copy data to the slices returned from
808/// [`WriteChunkUninit::as_mut_slices()`].
809///
810/// To use this, the trait has to be brought into scope, e.g. with:
811///
812/// ```
813/// use rtrb_basedrop::CopyToUninit;
814/// ```
815pub trait CopyToUninit<T: Copy + Send + 'static> {
816    /// Copies contents to a possibly uninitialized slice.
817    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T];
818}
819
820impl<T: Copy + Send + 'static> CopyToUninit<T> for [T] {
821    /// Copies contents to a possibly uninitialized slice.
822    ///
823    /// # Panics
824    ///
825    /// This function will panic if the two slices have different lengths.
826    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T] {
827        assert_eq!(
828            self.len(),
829            dst.len(),
830            "source slice length does not match destination slice length"
831        );
832        let dst_ptr = dst.as_mut_ptr() as *mut _;
833        unsafe {
834            self.as_ptr().copy_to_nonoverlapping(dst_ptr, self.len());
835            core::slice::from_raw_parts_mut(dst_ptr, self.len())
836        }
837    }
838}
839
840/// Error type for [`Consumer::pop()`].
841#[derive(Debug, Copy, Clone, PartialEq, Eq)]
842pub enum PopError {
843    /// The queue was empty.
844    Empty,
845}
846
847#[cfg(feature = "std")]
848impl std::error::Error for PopError {}
849
850impl fmt::Display for PopError {
851    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
852        match self {
853            PopError::Empty => write!(f, "empty ring buffer"),
854        }
855    }
856}
857
858/// Error type for [`Consumer::peek()`].
859#[derive(Debug, Copy, Clone, PartialEq, Eq)]
860pub enum PeekError {
861    /// The queue was empty.
862    Empty,
863}
864
865#[cfg(feature = "std")]
866impl std::error::Error for PeekError {}
867
868impl fmt::Display for PeekError {
869    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
870        match self {
871            PeekError::Empty => write!(f, "empty ring buffer"),
872        }
873    }
874}
875
876/// Error type for [`Producer::push()`].
877#[derive(Copy, Clone, PartialEq, Eq)]
878pub enum PushError<T> {
879    /// The queue was full.
880    Full(T),
881}
882
883#[cfg(feature = "std")]
884impl<T> std::error::Error for PushError<T> {}
885
886impl<T> fmt::Debug for PushError<T> {
887    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
888        match self {
889            PushError::Full(_) => f.pad("Full(_)"),
890        }
891    }
892}
893
894impl<T> fmt::Display for PushError<T> {
895    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896        match self {
897            PushError::Full(_) => write!(f, "full ring buffer"),
898        }
899    }
900}