use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
/// Sentinel stored in a node's `next` link (and in `cursor` / `skipped`) to mean "no node".
///
/// `create` accepts a `u16` capacity and allocates two extra nodes, so the largest node index
/// it can produce is `u16::MAX + 1` (65536). The previous value, `1 << 16`, is exactly that
/// index, which made a real node indistinguishable from the sentinel at maximum capacity.
/// `usize::MAX` can never be a valid index into the node slice.
const NIL: usize = usize::MAX;
/// Errors returned by the channel operations in this file.
#[derive(Eq, PartialEq)]
pub enum SeccErrors<T: Sync + Send + Clone> {
/// Returned by `send` when the pool has no free node; carries the rejected message back
/// to the caller.
Full(T),
/// Returned by the receiver operations when there is no message at the current read
/// position.
Empty,
}
impl<T: Sync + Send + Clone> fmt::Debug for SeccErrors<T> {
    /// Writes only the variant name. The payload of `Full` is not printed, so `T` does not
    /// have to implement `Debug`.
    fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
        let label = match *self {
            SeccErrors::Full(_) => "SeccErrors::Full",
            SeccErrors::Empty => "SeccErrors::Empty",
        };
        formatter.write_str(label)
    }
}
impl<T: Sync + Send + Clone> fmt::Display for SeccErrors<T> {
    /// Displays the error using its `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl<T: Sync + Send + Clone> std::error::Error for SeccErrors<T> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
/// One slot of the channel's node array; nodes are chained through `next` indices.
struct SeccNode<T: Sync + Send + Clone> {
/// The message held by this node, or `None` when the node holds nothing.
cell: UnsafeCell<Option<T>>,
/// Index of the following node in the chain, or `NIL` when this node is the last one.
next: AtomicUsize,
}
impl<T: Sync + Send + Clone> SeccNode<T> {
    /// Builds an empty node that terminates a chain (its `next` is `NIL`).
    fn new() -> SeccNode<T> {
        Self::with_next(NIL)
    }

    /// Builds an empty node whose `next` link is the given node index.
    fn with_next(next: usize) -> SeccNode<T> {
        let cell = UnsafeCell::new(None);
        let next = AtomicUsize::new(next);
        SeccNode { cell, next }
    }
}
/// Read-only accessors shared by both ends of the channel.
///
/// Implementors only supply `core`; every other method reads a field of the shared
/// `SeccCore`. The counters are read with `Ordering::Relaxed`.
pub trait SeccCoreOps<T: Sync + Send + Clone> {
    /// Returns the shared core this handle operates on.
    fn core(&self) -> &SeccCore<T>;

    /// Returns the capacity the channel was created with.
    fn capacity(&self) -> usize {
        let core = self.core();
        core.capacity
    }

