rtrb/
lib.rs

1//! A realtime-safe single-producer single-consumer (SPSC) ring buffer.
2//!
3//! A [`RingBuffer`] consists of two parts:
4//! a [`Producer`] for writing into the ring buffer and
5//! a [`Consumer`] for reading from the ring buffer.
6//!
7//! A fixed-capacity buffer is allocated on construction.
8//! After that, no more memory is allocated (unless the type `T` does that internally).
9//! Reading from and writing into the ring buffer is *lock-free* and *wait-free*.
10//! All reading and writing functions return immediately.
11//! Attempts to write to a full buffer return an error;
12//! values inside the buffer are *not* overwritten.
13//! Attempts to read from an empty buffer return an error as well.
14//! Only a single thread can write into the ring buffer and a single thread
15//! (typically a different one) can read from the ring buffer.
16//! If the queue is empty, there is no way for the reading thread to wait
17//! for new data, other than trying repeatedly until reading succeeds.
18//! Similarly, if the queue is full, there is no way for the writing thread
19//! to wait for newly available space to write to, other than trying repeatedly.
20//!
21//! # Examples
22//!
23//! Moving single elements into and out of a queue with
24//! [`Producer::push()`] and [`Consumer::pop()`], respectively:
25//!
26//! ```
27//! use rtrb::{RingBuffer, PushError, PopError};
28//!
29//! let (mut producer, mut consumer) = RingBuffer::new(2);
30//!
31//! assert_eq!(producer.push(10), Ok(()));
32//! assert_eq!(producer.push(20), Ok(()));
33//! assert_eq!(producer.push(30), Err(PushError::Full(30)));
34//!
35//! std::thread::spawn(move || {
36//!     assert_eq!(consumer.pop(), Ok(10));
37//!     assert_eq!(consumer.pop(), Ok(20));
38//!     assert_eq!(consumer.pop(), Err(PopError::Empty));
39//! }).join().unwrap();
40//! ```
41//!
42//! See the documentation of the [`chunks#examples`] module
43//! for examples that write multiple items at once with
44//! [`Producer::write_chunk_uninit()`] and [`Producer::write_chunk()`]
45//! and read multiple items with [`Consumer::read_chunk()`].
46
47#![cfg_attr(not(feature = "std"), no_std)]
48#![warn(rust_2018_idioms)]
49#![deny(missing_docs, missing_debug_implementations)]
50#![deny(unsafe_op_in_unsafe_fn)]
51#![warn(clippy::undocumented_unsafe_blocks, clippy::unnecessary_safety_comment)]
52
53extern crate alloc;
54
55use alloc::sync::Arc;
56use alloc::vec::Vec;
57use core::cell::Cell;
58use core::fmt;
59use core::marker::PhantomData;
60use core::mem::{ManuallyDrop, MaybeUninit};
61use core::sync::atomic::{AtomicUsize, Ordering};
62
63#[allow(dead_code, clippy::undocumented_unsafe_blocks)]
64mod cache_padded;
65use cache_padded::CachePadded;
66
67pub mod chunks;
68
69// This is used in the documentation.
70#[allow(unused_imports)]
71use chunks::WriteChunkUninit;
72
/// A bounded single-producer single-consumer (SPSC) queue.
///
/// Elements can be written with a [`Producer`] and read with a [`Consumer`],
/// both of which can be obtained with [`RingBuffer::new()`].
///
/// The read position (`head`) and the write position (`tail`) both run over
/// the range `0 .. 2 * capacity` instead of `0 .. capacity`.
/// This way an empty queue (`head == tail`) can be told apart from a full one
/// (the distance from `head` to `tail` equals `capacity`)
/// while still using all `capacity` slots.
///
/// *See also the [crate-level documentation](crate).*
#[derive(Debug)]
pub struct RingBuffer<T> {
    /// The head of the queue, i.e. the position of the next slot to be read.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    /// In this file, it is stored by `Consumer::pop()` and loaded by the `Producer`.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue, i.e. the position of the next slot to be written.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    /// In this file, it is stored by `Producer::push()` and loaded by the `Consumer`.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Points to the start of the allocation that `RingBuffer::new()` requests for
    /// `capacity` values of type `T`; the allocation is released when the `RingBuffer`
    /// is dropped. Only the slots between `head` and `tail` hold initialized values.
    data_ptr: *mut T,

