use buffers::{Buffer, ReadableBuffer, WritableBuffer};
use internal::arrays;
use internal::arrays::CircularArray;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::*;
/// Creates a bounded buffer sized by `capacity` and returns its two halves as
/// a `(Reader, Writer)` pair.
///
/// Both halves hold an `Arc` to the same `Inner` state, so elements pushed
/// through the writer become visible to the reader.
pub fn bounded<T>(capacity: usize) -> (Reader<T>, Writer<T>) {
    let shared = Arc::new(Inner::new(capacity));

    let reader = Reader {
        inner: Arc::clone(&shared),
    };
    let writer = Writer { inner: shared };

    (reader, writer)
}
/// The reading half of a buffer created by `bounded`.
pub struct Reader<T> {
/// State shared with the matching `Writer`.
inner: Arc<Inner<T>>,
}
impl<T> Buffer<T> for Reader<T> {
    /// Returns the number of elements currently stored, as reported by the
    /// shared state.
    #[inline]
    fn len(&self) -> usize {
        let shared = &*self.inner;
        shared.len()
    }

    /// Returns the capacity of the shared backing array.
    #[inline]
    fn capacity(&self) -> usize {
        let shared = &*self.inner;
        shared.capacity()
    }

    /// Discards all stored elements by moving `head` up to the current `tail`.
    fn clear(&mut self) {
        let shared = &*self.inner;
        shared
            .head
            .store(shared.tail.load(Ordering::SeqCst), Ordering::SeqCst);
    }
}
impl<T: Copy> ReadableBuffer<T> for Reader<T> {
    /// Copies stored elements into `dest` without consuming them.
    ///
    /// Returns `0` when the buffer is empty; otherwise returns whatever
    /// `arrays::copy_from_seq` reports (presumably the number of elements
    /// copied — confirm against that helper).
    fn copy_to(&self, dest: &mut [T]) -> usize {
        let shared = &*self.inner;
        let start = shared.head.load(Ordering::SeqCst);
        let end = shared.tail.load(Ordering::SeqCst);

        if start != end {
            // NOTE(review): this takes a shared reference into the `UnsafeCell`
            // while the writer may hold a mutable one; soundness relies on the
            // two sides touching disjoint regions — confirm in `CircularArray`.
            unsafe {
                let array = &*shared.array.get();
                let slices = array.as_slices(start..end);
                arrays::copy_from_seq(&slices, dest)
            }
        } else {
            0
        }
    }

    /// Drops up to `count` elements from the front of the buffer by advancing
    /// `head`, and returns how many were actually dropped.
    fn consume(&mut self, count: usize) -> usize {
        // Never advance past what is currently stored.
        let consumed = std::cmp::min(count, self.len());
        self.inner.head.fetch_add(consumed, Ordering::SeqCst);
        consumed
    }
}
// SAFETY: `Reader` is not automatically `Send` because the shared `Inner`
// holds an `UnsafeCell`. Moving a `Reader` to another thread lets that thread
// copy out `T` values written elsewhere, so `T` itself must be `Send`; `Copy`
// alone does not guarantee that (raw pointers and `&Cell<_>` are `Copy` but
// not `Send`).
unsafe impl<T: Copy + Send> Send for Reader<T> {}
/// The writing half of a buffer created by `bounded`.
pub struct Writer<T> {
/// State shared with the matching `Reader`.
inner: Arc<Inner<T>>,
}
impl<T> Buffer<T> for Writer<T> {
    /// Returns the number of elements currently stored, as reported by the
    /// shared state.
    #[inline]
    fn len(&self) -> usize {
        let shared = &*self.inner;
        shared.len()
    }

    /// Returns the capacity of the shared backing array.
    #[inline]
    fn capacity(&self) -> usize {
        let shared = &*self.inner;
        shared.capacity()
    }

