ringbuffer_spsc/
lib.rs

//! A fast, small single-producer single-consumer (SPSC) ringbuffer designed
//! for low-latency and high-throughput exchange between a single writer and
//! a single reader. This crate is `#![no_std]` and uses `alloc` internally; it provides a
//! minimal, ergonomic API that works well from `no_std` contexts that supply
//! an allocator as well as from normal `std` programs and examples.
//!
//! Important design points:
//! - The ringbuffer capacity is specified at runtime via the [`ringbuffer`] constructor
//!   and **must be a power of two**. The implementation uses a bitmask to wrap
//!   indices, which is much faster than a modulo operation and reduces runtime
//!   overhead in hot paths.
//! - The API is non-blocking: [`RingBufferWriter::push`] returns `Some(T)` immediately when the
//!   buffer is full (giving back ownership of the value), and [`RingBufferReader::pull`] returns
//!   `None` immediately when the buffer is empty. Typical usage is to yield or
//!   retry in those cases.
//!
//! *NOTE:* elements remaining in the buffer are dropped when the internal storage is deallocated.
//! This happens when both [`RingBufferReader`] and [`RingBufferWriter`] are dropped.
//!
//! ## Example
//! ```rust
//! use ringbuffer_spsc::ringbuffer;
//!
//! const N: usize = 8;
//!
//! // Create a ringbuffer with capacity 16 (must be a power of two)
//! let (mut writer, mut reader) = ringbuffer::<usize>(16);
//! // Producer: retry until each value is accepted, since `push` hands the
//! // value back when the buffer is full.
//! std::thread::spawn(move || for i in 0..N {
//!     while writer.push(i).is_some() {
//!         std::thread::yield_now();
//!     }
//! });
//! // Consumer
//! let mut i = 0;
//! while i < N {
//!     match reader.pull() {
//!         Some(_) => i += 1,
//!         None => std::thread::yield_now(),
//!     }
//! }
//! ```
#![no_std]
extern crate alloc;

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    mem::{self, MaybeUninit},
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

/// Create a new ringbuffer with a fixed capacity.
///
/// # Panics
///
/// Panics if `capacity` is not a power of two.
///
/// This requirement enables a fast bitmask wrap for indices, which avoids the cost of `mod`
/// in the hot path.
///
/// # Returns
///
/// A `(`[`RingBufferWriter<T>`]`, `[`RingBufferReader<T>`]`)` pair where the writer is
/// intended for the single producer and the reader for the single consumer.
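///
/// # Example
///
/// A minimal single-threaded round-trip using only the API defined in this crate
/// (see the crate-level docs for a threaded producer/consumer example):
///
/// ```
/// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u32>(4);
/// assert_eq!(writer.capacity(), 4);
/// assert!(writer.push(1).is_none());
/// assert_eq!(reader.pull(), Some(1));
/// ```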
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
    assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");

    // Inner container
    let v = (0..capacity)
        .map(|_| MaybeUninit::uninit())
        .collect::<Vec<_>>()
        .into_boxed_slice();

    let rb = Arc::new(RingBuffer {
        // Keep the pointer to the boxed slice
        ptr: Box::into_raw(v),
        // Since capacity is a power of two, capacity - 1 is a bitmask that maps any index to a slot in the buffer.
        // Indices grow monotonically and only wrap around once an increment overflows usize::MAX.
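        // For example, with a capacity of 8 the mask is 0b0111, so index 9 maps to
        // slot 9 & 0b0111 == 1 without any modulo operation.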
        mask: capacity - 1,
        idx_r: CachePadded::new(AtomicUsize::new(0)),
        idx_w: CachePadded::new(AtomicUsize::new(0)),
    });
    (
        RingBufferWriter {
            inner: rb.clone(),
            cached_idx_r: 0,
            local_idx_w: 0,
        },
        RingBufferReader {
            inner: rb,
            local_idx_r: 0,
            cached_idx_w: 0,
        },
    )
}

/// Internal ringbuffer storage. This type is private to the crate.
///
/// It stores the raw boxed slice pointer and the atomic indices used for
/// synchronization. The implementation uses monotonically increasing indices
/// (wrapping on overflow) and a power-of-two mask to convert indices to
/// positions inside the buffer.
struct RingBuffer<T> {
    ptr: *mut [MaybeUninit<T>],
    mask: usize,
    idx_r: CachePadded<AtomicUsize>,
    idx_w: CachePadded<AtomicUsize>,
}

impl<T> RingBuffer<T> {
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
        // Safety: caller must ensure that `idx` is in a range that refers to
        // an initialized slot when reading, or to a slot that may be written
        // when writing. This helper performs unchecked indexing into the
        // backing slice using the internal mask.
        unsafe { (&mut (*self.ptr)).get_unchecked_mut(idx & self.mask) }
    }
}

// The internal `RingBuffer` is stored inside an `Arc` and will be deallocated
// when the last writer or reader handle is dropped (i.e., when the `Arc`
// reference count reaches zero).
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let mut idx_r = self.idx_r.load(Ordering::Acquire);
        let idx_w = self.idx_w.load(Ordering::Acquire);

        while idx_r != idx_w {
            // SAFETY: we are in Drop and we must clean up any elements still
            // present in the buffer. Since only one producer and one
            // consumer exist, and we're dropping the entire buffer, it is
            // safe to assume we can take ownership of remaining initialized
            // elements between `idx_r` and `idx_w`.
            let t = unsafe {
                mem::replace(self.get_unchecked_mut(idx_r), MaybeUninit::uninit()).assume_init()
            };
            mem::drop(t);
            idx_r = idx_r.wrapping_add(1);
        }

        // At this point we've taken ownership of and dropped every
        // initialized element that was still present in the buffer. It is
        // important to drop all elements before freeing the backing storage
        // so that each element's destructor runs exactly once. Converting
        // the raw pointer back into a `Box` will free the allocation for
        // the slice itself.
        let ptr = unsafe { Box::from_raw(self.ptr) };
        mem::drop(ptr);
    }
}

/// Writer handle of the ringbuffer.
pub struct RingBufferWriter<T> {
    inner: Arc<RingBuffer<T>>,
    cached_idx_r: usize,
    local_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferWriter<T> {}
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}

impl<T> RingBufferWriter<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Push an element into the RingBuffer.
    ///
    /// Returns `Some(T)` when the buffer is full (giving back ownership of the value), otherwise returns `None` on success.
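    ///
    /// A minimal illustration of the full-buffer behaviour:
    /// ```
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(writer.push(1).is_none());
    /// assert!(writer.push(2).is_none());
    /// // The buffer is full: ownership of the value is handed back.
    /// assert_eq!(writer.push(3), Some(3));
    /// // After the reader frees a slot, the push succeeds.
    /// assert_eq!(reader.pull(), Some(1));
    /// assert!(writer.push(3).is_none());
    /// ```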
    #[inline]
    pub fn push(&mut self, t: T) -> Option<T> {
        // Check if the ringbuffer is full.
        if self.is_full() {
            return Some(t);
        }

        // Insert the element in the ringbuffer
        let _ = mem::replace(
            unsafe { self.inner.get_unchecked_mut(self.local_idx_w) },
            MaybeUninit::new(t),
        );

        // Increment the counter and let it grow indefinitely, wrapping back to 0 on overflow.
        self.local_idx_w = self.local_idx_w.wrapping_add(1);
        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);

        None
    }

    /// Check if the RingBuffer is full.
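    ///
    /// A minimal sketch:
    /// ```
    /// let (mut writer, _reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(!writer.is_full());
    /// assert!(writer.push(1).is_none());
    /// assert!(writer.push(2).is_none());
    /// assert!(writer.is_full());
    /// ```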
    #[inline]
    pub fn is_full(&mut self) -> bool {
        // Check if the ringbuffer is potentially full.
        // This happens when the difference between the write and read indexes equals
        // the ringbuffer capacity. Note that the write and read indexes are left growing
        // indefinitely, so the difference must be computed with a wrapping subtraction to
        // account for a possible overflow of the counters.
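        // For example, with a hypothetical capacity of 4: after the write index wraps past
        // usize::MAX, local_idx_w = 1 and cached_idx_r = usize::MAX - 2 still give
        // 1usize.wrapping_sub(usize::MAX - 2) == 4, correctly reporting the buffer as full.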
        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len() {
            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
            // Check if the ringbuffer is really full
            self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len()
        } else {
            false
        }
    }
}

/// Reader handle of the ringbuffer.
pub struct RingBufferReader<T> {
    inner: Arc<RingBuffer<T>>,
    local_idx_r: usize,
    cached_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferReader<T> {}
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}

impl<T> RingBufferReader<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Pull an element from the ringbuffer.
    ///
    /// Returns `Some(T)` if an element is available, otherwise `None` when the buffer is empty.
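    ///
    /// A minimal sketch of the empty/non-empty behaviour:
    /// ```
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert_eq!(reader.pull(), None);
    /// assert!(writer.push(7).is_none());
    /// assert_eq!(reader.pull(), Some(7));
    /// ```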
    #[inline]
    pub fn pull(&mut self) -> Option<T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        // Remove the element from the ringbuffer
        let t = unsafe {
            mem::replace(
                self.inner.get_unchecked_mut(self.local_idx_r),
                MaybeUninit::uninit(),
            )
            .assume_init()
        };
        // Increment the counter and let it grow indefinitely,
        // wrapping back to 0 on overflow.
        self.local_idx_r = self.local_idx_r.wrapping_add(1);
        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);

        Some(t)
    }

    /// Peek an element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&T)` when at least one element is present, or `None` when the buffer is empty.
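    ///
    /// Peeking does not advance the read index, so the element is still returned by a
    /// subsequent [`RingBufferReader::pull`]:
    /// ```
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(writer.push(9).is_none());
    /// assert_eq!(reader.peek(), Some(&9));
    /// assert_eq!(reader.pull(), Some(9));
    /// ```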
    #[inline]
    pub fn peek(&mut self) -> Option<&T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_ref()
        })
    }

    /// Mutably peek an element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&mut T)` when at least one element is present, or `None` when the buffer is empty.
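    ///
    /// The element can be modified in place before being pulled:
    /// ```
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(writer.push(1).is_none());
    /// if let Some(v) = reader.peek_mut() {
    ///     *v += 1;
    /// }
    /// assert_eq!(reader.pull(), Some(2));
    /// ```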
    #[inline]
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_mut()
        })
    }

    /// Check if the ringbuffer is empty.
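    ///
    /// For example:
    /// ```
    /// let (mut writer, mut reader) = ringbuffer_spsc::ringbuffer::<u8>(2);
    /// assert!(reader.is_empty());
    /// assert!(writer.push(1).is_none());
    /// assert!(!reader.is_empty());
    /// ```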
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Check if the ringbuffer is potentially empty
        if self.local_idx_r == self.cached_idx_w {
            // Refresh the cached write index
            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
            // Check if the ringbuffer is really empty
            self.local_idx_r == self.cached_idx_w
        } else {
            false
        }
    }
}