maolan_plugin_protocol/
ringbuf.rs1use std::sync::atomic::{AtomicU32, Ordering};
2
/// Fixed-size ring buffer view over storage and index counters that are
/// reached through raw pointers.
///
/// The struct itself only stores the pointers and the wrap mask handed to
/// [`RingBuffer::new`]. No `Drop` impl is visible in this file, so the
/// pointed-to memory is presumably owned and released elsewhere (verify
/// against the code that constructs the buffer).
pub struct RingBuffer<T> {
    /// Base of the element storage; slots are addressed as
    /// `buffer.add(index & mask)`.
    buffer: *mut T,
    /// Free-running counter advanced by `push` after a slot has been written.
    write_idx: *mut AtomicU32,
    /// Free-running counter advanced by `pop` after a slot has been read.
    read_idx: *mut AtomicU32,
    /// `capacity - 1`; ANDed with a counter to obtain a slot index, and also
    /// used by `push` as the maximum number of queued elements.
    mask: u32,
}
14
15unsafe impl<T: Send> Send for RingBuffer<T> {}
16unsafe impl<T: Sync> Sync for RingBuffer<T> {}
17
18impl<T: Copy> RingBuffer<T> {
19 pub unsafe fn new(
25 buffer: *mut T,
26 write_idx: *mut AtomicU32,
27 read_idx: *mut AtomicU32,
28 capacity: usize,
29 ) -> Self {
30 assert!(
31 capacity.is_power_of_two(),
32 "RingBuffer capacity must be a power of two"
33 );
34 Self {
35 buffer,
36 write_idx,
37 read_idx,
38 mask: (capacity - 1) as u32,
39 }
40 }
41
42 pub fn push(&self, event: T) -> bool {
45 let write = unsafe { (*self.write_idx).load(Ordering::Relaxed) };
46 let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
47 let count = write.wrapping_sub(read);
48 if count as usize >= (self.mask as usize) {
49 return false;
50 }
51 let idx = (write & self.mask) as usize;
52 unsafe {
53 self.buffer.add(idx).write(event);
54 }
55 unsafe {
56 (*self.write_idx).store(write.wrapping_add(1), Ordering::Release);
57 }
58 true
59 }
60
61 pub fn pop(&self) -> Option<T> {
63 let read = unsafe { (*self.read_idx).load(Ordering::Relaxed) };
64 let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
65 if read == write {
66 return None;
67 }
68 let idx = (read & self.mask) as usize;
69 let event = unsafe { self.buffer.add(idx).read() };
70 unsafe {
71 (*self.read_idx).store(read.wrapping_add(1), Ordering::Release);
72 }
73 Some(event)
74 }
75
76 pub fn len(&self) -> usize {
78 let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
79 let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
80 write.wrapping_sub(read) as usize
81 }
82
83 pub fn is_empty(&self) -> bool {
84 self.len() == 0
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;

    /// An 8-slot buffer accepts seven elements, rejects the eighth, and then
    /// hands the seven back in insertion order.
    #[test]
    fn overflow_drops_excess() {
        const CAPACITY: usize = 8;
        // One slot stays unused, so only CAPACITY - 1 pushes can succeed.
        const USABLE: u32 = 7;

        let mut storage = vec![0u32; CAPACITY];
        let write_counter = AtomicU32::new(0);
        let read_counter = AtomicU32::new(0);
        let write_ptr = &write_counter as *const AtomicU32 as *mut AtomicU32;
        let read_ptr = &read_counter as *const AtomicU32 as *mut AtomicU32;

        // SAFETY: `storage` holds CAPACITY elements and, like both counters,
        // outlives `rb`; the buffer is only used from this thread.
        let rb = unsafe { RingBuffer::new(storage.as_mut_ptr(), write_ptr, read_ptr, CAPACITY) };

        // Fill every usable slot.
        for i in 0..USABLE {
            assert!(rb.push(i), "push {i} should succeed");
        }

        // The buffer is now full; one more element must be refused.
        assert!(!rb.push(99), "overflow push should be rejected");

        // Elements come back out in the order they went in.
        for i in 0..USABLE {
            assert_eq!(rb.pop(), Some(i), "should read item {i}");
        }

        assert!(rb.is_empty());
    }
}