// ruvector_nervous_system/eventbus/queue.rs
use super::event::Event;
6use std::cell::UnsafeCell;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9pub struct EventRingBuffer<E: Event + Copy> {
28 buffer: Vec<UnsafeCell<E>>,
29 head: AtomicUsize,
30 tail: AtomicUsize,
31 capacity: usize,
32}
33
34unsafe impl<E: Event + Copy> Send for EventRingBuffer<E> {}
36unsafe impl<E: Event + Copy> Sync for EventRingBuffer<E> {}
37
38impl<E: Event + Copy> EventRingBuffer<E> {
39 pub fn new(capacity: usize) -> Self {
43 assert!(
44 capacity > 0 && capacity.is_power_of_two(),
45 "Capacity must be power of 2"
46 );
47
48 let buffer: Vec<UnsafeCell<E>> = (0..capacity)
50 .map(|_| {
51 unsafe { std::mem::zeroed() }
54 })
55 .map(UnsafeCell::new)
56 .collect();
57
58 Self {
59 buffer,
60 head: AtomicUsize::new(0),
61 tail: AtomicUsize::new(0),
62 capacity,
63 }
64 }
65
66 #[inline]
71 pub fn push(&self, event: E) -> Result<(), E> {
72 let tail = self.tail.load(Ordering::Relaxed);
73 let next_tail = (tail + 1) & (self.capacity - 1);
74
75 if next_tail == self.head.load(Ordering::Acquire) {
77 return Err(event);
78 }
79
80 unsafe {
82 *self.buffer[tail].get() = event;
83 }
84
85 self.tail.store(next_tail, Ordering::Release);
87 Ok(())
88 }
89
90 #[inline]
95 pub fn pop(&self) -> Option<E> {
96 let head = self.head.load(Ordering::Relaxed);
97
98 if head == self.tail.load(Ordering::Acquire) {
100 return None;
101 }
102
103 let event = unsafe { *self.buffer[head].get() };
105
106 let next_head = (head + 1) & (self.capacity - 1);
107
108 self.head.store(next_head, Ordering::Release);
110 Some(event)
111 }
112
113 #[inline]
115 pub fn len(&self) -> usize {
116 let tail = self.tail.load(Ordering::Acquire);
117 let head = self.head.load(Ordering::Acquire);
118
119 if tail >= head {
120 tail - head
121 } else {
122 self.capacity - head + tail
123 }
124 }
125
126 #[inline]
128 pub fn is_empty(&self) -> bool {
129 self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
130 }
131
132 #[inline]
134 pub fn is_full(&self) -> bool {
135 let tail = self.tail.load(Ordering::Relaxed);
136 let next_tail = (tail + 1) & (self.capacity - 1);
137 next_tail == self.head.load(Ordering::Acquire)
138 }
139
140 #[inline]
142 pub fn capacity(&self) -> usize {
143 self.capacity
144 }
145
146 pub fn fill_ratio(&self) -> f32 {
148 self.len() as f32 / self.capacity as f32
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::eventbus::event::DVSEvent;
    use std::thread;

    /// A freshly built buffer reports its capacity and holds nothing.
    #[test]
    fn test_ring_buffer_creation() {
        let ring: EventRingBuffer<DVSEvent> = EventRingBuffer::new(1024);

        assert_eq!(ring.capacity(), 1024);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert!(!ring.is_full());
    }

    /// Construction panics when the requested capacity is not a power of two.
    #[test]
    #[should_panic]
    fn test_non_power_of_two_capacity() {
        let _: EventRingBuffer<DVSEvent> = EventRingBuffer::new(1000);
    }

    /// One event pushed comes back out unchanged and leaves the buffer empty.
    #[test]
    fn test_push_pop_single() {
        let ring = EventRingBuffer::new(16);
        let sent = DVSEvent::new(1000, 42, 123, true);

        assert!(ring.push(sent).is_ok());
        assert_eq!(ring.len(), 1);

        let received = ring.pop().unwrap();
        assert_eq!(received.timestamp(), 1000);
        assert_eq!(received.source_id(), 42);
        assert!(ring.is_empty());
    }

    /// A 4-slot buffer accepts three events, then reports full and rejects a fourth.
    #[test]
    fn test_push_until_full() {
        let ring = EventRingBuffer::new(4);

        for n in 0..3 {
            let accepted = ring.push(DVSEvent::new(n as u64, n as u16, 0, true));
            assert!(accepted.is_ok());
        }

        assert!(ring.is_full());

        let overflow = DVSEvent::new(999, 999, 0, true);
        assert!(ring.push(overflow).is_err());
    }

    /// Events are popped in the order they were pushed.
    #[test]
    fn test_fifo_order() {
        let ring = EventRingBuffer::new(16);

        for n in 0..10 {
            ring.push(DVSEvent::new(n as u64, n as u16, n as u32, true))
                .unwrap();
        }

        for expected in 0..10 {
            let popped = ring.pop().unwrap();
            assert_eq!(popped.timestamp(), expected as u64);
        }
    }

    /// Pushing after some pops wraps the tail index and keeps `len` correct.
    #[test]
    fn test_wrap_around() {
        let ring = EventRingBuffer::new(4);

        for ts in 0..3 {
            ring.push(DVSEvent::new(ts, 0, 0, true)).unwrap();
        }

        // Free two slots at the front.
        ring.pop();
        ring.pop();

        // These two pushes move the tail past the end of the storage.
        ring.push(DVSEvent::new(100, 0, 0, true)).unwrap();
        ring.push(DVSEvent::new(101, 0, 0, true)).unwrap();

        assert_eq!(ring.len(), 3);
    }

    /// Two events in an 8-slot buffer give a fill ratio of about 0.25.
    #[test]
    fn test_fill_ratio() {
        let ring = EventRingBuffer::new(8);

        assert_eq!(ring.fill_ratio(), 0.0);

        ring.push(DVSEvent::new(0, 0, 0, true)).unwrap();
        ring.push(DVSEvent::new(1, 0, 0, true)).unwrap();

        let ratio = ring.fill_ratio();
        assert!((ratio - 0.25).abs() < 0.01);
    }

    /// One producer thread and one consumer thread move every event across
    /// with non-decreasing timestamps.
    #[test]
    fn test_spsc_threaded() {
        const NUM_EVENTS: usize = 10000;

        let consumer_side = std::sync::Arc::new(EventRingBuffer::new(1024));
        let producer_side = consumer_side.clone();

        let producer = thread::spawn(move || {
            for n in 0..NUM_EVENTS {
                let event = DVSEvent::new(n as u64, (n % 256) as u16, n as u32, true);
                // Retry until the consumer has made room.
                while producer_side.push(event).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut seen = 0;
            let mut previous = 0u64;

            while seen < NUM_EVENTS {
                if let Some(event) = consumer_side.pop() {
                    let ts = event.timestamp();
                    assert!(ts >= previous);
                    previous = ts;
                    seen += 1;
                }
            }
            seen
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();
        assert_eq!(received, NUM_EVENTS);
    }

    /// A producer and a consumer running concurrently drain the buffer completely.
    #[test]
    fn test_concurrent_push_pop() {
        let shared = std::sync::Arc::new(EventRingBuffer::new(512));

        let writer = shared.clone();
        let producer_handle = thread::spawn(move || {
            for ts in 0..1000 {
                let event = DVSEvent::new(ts, 0, 0, true);
                while writer.push(event).is_err() {
                    thread::yield_now();
                }
            }
        });

        let reader = shared.clone();
        let consumer_handle = thread::spawn(move || {
            let mut taken = 0;
            while taken < 1000 {
                if reader.pop().is_some() {
                    taken += 1;
                }
            }
            taken
        });

        producer_handle.join().unwrap();

        let received = consumer_handle.join().unwrap();
        assert_eq!(received, 1000);
        assert!(shared.is_empty());
    }
}