rtrb/lib.rs
1//! A realtime-safe single-producer single-consumer (SPSC) ring buffer.
2//!
3//! A [`RingBuffer`] consists of two parts:
4//! a [`Producer`] for writing into the ring buffer and
5//! a [`Consumer`] for reading from the ring buffer.
6//!
7//! A fixed-capacity buffer is allocated on construction.
8//! After that, no more memory is allocated (unless the type `T` does that internally).
9//! Reading from and writing into the ring buffer is *lock-free* and *wait-free*.
10//! All reading and writing functions return immediately.
11//! Attempts to write to a full buffer return an error;
12//! values inside the buffer are *not* overwritten.
13//! Attempts to read from an empty buffer return an error as well.
14//! Only a single thread can write into the ring buffer and a single thread
15//! (typically a different one) can read from the ring buffer.
16//! If the queue is empty, there is no way for the reading thread to wait
17//! for new data, other than trying repeatedly until reading succeeds.
18//! Similarly, if the queue is full, there is no way for the writing thread
19//! to wait for newly available space to write to, other than trying repeatedly.
20//!
21//! # Examples
22//!
23//! Moving single elements into and out of a queue with
24//! [`Producer::push()`] and [`Consumer::pop()`], respectively:
25//!
26//! ```
27//! use rtrb::{RingBuffer, PushError, PopError};
28//!
29//! let (mut producer, mut consumer) = RingBuffer::new(2);
30//!
31//! assert_eq!(producer.push(10), Ok(()));
32//! assert_eq!(producer.push(20), Ok(()));
33//! assert_eq!(producer.push(30), Err(PushError::Full(30)));
34//!
35//! std::thread::spawn(move || {
36//! assert_eq!(consumer.pop(), Ok(10));
37//! assert_eq!(consumer.pop(), Ok(20));
38//! assert_eq!(consumer.pop(), Err(PopError::Empty));
39//! }).join().unwrap();
40//! ```
41//!
42//! See the documentation of the [`chunks#examples`] module
43//! for examples that write multiple items at once with
44//! [`Producer::write_chunk_uninit()`] and [`Producer::write_chunk()`]
45//! and read multiple items with [`Consumer::read_chunk()`].
46
47#![cfg_attr(not(feature = "std"), no_std)]
48#![warn(rust_2018_idioms)]
49#![deny(missing_docs, missing_debug_implementations)]
50#![deny(unsafe_op_in_unsafe_fn)]
51#![warn(clippy::undocumented_unsafe_blocks, clippy::unnecessary_safety_comment)]
52
53extern crate alloc;
54
55use alloc::sync::Arc;
56use alloc::vec::Vec;
57use core::cell::Cell;
58use core::fmt;
59use core::marker::PhantomData;
60use core::mem::{ManuallyDrop, MaybeUninit};
61use core::sync::atomic::{AtomicUsize, Ordering};
62
63#[allow(dead_code, clippy::undocumented_unsafe_blocks)]
64mod cache_padded;
65use cache_padded::CachePadded;
66
67pub mod chunks;
68
69// This is used in the documentation.
70#[allow(unused_imports)]
71use chunks::WriteChunkUninit;
72
/// A bounded single-producer single-consumer (SPSC) queue.
///
/// Elements can be written with a [`Producer`] and read with a [`Consumer`],
/// both of which can be obtained with [`RingBuffer::new()`].
///
/// *See also the [crate-level documentation](crate).*
#[derive(Debug)]
pub struct RingBuffer<T> {
    /// The head of the queue, i.e. the position of the next slot to be read.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    /// Letting positions run to twice the capacity makes it possible to
    /// distinguish a full queue from an empty one (see `distance()`).
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue, i.e. the position of the next slot to be written.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Points to an allocation of `capacity` slots, obtained in `new()` from a
    /// `Vec` whose ownership is taken over and reclaimed in `Drop::drop()`.
    data_ptr: *mut T,

    /// The queue capacity.
    capacity: usize,

