rtrb_basedrop/lib.rs
//! This is a fork of the [`rtrb`] crate that uses [`basedrop`]'s `Shared` pointer in
//! place of `Arc`. This ensures that when all references to the ring buffer are
//! dropped, the backing buffer is never deallocated (a non-realtime-safe operation)
//! on the realtime thread. Instead, all allocations are cleaned up in whatever
//! thread owns the basedrop `Collector`.
//!
//! This is especially useful for audio applications.
//!
//! [`rtrb`]: https://github.com/mgeier/rtrb
//! [`basedrop`]: https://github.com/glowcoil/basedrop
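//!
//! A minimal sketch of the intended cleanup flow: dropping both sides of the
//! ring buffer merely queues the allocation for collection, and it is only
//! freed once the thread owning the `Collector` collects.
//!
//! ```
//! use rtrb_basedrop::RingBuffer;
//! use basedrop::Collector;
//!
//! let mut collector = Collector::new();
//! let (producer, consumer) = RingBuffer::<f32>::new(256, &collector.handle());
//!
//! // Dropping the producer and consumer is realtime-safe; the backing
//! // allocation stays alive until the collector reclaims it.
//! drop(producer);
//! drop(consumer);
//! collector.collect();
//! ```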
//!
//! -------------------------------------------------------------
//!
//! A realtime-safe single-producer single-consumer (SPSC) ring buffer.
//!
//! A [`RingBuffer`] consists of two parts:
//! a [`Producer`] for writing into the ring buffer and
//! a [`Consumer`] for reading from the ring buffer.
//!
//! A fixed-capacity buffer is allocated on construction.
//! After that, no more memory is allocated (unless the type `T` does that internally).
//! Reading from and writing into the ring buffer is *lock-free* and *wait-free*.
//! All reading and writing functions return immediately.
//! Attempts to write to a full buffer return an error;
//! values inside the buffer are *not* overwritten.
//! Attempts to read from an empty buffer return an error as well.
//! Only a single thread can write into the ring buffer and a single thread
//! (typically a different one) can read from the ring buffer.
//! If the queue is empty, there is no way for the reading thread to wait
//! for new data, other than trying repeatedly until reading succeeds.
//! Similarly, if the queue is full, there is no way for the writing thread
//! to wait for newly available space to write to, other than trying repeatedly.
//!
//! # Examples
//!
//! Moving single elements into and out of a queue with
//! [`Producer::push()`] and [`Consumer::pop()`], respectively:
//!
//! ```
//! use rtrb_basedrop::{RingBuffer, PushError, PopError};
//! use basedrop::Collector;
//!
//! let collector = Collector::new();
//!
//! let (mut producer, mut consumer) = RingBuffer::new(2, &collector.handle());
//!
//! assert_eq!(producer.push(10), Ok(()));
//! assert_eq!(producer.push(20), Ok(()));
//! assert_eq!(producer.push(30), Err(PushError::Full(30)));
//!
//! std::thread::spawn(move || {
//!     assert_eq!(consumer.pop(), Ok(10));
//!     assert_eq!(consumer.pop(), Ok(20));
//!     assert_eq!(consumer.pop(), Err(PopError::Empty));
//! }).join().unwrap();
//! ```
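//!
//! Since there is no blocking API, a reader that has to wait for data can
//! only poll; a minimal sketch of such a polling loop, yielding between
//! attempts with `std::thread::yield_now()`:
//!
//! ```
//! use rtrb_basedrop::RingBuffer;
//! use basedrop::Collector;
//!
//! let collector = Collector::new();
//! let (mut producer, mut consumer) = RingBuffer::new(1, &collector.handle());
//!
//! let writer = std::thread::spawn(move || {
//!     producer.push(42).unwrap();
//! });
//!
//! // Spin until the value arrives.
//! let value = loop {
//!     match consumer.pop() {
//!         Ok(v) => break v,
//!         Err(_) => std::thread::yield_now(),
//!     }
//! };
//! assert_eq!(value, 42);
//! writer.join().unwrap();
//! ```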
//!
//! See the documentation of the [`chunks`](chunks#examples) module
//! for examples that write multiple items at once with
//! [`Producer::write_chunk_uninit()`] and [`Producer::write_chunk()`]
//! and read multiple items with [`Consumer::read_chunk()`].

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(rust_2018_idioms)]
#![deny(missing_docs, missing_debug_implementations)]

extern crate alloc;

use alloc::vec::Vec;
use basedrop::{Handle, Shared};
use core::cell::Cell;
use core::fmt;
use core::fmt::Debug;
use core::marker::PhantomData;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::sync::atomic::{AtomicUsize, Ordering};

use cache_padded::CachePadded;

pub mod chunks;

// This is used in the documentation.
#[allow(unused_imports)]
use chunks::WriteChunkUninit;

/// A bounded single-producer single-consumer (SPSC) queue.
///
/// Elements can be written with a [`Producer`] and read with a [`Consumer`],
/// both of which can be obtained with [`RingBuffer::new()`].
///
/// *See also the [crate-level documentation](crate).*
pub struct RingBuffer<T: Send + 'static> {
    /// The head of the queue.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    data_ptr: *mut T,

    /// The queue capacity.
    capacity: usize,

    /// Indicates that dropping a `RingBuffer<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}

impl<T: Send + 'static> Debug for RingBuffer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("RingBuffer");
        f.field("head", &self.head);
        f.field("tail", &self.tail);
        f.field("capacity", &self.capacity);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for RingBuffer<T> {}

impl<T: Send + 'static> RingBuffer<T> {
    /// Creates a `RingBuffer` with the given `capacity` and returns [`Producer`] and [`Consumer`].
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
    /// ```
    ///
    /// Specifying an explicit type with the [turbofish](https://turbo.fish/)
    /// is only necessary if it cannot be deduced by the compiler.
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut producer, consumer) = RingBuffer::new(100, &collector.handle());
    /// assert_eq!(producer.push(0.0f32), Ok(()));
    /// ```
    #[allow(clippy::new_ret_no_self)]
    #[must_use]
    pub fn new(capacity: usize, handle: &Handle) -> (Producer<T>, Consumer<T>) {
        let buffer = Shared::new(
            handle,
            RingBuffer {
                head: CachePadded::new(AtomicUsize::new(0)),
                tail: CachePadded::new(AtomicUsize::new(0)),
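                // Allocate the buffer through a `Vec`, then leak it with
                // `ManuallyDrop`; ownership of the allocation is reclaimed
                // in `Drop` via `Vec::from_raw_parts()`.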
                data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
                capacity,
                _marker: PhantomData,
            },
        );
        let p = Producer {
            buffer: buffer.clone(),
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        let c = Consumer {
            buffer,
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        (p, c)
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
    /// assert_eq!(producer.buffer().capacity(), 100);
    /// assert_eq!(consumer.buffer().capacity(), 100);
    /// // Both producer and consumer of course refer to the same ring buffer:
    /// assert_eq!(producer.buffer(), consumer.buffer());
    /// ```
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`.
    fn collapse_position(&self, pos: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        if pos < self.capacity {
            pos
        } else {
            pos - self.capacity
        }
    }

    /// Returns a pointer to the slot at position `pos`.
    ///
    /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
    unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        self.data_ptr.add(self.collapse_position(pos))
    }

    /// Increments a position by going `n` slots forward.
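    ///
    /// Like all positions, the result stays in the range `0 .. 2 * capacity`;
    /// e.g. with `capacity == 3`, `increment(5, 2)` wraps to `(5 + 2) - 6 == 1`.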
    fn increment(&self, pos: usize, n: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        debug_assert!(n <= self.capacity);
        let threshold = 2 * self.capacity - n;
        if pos < threshold {
            pos + n
        } else {
            pos - threshold
        }
    }

    /// Increments a position by going one slot forward.
    ///
    /// This is more efficient than `self.increment(..., 1)`.
    fn increment1(&self, pos: usize) -> usize {
        debug_assert_ne!(self.capacity, 0);
        debug_assert!(pos < 2 * self.capacity);
        if pos < 2 * self.capacity - 1 {
            pos + 1
        } else {
            0
        }
    }

    /// Returns the distance between two positions.
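    ///
    /// Because positions live in `0 .. 2 * capacity`, a full queue
    /// (`distance(head, tail) == capacity`) can be told apart from an empty
    /// one (`distance == 0`); e.g. with `capacity == 3`, `distance(5, 2)`
    /// is `2 * 3 - 5 + 2 == 3`, i.e. the queue is full.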
    fn distance(&self, a: usize, b: usize) -> usize {
        debug_assert!(a == 0 || a < 2 * self.capacity);
        debug_assert!(b == 0 || b < 2 * self.capacity);
        if a <= b {
            b - a
        } else {
            2 * self.capacity - a + b
        }
    }
}

impl<T: Send + 'static> Drop for RingBuffer<T> {
    /// Drops all non-empty slots.
    fn drop(&mut self) {
        let mut head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);

        // Loop over all slots that hold a value and drop them.
        while head != tail {
            unsafe {
                self.slot_ptr(head).drop_in_place();
            }
            head = self.increment1(head);
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            Vec::from_raw_parts(self.data_ptr, 0, self.capacity);
        }
    }
}

impl<T: Send + 'static> PartialEq for RingBuffer<T> {
    /// This method tests for `self` and `other` values to be equal, and is used by `==`.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p1, c1) = RingBuffer::<f32>::new(1000, &collector.handle());
    /// assert_eq!(p1.buffer(), c1.buffer());
    ///
    /// let (p2, c2) = RingBuffer::<f32>::new(1000, &collector.handle());
    /// assert_ne!(p1.buffer(), p2.buffer());
    /// ```
    fn eq(&self, other: &Self) -> bool {
        core::ptr::eq(self, other)
    }
}

impl<T: Send + 'static> Eq for RingBuffer<T> {}

/// The producer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Consumer`]).
///
/// Individual elements can be moved into the ring buffer with [`Producer::push()`],
/// multiple elements at once can be written with [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
///
/// The number of free slots currently available for writing can be obtained with
/// [`Producer::slots()`].
///
/// When both the `Producer` and the [`Consumer`] have been dropped, the
/// [`RingBuffer`] is handed over to the `basedrop` collector, which drops it
/// (freeing the allocated memory) on whatever thread owns the `Collector`.
pub struct Producer<T: Send + 'static> {
    /// A reference to the ring buffer.
    buffer: Shared<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value is always in sync with `buffer.tail`.
    cached_tail: Cell<usize>,
}

impl<T: Send + 'static> Debug for Producer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Producer");
        f.field("buffer", &*self.buffer);
        f.field("cached_head", &self.cached_head);
        f.field("cached_tail", &self.cached_tail);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for Producer<T> {}

impl<T: Send + 'static> Producer<T> {
    /// Attempts to push an element into the queue.
    ///
    /// The element is *moved* into the ring buffer and its slot
    /// is made available to be read by the [`Consumer`].
    ///
    /// # Errors
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{RingBuffer, PushError};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(p.push(20), Err(PushError::Full(20)));
    /// ```
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        if let Some(tail) = self.next_tail() {
            unsafe {
                self.buffer.slot_ptr(tail).write(value);
            }
            let tail = self.buffer.increment1(tail);
            self.buffer.tail.store(tail, Ordering::Release);
            self.cached_tail.set(tail);
            Ok(())
        } else {
            Err(PushError::Full(value))
        }
    }

    /// Returns the number of slots available for writing.
    ///
    /// Since items can be concurrently consumed on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Producer::is_full()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
    ///
    /// assert_eq!(p.slots(), 1024);
    /// ```
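    ///
    /// Since the [`Consumer`] can only *free* slots, the returned count is a
    /// lower bound, and that many subsequent writes are guaranteed to succeed;
    /// a minimal sketch:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (mut p, c) = RingBuffer::<u32>::new(8, &collector.handle());
    /// for i in 0..p.slots() as u32 {
    ///     p.push(i).unwrap(); // cannot fail: the slots were already free
    /// }
    /// assert!(p.is_full());
    /// ```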
    pub fn slots(&self) -> usize {
        let head = self.buffer.head.load(Ordering::Acquire);
        self.cached_head.set(head);
        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
    }

    /// Returns `true` if there are currently no slots available for writing.
    ///
    /// A full ring buffer might cease to be full at any time
    /// if the corresponding [`Consumer`] is consuming items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    ///
    /// assert!(!p.is_full());
    /// ```
    ///
    /// Since items can be concurrently consumed on another thread, the ring buffer
    /// might not be full for long:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if p.is_full() {
    ///     // The buffer might be full, but it might as well not be
    ///     // if an item was just consumed on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not full, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if !p.is_full() {
    ///     // At least one slot is guaranteed to be available for writing.
    /// }
    /// ```
    pub fn is_full(&self) -> bool {
        self.next_tail().is_none()
    }

    /*
    /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(7, &collector.handle());
    /// assert!(!p.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(c);
    /// // The items that are still in the ring buffer are not accessible anymore.
    /// assert!(p.is_abandoned());
    /// // Even though it's futile, items can still be written:
    /// assert_eq!(p.push(11), Ok(()));
    /// ```
    ///
    /// Since the consumer can be concurrently dropped on another thread,
    /// the producer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if !p.is_abandoned() {
    ///     // Right now, the consumer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if p.is_abandoned() {
    ///     // The consumer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        Shared::strong_count(&self.buffer) < 2
    }
    */

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the tail position for writing the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
    /// For performance, this special case is implemented separately.
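    ///
    /// In the common (non-full) case this touches no atomics at all:
    /// `cached_head` is only refreshed from `buffer.head` when the cached
    /// values make the queue look full.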
    fn next_tail(&self) -> Option<usize> {
        let tail = self.cached_tail.get();

        // Check if the queue is *possibly* full.
        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
            // Refresh the head ...
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);

            // ... and check if it's *really* full.
            if self.buffer.distance(head, tail) == self.buffer.capacity {
                return None;
            }
        }
        Some(tail)
    }
}

/// The consumer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Producer`]).
///
/// Individual elements can be moved out of the ring buffer with [`Consumer::pop()`],
/// multiple elements at once can be read with [`Consumer::read_chunk()`].
///
/// The number of slots currently available for reading can be obtained with
/// [`Consumer::slots()`].
///
/// When both the [`Producer`] and the `Consumer` have been dropped, the
/// [`RingBuffer`] is handed over to the `basedrop` collector, which drops it
/// (freeing the allocated memory) on whatever thread owns the `Collector`.
pub struct Consumer<T: Send + 'static> {
    /// A reference to the ring buffer.
    buffer: Shared<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value is always in sync with `buffer.head`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
    cached_tail: Cell<usize>,
}

impl<T: Send + 'static> Debug for Consumer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Consumer");
        f.field("buffer", &*self.buffer);
        f.field("cached_head", &self.cached_head);
        f.field("cached_tail", &self.cached_tail);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for Consumer<T> {}

impl<T: Send + 'static> Consumer<T> {
    /// Attempts to pop an element from the queue.
    ///
    /// The element is *moved* out of the ring buffer and its slot
    /// is made available to be filled by the [`Producer`] again.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{PopError, RingBuffer};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// ```
    ///
    /// To obtain an [`Option<T>`](Option), use [`.ok()`](Result::ok) on the result.
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
    /// assert_eq!(p.push(20), Ok(()));
    /// assert_eq!(c.pop().ok(), Some(20));
    /// ```
    pub fn pop(&mut self) -> Result<T, PopError> {
        if let Some(head) = self.next_head() {
            let value = unsafe { self.buffer.slot_ptr(head).read() };
            let head = self.buffer.increment1(head);
            self.buffer.head.store(head, Ordering::Release);
            self.cached_head.set(head);
            Ok(value)
        } else {
            Err(PopError::Empty)
        }
    }

    /// Attempts to read an element from the queue without removing it.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{PeekError, RingBuffer};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(c.peek(), Err(PeekError::Empty));
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.peek(), Ok(&10));
    /// assert_eq!(c.peek(), Ok(&10));
    /// ```
    pub fn peek(&self) -> Result<&T, PeekError> {
        if let Some(head) = self.next_head() {
            Ok(unsafe { &*self.buffer.slot_ptr(head) })
        } else {
            Err(PeekError::Empty)
        }
    }

    /// Returns the number of slots available for reading.
    ///
    /// Since items can be concurrently produced on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Consumer::is_empty()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
    ///
    /// assert_eq!(c.slots(), 0);
    /// ```
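    ///
    /// Since the [`Producer`] can only *add* items, the returned count is a
    /// lower bound, and that many subsequent reads are guaranteed to succeed;
    /// a minimal sketch:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (mut p, mut c) = RingBuffer::<u32>::new(8, &collector.handle());
    /// # for i in 0..5 { p.push(i).unwrap(); }
    /// for _ in 0..c.slots() {
    ///     c.pop().unwrap(); // cannot fail: the items were already there
    /// }
    /// assert!(c.is_empty());
    /// ```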
    pub fn slots(&self) -> usize {
        let tail = self.buffer.tail.load(Ordering::Acquire);
        self.cached_tail.set(tail);
        self.buffer.distance(self.cached_head.get(), tail)
    }

    /// Returns `true` if there are currently no slots available for reading.
    ///
    /// An empty ring buffer might cease to be empty at any time
    /// if the corresponding [`Producer`] is producing items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    ///
    /// assert!(c.is_empty());
    /// ```
    ///
    /// Since items can be concurrently produced on another thread, the ring buffer
    /// might not be empty for long:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if c.is_empty() {
    ///     // The buffer might be empty, but it might as well not be
    ///     // if an item was just produced on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not empty, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if !c.is_empty() {
    ///     // At least one slot is guaranteed to be available for reading.
    /// }
    /// ```
    pub fn is_empty(&self) -> bool {
        self.next_head().is_none()
    }

    /*
    /// Returns `true` if the corresponding [`Producer`] has been destroyed.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(7, &collector.handle());
    /// assert!(!c.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(p);
    /// assert!(c.is_abandoned());
    /// // The items that are left in the ring buffer can still be consumed:
    /// assert_eq!(c.pop(), Ok(10));
    /// ```
    ///
    /// Since the producer can be concurrently dropped on another thread,
    /// the consumer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if !c.is_abandoned() {
    ///     // Right now, the producer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if c.is_abandoned() {
    ///     // The producer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        Shared::strong_count(&self.buffer) < 2
    }
    */

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the head position for reading the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `read_chunk()`.
    /// For performance, this special case is implemented separately.
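    ///
    /// In the common (non-empty) case this touches no atomics at all:
    /// `cached_tail` is only refreshed from `buffer.tail` when the cached
    /// values make the queue look empty.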
    fn next_head(&self) -> Option<usize> {
        let head = self.cached_head.get();

        // Check if the queue is *possibly* empty.
        if head == self.cached_tail.get() {
            // Refresh the tail ...
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);

            // ... and check if it's *really* empty.
            if head == tail {
                return None;
            }
        }
        Some(head)
    }
}

/// Extension trait used to provide a [`copy_to_uninit()`](CopyToUninit::copy_to_uninit)
/// method on built-in slices.
///
/// This can be used to safely copy data to the slices returned from
/// [`WriteChunkUninit::as_mut_slices()`].
///
/// To use this, the trait has to be brought into scope, e.g. with:
///
/// ```
/// use rtrb_basedrop::CopyToUninit;
/// ```
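///
/// A minimal usage sketch on a plain `MaybeUninit` buffer:
///
/// ```
/// use rtrb_basedrop::CopyToUninit;
/// use core::mem::MaybeUninit;
///
/// let src = [1, 2, 3];
/// let mut dst = [MaybeUninit::<i32>::uninit(); 3];
/// // Returns the now-initialized destination as a `&mut [i32]`.
/// let initialized = src.copy_to_uninit(&mut dst);
/// assert_eq!(initialized, [1, 2, 3]);
/// ```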
pub trait CopyToUninit<T: Copy + Send + 'static> {
    /// Copies contents to a possibly uninitialized slice.
    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T];
}

impl<T: Copy + Send + 'static> CopyToUninit<T> for [T] {
    /// Copies contents to a possibly uninitialized slice.
    ///
    /// # Panics
    ///
    /// This function will panic if the two slices have different lengths.
    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T] {
        assert_eq!(
            self.len(),
            dst.len(),
            "source slice length does not match destination slice length"
        );
        let dst_ptr = dst.as_mut_ptr() as *mut _;
        unsafe {
            self.as_ptr().copy_to_nonoverlapping(dst_ptr, self.len());
            core::slice::from_raw_parts_mut(dst_ptr, self.len())
        }
    }
}

/// Error type for [`Consumer::pop()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PopError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PopError {}

impl fmt::Display for PopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PopError::Empty => write!(f, "empty ring buffer"),
        }
    }
}

/// Error type for [`Consumer::peek()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeekError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PeekError {}

impl fmt::Display for PeekError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PeekError::Empty => write!(f, "empty ring buffer"),
        }
    }
}

/// Error type for [`Producer::push()`].
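///
/// The rejected value is handed back inside `Full`, so the caller can retry
/// later without cloning; a minimal sketch:
///
/// ```
/// use rtrb_basedrop::{RingBuffer, PushError};
/// use basedrop::Collector;
///
/// let collector = Collector::new();
/// let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
///
/// p.push(10).unwrap();
/// // The queue is full; recover the value and retry after a slot is freed.
/// let PushError::Full(rejected) = p.push(20).unwrap_err();
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(p.push(rejected), Ok(()));
/// ```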
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum PushError<T> {
    /// The queue was full.
    Full(T),
}

#[cfg(feature = "std")]
impl<T> std::error::Error for PushError<T> {}

impl<T> fmt::Debug for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PushError::Full(_) => f.pad("Full(_)"),
        }
    }
}

impl<T> fmt::Display for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PushError::Full(_) => write!(f, "full ring buffer"),
        }
    }
}