use std::any::TypeId;
use std::ops::{Index, IndexMut};
/// Errors reported by [`RingBufferStorage`] operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RBError {
    /// A bulk write contained more elements than the ring buffer's maximum size.
    TooLargeWrite,
    /// The `ReaderId` passed to `read` carries a `TypeId` that differs from the
    /// buffer's element type.
    InvalidReader,
}

impl std::fmt::Display for RBError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match *self {
            RBError::TooLargeWrite => {
                "attempted to write more elements than the ring buffer can hold"
            }
            RBError::InvalidReader => {
                "reader id was created for a different element type than this ring buffer"
            }
        };
        f.write_str(message)
    }
}

// Lets callers propagate `RBError` through `Box<dyn Error>` and similar generic
// error plumbing.
impl std::error::Error for RBError {}
/// Cursor that remembers how far one consumer has read into a ring buffer.
///
/// `Eq` is derived alongside `Hash` and `PartialEq` so that a `ReaderId` can be
/// used as a key in `HashMap`/`HashSet`.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct ReaderId {
    /// `TypeId` of the element type the reader was created for; `read` compares
    /// it against the buffer's own element type.
    t: TypeId,
    /// Index in the buffer's storage at which the next read starts.
    read_index: usize,
    /// Value of the buffer's write counter when the reader was created or last read.
    written: usize,
}
impl ReaderId {
pub fn new(t: TypeId, reader_index: usize, written: usize) -> ReaderId {
ReaderId {
t,
read_index: reader_index,
written,
}
}
}
/// Ring buffer that stores up to `max_size` elements of type `T` and overwrites
/// the oldest slot once it is full.
///
/// Consumers track their own position through a `ReaderId` obtained from
/// `new_reader_id` and passed to `read`.
pub struct RingBufferStorage<T> {
/// Backing storage. It grows by `push` until the write index first wraps;
/// after that, slots are overwritten in place.
pub(crate) data: Vec<T>,
/// Index of the slot the next write goes to; wraps to 0 on reaching `max_size`.
write_index: usize,
/// Capacity requested in `new`; bulk writes longer than this are rejected.
max_size: usize,
/// Count of single writes, reset to 0 once it exceeds `reset_written`.
/// `read` compares it with the value stored in a `ReaderId` to work out how
/// many elements were written in between.
written: usize,
/// Threshold after which `written` is reset; `new` sets it to `size * 1000`.
reset_written: usize,
}
impl<T: 'static> RingBufferStorage<T> {
    /// Creates an empty ring buffer that holds at most `size` elements.
    ///
    /// The internal write counter is reset after `size * 1000` writes; the
    /// multiplication saturates at `usize::MAX` instead of overflowing.
    pub fn new(size: usize) -> Self {
        RingBufferStorage {
            data: Vec::with_capacity(size),
            write_index: 0,
            max_size: size,
            written: 0,
            reset_written: size.saturating_mul(1000),
        }
    }

    /// Writes a clone of every element of `data` into the buffer, in order.
    ///
    /// # Errors
    ///
    /// Returns `RBError::TooLargeWrite`, without writing anything, when `data`
    /// is longer than the buffer's maximum size.
    pub fn write_slice(&mut self, data: &[T]) -> Result<(), RBError>
    where
        T: Clone,
    {
        if data.len() > self.max_size {
            return Err(RBError::TooLargeWrite);
        }
        for d in data {
            self.write_single(d.clone());
        }
        Ok(())
    }

    /// Moves every element out of `data` into the buffer, in order, leaving
    /// `data` empty.
    ///
    /// # Errors
    ///
    /// Returns `RBError::TooLargeWrite`, leaving `data` untouched, when it is
    /// longer than the buffer's maximum size.
    pub fn drain_vec_write(&mut self, data: &mut Vec<T>) -> Result<(), RBError> {
        if data.len() > self.max_size {
            return Err(RBError::TooLargeWrite);
        }
        for d in data.drain(..) {
            self.write_single(d);
        }
        Ok(())
    }

    /// Writes one element at the current write position, overwriting the slot's
    /// previous content once the buffer has filled up.
    pub fn write_single(&mut self, data: T) {
        // Until the write index wraps for the first time the backing vector is
        // still growing, so the slot does not exist yet and must be pushed.
        if self.write_index == self.data.len() {
            self.data.push(data);
        } else {
            self.data[self.write_index] = data;
        }

        self.write_index += 1;
        if self.write_index >= self.max_size {
            self.write_index = 0;
        }

        // The counter runs through 0..=reset_written and then starts over.
        // Written this way the increment can never overflow.
        self.written = if self.written >= self.reset_written {
            0
        } else {
            self.written + 1
        };
    }

    /// Creates a reader positioned at the current write position, so it only
    /// sees elements written after this call.
    pub fn new_reader_id(&self) -> ReaderId {
        ReaderId::new(TypeId::of::<T>(), self.write_index, self.written)
    }

    /// Returns the elements written since `reader_id` last read (or was
    /// created) and advances `reader_id` to the current write position.
    ///
    /// * `ReadData::Data` - iterator over exactly the newly written elements.
    /// * `ReadData::Overflow` - more elements were written than the buffer can
    ///   hold; carries an iterator over the whole buffer, oldest element first,
    ///   and the number of elements written beyond the buffer's maximum size.
    ///
    /// # Errors
    ///
    /// Returns `RBError::InvalidReader` when `reader_id` was created for a
    /// different element type. In that case `reader_id` is not modified.
    ///
    /// NOTE(review): only the element type is checked; a reader created by
    /// another buffer of the same `T` is not detected here.
    pub fn read(&self, reader_id: &mut ReaderId) -> Result<ReadData<T>, RBError> {
        if reader_id.t != TypeId::of::<T>() {
            return Err(RBError::InvalidReader);
        }

        let num_written = if self.written < reader_id.written {
            // The counter wrapped since the last read. It takes the values
            // 0..=reset_written, i.e. its period is reset_written + 1, so the
            // step from reset_written back to 0 counts as one write as well.
            // The `+ 1` is added last so the sum stays <= reset_written.
            self.written + (self.reset_written - reader_id.written) + 1
        } else {
            self.written - reader_id.written
        };

        let read_index = reader_id.read_index;
        reader_id.read_index = self.write_index;
        reader_id.written = self.written;

        if num_written > self.max_size {
            // Everything the reader had not seen yet was (partly) overwritten:
            // hand out the full buffer, starting at the oldest slot.
            Ok(ReadData::Overflow(
                StorageIterator {
                    storage: self,
                    current: self.write_index,
                    end: self.write_index,
                    started: false,
                },
                num_written - self.max_size,
            ))
        } else {
            Ok(ReadData::Data(StorageIterator {
                storage: self,
                current: read_index,
                end: self.write_index,
                // With nothing written, `current == end` must mean "empty"
                // rather than "one full lap", so mark the iterator as started.
                started: num_written == 0,
            }))
        }
    }
}
/// Outcome of a successful `RingBufferStorage::read`.
pub enum ReadData<'a, T: 'a> {
/// No more elements were written since the reader's previous position than
/// the buffer can hold; the iterator runs from the reader's old read index
/// up to the current write index.
Data(StorageIterator<'a, T>),
/// More elements were written than the buffer can hold. The iterator starts
/// and ends at the current write index (one full lap over the storage), and
/// the `usize` is the number of written elements in excess of the buffer's
/// maximum size.
Overflow(StorageIterator<'a, T>, usize),
}
/// Borrowing iterator over a range of slots of a `RingBufferStorage`, wrapping
/// around the end of the backing storage where needed.
pub struct StorageIterator<'a, T: 'a> {
/// Buffer whose slots are being iterated.
storage: &'a RingBufferStorage<T>,
/// Index of the slot the next call to `next` yields.
current: usize,
/// Index at which iteration stops (exclusive).
end: usize,
/// Whether iteration has begun. While `false`, `current == end` does not end
/// the iteration, which allows a full lap that starts and ends on one index.
started: bool,
}
impl<'a, T> Iterator for StorageIterator<'a, T> {
    type Item = &'a T;

    /// Yields the slot at `current` and steps forward, wrapping to index 0 at
    /// the end of the backing storage; stops once `current` has reached `end`
    /// after iteration has started.
    fn next(&mut self) -> Option<&'a T> {
        let exhausted = self.started && self.current == self.end;
        if exhausted {
            return None;
        }
        self.started = true;

        // Copy the shared reference out so the yielded item borrows for `'a`.
        let storage = self.storage;
        let element = &storage[self.current];

        let stored = storage.data.len();
        let following = self.current + 1;
        // Wrap at the end of the stored data, unless `end` itself sits right
        // past the last stored slot - then the next call must stop there.
        self.current = if following == stored && self.end != stored {
            0
        } else {
            following
        };

        Some(element)
    }
}
impl<T> Index<usize> for RingBufferStorage<T> {
    type Output = T;