    /// The queue capacity, i.e. the maximum number of elements the queue can hold.
    capacity: usize,

    /// Indicates that dropping a `RingBuffer<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
100
101impl<T> RingBuffer<T> {
102    /// Creates a `RingBuffer` with the given `capacity` and returns [`Producer`] and [`Consumer`].
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use rtrb::RingBuffer;
108    ///
109    /// let (producer, consumer) = RingBuffer::<f32>::new(100);
110    /// ```
111    ///
112    /// Specifying an explicit type with the [turbofish](https://turbo.fish/)
    /// is only necessary if it cannot be deduced by the compiler.
114    ///
115    /// ```
116    /// use rtrb::RingBuffer;
117    ///
118    /// let (mut producer, consumer) = RingBuffer::new(100);
119    /// assert_eq!(producer.push(0.0f32), Ok(()));
120    /// ```
121    #[allow(clippy::new_ret_no_self)]
122    #[must_use]
123    pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
124        let buffer = Arc::new(RingBuffer {
125            head: CachePadded::new(AtomicUsize::new(0)),
126            tail: CachePadded::new(AtomicUsize::new(0)),
127            data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
128            capacity,
129            _marker: PhantomData,
130        });
131        let p = Producer {
132            buffer: buffer.clone(),
133            cached_head: Cell::new(0),
134            cached_tail: Cell::new(0),
135        };
136        let c = Consumer {
137            buffer,
138            cached_head: Cell::new(0),
139            cached_tail: Cell::new(0),
140        };
141        (p, c)
142    }
143
144    /// Returns the capacity of the queue.
145    ///
146    /// # Examples
147    ///
148    /// ```
149    /// use rtrb::RingBuffer;
150    ///
151    /// let (producer, consumer) = RingBuffer::<f32>::new(100);
152    /// assert_eq!(producer.buffer().capacity(), 100);
153    /// assert_eq!(consumer.buffer().capacity(), 100);
154    /// // Both producer and consumer of course refer to the same ring buffer:
155    /// assert_eq!(producer.buffer(), consumer.buffer());
156    /// ```
    pub fn capacity(&self) -> usize {
        // `capacity` is set once in `RingBuffer::new()` and not modified anywhere in this file.
        self.capacity
    }
160
161    /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`.
162    fn collapse_position(&self, pos: usize) -> usize {
163        debug_assert!(pos == 0 || pos < 2 * self.capacity);
164        if pos < self.capacity {
165            pos
166        } else {
167            pos - self.capacity
168        }
169    }
170
171    /// Returns a pointer to the slot at position `pos`.
172    ///
173    /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
174    unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
175        debug_assert!(pos == 0 || pos < 2 * self.capacity);
176        let pos = self.collapse_position(pos);
177        // SAFETY: The caller must ensure a valid pos.
178        unsafe { self.data_ptr.add(pos) }
179    }
180
181    /// Increments a position by going `n` slots forward.
182    fn increment(&self, pos: usize, n: usize) -> usize {
183        debug_assert!(pos == 0 || pos < 2 * self.capacity);
184        debug_assert!(n <= self.capacity);
185        let threshold = 2 * self.capacity - n;
186        if pos < threshold {
187            pos + n
188        } else {
189            pos - threshold
190        }
191    }
192
193    /// Increments a position by going one slot forward.
194    ///
195    /// This is more efficient than self.increment(..., 1).
196    fn increment1(&self, pos: usize) -> usize {
197        debug_assert_ne!(self.capacity, 0);
198        debug_assert!(pos < 2 * self.capacity);
199        if pos < 2 * self.capacity - 1 {
200            pos + 1
201        } else {
202            0
203        }
204    }
205
206    /// Returns the distance between two positions.
207    fn distance(&self, a: usize, b: usize) -> usize {
208        debug_assert!(a == 0 || a < 2 * self.capacity);
209        debug_assert!(b == 0 || b < 2 * self.capacity);
210        if a <= b {
211            b - a
212        } else {
213            2 * self.capacity - a + b
214        }
215    }
216}
217
218impl<T> Drop for RingBuffer<T> {
219    /// Drops all non-empty slots.
220    fn drop(&mut self) {
221        let mut head = self.head.load(Ordering::Relaxed);
222        let tail = self.tail.load(Ordering::Relaxed);
223
224        // Loop over all slots that hold a value and drop them.
225        while head != tail {
226            // SAFETY: All slots between head and tail have been initialized.
227            unsafe { self.slot_ptr(head).drop_in_place() };
228            head = self.increment1(head);
229        }
230
231        // Finally, deallocate the buffer, but don't run any destructors.
232        // SAFETY: data_ptr and capacity are still valid from the original initialization.
233        unsafe { Vec::from_raw_parts(self.data_ptr, 0, self.capacity) };
234    }
235}
236
237impl<T> PartialEq for RingBuffer<T> {
238    /// This method tests for `self` and `other` values to be equal, and is used by `==`.
239    ///
240    /// # Examples
241    ///
242    /// ```
243    /// use rtrb::RingBuffer;
244    ///
245    /// let (p1, c1) = RingBuffer::<f32>::new(1000);
246    /// assert_eq!(p1.buffer(), c1.buffer());
247    ///
248    /// let (p2, c2) = RingBuffer::<f32>::new(1000);
249    /// assert_ne!(p1.buffer(), p2.buffer());
250    /// ```
251    fn eq(&self, other: &Self) -> bool {
252        core::ptr::eq(self, other)
253    }
254}
255
256impl<T> Eq for RingBuffer<T> {}
257
/// The producer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Consumer`]).
///
/// Individual elements can be moved into the ring buffer with [`Producer::push()`],
/// multiple elements at once can be written with [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
///
/// The number of free slots currently available for writing can be obtained with
/// [`Producer::slots()`].
///
/// When the `Producer` is dropped, [`Consumer::is_abandoned()`] will return `true`.
/// This can be used as a crude way to communicate to the receiving thread
/// that no more data will be produced.
/// When the `Producer` is dropped after the [`Consumer`] has already been dropped,
/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
#[derive(Debug, PartialEq, Eq)]
pub struct Producer<T> {
    /// A reference to the ring buffer.
    ///
    /// The second handle to the same buffer is given to the [`Consumer`]
    /// in [`RingBuffer::new()`].
    buffer: Arc<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
    /// In this file, that happens in [`Producer::slots()`] and, within `next_tail()`,
    /// whenever the cached value makes the queue look full.
    /// It is stored in a [`Cell`] so that it can be refreshed from methods taking `&self`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value is always in sync with `buffer.tail`:
    /// [`Producer::push()`] updates it right after storing the new tail.
    // NB: Caching the tail seems to have little effect on Intel CPUs, but it seems to
    //     improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
    cached_tail: Cell<usize>,
}
296
// SAFETY: After moving a Producer to another thread, there is still only a single thread
// that can access the producer side of the queue.
// The `T: Send` bound is needed because values of type `T` that are pushed by the producer
// are handed over to whichever thread owns the `Consumer`.
unsafe impl<T: Send> Send for Producer<T> {}
300
301impl<T> Producer<T> {
302    /// Attempts to push an element into the queue.
303    ///
304    /// The element is *moved* into the ring buffer and its slot
305    /// is made available to be read by the [`Consumer`].
306    ///
307    /// # Errors
308    ///
309    /// If the queue is full, the element is returned back as an error.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use rtrb::{RingBuffer, PushError};
315    ///
316    /// let (mut p, c) = RingBuffer::new(1);
317    ///
318    /// assert_eq!(p.push(10), Ok(()));
319    /// assert_eq!(p.push(20), Err(PushError::Full(20)));
320    /// ```
321    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
322        if let Some(tail) = self.next_tail() {
323            // SAFETY: tail points to an empty slot.
324            unsafe { self.buffer.slot_ptr(tail).write(value) };
325            let tail = self.buffer.increment1(tail);
326            self.buffer.tail.store(tail, Ordering::Release);
327            self.cached_tail.set(tail);
328            Ok(())
329        } else {
330            Err(PushError::Full(value))
331        }
332    }
333
334    /// Returns the number of slots available for writing.
335    ///
336    /// Since items can be concurrently consumed on another thread, the actual number
337    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
338    ///
339    /// To check for a single available slot,
340    /// using [`Producer::is_full()`] is often quicker
341    /// (because it might not have to check an atomic variable).
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use rtrb::RingBuffer;
347    ///
348    /// let (p, c) = RingBuffer::<f32>::new(1024);
349    ///
350    /// assert_eq!(p.slots(), 1024);
351    /// ```
352    pub fn slots(&self) -> usize {
353        let head = self.buffer.head.load(Ordering::Acquire);
354        self.cached_head.set(head);
355        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
356    }
357
358    /// Returns `true` if there are currently no slots available for writing.
359    ///
360    /// A full ring buffer might cease to be full at any time
361    /// if the corresponding [`Consumer`] is consuming items in another thread.
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use rtrb::RingBuffer;
367    ///
368    /// let (p, c) = RingBuffer::<f32>::new(1);
369    ///
370    /// assert!(!p.is_full());
371    /// ```
372    ///
373    /// Since items can be concurrently consumed on another thread, the ring buffer
374    /// might not be full for long:
375    ///
376    /// ```
377    /// # use rtrb::RingBuffer;
378    /// # let (p, c) = RingBuffer::<f32>::new(1);
379    /// if p.is_full() {
380    ///     // The buffer might be full, but it might as well not be
381    ///     // if an item was just consumed on another thread.
382    /// }
383    /// ```
384    ///
385    /// However, if it's not full, another thread cannot change that:
386    ///
387    /// ```
388    /// # use rtrb::RingBuffer;
389    /// # let (p, c) = RingBuffer::<f32>::new(1);
390    /// if !p.is_full() {
391    ///     // At least one slot is guaranteed to be available for writing.
392    /// }
393    /// ```
    pub fn is_full(&self) -> bool {
        // `next_tail()` only loads the shared atomic head if the cached head
        // makes the queue look full.
        self.next_tail().is_none()
    }
