ringbuffer_spsc/lib.rs
//! A fast, small single-producer single-consumer (SPSC) ringbuffer designed
//! for low-latency and high-throughput exchange between a single writer and
//! a single reader. This crate is `#![no_std]` and uses `alloc` internally; it
//! provides a minimal, ergonomic API that works from `no_std` contexts that
//! supply an allocator as well as from ordinary `std` programs.
//!
//! Important design points:
//! - The ringbuffer capacity is specified at runtime via the [`ringbuffer`] constructor
//!   and **must be a power of two**. The implementation uses a bitmask to wrap
//!   indices, which is much faster than a modulo operation and reduces runtime
//!   overhead in hot paths.
//! - The API is non-blocking: [`RingBufferWriter::push`] returns `Some(T)` immediately when the
//!   buffer is full (giving back ownership of the value), and [`RingBufferReader::pull`] returns
//!   `None` immediately when the buffer is empty. Typical usage is to yield or
//!   retry in those cases.
//!
//! *NOTE:* elements remaining in the buffer are dropped when the internal storage is deallocated.
//! This happens when both [`RingBufferReader`] and [`RingBufferWriter`] are dropped.
//!
//! ## Example
//! ```rust
//! use ringbuffer_spsc::ringbuffer;
//!
//! const N: usize = 8;
//!
//! // Create a ringbuffer with capacity 16 (must be a power of two)
//! let (mut writer, mut reader) = ringbuffer::<usize>(16);
//!
//! // Producer: retry with a yield while the buffer is full
//! std::thread::spawn(move || {
//!     for i in 0..N {
//!         while writer.push(i).is_some() {
//!             std::thread::yield_now();
//!         }
//!     }
//! });
//!
//! // Consumer: yield while the buffer is empty
//! let mut i = 0;
//! while i < N {
//!     match reader.pull() {
//!         Some(_) => i += 1,
//!         None => std::thread::yield_now(),
//!     }
//! }
//! ```
#![no_std]
extern crate alloc;

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    mem::{self, MaybeUninit},
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

/// Create a new ringbuffer with a fixed capacity.
///
/// # Panics
///
/// Panics if *capacity* is not a power of two.
///
/// This requirement enables a fast bitmask wrap for indices which avoids the cost of `mod`
/// in the hot path.
///
/// # Returns
///
/// A `(`[`RingBufferWriter<T>`]`, `[`RingBufferReader<T>`]`)` pair where the writer is
/// intended for the single producer and the reader for the single consumer.
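///
/// # Example
///
/// A minimal single-threaded sketch using only this constructor together with
/// [`RingBufferWriter::push`] and [`RingBufferReader::pull`]:
///
/// ```rust
/// use ringbuffer_spsc::ringbuffer;
///
/// // Capacity must be a power of two.
/// let (mut writer, mut reader) = ringbuffer::<u32>(8);
/// assert_eq!(writer.push(1), None);   // accepted: the buffer had room
/// assert_eq!(reader.pull(), Some(1)); // the element comes back out
/// assert_eq!(reader.pull(), None);    // the buffer is empty again
/// ```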
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
    assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");

    // Inner container
    let v = (0..capacity)
        .map(|_| MaybeUninit::uninit())
        .collect::<Vec<_>>()
        .into_boxed_slice();

    let rb = Arc::new(RingBuffer {
        // Keep the pointer to the boxed slice
        ptr: Box::into_raw(v),
        // Since capacity is a power of two, `capacity - 1` is a bitmask that maps any
        // index onto a slot in `[0, capacity)`. Indexes are left growing indefinitely
        // and wrap around naturally once they reach `usize::MAX`.
        mask: capacity - 1,
        idx_r: CachePadded::new(AtomicUsize::new(0)),
        idx_w: CachePadded::new(AtomicUsize::new(0)),
    });
    (
        RingBufferWriter {
            inner: rb.clone(),
            cached_idx_r: 0,
            local_idx_w: 0,
        },
        RingBufferReader {
            inner: rb,
            local_idx_r: 0,
            cached_idx_w: 0,
        },
    )
}

/// Internal ringbuffer storage. This type is private to the crate.
///
/// It stores the raw boxed slice pointer and the atomic indices used for
/// synchronization. The implementation uses monotonically increasing indices
/// (wrapping on overflow) and a power-of-two mask to convert indices to
/// positions inside the buffer.
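///
/// As a concrete illustration of the masking scheme: with `capacity == 8` the mask is
/// `0b111`, so a monotonically growing index of 9 maps to slot `9 & 0b111 == 1`.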
struct RingBuffer<T> {
    ptr: *mut [MaybeUninit<T>],
    mask: usize,
    idx_r: CachePadded<AtomicUsize>,
    idx_w: CachePadded<AtomicUsize>,
}

impl<T> RingBuffer<T> {
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
        // Safety: caller must ensure that `idx` is in a range that refers to
        // an initialized slot when reading, or to a slot that may be written
        // when writing. This helper performs unchecked indexing into the
        // backing slice using the internal mask.
        unsafe { (&mut (*self.ptr)).get_unchecked_mut(idx & self.mask) }
    }
}

// The internal `RingBuffer` is stored inside an `Arc` and will be deallocated
// when the last writer or reader handle is dropped (i.e., when the `Arc`
// reference count reaches zero).
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        let mut idx_r = self.idx_r.load(Ordering::Acquire);
        let idx_w = self.idx_w.load(Ordering::Acquire);

        while idx_r != idx_w {
            // SAFETY: we are in Drop and we must clean up any elements still
            // present in the buffer. Since only one producer and one
            // consumer exist, and we're dropping the entire buffer, it is
            // safe to assume we can take ownership of remaining initialized
            // elements between `idx_r` and `idx_w`.
            let t = unsafe {
                mem::replace(self.get_unchecked_mut(idx_r), MaybeUninit::uninit()).assume_init()
            };
            mem::drop(t);
            idx_r = idx_r.wrapping_add(1);
        }

        // At this point we've taken ownership of and dropped every
        // initialized element that was still present in the buffer. It is
        // important to drop all elements before freeing the backing storage
        // so that each element's destructor runs exactly once. Converting
        // the raw pointer back into a `Box` will free the allocation for
        // the slice itself.
        let ptr = unsafe { Box::from_raw(self.ptr) };
        mem::drop(ptr);
    }
}

/// Writer handle of the ringbuffer.
pub struct RingBufferWriter<T> {
    inner: Arc<RingBuffer<T>>,
    cached_idx_r: usize,
    local_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferWriter<T> {}
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}

impl<T> RingBufferWriter<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
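    ///
    /// A small sketch: the capacity reported is exactly the value passed to [`ringbuffer`].
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (writer, _reader) = ringbuffer::<u8>(16);
    /// assert_eq!(writer.capacity(), 16);
    /// ```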
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Push an element into the RingBuffer.
    ///
    /// Returns `Some(T)` when the buffer is full (giving back ownership of the value),
    /// otherwise returns `None` on success.
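    ///
    /// A minimal sketch of the full-buffer behaviour:
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, _reader) = ringbuffer::<u8>(2);
    /// assert_eq!(writer.push(1), None);    // accepted
    /// assert_eq!(writer.push(2), None);    // accepted, the buffer is now full
    /// assert_eq!(writer.push(3), Some(3)); // rejected: ownership is handed back
    /// ```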
    #[inline]
    pub fn push(&mut self, t: T) -> Option<T> {
        // Check if the ringbuffer is full.
        if self.is_full() {
            return Some(t);
        }

        // Insert the element in the ringbuffer
        let _ = mem::replace(
            unsafe { self.inner.get_unchecked_mut(self.local_idx_w) },
            MaybeUninit::new(t),
        );

        // Let's increment the counter and let it grow indefinitely and potentially
        // overflow, resetting it to 0.
        self.local_idx_w = self.local_idx_w.wrapping_add(1);
        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);

        None
    }

    /// Check if the RingBuffer is full.
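    ///
    /// A small sketch, assuming the reader has not consumed anything yet:
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, _reader) = ringbuffer::<u8>(2);
    /// assert!(!writer.is_full());
    /// let _ = writer.push(1);
    /// let _ = writer.push(2);
    /// assert!(writer.is_full());
    /// ```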
    #[inline]
    pub fn is_full(&mut self) -> bool {
        // Check if the ringbuffer is potentially full.
        // This happens when the difference between the write and read indexes equals
        // the ringbuffer capacity. Note that the write and read indexes are left growing
        // indefinitely, so we need to compute the difference by accounting for any eventual
        // overflow. This requires wrapping the subtraction operation.
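        // For example, if the write index has wrapped around to 2 while the read index
        // is still usize::MAX - 1, then 2usize.wrapping_sub(usize::MAX - 1) == 4, i.e.
        // four elements are currently stored in the buffer.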
        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len() {
            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
            // Check if the ringbuffer is really full
            self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.inner.ptr.len()
        } else {
            false
        }
    }
}

/// Reader handle of the ringbuffer.
pub struct RingBufferReader<T> {
    inner: Arc<RingBuffer<T>>,
    local_idx_r: usize,
    cached_idx_w: usize,
}

unsafe impl<T: Send> Send for RingBufferReader<T> {}
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}

impl<T> RingBufferReader<T> {
    /// Returns the capacity (number of slots) of the ringbuffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }

    /// Pull an element from the ringbuffer.
    ///
    /// Returns `Some(T)` if an element is available, otherwise `None` when the buffer is empty.
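    ///
    /// A minimal sketch of the empty-buffer behaviour:
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<&str>(4);
    /// assert_eq!(reader.pull(), None); // nothing has been pushed yet
    /// let _ = writer.push("hello");
    /// assert_eq!(reader.pull(), Some("hello"));
    /// ```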
    #[inline]
    pub fn pull(&mut self) -> Option<T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        // Remove the element from the ringbuffer
        let t = unsafe {
            mem::replace(
                self.inner.get_unchecked_mut(self.local_idx_r),
                MaybeUninit::uninit(),
            )
            .assume_init()
        };
        // Let's increment the counter and let it grow indefinitely
        // and potentially overflow, resetting it to 0.
        self.local_idx_r = self.local_idx_r.wrapping_add(1);
        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);

        Some(t)
    }

    /// Peek an element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&T)` when at least one element is present, or `None` when the buffer is empty.
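    ///
    /// A small sketch: peeking does not consume the element.
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u32>(4);
    /// let _ = writer.push(7);
    /// assert_eq!(reader.peek(), Some(&7)); // still in the buffer
    /// assert_eq!(reader.pull(), Some(7));  // now it is removed
    /// ```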
    #[inline]
    pub fn peek(&mut self) -> Option<&T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_ref()
        })
    }

    /// Peek a mutable element from the ringbuffer without pulling it out.
    ///
    /// Returns `Some(&mut T)` when at least one element is present, or `None` when the buffer is empty.
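    ///
    /// A small sketch: the element can be modified in place before being pulled.
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u32>(4);
    /// let _ = writer.push(1);
    /// if let Some(v) = reader.peek_mut() {
    ///     *v += 10; // mutate the element while it stays in the buffer
    /// }
    /// assert_eq!(reader.pull(), Some(11));
    /// ```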
    #[inline]
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        // Check if the ringbuffer is potentially empty
        if self.is_empty() {
            return None;
        }

        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_mut()
        })
    }

    /// Check if the ringbuffer is empty.
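    ///
    /// A small sketch:
    ///
    /// ```rust
    /// use ringbuffer_spsc::ringbuffer;
    ///
    /// let (mut writer, mut reader) = ringbuffer::<u8>(2);
    /// assert!(reader.is_empty());
    /// let _ = writer.push(1);
    /// assert!(!reader.is_empty());
    /// ```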
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Check if the ringbuffer is potentially empty
        if self.local_idx_r == self.cached_idx_w {
            // Update the write index
            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
            // Check if the ringbuffer is really empty
            self.local_idx_r == self.cached_idx_w
        } else {
            false
        }
    }
}