    /// Borrows the element stored in slot `index` of the backing vector.
    ///
    /// # Panics
    ///
    /// Panics when `index` is not smaller than the number of stored elements.
    fn index(&self, index: usize) -> &Self::Output {
        // Delegate straight to the backing vector's own `Index` implementation.
        self.data.index(index)
    }
}
impl<T> IndexMut<usize> for RingBufferStorage<T> {
    /// Mutably borrows the element stored in slot `index` of the backing vector.
    ///
    /// # Panics
    ///
    /// Panics when `index` is not smaller than the number of stored elements.
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        // Delegate straight to the backing vector's own `IndexMut` implementation.
        self.data.index_mut(index)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::any::TypeId;

    /// Element type stored in the buffers under test.
    #[derive(Debug, Clone, PartialEq)]
    struct Test {
        pub id: u32,
    }

    /// Second element type, used only to forge a reader id of the wrong type.
    #[derive(Debug, Clone, PartialEq)]
    struct Test2 {
        pub id: u32,
    }

    /// Builds `n` events with ids `0..n`.
    fn events(n: u32) -> Vec<Test> {
        let mut generated = Vec::new();
        for id in 0..n {
            generated.push(Test { id });
        }
        generated
    }

    /// Reads from `buffer` and returns the yielded events, panicking unless the
    /// read produced plain `ReadData::Data`.
    fn expect_data(buffer: &RingBufferStorage<Test>, reader_id: &mut ReaderId) -> Vec<Test> {
        match buffer.read(reader_id) {
            Ok(ReadData::Data(data)) => data.cloned().collect(),
            _ => panic!(),
        }
    }

