shm_primitives/
spsc.rs

1use core::mem::{align_of, size_of};
2use core::ptr;
3
4use crate::region::Region;
5use crate::sync::{AtomicU64, Ordering};
6
/// Control block of an SPSC ring, laid out for placement in shared memory.
///
/// The struct is `#[repr(C)]` and every field is followed by explicit padding so
/// that consecutive fields start 64 bytes apart; the whole header is 192 bytes
/// (checked at compile time below for non-loom builds).
#[repr(C)]
pub struct SpscRingHeader {
    /// Publication index: stored by the producer, loaded by the consumer.
    pub visible_head: AtomicU64,
    _pad1: [u8; 56],

    /// Consumption index: stored by the consumer, loaded by the producer.
    pub tail: AtomicU64,
    _pad2: [u8; 56],

    /// Number of slots in the ring. Must be a power of two and is not meant to
    /// change once the header has been initialized.
    pub capacity: u32,
    _pad3: [u8; 60],
}

// Layout guard: the header must stay exactly three 64-byte lines wide.
#[cfg(not(loom))]
const _: () = assert!(core::mem::size_of::<SpscRingHeader>() == 192);

impl SpscRingHeader {
    /// Resets the header to describe an empty ring of `capacity` slots.
    ///
    /// Both indices are set to zero and all padding bytes are cleared.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not a power of two (zero is rejected as well).
    pub fn init(&mut self, capacity: u32) {
        assert!(capacity.is_power_of_two(), "capacity must be power of 2");
        *self = Self {
            visible_head: AtomicU64::new(0),
            _pad1: [0; 56],
            tail: AtomicU64::new(0),
            _pad2: [0; 56],
            capacity,
            _pad3: [0; 60],
        };
    }

    /// Returns `true` when no published entry is waiting to be consumed.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let consumed = self.tail.load(Ordering::Relaxed);
        let published = self.visible_head.load(Ordering::Acquire);
        published <= consumed
    }

    /// Bit mask that maps a running index onto a slot number (`capacity - 1`).
    #[inline]
    pub fn mask(&self) -> u64 {
        u64::from(self.capacity) - 1
    }

    /// Returns `true` when a producer whose private head is `local_head` has no
    /// free slot left, judged against the currently visible `tail`.
    #[inline]
    pub fn is_full(&self, local_head: u64) -> bool {
        let consumed = self.tail.load(Ordering::Acquire);
        let in_flight = local_head.wrapping_sub(consumed);
        in_flight >= u64::from(self.capacity)
    }

    /// Number of published entries not yet consumed; never negative, clamps to
    /// zero if `tail` is observed ahead of `visible_head`.
    #[inline]
    pub fn len(&self) -> u64 {
        let consumed = self.tail.load(Ordering::Relaxed);
        let published = self.visible_head.load(Ordering::Acquire);
        if published > consumed {
            published - consumed
        } else {
            0
        }
    }
}
63
64/// A wait-free SPSC ring buffer in a shared memory region.
65///
66/// This is a convenience wrapper around `SpscRingRaw<T>` that manages
67/// memory through a `Region`. All operations delegate to the raw implementation.
68pub struct SpscRing<T> {
69    /// We hold the region to keep the backing memory alive.
70    /// All operations go through `inner` which holds raw pointers into this region.
71    #[allow(dead_code)]
72    region: Region,
73    inner: SpscRingRaw<T>,
74}
75
76unsafe impl<T: Send> Send for SpscRing<T> {}
77unsafe impl<T: Send> Sync for SpscRing<T> {}
78
79impl<T: Copy> SpscRing<T> {
80    /// Initialize a new ring in the region.
81    ///
82    /// # Safety
83    ///
84    /// The region must be writable and exclusively owned during initialization.
85    pub unsafe fn init(region: Region, header_offset: usize, capacity: u32) -> Self {
86        assert!(
87            capacity.is_power_of_two() && capacity > 0,
88            "capacity must be power of 2"
89        );
90        assert!(
91            header_offset.is_multiple_of(64),
92            "header_offset must be 64-byte aligned"
93        );
94        assert!(align_of::<T>() <= 64, "entry alignment must be <= 64");
95
96        let entries_offset = header_offset + size_of::<SpscRingHeader>();
97        let required = entries_offset + (capacity as usize * size_of::<T>());
98        assert!(required <= region.len(), "region too small for ring");
99        assert!(
100            entries_offset.is_multiple_of(align_of::<T>()),
101            "entries misaligned"
102        );
103
104        let header_ptr = region.offset(header_offset) as *mut SpscRingHeader;
105        let entries_ptr = region.offset(entries_offset) as *mut T;
106
107        // Initialize the header
108        unsafe { (*header_ptr).init(capacity) };
109
110        // Create the inner raw ring
111        let inner = unsafe { SpscRingRaw::from_raw(header_ptr, entries_ptr) };
112
113        Self { region, inner }
114    }
115
116    /// Attach to an existing ring in the region.
117    ///
118    /// # Safety
119    ///
120    /// The region must contain a valid, initialized ring header.
121    pub unsafe fn attach(region: Region, header_offset: usize) -> Self {
122        assert!(
123            header_offset.is_multiple_of(64),
124            "header_offset must be 64-byte aligned"
125        );
126        assert!(align_of::<T>() <= 64, "entry alignment must be <= 64");
127
128        let entries_offset = header_offset + size_of::<SpscRingHeader>();
129        let header_ptr = region.offset(header_offset) as *mut SpscRingHeader;
130        let capacity = unsafe { (*header_ptr).capacity };
131
132        assert!(
133            capacity.is_power_of_two() && capacity > 0,
134            "invalid ring capacity"
135        );
136        let required = entries_offset + (capacity as usize * size_of::<T>());
137        assert!(required <= region.len(), "region too small for ring");
138        assert!(
139            entries_offset.is_multiple_of(align_of::<T>()),
140            "entries misaligned"
141        );
142
143        let entries_ptr = region.offset(entries_offset) as *mut T;
144        let inner = unsafe { SpscRingRaw::from_raw(header_ptr, entries_ptr) };
145
146        Self { region, inner }
147    }
148
149    /// Get a reference to the inner raw ring.
150    #[inline]
151    pub fn inner(&self) -> &SpscRingRaw<T> {
152        &self.inner
153    }
154
155    /// Split into producer and consumer handles.
156    pub fn split(&self) -> (SpscProducer<'_, T>, SpscConsumer<'_, T>) {
157        let head = self.inner.status().visible_head;
158        (
159            SpscProducer {
160                ring: self,
161                local_head: head,
162            },
163            SpscConsumer { ring: self },
164        )
165    }
166
167    /// Returns the ring capacity.
168    #[inline]
169    pub fn capacity(&self) -> u32 {
170        self.inner.capacity()
171    }
172
173    /// Returns true if the ring appears empty.
174    #[inline]
175    pub fn is_empty(&self) -> bool {
176        self.inner.is_empty()
177    }
178
179    /// Returns a status snapshot of head/tail.
180    pub fn status(&self) -> RingStatus {
181        self.inner.status()
182    }
183}
184
185/// Producer handle for the ring.
186pub struct SpscProducer<'a, T> {
187    pub(crate) ring: &'a SpscRing<T>,
188    pub(crate) local_head: u64,
189}
190
191/// Consumer handle for the ring.
192pub struct SpscConsumer<'a, T> {
193    pub(crate) ring: &'a SpscRing<T>,
194}
195
/// Outcome of a non-blocking push attempt.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PushResult {
    /// The entry was enqueued.
    Ok,
    /// The ring was full; nothing was enqueued.
    WouldBlock,
}

