use std::cell::UnsafeCell;
use std::io::{self, Read, Write};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
struct SpscBuffer {
buf: UnsafeCell<Box<[u8]>>,
len: AtomicUsize,
}
impl SpscBuffer {
fn new(size: usize) -> Self {
Self {
buf: UnsafeCell::new(vec![0; size].into_boxed_slice()),
len: AtomicUsize::new(0),
}
}
fn len(&self) -> usize {
self.len.load(Ordering::SeqCst)
}
fn capacity(&self) -> usize {
unsafe { &*self.buf.get() }.len()
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn is_full(&self) -> bool {
self.len() == self.capacity()
}
}
pub struct SpscBufferReader {
start: usize,
buffer: Arc<SpscBuffer>,
}
impl SpscBufferReader {
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn capacity(&self) -> usize {
self.buffer.capacity()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn is_full(&self) -> bool {
self.buffer.is_full()
}
pub fn read_to_slice(&mut self, buf: &mut [u8]) -> usize {
use std::cmp::min;
let ringbuf: &mut Box<[u8]> = unsafe { mem::transmute(self.buffer.buf.get()) };
let ringbuf_capacity = ringbuf.len();
let ringbuf_len = self.buffer.len.load(Ordering::SeqCst);
let max_read_size = min(buf.len(), ringbuf_len);
let contents_until_end = ringbuf_capacity - self.start;
let read_size = min(max_read_size, contents_until_end);
buf[..read_size].copy_from_slice(&ringbuf[self.start..self.start + read_size]);
self.start = (self.start + read_size) % ringbuf_capacity;
self.buffer.len.fetch_sub(read_size, Ordering::SeqCst);
read_size
}
}
impl Read for SpscBufferReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Ok(self.read_to_slice(buf))
}
}
unsafe impl Sync for SpscBufferReader {}
unsafe impl Send for SpscBufferReader {}
pub struct SpscBufferWriter {
end: usize,
buffer: Arc<SpscBuffer>,
}
impl SpscBufferWriter {
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn capacity(&self) -> usize {
self.buffer.capacity()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn is_full(&self) -> bool {
self.buffer.is_full()
}
pub fn write_from_slice(&mut self, buf: &[u8]) -> usize {
use std::cmp::min;
let ringbuf: &mut Box<[u8]> = unsafe { mem::transmute(self.buffer.buf.get()) };
let ringbuf_capacity = ringbuf.len();
let ringbuf_len = self.buffer.len.load(Ordering::SeqCst);
let max_write_size = min(buf.len(), ringbuf_capacity - ringbuf_len);
let space_until_end = ringbuf_capacity - self.end;
let write_size = min(max_write_size, space_until_end);
ringbuf[self.end..self.end + write_size].copy_from_slice(&buf[..write_size]);
self.end = (self.end + write_size) % ringbuf_capacity;
self.buffer.len.fetch_add(write_size, Ordering::SeqCst);
write_size
}
}
unsafe impl Sync for SpscBufferWriter {}
unsafe impl Send for SpscBufferWriter {}
impl Write for SpscBufferWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(self.write_from_slice(buf))
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub fn spsc_buffer(size: usize) -> (SpscBufferWriter, SpscBufferReader) {
let buffer = Arc::new(SpscBuffer::new(size));
let producer = SpscBufferWriter {
end: 0,
buffer: buffer.clone(),
};
let consumer = SpscBufferReader { start: 0, buffer };
(producer, consumer)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_spsc_buffer() {
let buf = [1u8; 100];
let (mut producer, mut consumer) = spsc_buffer(60);
assert!(producer.is_empty());
assert!(consumer.is_empty());
assert_eq!(producer.len(), 0);
assert_eq!(consumer.len(), 0);
assert_eq!(producer.capacity(), 60);
assert_eq!(consumer.capacity(), 60);
let mut out_buf = [0u8; 100];
assert_eq!(producer.write_from_slice(&buf), 60);
assert_eq!(producer.len(), 60);
assert_eq!(consumer.len(), 60);
assert_eq!(consumer.read_to_slice(&mut out_buf), 60);
assert_eq!(producer.len(), 0);
assert_eq!(consumer.len(), 0);
assert_eq!(producer.write_from_slice(&buf[60..]), 40);
assert_eq!(producer.len(), 40);
assert_eq!(consumer.len(), 40);
assert_eq!(consumer.read_to_slice(&mut out_buf[60..]), 40);
assert_eq!(producer.len(), 0);
assert_eq!(consumer.len(), 0);
assert_eq!(&buf[..], &out_buf[..]);
}
}