kestrel_timer/utils/
ringbuf.rs

//! High-performance SPSC ring buffer with inline/heap storage optimization.
//!
//! This implementation stores its slots in a `FixedVec`, which is intended to keep small
//! capacities (≤32) in inline storage instead of a separate heap allocation, avoiding
//! allocation overhead and improving `new()` performance.
9
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12use super::vec::FixedVec;
13
/// Error returned when a value cannot be pushed onto the ring buffer.
///
/// The value that was not stored is carried inside the variant, so the caller gets
/// ownership of it back and can retry or discard it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError<T> {
    /// The buffer is full; holds the value that was rejected.
    Full(T),
}

impl<T> std::fmt::Display for PushError<T> {
    /// Writes a short human-readable description; the rejected value is not printed,
    /// so no `Display`/`Debug` bound on `T` is required.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PushError::Full(_) => f.write_str("ring buffer is full"),
        }
    }
}

// Lets the error be used with `?` into `Box<dyn Error>` and similar error plumbing.
impl<T: std::fmt::Debug> std::error::Error for PushError<T> {}
24
/// Error returned when no value can be popped from the ring buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopError {
    /// The buffer holds no elements.
    Empty,
}

impl std::fmt::Display for PopError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PopError::Empty => f.write_str("ring buffer is empty"),
        }
    }
}

