use std::iter;
use std::sync::{Arc, Condvar, Mutex};
pub fn circbuf<T: Copy + Default>(len: usize) -> (Sender<T>, Receiver<T>) {
assert!(len > 0);
let buf = Box::from_iter(iter::repeat_n(T::default(), len));
let inner = Inner {
buf,
send_i: 0,
recv_i: 0,
filled: 0,
send_alive: true,
recv_alive: true,
};
let inner_mutex = Arc::new(Mutex::new(inner));
let cond = Arc::new(Condvar::new());
let sender = Sender::new(Arc::clone(&inner_mutex), Arc::clone(&cond));
let receiver = Receiver::new(Arc::clone(&inner_mutex), Arc::clone(&cond));
(sender, receiver)
}
struct Inner<T> {
buf: Box<[T]>,
send_i: usize, recv_i: usize, filled: usize,
send_alive: bool,
recv_alive: bool
}
type InnerMutex<T> = Arc<Mutex<Inner<T>>>;
type CondRc = Arc<Condvar>;
pub struct Sender<T: Copy> { inner_mutex: InnerMutex<T>,
cond: CondRc,
}
impl<T: Copy> Sender<T> {
fn new(inner_mutex: InnerMutex<T>, cond: CondRc) -> Self {
Self {
inner_mutex,
cond,
}
}
pub fn send(&self, buf: &[T]) -> bool {
let buf_len = buf.len();
let mut i: usize = 0;
loop {
let mut todo = buf_len - i;
if todo == 0 {
break;
}
let inner = self.inner_mutex.lock().unwrap();
let inner_buf_len = inner.buf.len();
let mut inner = self.cond.wait_while(inner, |inner| inner.recv_alive && inner.filled == inner_buf_len).unwrap();
if !inner.recv_alive { return false;
}
todo = [inner_buf_len - inner.filled, todo].into_iter().min().unwrap(); assert!(todo > 0);
inner.filled += todo;
while todo > 0 {
let inner_send_i = inner.send_i;
let copy = [inner_buf_len - inner_send_i, todo].into_iter().min().unwrap();
assert!(copy > 0);
let inner_buf_chunk = &mut inner.buf[inner_send_i..inner_send_i + copy];
inner_buf_chunk.copy_from_slice(&buf[i..i + copy]);
inner.send_i += copy;
if inner.send_i == inner_buf_len {
inner.send_i = 0;
}
i += copy;
todo -= copy;
}
self.notify();
}
true
}
fn notify(&self) {
self.cond.notify_all(); }
}
impl<T: Copy> Drop for Sender<T> {
fn drop(&mut self) {
let mut inner = self.inner_mutex.lock().unwrap();
inner.send_alive = false;
self.notify();
}
}
pub struct Receiver<T: Copy> { inner_mutex: InnerMutex<T>,
cond: CondRc,
}
impl<T: Copy> Receiver<T> {
fn new(inner_mutex: InnerMutex<T>, cond: CondRc) -> Self {
Self {
inner_mutex,
cond,
}
}
pub fn recv(&self, buf: &mut [T]) -> usize {
let buf_len = buf.len();
let mut i: usize = 0;
loop {
let mut todo = buf_len - i;
if todo == 0 {
break;
}
let inner = self.inner_mutex.lock().unwrap();
let inner_buf_len = inner.buf.len();
let mut inner = self.cond.wait_while(inner, |inner| inner.send_alive && inner.filled == 0).unwrap();
todo = [inner.filled, todo].into_iter().min().unwrap();
if todo == 0 {
assert!(!inner.send_alive);
break;
}
inner.filled -= todo;
while todo > 0 {
let inner_recv_i = inner.recv_i;
let copy = [inner_buf_len - inner_recv_i, todo].into_iter().min().unwrap();
assert!(copy > 0);
let buf_chunk = &mut buf[i..i + copy];
buf_chunk.copy_from_slice(&inner.buf[inner_recv_i..inner_recv_i + copy]);
inner.recv_i += copy;
if inner.recv_i == inner_buf_len {
inner.recv_i = 0;
}
i += copy;
todo -= copy;
}
self.notify();
}
i
}
pub fn wait_full(&self) {
let inner = self.inner_mutex.lock().unwrap();
let inner_buf_len = inner.buf.len();
let _inner = self.cond.wait_while(inner, |inner| inner.send_alive && inner.filled < inner_buf_len).unwrap(); }
fn notify(&self) {
self.cond.notify_all(); }
}
impl<T: Copy> Drop for Receiver<T> {
fn drop(&mut self) {
let mut inner = self.inner_mutex.lock().unwrap();
inner.recv_alive = false;
self.notify();
}
}