extern crate parking_lot;

use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::mem::{self, MaybeUninit};
use std::ops::Range;
use std::sync::Arc;

/// Hard cap on the requested buffer size (i32::MAX slots).
const MAX_BUFFER_SIZE: usize = 2_147_483_647;

pub struct RingBuffer<T> {
    /// Backing storage; one slot beyond the usable size stays empty so that
    /// a full buffer can be told apart from an empty one.
    pub(crate) data: UnsafeCell<Box<[MaybeUninit<T>]>>,
    /// `(logical index of the oldest element, physical head position)`.
    pub(crate) head: RwLock<(usize, usize)>,
    /// Physical position of the next write.
    pub(crate) tail: RwLock<usize>,
    /// Length of `data`, i.e. the usable size plus the reserved slot.
    pub(crate) capacity: usize,
}

impl<T> RingBuffer<T>
where
    T: Sized + Default + Clone + Copy,
{
    /// Creates a buffer holding at most `size` elements; the first element
    /// pushed carries the logical index `offset`.
    pub fn new(offset: usize, size: usize) -> RingBuffer<T> {
        let sz = std::cmp::min(MAX_BUFFER_SIZE, size);
        // Allocate one extra slot so `is_full` can be distinguished from
        // `is_empty` without a separate length counter.
        let mut data = Vec::new();
        data.resize_with(sz + 1, MaybeUninit::uninit);
        let len = data.len();
        Self {
            data: UnsafeCell::new(data.into_boxed_slice()),
            head: RwLock::new((offset, 0)),
            tail: RwLock::new(0),
            capacity: len,
        }
    }
    #[inline(always)]
    pub fn get_ref(&self) -> &[MaybeUninit<T>] {
        unsafe { &*self.data.get() }
    }
    /// Callers must hold the appropriate head/tail lock so that writers and
    /// readers never touch the same slots through this aliased `&mut`.
    #[allow(clippy::mut_from_ref)]
    #[inline(always)]
    pub fn get_mut(&self) -> &mut [MaybeUninit<T>] {
        unsafe { &mut *self.data.get() }
    }

    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        let (_, head) = *self.head.read();
        let tail = *self.tail.read();
        head == tail
    }

    #[inline(always)]
    pub fn is_full(&self) -> bool {
        let (_, head) = *self.head.read();
        let tail = *self.tail.read();
        let capacity = self.capacity;
        (tail + 1) % capacity == head
    }
}

struct IndexUtil;
impl IndexUtil {
    /// Splits the occupied region `head..tail` into at most two contiguous
    /// ranges over the physical buffer.
    #[inline(always)]
    pub fn calc_range(head: usize, tail: usize, len: usize) -> (Range<usize>, Range<usize>) {
        match head.cmp(&tail) {
            Ordering::Less => (head..tail, 0..0),
            Ordering::Greater => (head..len, 0..tail),
            Ordering::Equal => (0..0, 0..0),
        }
    }
    /// Maps the logical index `idx` to a position relative to `offset`,
    /// returning `None` when it falls outside the filled region. Logical
    /// indices wrap around `usize::MAX`, so the second branch handles an
    /// offset close enough to the maximum for the region to wrap.
    #[inline(always)]
    pub fn exists_index(idx: usize, offset: usize, filled_size: usize) -> Option<usize> {
        let mut rslt = None;
        if idx >= offset {
            let i = idx - offset;
            if i < filled_size {
                rslt = Some(i);
            }
        } else {
            let dist_to_max = usize::MAX - offset;
            if filled_size - 1 > dist_to_max {
                let over_size = (filled_size - 1) - dist_to_max;
                if idx < over_size {
                    rslt = Some(dist_to_max + 1 + idx);
                }
            }
        }
        rslt
    }
}
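
// A hedged sanity check (not part of the original crate): exercises the
// wrap-around arithmetic in `exists_index` at the `usize::MAX` boundary.
// The module name and test values are illustrative assumptions.
#[cfg(test)]
mod index_util_tests {
    use super::IndexUtil;

    #[test]
    fn exists_index_maps_logical_to_relative_positions() {
        // Plain region: offset 3 with 4 items covers logical indices 3..=6.
        assert_eq!(IndexUtil::exists_index(5, 3, 4), Some(2));
        assert_eq!(IndexUtil::exists_index(7, 3, 4), None);
        // Wrapped region: offset usize::MAX with 3 items covers the logical
        // indices usize::MAX, 0 and 1.
        assert_eq!(IndexUtil::exists_index(usize::MAX, usize::MAX, 3), Some(0));
        assert_eq!(IndexUtil::exists_index(0, usize::MAX, 3), Some(1));
        assert_eq!(IndexUtil::exists_index(2, usize::MAX, 3), None);
    }
}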

pub struct Producer<T> {
    buffer: Arc<RingBuffer<T>>,
}

impl<T> Producer<T>
where
    T: Sized + Default + Clone + Copy,
{
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }

    /// Appends `v` behind the current tail. Returns `false` when the buffer
    /// is full. Only the tail lock is held while writing, so a concurrent
    /// consumer can keep draining from the head.
    pub fn push(&mut self, v: T) -> bool {
        let head_guard = self.buffer.head.read();
        let head = head_guard.1;
        drop(head_guard);

        let mut tail_guard = self.buffer.tail.write();
        let tail = *tail_guard;
        let mut new_tail = tail + 1;

        if new_tail == self.buffer.capacity {
            new_tail = 0;
        }

        if head == new_tail {
            return false;
        }

        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();

        // Overwriting the slot directly is fine: `T: Copy` guarantees there
        // is no destructor to run for any previous value.
        unsafe {
            *buf.get_unchecked_mut(tail) = MaybeUninit::new(v);
        }

        *tail_guard = new_tail;
        true
    }
}

// SAFETY: all access to the shared buffer is serialized by the head/tail
// locks; requiring `T: Send` keeps non-thread-safe payloads off other threads.
unsafe impl<T: Send> Sync for Producer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}

pub struct Consumer<T> {
    buffer: Arc<RingBuffer<T>>,
}

impl<T> Consumer<T>
where
    T: Sized + Default + Clone + Copy,
{
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }

    /// Removes and returns every element up to and including logical index
    /// `to`. Returns `None` when the buffer is empty or `to` falls outside
    /// the filled region.
    pub fn shift_to(&mut self, to: usize) -> Option<(usize, Vec<T>)> {
        let tail_guard = self.buffer.tail.read();
        let tail = *tail_guard;
        drop(tail_guard);

        let mut head_guard = self.buffer.head.write();
        let (offset, head) = *head_guard;

        if head == tail {
            return None;
        }

        let capacity = self.buffer.capacity;
        let filled_size = (tail + capacity - head) % capacity;
        let i = IndexUtil::exists_index(to, offset, filled_size)?;
        let new_offset = to.wrapping_add(1);
        let new_head = (head + i + 1) % capacity;

        // The drained region may wrap, so it is swapped out as up to two
        // contiguous slices.
        let (a, b) = IndexUtil::calc_range(head, new_head, capacity);
        let mut temp_a = Vec::new();
        let mut temp_b = Vec::new();
        temp_a.resize_with(a.len(), MaybeUninit::uninit);
        temp_b.resize_with(b.len(), MaybeUninit::uninit);

        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
        buf[a].swap_with_slice(&mut temp_a);
        buf[b].swap_with_slice(&mut temp_b);

        // Every swapped-out slot was initialized by a prior `push`, so the
        // values can be unwrapped element by element. This avoids transmuting
        // the `Vec` itself, whose layout guarantees do not cover that cast.
        let v: Vec<T> = [temp_a, temp_b]
            .concat()
            .into_iter()
            .map(|x| unsafe { x.assume_init() })
            .collect();

        *head_guard = (new_offset, new_head);

        Some((to, v))
    }

    /// Removes and returns the oldest element together with its logical
    /// index, or `None` when the buffer is empty.
    pub fn shift(&mut self) -> Option<(usize, T)> {
        let tail_guard = self.buffer.tail.read();
        let tail = *tail_guard;
        drop(tail_guard);

        let mut head_guard = self.buffer.head.write();
        let (offset, head) = *head_guard;

        if head == tail {
            return None;
        }

        let mut new_head = head + 1;

        let capacity = self.buffer.capacity;
        if new_head == capacity {
            new_head = 0;
        }

        let mut temp = MaybeUninit::uninit();

        // Swap the slot out; it was written by a prior `push`, so
        // `assume_init` is sound.
        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
        mem::swap(unsafe { buf.get_unchecked_mut(head) }, &mut temp);
        let temp = unsafe { temp.assume_init() };

        *head_guard = (offset.wrapping_add(1), new_head);

        Some((offset, temp))
    }
}

unsafe impl<T: Send> Sync for Consumer<T> {}
unsafe impl<T: Send> Send for Consumer<T> {}
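
// Hedged usage sketch (added for illustration, not part of the original
// crate): `shift_to` drains every element up to and including a logical
// index in a single call. Element type and capacity are arbitrary choices.
#[cfg(test)]
mod consumer_tests {
    use super::indexed_ring_buffer;

    #[test]
    fn shift_to_drains_up_to_a_logical_index() {
        let (mut tx, mut rx, _rdr) = indexed_ring_buffer::<u64>(0, 10);
        for v in 0u64..5 {
            assert!(tx.push(v));
        }
        // Logical indices 0..=2 come out together...
        assert_eq!(rx.shift_to(2), Some((2, vec![0, 1, 2])));
        // ...and 3 and 4 remain for plain shifts.
        assert_eq!(rx.shift(), Some((3, 3)));
        assert_eq!(rx.shift(), Some((4, 4)));
        assert!(rx.is_empty());
    }
}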

#[derive(Clone)]
pub struct Reader<T> {
    buffer: Arc<RingBuffer<T>>,
}

impl<T> Reader<T>
where
    T: Sized + Default + Clone + Copy,
{
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }

    pub fn offset(&self) -> usize {
        let (offset, _) = *self.buffer.head.read();
        offset
    }

    /// Copies out the element at logical index `idx` without consuming it.
    pub fn get(&self, idx: usize) -> Option<(usize, T)> {
        let (offset, head, tail) = self.read_index();
        if head == tail {
            return None;
        }

        let capacity = self.buffer.capacity;
        let filled_size = (tail + capacity - head) % capacity;
        let i = IndexUtil::exists_index(idx, offset, filled_size)?;
        let pos = (head + i) % capacity;

        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        // The slot at `pos` lies inside the filled region, so it was
        // initialized by a `push`.
        let v: &T = unsafe { &*(buf.get_unchecked(pos) as *const MaybeUninit<T> as *const T) };
        Some((idx, *v))
    }

    /// Copies out the whole filled region along with its first and last
    /// logical indices.
    pub fn get_all(&self) -> Option<(usize, usize, Vec<T>)> {
        let (offset, head, tail) = self.read_index();

        let capacity = self.buffer.capacity;
        let (a, b) = IndexUtil::calc_range(head, tail, capacity);

        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        // Both ranges lie inside the filled region; `MaybeUninit<T>` and `T`
        // share the same layout, so viewing the slices as `[T]` is sound.
        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [MaybeUninit<T>] as *const [T]) };
        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [MaybeUninit<T>] as *const [T]) };
        let v = [buf_a, buf_b].concat();
        if !v.is_empty() {
            Some((offset, offset.wrapping_add(v.len() - 1), v))
        } else {
            None
        }
    }

    /// Copies out up to `len` elements starting at logical index `idx`;
    /// `len == 0`, or a `len` reaching past the end, reads through to the
    /// newest element.
    pub fn get_from(&self, idx: usize, len: usize) -> Option<(usize, usize, Vec<T>)> {
        let (offset, head, tail) = self.read_index();
        if head == tail {
            return None;
        }
        let capacity = self.buffer.capacity;
        let filled_size = (tail + capacity - head) % capacity;

        let i1 = IndexUtil::exists_index(idx, offset, filled_size)?;
        let range_head = (head + i1) % capacity;
        let range_tail = if len == 0 || i1 + len > filled_size {
            tail
        } else {
            (head + i1 + len) % capacity
        };

        let (a, b) = IndexUtil::calc_range(range_head, range_tail, capacity);

        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [MaybeUninit<T>] as *const [T]) };
        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [MaybeUninit<T>] as *const [T]) };

        let v = [buf_a, buf_b].concat();
        let v_len = v.len();
        if v_len > 0 {
            Some((idx, idx.wrapping_add(v_len - 1), v))
        } else {
            None
        }
    }

    #[inline(always)]
    fn read_index(&self) -> (usize, usize, usize) {
        let (offset, head) = *self.buffer.head.read();
        let tail = *self.buffer.tail.read();
        (offset, head, tail)
    }
}

unsafe impl<T: Send> Sync for Reader<T> {}
unsafe impl<T: Send> Send for Reader<T> {}

/// Creates a connected `(Producer, Consumer, Reader)` triple over a single
/// ring buffer whose first element will carry the logical index
/// `initial_index`.
pub fn indexed_ring_buffer<T>(
    initial_index: usize,
    capacity: usize,
) -> (Producer<T>, Consumer<T>, Reader<T>)
where
    T: Sized + Default + Clone + Copy,
{
    let rb = Arc::new(RingBuffer::<T>::new(initial_index, capacity));

    let tx = Producer::<T> { buffer: rb.clone() };
    let rx = Consumer::<T> { buffer: rb.clone() };
    let rdr = Reader::<T> { buffer: rb };

    (tx, rx, rdr)
}
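
// Hedged end-to-end sketch (added for illustration, not part of the original
// crate): a producer fills the buffer, a reader peeks without consuming, and
// a consumer pops indexed values. Element type and capacity are arbitrary.
#[cfg(test)]
mod usage_tests {
    use super::indexed_ring_buffer;

    #[test]
    fn producer_reader_consumer_roundtrip() {
        let (mut tx, mut rx, rdr) = indexed_ring_buffer::<u32>(100, 4);
        // Fill all four usable slots; the next push is rejected.
        for v in 0..4 {
            assert!(tx.push(v));
        }
        assert!(tx.is_full());
        assert!(!tx.push(99));
        // The reader observes logical indices 100..=103 without consuming.
        assert_eq!(rdr.get(101), Some((101, 1)));
        assert_eq!(rdr.get_all(), Some((100, 103, vec![0, 1, 2, 3])));
        // The consumer pops values tagged with their logical indices.
        assert_eq!(rx.shift(), Some((100, 0)));
        assert_eq!(rx.shift(), Some((101, 1)));
        assert_eq!(rdr.offset(), 102);
    }
}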