ringbuf_basedrop/
ring_buffer.rs1use crate::{consumer::Consumer, producer::Producer};
2use alloc::vec::Vec;
3use basedrop::{Handle, Shared};
4use cache_padded::CachePadded;
5use core::{
6 cell::UnsafeCell,
7 cmp::min,
8 mem::MaybeUninit,
9 ptr::{self, copy},
10 sync::atomic::{AtomicUsize, Ordering},
11};
12
/// A vector stored in an [`UnsafeCell`] so that it can be reached through a
/// shared `&self` from more than one thread.
///
/// The element count is captured once in [`SharedVec::new`] and kept in a
/// plain field, so [`SharedVec::len`] never has to touch the cell.
pub(crate) struct SharedVec<T: Send + Sized + 'static> {
    cell: UnsafeCell<Vec<T>>,
    len: usize,
}

// SAFETY: the type performs no synchronization of its own. The only ways to
// reach the vector are the `unsafe` accessors below, whose callers are
// responsible for ruling out conflicting accesses.
unsafe impl<T: Send + Sized + 'static> Sync for SharedVec<T> {}

impl<T: Send + Sized + 'static> SharedVec<T> {
    /// Wraps `data`, remembering the length it has at this moment.
    pub fn new(data: Vec<T>) -> Self {
        let len = data.len();
        Self {
            cell: UnsafeCell::new(data),
            len,
        }
    }

    /// Returns the length the vector had when it was passed to
    /// [`SharedVec::new`].
    ///
    /// The value is not refreshed afterwards, so it becomes stale if the
    /// vector is resized through [`SharedVec::get_mut`].
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns a shared reference to the wrapped vector.
    ///
    /// # Safety
    ///
    /// The caller must ensure that nothing writes to the elements it reads
    /// through the returned reference while that reference is alive.
    pub unsafe fn get_ref(&self) -> &Vec<T> {
        &*self.cell.get()
    }

    /// Returns a mutable reference to the wrapped vector from a shared `&self`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that no other access conflicts with the writes
    /// performed through the returned reference while it is alive.
    // Handing out `&mut` from `&self` is the purpose of this wrapper; the
    // aliasing obligation is moved to the caller via `unsafe`.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_mut(&self) -> &mut Vec<T> {
        &mut *self.cell.get()
    }
}
39
40pub struct RingBuffer<T: Send + Sized + 'static> {
42 pub(crate) data: SharedVec<MaybeUninit<T>>,
43 pub(crate) head: CachePadded<AtomicUsize>,
44 pub(crate) tail: CachePadded<AtomicUsize>,
45}
46
47impl<T: Send + Sized + 'static> RingBuffer<T> {
48 pub fn new(capacity: usize) -> Self {
50 let mut data = Vec::new();
51 data.resize_with(capacity + 1, MaybeUninit::uninit);
52 Self {
53 data: SharedVec::new(data),
54 head: CachePadded::new(AtomicUsize::new(0)),
55 tail: CachePadded::new(AtomicUsize::new(0)),
56 }
57 }
58
59 pub fn split(self, handle: &Handle) -> (Producer<T>, Consumer<T>) {
61 let shared = Shared::new(handle, self);
62 (
63 Producer {
64 rb: Shared::clone(&shared),
65 },
66 Consumer { rb: shared },
67 )
68 }
69
70 pub fn capacity(&self) -> usize {
72 self.data.len() - 1
73 }
74
75 pub fn is_empty(&self) -> bool {
77 let head = self.head.load(Ordering::Acquire);
78 let tail = self.tail.load(Ordering::Acquire);
79 head == tail
80 }
81
82 pub fn is_full(&self) -> bool {
84 let head = self.head.load(Ordering::Acquire);
85 let tail = self.tail.load(Ordering::Acquire);
86 (tail + 1) % self.data.len() == head
87 }
88
89 pub fn len(&self) -> usize {
91 let head = self.head.load(Ordering::Acquire);
92 let tail = self.tail.load(Ordering::Acquire);
93 (tail + self.data.len() - head) % self.data.len()
94 }
95
96 pub fn remaining(&self) -> usize {
98 self.capacity() - self.len()
99 }
100}
101
102impl<T: Send + Sized + 'static> Drop for RingBuffer<T> {
103 fn drop(&mut self) {
104 let data = unsafe { self.data.get_mut() };
105
106 let head = self.head.load(Ordering::Acquire);
107 let tail = self.tail.load(Ordering::Acquire);
108 let len = data.len();
109
110 let slices = if head <= tail {
111 (head..tail, 0..0)
112 } else {
113 (head..len, 0..tail)
114 };
115
116 let drop = |elem_ref: &mut MaybeUninit<T>| unsafe {
117 elem_ref.as_ptr().read();
118 };
119 for elem in data[slices.0].iter_mut() {
120 drop(elem);
121 }
122 for elem in data[slices.1].iter_mut() {
123 drop(elem);
124 }
125 }
126}
127
/// Raw `(pointer, length)` description of a mutable slice.
///
/// Unlike `&mut [T]` it carries no lifetime, so the code using it is
/// responsible for keeping the underlying memory alive and unaliased.
struct SlicePtr<T: Send + Sized + 'static> {
    pub ptr: *mut T,
    pub len: usize,
}

impl<T: Send + Sized + 'static> SlicePtr<T> {
    /// Returns an empty view: null pointer, zero length.
    fn null() -> Self {
        Self {
            ptr: ptr::null_mut(),
            len: 0,
        }
    }

    /// Captures the start address and length of `slice`.
    fn new(slice: &mut [T]) -> Self {
        let len = slice.len();
        Self {
            ptr: slice.as_mut_ptr(),
            len,
        }
    }

    /// Drops the first `count` elements from the view by advancing the
    /// pointer and shrinking the length.
    ///
    /// # Safety
    ///
    /// `count` must not exceed `self.len`, and `self.ptr` must point into an
    /// allocation that covers at least `self.len` elements.
    unsafe fn shift(&mut self, count: usize) {
        // Catch contract violations before the pointer leaves its allocation.
        debug_assert!(count <= self.len, "SlicePtr::shift past the end of the slice");
        self.ptr = self.ptr.add(count);
        self.len -= count;
    }
}
151
152pub fn move_items<T: Send + Sized + 'static>(
159 src: &mut Consumer<T>,
160 dst: &mut Producer<T>,
161 count: Option<usize>,
162) -> usize {
163 unsafe {
164 src.pop_access(|src_left, src_right| -> usize {
165 dst.push_access(|dst_left, dst_right| -> usize {
166 let n = count.unwrap_or_else(|| {
167 min(
168 src_left.len() + src_right.len(),
169 dst_left.len() + dst_right.len(),
170 )
171 });
172 let mut m = 0;
173 let mut src = (SlicePtr::new(src_left), SlicePtr::new(src_right));
174 let mut dst = (SlicePtr::new(dst_left), SlicePtr::new(dst_right));
175
176 loop {
177 let k = min(n - m, min(src.0.len, dst.0.len));
178 if k == 0 {
179 break;
180 }
181 copy(src.0.ptr, dst.0.ptr, k);
182 if src.0.len == k {
183 src.0 = src.1;
184 src.1 = SlicePtr::null();
185 } else {
186 src.0.shift(k);
187 }
188 if dst.0.len == k {
189 dst.0 = dst.1;
190 dst.1 = SlicePtr::null();
191 } else {
192 dst.0.shift(k);
193 }
194 m += k
195 }
196
197 m
198 })
199 })
200 }
201}