extern crate parking_lot;
use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::mem::{self, MaybeUninit};
use std::ops::Range;
use std::sync::Arc;
// Upper bound on the requested element count (i32::MAX); larger requests
// are clamped in RingBuffer::new. The backing slice is one slot longer.
const MAX_BUFFER_SIZE: usize = 2_147_483_647;
/// Fixed-capacity ring buffer backed by a boxed slice of `MaybeUninit<T>`
/// slots. One slot is always kept vacant so that `head == tail` means
/// "empty" and `(tail + 1) % capacity == head` means "full".
pub struct RingBuffer<T> {
    // Slot storage; UnsafeCell lets Producer/Consumer obtain mutable
    // access through a shared reference (coordinated via the locks below).
    pub(crate) data: UnsafeCell<Box<[MaybeUninit<T>]>>,
    // (logical index of the oldest element, ring position of the head).
    pub(crate) head: RwLock<(usize, usize)>,
    // Ring position one past the newest element.
    pub(crate) tail: RwLock<usize>,
    // Length of `data` (requested size + 1, including the spare slot).
    pub(crate) capacity: usize,
}
impl<T> RingBuffer<T>
where
T: Sized + Default + Clone + Copy,
{
pub fn new(offset: usize, size: usize) -> RingBuffer<T> {
let sz = std::cmp::min(MAX_BUFFER_SIZE, size);
let mut data = Vec::new();
data.resize_with(sz + 1, MaybeUninit::uninit);
let len = data.len();
Self {
data: UnsafeCell::new(data.into_boxed_slice()),
head: RwLock::new((offset, 0)),
tail: RwLock::new(0),
capacity: len,
}
}
#[inline(always)]
pub fn get_ref(&self) -> &[MaybeUninit<T>] {
unsafe { &*self.data.get() }
}
#[allow(clippy::mut_from_ref)]
#[inline(always)]
pub fn get_mut(&self) -> &mut [MaybeUninit<T>] {
unsafe { &mut *self.data.get() }
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
let (_, head) = *self.head.read();
let tail = *self.tail.read();
head == tail
}
#[inline(always)]
pub fn is_full(&self) -> bool {
let (_, head) = *self.head.read();
let tail = *self.tail.read();
let capacity = self.capacity;
(tail + 1) % capacity == head
}
}
/// Pure index arithmetic shared by consumer and reader: maps logical
/// (possibly wrapping) element indices onto positions in the ring.
struct IndexUtil;
impl IndexUtil {
    /// Splits the occupied region `[head, tail)` of a ring of length
    /// `len` into at most two contiguous ranges; the second range is
    /// non-empty only when the region wraps past the end of the ring.
    #[inline(always)]
    pub fn calc_range(head: usize, tail: usize, len: usize) -> (Range<usize>, Range<usize>) {
        // usize is totally ordered, so cmp() replaces the previous
        // partial_cmp() whose None arm was unreachable.
        match head.cmp(&tail) {
            Ordering::Less => (head..tail, 0..0),
            Ordering::Greater => (head..len, 0..tail),
            Ordering::Equal => (0..0, 0..0),
        }
    }
    /// Returns the position of logical index `idx` relative to the start
    /// of the filled region, or `None` when `idx` lies outside it.
    ///
    /// `offset` is the logical index of the oldest element; logical
    /// indices are allowed to wrap around `usize::MAX`.
    #[inline(always)]
    pub fn exists_index(idx: usize, offset: usize, filled_size: usize) -> Option<usize> {
        // BUGFIX: guard the empty region up front. The old code reached
        // `filled_size - 1` in the wrapped branch, which underflows
        // (panics in debug builds) when filled_size == 0.
        if filled_size == 0 {
            return None;
        }
        if idx >= offset {
            let i = idx - offset;
            if i < filled_size {
                Some(i)
            } else {
                None
            }
        } else {
            // idx < offset: only valid when the logical index space has
            // wrapped past usize::MAX inside the filled region.
            let dist_to_max = usize::MAX - offset;
            if filled_size - 1 > dist_to_max {
                let over_size = (filled_size - 1) - dist_to_max;
                if idx < over_size {
                    return Some(dist_to_max + 1 + idx);
                }
            }
            None
        }
    }
}
/// Write handle: pushes elements onto the shared ring buffer.
pub struct Producer<T> {
    buffer: Arc<RingBuffer<T>>,
}
impl<T> Producer<T>
where
T: Sized + Default + Clone + Copy,
{
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn is_full(&self) -> bool {
self.buffer.is_full()
}
pub fn push(&mut self, v: T) -> bool {
let head_guard = self.buffer.head.read();
let head = head_guard.1;
drop(head_guard);
let mut tail_guard = self.buffer.tail.write();
let tail = *tail_guard;
let mut new_tail = tail + 1;
if new_tail == self.buffer.capacity {
new_tail = 0;
}
if head == new_tail {
return false;
}
let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
unsafe {
mem::replace(buf.get_unchecked_mut(tail), MaybeUninit::new(v));
}
*tail_guard = new_tail;
true
}
}
// SAFETY: Producer is the only writer of `tail` and of the slots it fills;
// cross-thread coordination goes through the RwLocks inside RingBuffer.
// NOTE(review): these impls are unconditional — for a non-Send/non-Sync T
// they assert thread-safety the payload may not have; consider bounding
// them with `T: Send` / `T: Sync` — TODO confirm intended use.
unsafe impl<T> Sync for Producer<T> {}
unsafe impl<T> Send for Producer<T> {}
/// Read-and-remove handle: pops elements off the shared ring buffer.
pub struct Consumer<T> {
    buffer: Arc<RingBuffer<T>>,
}
impl<T> Consumer<T>
where
    T: Sized + Default + Clone + Copy,
{
    /// True when the buffer currently holds no elements.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }
    /// True when one more push would be rejected.
    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }
    /// Pops every element up to and including logical index `to`.
    ///
    /// Returns `(to, values)` on success, or `None` when the buffer is
    /// empty or `to` does not fall inside the filled region.
    pub fn shift_to(&mut self, to: usize) -> Option<(usize, Vec<T>)> {
        // Snapshot tail before taking the head lock. A concurrent push can
        // only grow the filled region, so a stale tail is conservative.
        let tail = *self.buffer.tail.read();
        let mut head_guard = self.buffer.head.write();
        let (offset, head) = *head_guard;
        if head == tail {
            return None;
        }
        let capacity = self.buffer.capacity;
        let filled_size = (tail + capacity - head) % capacity;
        // Map the logical index `to` onto a relative position inside the
        // filled region (handles logical-index wrap at usize::MAX).
        let i = IndexUtil::exists_index(to, offset, filled_size)?;
        let new_offset = to.wrapping_add(1);
        let new_head = (head + i + 1) % capacity;
        let (a, b) = IndexUtil::calc_range(head, new_head, capacity);
        // Swap the consumed slots out of the ring, leaving uninitialized
        // slots behind (fine: T is Copy, so nothing needs dropping).
        let mut temp_a = Vec::new();
        let mut temp_b = Vec::new();
        temp_a.resize_with(a.len(), MaybeUninit::uninit);
        temp_b.resize_with(b.len(), MaybeUninit::uninit);
        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
        buf[a].swap_with_slice(&mut temp_a);
        buf[b].swap_with_slice(&mut temp_b);
        // BUGFIX: the previous code used mem::transmute to turn
        // Vec<MaybeUninit<T>> into Vec<T>; std's transmute documentation
        // explicitly calls transmuting a Vec undefined behavior because
        // Vec's layout is unspecified. Unwrap each element instead.
        let v: Vec<T> = temp_a
            .into_iter()
            .chain(temp_b)
            // SAFETY: every swapped slot lay inside the filled region
            // [head, new_head), so it was initialized by a prior push.
            .map(|slot| unsafe { slot.assume_init() })
            .collect();
        *head_guard = (new_offset, new_head);
        Some((to, v))
    }
    /// Pops the oldest element. Returns `(logical_index, value)`, or
    /// `None` when the buffer is empty.
    pub fn shift(&mut self) -> Option<(usize, T)> {
        // Same lock discipline as shift_to: stale tail is conservative.
        let tail = *self.buffer.tail.read();
        let mut head_guard = self.buffer.head.write();
        let (offset, head) = *head_guard;
        if head == tail {
            return None;
        }
        let capacity = self.buffer.capacity;
        let mut new_head = head + 1;
        if new_head == capacity {
            new_head = 0;
        }
        let mut temp = MaybeUninit::uninit();
        let buf: &mut [MaybeUninit<T>] = self.buffer.get_mut();
        // SAFETY: head < capacity == buf.len().
        mem::swap(unsafe { buf.get_unchecked_mut(head) }, &mut temp);
        // SAFETY: the slot at `head` was initialized by a prior push.
        let value = unsafe { temp.assume_init() };
        *head_guard = (offset.wrapping_add(1), new_head);
        Some((offset, value))
    }
}
// SAFETY: Consumer is the only writer of `head` and only reads slots the
// producer has already published; coordination goes through RingBuffer's
// RwLocks. NOTE(review): unconditional for any T — a `T: Send`/`T: Sync`
// bound would be safer for non-thread-safe payloads — TODO confirm.
unsafe impl<T> Sync for Consumer<T> {}
unsafe impl<T> Send for Consumer<T> {}
/// Non-destructive read handle: copies elements out of the shared ring
/// buffer without removing them. Cloneable, so several readers can share
/// the same buffer.
#[derive(Clone)]
pub struct Reader<T> {
    buffer: Arc<RingBuffer<T>>,
}
impl<T> Reader<T>
where
    T: Sized + Default + Clone + Copy,
{
    /// True when the buffer currently holds no elements.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }
    /// True when one more push would be rejected.
    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }
    /// Logical index of the oldest element still in the buffer.
    pub fn offset(&self) -> usize {
        let (offset, _) = *self.buffer.head.read();
        offset
    }
    /// Copies the element at logical index `idx` without removing it.
    /// Returns `None` when the buffer is empty or `idx` is outside the
    /// filled region.
    pub fn get(&self, idx: usize) -> Option<(usize, T)> {
        let (offset, head, tail) = self.read_index();
        if head == tail {
            return None;
        }
        let capacity = self.buffer.capacity;
        // Number of occupied slots (head-to-tail distance on the ring).
        let filled_size = (tail + capacity - head) % capacity;
        let pos;
        if let Some(i) = IndexUtil::exists_index(idx, offset, filled_size) {
            pos = (head + i) % capacity;
        } else {
            return None;
        }
        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        // SAFETY: `pos` lies inside the filled region, so the slot was
        // initialized by an earlier push; viewing it as `T` is valid.
        let v: &T =
            unsafe { &*(buf.get_unchecked(pos) as *const std::mem::MaybeUninit<T> as *const T) };
        Some((idx, *v))
    }
    /// Copies every element currently in the buffer. Returns
    /// `(first_index, last_index, values)`, or `None` when empty.
    pub fn get_all(&self) -> Option<(usize, usize, Vec<T>)> {
        let (offset, head, tail) = self.read_index();
        let capacity = self.buffer.capacity;
        // The filled region as up to two contiguous ranges; the second is
        // non-empty only when the data wraps past the end of the ring.
        let (a, b) = IndexUtil::calc_range(head, tail, capacity);
        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        // SAFETY: both ranges lie inside the filled region, so every slot
        // they cover was initialized; reinterpreting as `[T]` is valid.
        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
        let v = [buf_a, buf_b].concat().to_vec();
        if !v.is_empty() {
            Some((offset, offset.wrapping_add(v.len() - 1), v))
        } else {
            None
        }
    }
    /// Copies up to `len` elements starting at logical index `idx`;
    /// `len == 0`, or a `len` that overshoots the newest element, reads
    /// through to the end. Returns `(first_index, last_index, values)`,
    /// or `None` when `idx` is outside the filled region.
    pub fn get_from(&self, idx: usize, len: usize) -> Option<(usize, usize, Vec<T>)> {
        let (offset, head, tail) = self.read_index();
        if head == tail {
            return None;
        }
        let capacity = self.buffer.capacity;
        let filled_size = (tail + capacity - head) % capacity;
        let range_head;
        let range_tail;
        if let Some(i1) = IndexUtil::exists_index(idx, offset, filled_size) {
            range_head = (head + i1) % capacity;
            // len == 0 or a request past the newest element clamps to tail.
            if len == 0 || i1 + len > filled_size {
                range_tail = tail;
            } else {
                range_tail = (head + i1 + len) % capacity;
            }
        } else {
            return None;
        }
        let (a, b) = IndexUtil::calc_range(range_head, range_tail, capacity);
        let buf: &[MaybeUninit<T>] = self.buffer.get_ref();
        // SAFETY: `a` and `b` lie inside the filled region, so every slot
        // they cover was initialized; reinterpreting as `[T]` is valid.
        let buf_a: &[T] = unsafe { &*(&buf[a] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
        let buf_b: &[T] = unsafe { &*(&buf[b] as *const [std::mem::MaybeUninit<T>] as *const [T]) };
        let v = [buf_a, buf_b].concat().to_vec();
        let v_len = v.len();
        if v_len > 0 {
            Some((idx, idx.wrapping_add(v_len - 1), v))
        } else {
            None
        }
    }
    /// Snapshot of `(offset, head, tail)`.
    /// NOTE(review): taken under two separate locks, so the pair can be
    /// mutually stale; callers appear to tolerate this relaxed view.
    #[inline(always)]
    fn read_index(&self) -> (usize, usize, usize) {
        let (offset, head) = *self.buffer.head.read();
        let tail = *self.buffer.tail.read();
        (offset, head, tail)
    }
}
// SAFETY: Reader only reads slots already published by the producer,
// snapshotting head/tail through RingBuffer's RwLocks. NOTE(review):
// unconditional for any T — a `T: Send`/`T: Sync` bound would be safer
// for non-thread-safe payloads — TODO confirm.
unsafe impl<T> Sync for Reader<T> {}
unsafe impl<T> Send for Reader<T> {}
/// Creates a ring buffer of `capacity` elements whose first element will
/// carry logical index `initial_index`, and returns the three handles to
/// it: a producer (push), a consumer (pop), and a cloneable reader (peek).
pub fn indexed_ring_buffer<T>(
    initial_index: usize,
    capacity: usize,
) -> (Producer<T>, Consumer<T>, Reader<T>)
where
    T: Sized + Default + Clone + Copy,
{
    let shared = Arc::new(RingBuffer::<T>::new(initial_index, capacity));
    let producer = Producer {
        buffer: Arc::clone(&shared),
    };
    let consumer = Consumer {
        buffer: Arc::clone(&shared),
    };
    let reader = Reader { buffer: shared };
    (producer, consumer, reader)
}