    /// Indicates that dropping a `RingBuffer<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
100
impl<T> RingBuffer<T> {
    /// Creates a `RingBuffer` with the given `capacity` and returns [`Producer`] and [`Consumer`].
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100);
    /// ```
    ///
    /// Specifying an explicit type with the [turbofish](https://turbo.fish/)
    /// is only necessary if it cannot be deduced by the compiler.
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut producer, consumer) = RingBuffer::new(100);
    /// assert_eq!(producer.push(0.0f32), Ok(()));
    /// ```
    #[allow(clippy::new_ret_no_self)]
    #[must_use]
    pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
        let buffer = Arc::new(RingBuffer {
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            // The `Vec`'s destructor is deliberately suppressed (`ManuallyDrop`);
            // the allocation is reclaimed in `Drop::drop()` via `Vec::from_raw_parts()`.
            data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
            capacity,
            _marker: PhantomData,
        });
        // Producer and consumer share the buffer; each keeps its own cached
        // copies of head/tail to avoid touching the atomics on every call.
        let p = Producer {
            buffer: buffer.clone(),
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        let c = Consumer {
            buffer,
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        (p, c)
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100);
    /// assert_eq!(producer.buffer().capacity(), 100);
    /// assert_eq!(consumer.buffer().capacity(), 100);
    /// // Both producer and consumer of course refer to the same ring buffer:
    /// assert_eq!(producer.buffer(), consumer.buffer());
    /// ```
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`,
    /// i.e. to an actual index into the allocated buffer.
    fn collapse_position(&self, pos: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        if pos < self.capacity {
            pos
        } else {
            pos - self.capacity
        }
    }

    /// Returns a pointer to the slot at position `pos`.
    ///
    /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
    unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        let pos = self.collapse_position(pos);
        // SAFETY: The caller must ensure a valid pos.
        unsafe { self.data_ptr.add(pos) }
    }

    /// Increments a position by going `n` slots forward,
    /// wrapping around at `2 * capacity`.
    fn increment(&self, pos: usize, n: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        debug_assert!(n <= self.capacity);
        // `pos - threshold` equals `pos + n - 2 * capacity`, i.e. a wrap-around,
        // computed without risk of overflow and without a modulo operation.
        let threshold = 2 * self.capacity - n;
        if pos < threshold {
            pos + n
        } else {
            pos - threshold
        }
    }

    /// Increments a position by going one slot forward.
    ///
    /// This is more efficient than self.increment(..., 1).
    fn increment1(&self, pos: usize) -> usize {
        debug_assert_ne!(self.capacity, 0);
        debug_assert!(pos < 2 * self.capacity);
        if pos < 2 * self.capacity - 1 {
            pos + 1
        } else {
            0
        }
    }

    /// Returns the distance between two positions,
    /// i.e. how many slots `b` is ahead of `a` (modulo `2 * capacity`).
    ///
    /// Applied to `head` and `tail`, this is the number of elements in the queue.
    fn distance(&self, a: usize, b: usize) -> usize {
        debug_assert!(a == 0 || a < 2 * self.capacity);
        debug_assert!(b == 0 || b < 2 * self.capacity);
        if a <= b {
            b - a
        } else {
            2 * self.capacity - a + b
        }
    }
}
217
impl<T> Drop for RingBuffer<T> {
    /// Drops all non-empty slots.
    ///
    /// This only runs once the last of `Producer` and `Consumer` has been
    /// dropped, i.e. when no other reference to the buffer exists anymore.
    fn drop(&mut self) {
        // `Relaxed` ordering is sufficient here: `&mut self` guarantees
        // exclusive access, so no other thread can touch the queue anymore.
        let mut head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);

        // Loop over all slots that hold a value and drop them.
        while head != tail {
            // SAFETY: All slots between head and tail have been initialized.
            unsafe { self.slot_ptr(head).drop_in_place() };
            head = self.increment1(head);
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        // SAFETY: data_ptr and capacity are still valid from the original initialization.
        unsafe { Vec::from_raw_parts(self.data_ptr, 0, self.capacity) };
    }
}
236
237impl<T> PartialEq for RingBuffer<T> {
238 /// This method tests for `self` and `other` values to be equal, and is used by `==`.
239 ///
240 /// # Examples
241 ///
242 /// ```
243 /// use rtrb::RingBuffer;
244 ///
245 /// let (p1, c1) = RingBuffer::<f32>::new(1000);
246 /// assert_eq!(p1.buffer(), c1.buffer());
247 ///
248 /// let (p2, c2) = RingBuffer::<f32>::new(1000);
249 /// assert_ne!(p1.buffer(), p2.buffer());
250 /// ```
251 fn eq(&self, other: &Self) -> bool {
252 core::ptr::eq(self, other)
253 }
254}
255
256impl<T> Eq for RingBuffer<T> {}
257
/// The producer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Consumer`]).
///
/// Individual elements can be moved into the ring buffer with [`Producer::push()`],
/// multiple elements at once can be written with [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
///
/// The number of free slots currently available for writing can be obtained with
/// [`Producer::slots()`].
///
/// When the `Producer` is dropped, [`Consumer::is_abandoned()`] will return `true`.
/// This can be used as a crude way to communicate to the receiving thread
/// that no more data will be produced.
/// When the `Producer` is dropped after the [`Consumer`] has already been dropped,
/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
#[derive(Debug, PartialEq, Eq)]
pub struct Producer<T> {
    /// A reference to the ring buffer.
    buffer: Arc<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
    // The `Cell` (being `!Sync`) also ensures that `Producer` is not `Sync`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value is always in sync with `buffer.tail`,
    /// because only the producer side ever writes to `buffer.tail`.
    // NB: Caching the tail seems to have little effect on Intel CPUs, but it seems to
    // improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
    cached_tail: Cell<usize>,
}
296
// SAFETY: After moving a Producer to another thread, there is still only a single thread
// that can access the producer side of the queue.
// `T: Send` is required because values of type `T` are moved across the thread
// boundary through the queue, and because dropping the `Producer` may drop the
// shared `RingBuffer` (including any leftover elements of type `T`).
unsafe impl<T: Send> Send for Producer<T> {}
300
impl<T> Producer<T> {
    /// Attempts to push an element into the queue.
    ///
    /// The element is *moved* into the ring buffer and its slot
    /// is made available to be read by the [`Consumer`].
    ///
    /// # Errors
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::{RingBuffer, PushError};
    ///
    /// let (mut p, c) = RingBuffer::new(1);
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(p.push(20), Err(PushError::Full(20)));
    /// ```
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        if let Some(tail) = self.next_tail() {
            // SAFETY: tail points to an empty slot.
            unsafe { self.buffer.slot_ptr(tail).write(value) };
            let tail = self.buffer.increment1(tail);
            // Publish the newly written slot: this `Release` store pairs with
            // the `Acquire` load of `tail` on the consumer side.
            self.buffer.tail.store(tail, Ordering::Release);
            self.cached_tail.set(tail);
            Ok(())
        } else {
            Err(PushError::Full(value))
        }
    }

    /// Returns the number of slots available for writing.
    ///
    /// Since items can be concurrently consumed on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Producer::is_full()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024);
    ///
    /// assert_eq!(p.slots(), 1024);
    /// ```
    pub fn slots(&self) -> usize {
        // This `Acquire` load pairs with the `Release` store of `head`
        // on the consumer side.
        let head = self.buffer.head.load(Ordering::Acquire);
        self.cached_head.set(head);
        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
    }

    /// Returns `true` if there are currently no slots available for writing.
    ///
    /// A full ring buffer might cease to be full at any time
    /// if the corresponding [`Consumer`] is consuming items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1);
    ///
    /// assert!(!p.is_full());
    /// ```
    ///
    /// Since items can be concurrently consumed on another thread, the ring buffer
    /// might not be full for long:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<f32>::new(1);
    /// if p.is_full() {
    ///     // The buffer might be full, but it might as well not be
    ///     // if an item was just consumed on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not full, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<f32>::new(1);
    /// if !p.is_full() {
    ///     // At least one slot is guaranteed to be available for writing.
    /// }
    /// ```
    pub fn is_full(&self) -> bool {
        self.next_tail().is_none()
    }

    /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
    ///
    /// Note that since Rust version 1.74.0, this is not synchronizing with the consumer thread
    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, c) = RingBuffer::new(7);
    /// assert!(!p.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(c);
    /// // The items that are still in the ring buffer are not accessible anymore.
    /// assert!(p.is_abandoned());
    /// // Even though it's futile, items can still be written:
    /// assert_eq!(p.push(11), Ok(()));
    /// ```
    ///
    /// Since the consumer can be concurrently dropped on another thread,
    /// the producer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<i32>::new(1);
    /// if !p.is_abandoned() {
    ///     // Right now, the consumer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<i32>::new(1);
    /// if p.is_abandoned() {
    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
    ///     // The consumer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        // Only producer and consumer hold strong references, so a count below 2
        // means the consumer is gone.
        Arc::strong_count(&self.buffer) < 2
    }

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the tail position for writing the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
    /// For performance, this special case is implemented separately.
    fn next_tail(&self) -> Option<usize> {
        let tail = self.cached_tail.get();

        // Check if the queue is *possibly* full.
        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
            // Refresh the head ...
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);

            // ... and check if it's *really* full.
            if self.buffer.distance(head, tail) == self.buffer.capacity {
                return None;
            }
        }
        Some(tail)
    }
}
472
/// The consumer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Producer`]).
///
/// Individual elements can be moved out of the ring buffer with [`Consumer::pop()`],
/// multiple elements at once can be read with [`Consumer::read_chunk()`].
///
/// The number of slots currently available for reading can be obtained with
/// [`Consumer::slots()`].
///
/// When the `Consumer` is dropped, [`Producer::is_abandoned()`] will return `true`.
/// This can be used as a crude way to communicate to the sending thread
/// that no more data will be consumed.
/// When the `Consumer` is dropped after the [`Producer`] has already been dropped,
/// [`RingBuffer::drop()`] will be called, freeing the allocated memory.
#[derive(Debug, PartialEq, Eq)]
pub struct Consumer<T> {
    /// A reference to the ring buffer.
    buffer: Arc<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value is always in sync with `buffer.head`,
    /// because only the consumer side ever writes to `buffer.head`.
    // NB: Caching the head seems to have little effect on Intel CPUs, but it seems to
    // improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
    // The `Cell` (being `!Sync`) also ensures that `Consumer` is not `Sync`.
    cached_tail: Cell<usize>,
}
510
// SAFETY: After moving a Consumer to another thread, there is still only a single thread
// that can access the consumer side of the queue.
// `T: Send` is required because values of type `T` are moved across the thread
// boundary through the queue, and because dropping the `Consumer` may drop the
// shared `RingBuffer` (including any leftover elements of type `T`).
unsafe impl<T: Send> Send for Consumer<T> {}
514
impl<T> Consumer<T> {
    /// Attempts to pop an element from the queue.
    ///
    /// The element is *moved* out of the ring buffer and its slot
    /// is made available to be filled by the [`Producer`] again.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::{PopError, RingBuffer};
    ///
    /// let (mut p, mut c) = RingBuffer::new(1);
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// ```
    ///
    /// To obtain an [`Option<T>`](Option), use [`.ok()`](Result::ok) on the result.
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (mut p, mut c) = RingBuffer::new(1);
    /// assert_eq!(p.push(20), Ok(()));
    /// assert_eq!(c.pop().ok(), Some(20));
    /// ```
    pub fn pop(&mut self) -> Result<T, PopError> {
        if let Some(head) = self.next_head() {
            // SAFETY: head points to an initialized slot.
            let value = unsafe { self.buffer.slot_ptr(head).read() };
            let head = self.buffer.increment1(head);
            // Release the slot for re-use: this `Release` store pairs with
            // the `Acquire` load of `head` on the producer side.
            self.buffer.head.store(head, Ordering::Release);
            self.cached_head.set(head);
            Ok(value)
        } else {
            Err(PopError::Empty)
        }
    }

