use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
/// Sentinel node index meaning "no node". It terminates a list when stored in a node's `next`
/// link, and marks "no skip in progress" when stored in the receiver's `skipped` / `cursor`.
const NIL: usize = 1 << 16;
/// Errors returned by the channel operations in this file.
#[derive(Eq, PartialEq)]
pub enum SeccErrors<T: Sync + Send> {
/// The channel has no free node left; carries the rejected message back to the caller.
Full(T),
/// There is no message available at the current read position.
Empty,
}
/// Prints only the variant name. The payload of `Full` is deliberately left out, so `T` does
/// not have to implement `Debug`.
impl<T: Sync + Send> fmt::Debug for SeccErrors<T> {
    fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            SeccErrors::Full(_) => "SeccErrors::Full",
            SeccErrors::Empty => "SeccErrors::Empty",
        };
        formatter.write_str(label)
    }
}
/// One slot of the channel's node storage; nodes are chained into lists through `next`.
struct SeccNode<T: Sync + Send> {
// Message stored in this node, or `None` when the node carries no message.
cell: UnsafeCell<Option<T>>,
// Index of the following node in whichever list this node belongs to, or `NIL` at the end.
next: AtomicUsize,
}
impl<T: Sync + Send> SeccNode<T> {
    /// Builds an empty node that terminates its list (`next` is `NIL`).
    fn new() -> SeccNode<T> {
        Self::with_next(NIL)
    }

    /// Builds an empty node whose `next` link is the node index `next`.
    fn with_next(next: usize) -> SeccNode<T> {
        let cell = UnsafeCell::new(None);
        let next = AtomicUsize::new(next);
        SeccNode { cell, next }
    }
}
/// Read-only statistics shared by both halves of a channel.
///
/// Implementors only provide [`core`](SeccCoreOps::core); every other method reads a value from
/// that core. The counters are read with `Ordering::Relaxed`, so they are momentary snapshots.
pub trait SeccCoreOps<T: Sync + Send> {
    /// Returns the core that backs this half of the channel.
    fn core(&self) -> &SeccCore<T>;

    /// Capacity the channel was created with.
    fn capacity(&self) -> usize {
        let core = self.core();
        core.capacity
    }

    /// Number of times a receiver has returned from waiting for a message.
    fn awaited_messages(&self) -> usize {
        let counter = &self.core().awaited_messages;
        counter.load(Ordering::Relaxed)
    }

    /// Number of times a sender has returned from waiting for free capacity.
    fn awaited_capacity(&self) -> usize {
        let counter = &self.core().awaited_capacity;
        counter.load(Ordering::Relaxed)
    }

    /// Messages sent but not yet received, including skipped ones.
    fn pending(&self) -> usize {
        let counter = &self.core().pending;
        counter.load(Ordering::Relaxed)
    }

    /// Messages that can currently be received; skipped messages are not counted until the
    /// skip is reset.
    fn receivable(&self) -> usize {
        let counter = &self.core().receivable;
        counter.load(Ordering::Relaxed)
    }

    /// Total number of messages successfully sent.
    fn sent(&self) -> usize {
        let counter = &self.core().sent;
        counter.load(Ordering::Relaxed)
    }