    /// Returns the current value of the `awaited_messages` counter.
    fn awaited_messages(&self) -> usize {
        let counter = &self.core().awaited_messages;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the current value of the `awaited_capacity` counter.
    fn awaited_capacity(&self) -> usize {
        let counter = &self.core().awaited_capacity;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the number of messages sent but not yet received.
    fn pending(&self) -> usize {
        let counter = &self.core().pending;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the number of messages that are pending and not currently skipped.
    fn receivable(&self) -> usize {
        let counter = &self.core().receivable;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the total number of successful sends.
    fn sent(&self) -> usize {
        let counter = &self.core().sent;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the total number of successful receives.
    fn received(&self) -> usize {
        let counter = &self.core().received;
        counter.load(Ordering::Relaxed)
    }
}
/// Node indices used by the sending side; guarded by the `send_ptrs` mutex in `SeccCore`.
#[derive(Debug)]
struct SeccSendPtrs {
/// Index of the node that the next sent message is written into.
queue_tail: usize,
/// Index of the first node of the pool; `send` takes this node as the new queue tail.
pool_head: usize,
}
/// Node indices used by the receiving side; guarded by the `receive_ptrs` mutex in `SeccCore`.
#[derive(Debug)]
struct SeccReceivePtrs {
/// Index of the node at the head of the queue (read from when no cursor is set).
queue_head: usize,
/// Index of the last node of the pool; `receive` links the freed node after it.
pool_tail: usize,
/// Index of the node immediately before `cursor`, or `NIL` when no skip is active.
skipped: usize,
/// Index of the node to read from while a skip is active, or `NIL` otherwise.
cursor: usize,
}
/// State shared between every `SeccSender` and `SeccReceiver` of one channel.
pub struct SeccCore<T: Sync + Send + Clone> {
/// Capacity requested in `create`.
capacity: usize,
/// Timeout used for each wait round by `send_await` and `receive_await`.
poll_timeout: Duration,
/// All nodes of the channel; the queue and the pool are chains threaded through them.
nodes: Box<[SeccNode<T>]>,
/// Send-side indices plus the condvar that `send` notifies after enqueueing.
send_ptrs: Arc<(Mutex<SeccSendPtrs>, Condvar)>,
/// Receive-side indices plus the condvar that `receive` and `reset_skip` notify.
receive_ptrs: Arc<(Mutex<SeccReceivePtrs>, Condvar)>,
/// Counter exposed through `SeccCoreOps::awaited_messages`.
awaited_messages: AtomicUsize,
/// Counter exposed through `SeccCoreOps::awaited_capacity`; incremented by
/// `SeccSender::send_await_timeout` after each wait for free capacity.
awaited_capacity: AtomicUsize,
/// Incremented by `send`, decremented by `receive`.
pending: AtomicUsize,
/// Incremented by `send`, decremented by `receive` and `skip`, restored by `reset_skip`.
receivable: AtomicUsize,
/// Total number of successful sends.
sent: AtomicUsize,
/// Total number of successful receives.
received: AtomicUsize,
}
/// Sending handle of a channel; clones share the same `SeccCore`.
pub struct SeccSender<T: Sync + Send + Clone> {
/// Shared channel state.
core: Arc<SeccCore<T>>,
}
impl<T: Sync + Send + Clone> Clone for SeccSender<T> {
fn clone(&self) -> Self {
SeccSender {
core: self.core.clone(),
}
}
}
impl<T: Sync + Send + Clone> SeccSender<T> {
fn debug_locked(&self, send_ptrs: &MutexGuard<SeccSendPtrs>) -> String {
let mut pool = Vec::with_capacity(self.core.capacity);
pool.push(send_ptrs.pool_head);
let mut next_ptr = self.core.nodes[send_ptrs.pool_head]
.next
.load(Ordering::SeqCst);
let mut count = 1;
while next_ptr != NIL {
count += 1;
pool.push(next_ptr);
next_ptr = self.core.nodes[next_ptr].next.load(Ordering::SeqCst);
}
format!(
"send_ptrs: {:?}, pool_size: {}, pool: {:?}",
send_ptrs, count, pool
)
}
pub fn send(&self, message: T) -> Result<(), SeccErrors<T>> {
let (ref mutex, ref condvar) = &*self.core.send_ptrs;
let mut send_ptrs = mutex.lock().unwrap();
let pool_head_ptr = &self.core.nodes[send_ptrs.pool_head];
let next_pool_head = pool_head_ptr.next.load(Ordering::SeqCst);
if NIL == next_pool_head {
Err(SeccErrors::Full(message))
} else {
let queue_tail_ptr = &self.core.nodes[send_ptrs.queue_tail];
unsafe {
*queue_tail_ptr.cell.get() = Some(message);
}
let old_pool_head = send_ptrs.pool_head;
send_ptrs.queue_tail = send_ptrs.pool_head;
send_ptrs.pool_head = next_pool_head;
self.core.sent.fetch_add(1, Ordering::SeqCst);
self.core.receivable.fetch_add(1, Ordering::SeqCst);
self.core.pending.fetch_add(1, Ordering::SeqCst);
pool_head_ptr.next.store(NIL, Ordering::SeqCst);
queue_tail_ptr.next.store(old_pool_head, Ordering::SeqCst);
condvar.notify_all();
Ok(())
}
}
pub fn send_await_timeout(
&self,
mut message: T,
timeout: Duration,
) -> Result<(), SeccErrors<T>> {
loop {
match self.send(message) {
Err(SeccErrors::Full(v)) => {
message = v;
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
let (_, result) = condvar.wait_timeout(receive_ptrs, timeout).unwrap();
self.core.awaited_capacity.fetch_add(1, Ordering::SeqCst);
if result.timed_out() {
return self.send(message);
}
}
v => return v,
}
}
}
pub fn send_await(&self, mut message: T) -> Result<(), SeccErrors<T>> {
loop {
match self.send_await_timeout(message, self.core.poll_timeout) {
Err(SeccErrors::Full(v)) => {
message = v;
}
other => return other,
}
}
}
}
impl<T: Sync + Send + Clone> SeccCoreOps<T> for SeccSender<T> {
    /// Borrows the shared core behind this sender's `Arc`.
    fn core(&self) -> &SeccCore<T> {
        &*self.core
    }
}
impl<T: Sync + Send + Clone> fmt::Debug for SeccSender<T> {
fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
let (ref mutex, _) = &*self.core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
write!(formatter, "{}", self.debug_locked(&send_ptrs))
}
}
// NOTE(review): these manual impls assert that a sender may be moved to and shared between
// threads even though the shared core contains `UnsafeCell`s. Presumably the justification
// is that every cell access in this file happens while holding one of the channel mutexes —
// confirm before relying on it.
unsafe impl<T: Send + Sync + Clone> Send for SeccSender<T> {}
unsafe impl<T: Send + Sync + Clone> Sync for SeccSender<T> {}
/// Receiving handle of a channel; clones share the same `SeccCore`.
pub struct SeccReceiver<T: Sync + Send + Clone> {
/// Shared channel state.
core: Arc<SeccCore<T>>,
}
impl<T: Sync + Send + Clone> Clone for SeccReceiver<T> {
fn clone(&self) -> Self {
SeccReceiver {
core: self.core.clone(),
}
}
}
impl<T: Sync + Send + Clone> SeccReceiver<T> {
fn debug_locked(&self, receive_ptrs: &MutexGuard<SeccReceivePtrs>) -> String {
let mut queue = Vec::with_capacity(self.core.capacity);
let mut next_ptr = self.core.nodes[receive_ptrs.queue_head]
.next
.load(Ordering::SeqCst);
queue.push(receive_ptrs.queue_head);
let mut count = 1;
while next_ptr != NIL {
count += 1;
queue.push(next_ptr);
next_ptr = self.core.nodes[next_ptr].next.load(Ordering::SeqCst);
}
format!(
"receive_ptrs: {:?}, queue_size: {}, queue: {:?}",
receive_ptrs, count, queue
)
}
pub fn peek(&self) -> Result<T, SeccErrors<T>> {
let (ref mutex, _) = &*self.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
&self.core.nodes[receive_ptrs.queue_head]
} else {
&self.core.nodes[receive_ptrs.cursor]
};
let next_read_pos = (*read_ptr).next.load(Ordering::SeqCst);
if NIL == next_read_pos {
return Err(SeccErrors::Empty);
}
let message: T = unsafe {
(*((*read_ptr).cell).get())
.clone()
.expect("secc::peek(): empty receivable node")
};
Ok(message)
}
pub fn receive(&self) -> Result<T, SeccErrors<T>> {
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
&self.core.nodes[receive_ptrs.queue_head]
} else {
&self.core.nodes[receive_ptrs.cursor]
};
let next_read_pos = (*read_ptr).next.load(Ordering::SeqCst);
if NIL == next_read_pos {
Err(SeccErrors::Empty)
} else {
let message: T = unsafe { (*(*read_ptr).cell.get()).take().unwrap() };
let pool_tail_ptr = &self.core.nodes[receive_ptrs.pool_tail];
(*read_ptr).next.store(NIL, Ordering::SeqCst);
let new_pool_tail = if receive_ptrs.cursor == NIL {
receive_ptrs.pool_tail = receive_ptrs.queue_head;
let old_queue_head = receive_ptrs.queue_head;
receive_ptrs.queue_head = next_read_pos;
old_queue_head
} else {
let skipped_ptr = &self.core.nodes[receive_ptrs.skipped];
((*skipped_ptr).next).store(next_read_pos, Ordering::SeqCst);
(*read_ptr).next.store(NIL, Ordering::SeqCst);
receive_ptrs.pool_tail = receive_ptrs.cursor;
let old_cursor = receive_ptrs.cursor;
receive_ptrs.cursor = next_read_pos;
old_cursor
};
self.core.received.fetch_add(1, Ordering::SeqCst);
self.core.receivable.fetch_sub(1, Ordering::SeqCst);
self.core.pending.fetch_sub(1, Ordering::SeqCst);
(*pool_tail_ptr).next.store(new_pool_tail, Ordering::SeqCst);
condvar.notify_all();
Ok(message)
}
}
pub fn pop(&self) -> Result<(), SeccErrors<T>> {
self.receive()?;
Ok(())
}
pub fn receive_await_timeout(&self, timeout: Duration) -> Result<T, SeccErrors<T>> {
loop {
match self.receive() {
Err(SeccErrors::Empty) => {
let (ref mutex, ref condvar) = &*self.core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
let (_, result) = condvar.wait_timeout(send_ptrs, timeout).unwrap();
self.core.awaited_capacity.fetch_add(1, Ordering::SeqCst);
if result.timed_out() {
return self.receive();
}
}
v => return v,
}
}
}
pub fn receive_await(&self) -> Result<T, SeccErrors<T>> {
loop {
match self.receive_await_timeout(self.core.poll_timeout) {
Err(SeccErrors::Empty) => (),
other => return other,
}
}
}
pub fn skip(&self) -> Result<(), SeccErrors<T>> {
let (ref mutex, _) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
let read_ptr = if receive_ptrs.cursor == NIL {
&self.core.nodes[receive_ptrs.queue_head]
} else {
&self.core.nodes[receive_ptrs.cursor]
};
let next_read_pos = read_ptr.next.load(Ordering::SeqCst);
if NIL == next_read_pos {
return Err(SeccErrors::Empty);
}
if receive_ptrs.cursor == NIL {
receive_ptrs.skipped = receive_ptrs.queue_head;
receive_ptrs.cursor = next_read_pos;
} else {
receive_ptrs.skipped = receive_ptrs.cursor;
receive_ptrs.cursor = next_read_pos;
}
self.core.receivable.fetch_sub(1, Ordering::SeqCst);
Ok(())
}
pub fn reset_skip(&self) -> Result<(), SeccErrors<T>> {
let (ref mutex, ref condvar) = &*self.core.receive_ptrs;
let mut receive_ptrs = mutex.lock().unwrap();
if receive_ptrs.cursor != NIL {
let mut count: usize = 1;
let mut next_ptr = self.core.nodes[receive_ptrs.queue_head]
.next
.load(Ordering::SeqCst);
while next_ptr != receive_ptrs.cursor {
count += 1;
next_ptr = self.core.nodes[next_ptr].next.load(Ordering::SeqCst);
}
self.core.receivable.fetch_add(count, Ordering::SeqCst);
receive_ptrs.cursor = NIL;
receive_ptrs.skipped = NIL;
}
condvar.notify_all();
Ok(())
}
pub fn receive_and_reset_skip(&self) -> Result<T, SeccErrors<T>> {
let result = self.receive()?;
self.reset_skip()?;
Ok(result)
}
pub fn pop_and_reset_skip(&self) -> Result<(), SeccErrors<T>> {
self.pop()?;
self.reset_skip()
}
}
impl<T: Sync + Send + Clone> SeccCoreOps<T> for SeccReceiver<T> {
    /// Borrows the shared core behind this receiver's `Arc`.
    fn core(&self) -> &SeccCore<T> {
        &*self.core
    }
}
impl<T: Sync + Send + Clone> fmt::Debug for SeccReceiver<T> {
fn fmt(&self, formatter: &'_ mut fmt::Formatter) -> fmt::Result {
let (ref mutex, _) = &*self.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
write!(formatter, "{}", self.debug_locked(&receive_ptrs))
}
}
// NOTE(review): these manual impls assert that a receiver may be moved to and shared between
// threads even though the shared core contains `UnsafeCell`s. Presumably the justification
// is that every cell access in this file happens while holding one of the channel mutexes —
// confirm before relying on it.
unsafe impl<T: Send + Sync + Clone> Send for SeccReceiver<T> {}
unsafe impl<T: Send + Sync + Clone> Sync for SeccReceiver<T> {}
/// Creates a channel with room for `capacity` messages and returns its two ends.
///
/// Two nodes are allocated in addition to `capacity`: one that starts out as the (empty)
/// queue head/tail and one that terminates the pool. The remaining `capacity` nodes are
/// chained in front of the pool terminator, so the pool head is the last node pushed.
///
/// `poll_timeout` is the wait used per round by `send_await` and `receive_await`.
///
/// # Panics
///
/// Panics if `capacity` is zero, or if the node count would make a node index equal to the
/// `NIL` sentinel.
pub fn create<T: Sync + Send + Clone>(
    capacity: u16,
    poll_timeout: Duration,
) -> (SeccSender<T>, SeccReceiver<T>) {
    assert!(capacity >= 1, "capacity cannot be smaller than 1");

    // Widen before adding: `capacity + 2` computed in `u16` overflows for large capacities.
    let capacity = usize::from(capacity);
    let alloc_capacity = capacity + 2;
    // The highest node index is `alloc_capacity - 1`; it must never equal the sentinel.
    assert!(
        alloc_capacity <= NIL,
        "capacity is too large: a node index would collide with the NIL sentinel"
    );

    let mut nodes = Vec::<SeccNode<T>>::with_capacity(alloc_capacity);

    // Node 0: the initial queue node, which is both head and tail of the empty queue.
    nodes.push(SeccNode::<T>::new());
    let queue_head = nodes.len() - 1;
    let queue_tail = queue_head;

    // Node 1: the end of the pool chain.
    nodes.push(SeccNode::<T>::new());
    let pool_tail = nodes.len() - 1;

    // Each further node points at the previously pushed one, so the last becomes the head.
    let mut pool_head = pool_tail;
    for _ in 0..capacity {
        nodes.push(SeccNode::<T>::with_next(pool_head));
        pool_head = nodes.len() - 1;
    }

    let send_ptrs = SeccSendPtrs {
        queue_tail,
        pool_head,
    };
    let receive_ptrs = SeccReceivePtrs {
        queue_head,
        pool_tail,
        skipped: NIL,
        cursor: NIL,
    };

    let core = Arc::new(SeccCore {
        capacity,
        poll_timeout,
        nodes: nodes.into_boxed_slice(),
        send_ptrs: Arc::new((Mutex::new(send_ptrs), Condvar::new())),
        receive_ptrs: Arc::new((Mutex::new(receive_ptrs), Condvar::new())),
        awaited_messages: AtomicUsize::new(0),
        awaited_capacity: AtomicUsize::new(0),
        pending: AtomicUsize::new(0),
        receivable: AtomicUsize::new(0),
        sent: AtomicUsize::new(0),
        received: AtomicUsize::new(0),
    });

    let sender = SeccSender {
        core: Arc::clone(&core),
    };
    let receiver = SeccReceiver { core };
    (sender, receiver)
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
// Asserts the six bookkeeping indices of a channel in one call.
//
// Arguments, in order: sender, receiver, then the expected `queue_head`, `queue_tail`,
// `pool_head`, `pool_tail`, `skipped` and `cursor`. On mismatch the failure message includes
// the debug rendering of both channel ends.
macro_rules! assert_pointer_nodes {
(
$sender:expr,
$receiver:expr,
$queue_head:expr,
$queue_tail:expr,
$pool_head:expr,
$pool_tail:expr,
$skipped:expr,
$cursor:expr
) => {{
// Render the channel first: the Debug impls take (and release) the same locks that are
// acquired below.
let actual = debug_channel($sender.clone(), $receiver.clone());
let (ref mutex, _) = &*$sender.core.send_ptrs;
let send_ptrs = mutex.lock().unwrap();
let (ref mutex, _) = &*$receiver.core.receive_ptrs;
let receive_ptrs = mutex.lock().unwrap();
assert_eq!(
$queue_head, receive_ptrs.queue_head,
" <== queue_head mismatch!\n Actual: {}\n",
actual
);
assert_eq!(
$queue_tail, send_ptrs.queue_tail,
"<== queue_tail mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$pool_head, send_ptrs.pool_head,
"<== pool_head mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$pool_tail, receive_ptrs.pool_tail,
" <== pool_tail mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$skipped, receive_ptrs.skipped,
" <== skipped mismatch\n Actual: {}\n",
actual
);
assert_eq!(
$cursor, receive_ptrs.cursor,
" <== cursor mismatch\n Actual: {}\n",
actual
);
}};
}
// Asserts that the node at index `$node` links to the node at index `$next`.
macro_rules! assert_node_next {
    ($pointers:expr, $node:expr, $next:expr) => {{
        let observed = $pointers[$node].next.load(Ordering::Relaxed);
        assert_eq!(observed, $next);
    }};
}
// Asserts that the node at index `$node` terminates its chain (its link is `NIL`).
macro_rules! assert_node_next_nil {
    ($pointers:expr, $node:expr) => {{
        let observed = $pointers[$node].next.load(Ordering::Relaxed);
        assert_eq!(observed, NIL);
    }};
}
/// Formats both ends of a channel into a single string using their `Debug` output.
pub fn debug_channel<T: Send + Sync + Clone>(
    sender: SeccSender<T>,
    receiver: SeccReceiver<T>,
) -> String {
    let sender_view = format!("{:?}", sender);
    let receiver_view = format!("{:?}", receiver);
    format!("{{ Sender: {}, Receiver: {} }}", sender_view, receiver_view)
}
/// Simple distinguishable message values used by the tests in this module.
#[derive(Debug, Eq, PartialEq, Clone)]
enum Items {
A,
B,
C,
D,
E,
F,
}
/// A value obtained through `peek` stays intact after the message is popped.
#[test]
fn test_pop_after_peek() {
    use std::num::NonZeroU8;
    let (tx, rx) = create::<NonZeroU8>(1, Duration::from_millis(100));
    let value = NonZeroU8::new(5).unwrap();
    tx.send(value).unwrap();

    let peeked = rx.peek().unwrap();
    assert_eq!(value, peeked);

    // Popping removes the message from the channel; the peeked clone must be unaffected.
    rx.pop().unwrap();
    assert_eq!(value, peeked);
}
/// `peek` reports `Empty` both on a fresh channel and after the only message was popped.
#[test]
fn test_peek_empty() {
    let (tx, rx) = create::<Items>(5, Duration::from_millis(10));
    assert_eq!(Err(SeccErrors::Empty), rx.peek());

    tx.send(Items::A).unwrap();
    rx.pop().unwrap();
    assert_eq!(Err(SeccErrors::Empty), rx.peek());
}
/// One thread peeks the single message, a second thread then pops it, and finally the first
/// thread verifies that a further pop reports `Empty`.
#[test]
fn test_pop_while_peeking() {
let (sender, receiver) = create::<Items>(5, Duration::from_millis(10));
// `peeked` signals "the peek has happened"; `popped` signals "the pop has happened".
let peeked = Arc::new((Mutex::new(false), Condvar::new()));
let popped = Arc::new((Mutex::new(false), Condvar::new()));
let peeked_clone = peeked.clone();
let popped_clone = popped.clone();
let receiver_clone = receiver.clone();
sender.send(Items::A).unwrap();
let handle_peek = thread::spawn(move || {
let (ref mutex1, ref cvar1) = &*peeked_clone;
let mut ready = mutex1.lock().unwrap();
*ready = true;
// The peek runs while the `peeked` lock is held, so the popping thread cannot
// observe `ready` before the peek has completed.
let _item = receiver_clone.peek();
cvar1.notify_all();
drop(ready);
let (ref mutex2, ref cvar2) = &*popped_clone;
let mut done = mutex2.lock().unwrap();
while !*done {
done = cvar2.wait(done).unwrap();
}
// The other thread has already removed the only message.
assert_eq!(Err(SeccErrors::Empty), receiver_clone.pop());
});
let handle_pop = thread::spawn(move || {
let (ref mutex1, ref cvar1) = &*peeked;
let mut ready = mutex1.lock().unwrap();
while !*ready {
ready = cvar1.wait(ready).unwrap();
}
let (ref mutex2, ref cvar2) = &*popped;
let mut done = mutex2.lock().unwrap();
*done = true;
// The pop runs while the `popped` lock is held, so the peeking thread only sees
// `done` after the message is gone.
receiver.pop().unwrap();
cvar2.notify_all();
});
handle_pop.join().unwrap();
handle_peek.join().unwrap();
}
/// Both channel ends can be cloned when the payload is an `Arc` of a type that itself does
/// not implement `Clone`.
#[test]
fn test_clone_with_unclonable() {
    struct Unclonable;
    let (tx, rx) = create::<Arc<Unclonable>>(5, Duration::from_millis(10));
    let _tx_copy = tx.clone();
    let _rx_copy = rx.clone();
}
/// Walks a capacity-5 channel through sends, receives, peeks and skips on a single thread,
/// checking the counters, every node's `next` link and all bookkeeping indices after each
/// step.
#[test]
fn test_send_and_receive() {
let channel = create::<Items>(5, Duration::from_millis(10));
let (sender, receiver) = channel;
let pointers = &sender.core.nodes;
// Freshly created channel: 5 capacity nodes plus the queue node (0) and pool end (1).
assert_eq!(7, pointers.len());
assert_eq!(5, sender.core.capacity);
assert_eq!(5, sender.capacity());
assert_eq!(5, receiver.capacity());
assert_eq!(0, sender.pending());
assert_eq!(0, sender.receivable());
assert_eq!(0, sender.sent());
assert_eq!(0, sender.received());
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 0, 6, 1, NIL, NIL);
// Send A: stored in node 0, node 6 moves from the pool to the queue tail.
assert_eq!(Ok(()), sender.send(Items::A));
assert_eq!(1, sender.pending());
assert_eq!(1, sender.receivable());
assert_eq!(1, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 6, 5, 1, NIL, NIL);
// Send B.
assert_eq!(Ok(()), sender.send(Items::B));
assert_eq!(2, sender.pending());
assert_eq!(2, sender.receivable());
assert_eq!(2, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 5, 4, 1, NIL, NIL);
// Send C.
assert_eq!(Ok(()), sender.send(Items::C));
assert_eq!(3, sender.pending());
assert_eq!(3, sender.receivable());
assert_eq!(3, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next_nil!(pointers, 4);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 4, 3, 1, NIL, NIL);
// Send D.
assert_eq!(Ok(()), sender.send(Items::D));
assert_eq!(4, sender.pending());
assert_eq!(4, sender.receivable());
assert_eq!(4, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 3, 2, 1, NIL, NIL);
// Send E: the channel is now at capacity (pool head equals pool tail).
assert_eq!(Ok(()), sender.send(Items::E));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// Sending to a full channel hands the message back and changes nothing (checked twice).
assert_eq!(Err(SeccErrors::Full(Items::F)), sender.send(Items::F));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_eq!(Err(SeccErrors::Full(Items::F)), sender.send(Items::F));
assert_eq!(5, sender.pending());
assert_eq!(5, sender.receivable());
assert_eq!(5, sender.sent());
assert_eq!(0, sender.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// Peek returns the head message and changes nothing.
assert_eq!(Ok(Items::A), receiver.peek());
assert_eq!(5, receiver.pending());
assert_eq!(5, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(0, receiver.received());
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next_nil!(pointers, 1);
assert_pointer_nodes!(sender, receiver, 0, 2, 1, 1, NIL, NIL);
// Receive A: node 0 is freed and appended to the pool behind node 1.
assert_eq!(Ok(Items::A), receiver.receive());
assert_eq!(4, receiver.pending());
assert_eq!(4, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(1, receiver.received());
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_pointer_nodes!(sender, receiver, 6, 2, 1, 0, NIL, NIL);
// Receive B.
assert_eq!(Ok(Items::B), receiver.receive());
assert_eq!(3, receiver.pending());
assert_eq!(3, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(2, receiver.received());
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 5, 2, 1, 6, NIL, NIL);
// Receive C.
assert_eq!(Ok(Items::C), receiver.receive());
assert_eq!(2, receiver.pending());
assert_eq!(2, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(3, receiver.received());
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_pointer_nodes!(sender, receiver, 4, 2, 1, 5, NIL, NIL);
// Receive D.
assert_eq!(Ok(Items::D), receiver.receive());
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(4, receiver.received());
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next_nil!(pointers, 4);
assert_pointer_nodes!(sender, receiver, 3, 2, 1, 4, NIL, NIL);
// Receive E: the queue is empty again (head equals tail).
assert_eq!(Ok(Items::E), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
// Receiving from an empty channel reports Empty and changes nothing (checked twice).
assert_eq!(Err(SeccErrors::Empty), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
assert_eq!(Err(SeccErrors::Empty), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(5, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next_nil!(pointers, 2);
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 2, 1, 3, NIL, NIL);
// Send F into the drained channel, then receive it.
assert_eq!(Ok(()), sender.send(Items::F));
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(5, receiver.received());
assert_node_next!(pointers, 2, 1);
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next_nil!(pointers, 3);
assert_pointer_nodes!(sender, receiver, 2, 1, 0, 3, NIL, NIL);
assert_eq!(Ok(Items::F), receiver.receive());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 1, 0, 2, NIL, NIL);
// Skipping on an empty channel reports Empty and changes nothing.
assert_eq!(Err(SeccErrors::Empty), receiver.skip());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(6, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next_nil!(pointers, 1);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 1, 0, 2, NIL, NIL);
// Send A, then skip it: links stay the same, only skipped/cursor and receivable change.
assert_eq!(Ok(()), sender.send(Items::A));
assert_eq!(1, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, NIL, NIL);
assert_eq!(Ok(()), receiver.skip());
assert_eq!(1, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, 1, 0);
// Nothing is left behind the cursor, so a second skip reports Empty.
assert_eq!(Err(SeccErrors::Empty), receiver.skip());
assert_eq!(1, receiver.pending());
assert_eq!(0, receiver.receivable());
assert_eq!(7, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next_nil!(pointers, 0);
assert_node_next!(pointers, 6, 5);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 0, 6, 2, 1, 0);
// Send B while the skip is active: it lands at the cursor position.
assert_eq!(Ok(()), sender.send(Items::B));
assert_eq!(2, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(8, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 6, 5, 2, 1, 0);
// Peek reads at the cursor (B), not at the queue head (A).
assert_eq!(Ok(Items::B), receiver.peek());
assert_eq!(2, receiver.pending());
assert_eq!(1, receiver.receivable());
assert_eq!(8, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next_nil!(pointers, 6);
assert_node_next!(pointers, 5, 4);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 6, 5, 2, 1, 0);
// Send C; the cursor still points at B.
assert_eq!(Ok(()), sender.send(Items::C));
assert_eq!(Ok(Items::B), receiver.peek());
assert_eq!(3, receiver.pending());
assert_eq!(2, receiver.receivable());
assert_eq!(9, receiver.sent());
assert_eq!(6, receiver.received());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 2, 1, 0);
// Skip B as well: the cursor moves on to C.
assert_eq!(Ok(()), receiver.skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 6);
assert_node_next!(pointers, 6, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next_nil!(pointers, 2);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 2, 0, 6);
// Receive C from the middle of the queue: node 6 is unlinked (0 -> 5) and joins the pool.
assert_eq!(Ok(Items::C), receiver.receive());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, 0, 5);
// Reset the skip: links are untouched, skipped/cursor return to NIL.
assert_eq!(Ok(()), receiver.reset_skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, NIL, NIL);
// Skip A again, then receive B and reset the skip in one call.
assert_eq!(Ok(()), receiver.skip());
assert_node_next!(pointers, 1, 0);
assert_node_next!(pointers, 0, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next_nil!(pointers, 6);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 6, 1, 0);
assert_eq!(Ok(Items::B), receiver.receive_and_reset_skip());
assert_node_next!(pointers, 1, 5);
assert_node_next_nil!(pointers, 5);
assert_node_next!(pointers, 4, 3);
assert_node_next!(pointers, 3, 2);
assert_node_next!(pointers, 2, 6);
assert_node_next!(pointers, 6, 0);
assert_node_next_nil!(pointers, 0);
assert_pointer_nodes!(sender, receiver, 1, 5, 4, 0, NIL, NIL);
}
/// One producer thread and one consumer thread exchange 200 messages through a channel of
/// capacity 32 using the timed await operations.
#[test]
fn test_single_producer_single_receiver() {
    let message_count = 200;
    let capacity = 32;
    let (sender, receiver) = create::<u32>(capacity, Duration::from_millis(20));

    let rx = thread::spawn(move || {
        let mut received = 0;
        while received < message_count {
            // Timeouts and other errors are ignored; only successful receives are counted.
            if receiver
                .receive_await_timeout(Duration::from_millis(20))
                .is_ok()
            {
                received += 1;
            }
        }
    });

    let tx = thread::spawn(move || {
        for value in 0..message_count {
            sender
                .send_await_timeout(value, Duration::from_millis(20))
                .unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });

    tx.join().unwrap();
    rx.join().unwrap();
}
/// Starts a receiving thread, waits until it has signalled that it is running, then starts
/// the sending thread; afterwards exactly one message must have been sent and received.
#[test]
fn test_receive_before_send() {
    let (sender, receiver) = create::<u32>(5, Duration::from_millis(20));
    let receiver2 = receiver.clone();
    let mutex = Arc::new(Mutex::new(false));
    let rx_mutex = mutex.clone();

    let rx = thread::spawn(move || {
        {
            // Announce that the receiving thread is running before it starts waiting.
            let mut running = rx_mutex.lock().unwrap();
            *running = true;
        }
        let outcome = receiver2.receive_await_timeout(Duration::from_millis(20));
        assert!(outcome.is_ok(), "Error {:?} when receive.", outcome);
    });

    // Spin until the receiving thread has set its flag; the guard is released every round.
    while !*mutex.lock().unwrap() {}

    let tx = thread::spawn(move || {
        let outcome = sender.send_await_timeout(1, Duration::from_millis(20));
        assert!(outcome.is_ok(), "Error {:?} when receive.", outcome);
    });

    tx.join().unwrap();
    rx.join().unwrap();
    assert_eq!(1, receiver.sent());
    assert_eq!(1, receiver.received());
    assert_eq!(0, receiver.pending());
    assert_eq!(0, receiver.receivable());
}
/// Releases a receiving and a sending thread at the same moment through a shared condvar and
/// checks that exactly one message was sent and received.
#[test]
fn test_receive_concurrent_send() {
let (sender, receiver) = create::<u32>(5, Duration::from_millis(20));
let receiver2 = receiver.clone();
// The tuple holds (receiver thread is waiting, sender thread is waiting).
let pair = Arc::new((Mutex::new((false, false)), Condvar::new()));
let rx_pair = pair.clone();
let tx_pair = pair.clone();
let rx = thread::spawn(move || {
let mut guard = rx_pair.0.lock().unwrap();
guard.0 = true;
// The flag is set under the lock and `wait` releases the lock atomically, so once the
// main thread sees the flag this thread is already parked on the condvar.
let c_guard = rx_pair.1.wait(guard).unwrap();
drop(c_guard);
match receiver2.receive_await_timeout(Duration::from_millis(20)) {
Ok(_) => assert!(true),
e => assert!(false, "Error {:?} when receive.", e),
};
});
let tx = thread::spawn(move || {
let mut guard = tx_pair.0.lock().unwrap();
guard.1 = true;
let c_guard = tx_pair.1.wait(guard).unwrap();
drop(c_guard);
match sender.send_await_timeout(1 as u32, Duration::from_millis(20)) {
Ok(_) => assert!(true),
e => assert!(false, "Error {:?} when receive.", e),
};
});
// Spin until both threads have announced that they are waiting.
loop {
let guard = pair.0.lock().unwrap();
if guard.0 && guard.1 {
break;
}
}
// Release both threads at once.
let guard = pair.0.lock().unwrap();
pair.1.notify_all();
drop(guard);
tx.join().unwrap();
rx.join().unwrap();
assert_eq!(1, receiver.sent());
assert_eq!(1, receiver.received());
assert_eq!(0, receiver.pending());
assert_eq!(0, receiver.receivable());
}
/// Spawns a thread that waits for the shared start flag and then keeps sending clones of
/// `message` until the channel-wide `sent` counter reaches `count`. Send errors are ignored.
fn counted_sender<T: Sync + Send + Clone + 'static>(
    sender: SeccSender<T>,
    pair: Arc<(Mutex<bool>, Condvar)>,
    message: T,
    count: usize,
) -> JoinHandle<()> {
    thread::spawn(move || {
        {
            // Block until the coordinating thread flips the start flag; the guard is
            // released at the end of this scope.
            let (mutex, condvar) = &*pair;
            let mut started = mutex.lock().unwrap();
            while !*started {
                started = condvar.wait(started).unwrap();
            }
        }
        while sender.sent() < count {
            let _ = sender.send_await_timeout(message.clone(), Duration::from_millis(10));
        }
    })
}
/// Spawns a thread that waits for the shared start flag and then keeps receiving until the
/// channel-wide `received` counter reaches `count`. Receive errors are ignored.
fn counted_receiver<T: Sync + Send + Clone + 'static>(
    receiver: SeccReceiver<T>,
    pair: Arc<(Mutex<bool>, Condvar)>,
    count: usize,
) -> JoinHandle<()> {
    thread::spawn(move || {
        {
            // Block until the coordinating thread flips the start flag; the guard is
            // released at the end of this scope.
            let (mutex, condvar) = &*pair;
            let mut started = mutex.lock().unwrap();
            while !*started {
                started = condvar.wait(started).unwrap();
            }
        }
        while receiver.received() < count {
            let _ = receiver.receive_await_timeout(Duration::from_millis(10));
        }
    })
}
/// Runs `receiver_count` receiving threads and `sender_count` sending threads against one
/// channel of capacity 10 until `message_count` messages have gone through.
///
/// All workers are released together through a shared start flag. The calling thread then
/// polls the counters and panics if `time_limit` elapses; note that this polling loop ends
/// as soon as either the `sent` or the `received` counter reaches `message_count`. Finally
/// every worker thread is joined.
fn multiple_thread_helper<T: Sync + Send + Clone + 'static>(
    receiver_count: u8,
    sender_count: u8,
    message_count: usize,
    time_limit: Duration,
    message: T,
) {
    let (sender, receiver) = create::<T>(10, Duration::from_millis(1));
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let total_thread_count = usize::from(receiver_count) + usize::from(sender_count);
    let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(total_thread_count);
    handles.extend(
        (0..receiver_count)
            .map(|_| counted_receiver(receiver.clone(), pair.clone(), message_count)),
    );
    handles.extend((0..sender_count).map(|_| {
        counted_sender(
            sender.clone(),
            pair.clone(),
            message.clone(),
            message_count,
        )
    }));

    // Give the workers a moment to reach their wait on the start flag.
    thread::sleep(Duration::from_millis(10));
    {
        let (mutex, condvar) = &*pair;
        let mut started = mutex.lock().unwrap();
        *started = true;
        condvar.notify_all();
    }

    let start = Instant::now();
    while sender.sent() < message_count && receiver.received() < message_count {
        if start.elapsed() > time_limit {
            panic!("Test took more than {:?} ms to run!", time_limit);
        }
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
/// Two receivers and one sender move 10,000 messages.
#[test]
fn test_multiple_receiver_single_sender() {
    let time_limit = Duration::from_millis(1000);
    multiple_thread_helper(2, 1, 10_000, time_limit, 7_u32);
}
/// One receiver and three senders move 10,000 messages.
#[test]
fn test_multiple_sender_single_receiver() {
    let time_limit = Duration::from_millis(1000);
    multiple_thread_helper(1, 3, 10_000, time_limit, 7_u32);
}
/// Three receivers and three senders move 10,000 messages.
#[test]
fn test_multiple_receiver_multiple_sender() {
    let time_limit = Duration::from_millis(1000);
    multiple_thread_helper(3, 3, 10_000, time_limit, 7_u32);
}
}