// Lets the error be used with `?` into `Box<dyn Error>` and similar error plumbing.
impl std::error::Error for PopError {}
35
36/// Shared data between producer and consumer
37/// 
38/// 生产者和消费者之间的共享数据
39pub struct SharedData<T> {
40    /// Buffer storage using FixedVec for stack allocation optimization
41    /// 
42    /// 使用 FixedVec 的缓冲区存储,优化栈分配
43    buffer: FixedVec<T, 32>,
44    
45    /// Actual capacity (power of 2)
46    /// 
47    /// 实际容量(2 的幂次)
48    capacity: usize,
49    
50    /// Mask for fast modulo operation (capacity - 1)
51    /// 
52    /// 快速取模运算的掩码(capacity - 1)
53    mask: usize,
54    
55    /// Write index (accessed by producer, read by consumer)
56    /// 
57    /// 写入索引(由生产者访问,由消费者读取)
58    write_idx: AtomicUsize,
59    
60    /// Read index (accessed by consumer, read by producer)
61    /// 
62    /// 读取索引(由消费者访问,由生产者读取)
63    read_idx: AtomicUsize,
64}
65
66/// Producer half of the ring buffer
67/// 
68/// 环形缓冲区的生产者端
69pub struct Producer<T> {
70    /// Shared data
71    /// 
72    /// 共享数据
73    shared: Arc<SharedData<T>>,
74    
75    /// Cached read index for performance (avoid reading atomic repeatedly)
76    /// 
77    /// 缓存的读索引以提升性能(避免重复读取原子变量)
78    cached_read: usize,
79}
80
81/// Consumer half of the ring buffer
82/// 
83/// 环形缓冲区的消费者端
84pub struct Consumer<T> {
85    /// Shared data
86    /// 
87    /// 共享数据
88    shared: Arc<SharedData<T>>,
89    
90    /// Cached write index for performance (avoid reading atomic repeatedly)
91    /// 
92    /// 缓存的写索引以提升性能(避免重复读取原子变量)
93    cached_write: usize,
94}
95
96impl<T> SharedData<T> {
97    /// Get the capacity of the buffer
98    /// 
99    /// 获取缓冲区容量
100    #[inline]
101    pub fn capacity(&self) -> usize {
102        self.capacity
103    }
104}
105
106/// Create a new ring buffer with the specified capacity
107/// 
108/// 创建指定容量的新环形缓冲区
109/// 
110/// # Parameters
111/// - `capacity`: Desired capacity (will be rounded up to next power of 2)
112/// 
113/// # Returns
114/// A tuple of (Producer, Consumer)
115/// 
116/// # Panics
117/// Panics if capacity is 0
118/// 
119/// # 参数
120/// - `capacity`: 期望容量(将向上取整到下一个 2 的幂次)
121/// 
122/// # 返回值
123/// 返回 (Producer, Consumer) 元组
124/// 
125/// # Panics
126/// 如果容量为 0 则 panic
127pub fn new<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
128        assert!(capacity > 0, "Capacity must be greater than 0");
129        
130        // Round up to next power of 2 for efficient masking
131        // 向上取整到下一个 2 的幂次以实现高效的掩码操作
132        let actual_capacity = capacity.next_power_of_two();
133        let mask = actual_capacity - 1;
134        
135        let mut buffer = FixedVec::with_capacity(actual_capacity);
136        unsafe {
137            buffer.set_len(actual_capacity);
138        }
139        
140        let shared = Arc::new(SharedData {
141            buffer,
142            capacity: actual_capacity,
143            mask,
144            write_idx: AtomicUsize::new(0),
145            read_idx: AtomicUsize::new(0),
146        });
147        
148        let producer = Producer {
149            shared: shared.clone(),
150            cached_read: 0,
151        };
152        
153        let consumer = Consumer {
154            shared,
155            cached_write: 0,
156        };
157        
158        (producer, consumer)
159}
160
161impl<T> Producer<T> {
162    /// Push a value into the buffer
163    /// 
164    /// 向缓冲区推送一个值
165    /// 
166    /// # Errors
167    /// Returns `PushError::Full` if the buffer is full
168    /// 
169    /// # 错误
170    /// 如果缓冲区满则返回 `PushError::Full`
171    #[inline]
172    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
173        let write = self.shared.write_idx.load(Ordering::Relaxed);
174        let mut read = self.cached_read;
175        
176        // Check if buffer is full
177        // 检查缓冲区是否已满
178        if write.wrapping_sub(read) >= self.shared.capacity {
179            // Update cached read index from consumer
180            // 从消费者更新缓存的读索引
181            read = self.shared.read_idx.load(Ordering::Acquire);
182            self.cached_read = read;
183            
184            if write.wrapping_sub(read) >= self.shared.capacity {
185                return Err(PushError::Full(value));
186            }
187        }
188        
189        // Write value to buffer
190        // 将值写入缓冲区
191        let index = write & self.shared.mask;
192        unsafe {
193            let ptr = self.shared.buffer.get_unchecked_ptr(index).cast::<T>() as *mut T;
194            ptr.write(value);
195        }
196        
197        // Update write index with Release ordering to ensure visibility
198        // 使用 Release 顺序更新写索引以确保可见性
199        self.shared.write_idx.store(write.wrapping_add(1), Ordering::Release);
200        
201        Ok(())
202    }
203}
204
205impl<T> Consumer<T> {
206    /// Pop a value from the buffer
207    /// 
208    /// 从缓冲区弹出一个值
209    /// 
210    /// # Errors
211    /// Returns `PopError::Empty` if the buffer is empty
212    /// 
213    /// # 错误
214    /// 如果缓冲区空则返回 `PopError::Empty`
215    #[inline]
216    pub fn pop(&mut self) -> Result<T, PopError> {
217        let read = self.shared.read_idx.load(Ordering::Relaxed);
218        let mut write = self.cached_write;
219        
220        // Check if buffer is empty
221        // 检查缓冲区是否为空
222        if read == write {
223            // Update cached write index from producer
224            // 从生产者更新缓存的写索引
225            write = self.shared.write_idx.load(Ordering::Acquire);
226            self.cached_write = write;
227            
228            if read == write {
229                return Err(PopError::Empty);
230            }
231        }
232        
233        // Read value from buffer
234        // 从缓冲区读取值
235        let index = read & self.shared.mask;
236        let value = unsafe {
237            let ptr = self.shared.buffer.get_unchecked_ptr(index).cast::<T>();
238            ptr.read()
239        };
240        
241        // Update read index with Release ordering to ensure visibility
242        // 使用 Release 顺序更新读索引以确保可见性
243        self.shared.read_idx.store(read.wrapping_add(1), Ordering::Release);
244        
245        Ok(value)
246    }
247    
248    /// Check if the buffer is empty
249    /// 
250    /// 检查缓冲区是否为空
251    #[inline]
252    pub fn is_empty(&self) -> bool {
253        let read = self.shared.read_idx.load(Ordering::Relaxed);
254        let write = self.shared.write_idx.load(Ordering::Acquire);
255        read == write
256    }
257    
258    /// Get the number of elements currently in the buffer
259    /// 
260    /// 获取缓冲区中当前的元素数量
261    #[inline]
262    pub fn slots(&self) -> usize {
263        let read = self.shared.read_idx.load(Ordering::Relaxed);
264        let write = self.shared.write_idx.load(Ordering::Acquire);
265        write.wrapping_sub(read)
266    }
267    
268    /// Get a reference to the shared buffer data
269    /// 
270    /// 获取共享缓冲区数据的引用
271    #[inline]
272    pub fn buffer(&self) -> &SharedData<T> {
273        &self.shared
274    }
275}
276
277impl<T> Drop for Consumer<T> {
278    fn drop(&mut self) {
279        // Clean up any remaining elements in the buffer
280        // 清理缓冲区中的剩余元素
281        while self.pop().is_ok() {
282            // Elements are dropped automatically
283            // 元素自动被 drop
284        }
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Values come out in insertion order, after which the buffer reports empty.
    #[test]
    fn test_basic_push_pop() {
        let (mut tx, mut rx) = new::<i32>(4);

        for value in 1..=3 {
            assert!(tx.push(value).is_ok());
        }

        for expected in 1..=3 {
            assert_eq!(rx.pop().unwrap(), expected);
        }
        assert!(rx.pop().is_err());
    }

    /// Requested capacities are rounded up to the next power of two.
    #[test]
    fn test_capacity_rounding() {
        // (requested, expected actual capacity)
        let cases: &[(usize, usize)] = &[(5, 8), (32, 32), (33, 64)];

        for &(requested, rounded) in cases {
            let (_, rx) = new::<i32>(requested);
            assert_eq!(rx.buffer().capacity(), rounded);
        }
    }

    /// A full buffer rejects a push and accepts one again after a pop.
    #[test]
    fn test_buffer_full() {
        let (mut tx, mut rx) = new::<i32>(4);

        // All four slots are usable; no slot is held in reserve.
        for value in 1..=4 {
            assert!(tx.push(value).is_ok());
        }

        // The fifth push must fail and hand the value back.
        assert!(matches!(tx.push(5), Err(PushError::Full(5))));

        // Freeing one slot makes room for it.
        assert_eq!(rx.pop().unwrap(), 1);
        assert!(tx.push(5).is_ok());
    }

    /// `is_empty` tracks pushes and pops, and popping an empty buffer fails.
    #[test]
    fn test_buffer_empty() {
        let (mut tx, mut rx) = new::<i32>(4);

        assert!(rx.pop().is_err());
        assert!(rx.is_empty());

        tx.push(42).unwrap();
        assert!(!rx.is_empty());

        rx.pop().unwrap();
        assert!(rx.is_empty());
    }

    /// `slots` reports how many elements are queued.
    #[test]
    fn test_slots() {
        let (mut tx, rx) = new::<i32>(8);
        assert_eq!(rx.slots(), 0);

        for value in 1..=3 {
            tx.push(value).unwrap();
        }
        assert_eq!(rx.slots(), 3);
    }

    /// Repeatedly filling and draining exercises index wrap-around within the buffer.
    #[test]
    fn test_wrap_around() {
        let (mut tx, mut rx) = new::<i32>(4);

        for round in 0..10 {
            let base = round * 10;

            for offset in 0..4 {
                tx.push(base + offset).unwrap();
            }

            for offset in 0..4 {
                assert_eq!(rx.pop().unwrap(), base + offset);
            }
        }
    }

    /// Dropping the consumer drops every element still queued.
    #[test]
    fn test_drop_cleanup() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        /// Bumps a shared counter each time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }

        let drops = Arc::new(AtomicUsize::new(0));

        {
            let (mut tx, rx) = new(8);

            for _ in 0..5 {
                let item = DropCounter {
                    counter: Arc::clone(&drops),
                };
                tx.push(item).unwrap();
            }

            // The consumer drains the buffer when it goes away.
            drop(rx);
        }

        // Each of the five queued items must have been dropped exactly once.
        assert_eq!(drops.load(Ordering::SeqCst), 5);
    }

    /// One producer thread and one consumer thread transfer 1000 values in order.
    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let (mut tx, mut rx) = new::<u64>(128);

        let producer_handle = thread::spawn(move || {
            for value in 0..1000 {
                // Retry until the consumer has freed a slot.
                while tx.push(value).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::new();
            while received.len() < 1000 {
                match rx.pop() {
                    Ok(value) => received.push(value),
                    Err(_) => thread::yield_now(),
                }
            }
            received
        });

        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();

        // Every value must arrive exactly once and in the order it was sent.
        let expected: Vec<u64> = (0..1000).collect();
        assert_eq!(received.len(), 1000);
        assert_eq!(received, expected);
    }

    /// Smoke test for a capacity within the `FixedVec` inline size (≤32).
    #[test]
    fn test_small_capacity_stack_allocation() {
        // Mainly checks that the small-capacity path compiles and round-trips values.
        let (mut tx, mut rx) = new::<u8>(16);

        for value in 0..10 {
            tx.push(value).unwrap();
        }

        for expected in 0..10 {
            assert_eq!(rx.pop().unwrap(), expected);
        }
    }

    /// Smoke test for a capacity above the `FixedVec` inline size (>32).
    #[test]
    fn test_large_capacity_heap_allocation() {
        let (mut tx, mut rx) = new::<u8>(64);

        for value in 0..50 {
            tx.push(value).unwrap();
        }

        for expected in 0..50 {
            assert_eq!(rx.pop().unwrap(), expected);
        }
    }

    /// A zero capacity is rejected with a panic.
    #[test]
    #[should_panic(expected = "Capacity must be greater than 0")]
    fn test_zero_capacity_panics() {
        let _ = new::<i32>(0);
    }
}
494