// ruvector_nervous_system/eventbus/queue.rs
//! Lock-Free Ring Buffer for Event Queues
//!
//! High-performance SPSC/MPSC ring buffer with <100ns push/pop operations.
use super::event::Event;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
9/// Lock-free ring buffer for event storage
10///
11/// Optimized for Single-Producer-Single-Consumer (SPSC) pattern
12/// with atomic head/tail pointers for wait-free operations.
13///
14/// # Thread Safety
15///
16/// This buffer is designed for SPSC (Single-Producer-Single-Consumer) use.
17/// While it is `Send + Sync`, concurrent multi-producer or multi-consumer
18/// access may lead to data races or lost events. For MPSC patterns,
19/// use external synchronization or the `ShardedEventBus` which provides
20/// isolation through sharding.
21///
22/// # Memory Ordering
23///
24/// - Producer writes data before publishing tail (Release)
25/// - Consumer reads head with Acquire before accessing data
26/// - This ensures data visibility across threads in SPSC mode
27pub struct EventRingBuffer<E: Event + Copy> {
28    buffer: Vec<UnsafeCell<E>>,
29    head: AtomicUsize,
30    tail: AtomicUsize,
31    capacity: usize,
32}
33
34// Safety: UnsafeCell is only accessed via atomic synchronization
35unsafe impl<E: Event + Copy> Send for EventRingBuffer<E> {}
36unsafe impl<E: Event + Copy> Sync for EventRingBuffer<E> {}
37
38impl<E: Event + Copy> EventRingBuffer<E> {
39    /// Create new ring buffer with specified capacity
40    ///
41    /// Capacity must be power of 2 for efficient modulo operations.
42    pub fn new(capacity: usize) -> Self {
43        assert!(
44            capacity > 0 && capacity.is_power_of_two(),
45            "Capacity must be power of 2"
46        );
47
48        // Initialize with default events (timestamp 0)
49        let buffer: Vec<UnsafeCell<E>> = (0..capacity)
50            .map(|_| {
51                // Create a dummy event with zero values
52                // This is safe because E: Copy and we'll overwrite before reading
53                unsafe { std::mem::zeroed() }
54            })
55            .map(UnsafeCell::new)
56            .collect();
57
58        Self {
59            buffer,
60            head: AtomicUsize::new(0),
61            tail: AtomicUsize::new(0),
62            capacity,
63        }
64    }
65
66    /// Push event to buffer
67    ///
68    /// Returns Err(event) if buffer is full.
69    /// Time complexity: O(1), typically <100ns
70    #[inline]
71    pub fn push(&self, event: E) -> Result<(), E> {
72        let tail = self.tail.load(Ordering::Relaxed);
73        let next_tail = (tail + 1) & (self.capacity - 1);
74
75        // Check if full
76        if next_tail == self.head.load(Ordering::Acquire) {
77            return Err(event);
78        }
79
80        // Safe: we own this slot until tail is updated
81        unsafe {
82            *self.buffer[tail].get() = event;
83        }
84
85        // Make event visible to consumer
86        self.tail.store(next_tail, Ordering::Release);
87        Ok(())
88    }
89
90    /// Pop event from buffer
91    ///
92    /// Returns None if buffer is empty.
93    /// Time complexity: O(1), typically <100ns
94    #[inline]
95    pub fn pop(&self) -> Option<E> {
96        let head = self.head.load(Ordering::Relaxed);
97
98        // Check if empty
99        if head == self.tail.load(Ordering::Acquire) {
100            return None;
101        }
102
103        // Safe: we own this slot until head is updated
104        let event = unsafe { *self.buffer[head].get() };
105
106        let next_head = (head + 1) & (self.capacity - 1);
107
108        // Make slot available to producer
109        self.head.store(next_head, Ordering::Release);
110        Some(event)
111    }
112
113    /// Get current number of events in buffer
114    #[inline]
115    pub fn len(&self) -> usize {
116        let tail = self.tail.load(Ordering::Acquire);
117        let head = self.head.load(Ordering::Acquire);
118
119        if tail >= head {
120            tail - head
121        } else {
122            self.capacity - head + tail
123        }
124    }
125
126    /// Check if buffer is empty
127    #[inline]
128    pub fn is_empty(&self) -> bool {
129        self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
130    }
131
132    /// Check if buffer is full
133    #[inline]
134    pub fn is_full(&self) -> bool {
135        let tail = self.tail.load(Ordering::Relaxed);
136        let next_tail = (tail + 1) & (self.capacity - 1);
137        next_tail == self.head.load(Ordering::Acquire)
138    }
139
140    /// Get buffer capacity
141    #[inline]
142    pub fn capacity(&self) -> usize {
143        self.capacity
144    }
145
146    /// Get fill percentage (0.0 to 1.0)
147    pub fn fill_ratio(&self) -> f32 {
148        self.len() as f32 / self.capacity as f32
149    }
150}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::eventbus::event::DVSEvent;
    use std::thread;

    #[test]
    fn test_ring_buffer_creation() {
        let rb: EventRingBuffer<DVSEvent> = EventRingBuffer::new(1024);
        assert_eq!(rb.capacity(), 1024);
        assert_eq!(rb.len(), 0);
        assert!(rb.is_empty());
        assert!(!rb.is_full());
    }

    #[test]
    #[should_panic]
    fn test_non_power_of_two_capacity() {
        // 1000 is not a power of two, so the constructor must panic.
        let _ = EventRingBuffer::<DVSEvent>::new(1000);
    }

    #[test]
    fn test_push_pop_single() {
        let rb = EventRingBuffer::new(16);

        rb.push(DVSEvent::new(1000, 42, 123, true))
            .expect("fresh buffer has room");
        assert_eq!(rb.len(), 1);

        let out = rb.pop().expect("exactly one event queued");
        assert_eq!(out.timestamp(), 1000);
        assert_eq!(out.source_id(), 42);
        assert!(rb.is_empty());
    }

    #[test]
    fn test_push_until_full() {
        let rb = EventRingBuffer::new(4);

        // A ring of capacity N accepts at most N-1 events (one slot stays empty).
        for i in 0..3u64 {
            assert!(rb.push(DVSEvent::new(i, i as u16, 0, true)).is_ok());
        }
        assert!(rb.is_full());

        // One more must be rejected.
        assert!(rb.push(DVSEvent::new(999, 999, 0, true)).is_err());
    }

    #[test]
    fn test_fifo_order() {
        let rb = EventRingBuffer::new(16);

        // Enqueue ten events with increasing timestamps.
        (0..10u64).for_each(|i| {
            rb.push(DVSEvent::new(i, i as u16, i as u32, true)).unwrap();
        });

        // They must come back out in the same order.
        for expected in 0..10u64 {
            assert_eq!(rb.pop().unwrap().timestamp(), expected);
        }
    }

    #[test]
    fn test_wrap_around() {
        let rb = EventRingBuffer::new(4);

        for ts in 0..3 {
            rb.push(DVSEvent::new(ts, 0, 0, true)).unwrap();
        }

        // Free two slots so the next pushes wrap past the end of storage.
        let _ = rb.pop();
        let _ = rb.pop();

        rb.push(DVSEvent::new(100, 0, 0, true)).unwrap();
        rb.push(DVSEvent::new(101, 0, 0, true)).unwrap();

        assert_eq!(rb.len(), 3);
    }

    #[test]
    fn test_fill_ratio() {
        let rb = EventRingBuffer::new(8);
        assert_eq!(rb.fill_ratio(), 0.0);

        rb.push(DVSEvent::new(0, 0, 0, true)).unwrap();
        rb.push(DVSEvent::new(1, 0, 0, true)).unwrap();

        // 2 of 8 slots occupied.
        assert!((rb.fill_ratio() - 0.25).abs() < 0.01);
    }

    #[test]
    fn test_spsc_threaded() {
        const NUM_EVENTS: usize = 10000;

        let rb = std::sync::Arc::new(EventRingBuffer::new(1024));
        let prod_rb = std::sync::Arc::clone(&rb);

        // Producer: spin until the consumer frees a slot.
        let producer = thread::spawn(move || {
            for i in 0..NUM_EVENTS {
                let ev = DVSEvent::new(i as u64, (i % 256) as u16, i as u32, true);
                while prod_rb.push(ev).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        // Consumer: drain everything, checking FIFO timestamp order.
        let consumer = thread::spawn(move || {
            let mut received = 0;
            let mut prev_ts = 0u64;

            while received < NUM_EVENTS {
                if let Some(ev) = rb.pop() {
                    assert!(ev.timestamp() >= prev_ts);
                    prev_ts = ev.timestamp();
                    received += 1;
                }
            }
            received
        });

        producer.join().unwrap();
        assert_eq!(consumer.join().unwrap(), NUM_EVENTS);
    }

    #[test]
    fn test_concurrent_push_pop() {
        let rb = std::sync::Arc::new(EventRingBuffer::new(512));

        // Producer: yield instead of spinning when the ring is full.
        let prod_rb = std::sync::Arc::clone(&rb);
        let producer = thread::spawn(move || {
            for ts in 0..1000u64 {
                while prod_rb.push(DVSEvent::new(ts, 0, 0, true)).is_err() {
                    thread::yield_now();
                }
            }
        });

        // Consumer: busy-poll until all 1000 events are drained.
        let cons_rb = std::sync::Arc::clone(&rb);
        let consumer = thread::spawn(move || {
            let mut drained = 0;
            while drained < 1000 {
                if cons_rb.pop().is_some() {
                    drained += 1;
                }
            }
            drained
        });

        producer.join().unwrap();
        assert_eq!(consumer.join().unwrap(), 1000);
        assert!(rb.is_empty());
    }
}