1use core::mem::{align_of, size_of};
2use core::ptr;
3
4use crate::region::Region;
5use crate::sync::{AtomicU64, Ordering};
6
/// Shared-memory header of a single-producer/single-consumer ring.
///
/// `#[repr(C)]` fixes the layout so producer and consumer (possibly in
/// different processes) agree on field offsets. Each field sits in its own
/// 64-byte-padded slot — presumably to avoid false sharing between the
/// producer-written head and the consumer-written tail (TODO confirm the
/// target's cache-line size is 64). Total size is 192 bytes, enforced by the
/// const assert below.
#[repr(C)]
pub struct SpscRingHeader {
    /// Producer-published write index. Monotonically increasing, never
    /// wrapped to the capacity; entries below this index are visible to the
    /// consumer.
    pub visible_head: AtomicU64,
    // Pads `visible_head` (8 bytes) out to a full 64-byte slot.
    _pad1: [u8; 56],

    /// Consumer-owned read index. Monotonically increasing, never wrapped.
    pub tail: AtomicU64,
    // Pads `tail` out to its own 64-byte slot.
    _pad2: [u8; 56],

    /// Number of slots in the ring; must be a non-zero power of two
    /// (asserted in `init`).
    pub capacity: u32,
    // Pads the header to exactly 3 * 64 = 192 bytes.
    _pad3: [u8; 60],
}
22
// Compile-time layout check: the header must be exactly 192 bytes so that
// entry offsets computed from `size_of::<SpscRingHeader>()` are stable
// across builds/processes. Skipped under loom — presumably because loom's
// model-checking atomic types have a different size; TODO confirm.
#[cfg(not(loom))]
const _: () = assert!(core::mem::size_of::<SpscRingHeader>() == 192);
25
impl SpscRingHeader {
    /// Resets the header in place for a ring with `capacity` slots.
    ///
    /// Takes `&mut self`, so the stores are plain (non-atomic) assignments —
    /// valid only before the header is shared with another thread/process.
    /// Padding is zeroed so the mapped bytes are deterministic.
    ///
    /// # Panics
    /// If `capacity` is not a power of two (`is_power_of_two()` also
    /// rejects 0).
    pub fn init(&mut self, capacity: u32) {
        assert!(capacity.is_power_of_two(), "capacity must be power of 2");
        self.visible_head = AtomicU64::new(0);
        self._pad1 = [0; 56];
        self.tail = AtomicU64::new(0);
        self._pad2 = [0; 56];
        self.capacity = capacity;
        self._pad3 = [0; 60];
    }

    /// True when the consumer has caught up with the producer.
    ///
    /// Indices are monotonic, so emptiness is `tail == head`; `>=` is a
    /// defensive form for an inconsistent snapshot. The Acquire load of
    /// `visible_head` pairs with the producer's Release store in `enqueue`.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.visible_head.load(Ordering::Acquire);
        tail >= head
    }

    /// Bit mask mapping an unwrapped index to a slot index; valid because
    /// `capacity` is a power of two.
    #[inline]
    pub fn mask(&self) -> u64 {
        self.capacity as u64 - 1
    }

    /// True when a producer whose private head is `local_head` has no free
    /// slot. `wrapping_sub` keeps the distance correct even if the 64-bit
    /// indices ever wrapped.
    #[inline]
    pub fn is_full(&self, local_head: u64) -> bool {
        let tail = self.tail.load(Ordering::Acquire);
        local_head.wrapping_sub(tail) >= self.capacity as u64
    }

    /// Snapshot of the number of queued entries. `saturating_sub` guards
    /// against an inconsistent snapshot where the loaded `tail` exceeds the
    /// loaded `head`; the result may be stale immediately.
    #[inline]
    pub fn len(&self) -> u64 {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.visible_head.load(Ordering::Acquire);
        head.saturating_sub(tail)
    }
}
63
/// Owning handle for an SPSC ring living inside a shared-memory `Region`.
pub struct SpscRing<T> {
    // Never read after construction — presumably retained so the mapping
    // stays alive while `inner`'s raw pointers reference it (dropping the
    // Region would dangle them); TODO confirm Region unmaps on drop.
    #[allow(dead_code)]
    region: Region,
    inner: SpscRingRaw<T>,
}

