// ringbuffer_spsc/lib.rs
//! A fast thread-safe `no_std` single-producer single-consumer ring buffer.
//! For performance reasons, the capacity of the buffer is determined
//! at compile time via a const generic and it is required to be a
//! power of two for a more efficient index handling.
//!
//! # Example
//! ```
//! use ringbuffer_spsc::RingBuffer;
//!
//! const N: usize = 1_000_000;
//! let (mut tx, mut rx) = RingBuffer::<usize, 16>::init();
//!
//! let p = std::thread::spawn(move || {
//!     let mut current: usize = 0;
//!     while current < N {
//!         if tx.push(current).is_none() {
//!             current = current.wrapping_add(1);
//!         } else {
//!             std::thread::yield_now();
//!         }
//!     }
//! });
//!
//! let c = std::thread::spawn(move || {
//!     let mut current: usize = 0;
//!     while current < N {
//!         if let Some(c) = rx.pull() {
//!             assert_eq!(c, current);
//!             current = current.wrapping_add(1);
//!         } else {
//!             std::thread::yield_now();
//!         }
//!     }
//! });
//!
//! p.join().unwrap();
//! c.join().unwrap();
//! ```
39#![no_std]
40extern crate alloc;
41
42use alloc::sync::Arc;
43use core::{
44 cell::UnsafeCell,
45 mem::{self, MaybeUninit},
46 sync::atomic::{AtomicUsize, Ordering},
47};
48use crossbeam::utils::CachePadded;
49
/// Shared state of a single-producer single-consumer ring buffer with a
/// compile-time capacity `N` (required to be a power of two, see `init()`).
pub struct RingBuffer<T, const N: usize> {
    // Backing storage for the N slots. `UnsafeCell` is needed because the
    // producer and the consumer each mutate (disjoint) slots through a shared
    // `Arc` handle. Only slots in the window [idx_r, idx_w) hold live values;
    // all other slots are uninitialized.
    buffer: UnsafeCell<[MaybeUninit<T>; N]>,
    // Next index to read; stored by the consumer, loaded by the producer.
    // Cache-padded to avoid false sharing with `idx_w`.
    idx_r: CachePadded<AtomicUsize>,
    // Next index to write; stored by the producer, loaded by the consumer.
    idx_w: CachePadded<AtomicUsize>,
}
55
56impl<T, const N: usize> RingBuffer<T, N> {
57 #[allow(clippy::new_ret_no_self)]
58 #[deprecated(since = "0.1.8", note = "please use `init()` instead.")]
59 pub fn new() -> (RingBufferWriter<T, N>, RingBufferReader<T, N>) {
60 Self::init()
61 }
62
63 /// Initialize the RingBuffer with the given capacity
64 pub fn init() -> (RingBufferWriter<T, N>, RingBufferReader<T, N>) {
65 assert!(
66 N.is_power_of_two(),
67 "RingBuffer requires the capacity to be a power of 2. {N} is not."
68 );
69 let rb = Arc::new(RingBuffer {
70 buffer: UnsafeCell::new(array_init::array_init(|_| MaybeUninit::uninit())),
71 idx_r: CachePadded::new(AtomicUsize::new(0)),
72 idx_w: CachePadded::new(AtomicUsize::new(0)),
73 });
74 (
75 RingBufferWriter {
76 inner: rb.clone(),
77 cached_idx_r: 0,
78 local_idx_w: 0,
79 },
80 RingBufferReader {
81 inner: rb,
82 local_idx_r: 0,
83 cached_idx_w: 0,
84 },
85 )
86 }
87
88 #[allow(clippy::mut_from_ref)]
89 #[inline]
90 unsafe fn get_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
91 // Since N is a power of two, N-1 is a mask covering N
92 // elements overflowing when N elements have been added.
93 // Indexes are left growing indefinetely and naturally wraps
94 // around once the index increment reaches usize::MAX.
95 &mut (*self.buffer.get())[idx & (N - 1)]
96 }
97}
98
99impl<T, const N: usize> Drop for RingBuffer<T, N> {
100 fn drop(&mut self) {
101 let mut idx_r = self.idx_r.load(Ordering::Acquire);
102 let idx_w = self.idx_w.load(Ordering::Acquire);
103
104 while idx_r != idx_w {
105 let t =
106 unsafe { mem::replace(self.get_mut(idx_r), MaybeUninit::uninit()).assume_init() };
107 mem::drop(t);
108 idx_r = idx_r.wrapping_add(1);
109 }
110 }
111}
112
/// Producer endpoint of the ring buffer: the only handle that may push.
pub struct RingBufferWriter<T, const N: usize> {
    // Shared ring buffer state, co-owned with the reader.
    inner: Arc<RingBuffer<T, N>>,
    // Last value of the consumer's read index observed by this writer;
    // refreshed lazily in `is_full()` to avoid an atomic load on every push.
    cached_idx_r: usize,
    // Write index owned by this producer; published to `inner.idx_w` after
    // each successful push.
    local_idx_w: usize,
}
118
// SAFETY: the writer is the unique producer handle; sending it to another
// thread is sound as long as the stored `T` values may be sent with it.
unsafe impl<T: Send, const N: usize> Send for RingBufferWriter<T, N> {}
// SAFETY(review): every mutating method takes `&mut self`, so a shared
// `&RingBufferWriter` exposes no mutation; the `T: Sync` bound mirrors the
// reader's impl — confirm it is the intended requirement.
unsafe impl<T: Sync, const N: usize> Sync for RingBufferWriter<T, N> {}
121
122impl<T, const N: usize> RingBufferWriter<T, N> {
123 /// Push an element into the RingBuffer.
124 /// It returns `Some(T)` if the RingBuffer is full, giving back the ownership of `T`.
125 #[inline]
126 pub fn push(&mut self, t: T) -> Option<T> {
127 // Check if the ring buffer is full.
128 if self.is_full() {
129 return Some(t);
130 }
131
132 // Insert the element in the ring buffer
133 let _ = unsafe { mem::replace(self.inner.get_mut(self.local_idx_w), MaybeUninit::new(t)) };
134 // Let's increment the counter and let it grow indefinitely and potentially overflow resetting it to 0.
135 self.local_idx_w = self.local_idx_w.wrapping_add(1);
136 self.inner.idx_w.store(self.local_idx_w, Ordering::Release);
137
138 None
139 }
140
141 /// Check if the RingBuffer is full and evenutally updates the internal cached indexes.
142 #[inline]
143 pub fn is_full(&mut self) -> bool {
144 // Check if the ring buffer is potentially full.
145 // This happens when the difference between the write and read indexes equals
146 // the ring buffer capacity. Note that the write and read indexes are left growing
147 // indefinitely, so we need to compute the difference by accounting for any eventual
148 // overflow. This requires wrapping the subtraction operation.
149 if self.local_idx_w.wrapping_sub(self.cached_idx_r) == N {
150 self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
151 // Check if the ring buffer is really full
152 self.local_idx_w.wrapping_sub(self.cached_idx_r) == N
153 } else {
154 false
155 }
156 }
157}
158
/// Consumer endpoint of the ring buffer: the only handle that may pull.
pub struct RingBufferReader<T, const N: usize> {
    // Shared ring buffer state, co-owned with the writer.
    inner: Arc<RingBuffer<T, N>>,
    // Read index owned by this consumer; published to `inner.idx_r` after
    // each successful pull.
    local_idx_r: usize,
    // Last value of the producer's write index observed by this reader;
    // refreshed lazily in `is_empty()` to avoid an atomic load on every pull.
    cached_idx_w: usize,
}
164
// SAFETY: the reader is the unique consumer handle; sending it to another
// thread is sound as long as the stored `T` values may be sent with it.
unsafe impl<T: Send, const N: usize> Send for RingBufferReader<T, N> {}
// SAFETY(review): every mutating method takes `&mut self`, so a shared
// `&RingBufferReader` exposes no mutation; the `T: Sync` bound mirrors the
// writer's impl — confirm it is the intended requirement.
unsafe impl<T: Sync, const N: usize> Sync for RingBufferReader<T, N> {}
167
168impl<T, const N: usize> RingBufferReader<T, N> {
169 /// Pull an element from the RingBuffer.
170 /// It returns `None` if the RingBuffer is empty.
171 #[inline]
172 pub fn pull(&mut self) -> Option<T> {
173 // Check if the ring buffer is potentially empty
174 if self.is_empty() {
175 return None;
176 }
177
178 // Remove the element from the ring buffer
179 let t = unsafe {
180 mem::replace(self.inner.get_mut(self.local_idx_r), MaybeUninit::uninit()).assume_init()
181 };
182 // Let's increment the counter and let it grow indefinitely
183 // and potentially overflow resetting it to 0.
184 self.local_idx_r = self.local_idx_r.wrapping_add(1);
185 self.inner.idx_r.store(self.local_idx_r, Ordering::Release);
186
187 Some(t)
188 }
189
190 /// Check if the RingBuffer is empty and evenutally updates the internal cached indexes.
191 #[inline]
192 pub fn is_empty(&mut self) -> bool {
193 // Check if the ring buffer is potentially empty
194 if self.local_idx_r == self.cached_idx_w {
195 // Update the write index
196 self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
197 // Check if the ring buffer is really empty
198 self.local_idx_r == self.cached_idx_w
199 } else {
200 false
201 }
202 }
203}