//! spsc.rs — single-producer/single-consumer lock-free ring buffer.

use std::{mem::MaybeUninit, sync::atomic::{AtomicUsize, Ordering}};

/// An atomic ring cursor forced onto its own 64-byte-aligned allocation so
/// that the producer-owned write index and the consumer-owned read index do
/// not share a cache line (avoids false sharing between the two threads —
/// assumes 64-byte cache lines, the common case on x86-64/aarch64).
#[repr(align(64))]
pub struct RingIndex(AtomicUsize);

/// A fixed-capacity single-producer/single-consumer queue.
///
/// One slot is always kept empty to distinguish "full" from "empty", so a
/// buffer built with `new(n)` holds at most `n - 1` items at once.
pub struct RingBuffer<T> {
    // Slot storage; only slots in [read_index, write_index) are initialized.
    buf: Box<[MaybeUninit<T>]>,
    // Number of slots (== buf.len()); cached for the wrap-around modulo.
    capacity: usize,
    // Next slot the producer will write; advanced only by the producer.
    write_index: RingIndex,
    // Next slot the consumer will read; advanced only by the consumer.
    read_index: RingIndex
}

13impl<T> RingBuffer<T> {
14    pub fn new(size: usize) -> Self {
15        let mut v: Vec<MaybeUninit<T>> = Vec::with_capacity(size);
16        for _ in 0..size {
17            v.push(MaybeUninit::uninit());
18        }
19        
20        Self {
21            buf: v.into_boxed_slice(),
22            capacity: size,
23            write_index: RingIndex(AtomicUsize::new(0)),
24            read_index: RingIndex(AtomicUsize::new(0))
25        }
26    }
27    
28    pub fn enqueue(&self, item: T) -> Result<(), T> {
29        let write_idx = self.write_index.0.load(Ordering::Relaxed);
30        let read_idx = self.read_index.0.load(Ordering::Acquire);
31        if (write_idx + 1) % self.capacity != read_idx {
32            unsafe {
33                let slot = self.buf.as_ptr().add(write_idx) as *mut MaybeUninit<T>;
34                (*slot).write(item);
35            }
36            self.write_index.0.store((write_idx + 1) % self.capacity, Ordering::Release);
37            
38            Ok(())
39        }
40        else {
41            Err(item)
42        }
43    }
44    
45    pub fn dequeue(&self) -> Option<T> {
46        let write_idx = self.write_index.0.load(Ordering::Acquire);
47        let read_idx = self.read_index.0.load(Ordering::Relaxed);
48        if write_idx != read_idx {
49            let item = unsafe { Some(self.buf[read_idx].assume_init_read()) };
50            self.read_index.0.store((read_idx + 1) % self.capacity, Ordering::Release);
51            
52            item
53        }
54        else {
55            None
56        }
57    }
58}
59
60impl<T> Drop for RingBuffer<T> {
61    fn drop(&mut self) {
62        let mut curr_idx = self.read_index.0.load(Ordering::Relaxed);
63        let write_idx = self.write_index.0.load(Ordering::Relaxed);
64        while curr_idx != write_idx {
65            unsafe { self.buf[curr_idx].assume_init_drop() };
66            if curr_idx + 1 == self.buf.len() {
67                curr_idx = 0;
68            }
69            else {
70                curr_idx += 1;
71            }
72        }
73    }
74}
75
#[cfg(test)]
mod test {
    use crate::spsc::RingBuffer;

    /// A capacity-4 buffer stores at most 3 items (one slot stays empty to
    /// tell full from empty) and yields them back in FIFO order.
    #[test]
    fn test_ring_buffer() {
        let rb = RingBuffer::<usize>::new(4);
        rb.enqueue(5).expect("5 push failed");
        rb.enqueue(4).expect("4 push failed");
        rb.enqueue(3).expect("3 push failed");
        rb.enqueue(2).expect_err("2 push should fail");
        for expected in [5_usize, 4, 3] {
            assert_eq!(rb.dequeue(), Some(expected));
        }
        assert_eq!(rb.dequeue(), None);
    }
}