// SAFETY: cross-thread access to the header goes through atomics, and only
// `T: Send` values cross threads via the slots. Soundness additionally
// relies on callers upholding the single-producer/single-consumer
// discipline — NOTE(review): `split(&self)` does not enforce it statically.
unsafe impl<T: Send> Send for SpscRing<T> {}
unsafe impl<T: Send> Sync for SpscRing<T> {}
78
79impl<T: Copy> SpscRing<T> {
80 pub unsafe fn init(region: Region, header_offset: usize, capacity: u32) -> Self {
86 assert!(
87 capacity.is_power_of_two() && capacity > 0,
88 "capacity must be power of 2"
89 );
90 assert!(
91 header_offset.is_multiple_of(64),
92 "header_offset must be 64-byte aligned"
93 );
94 assert!(align_of::<T>() <= 64, "entry alignment must be <= 64");
95
96 let entries_offset = header_offset + size_of::<SpscRingHeader>();
97 let required = entries_offset + (capacity as usize * size_of::<T>());
98 assert!(required <= region.len(), "region too small for ring");
99 assert!(
100 entries_offset.is_multiple_of(align_of::<T>()),
101 "entries misaligned"
102 );
103
104 let header_ptr = region.offset(header_offset) as *mut SpscRingHeader;
105 let entries_ptr = region.offset(entries_offset) as *mut T;
106
107 unsafe { (*header_ptr).init(capacity) };
109
110 let inner = unsafe { SpscRingRaw::from_raw(header_ptr, entries_ptr) };
112
113 Self { region, inner }
114 }
115
116 pub unsafe fn attach(region: Region, header_offset: usize) -> Self {
122 assert!(
123 header_offset.is_multiple_of(64),
124 "header_offset must be 64-byte aligned"
125 );
126 assert!(align_of::<T>() <= 64, "entry alignment must be <= 64");
127
128 let entries_offset = header_offset + size_of::<SpscRingHeader>();
129 let header_ptr = region.offset(header_offset) as *mut SpscRingHeader;
130 let capacity = unsafe { (*header_ptr).capacity };
131
132 assert!(
133 capacity.is_power_of_two() && capacity > 0,
134 "invalid ring capacity"
135 );
136 let required = entries_offset + (capacity as usize * size_of::<T>());
137 assert!(required <= region.len(), "region too small for ring");
138 assert!(
139 entries_offset.is_multiple_of(align_of::<T>()),
140 "entries misaligned"
141 );
142
143 let entries_ptr = region.offset(entries_offset) as *mut T;
144 let inner = unsafe { SpscRingRaw::from_raw(header_ptr, entries_ptr) };
145
146 Self { region, inner }
147 }
148
149 #[inline]
151 pub fn inner(&self) -> &SpscRingRaw<T> {
152 &self.inner
153 }
154
155 pub fn split(&self) -> (SpscProducer<'_, T>, SpscConsumer<'_, T>) {
157 let head = self.inner.status().visible_head;
158 (
159 SpscProducer {
160 ring: self,
161 local_head: head,
162 },
163 SpscConsumer { ring: self },
164 )
165 }
166
167 #[inline]
169 pub fn capacity(&self) -> u32 {
170 self.inner.capacity()
171 }
172
173 #[inline]
175 pub fn is_empty(&self) -> bool {
176 self.inner.is_empty()
177 }
178
179 pub fn status(&self) -> RingStatus {
181 self.inner.status()
182 }
183}
184
/// Producer half returned by `SpscRing::split`.
pub struct SpscProducer<'a, T> {
    pub(crate) ring: &'a SpscRing<T>,
    /// Producer-private copy of the next write index; published to the
    /// shared header only on a successful push.
    pub(crate) local_head: u64,
}
190
/// Consumer half returned by `SpscRing::split`.
pub struct SpscConsumer<'a, T> {
    pub(crate) ring: &'a SpscRing<T>,
}
195
/// Outcome of a non-blocking push attempt.
///
/// Marked `#[must_use]` because ignoring a `WouldBlock` silently drops the
/// entry the caller tried to enqueue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "a WouldBlock result means the entry was NOT enqueued"]
pub enum PushResult {
    /// The entry was written and published to the consumer.
    Ok,
    /// The ring was full; the entry was not enqueued.
    WouldBlock,
}
202
203impl PushResult {
204 #[inline]
205 pub fn is_would_block(self) -> bool {
206 matches!(self, PushResult::WouldBlock)
207 }
208}
209
210impl<'a, T: Copy> SpscProducer<'a, T> {
211 pub fn try_push(&mut self, entry: T) -> PushResult {
215 match self.ring.inner.enqueue(&mut self.local_head, &entry) {
216 Ok(()) => PushResult::Ok,
217 Err(RingFull) => PushResult::WouldBlock,
218 }
219 }
220
221 #[inline]
223 pub fn is_full(&self) -> bool {
224 self.ring.inner.is_full(self.local_head)
225 }
226
227 #[inline]
229 pub fn available_capacity(&self) -> u64 {
230 let capacity = self.ring.inner.capacity() as u64;
231 let tail = self.ring.inner.status().tail;
232 capacity.saturating_sub(self.local_head.wrapping_sub(tail))
233 }
234}
235
236impl<'a, T: Copy> SpscConsumer<'a, T> {
237 pub fn try_pop(&mut self) -> Option<T> {
241 self.ring.inner.dequeue()
242 }
243
244 #[inline]
246 pub fn is_empty(&self) -> bool {
247 self.ring.is_empty()
248 }
249
250 #[inline]
252 pub fn len(&self) -> u64 {
253 self.ring.inner.status().len as u64
254 }
255}
256
/// Point-in-time snapshot of the ring's indices, as produced by `status`.
/// Values may already be stale when read.
#[derive(Debug, Clone, Copy)]
pub struct RingStatus {
    /// Producer's published write index (unwrapped, monotonic).
    pub visible_head: u64,
    /// Consumer's read index (unwrapped, monotonic).
    pub tail: u64,
    /// Total slot count of the ring.
    pub capacity: u32,
    /// Queued entries at snapshot time (`visible_head - tail`, saturated).
    pub len: u32,
}
265
/// Error returned by `SpscRingRaw::enqueue` when no slot is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RingFull;
273
/// Non-owning view of a ring: raw pointers into memory owned elsewhere
/// (e.g. the `Region` held by `SpscRing`). See `from_raw` for validity
/// requirements.
pub struct SpscRingRaw<T> {
    /// Pointer to the shared header (indices + capacity).
    header: *mut SpscRingHeader,
    /// Pointer to the first of `capacity` entry slots.
    entries: *mut T,
}