impl PushResult {
    /// Returns `true` only for [`PushResult::WouldBlock`].
    #[inline]
    pub fn is_would_block(self) -> bool {
        match self {
            PushResult::WouldBlock => true,
            PushResult::Ok => false,
        }
    }
}
209
210impl<'a, T: Copy> SpscProducer<'a, T> {
211    /// Try to push an entry to the ring.
212    ///
213    /// Delegates to `SpscRingRaw::enqueue`.
214    pub fn try_push(&mut self, entry: T) -> PushResult {
215        match self.ring.inner.enqueue(&mut self.local_head, &entry) {
216            Ok(()) => PushResult::Ok,
217            Err(RingFull) => PushResult::WouldBlock,
218        }
219    }
220
221    /// Returns true if the ring appears full.
222    #[inline]
223    pub fn is_full(&self) -> bool {
224        self.ring.inner.is_full(self.local_head)
225    }
226
227    /// Returns the number of entries that can be pushed (approximate).
228    #[inline]
229    pub fn available_capacity(&self) -> u64 {
230        let capacity = self.ring.inner.capacity() as u64;
231        let tail = self.ring.inner.status().tail;
232        capacity.saturating_sub(self.local_head.wrapping_sub(tail))
233    }
234}
235
236impl<'a, T: Copy> SpscConsumer<'a, T> {
237    /// Try to pop an entry from the ring.
238    ///
239    /// Delegates to `SpscRingRaw::dequeue`.
240    pub fn try_pop(&mut self) -> Option<T> {
241        self.ring.inner.dequeue()
242    }
243
244    /// Returns true if the ring appears empty.
245    #[inline]
246    pub fn is_empty(&self) -> bool {
247        self.ring.is_empty()
248    }
249
250    /// Returns the number of entries available to pop (approximate).
251    #[inline]
252    pub fn len(&self) -> u64 {
253        self.ring.inner.status().len as u64
254    }
255}
256
/// Point-in-time view of a ring's indices, intended for diagnostics.
#[derive(Clone, Copy, Debug)]
pub struct RingStatus {
    /// Published head index at snapshot time.
    pub visible_head: u64,
    /// Consumer tail index at snapshot time.
    pub tail: u64,
    /// Number of slots in the ring.
    pub capacity: u32,
    /// Entries published but not yet consumed (head minus tail, floored at zero).
    pub len: u32,
}
265
266// =============================================================================
267// SpscRingRaw - Raw pointer version for rapace-core compatibility
268// =============================================================================
269
/// Error value signalling that an enqueue was refused because the ring is full.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RingFull;
273
274/// A wait-free SPSC ring buffer operating on raw pointers.
275///
276/// This is the "raw" API that matches rapace-core's `DescRing` interface:
277/// - Constructed from raw pointers to header and entries
278/// - `enqueue` takes `&mut local_head` (caller-managed producer state)
279/// - `dequeue` is stateless on the caller side
280///
281/// Use this when you need to integrate with existing SHM layouts or when
282/// the `Region` abstraction doesn't fit your use case.
283pub struct SpscRingRaw<T> {
284    header: *mut SpscRingHeader,
285    entries: *mut T,
286}
287
288unsafe impl<T: Send> Send for SpscRingRaw<T> {}
289unsafe impl<T: Send> Sync for SpscRingRaw<T> {}
290
291impl<T: Copy> SpscRingRaw<T> {
292    /// Create a ring view from raw pointers.
293    ///
294    /// # Safety
295    ///
296    /// - `header` must point to a valid, initialized `SpscRingHeader`
297    /// - `entries` must point to `header.capacity` initialized `T` slots
298    /// - The memory must remain valid for the lifetime of this ring
299    /// - `entries` must be properly aligned for `T`
300    #[inline]
301    pub unsafe fn from_raw(header: *mut SpscRingHeader, entries: *mut T) -> Self {
302        Self { header, entries }
303    }
304
305    /// Get the ring header.
306    #[inline]
307    fn header(&self) -> &SpscRingHeader {
308        unsafe { &*self.header }
309    }
310
311    /// Get a pointer to an entry slot.
312    #[inline]
313    unsafe fn entry_ptr(&self, slot: usize) -> *mut T {
314        unsafe { self.entries.add(slot) }
315    }
316
317    /// Enqueue an entry (producer side).
318    ///
319    /// `local_head` is producer-private (stack-local, not in SHM).
320    /// On success, `local_head` is incremented.
321    ///
322    /// This matches rapace-core's `DescRing::enqueue` signature.
323    pub fn enqueue(&self, local_head: &mut u64, entry: &T) -> Result<(), RingFull> {
324        let header = self.header();
325        let capacity = header.capacity as u64;
326        let mask = header.mask();
327
328        let tail = header.tail.load(Ordering::Acquire);
329        if local_head.wrapping_sub(tail) >= capacity {
330            return Err(RingFull);
331        }
332
333        let slot = (*local_head & mask) as usize;
334        unsafe {
335            ptr::write(self.entry_ptr(slot), *entry);
336        }
337
338        *local_head = local_head.wrapping_add(1);
339        header.visible_head.store(*local_head, Ordering::Release);
340
341        Ok(())
342    }
343
344    /// Dequeue an entry (consumer side).
345    ///
346    /// This matches rapace-core's `DescRing::dequeue` signature.
347    pub fn dequeue(&self) -> Option<T> {
348        let header = self.header();
349
350        let tail = header.tail.load(Ordering::Relaxed);
351        let visible = header.visible_head.load(Ordering::Acquire);
352
353        if tail >= visible {
354            return None;
355        }
356
357        let mask = header.mask();
358        let slot = (tail & mask) as usize;
359        let entry = unsafe { ptr::read(self.entry_ptr(slot)) };
360
361        header.tail.store(tail.wrapping_add(1), Ordering::Release);
362
363        Some(entry)
364    }
365
366    /// Check if the ring is empty.
367    #[inline]
368    pub fn is_empty(&self) -> bool {
369        self.header().is_empty()
370    }
371
372    /// Check if the ring is full (given producer's local head).
373    #[inline]
374    pub fn is_full(&self, local_head: u64) -> bool {
375        self.header().is_full(local_head)
376    }
377
378    /// Get the capacity of the ring.
379    #[inline]
380    pub fn capacity(&self) -> u32 {
381        self.header().capacity
382    }
383
384    /// Get the ring status (for diagnostics).
385    pub fn status(&self) -> RingStatus {
386        let header = self.header();
387        let visible_head = header.visible_head.load(Ordering::Acquire);
388        let tail = header.tail.load(Ordering::Acquire);
389        let capacity = header.capacity;
390        let len = visible_head.saturating_sub(tail) as u32;
391
392        RingStatus {
393            visible_head,
394            tail,
395            capacity,
396            len,
397        }
398    }
399}