// ringbuffer_spsc/lib.rs

//! A fast, small single-producer single-consumer (SPSC) ringbuffer designed
//! for low-latency, high-throughput exchange between a single writer and a
//! single reader. This crate is `#![no_std]` and uses `alloc` internally; it
//! provides a minimal, ergonomic API that works from `no_std` contexts that
//! supply an allocator as well as from normal `std` programs.
//!
//! Important design points:
//! - The ringbuffer capacity is specified at runtime via the [`ringbuffer`] constructor
//!   and **must be a power of two**. The implementation uses a bitmask to wrap
//!   indices, which is much cheaper than a modulo operation in hot paths.
//! - The API is non-blocking: [`RingBufferWriter::push`] returns `Some(T)` immediately when the
//!   buffer is full (giving back ownership of the value), and [`RingBufferReader::pull`] returns
//!   `None` immediately when the buffer is empty. Typical usage is to yield or
//!   retry in those cases.
//!
//! *NOTE:* elements remaining in the buffer are dropped when the internal storage is deallocated.
//! This happens once both [`RingBufferReader`] and [`RingBufferWriter`] have been dropped; see the
//! *Drop behavior* example below.
//!
//! ## Example
//! ```rust
//! use ringbuffer_spsc::ringbuffer;
//!
//! const N: usize = 8;
//!
//! // Create a ringbuffer with capacity 16 (must be a power of two).
//! let (mut writer, mut reader) = ringbuffer::<usize>(16);
//! // Producer: retry with a yield whenever the buffer is full.
//! std::thread::spawn(move || {
//!     for i in 0..N {
//!         let mut value = i;
//!         while let Some(v) = writer.push(value) {
//!             value = v;
//!             std::thread::yield_now();
//!         }
//!     }
//! });
//! // Consumer: yield while the buffer is empty.
//! let mut i = 0;
//! while i < N {
//!     match reader.pull() {
//!         Some(_) => i += 1,
//!         None => std::thread::yield_now(),
//!     }
//! }
//! ```
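//!
//! ## Drop behavior
//!
//! A minimal sketch of the note above: an element still sitting in the buffer is
//! dropped once both handles are gone. The `Arc` payload here is only used to
//! observe the drop through its strong count.
//!
//! ```rust
//! use std::sync::Arc;
//! use ringbuffer_spsc::ringbuffer;
//!
//! let payload = Arc::new(42u32);
//! let (mut writer, reader) = ringbuffer::<Arc<u32>>(4);
//! let _ = writer.push(payload.clone());
//! assert_eq!(Arc::strong_count(&payload), 2);
//!
//! // Dropping both handles drains and drops the element left in the buffer.
//! drop(writer);
//! drop(reader);
//! assert_eq!(Arc::strong_count(&payload), 1);
//! ```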
#![no_std]
extern crate alloc;

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    mem::{self, MaybeUninit},
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

/// Create a new ringbuffer with a fixed capacity.
///
/// # Panics
///
/// Panics if `capacity` is not a power of two.
///
/// This requirement enables a fast bitmask wrap for indices, which avoids the cost of `mod`
/// in the hot path.
///
/// # Returns
///
/// A `(`[`RingBufferWriter<T>`]`, `[`RingBufferReader<T>`]`)` pair where the writer is
/// intended for the single producer and the reader for the single consumer.
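///
/// # Example
///
/// A minimal sketch of a buffer moving a single value end to end:
///
/// ```rust
/// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u32>(4);
/// assert!(writer.push(7).is_none());  // accepted, there is room
/// assert_eq!(reader.pull(), Some(7));
/// assert_eq!(reader.pull(), None);    // the buffer is empty again
/// ```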
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
    assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");

    // Inner container
    let v = (0..capacity)
        .map(|_| MaybeUninit::uninit())
        .collect::<Vec<_>>()
        .into_boxed_slice();

    let rb = Arc::new(RingBuffer {
        // Keep the pointer to the boxed slice
        ptr: Box::into_raw(v),
        // Since `capacity` is a power of two, `capacity - 1` is a bitmask that maps any
        // monotonically growing index to a slot in the buffer. Indices are left growing
        // indefinitely and naturally wrap around once they overflow past `usize::MAX`.
        mask: capacity - 1,
        idx_r: CachePadded::new(AtomicUsize::new(0)),
        idx_w: CachePadded::new(AtomicUsize::new(0)),
    });
    (
        RingBufferWriter {
            inner: rb.clone(),
            cached_idx_r: 0,
            local_idx_w: 0,
        },
        RingBufferReader {
            inner: rb,
            local_idx_r: 0,
            cached_idx_w: 0,
        },
    )
}

/// Internal ringbuffer storage. This type is private to the crate.
///
/// It stores the raw boxed slice pointer and the atomic indices used for
/// synchronization. The implementation uses monotonically increasing indices
/// (wrapping on overflow) and a power-of-two mask to convert indices to
/// positions inside the buffer.
struct RingBuffer<T> {
    ptr: *mut [MaybeUninit<T>],
    mask: usize,
    idx_r: CachePadded<AtomicUsize>,
    idx_w: CachePadded<AtomicUsize>,
}

impl<T> RingBuffer<T> {
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
        // Safety: caller must ensure that `idx` is in a range that refers to
        // an initialized slot when reading, or to a slot that may be written
        // when writing. This helper performs unchecked indexing into the
        // backing slice using the internal mask.
        //
        // We use raw pointer arithmetic here to avoid creating a `&mut` reference
        // to the entire backing array, which would cause aliasing issues in Miri
        // when both producer and consumer threads call this method concurrently.
        // Instead, we create a reference only to the specific element we need.
        unsafe {
            let base = self.ptr as *mut MaybeUninit<T>;
            &mut *base.add(idx & self.mask)
        }
    }
}

// The internal `RingBuffer` is stored inside an `Arc` and will be deallocated
// when the last writer or reader handle is dropped (i.e., when the `Arc`
// reference count reaches zero).
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let mut idx_r = self.idx_r.load(Ordering::Acquire);
        let idx_w = self.idx_w.load(Ordering::Acquire);

        while idx_r != idx_w {
            // SAFETY: we are in Drop and we must clean up any elements still
            // present in the buffer. Since only one producer and one
            // consumer exist, and we're dropping the entire buffer, it is
            // safe to assume we can take ownership of remaining initialized
            // elements between `idx_r` and `idx_w`.
            let t = unsafe {
                mem::replace(self.get_unchecked_mut(idx_r), MaybeUninit::uninit()).assume_init()
            };
            mem::drop(t);
            idx_r = idx_r.wrapping_add(1);
        }

        // At this point we've taken ownership of and dropped every
        // initialized element that was still present in the buffer. It is
        // important to drop all elements before freeing the backing storage
        // so that each element's destructor runs exactly once. Converting
        // the raw pointer back into a `Box` will free the allocation for
        // the slice itself.
        let ptr = unsafe { Box::from_raw(self.ptr) };
        mem::drop(ptr);
    }
}

/// Writer handle of the ringbuffer.
pub struct RingBufferWriter<T> {
    inner: Arc<RingBuffer<T>>,
    cached_idx_r: usize,
    local_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferWriter<T> {}
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}

impl<T> RingBufferWriter<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
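    ///
    /// For instance, the capacity is exactly the value passed to [`ringbuffer`]:
    ///
    /// ```rust
    /// let (writer, _reader) = ringbuffer_spsc::ringbuffer::<u8>(16);
    /// assert_eq!(writer.capacity(), 16);
    /// ```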
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Push an element into the RingBuffer.
    ///
    /// Returns `None` on success, or `Some(T)` when the buffer is full, giving ownership of the value back to the caller.
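    ///
    /// Illustrative sketch of both outcomes:
    ///
    /// ```rust
    /// let (mut writer, _reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(writer.push(1).is_none());   // accepted
    /// assert!(writer.push(2).is_none());   // accepted, the buffer is now full
    /// assert_eq!(writer.push(3), Some(3)); // rejected, ownership handed back
    /// ```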
    #[inline]
    pub fn push(&mut self, t: T) -> Option<T> {
        // Check if the ringbuffer is full.
        if self.is_full() {
            return Some(t);
        }

        // Insert the element in the ringbuffer
        let _ = mem::replace(
            unsafe { self.inner.get_unchecked_mut(self.local_idx_w) },
            MaybeUninit::new(t),
        );

        // Increment the write index and let it grow indefinitely, wrapping back to 0 on overflow.
        self.local_idx_w = self.local_idx_w.wrapping_add(1);
        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);

        None
    }

    /// Check if the RingBuffer is full.
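    ///
    /// Illustrative sketch:
    ///
    /// ```rust
    /// let (mut writer, _reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(!writer.is_full());
    /// let _ = writer.push(1);
    /// let _ = writer.push(2);
    /// assert!(writer.is_full());
    /// ```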
    #[inline]
    pub fn is_full(&mut self) -> bool {
        // Check if the ringbuffer is potentially full.
        // This happens when the difference between the write and read indexes equals
        // the ringbuffer capacity. Since both indexes grow indefinitely, the difference
        // must be computed with a wrapping subtraction to account for possible overflow.
        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len() {
            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
            // Check if the ringbuffer is really full
            self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len()
        } else {
            false
        }
    }
}

/// Reader handle of the ringbuffer.
pub struct RingBufferReader<T> {
    inner: Arc<RingBuffer<T>>,
    local_idx_r: usize,
    cached_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferReader<T> {}
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}

impl<T> RingBufferReader<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Pull an element from the ringbuffer.
    ///
    /// Returns `Some(T)` if an element is available, or `None` when the buffer is empty.
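    ///
    /// Illustrative sketch:
    ///
    /// ```rust
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<&str>(4);
    /// assert_eq!(reader.pull(), None); // nothing has been written yet
    /// let _ = writer.push("hello");
    /// assert_eq!(reader.pull(), Some("hello"));
    /// ```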
    #[inline]
    pub fn pull(&mut self) -> Option<T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        // Remove the element from the ringbuffer
        let t = unsafe {
            mem::replace(
                self.inner.get_unchecked_mut(self.local_idx_r),
                MaybeUninit::uninit(),
            )
            .assume_init()
        };
        // Increment the read index and let it grow indefinitely, wrapping back to 0 on overflow.
        self.local_idx_r = self.local_idx_r.wrapping_add(1);
        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);

        Some(t)
    }

    /// Peek an element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&T)` when at least one element is present, or `None` when the buffer is empty.
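    ///
    /// Illustrative sketch: peeking does not consume the element.
    ///
    /// ```rust
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u32>(4);
    /// let _ = writer.push(42);
    /// assert_eq!(reader.peek(), Some(&42));
    /// assert_eq!(reader.pull(), Some(42)); // the element was still there
    /// ```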
    #[inline]
    pub fn peek(&mut self) -> Option<&T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_ref()
        })
    }

    /// Peek a mutable element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&mut T)` when at least one element is present, or `None` when the buffer is empty.
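    ///
    /// Illustrative sketch: the next element can be modified in place before it is pulled.
    ///
    /// ```rust
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u32>(4);
    /// let _ = writer.push(1);
    /// if let Some(v) = reader.peek_mut() {
    ///     *v += 10;
    /// }
    /// assert_eq!(reader.pull(), Some(11));
    /// ```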
    #[inline]
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_mut()
        })
    }

    /// Check if the ringbuffer is empty.
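    ///
    /// Illustrative sketch:
    ///
    /// ```rust
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(reader.is_empty());
    /// let _ = writer.push(1);
    /// assert!(!reader.is_empty());
    /// ```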
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Check if the ringbuffer is potentially empty
        if self.local_idx_r == self.cached_idx_w {
            // Refresh the cached write index
            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
            // Check if the ringbuffer is really empty
            self.local_idx_r == self.cached_idx_w
        } else {
            false
        }
    }
}
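
// Illustrative unit tests: a minimal sketch exercising the non-blocking push/pull
// behavior and the index wrap-around described in the comments above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_pull_roundtrip() {
        let (mut w, mut r) = ringbuffer::<usize>(4);
        assert!(r.pull().is_none());
        for i in 0..4 {
            assert!(w.push(i).is_none());
        }
        // The buffer is now full: the rejected value is handed back to the caller.
        assert_eq!(w.push(4), Some(4));
        for i in 0..4 {
            assert_eq!(r.pull(), Some(i));
        }
        assert!(r.pull().is_none());
    }

    #[test]
    fn indices_grow_past_capacity() {
        // Push and pull more elements than the capacity so the masked indices
        // wrap around the backing slice several times.
        let (mut w, mut r) = ringbuffer::<usize>(2);
        for i in 0..64 {
            assert!(w.push(i).is_none());
            assert_eq!(r.pull(), Some(i));
        }
    }
}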