    /// Total number of messages successfully received.
    fn received(&self) -> usize {
        let counter = &self.core().received;
        counter.load(Ordering::Relaxed)
    }
}
/// Node indices owned by the sending side; guarded by the mutex in `SeccCore::send_ptrs`.
#[derive(Debug)]
struct SeccSendPtrs {
// Index of the node whose cell receives the next sent message.
queue_tail: usize,
// Index of the first pool node; a send turns it into the new queue tail.
pool_head: usize,
}
/// Node indices owned by the receiving side; guarded by the mutex in `SeccCore::receive_ptrs`.
#[derive(Debug)]
struct SeccReceivePtrs {
// Index of the first node of the queue.
queue_head: usize,
// Index of the last pool node; a received node is linked in after it.
pool_tail: usize,
// Index of the most recently skipped node (the one just before `cursor`), or `NIL`.
skipped: usize,
// Index of the node to read next while a skip is active, or `NIL` when reading from the head.
cursor: usize,
}
/// State shared by the sender and receiver halves of one channel.
pub struct SeccCore<T: Sync + Send> {
// Number of messages the channel was created to hold.
capacity: usize,
// Timeout, in milliseconds, that `send_await` passes to `send_await_timeout` on each retry.
poll_ms: u16,
// Owns the node storage; the nodes are reached through `node_ptrs`, not through this field.
_nodes: Box<[SeccNode<T>]>,
// Raw pointer to every node, indexed by node position; filled once in `create`.
node_ptrs: UnsafeCell<Vec<*mut SeccNode<T>>>,
// Sender-side indices plus the condvar that `send` notifies.
send_ptrs: Arc<(Mutex<SeccSendPtrs>, Condvar)>,
// Receiver-side indices plus the condvar that `receive` and `reset_skip` notify.
receive_ptrs: Arc<(Mutex<SeccReceivePtrs>, Condvar)>,
// Incremented each time `receive_await_timeout` returns from waiting on the condvar.
awaited_messages: AtomicUsize,
// Incremented each time `send_await_timeout` returns from waiting on the condvar.
awaited_capacity: AtomicUsize,
// Incremented by `send`, decremented by `receive`.
pending: AtomicUsize,
// Incremented by `send`, decremented by `receive` and `skip`, restored by `reset_skip`.
receivable: AtomicUsize,
// Total number of successful sends.
sent: AtomicUsize,
// Total number of successful receives.
received: AtomicUsize,
}
/// Sending half of a channel; produced by `create`.
pub struct SeccSender<T: Sync + Send> {
// Core shared with the matching receiver.
core: Arc<SeccCore<T>>,
}
impl<T: Sync + Send> SeccSender<T> {
pub fn send(&self, message: T) -> Result<(), SeccErrors<T>> {
unsafe {
let (ref mutex, ref condvar) = &*self.core.send_ptrs;
let mut send_ptrs = mutex.lock().unwrap();
let pool_head_ptr = (*self.core.node_ptrs.get())[send_ptrs.pool_head];
let next_pool_head = (*pool_head_ptr).next.load(Ordering::SeqCst);
if NIL == next_pool_head {
Err(SeccErrors::Full(message))
} else {
let queue_tail_ptr = (*self.core.node_ptrs.get())[send_ptrs.queue_tail];
(*(*queue_tail_ptr).cell.get()) = Some(message);
let old_pool_head = send_ptrs.pool_head;
send_ptrs.queue_tail = send_ptrs.pool_head;
send_ptrs.pool_head = next_pool_head;
self.core.sent.fetch_add(1, Ordering::SeqCst);
self.core.receivable.fetch_add(1, Ordering::SeqCst);
self.core.pending.fetch_add(1, Ordering::SeqCst);
(*pool_head_ptr).next.store(NIL, Ordering::SeqCst);
(*queue_tail_ptr)
.next
.store(old_pool_head, Ordering::SeqCst);
condvar.notify_all();
Ok(())
}
}
}
pub fn send_await_timeout(&self, mut message: T, timeout_ms: u16) -> Result<(), SeccErrors<T>> {
loop {
match self.send(message) {
Err(SeccErrors::Full(v)) => {
message = v;
let dur = Duration::from_millis(timeout_ms as u64);
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
let next_read_pos = unsafe {
let read_ptr = if receive_ptrs.cursor == NIL {
(*self.core.node_ptrs.get())[receive_ptrs.queue_head]
} else {
(*self.core.node_ptrs.get())[receive_ptrs.cursor]
};
(*read_ptr).next.load(Ordering::SeqCst)
};
if NIL != next_read_pos {
let result = condvar.wait_timeout(receive_ptrs, dur).unwrap();
self.core.awaited_capacity.fetch_add(1, Ordering::SeqCst);
if result.1.timed_out() {
return self.send(message);
}
}
}
v => return v,
}
}
}
/// Sends `message`, retrying `send_await_timeout` with the channel's `poll_ms` for as long as
/// the channel stays full. Any other outcome is returned as is.
pub fn send_await(&self, mut message: T) -> Result<(), SeccErrors<T>> {
    loop {
        message = match self.send_await_timeout(message, self.core.poll_ms) {
            Err(SeccErrors::Full(returned)) => returned,
            outcome => return outcome,
        };
    }
}
}
impl<T: Sync + Send> SeccCoreOps<T> for SeccSender<T> {
    /// Exposes the shared core so the statistics accessors work on the sender.
    fn core(&self) -> &SeccCore<T> {
        &*self.core
    }
}
// `SeccCore` contains an `UnsafeCell` and raw pointers, so the sender is marked `Send` and
// `Sync` by hand here.
// NOTE(review): soundness relies on every node access going through the channel's mutexes
// and atomic `next` links — confirm.
unsafe impl<T: Send + Sync> Send for SeccSender<T> {}
unsafe impl<T: Send + Sync> Sync for SeccSender<T> {}
/// Receiving half of a channel; produced by `create`.
pub struct SeccReceiver<T: Sync + Send> {
// Core shared with the matching sender.
core: Arc<SeccCore<T>>,
}
impl<T: Sync + Send> SeccReceiver<T> {
pub fn peek(&self) -> Result<&T, SeccErrors<T>> {
unsafe {
let (ref mutex, _) = &*self.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
(*self.core.node_ptrs.get())[receive_ptrs.queue_head]
} else {
(*self.core.node_ptrs.get())[receive_ptrs.cursor]
};
let next_read_pos = (*read_ptr).next.load(Ordering::SeqCst);
if NIL == next_read_pos {
return Err(SeccErrors::Empty);
}
let message: &T = (*((*read_ptr).cell).get())
.as_ref()
.expect("secc::peek(): empty receivable node");
Ok(message)
}
}
pub fn receive(&self) -> Result<T, SeccErrors<T>> {
unsafe {
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
(*self.core.node_ptrs.get())[receive_ptrs.queue_head]
} else {
(*self.core.node_ptrs.get())[receive_ptrs.cursor]
};
let next_read_pos = (*read_ptr).next.load(Ordering::SeqCst);
if NIL == next_read_pos {
Err(SeccErrors::Empty)
} else {
let message: T = (*(*read_ptr).cell.get()).take().unwrap();
let pool_tail_ptr = (*self.core.node_ptrs.get())[receive_ptrs.pool_tail];
(*read_ptr).next.store(NIL, Ordering::SeqCst);
let new_pool_tail = if receive_ptrs.cursor == NIL {
receive_ptrs.pool_tail = receive_ptrs.queue_head;
let old_queue_head = receive_ptrs.queue_head;
receive_ptrs.queue_head = next_read_pos;
old_queue_head
} else {
let skipped_ptr = (*self.core.node_ptrs.get())[receive_ptrs.skipped];
((*skipped_ptr).next).store(next_read_pos, Ordering::SeqCst);
(*read_ptr).next.store(NIL, Ordering::SeqCst);
receive_ptrs.pool_tail = receive_ptrs.cursor;
let old_cursor = receive_ptrs.cursor;
receive_ptrs.cursor = next_read_pos;
old_cursor
};
self.core.received.fetch_add(1, Ordering::SeqCst);
self.core.receivable.fetch_sub(1, Ordering::SeqCst);
self.core.pending.fetch_sub(1, Ordering::SeqCst);
(*pool_tail_ptr).next.store(new_pool_tail, Ordering::SeqCst);
condvar.notify_all();
Ok(message)
}
}
}
/// Receives the next message and discards it; fails exactly when `receive` fails.
pub fn pop(&self) -> Result<(), SeccErrors<T>> {
    self.receive().map(|_discarded| ())
}
pub fn receive_await_timeout(&self, timeout_ms: u16) -> Result<T, SeccErrors<T>> {
loop {
match self.receive() {
Err(SeccErrors::Empty) => {
let dur = Duration::from_millis(timeout_ms as u64);
let (ref mutex, ref condvar) = &*self.core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
let next_pool_head = unsafe {
let pool_head_ptr = (*self.core.node_ptrs.get())[send_ptrs.pool_head];
(*pool_head_ptr).next.load(Ordering::SeqCst)
};
if NIL != next_pool_head {
let result = condvar.wait_timeout(send_ptrs, dur).unwrap();
self.core.awaited_messages.fetch_add(1, Ordering::SeqCst);
if result.1.timed_out() {
return self.receive();
}
}
}
v => return v,
}
}
}
/// Receives a message, retrying `receive_await_timeout` for as long as the channel stays
/// empty. Any other outcome is returned as is.
///
/// Each retry waits for the channel's configured `poll_ms`, matching `send_await`, rather
/// than a hard-coded interval.
pub fn receive_await(&self) -> Result<T, SeccErrors<T>> {
    loop {
        match self.receive_await_timeout(self.core.poll_ms) {
            Err(SeccErrors::Empty) => (),
            other => return other,
        }
    }
}
pub fn skip(&self) -> Result<(), SeccErrors<T>> {
unsafe {
let (ref mutex, _) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
(*self.core.node_ptrs.get())[receive_ptrs.queue_head]
} else {
(*self.core.node_ptrs.get())[receive_ptrs.cursor]
};
let next_read_pos = (*read_ptr).next.load(Ordering::SeqCst);
if NIL == next_read_pos {
return Err(SeccErrors::Empty);
}
if receive_ptrs.cursor == NIL {
receive_ptrs.skipped = receive_ptrs.queue_head;
receive_ptrs.cursor = next_read_pos;
} else {
receive_ptrs.skipped = receive_ptrs.cursor;
receive_ptrs.cursor = next_read_pos;
}
self.core.receivable.fetch_sub(1, Ordering::SeqCst);
Ok(())
}
}
pub fn reset_skip(&self) -> Result<(), SeccErrors<T>> {
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
if receive_ptrs.cursor != NIL {
unsafe {
let mut count: usize = 1; let mut next_ptr = (*(*self.core.node_ptrs.get())[receive_ptrs.queue_head])
.next
.load(Ordering::SeqCst);
while next_ptr != receive_ptrs.cursor {
count += 1;
next_ptr = (*(*self.core.node_ptrs.get())[next_ptr])
.next
.load(Ordering::SeqCst);
}
self.core.receivable.fetch_add(count, Ordering::SeqCst);
receive_ptrs.cursor = NIL;
receive_ptrs.skipped = NIL;
}
}
condvar.notify_all();
Ok(())
}
/// Receives the message at the current read position and then resets the skip.
///
/// If `receive` fails, the error is returned and the skip is left untouched.
pub fn receive_and_reset_skip(&self) -> Result<T, SeccErrors<T>> {
    let message = self.receive()?;
    self.reset_skip().map(|()| message)
}
/// Discards the message at the current read position and then resets the skip.
///
/// If `pop` fails, the error is returned and the skip is left untouched.
pub fn pop_and_reset_skip(&self) -> Result<(), SeccErrors<T>> {
    self.pop().and_then(|()| self.reset_skip())
}
}
impl<T: Sync + Send> SeccCoreOps<T> for SeccReceiver<T> {
    /// Exposes the shared core so the statistics accessors work on the receiver.
    fn core(&self) -> &SeccCore<T> {
        &*self.core
    }
}
// `SeccCore` contains an `UnsafeCell` and raw pointers, so the receiver is marked `Send` and
// `Sync` by hand here.
// NOTE(review): soundness relies on every node access going through the channel's mutexes
// and atomic `next` links — confirm.
unsafe impl<T: Send + Sync> Send for SeccReceiver<T> {}
unsafe impl<T: Send + Sync> Sync for SeccReceiver<T> {}
/// Creates a bounded channel and returns its sender and receiver halves.
///
/// `capacity` is the number of messages the channel can hold. `poll_ms` is stored in the core
/// as the polling timeout, in milliseconds, for the blocking `*_await` helpers.
///
/// Two nodes are allocated beyond `capacity`: one placeholder each for the queue and the pool,
/// so that neither list is ever empty.
///
/// # Panics
/// Panics if `capacity` is zero, or if it is so large that a node index would equal the `NIL`
/// sentinel.
pub fn create<T: Sync + Send>(capacity: u16, poll_ms: u16) -> (SeccSender<T>, SeccReceiver<T>) {
    if capacity < 1 {
        panic!("capacity cannot be smaller than 1");
    }
    // Widen before adding: `capacity + 2` in `u16` overflows for the largest capacities.
    let alloc_capacity = usize::from(capacity) + 2;
    // Node indices run from 0 to `alloc_capacity - 1` and must never equal `NIL`.
    assert!(
        alloc_capacity <= NIL,
        "capacity is too large: node indices would collide with the NIL sentinel"
    );

    let mut nodes = Vec::<SeccNode<T>>::with_capacity(alloc_capacity);
    // Node 0 is the queue placeholder: the queue starts with head == tail and no message.
    nodes.push(SeccNode::<T>::new());
    let queue_head = nodes.len() - 1;
    let queue_tail = queue_head;
    // Node 1 is the pool placeholder and terminates the pool list.
    nodes.push(SeccNode::<T>::new());
    let pool_tail = nodes.len() - 1;
    // Every further node points at the node created just before it, so the last node created
    // is the pool head.
    for previous in pool_tail..(pool_tail + usize::from(capacity)) {
        nodes.push(SeccNode::<T>::with_next(previous));
    }
    let pool_head = nodes.len() - 1;

    // Take the node pointers only once the storage has its final heap location, so that no
    // reallocation can leave them dangling.
    let mut nodes = nodes.into_boxed_slice();
    let node_ptrs: Vec<*mut SeccNode<T>> = nodes
        .iter_mut()
        .map(|node| node as *mut SeccNode<T>)
        .collect();

    let send_ptrs = SeccSendPtrs {
        queue_tail,
        pool_head,
    };
    let receive_ptrs = SeccReceivePtrs {
        queue_head,
        pool_tail,
        skipped: NIL,
        cursor: NIL,
    };
    let core = Arc::new(SeccCore {
        capacity: usize::from(capacity),
        poll_ms,
        _nodes: nodes,
        node_ptrs: UnsafeCell::new(node_ptrs),
        send_ptrs: Arc::new((Mutex::new(send_ptrs), Condvar::new())),
        receive_ptrs: Arc::new((Mutex::new(receive_ptrs), Condvar::new())),
        awaited_messages: AtomicUsize::new(0),
        awaited_capacity: AtomicUsize::new(0),
        pending: AtomicUsize::new(0),
        receivable: AtomicUsize::new(0),
        sent: AtomicUsize::new(0),
        received: AtomicUsize::new(0),
    });
    let sender = SeccSender { core: core.clone() };
    let receiver = SeccReceiver { core };
    (sender, receiver)
}
/// Same as `create`, but wraps each half in an `Arc` so it can be shared between threads.
pub fn create_with_arcs<T: Sync + Send>(
    capacity: u16,
    poll_ms: u16,
) -> (Arc<SeccSender<T>>, Arc<SeccReceiver<T>>) {
    let halves = create(capacity, poll_ms);
    (Arc::new(halves.0), Arc::new(halves.1))
}
#[cfg(test)]
mod tests {
use super::*;
use log::info;
use log::LevelFilter;
use std::sync::MutexGuard;
use std::thread;
use std::time::Duration;
// Installs a debug-level `env_logger` configured for tests. The result of `try_init` is
// discarded, so calling this from every test is harmless.
pub fn init_test_log() {
let _ = env_logger::builder()
.filter_level(LevelFilter::Debug)
.is_test(true)
.try_init();
}
// Asserts the six list indices of a channel in one call: queue head/tail, pool head/tail,
// and the skip bookkeeping (`skipped`, `cursor`). A dump of the channel from `debug_channel`
// is appended to every failure message.
macro_rules! assert_pointer_nodes {
(
$sender:expr,
$receiver:expr,
$queue_head:expr,
$queue_tail:expr,
$pool_head:expr,
$pool_tail:expr,
$skipped:expr,
$cursor:expr
) => {{
// Take the dump first: `debug_channel` locks both mutexes itself and has released them
// again by the time they are locked below.
let actual = debug_channel($sender.core.clone());
let (ref mutex, _) = &*$sender.core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
let (ref mutex, _) = &*$receiver.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
assert_eq!(
$queue_head, receive_ptrs.queue_head,
" <== queue_head mismatch!\n Actual: {}\n",
actual
);
assert_eq!(
$queue_tail, send_ptrs.queue_tail,
"<== queue_tail mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$pool_head, send_ptrs.pool_head,
"<== pool_head mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$pool_tail, receive_ptrs.pool_tail,
" <== pool_tail mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$skipped, receive_ptrs.skipped,
" <== skipped mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$cursor, receive_ptrs.cursor,
" <== cursor mismatch\n Actual: {}\n",
actual
);
}};
}
// Asserts that the node at index `$node` links to node index `$next`.
macro_rules! assert_node_next {
($pointers:expr, $node:expr, $next:expr) => {
unsafe { assert_eq!((*$pointers[$node]).next.load(Ordering::Relaxed), $next,) }
};
}
// Asserts that the node at index `$node` terminates its list (its `next` link is `NIL`).
macro_rules! assert_node_next_nil {
($pointers:expr, $node:expr) => {
unsafe { assert_eq!((*$pointers[$node]).next.load(Ordering::Relaxed), NIL,) }
};
}
/// Renders the sender-side indices together with the pool list, walked from `pool_head`
/// until a `NIL` link.
fn debug_send<T: Send + Sync>(
    core: Arc<SeccCore<T>>,
    send_ptrs: MutexGuard<SeccSendPtrs>,
) -> String {
    let nodes = unsafe { &*core.node_ptrs.get() };
    let mut pool = Vec::with_capacity(core.capacity);
    let mut current = send_ptrs.pool_head;
    loop {
        pool.push(current);
        current = unsafe { (*nodes[current]).next.load(Ordering::SeqCst) };
        if current == NIL {
            break;
        }
    }
    let count = pool.len();
    format!(
        "send_ptrs: {:?}, pool_size: {}, pool: {:?}",
        send_ptrs, count, pool
    )
}
/// Renders the receiver-side indices together with the queue list, walked from `queue_head`
/// until a `NIL` link.
fn debug_receive<T: Send + Sync>(
    core: Arc<SeccCore<T>>,
    receive_ptrs: MutexGuard<SeccReceivePtrs>,
) -> String {
    let nodes = unsafe { &*core.node_ptrs.get() };
    let mut queue = Vec::with_capacity(core.capacity);
    let mut current = receive_ptrs.queue_head;
    loop {
        queue.push(current);
        current = unsafe { (*nodes[current]).next.load(Ordering::SeqCst) };
        if current == NIL {
            break;
        }
    }
    let count = queue.len();
    format!(
        "receive_ptrs: {:?}, queue_size: {}, queue: {:?}",
        receive_ptrs, count, queue
    )
}
pub fn debug_channel<T: Send + Sync>(core: Arc<SeccCore<T>>) -> String {
let r = core.receivable.load(Ordering::Relaxed);
let (ref mutex, _) = &*core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
let (ref mutex, _) = &*core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
format!(
"Receivable: {}, {}, {}",
r,
debug_receive(core.clone(), receive_ptrs),
debug_send(core.clone(), send_ptrs)
)
}
// Distinct message values used by `test_send_and_receive` to tell messages apart.
#[derive(Debug, Eq, PartialEq)]
enum Items {
A,
B,
C,
D,
E,
F,
}
// Single-threaded walk through the channel's list manipulation. After every operation it
// checks the counters, the `next` link of every node, and the six list indices.
#[test]
fn test_send_and_receive() {
init_test_log();
let channel = create::<Items>(5, 10);
let (sender, receiver) = channel;
let pointers = unsafe { &*sender.core.node_ptrs.get() };
// Capacity 5 plus the two placeholder nodes.
assert_eq!(7, pointers.len());
assert_eq!(5, sender.core.capacity);
assert_eq!(5, sender.capacity());
assert_eq!(5, receiver.capacity());
assert_eq!(0, sender.pending());
assert_eq!(0, sender.receivable());
assert_eq!(0, sender.sent());
assert_eq!(0, sender.received());
// Initial layout: queue is the lone node 0; pool is 6 -> 5 -> 4 -> 3 -> 2 -> 1.
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 0, 6, 1, NIL, NIL);
// Send A: node 6 leaves the pool and becomes the queue tail.
assert_eq!(Ok(()), sender.send(Items::A));
assert_eq!(1, sender.pending());
assert_eq!(1, sender.receivable());
assert_eq!(1, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 6, 5, 1, NIL, NIL);
// Send B.
assert_eq!(Ok(()), sender.send(Items::B));
assert_eq!(2, sender.pending());
assert_eq!(2, sender.receivable());
assert_eq!(2, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 5, 4, 1, NIL, NIL);
// Send C.
assert_eq!(Ok(()), sender.send(Items::C));
assert_eq!(3, sender.pending());
assert_eq!(3, sender.receivable());
assert_eq!(3, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next_nil!(pointers, 4);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 4, 3, 1, NIL, NIL);
// Send D.
assert_eq!(Ok(()), sender.send(Items::D));
assert_eq!(4, sender.pending());
assert_eq!(4, sender.receivable());
assert_eq!(4, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 3, 2, 1, NIL, NIL);
// Send E: the pool is now down to its placeholder node 1.
assert_eq!(Ok(()), sender.send(Items::E));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// The channel is full: further sends hand the message back and change nothing.
assert_eq!(Err(SeccErrors::Full(Items::F)), sender.send(Items::F));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_eq!(Err(SeccErrors::Full(Items::F)), sender.send(Items::F));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// Peek returns the oldest message and leaves everything untouched.
assert_eq!(Ok(&Items::A), receiver.peek());
assert_eq!(5, receiver.pending());
assert_eq!(5, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(0, receiver.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// Receive A: node 0 leaves the queue and is appended to the pool after node 1.
assert_eq!(Ok(Items::A), receiver.receive());
assert_eq!(4, receiver.pending());
assert_eq!(4, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(1, receiver.received());
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_pointer_nodes!(sender, receiver, 6, 2, 1, 0, NIL, NIL);
// Receive B.
assert_eq!(Ok(Items::B), receiver.receive());
assert_eq!(3, receiver.pending());
assert_eq!(3, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(2, receiver.received());
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 5, 2, 1, 6, NIL, NIL);
// Receive C.
assert_eq!(Ok(Items::C), receiver.receive());
assert_eq!(2, receiver.pending());
assert_eq!(2, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(3, receiver.received());
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_pointer_nodes!(sender, receiver, 4, 2, 1, 5, NIL, NIL);
// Receive D.
assert_eq!(Ok(Items::D), receiver.receive());
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(4, receiver.received());
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next_nil!(pointers, 4);
assert_pointer_nodes!(sender, receiver, 3, 2, 1, 4, NIL, NIL);
// Receive E: the queue is back to a single node (head == tail == 2).
assert_eq!(Ok(Items::E), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
// The channel is empty: further receives fail and change nothing.
assert_eq!(Err(SeccErrors::Empty), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
assert_eq!(Err(SeccErrors::Empty), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
// Send and receive once more: recycled nodes are reused.
assert_eq!(Ok(()), sender.send(Items::F));
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 1, 0, 3, NIL, NIL);
assert_eq!(Ok(Items::F), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 1, 0, 2, NIL, NIL);
// Skipping on an empty channel fails and changes nothing.
assert_eq!(Err(SeccErrors::Empty), receiver.skip());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 1, 0, 2, NIL, NIL);
// Send A, then skip it: it stays pending but is no longer receivable.
assert_eq!(Ok(()), sender.send(Items::A));
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, NIL, NIL);
assert_eq!(Ok(()), receiver.skip());
assert_eq!(1, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, 1, 0);
// Nothing is left after the cursor, so a second skip fails.
assert_eq!(Err(SeccErrors::Empty), receiver.skip());
assert_eq!(1, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, 1, 0);
// Send B while A is skipped: peek now sees B at the cursor.
assert_eq!(Ok(()), sender.send(Items::B));
assert_eq!(2, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(8, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 6, 5, 2, 1, 0);
assert_eq!(Ok(&Items::B), receiver.peek());
assert_eq!(2, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(8, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 6, 5, 2, 1, 0);
// Send C; the cursor still points at B.
assert_eq!(Ok(()), sender.send(Items::C));
assert_eq!(Ok(&Items::B), receiver.peek());
assert_eq!(3, receiver.pending());
assert_eq!(2, receiver.receivable());
assert_eq!(9, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 2, 1, 0);
// Skip B as well: the cursor moves on to C.
assert_eq!(Ok(()), receiver.skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 2, 0, 6);
// Receive C from the middle of the queue: node 6 is unlinked and appended to the pool.
assert_eq!(Ok(Items::C), receiver.receive());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, 0, 5);
// Resetting the skip clears the cursor bookkeeping without touching any link.
assert_eq!(Ok(()), receiver.reset_skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, NIL, NIL);
// Skip A again, then receive B and reset the skip in a single call.
assert_eq!(Ok(()), receiver.skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, 1, 0);
assert_eq!(Ok(Items::B), receiver.receive_and_reset_skip());
assert_node_next!(pointers, 1, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next!(pointers, 6, 0);
assert_node_next_nil!(pointers, 0);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 0, NIL, NIL);
}
/// One producer thread and one consumer thread exchange 200 messages through a channel of
/// capacity 32; the test passes once both threads finish.
#[test]
fn test_single_producer_single_receiver() {
    init_test_log();
    let message_count: u32 = 200;
    let capacity: u16 = 32;
    let (sender, receiver) = create_with_arcs::<u32>(capacity, 20);

    let consumer = thread::spawn(move || {
        let mut received = 0;
        while received < message_count {
            // A timed-out receive is simply retried.
            if receiver.receive_await_timeout(20).is_ok() {
                received += 1;
            }
        }
    });

    let producer = thread::spawn(move || {
        for value in 0..message_count {
            sender.send_await_timeout(value, 20).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
/// Starts the receiving thread first and spawns the sender only after the receiver has
/// signalled that it is running, then checks the final counters.
#[test]
fn test_receive_before_send() {
    init_test_log();
    let (sender, receiver) = create_with_arcs::<u32>(5, 20);
    let receiver2 = receiver.clone();
    let mutex = Arc::new(Mutex::new(false));
    let rx_mutex = mutex.clone();
    let rx = thread::spawn(move || {
        // Signal that the receiver thread is running; the lock is released at the end of
        // this statement.
        *rx_mutex.lock().unwrap() = true;
        if let Err(e) = receiver2.receive_await_timeout(20) {
            panic!("Error {:?} when receive.", e);
        }
    });
    // Wait until the receiver thread has set the flag.
    while !*mutex.lock().unwrap() {
        thread::yield_now();
    }
    let tx = thread::spawn(move || {
        if let Err(e) = sender.send_await_timeout(1, 20) {
            panic!("Error {:?} when send.", e);
        }
    });
    tx.join().unwrap();
    rx.join().unwrap();
    assert_eq!(1, receiver.sent());
    assert_eq!(1, receiver.received());
    assert_eq!(0, receiver.pending());
    assert_eq!(0, receiver.receivable());
}
/// Parks a receiver thread and a sender thread on a shared condvar, releases both at once,
/// and checks that exactly one message went through.
#[test]
fn test_receive_concurrent_send() {
    init_test_log();
    let (sender, receiver) = create_with_arcs::<u32>(5, 20);
    let receiver2 = receiver.clone();
    // Flags: .0 is set by the receiver thread, .1 by the sender thread.
    let pair = Arc::new((Mutex::new((false, false)), Condvar::new()));
    let rx_pair = pair.clone();
    let tx_pair = pair.clone();
    let rx = thread::spawn(move || {
        let mut guard = rx_pair.0.lock().unwrap();
        guard.0 = true;
        // The flag is set under the lock, and the lock is only released by `wait`.
        drop(rx_pair.1.wait(guard).unwrap());
        if let Err(e) = receiver2.receive_await_timeout(20) {
            panic!("Error {:?} when receive.", e);
        }
    });
    let tx = thread::spawn(move || {
        let mut guard = tx_pair.0.lock().unwrap();
        guard.1 = true;
        drop(tx_pair.1.wait(guard).unwrap());
        if let Err(e) = sender.send_await_timeout(1 as u32, 20) {
            panic!("Error {:?} when send.", e);
        }
    });
    // Wait until both threads have set their flag.
    loop {
        let guard = pair.0.lock().unwrap();
        if guard.0 && guard.1 {
            break;
        }
        drop(guard);
        thread::yield_now();
    }
    // Notify while holding the lock so that both threads are released together.
    let guard = pair.0.lock().unwrap();
    pair.1.notify_all();
    drop(guard);
    tx.join().unwrap();
    rx.join().unwrap();
    assert_eq!(1, receiver.sent());
    assert_eq!(1, receiver.received());
    assert_eq!(0, receiver.pending());
    assert_eq!(0, receiver.receivable());
}
/// Three named sender threads push 10000 messages in total through a channel of capacity 10
/// while one named receiver thread drains them.
#[test]
fn test_multiple_producer_single_receiver() {
    /// Prints the channel layout if `receivable` ever exceeds the capacity.
    fn debug_if_needed(core: Arc<SeccCore<u32>>) {
        let over_capacity = core.receivable.load(Ordering::Relaxed) > core.capacity;
        if over_capacity {
            println!(
                "{}: {}",
                thread::current().name().unwrap(),
                debug_channel(core)
            );
        }
    }

    /// Spawns a named thread that sends every value of `values`, failing on any send error.
    fn spawn_sender(
        name: &str,
        sender: Arc<SeccSender<u32>>,
        values: std::ops::Range<u32>,
    ) -> thread::JoinHandle<()> {
        thread::Builder::new()
            .name(name.into())
            .spawn(move || {
                for i in values {
                    match sender.send_await_timeout(i, 20) {
                        Ok(()) => debug_if_needed(sender.core.clone()),
                        Err(e) => assert!(false, "----> Error while sending: {}:{:?}", i, e),
                    }
                }
            })
            .unwrap()
    }

    init_test_log();
    let message_count: u32 = 10000;
    let capacity: u16 = 10;
    let (sender, receiver) = create_with_arcs::<u32>(capacity, 20);

    let receiver1 = receiver.clone();
    let rx = thread::Builder::new()
        .name("R1".into())
        .spawn(move || {
            let mut count = 0;
            while count < message_count {
                // A timed-out receive is simply retried.
                if receiver1.receive_await_timeout(20).is_ok() {
                    debug_if_needed(receiver1.core.clone());
                    count += 1;
                }
            }
        })
        .unwrap();

    // The value range is split into three parts; the last part absorbs the remainder.
    let third = message_count / 3;
    let tx = spawn_sender("S1", sender.clone(), 0..third);
    let tx2 = spawn_sender("S2", sender.clone(), third..(third * 2));
    let tx3 = spawn_sender("S3", sender.clone(), (third * 2)..message_count);

    tx.join().unwrap();
    tx2.join().unwrap();
    tx3.join().unwrap();
    rx.join().unwrap();
    info!(
        "All messages complete: awaited_messages: {}, awaited_capacity: {}",
        receiver.awaited_messages(),
        sender.awaited_capacity()
    );
}
}