ringbuf_basedrop/
ring_buffer.rs

1use crate::{consumer::Consumer, producer::Producer};
2use alloc::vec::Vec;
3use basedrop::{Handle, Shared};
4use cache_padded::CachePadded;
5use core::{
6    cell::UnsafeCell,
7    cmp::min,
8    mem::MaybeUninit,
9    ptr::{self, copy},
10    sync::atomic::{AtomicUsize, Ordering},
11};
12
13pub(crate) struct SharedVec<T: Send + Sized + 'static> {
14    cell: UnsafeCell<Vec<T>>,
15    len: usize,
16}
17
18unsafe impl<T: Send + Sized + 'static> Sync for SharedVec<T> {}
19
20impl<T: Send + Sized + 'static> SharedVec<T> {
21    pub fn new(data: Vec<T>) -> Self {
22        Self {
23            len: data.len(),
24            cell: UnsafeCell::new(data),
25        }
26    }
27
28    pub fn len(&self) -> usize {
29        self.len
30    }
31    pub unsafe fn get_ref(&self) -> &Vec<T> {
32        &*self.cell.get()
33    }
34    #[allow(clippy::mut_from_ref)]
35    pub unsafe fn get_mut(&self) -> &mut Vec<T> {
36        &mut *self.cell.get()
37    }
38}
39
40/// Ring buffer itself.
41pub struct RingBuffer<T: Send + Sized + 'static> {
42    pub(crate) data: SharedVec<MaybeUninit<T>>,
43    pub(crate) head: CachePadded<AtomicUsize>,
44    pub(crate) tail: CachePadded<AtomicUsize>,
45}
46
47impl<T: Send + Sized + 'static> RingBuffer<T> {
48    /// Creates a new instance of a ring buffer.
49    pub fn new(capacity: usize) -> Self {
50        let mut data = Vec::new();
51        data.resize_with(capacity + 1, MaybeUninit::uninit);
52        Self {
53            data: SharedVec::new(data),
54            head: CachePadded::new(AtomicUsize::new(0)),
55            tail: CachePadded::new(AtomicUsize::new(0)),
56        }
57    }
58
59    /// Splits ring buffer into producer and consumer.
60    pub fn split(self, handle: &Handle) -> (Producer<T>, Consumer<T>) {
61        let shared = Shared::new(handle, self);
62        (
63            Producer {
64                rb: Shared::clone(&shared),
65            },
66            Consumer { rb: shared },
67        )
68    }
69
70    /// Returns capacity of the ring buffer.
71    pub fn capacity(&self) -> usize {
72        self.data.len() - 1
73    }
74
75    /// Checks if the ring buffer is empty.
76    pub fn is_empty(&self) -> bool {
77        let head = self.head.load(Ordering::Acquire);
78        let tail = self.tail.load(Ordering::Acquire);
79        head == tail
80    }
81
82    /// Checks if the ring buffer is full.
83    pub fn is_full(&self) -> bool {
84        let head = self.head.load(Ordering::Acquire);
85        let tail = self.tail.load(Ordering::Acquire);
86        (tail + 1) % self.data.len() == head
87    }
88
89    /// The length of the data in the buffer.
90    pub fn len(&self) -> usize {
91        let head = self.head.load(Ordering::Acquire);
92        let tail = self.tail.load(Ordering::Acquire);
93        (tail + self.data.len() - head) % self.data.len()
94    }
95
96    /// The remaining space in the buffer.
97    pub fn remaining(&self) -> usize {
98        self.capacity() - self.len()
99    }
100}
101
102impl<T: Send + Sized + 'static> Drop for RingBuffer<T> {
103    fn drop(&mut self) {
104        let data = unsafe { self.data.get_mut() };
105
106        let head = self.head.load(Ordering::Acquire);
107        let tail = self.tail.load(Ordering::Acquire);
108        let len = data.len();
109
110        let slices = if head <= tail {
111            (head..tail, 0..0)
112        } else {
113            (head..len, 0..tail)
114        };
115
116        let drop = |elem_ref: &mut MaybeUninit<T>| unsafe {
117            elem_ref.as_ptr().read();
118        };
119        for elem in data[slices.0].iter_mut() {
120            drop(elem);
121        }
122        for elem in data[slices.1].iter_mut() {
123            drop(elem);
124        }
125    }
126}
127
128struct SlicePtr<T: Send + Sized + 'static> {
129    pub ptr: *mut T,
130    pub len: usize,
131}
132
133impl<T: Send + Sized + 'static> SlicePtr<T> {
134    fn null() -> Self {
135        Self {
136            ptr: ptr::null_mut(),
137            len: 0,
138        }
139    }
140    fn new(slice: &mut [T]) -> Self {
141        Self {
142            ptr: slice.as_mut_ptr(),
143            len: slice.len(),
144        }
145    }
146    unsafe fn shift(&mut self, count: usize) {
147        self.ptr = self.ptr.add(count);
148        self.len -= count;
149    }
150}
151
152/// Moves at most `count` items from the `src` consumer to the `dst` producer.
153/// Consumer and producer may be of different buffers as well as of the same one.
154///
155/// `count` is the number of items being moved, if `None` - as much as possible items will be moved.
156///
157/// Returns number of items been moved.
158pub fn move_items<T: Send + Sized + 'static>(
159    src: &mut Consumer<T>,
160    dst: &mut Producer<T>,
161    count: Option<usize>,
162) -> usize {
163    unsafe {
164        src.pop_access(|src_left, src_right| -> usize {
165            dst.push_access(|dst_left, dst_right| -> usize {
166                let n = count.unwrap_or_else(|| {
167                    min(
168                        src_left.len() + src_right.len(),
169                        dst_left.len() + dst_right.len(),
170                    )
171                });
172                let mut m = 0;
173                let mut src = (SlicePtr::new(src_left), SlicePtr::new(src_right));
174                let mut dst = (SlicePtr::new(dst_left), SlicePtr::new(dst_right));
175
176                loop {
177                    let k = min(n - m, min(src.0.len, dst.0.len));
178                    if k == 0 {
179                        break;
180                    }
181                    copy(src.0.ptr, dst.0.ptr, k);
182                    if src.0.len == k {
183                        src.0 = src.1;
184                        src.1 = SlicePtr::null();
185                    } else {
186                        src.0.shift(k);
187                    }
188                    if dst.0.len == k {
189                        dst.0 = dst.1;
190                        dst.1 = SlicePtr::null();
191                    } else {
192                        dst.0.shift(k);
193                    }
194                    m += k
195                }
196
197                m
198            })
199        })
200    }
201}