397
398    /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
399    ///
400    /// Note that since Rust version 1.74.0, this is not synchronizing with the consumer thread
401    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
402    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
403    ///
404    /// # Examples
405    ///
406    /// ```
407    /// use rtrb::RingBuffer;
408    ///
409    /// let (mut p, c) = RingBuffer::new(7);
410    /// assert!(!p.is_abandoned());
411    /// assert_eq!(p.push(10), Ok(()));
412    /// drop(c);
413    /// // The items that are still in the ring buffer are not accessible anymore.
414    /// assert!(p.is_abandoned());
415    /// // Even though it's futile, items can still be written:
416    /// assert_eq!(p.push(11), Ok(()));
417    /// ```
418    ///
419    /// Since the consumer can be concurrently dropped on another thread,
420    /// the producer might become abandoned at any time:
421    ///
422    /// ```
423    /// # use rtrb::RingBuffer;
424    /// # let (p, c) = RingBuffer::<i32>::new(1);
425    /// if !p.is_abandoned() {
426    ///     // Right now, the consumer might still be alive, but it might as well not be
427    ///     // if another thread has just dropped it.
428    /// }
429    /// ```
430    ///
431    /// However, if it already is abandoned, it will stay that way:
432    ///
433    /// ```
434    /// # use rtrb::RingBuffer;
435    /// # let (p, c) = RingBuffer::<i32>::new(1);
436    /// if p.is_abandoned() {
437    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
438    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
439    ///     // The consumer does definitely not exist anymore.
440    /// }
441    /// ```
    pub fn is_abandoned(&self) -> bool {
        // `RingBuffer::new()` creates two `Arc` handles (one for each side),
        // so a strong count below 2 means that the `Consumer`'s handle is gone.
        Arc::strong_count(&self.buffer) < 2
    }