// SAFETY: raw pointers themselves carry no thread affinity; cross-thread
// access to the header uses atomics and only `T: Send` values cross via the
// slots. Callers must keep the backing memory alive and uphold the SPSC
// discipline (one producer, one consumer).
unsafe impl<T: Send> Send for SpscRingRaw<T> {}
unsafe impl<T: Send> Sync for SpscRingRaw<T> {}
290
impl<T: Copy> SpscRingRaw<T> {
    /// Wraps raw pointers into a ring view.
    ///
    /// # Safety
    /// `header` must point to a valid, initialized `SpscRingHeader`, and
    /// `entries` to an array of at least `header.capacity` suitably aligned
    /// `T` slots. Both must remain valid for the lifetime of this value.
    #[inline]
    pub unsafe fn from_raw(header: *mut SpscRingHeader, entries: *mut T) -> Self {
        Self { header, entries }
    }

    /// Shared reference to the shared-memory header.
    #[inline]
    fn header(&self) -> &SpscRingHeader {
        // SAFETY: `from_raw`'s contract guarantees `header` is valid.
        unsafe { &*self.header }
    }

    /// Raw pointer to slot `slot`.
    ///
    /// # Safety
    /// Caller must ensure `slot < capacity` so the offset stays in bounds.
    #[inline]
    unsafe fn entry_ptr(&self, slot: usize) -> *mut T {
        unsafe { self.entries.add(slot) }
    }