    /// Writing an empty vector succeeds.
    #[test]
    fn test_empty_write() {
        let mut buffer = RingBufferStorage::<Test>::new(10);
        let mut nothing = Vec::new();
        assert!(buffer.drain_vec_write(&mut nothing).is_ok());
    }

    /// Writing more events than the buffer holds is rejected.
    #[test]
    fn test_too_large_write() {
        let mut buffer = RingBufferStorage::<Test>::new(10);
        let mut too_many = events(15);
        let outcome = buffer.drain_vec_write(&mut too_many);
        assert_eq!(Err(RBError::TooLargeWrite), outcome);
    }

    /// A reader id carrying another element type is rejected.
    #[test]
    fn test_invalid_reader() {
        let buffer = RingBufferStorage::<Test>::new(10);
        let mut forged = ReaderId::new(TypeId::of::<Test2>(), 0, 0);
        match buffer.read(&mut forged) {
            Err(RBError::InvalidReader) => (),
            _ => panic!(),
        }
    }

    /// Reading from a buffer nothing was written to yields no events.
    #[test]
    fn test_empty_read() {
        let buffer = RingBufferStorage::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        let seen = expect_data(&buffer, &mut reader_id);
        assert_eq!(Vec::<Test>::default(), seen);
    }

    /// Events written before the reader was created are not visible to it.
    #[test]
    fn test_empty_read_write_before_id() {
        let mut buffer = RingBufferStorage::<Test>::new(10);
        let mut earlier = events(2);
        assert!(buffer.drain_vec_write(&mut earlier).is_ok());
        let mut reader_id = buffer.new_reader_id();
        let seen = expect_data(&buffer, &mut reader_id);
        assert_eq!(Vec::<Test>::default(), seen);
    }

    /// Events written after the reader was created are read back in order.
    #[test]
    fn test_read() {
        let mut buffer = RingBufferStorage::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        let mut batch = events(2);
        assert!(buffer.drain_vec_write(&mut batch).is_ok());
        let seen = expect_data(&buffer, &mut reader_id);
        assert_eq!(vec![Test { id: 0 }, Test { id: 1 }], seen);
    }

    /// Writing past the reader reports the overflow and the surviving events.
    #[test]
    fn test_write_overflow() {
        let mut buffer = RingBufferStorage::<Test>::new(3);
        let mut reader_id = buffer.new_reader_id();
        for _ in 0..2 {
            let mut batch = events(2);
            assert!(buffer.drain_vec_write(&mut batch).is_ok());
        }
        match buffer.read(&mut reader_id) {
            Ok(ReadData::Overflow(lost_data, lost_size)) => {
                assert_eq!(1, lost_size);
                let survivors = lost_data.cloned().collect::<Vec<_>>();
                assert_eq!(
                    vec![Test { id: 1 }, Test { id: 0 }, Test { id: 1 }],
                    survivors
                );
            }
            _ => panic!(),
        }
    }

    /// Slice writes are read back exactly like drained vector writes.
    #[test]
    fn test_write_slice() {
        let mut buffer = RingBufferStorage::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        let batch = events(2);
        assert!(buffer.write_slice(&batch).is_ok());
        let seen = expect_data(&buffer, &mut reader_id);
        assert_eq!(vec![Test { id: 0 }, Test { id: 1 }], seen);
    }
}