445
446    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        // Deref coercion turns `&Arc<RingBuffer<T>>` into `&RingBuffer<T>`.
        &self.buffer
    }
450
451    /// Get the tail position for writing the next slot, if available.
452    ///
453    /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
    /// For performance, this special case is implemented separately.
455    fn next_tail(&self) -> Option<usize> {
456        let tail = self.cached_tail.get();
457
458        // Check if the queue is *possibly* full.
459        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
460            // Refresh the head ...
461            let head = self.buffer.head.load(Ordering::Acquire);
462            self.cached_head.set(head);
463
464            // ... and check if it's *really* full.
465            if self.buffer.distance(head, tail) == self.buffer.capacity {
466                return None;
467            }
468        }
469        Some(tail)
470    }
471}
472
/// The consumer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Producer`]).
///
/// Individual elements can be moved out of the ring buffer with [`Consumer::pop()`],
/// multiple elements at once can be read with [`Consumer::read_chunk()`].
///
/// The number of slots currently available for reading can be obtained with
/// [`Consumer::slots()`].
///
/// When the `Consumer` is dropped, [`Producer::is_abandoned()`] will return `true`.
/// This can be used as a crude way to communicate to the sending thread
/// that no more data will be consumed.
/// When the `Consumer` is dropped after the [`Producer`] has already been dropped,
/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
#[derive(Debug, PartialEq, Eq)]
pub struct Consumer<T> {
    /// A reference to the ring buffer.
    ///
    /// The second handle to the same buffer is given to the [`Producer`]
    /// in [`RingBuffer::new()`].
    buffer: Arc<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value is always in sync with `buffer.head`:
    /// [`Consumer::pop()`] updates it right after storing the new head.
    // NB: Caching the head seems to have little effect on Intel CPUs, but it seems to
    //     improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
    /// In this file, that happens in [`Consumer::slots()`] and, within `next_head()`,
    /// whenever the cached value makes the queue look empty.
    /// It is stored in a [`Cell`] so that it can be refreshed from methods taking `&self`.
    cached_tail: Cell<usize>,
}
510
// SAFETY: After moving a Consumer to another thread, there is still only a single thread
// that can access the consumer side of the queue.
// The `T: Send` bound is needed because values of type `T` that are popped by the consumer
// were moved into the queue by whichever thread owns the `Producer`.
unsafe impl<T: Send> Send for Consumer<T> {}
514
515impl<T> Consumer<T> {
516    /// Attempts to pop an element from the queue.
517    ///
518    /// The element is *moved* out of the ring buffer and its slot
519    /// is made available to be filled by the [`Producer`] again.
520    ///
521    /// # Errors
522    ///
523    /// If the queue is empty, an error is returned.
524    ///
525    /// # Examples
526    ///
527    /// ```
528    /// use rtrb::{PopError, RingBuffer};
529    ///
530    /// let (mut p, mut c) = RingBuffer::new(1);
531    ///
532    /// assert_eq!(p.push(10), Ok(()));
533    /// assert_eq!(c.pop(), Ok(10));
534    /// assert_eq!(c.pop(), Err(PopError::Empty));
535    /// ```
536    ///
537    /// To obtain an [`Option<T>`](Option), use [`.ok()`](Result::ok) on the result.
538    ///
539    /// ```
540    /// # use rtrb::RingBuffer;
541    /// # let (mut p, mut c) = RingBuffer::new(1);
542    /// assert_eq!(p.push(20), Ok(()));
543    /// assert_eq!(c.pop().ok(), Some(20));
544    /// ```
545    pub fn pop(&mut self) -> Result<T, PopError> {
546        if let Some(head) = self.next_head() {
547            // SAFETY: head points to an initialized slot.
548            let value = unsafe { self.buffer.slot_ptr(head).read() };
549            let head = self.buffer.increment1(head);
550            self.buffer.head.store(head, Ordering::Release);
551            self.cached_head.set(head);
552            Ok(value)
553        } else {
554            Err(PopError::Empty)
555        }
556    }
557
558    /// Attempts to read an element from the queue without removing it.
559    ///
560    /// # Errors
561    ///
562    /// If the queue is empty, an error is returned.
563    ///
564    /// # Examples
565    ///
566    /// ```
567    /// use rtrb::{PeekError, RingBuffer};
568    ///
569    /// let (mut p, c) = RingBuffer::new(1);
570    ///
571    /// assert_eq!(c.peek(), Err(PeekError::Empty));
572    /// assert_eq!(p.push(10), Ok(()));
573    /// assert_eq!(c.peek(), Ok(&10));
574    /// assert_eq!(c.peek(), Ok(&10));
575    /// ```
576    pub fn peek(&self) -> Result<&T, PeekError> {
577        if let Some(head) = self.next_head() {
578            // SAFETY: head points to an initialized slot.
579            Ok(unsafe { &*self.buffer.slot_ptr(head) })
580        } else {
581            Err(PeekError::Empty)
582        }
583    }
584
585    /// Returns the number of slots available for reading.
586    ///
587    /// Since items can be concurrently produced on another thread, the actual number
588    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
589    ///
590    /// To check for a single available slot,
591    /// using [`Consumer::is_empty()`] is often quicker
592    /// (because it might not have to check an atomic variable).
593    ///
594    /// # Examples
595    ///
596    /// ```
597    /// use rtrb::RingBuffer;
598    ///
599    /// let (p, c) = RingBuffer::<f32>::new(1024);
600    ///
601    /// assert_eq!(c.slots(), 0);
602    /// ```
603    pub fn slots(&self) -> usize {
604        let tail = self.buffer.tail.load(Ordering::Acquire);
605        self.cached_tail.set(tail);
606        self.buffer.distance(self.cached_head.get(), tail)
607    }
608
609    /// Returns `true` if there are currently no slots available for reading.
610    ///
611    /// An empty ring buffer might cease to be empty at any time
612    /// if the corresponding [`Producer`] is producing items in another thread.
613    ///
614    /// # Examples
615    ///
616    /// ```
617    /// use rtrb::RingBuffer;
618    ///
619    /// let (p, c) = RingBuffer::<f32>::new(1);
620    ///
621    /// assert!(c.is_empty());
622    /// ```
623    ///
624    /// Since items can be concurrently produced on another thread, the ring buffer
625    /// might not be empty for long:
626    ///
627    /// ```
628    /// # use rtrb::RingBuffer;
629    /// # let (p, c) = RingBuffer::<f32>::new(1);
630    /// if c.is_empty() {
631    ///     // The buffer might be empty, but it might as well not be
632    ///     // if an item was just produced on another thread.
633    /// }
634    /// ```
635    ///
636    /// However, if it's not empty, another thread cannot change that:
637    ///
638    /// ```
639    /// # use rtrb::RingBuffer;
640    /// # let (p, c) = RingBuffer::<f32>::new(1);
641    /// if !c.is_empty() {
642    ///     // At least one slot is guaranteed to be available for reading.
643    /// }
644    /// ```
    pub fn is_empty(&self) -> bool {
        // `next_head()` only loads the shared atomic tail if the cached tail
        // makes the queue look empty.
        self.next_head().is_none()
    }
648
649    /// Returns `true` if the corresponding [`Producer`] has been destroyed.
650    ///
651    /// Note that since Rust version 1.74.0, this is not synchronizing with the producer thread
652    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
653    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
654    ///
655    /// # Examples
656    ///
657    /// ```
658    /// use rtrb::RingBuffer;
659    ///
660    /// let (mut p, mut c) = RingBuffer::new(7);
661    /// assert!(!c.is_abandoned());
662    /// assert_eq!(p.push(10), Ok(()));
663    /// drop(p);
664    /// assert!(c.is_abandoned());
665    /// // The items that are left in the ring buffer can still be consumed:
666    /// assert_eq!(c.pop(), Ok(10));
667    /// ```
668    ///
669    /// Since the producer can be concurrently dropped on another thread,
670    /// the consumer might become abandoned at any time:
671    ///
672    /// ```
673    /// # use rtrb::RingBuffer;
674    /// # let (p, c) = RingBuffer::<i32>::new(1);
675    /// if !c.is_abandoned() {
676    ///     // Right now, the producer might still be alive, but it might as well not be
677    ///     // if another thread has just dropped it.
678    /// }
679    /// ```
680    ///
681    /// However, if it already is abandoned, it will stay that way:
682    ///
683    /// ```
684    /// # use rtrb::RingBuffer;
685    /// # let (p, c) = RingBuffer::<i32>::new(1);
686    /// if c.is_abandoned() {
687    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
688    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
689    ///     // The producer does definitely not exist anymore.
690    /// }
691    /// ```
    pub fn is_abandoned(&self) -> bool {
        // `RingBuffer::new()` creates two `Arc` handles (one for each side),
        // so a strong count below 2 means that the `Producer`'s handle is gone.
        Arc::strong_count(&self.buffer) < 2
    }
695
696    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        // Deref coercion turns `&Arc<RingBuffer<T>>` into `&RingBuffer<T>`.
        &self.buffer
    }
