Skip to main content

maolan_plugin_protocol/
ringbuf.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3/// Lock-free single-producer / single-consumer ring buffer backed by a fixed
4/// array in shared memory.
5///
6/// `T` must be `Copy` so that reads/writes are safe without destructors.
7/// The capacity must be a power of two so that modulo can be done with a mask.
8pub struct RingBuffer<T> {
9    buffer: *mut T,
10    write_idx: *mut AtomicU32,
11    read_idx: *mut AtomicU32,
12    mask: u32,
13}
14
15unsafe impl<T: Send> Send for RingBuffer<T> {}
16unsafe impl<T: Sync> Sync for RingBuffer<T> {}
17
18impl<T: Copy> RingBuffer<T> {
19    /// # Safety
20    ///
21    /// `buffer` must point to `capacity` valid, initialized slots.
22    /// `write_idx` and `read_idx` must point to distinct `AtomicU32`s.
23    /// `capacity` must be a power of two.
24    pub unsafe fn new(
25        buffer: *mut T,
26        write_idx: *mut AtomicU32,
27        read_idx: *mut AtomicU32,
28        capacity: usize,
29    ) -> Self {
30        assert!(
31            capacity.is_power_of_two(),
32            "RingBuffer capacity must be a power of two"
33        );
34        Self {
35            buffer,
36            write_idx,
37            read_idx,
38            mask: (capacity - 1) as u32,
39        }
40    }
41
42    /// Attempt to push a single event. Returns `true` on success, `false` if
43    /// the buffer is full.
44    pub fn push(&self, event: T) -> bool {
45        let write = unsafe { (*self.write_idx).load(Ordering::Relaxed) };
46        let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
47        let count = write.wrapping_sub(read);
48        if count as usize >= (self.mask as usize) {
49            return false;
50        }
51        let idx = (write & self.mask) as usize;
52        unsafe {
53            self.buffer.add(idx).write(event);
54        }
55        unsafe {
56            (*self.write_idx).store(write.wrapping_add(1), Ordering::Release);
57        }
58        true
59    }
60
61    /// Pop a single event. Returns `None` if the buffer is empty.
62    pub fn pop(&self) -> Option<T> {
63        let read = unsafe { (*self.read_idx).load(Ordering::Relaxed) };
64        let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
65        if read == write {
66            return None;
67        }
68        let idx = (read & self.mask) as usize;
69        let event = unsafe { self.buffer.add(idx).read() };
70        unsafe {
71            (*self.read_idx).store(read.wrapping_add(1), Ordering::Release);
72        }
73        Some(event)
74    }
75
76    /// Returns the number of readable slots.
77    pub fn len(&self) -> usize {
78        let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
79        let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
80        write.wrapping_sub(read) as usize
81    }
82
83    pub fn is_empty(&self) -> bool {
84        self.len() == 0
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91    use std::sync::atomic::AtomicU32;
92
93    #[test]
94    fn overflow_drops_excess() {
95        let mut buf = vec![0u32; 8];
96        let write = AtomicU32::new(0);
97        let read = AtomicU32::new(0);
98        let rb = unsafe {
99            RingBuffer::new(
100                buf.as_mut_ptr(),
101                &write as *const AtomicU32 as *mut AtomicU32,
102                &read as *const AtomicU32 as *mut AtomicU32,
103                8,
104            )
105        };
106
107        // Fill the buffer to capacity (7 usable slots in an 8-slot ring).
108        for i in 0..7 {
109            assert!(rb.push(i), "push {i} should succeed");
110        }
111
112        // Next push must fail (full).
113        assert!(!rb.push(99), "overflow push should be rejected");
114
115        // Read back the 7 items in order.
116        for i in 0..7 {
117            assert_eq!(rb.pop(), Some(i), "should read item {i}");
118        }
119
120        assert!(rb.is_empty());
121    }
122}