    /// Producer-side push of `*entry` into the ring.
    ///
    /// `local_head` is the producer's private write index; it is advanced
    /// and published only on success. The Acquire load of `tail` pairs with
    /// the consumer's Release store in `dequeue`, guaranteeing the consumer
    /// has finished reading a slot before it is overwritten. Returns
    /// `Err(RingFull)` with no side effects when no slot is free.
    /// Must only be called from the single producer thread.
    pub fn enqueue(&self, local_head: &mut u64, entry: &T) -> Result<(), RingFull> {
        let header = self.header();
        let capacity = header.capacity as u64;
        let mask = header.mask();

        let tail = header.tail.load(Ordering::Acquire);
        if local_head.wrapping_sub(tail) >= capacity {
            return Err(RingFull);
        }

        let slot = (*local_head & mask) as usize;
        // SAFETY: masking keeps `slot < capacity`; the full-check above
        // shows the slot is free (assuming callers uphold the SPSC
        // discipline).
        unsafe {
            ptr::write(self.entry_ptr(slot), *entry);
        }

        // Publish: the Release store makes the slot write above visible to
        // a consumer that Acquire-loads `visible_head`.
        *local_head = local_head.wrapping_add(1);
        header.visible_head.store(*local_head, Ordering::Release);

        Ok(())
    }

    /// Consumer-side pop of the oldest entry, or `None` when empty.
    ///
    /// `tail` is loaded Relaxed because only the consumer writes it. The
    /// Acquire load of `visible_head` pairs with the producer's Release
    /// store, making the slot contents visible before they are read; the
    /// Release store of the new `tail` hands the slot back to the producer.
    /// Must only be called from the single consumer thread.
    pub fn dequeue(&self) -> Option<T> {
        let header = self.header();

        let tail = header.tail.load(Ordering::Relaxed);
        let visible = header.visible_head.load(Ordering::Acquire);

        if tail >= visible {
            return None;
        }

        let mask = header.mask();
        let slot = (tail & mask) as usize;
        // SAFETY: masking keeps `slot < capacity`, and `tail < visible`
        // shows the producer has published this slot.
        let entry = unsafe { ptr::read(self.entry_ptr(slot)) };

        header.tail.store(tail.wrapping_add(1), Ordering::Release);

        Some(entry)
    }

    /// Whether the ring currently holds no entries (may be stale).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.header().is_empty()
    }

    /// Whether a producer at `local_head` would find the ring full.
    #[inline]
    pub fn is_full(&self, local_head: u64) -> bool {
        self.header().is_full(local_head)
    }

    /// Total slot count as stored in the header.
    #[inline]
    pub fn capacity(&self) -> u32 {
        self.header().capacity
    }

    /// Point-in-time snapshot of indices and derived length. The `as u32`
    /// truncation of `len` is safe because `len` never exceeds `capacity`,
    /// which is a `u32`.
    pub fn status(&self) -> RingStatus {
        let header = self.header();
        let visible_head = header.visible_head.load(Ordering::Acquire);
        let tail = header.tail.load(Ordering::Acquire);
        let capacity = header.capacity;
        let len = visible_head.saturating_sub(tail) as u32;

        RingStatus {
            visible_head,
            tail,
            capacity,
            len,
        }
    }
}
399}