700
701    /// Get the head position for reading the next slot, if available.
702    ///
703    /// This is a strict subset of the functionality implemented in `read_chunk()`.
    /// For performance, this special case is implemented separately.
705    fn next_head(&self) -> Option<usize> {
706        let head = self.cached_head.get();
707
708        // Check if the queue is *possibly* empty.
709        if head == self.cached_tail.get() {
710            // Refresh the tail ...
711            let tail = self.buffer.tail.load(Ordering::Acquire);
712            self.cached_tail.set(tail);
713
714            // ... and check if it's *really* empty.
715            if head == tail {
716                return None;
717            }
718        }
719        Some(head)
720    }
721}
722
/// Extension trait which adds a [`copy_to_uninit()`](CopyToUninit::copy_to_uninit)
/// method to built-in slices.
///
/// With this, data can be safely copied into the slices returned from
/// [`WriteChunkUninit::as_mut_slices()`].
///
/// The trait has to be in scope for the method to be usable, e.g. with:
///
/// ```
/// use rtrb::CopyToUninit;
/// ```
pub trait CopyToUninit<T: Copy> {
    /// Copies contents to a possibly uninitialized slice
    /// and returns the destination as a slice of initialized values.
    fn copy_to_uninit<'a>(&self, dst: &'a mut [MaybeUninit<T>]) -> &'a mut [T];
}

