ringbuffer_spsc/lib.rs
//! A fast, small single-producer single-consumer (SPSC) ringbuffer designed
//! for low-latency and high-throughput exchange between a single writer and
//! a single reader. This crate is `#![no_std]` and uses `alloc` internally; it provides a
//! minimal, ergonomic API that works well from `no_std` contexts that supply
//! an allocator as well as from normal `std` programs and examples.
//!
//! Important design points:
//! - The ringbuffer capacity is specified at runtime via the [`ringbuffer`] constructor
//!   and **must be a power of two**. The implementation uses a bitmask to wrap
//!   indices, which is much faster than a modulo operation and reduces runtime
//!   overhead in hot paths.
//! - The API is non-blocking: [`RingBufferWriter::push`] returns `Some(T)` immediately when the
//!   buffer is full (giving back ownership of the value), and [`RingBufferReader::pull`] returns
//!   `None` immediately when the buffer is empty. Typical usage is to yield or
//!   retry in those cases.
//!
//! *NOTE:* elements remaining in the buffer are dropped when the internal storage is deallocated.
//! This happens when both [`RingBufferReader`] and [`RingBufferWriter`] are dropped.
//!
//! ## Example
//! ```rust
//! use ringbuffer_spsc::ringbuffer;
//!
//! const N: usize = 8;
//!
//! // Create a ringbuffer with capacity 16 (must be a power of two)
//! let (mut writer, mut reader) = ringbuffer::<usize>(16);
//! // Producer: retry (yielding) until each element is accepted
//! std::thread::spawn(move || {
//!     for mut i in 0..N {
//!         while let Some(v) = writer.push(i) {
//!             i = v;
//!             std::thread::yield_now();
//!         }
//!     }
//! });
//! // Consumer
//! let mut i = 0;
//! while i < N {
//!     match reader.pull() {
//!         Some(_) => i += 1,
//!         None => std::thread::yield_now(),
//!     }
//! }
//! ```
#![no_std]
extern crate alloc;

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    mem::{self, MaybeUninit},
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

/// Create a new ringbuffer with a fixed capacity.
///
/// # Panics
///
/// Panics if *capacity* is not a power of two.
///
/// This requirement enables a fast bitmask wrap for indices which avoids the cost of `mod`
/// in the hot path.
///
/// # Returns
///
/// A `(`[`RingBufferWriter<T>`]`, `[`RingBufferReader<T>`]`)` pair where the writer is
/// intended for the single producer and the reader for the single consumer.
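///
/// # Example
///
/// A minimal usage sketch (values chosen arbitrarily for illustration):
///
/// ```
/// use ringbuffer_spsc::ringbuffer;
///
/// // The capacity must be a power of two.
/// let (mut writer, mut reader) = ringbuffer::<u32>(4);
/// assert_eq!(writer.capacity(), 4);
/// assert!(writer.push(42).is_none());
/// assert_eq!(reader.pull(), Some(42));
/// ```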
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
    assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");

    // Inner container
    let v = (0..capacity)
        .map(|_| MaybeUninit::uninit())
        .collect::<Vec<_>>()
        .into_boxed_slice();

    let rb = Arc::new(RingBuffer {
        // Keep the pointer to the boxed slice
        ptr: Box::into_raw(v),
        // Since capacity is a power of two, `capacity - 1` is a bitmask that maps an
        // index onto a slot in the buffer. Indices are left growing indefinitely and
        // wrap around naturally once they are incremented past usize::MAX.
        mask: capacity - 1,
        idx_r: CachePadded::new(AtomicUsize::new(0)),
        idx_w: CachePadded::new(AtomicUsize::new(0)),
    });
    (
        RingBufferWriter {
            inner: rb.clone(),
            cached_idx_r: 0,
            local_idx_w: 0,
        },
        RingBufferReader {
            inner: rb,
            local_idx_r: 0,
            cached_idx_w: 0,
        },
    )
}

/// Internal ringbuffer storage. This type is private to the crate.
///
/// It stores the raw boxed slice pointer and the atomic indices used for
/// synchronization. The implementation uses monotonically increasing indices
/// (wrapping on overflow) and a power-of-two mask to convert indices to
/// positions inside the buffer.
struct RingBuffer<T> {
    ptr: *mut [MaybeUninit<T>],
    mask: usize,
    idx_r: CachePadded<AtomicUsize>,
    idx_w: CachePadded<AtomicUsize>,
}

impl<T> RingBuffer<T> {
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
        // Safety: caller must ensure that `idx` is in a range that refers to
        // an initialized slot when reading, or to a slot that may be written
        // when writing. This helper performs unchecked indexing into the
        // backing slice using the internal mask.
        //
        // We use raw pointer arithmetic here to avoid creating a `&mut` reference
        // to the entire backing array, which would cause aliasing issues in Miri
        // when both producer and consumer threads call this method concurrently.
        // Instead, we create a reference only to the specific element we need.
        unsafe {
            let base = self.ptr as *mut MaybeUninit<T>;
            &mut *base.add(idx & self.mask)
        }
    }
}

// The internal `RingBuffer` is stored inside an `Arc` and will be deallocated
// when the last writer or reader handle is dropped (i.e., when the `Arc`
// reference count reaches zero).
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let mut idx_r = self.idx_r.load(Ordering::Acquire);
        let idx_w = self.idx_w.load(Ordering::Acquire);

        while idx_r != idx_w {
            // SAFETY: we are in Drop and we must clean up any elements still
            // present in the buffer. Since only one producer and one
            // consumer exist, and we're dropping the entire buffer, it is
            // safe to assume we can take ownership of remaining initialized
            // elements between `idx_r` and `idx_w`.
            let t = unsafe {
                mem::replace(self.get_unchecked_mut(idx_r), MaybeUninit::uninit()).assume_init()
            };
            mem::drop(t);
            idx_r = idx_r.wrapping_add(1);
        }

        // At this point we've taken ownership of and dropped every
        // initialized element that was still present in the buffer. It is
        // important to drop all elements before freeing the backing storage
        // so that each element's destructor runs exactly once. Converting
        // the raw pointer back into a `Box` will free the allocation for
        // the slice itself.
        let ptr = unsafe { Box::from_raw(self.ptr) };
        mem::drop(ptr);
    }
}

/// Writer handle of the ringbuffer.
pub struct RingBufferWriter<T> {
    inner: Arc<RingBuffer<T>>,
    cached_idx_r: usize,
    local_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferWriter<T> {}
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}

impl<T> RingBufferWriter<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Push an element into the ringbuffer.
    ///
    /// Returns `None` on success, or `Some(T)` when the buffer is full, giving ownership of the value back to the caller.
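    ///
    /// # Example
    ///
    /// A minimal sketch of the non-blocking behaviour (capacity and values are arbitrary):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, _reader) = ringbuffer::<u8>(2);
    /// assert!(writer.push(1).is_none()); // accepted
    /// assert!(writer.push(2).is_none()); // accepted
    /// assert_eq!(writer.push(3), Some(3)); // full: ownership handed back
    /// ```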
    #[inline]
    pub fn push(&mut self, t: T) -> Option<T> {
        // Check if the ringbuffer is full.
        if self.is_full() {
            return Some(t);
        }

        // Insert the element in the ringbuffer
        let _ = mem::replace(
            unsafe { self.inner.get_unchecked_mut(self.local_idx_w) },
            MaybeUninit::new(t),
        );

        // Increment the local write index and publish it. The index grows indefinitely
        // and wraps back to 0 on overflow.
        self.local_idx_w = self.local_idx_w.wrapping_add(1);
        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);

        None
    }

    /// Check if the ringbuffer is full.
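    ///
    /// A small sketch of the check (capacity chosen arbitrarily):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, _reader) = ringbuffer::<u8>(2);
    /// assert!(!writer.is_full());
    /// assert!(writer.push(1).is_none());
    /// assert!(writer.push(2).is_none());
    /// assert!(writer.is_full());
    /// ```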
    #[inline]
    pub fn is_full(&mut self) -> bool {
        // Check if the ringbuffer is potentially full.
        // This happens when the difference between the write and read indexes equals
        // the ringbuffer capacity. Note that the write and read indexes are left growing
        // indefinitely, so we need to compute the difference by accounting for any eventual
        // overflow. This requires wrapping the subtraction operation.
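        // Worked example (hypothetical values): with capacity 4, a write index that has
        // wrapped around to 2 and a read index of usize::MAX - 1 give
        // 2usize.wrapping_sub(usize::MAX - 1) == 4, correctly reporting a full buffer.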
        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len() {
            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
            // Check if the ringbuffer is really full
            self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len()
        } else {
            false
        }
    }
}

/// Reader handle of the ringbuffer.
pub struct RingBufferReader<T> {
    inner: Arc<RingBuffer<T>>,
    local_idx_r: usize,
    cached_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferReader<T> {}
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}

impl<T> RingBufferReader<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Pull an element from the ringbuffer.
    ///
    /// Returns `Some(T)` if an element is available, otherwise `None` when the buffer is empty.
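    ///
    /// # Example
    ///
    /// A minimal sketch (values are illustrative):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<&str>(2);
    /// assert_eq!(reader.pull(), None); // empty
    /// assert!(writer.push("hello").is_none());
    /// assert_eq!(reader.pull(), Some("hello"));
    /// assert_eq!(reader.pull(), None); // empty again
    /// ```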
    #[inline]
    pub fn pull(&mut self) -> Option<T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        // Remove the element from the ringbuffer
        let t = unsafe {
            mem::replace(
                self.inner.get_unchecked_mut(self.local_idx_r),
                MaybeUninit::uninit(),
            )
            .assume_init()
        };
        // Increment the local read index and publish it. The index grows indefinitely
        // and wraps back to 0 on overflow.
        self.local_idx_r = self.local_idx_r.wrapping_add(1);
        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);

        Some(t)
    }

    /// Peek at an element in the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&T)` when at least one element is present, or `None` when the buffer is empty.
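    ///
    /// # Example
    ///
    /// A minimal sketch showing that peeking does not consume the element (values are illustrative):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u32>(2);
    /// assert!(writer.push(5).is_none());
    /// assert_eq!(reader.peek(), Some(&5));
    /// assert_eq!(reader.pull(), Some(5));
    /// ```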
    #[inline]
    pub fn peek(&mut self) -> Option<&T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_ref()
        })
    }

    /// Peek mutably at an element in the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&mut T)` when at least one element is present, or `None` when the buffer is empty.
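    ///
    /// # Example
    ///
    /// A minimal sketch of in-place modification (values are illustrative):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u32>(2);
    /// assert!(writer.push(1).is_none());
    /// if let Some(v) = reader.peek_mut() {
    ///     *v += 10;
    /// }
    /// assert_eq!(reader.pull(), Some(11));
    /// ```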
    #[inline]
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_mut()
        })
    }

    /// Check if the ringbuffer is empty.
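    ///
    /// A small sketch of the check (capacity chosen arbitrarily):
    ///
    /// ```
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u8>(2);
    /// assert!(reader.is_empty());
    /// assert!(writer.push(7).is_none());
    /// assert!(!reader.is_empty());
    /// ```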
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Check if the ringbuffer is potentially empty
        if self.local_idx_r == self.cached_idx_w {
            // Refresh the cached write index
            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
            // Check if the ringbuffer is really empty
            self.local_idx_r == self.cached_idx_w
        } else {
            false
        }
    }
}