indexed_ring_buffer/
lib.rs

1extern crate parking_lot;
2
3use parking_lot::RwLock;
4use std::cell::UnsafeCell;
5use std::cmp::Ordering;
6use std::mem::{self, MaybeUninit};
7use std::ops::Range;
8use std::sync::Arc;
9// Maximum buffer size
10const MAX_BUFFER_SIZE: usize = 2_147_483_647;
11
/// An indexed ring buffer with a single producer, a single consumer and multiple readers.
///
/// - Items are addressed by an absolute index.
/// - Single-Producer Single-Consumer (SPSC), plus any number of readers.
/// - Synchronised with the `RwLock` from `parking_lot`.
17///
18/// ```
19/// extern crate indexed_ring_buffer;
20/// use indexed_ring_buffer::*;
21///
22/// let (mut p, mut c, r) = indexed_ring_buffer::<usize>(0, 5);
23/// for i in 0..101 {
24///     p.push(i);
25///     c.shift();
26/// }
27///
28/// for i in 101..106 {
29///     p.push(i);
30/// }
31///
32/// let (start, end, data) = r.get_from(101,5).unwrap();
33/// assert_eq!(data,vec![101,102,103,104,105]);
34/// assert_eq!(start,101);
35/// assert_eq!(end,105);
36///
37/// c.shift_to(105);
38/// let rslt = r.get_all();
39/// assert_eq!(rslt,None);
40///
41/// ```
42/// Ring buffer itself.
43pub struct RingBuffer<T> {
44    pub(crate) data: UnsafeCell<Box<[MaybeUninit<T>]>>,
45    pub(crate) head: RwLock<(usize, usize)>,
46    pub(crate) tail: RwLock<usize>,
47    pub(crate) capacity: usize,
48}
49
50impl<T> RingBuffer<T>
51where
52    T: Sized + Default + Clone + Copy,
53{
54    /// Creates a new instance of a ring buffer.
55    pub fn new(offset: usize, size: usize) -> RingBuffer<T> {
56        let sz = std::cmp::min(MAX_BUFFER_SIZE, size);
57        let mut data = Vec::new();
58        data.resize_with(sz + 1, MaybeUninit::uninit);
59        let len = data.len();
60        Self {
61            data: UnsafeCell::new(data.into_boxed_slice()),
62            head: RwLock::new((offset, 0)),
63            tail: RwLock::new(0),
64            capacity: len,
65        }
66    }
67    #[inline(always)]
68    pub fn get_ref(&self) -> &[MaybeUninit<T>] {
69        unsafe { &*self.data.get() }
70    }
71    #[allow(clippy::mut_from_ref)]
72    #[inline(always)]
73    pub fn get_mut(&self) -> &mut [MaybeUninit<T>] {
74        unsafe { &mut *self.data.get() }
75    }
76
77    /// Checks if the ring buffer is empty.
78    #[inline(always)]
79    pub fn is_empty(&self) -> bool {
80        let (_, head) = *self.head.read();
81        let tail = *self.tail.read();
82        head == tail
83    }
84
85    /// Checks if the ring buffer is full.
86    #[inline(always)]
87    pub fn is_full(&self) -> bool {
88        let (_, head) = *self.head.read();
89        let tail = *self.tail.read();
90        let capacity = self.capacity;
91        (tail + 1) % capacity == head
92    }
93}
94
95///  
96struct IndexUtil;
97impl IndexUtil {
98    /// Returns the ranges.
99    #[inline(always)]
100    pub fn calc_range(head: usize, tail: usize, len: usize) -> (Range<usize>, Range<usize>) {
101        match head.partial_cmp(&tail) {
102            Some(Ordering::Less) => (head..tail, 0..0),
103            Some(Ordering::Greater) => (head..len, 0..tail),
104            Some(Ordering::Equal) => (0..0, 0..0),
105            None => (0..0, 0..0),
106        }
107    }
108    /// Checks if the exists index.
109    #[inline(always)]
110    pub fn exists_index(idx: usize, offset: usize, filled_size: usize) -> Option<usize> {
111        let mut rslt = None;
112        if idx >= offset {
113            let i = idx - offset;
114            if i < filled_size {
115                rslt = Some(i);
116            }
117        } else {
118            let dist_to_max = usize::max_value() - offset;
119            if filled_size - 1 > dist_to_max {
120                let over_size = (filled_size - 1) - dist_to_max;
121                if idx < over_size {
122                    rslt = Some(dist_to_max + 1 + idx);
123                }
124            }
125        }
126        rslt
127    }
128}
129
130/// Producer part of ring buffer.
131pub struct Producer<T> {
132    buffer: Arc<RingBuffer<T>>,
133}
134
135impl<T> Producer<T>
136where
137    T: Sized + Default + Clone + Copy,
138{
139    pub fn is_empty(&self) -> bool {
140        self.buffer.is_empty()
141    }
142
143    pub fn is_full(&self) -> bool {
144        self.buffer.is_full()
145    }
146
147    /// Pushes a new item into the buffer.
148    pub fn push(&mut self, v: T) -> bool {
149        let head_guard = self.buffer.head.read();
150        let head = head_guard.1;
151        drop(head_guard);
152
153        let mut tail_guard = self.buffer.tail.write();
154        let tail = *tail_guard;
155        let mut new_tail = tail + 1;
156
157        if new_tail == self.buffer.capacity {
158            new_tail = 0;
159        }
160
161        if head == new_tail {
162            return false;
163        }
164
165        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
166
167        unsafe {
168            mem::replace(buf.get_unchecked_mut(tail), MaybeUninit::new(v));
169        }
170
171        *tail_guard = new_tail;
172        true
173    }
174}
175
176unsafe impl<T> Sync for Producer<T> {}
177unsafe impl<T> Send for Producer<T> {}
178
179/// Consumer part of ring buffer.
180pub struct Consumer<T> {
181    buffer: Arc<RingBuffer<T>>,
182}
183
184impl<T> Consumer<T>
185where
186    T: Sized + Default + Clone + Copy,
187{
188    pub fn is_empty(&self) -> bool {
189        self.buffer.is_empty()
190    }
191
192    pub fn is_full(&self) -> bool {
193        self.buffer.is_full()
194    }
195
196    /// Removes and returns to multiple items from the buffer
197    pub fn shift_to(&mut self, to: usize) -> Option<(usize, Vec<T>)> {
198        let tail_guard = self.buffer.tail.read();
199        let tail = *tail_guard;
200        drop(tail_guard);
201
202        let mut head_guard = self.buffer.head.write();
203        let (offset, head) = *head_guard;
204
205        if head == tail {
206            return None;
207        }
208
209        let capacity = self.buffer.capacity;
210        let filled_size = (tail + capacity - head) % capacity;
211        let rslt = IndexUtil::exists_index(to, offset, filled_size);
212        let i = rslt?;
213        let new_offset = to.wrapping_add(1);
214        let new_head = (head + i + 1) % capacity;
215
216        let (a, b) = IndexUtil::calc_range(head, new_head, capacity);
217        let mut temp_a = Vec::new();
218        let mut temp_b = Vec::new();
219        temp_a.resize_with(a.len(), MaybeUninit::uninit);
220        temp_b.resize_with(b.len(), MaybeUninit::uninit);
221
222        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
223        buf[a].swap_with_slice(&mut temp_a);
224        buf[b].swap_with_slice(&mut temp_b);
225
226        let temp = [temp_a, temp_b].concat();
227        let v: Vec<T> = unsafe { mem::transmute(temp) };
228
229        *head_guard = (new_offset, new_head);
230
231        Some((to, v))
232    }
233
234    /// Removes and returns the first item from the buffer
235    pub fn shift(&mut self) -> Option<(usize, T)> {
236        let tail_guard = self.buffer.tail.read();
237        let tail = *tail_guard;
238        drop(tail_guard);
239
240        let mut head_guard = self.buffer.head.write();
241        let (offset, head) = *head_guard;
242
243        if head == tail {
244            return None;
245        }
246
247        let mut new_head = head + 1;
248
249        let capacity = self.buffer.capacity;
250        if new_head == capacity {
251            new_head = 0;
252        }
253
254        let mut temp = MaybeUninit::uninit();
255
256        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
257        mem::swap(unsafe { buf.get_unchecked_mut(head) }, &mut temp);
258        let temp = unsafe { temp.assume_init() };
259
260        *head_guard = (offset.wrapping_add(1), new_head);
261
262        Some((offset, temp))
263    }
264}
265
266unsafe impl<T> Sync for Consumer<T> {}
267unsafe impl<T> Send for Consumer<T> {}
268
269/// Reader part of ring buffer.
270#[derive(Clone)]
271pub struct Reader<T> {
272    buffer: Arc<RingBuffer<T>>,
273}
274
275impl<T> Reader<T>
276where
277    T: Sized + Default + Clone + Copy,
278{
279    pub fn is_empty(&self) -> bool {
280        self.buffer.is_empty()
281    }
282
283    pub fn is_full(&self) -> bool {
284        self.buffer.is_full()
285    }
286
287    /// Returns the offset.
288    pub fn offset(&self) -> usize {
289        let (offset, _) = *self.buffer.head.read();
290        offset
291    }
292
293    /// Returns the single item from the buffer.
294    pub fn get(&self, idx: usize) -> Option<(usize, T)> {
295        let (offset, head, tail) = self.read_index();
296        if head == tail {
297            return None;
298        }
299
300        let capacity = self.buffer.capacity;
301        let filled_size = (tail + capacity - head) % capacity;
302        let pos;
303        if let Some(i) = IndexUtil::exists_index(idx, offset, filled_size) {
304            pos = (head + i) % capacity;
305        } else {
306            return None;
307        }
308        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
309        let v: &T =
310            unsafe { &*(buf.get_unchecked(pos) as *const std::mem::MaybeUninit<T> as *const T) };
311        Some((idx, *v))
312    }
313
314    /// Returns the all items from the buffer.
315    pub fn get_all(&self) -> Option<(usize, usize, Vec<T>)> {
316        let (offset, head, tail) = self.read_index();
317
318        let capacity = self.buffer.capacity;
319        let (a, b) = IndexUtil::calc_range(head, tail, capacity);
320
321        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
322        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
323        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
324        let v = [buf_a, buf_b].concat().to_vec();
325        if !v.is_empty() {
326            Some((offset, offset.wrapping_add(v.len() - 1), v))
327        } else {
328            None
329        }
330    }
331
332    /// Returns the range items from the buffer.
333    pub fn get_from(&self, idx: usize, len: usize) -> Option<(usize, usize, Vec<T>)> {
334        let (offset, head, tail) = self.read_index();
335        if head == tail {
336            return None;
337        }
338        let capacity = self.buffer.capacity;
339        let filled_size = (tail + capacity - head) % capacity;
340
341        let range_head;
342        let range_tail;
343
344        if let Some(i1) = IndexUtil::exists_index(idx, offset, filled_size) {
345            range_head = (head + i1) % capacity;
346            if len == 0 || i1 + len > filled_size {
347                range_tail = tail;
348            } else {
349                range_tail = (head + i1 + len) % capacity;
350            }
351        } else {
352            return None;
353        }
354
355        let (a, b) = IndexUtil::calc_range(range_head, range_tail, capacity);
356
357        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
358        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
359        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
360
361        let v = [buf_a, buf_b].concat().to_vec();
362        let v_len = v.len();
363        if v_len > 0 {
364            Some((idx, idx.wrapping_add(v_len - 1), v))
365        } else {
366            None
367        }
368    }
369
370    #[inline(always)]
371    fn read_index(&self) -> (usize, usize, usize) {
372        let (offset, head) = *self.buffer.head.read();
373        let tail = *self.buffer.tail.read();
374        (offset, head, tail)
375    }
376}
377
378unsafe impl<T> Sync for Reader<T> {}
379unsafe impl<T> Send for Reader<T> {}
380
381/// Create a new Indexed ring buffer sets.
382pub fn indexed_ring_buffer<T>(
383    initial_index: usize,
384    capacity: usize,
385) -> (Producer<T>, Consumer<T>, Reader<T>)
386where
387    T: Sized + Default + Clone + Copy,
388{
389    let rb = Arc::new(RingBuffer::<T>::new(initial_index, capacity));
390
391    let tx = Producer::<T> { buffer: rb.clone() };
392
393    let rx = Consumer::<T> { buffer: rb.clone() };
394
395    let rdr = Reader::<T> { buffer: rb };
396
397    (tx, rx, rdr)
398}