impl<T: Copy> CopyToUninit<T> for [T] {
    /// Copies contents to a possibly uninitialized slice
    /// and returns the destination as a slice of initialized values.
    ///
    /// # Panics
    ///
    /// This function will panic if the two slices have different lengths.
    fn copy_to_uninit<'a>(&self, dst: &'a mut [MaybeUninit<T>]) -> &'a mut [T] {
        let len = self.len();
        assert_eq!(
            len,
            dst.len(),
            "source slice length does not match destination slice length"
        );
        // `MaybeUninit<T>` has the same layout as `T`, so the pointer can simply be cast.
        let target: *mut T = dst.as_mut_ptr().cast();
        // SAFETY: The lengths have been checked to be equal and
        // the mutable reference makes sure that there is no overlap.
        unsafe {
            core::ptr::copy_nonoverlapping(self.as_ptr(), target, len);
            core::slice::from_raw_parts_mut(target, len)
        }
    }
}
760
/// Error type for [`Consumer::pop()`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PopError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PopError {}

impl fmt::Display for PopError {
    /// Writes a short description of the error, honoring width and precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            PopError::Empty => "empty ring buffer",
        };
        f.pad(description)
    }
}
778
/// Error type for [`Consumer::peek()`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeekError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PeekError {}

impl fmt::Display for PeekError {
    /// Writes a short description of the error, honoring width and precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            PeekError::Empty => "empty ring buffer",
        };
        f.pad(description)
    }
}
796
797/// Error type for [`Producer::push()`].
798#[derive(Copy, Clone, PartialEq, Eq)]
799pub enum PushError<T> {
800    /// The queue was full.
801    Full(T),
802}
803
804#[cfg(feature = "std")]
805impl<T> std::error::Error for PushError<T> {}
806
807impl<T> fmt::Debug for PushError<T> {
808    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
809        match self {
810            PushError::Full(_) => f.pad("Full(_)"),
811        }
812    }
813}
814
815impl<T> fmt::Display for PushError<T> {
816    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
817        match self {
818            PushError::Full(_) => "full ring buffer".fmt(f),
819        }
820    }
821}