    /// Discards all stored elements by moving `head` up to the current `tail`.
    ///
    /// NOTE(review): this stores into `head`, which `Reader::consume` also
    /// advances with `fetch_add`; if both halves run on different threads the
    /// two updates could interleave — confirm this is intended.
    fn clear(&mut self) {
        let shared = &*self.inner;
        shared
            .head
            .store(shared.tail.load(Ordering::SeqCst), Ordering::SeqCst);
    }
}
impl<T: Copy> WritableBuffer<T> for Writer<T> {
    /// Copies elements from `src` into the buffer and advances `tail` by the
    /// number of elements written.
    ///
    /// Returns `0` without touching the array when the buffer is already full;
    /// otherwise returns the count reported by `arrays::copy_to_seq`.
    fn push(&mut self, src: &[T]) -> usize {
        let shared = &*self.inner;
        let read_pos = shared.head.load(Ordering::SeqCst);
        let write_pos = shared.tail.load(Ordering::SeqCst);
        let occupied = write_pos.wrapping_sub(read_pos);

        if occupied != self.capacity() {
            // NOTE(review): this takes a mutable reference into the
            // `UnsafeCell` while the reader may hold a shared one; soundness
            // relies on the two sides touching disjoint regions — confirm in
            // `CircularArray`.
            unsafe {
                let array = &mut *shared.array.get();
                // The writable region runs from `tail` around to `head`.
                let mut free = array.as_slices_mut(write_pos..read_pos);
                let written = arrays::copy_to_seq(src, &mut free);
                shared.tail.fetch_add(written, Ordering::SeqCst);
                written
            }
        } else {
            0
        }
    }
}
// SAFETY: `Writer` is not automatically `Send` because the shared `Inner`
// holds an `UnsafeCell`. Moving a `Writer` to another thread lets that thread
// store `T` values that are later read elsewhere, so `T` itself must be
// `Send`; `Copy` alone does not guarantee that (raw pointers and `&Cell<_>`
// are `Copy` but not `Send`).
unsafe impl<T: Copy + Send> Send for Writer<T> {}
/// State shared between a `Reader` and its `Writer`.
struct Inner<T> {
/// Backing storage; wrapped in `UnsafeCell` so both halves can reach it
/// through the shared `Arc`.
array: UnsafeCell<CircularArray<T>>,
/// Read position; advanced by `Reader::consume` and moved by `clear`.
head: AtomicUsize,
/// Write position; advanced by `Writer::push`.
tail: AtomicUsize,
}
impl<T> Inner<T> {
    /// Builds empty shared state backed by a `CircularArray` created with
    /// `capacity`; both positions start at zero.
    fn new(capacity: usize) -> Self {
        // The array comes from `CircularArray::uninitialized`; nothing reads it
        // here, and `head == tail` marks the buffer as empty.
        let storage = unsafe { CircularArray::uninitialized(capacity) };

        Self {
            array: UnsafeCell::new(storage),
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }
}
impl<T> Buffer<T> for Inner<T> {
    /// Returns the distance from `head` to `tail`, computed with wrapping
    /// subtraction.
    fn len(&self) -> usize {
        // `head` is loaded before `tail`, in that order.
        let (head, tail) = (
            self.head.load(Ordering::SeqCst),
            self.tail.load(Ordering::SeqCst),
        );
        tail.wrapping_sub(head)
    }

    /// Returns the length of the backing array.
    #[inline]
    fn capacity(&self) -> usize {
        unsafe {
            let array = &*self.array.get();
            array.len()
        }
    }

    /// Discards all stored elements by moving `head` up to the current `tail`.
    fn clear(&mut self) {
        self.head
            .store(self.tail.load(Ordering::SeqCst), Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both halves report the capacity requested at construction.
    #[test]
    fn test_capacity() {
        let (reader, writer) = bounded::<u8>(16);

        assert_eq!(reader.capacity(), 16);
        assert_eq!(writer.capacity(), 16);
    }

    /// A push is visible through both halves' `len` and `is_empty`.
    #[test]
    fn test_push() {
        let (reader, mut writer) = bounded::<u8>(16);
        assert!(reader.is_empty());
        assert!(writer.is_empty());

        let payload = b"hello world";
        assert_eq!(writer.push(payload), payload.len());

        assert!(!reader.is_empty());
        assert!(!writer.is_empty());
        assert_eq!(reader.len(), payload.len());
        assert_eq!(writer.len(), payload.len());
    }

    /// The buffer can be filled exactly to capacity and then pulled from.
    #[test]
    fn test_push_capacity() {
        let (mut reader, mut writer) = bounded::<u8>(8192);
        let mut chunk = [1; 4096];

        assert_eq!(writer.push(&chunk), chunk.len());
        assert_eq!(writer.push(&chunk), chunk.len());
        assert_eq!(reader.len(), 8192);
        assert_eq!(reader.pull(&mut chunk), chunk.len());
    }

    /// Pushing into a full buffer writes nothing.
    #[test]
    fn test_push_more_than_buffer() {
        let (reader, mut writer) = bounded::<u8>(2);
        assert_eq!(reader.capacity(), 2);

        assert_eq!(writer.push(&[100]), 1);
        assert_eq!(writer.push(&[200]), 1);
        assert_eq!(writer.push(&[210]), 0);
        assert_eq!(writer.push(&[220]), 0);

        assert_eq!(reader.len(), 2);
    }

    /// Consuming frees room that a later push can wrap into.
    #[test]
    fn test_push_and_consume() {
        let (mut reader, mut writer) = bounded::<u8>(12);

        assert_eq!(writer.push(b"hello world"), 11);
        assert_eq!(reader.consume(6), 6);
        assert_eq!(reader.len(), 5);

        assert_eq!(writer.push(b" hello"), 6);
        assert_eq!(reader.len(), 11);

        let mut dest = [0; 11];
        assert_eq!(reader.copy_to(&mut dest), 11);
        assert_eq!(&dest, b"world hello");
    }

    /// Pulling into an oversized destination drains the whole buffer.
    #[test]
    fn test_pull_more_than_buffer() {
        let (mut reader, mut writer) = bounded(32);
        let payload = b"hello world";
        writer.push(payload);

        let mut dst = [0; 1024];
        assert_eq!(reader.pull(&mut dst), payload.len());
        assert_eq!(&dst[0..payload.len()], payload);
        assert!(reader.is_empty());
    }

    /// Pulling into a small destination leaves the remainder in the buffer.
    #[test]
    fn test_pull_less_than_buffer() {
        let (mut reader, mut writer) = bounded(32);
        let payload = b"hello world";
        writer.push(payload);

        let mut dst = [0; 4];
        assert_eq!(reader.pull(&mut dst), dst.len());
        assert_eq!(&dst, &payload[0..4]);
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), payload.len() - dst.len());
    }
}