// kestrel_timer/utils/ringbuf.rs
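
//! A lock-free single-producer, single-consumer (SPSC) ring buffer.
//!
//! `new` returns a connected `Producer`/`Consumer` pair sharing a
//! fixed-capacity buffer. Capacity is rounded up to a power of two so a
//! monotonically increasing counter maps to a slot with a single mask, and
//! each half keeps a cached copy of the other half's index so the fast
//! paths avoid cross-thread loads.
//!
//! Storage is the crate-local `FixedVec<T, 32>`. This module relies on it
//! exposing raw slot storage through `get_unchecked_ptr` and never
//! dropping elements on its own (`Consumer::drop` drains what remains);
//! judging by the tests below, it stores up to 32 elements inline and
//! spills to the heap beyond that.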
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use super::vec::FixedVec;

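/// Error returned by `Producer::push`.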
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError<T> {
    /// The buffer was full; the rejected value is handed back to the caller.
    Full(T),
}

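/// Error returned by `Consumer::pop`.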
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopError {
    /// The buffer held no elements.
    Empty,
}

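/// State shared by both halves of the ring buffer.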
pub struct SharedData<T> {
    /// Backing storage; slots are logically uninitialized until written.
    buffer: FixedVec<T, 32>,

    /// Actual capacity, always a power of two.
    capacity: usize,

    /// `capacity - 1`; ANDing an index with this yields its slot.
    mask: usize,

    /// Total number of values ever pushed (monotonically increasing).
    write_idx: AtomicUsize,

    /// Total number of values ever popped (monotonically increasing).
    read_idx: AtomicUsize,
}

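/// The sending half; intended to be owned by a single producer thread.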
pub struct Producer<T> {
    shared: Arc<SharedData<T>>,

    /// Cached copy of `read_idx`, refreshed only when the buffer appears
    /// full, keeping cross-thread loads off the fast path.
    cached_read: usize,
}

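/// The receiving half; intended to be owned by a single consumer thread.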
pub struct Consumer<T> {
    shared: Arc<SharedData<T>>,

    /// Cached copy of `write_idx`, refreshed only when the buffer appears
    /// empty, keeping cross-thread loads off the fast path.
    cached_write: usize,
}

impl<T> SharedData<T> {
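    /// Returns the actual (power-of-two) capacity of the buffer.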
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}

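/// Creates a connected producer/consumer pair.
///
/// The requested `capacity` is rounded up to the next power of two so that
/// slot indices can be computed with a mask instead of a modulo.
///
/// # Panics
///
/// Panics if `capacity` is zero.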
pub fn new<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    assert!(capacity > 0, "Capacity must be greater than 0");

    let actual_capacity = capacity.next_power_of_two();
    let mask = actual_capacity - 1;

    let mut buffer = FixedVec::with_capacity(actual_capacity);
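    // SAFETY: marks every slot as available without initializing it. This
    // assumes FixedVec::set_len only adjusts length bookkeeping and that
    // FixedVec never reads or drops its elements itself: a slot is only
    // read after the producer has written it, and Consumer::drop drains
    // whatever is left.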
    unsafe {
        buffer.set_len(actual_capacity);
    }

    let shared = Arc::new(SharedData {
        buffer,
        capacity: actual_capacity,
        mask,
        write_idx: AtomicUsize::new(0),
        read_idx: AtomicUsize::new(0),
    });

    let producer = Producer {
        shared: shared.clone(),
        cached_read: 0,
    };

    let consumer = Consumer {
        shared,
        cached_write: 0,
    };

    (producer, consumer)
}

impl<T> Producer<T> {
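    /// Attempts to push `value` without blocking.
    ///
    /// On a full buffer the value is handed back inside
    /// `PushError::Full` so the caller can retry.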
    #[inline]
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        let write = self.shared.write_idx.load(Ordering::Relaxed);
        let mut read = self.cached_read;

        // Check fullness against the cached read index first; only if the
        // buffer looks full do we pay for an Acquire load of the shared
        // index and re-check.
        if write.wrapping_sub(read) >= self.shared.capacity {
            read = self.shared.read_idx.load(Ordering::Acquire);
            self.cached_read = read;

            if write.wrapping_sub(read) >= self.shared.capacity {
                return Err(PushError::Full(value));
            }
        }

        let index = write & self.shared.mask;
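        // SAFETY: the fullness check above guarantees this slot is free,
        // and only this (single) producer ever writes slots.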
        unsafe {
            let ptr = self.shared.buffer.get_unchecked_ptr(index).cast::<T>() as *mut T;
            ptr.write(value);
        }

        // Publish the value: this Release store pairs with the consumer's
        // Acquire load of `write_idx`.
        self.shared.write_idx.store(write.wrapping_add(1), Ordering::Release);

        Ok(())
    }
}

impl<T> Consumer<T> {
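    /// Attempts to pop the oldest value without blocking.
    ///
    /// Returns `Err(PopError::Empty)` if no value is available.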
    #[inline]
    pub fn pop(&mut self) -> Result<T, PopError> {
        let read = self.shared.read_idx.load(Ordering::Relaxed);
        let mut write = self.cached_write;

        // Check emptiness against the cached write index first; only if
        // the buffer looks empty do we pay for an Acquire load of the
        // shared index and re-check.
        if read == write {
            write = self.shared.write_idx.load(Ordering::Acquire);
            self.cached_write = write;

            if read == write {
                return Err(PopError::Empty);
            }
        }

        let index = read & self.shared.mask;
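        // SAFETY: the emptiness check above guarantees this slot holds a
        // value the producer has published; reading moves it out, leaving
        // the slot logically uninitialized again.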
        let value = unsafe {
            let ptr = self.shared.buffer.get_unchecked_ptr(index).cast::<T>();
            ptr.read()
        };

        // Release the slot: this Release store pairs with the producer's
        // Acquire load of `read_idx`.
        self.shared.read_idx.store(read.wrapping_add(1), Ordering::Release);

        Ok(value)
    }

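    /// Returns `true` if the buffer currently holds no values.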
    #[inline]
    pub fn is_empty(&self) -> bool {
        let read = self.shared.read_idx.load(Ordering::Relaxed);
        let write = self.shared.write_idx.load(Ordering::Acquire);
        read == write
    }

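    /// Returns the number of occupied slots, i.e. how many values are
    /// currently queued.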
    #[inline]
    pub fn slots(&self) -> usize {
        let read = self.shared.read_idx.load(Ordering::Relaxed);
        let write = self.shared.write_idx.load(Ordering::Acquire);
        write.wrapping_sub(read)
    }

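    /// Returns the shared state, e.g. to query `capacity()`.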
    #[inline]
    pub fn buffer(&self) -> &SharedData<T> {
        &self.shared
    }
}

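// The backing storage is not expected to drop elements itself, so the
// consumer drains (and thereby drops) everything still queued when it
// goes away.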
impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        while self.pop().is_ok() {}
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_push_pop() {
        let (mut producer, mut consumer) = new::<i32>(4);

        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());

        assert_eq!(consumer.pop().unwrap(), 1);
        assert_eq!(consumer.pop().unwrap(), 2);
        assert_eq!(consumer.pop().unwrap(), 3);
        assert!(consumer.pop().is_err());
    }

    #[test]
    fn test_capacity_rounding() {
        let (_, consumer) = new::<i32>(5);
        assert_eq!(consumer.buffer().capacity(), 8);

        let (_, consumer) = new::<i32>(32);
        assert_eq!(consumer.buffer().capacity(), 32);

        let (_, consumer) = new::<i32>(33);
        assert_eq!(consumer.buffer().capacity(), 64);
    }

    #[test]
    fn test_buffer_full() {
        let (mut producer, mut consumer) = new::<i32>(4);
        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());
        assert!(producer.push(4).is_ok());

        // Full: the rejected value comes back inside the error.
        assert!(matches!(producer.push(5), Err(PushError::Full(5))));

        // Freeing one slot makes the next push succeed.
        assert_eq!(consumer.pop().unwrap(), 1);

        assert!(producer.push(5).is_ok());
    }

    #[test]
    fn test_buffer_empty() {
        let (mut producer, mut consumer) = new::<i32>(4);

        assert!(consumer.pop().is_err());
        assert!(consumer.is_empty());

        producer.push(42).unwrap();
        assert!(!consumer.is_empty());

        consumer.pop().unwrap();
        assert!(consumer.is_empty());
    }

    #[test]
    fn test_slots() {
        let (mut producer, consumer) = new::<i32>(8);

        assert_eq!(consumer.slots(), 0);

        producer.push(1).unwrap();
        producer.push(2).unwrap();
        producer.push(3).unwrap();

        assert_eq!(consumer.slots(), 3);
    }

    #[test]
    fn test_wrap_around() {
        let (mut producer, mut consumer) = new::<i32>(4);

        // Fill and drain repeatedly so the indices wrap past the mask.
        for round in 0..10 {
            for i in 0..4 {
                producer.push(round * 10 + i).unwrap();
            }

            for i in 0..4 {
                assert_eq!(consumer.pop().unwrap(), round * 10 + i);
            }
        }
    }

    #[test]
    fn test_drop_cleanup() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }

        let counter = Arc::new(AtomicUsize::new(0));

        {
            let (mut producer, consumer) = new(8);

            for _ in 0..5 {
                producer.push(DropCounter { counter: counter.clone() }).unwrap();
            }

            // Dropping the consumer drains the five queued values, so each
            // one is dropped exactly once.
            drop(consumer);
        }

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let (mut producer, mut consumer) = new::<u64>(128);

        let producer_handle = thread::spawn(move || {
            for i in 0..1000 {
                loop {
                    if producer.push(i).is_ok() {
                        break;
                    }
                    thread::yield_now();
                }
            }
        });

        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::new();
            for _ in 0..1000 {
                loop {
                    match consumer.pop() {
                        Ok(val) => {
                            received.push(val);
                            break;
                        }
                        Err(_) => thread::yield_now(),
                    }
                }
            }
            received
        });

        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();

        assert_eq!(received.len(), 1000);
        for (i, &val) in received.iter().enumerate() {
            assert_eq!(val, i as u64);
        }
    }

    #[test]
    fn test_small_capacity_stack_allocation() {
        // 16 slots fit within FixedVec's 32 inline slots, so this should
        // exercise the stack-allocated path.
        let (mut producer, mut consumer) = new::<u8>(16);

        for i in 0..10 {
            producer.push(i).unwrap();
        }

        for i in 0..10 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }

    #[test]
    fn test_large_capacity_heap_allocation() {
        // 64 slots exceed FixedVec's 32 inline slots, so this should
        // exercise the heap-allocated path.
        let (mut producer, mut consumer) = new::<u8>(64);

        for i in 0..50 {
            producer.push(i).unwrap();
        }

        for i in 0..50 {
            assert_eq!(consumer.pop().unwrap(), i);
        }
    }

    #[test]
    #[should_panic(expected = "Capacity must be greater than 0")]
    fn test_zero_capacity_panics() {
        let _ = new::<i32>(0);
    }
}