    /// Attempts to read an element from the queue without removing it.
    ///
    /// The returned reference borrows the `Consumer`, so the element cannot be
    /// popped while the reference is alive.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::{PeekError, RingBuffer};
    ///
    /// let (mut p, c) = RingBuffer::new(1);
    ///
    /// assert_eq!(c.peek(), Err(PeekError::Empty));
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.peek(), Ok(&10));
    /// assert_eq!(c.peek(), Ok(&10));
    /// ```
    pub fn peek(&self) -> Result<&T, PeekError> {
        if let Some(head) = self.next_head() {
            // SAFETY: head points to an initialized slot.
            Ok(unsafe { &*self.buffer.slot_ptr(head) })
        } else {
            Err(PeekError::Empty)
        }
    }

    /// Returns the number of slots available for reading.
    ///
    /// Since items can be concurrently produced on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Consumer::is_empty()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024);
    ///
    /// assert_eq!(c.slots(), 0);
    /// ```
    pub fn slots(&self) -> usize {
        // This `Acquire` load pairs with the `Release` store of `tail`
        // on the producer side.
        let tail = self.buffer.tail.load(Ordering::Acquire);
        self.cached_tail.set(tail);
        self.buffer.distance(self.cached_head.get(), tail)
    }

    /// Returns `true` if there are currently no slots available for reading.
    ///
    /// An empty ring buffer might cease to be empty at any time
    /// if the corresponding [`Producer`] is producing items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1);
    ///
    /// assert!(c.is_empty());
    /// ```
    ///
    /// Since items can be concurrently produced on another thread, the ring buffer
    /// might not be empty for long:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<f32>::new(1);
    /// if c.is_empty() {
    ///     // The buffer might be empty, but it might as well not be
    ///     // if an item was just produced on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not empty, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<f32>::new(1);
    /// if !c.is_empty() {
    ///     // At least one slot is guaranteed to be available for reading.
    /// }
    /// ```
    pub fn is_empty(&self) -> bool {
        self.next_head().is_none()
    }

    /// Returns `true` if the corresponding [`Producer`] has been destroyed.
    ///
    /// Note that since Rust version 1.74.0, this is not synchronizing with the producer thread
    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::new(7);
    /// assert!(!c.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(p);
    /// assert!(c.is_abandoned());
    /// // The items that are left in the ring buffer can still be consumed:
    /// assert_eq!(c.pop(), Ok(10));
    /// ```
    ///
    /// Since the producer can be concurrently dropped on another thread,
    /// the consumer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<i32>::new(1);
    /// if !c.is_abandoned() {
    ///     // Right now, the producer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb::RingBuffer;
    /// # let (p, c) = RingBuffer::<i32>::new(1);
    /// if c.is_abandoned() {
    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
    ///     // The producer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        // Only producer and consumer hold strong references, so a count below 2
        // means the producer is gone.
        Arc::strong_count(&self.buffer) < 2
    }

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the head position for reading the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `read_chunk()`.
    /// For performance, this special case is implemented separately.
    fn next_head(&self) -> Option<usize> {
        let head = self.cached_head.get();

        // Check if the queue is *possibly* empty.
        if head == self.cached_tail.get() {
            // Refresh the tail ...
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);

            // ... and check if it's *really* empty.
            if head == tail {
                return None;
            }
        }
        Some(head)
    }
}
722
/// Extension trait used to provide a [`copy_to_uninit()`](CopyToUninit::copy_to_uninit)
/// method on built-in slices.
///
/// This can be used to safely copy data to the slices returned from
/// [`WriteChunkUninit::as_mut_slices()`].
///
/// To use this, the trait has to be brought into scope, e.g. with:
///
/// ```
/// use rtrb::CopyToUninit;
/// ```
pub trait CopyToUninit<T: Copy> {
    /// Copies contents to a possibly uninitialized slice.
    ///
    /// Returns the destination, which is now guaranteed to be initialized,
    /// as a plain `&mut [T]` slice.
    fn copy_to_uninit<'a>(&self, dst: &'a mut [MaybeUninit<T>]) -> &'a mut [T];
}
738
739impl<T: Copy> CopyToUninit<T> for [T] {
740 /// Copies contents to a possibly uninitialized slice.
741 ///
742 /// # Panics
743 ///
744 /// This function will panic if the two slices have different lengths.
745 fn copy_to_uninit<'a>(&self, dst: &'a mut [MaybeUninit<T>]) -> &'a mut [T] {
746 assert_eq!(
747 self.len(),
748 dst.len(),
749 "source slice length does not match destination slice length"
750 );
751 let dst_ptr = dst.as_mut_ptr().cast();
752 // SAFETY: The lengths have been checked to be equal and
753 // the mutable reference makes sure that there is no overlap.
754 unsafe {
755 self.as_ptr().copy_to_nonoverlapping(dst_ptr, self.len());
756 core::slice::from_raw_parts_mut(dst_ptr, self.len())
757 }
758 }
759}
760
/// Error type for [`Consumer::pop()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PopError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PopError {}

impl fmt::Display for PopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // There is only a single variant, so an irrefutable pattern suffices.
        let PopError::Empty = self;
        // `pad()` honors any fill/alignment/width options of the format string.
        f.pad("empty ring buffer")
    }
}
778
/// Error type for [`Consumer::peek()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeekError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PeekError {}

impl fmt::Display for PeekError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // There is only a single variant, so an irrefutable pattern suffices.
        let PeekError::Empty = self;
        // `pad()` honors any fill/alignment/width options of the format string.
        f.pad("empty ring buffer")
    }
}
796
/// Error type for [`Producer::push()`].
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum PushError<T> {
    /// The queue was full.
    Full(T),
}

#[cfg(feature = "std")]
impl<T> std::error::Error for PushError<T> {}

impl<T> fmt::Debug for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The contained value is deliberately not shown,
        // so `T` is not required to implement `Debug`.
        let PushError::Full(_) = self;
        f.pad("Full(_)")
    }
}

impl<T> fmt::Display for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // There is only a single variant, so an irrefutable pattern suffices.
        let PushError::Full(_) = self;
        f.pad("full ring buffer")
    }
}