ringbuffer_spsc/
lib.rs

1//! A fast thread-safe `no_std` single-producer single-consumer ring buffer.
2//! For performance reasons, the capacity of the buffer is determined
3//! at compile time via a const generic and it is required to be a
//! power of two for more efficient index handling.
5//!
6//! # Example
7//! ```
8//! use ringbuffer_spsc::RingBuffer;
9//!
10//! const N: usize = 1_000_000;
11//! let (mut tx, mut rx) = RingBuffer::<usize, 16>::init();
12//!
13//! let p = std::thread::spawn(move || {
14//!     let mut current: usize = 0;
15//!     while current < N {
16//!         if tx.push(current).is_none() {
17//!             current = current.wrapping_add(1);
18//!         } else {
19//!             std::thread::yield_now();
20//!         }
21//!     }
22//! });
23//!
24//! let c = std::thread::spawn(move || {
25//!     let mut current: usize = 0;
26//!     while current < N {
27//!         if let Some(c) = rx.pull() {
28//!             assert_eq!(c, current);
29//!             current = current.wrapping_add(1);
30//!         } else {
31//!             std::thread::yield_now();
32//!         }
33//!     }
34//! });
35//!
36//! p.join().unwrap();
37//! c.join().unwrap();
38//! ```
39#![no_std]
40extern crate alloc;
41
42use alloc::sync::Arc;
43use core::{
44    cell::UnsafeCell,
45    mem::{self, MaybeUninit},
46    sync::atomic::{AtomicUsize, Ordering},
47};
48use crossbeam::utils::CachePadded;
49
50pub struct RingBuffer<T, const N: usize> {
51    buffer: UnsafeCell<[MaybeUninit<T>; N]>,
52    idx_r: CachePadded<AtomicUsize>,
53    idx_w: CachePadded<AtomicUsize>,
54}
55
56impl<T, const N: usize> RingBuffer<T, N> {
57    #[allow(clippy::new_ret_no_self)]
58    #[deprecated(since = "0.1.8", note = "please use `init()` instead.")]
59    pub fn new() -> (RingBufferWriter<T, N>, RingBufferReader<T, N>) {
60        Self::init()
61    }
62
63    /// Initialize the RingBuffer with the given capacity
64    pub fn init() -> (RingBufferWriter<T, N>, RingBufferReader<T, N>) {
65        assert!(
66            N.is_power_of_two(),
67            "RingBuffer requires the capacity to be a power of 2. {N} is not."
68        );
69        let rb = Arc::new(RingBuffer {
70            buffer: UnsafeCell::new(array_init::array_init(|_| MaybeUninit::uninit())),
71            idx_r: CachePadded::new(AtomicUsize::new(0)),
72            idx_w: CachePadded::new(AtomicUsize::new(0)),
73        });
74        (
75            RingBufferWriter {
76                inner: rb.clone(),
77                cached_idx_r: 0,
78                local_idx_w: 0,
79            },
80            RingBufferReader {
81                inner: rb,
82                local_idx_r: 0,
83                cached_idx_w: 0,
84            },
85        )
86    }
87
88    #[allow(clippy::mut_from_ref)]
89    #[inline]
90    unsafe fn get_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
91        // Since N is a power of two, N-1 is a mask covering N
92        // elements overflowing when N elements have been added.
93        // Indexes are left growing indefinetely and naturally wraps
94        // around once the index increment reaches usize::MAX.
95        &mut (*self.buffer.get())[idx & (N - 1)]
96    }
97}
98
99impl<T, const N: usize> Drop for RingBuffer<T, N> {
100    fn drop(&mut self) {
101        let mut idx_r = self.idx_r.load(Ordering::Acquire);
102        let idx_w = self.idx_w.load(Ordering::Acquire);
103
104        while idx_r != idx_w {
105            let t =
106                unsafe { mem::replace(self.get_mut(idx_r), MaybeUninit::uninit()).assume_init() };
107            mem::drop(t);
108            idx_r = idx_r.wrapping_add(1);
109        }
110    }
111}
112
113pub struct RingBufferWriter<T, const N: usize> {
114    inner: Arc<RingBuffer<T, N>>,
115    cached_idx_r: usize,
116    local_idx_w: usize,
117}
118
119unsafe impl<T: Send, const N: usize> Send for RingBufferWriter<T, N> {}
120unsafe impl<T: Sync, const N: usize> Sync for RingBufferWriter<T, N> {}
121
122impl<T, const N: usize> RingBufferWriter<T, N> {
123    /// Push an element into the RingBuffer.
124    /// It returns `Some(T)` if the RingBuffer is full, giving back the ownership of `T`.
125    #[inline]
126    pub fn push(&mut self, t: T) -> Option<T> {
127        // Check if the ring buffer is full.
128        if self.is_full() {
129            return Some(t);
130        }
131
132        // Insert the element in the ring buffer
133        let _ = unsafe { mem::replace(self.inner.get_mut(self.local_idx_w), MaybeUninit::new(t)) };
134        // Let's increment the counter and let it grow indefinitely and potentially overflow resetting it to 0.
135        self.local_idx_w = self.local_idx_w.wrapping_add(1);
136        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);
137
138        None
139    }
140
141    /// Check if the RingBuffer is full and evenutally updates the internal cached indexes.
142    #[inline]
143    pub fn is_full(&mut self) -> bool {
144        // Check if the ring buffer is potentially full.
145        // This happens when the difference between the write and read indexes equals
146        // the ring buffer capacity. Note that the write and read indexes are left growing
147        // indefinitely, so we need to compute the difference by accounting for any eventual
148        // overflow. This requires wrapping the subtraction operation.
149        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == N {
150            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
151            // Check if the ring buffer is really full
152            self.local_idx_w.wrapping_sub(self.cached_idx_r) == N
153        } else {
154            false
155        }
156    }
157}
158
159pub struct RingBufferReader<T, const N: usize> {
160    inner: Arc<RingBuffer<T, N>>,
161    local_idx_r: usize,
162    cached_idx_w: usize,
163}
164
165unsafe impl<T: Send, const N: usize> Send for RingBufferReader<T, N> {}
166unsafe impl<T: Sync, const N: usize> Sync for RingBufferReader<T, N> {}
167
168impl<T, const N: usize> RingBufferReader<T, N> {
169    /// Pull an element from the RingBuffer.
170    /// It returns `None` if the RingBuffer is empty.
171    #[inline]
172    pub fn pull(&mut self) -> Option<T> {
173        // Check if the ring buffer is potentially empty
174        if self.is_empty() {
175            return None;
176        }
177
178        // Remove the element from the ring buffer
179        let t = unsafe {
180            mem::replace(self.inner.get_mut(self.local_idx_r), MaybeUninit::uninit()).assume_init()
181        };
182        // Let's increment the counter and let it grow indefinitely
183        // and potentially overflow resetting it to 0.
184        self.local_idx_r = self.local_idx_r.wrapping_add(1);
185        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);
186
187        Some(t)
188    }
189
190    /// Check if the RingBuffer is empty and evenutally updates the internal cached indexes.
191    #[inline]
192    pub fn is_empty(&mut self) -> bool {
193        // Check if the ring buffer is potentially empty
194        if self.local_idx_r == self.cached_idx_w {
195            // Update the write index
196            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
197            // Check if the ring buffer is really empty
198            self.local_idx_r == self.cached_idx_w
199        } else {
200            false
201        }
202    }
203}