pub mod movable;
pub mod zero_copy;
#[cfg(any(test,doc))]
mod tests {
use crate::{
uni::channels::movable,
prelude::advanced::{
ChannelCommon,
ChannelUni,
ChannelProducer,
ChannelUniZeroCopyAtomic,
ChannelUniZeroCopyFullSync,
},
};
use std::{fmt::Debug, future::Future, sync::{Arc, atomic::{AtomicU32, Ordering::Relaxed}}, time::Duration, io::Write};
use futures::{stream, StreamExt};
use minstant::Instant;
/// Test-suite constructor (runs once per test binary, via `ctor`):
/// installs the global `simple_logger` with UTC timestamps so log output
/// from the channels under test is visible. If a logger was already set
/// (e.g. by another suite), the failure is reported and tolerated.
#[ctor::ctor]
fn suite_setup() {
    let init_result = simple_logger::SimpleLogger::new()
        .with_utc_timestamps()
        .init();
    if init_result.is_err() {
        eprintln!("--> LOGGER WAS ALREADY STARTED");
    }
}
/// Generates a minimal, "doc test"-style smoke test for a `Uni` channel type:
/// builds the channel, opens a consumer stream, sends a single `&str` event
/// and asserts the stream yields it within 1 second.
macro_rules! doc_test {
    ($fn_name: tt, $uni_channel_type: ty) => {
        // compiled as a regular tokio test everywhere except in rustdoc builds
        #[cfg_attr(not(doc),tokio::test)]
        async fn $fn_name() {
            let channel = <$uni_channel_type>::new("doc_test");
            let (mut stream, _stream_id) = channel.create_stream();
            let send_result = channel.send("a").is_ok();
            assert!(send_result, "Event couldn't be sent!");
            exec_future(stream.next(), "receiving", 1.0, true).await;
        }
    }
}
// one smoke test per channel flavor: movable (atomic / crossbeam / full-sync) and zero-copy (atomic / full-sync)
doc_test!(movable_atomic_queue_doc_test, movable::atomic::Atomic<&str, 1024, 1>);
doc_test!(movable_crossbeam_channel_doc_test, movable::crossbeam::Crossbeam<&str, 1024, 1>);
doc_test!(movable_full_sync_queue_doc_test, movable::full_sync::FullSync<&str, 1024, 1>);
doc_test!(zero_copy_atomic_queue_doc_test, ChannelUniZeroCopyAtomic<&str, 1024, 2>);
doc_test!(zero_copy_full_sync_queue_doc_test, ChannelUniZeroCopyFullSync<&str, 1024, 2>);
macro_rules! sending_droppable_types {
($fn_name: tt, $uni_channel_type: ty) => {
#[cfg_attr(not(doc),tokio::test)]
async fn $fn_name() {
let payload = String::from("Hey, it worked!");
let channel = <$uni_channel_type>::new("test sending droppable types");
let (mut stream, _stream_id) = channel.create_stream();
let send_result = channel.send(payload.clone()).is_ok();
assert!(send_result, "Event couldn't be `send()`");
let observed = exec_future(stream.next(), "receiving", 1.0, true).await;
assert_eq!(observed.unwrap(), payload, "Wrong payloads using `send_with()`");
let send_result = channel.send_with(|slot| unsafe { std::ptr::write(slot, payload.clone()); }).is_ok();
assert!(send_result, "Event couldn't be `send_with()`");
let observed = exec_future(stream.next(), "receiving", 1.0, true).await;
assert_eq!(observed.unwrap(), payload, "Wrong payloads using `send_with()`");
}
}
}
sending_droppable_types!(movable_atomic_queue_sending_droppable_types, movable::atomic::Atomic<String, 1024, 1>);
sending_droppable_types!(movable_crossbeam_channel_sending_droppable_types, movable::crossbeam::Crossbeam<String, 1024, 1>);
sending_droppable_types!(movable_full_sync_queue_sending_droppable_types, movable::full_sync::FullSync<String, 1024, 1>);
sending_droppable_types!(zero_copy_atomic_queue_sending_droppable_types, ChannelUniZeroCopyAtomic<String, 1024, 2>);
sending_droppable_types!(zero_copy_full_sync_queue_sending_droppable_types, ChannelUniZeroCopyFullSync<String, 1024, 2>);
macro_rules! dropping {
($fn_name: tt, $uni_channel_type: ty) => {
#[cfg_attr(not(doc),tokio::test)]
async fn $fn_name() {
{
print!("Dropping the channel before the stream consumes the element: ");
let channel = <$uni_channel_type>::new("dropping");
assert_eq!(Arc::strong_count(&channel), 1, "Sanity check on reference counting");
let (mut stream_1, _stream_id) = channel.create_stream();
let (stream_2, _stream_id) = channel.create_stream();
assert_eq!(Arc::strong_count(&channel), 3, "Creating each stream should increase the ref count by 1");
channel.send_with(|slot| *slot = "a").expect_ok("couldn't send");
exec_future(stream_1.next(), "receiving", 1.0, true).await;
drop(stream_1);
drop(stream_2);
assert_eq!(Arc::strong_count(&channel), 1, "The internal streams manager reference counting should be 1 at this point, as we are the only holders by now");
drop(channel);
}
{
print!("Dropping the stream before the channel produces something, then another stream is created to consume the element: ");
let channel = <$uni_channel_type>::new("dropping");
let (stream, _stream_id) = channel.create_stream();
assert_eq!(Arc::strong_count(&channel), 2, "`channel` + `stream` + `local ref`: reference count should be 2");
drop(stream);
assert_eq!(Arc::strong_count(&channel), 1, "Dropping a stream should decrease the ref count by 1");
let (mut stream, _stream_id) = channel.create_stream();
assert_eq!(Arc::strong_count(&channel), 2, "1 `channel` + 1 `stream` again, at this point: reference count should be 2");
channel.send_with(|slot| *slot = "a").expect_ok("couldn't send");
exec_future(stream.next(), "receiving", 1.0, true).await;
drop(stream);
assert_eq!(Arc::strong_count(&channel), 1, "The internal streams manager reference counting should be 1 at this point, as we are the only holders by now");
drop(channel);
}
}
}
}
dropping!(movable_atomic_queue_dropping, movable::atomic::Atomic<&str, 1024, 2>);
dropping!(movable_crossbeam_queue_dropping, movable::crossbeam::Crossbeam<&str, 1024, 2>);
dropping!(movable_full_sync_queue_dropping, movable::full_sync::FullSync<&str, 1024, 2>);
dropping!(zero_copy_atomic_queue_dropping, ChannelUniZeroCopyAtomic<&str, 1024, 2>);
dropping!(zero_copy_full_sync_queue_dropping, ChannelUniZeroCopyFullSync<&str, 1024, 2>);
/// How many concurrent consumer streams the `parallel_streams` tests exercise.
const PARALLEL_STREAMS: usize = 128;
/// Generates a test asserting that a channel with `PARALLEL_STREAMS` streams
/// distributes produced items among them: item `i` must pop out of stream `i`,
/// and afterwards every stream must be empty (polling it times out).
macro_rules! parallel_streams {
    ($fn_name: tt, $uni_channel_type: ty) => {
        #[cfg_attr(not(doc),tokio::test)]
        async fn $fn_name() {
            let channel = Arc::new(<$uni_channel_type>::new("parallel streams"));
            // create all consumer streams up-front
            let mut streams = stream::iter(0..PARALLEL_STREAMS)
                .then(|_i| {
                    let channel = channel.clone();
                    async move {
                        let (stream, _stream_id) = channel.create_stream();
                        stream
                    }
                })
                .collect::<Vec<_>>().await;
            // produce one item per stream, retrying (spinning) if the buffer is momentarily full
            for i in 0..PARALLEL_STREAMS as u32 {
                channel.send_with(|slot| *slot = i)
                    .retry_with(|setter| channel.send_with(setter))
                    .spinning_forever();
            }
            // each stream must yield exactly its own item...
            for i in 0..PARALLEL_STREAMS as u32 {
                let item = exec_future(streams[i as usize].next(), &format!("receiving on stream #{i}"), 1.0, true).await;
                let observed = item.expect("item should not be none");
                assert_eq!(observed, i, "Stream #{i} didn't produce item {i}")
            }
            // ...and nothing more: polling any stream again must time out
            // (note: `i` is already `usize` here -- the redundant `as usize` cast was dropped)
            for i in 0..PARALLEL_STREAMS {
                match tokio::time::timeout(Duration::from_millis(10), streams[i].next()).await {
                    Ok(item) => panic!("All items should have timed out now, because there are no more elements on the stream -- but stream #{i} produced item {:?}", item),
                    Err(_) => {},
                }
            }
        }
    }
}
parallel_streams!(movable_atomic_parallel_streams, movable::atomic::Atomic<u32, 1024, PARALLEL_STREAMS>);
parallel_streams!(movable_crossbeam_parallel_streams, movable::crossbeam::Crossbeam<u32, 1024, PARALLEL_STREAMS>);
parallel_streams!(movable_full_sync_parallel_streams, movable::full_sync::FullSync<u32, 1024, PARALLEL_STREAMS>);
parallel_streams!(zero_copy_atomic_parallel_streams, ChannelUniZeroCopyAtomic<u32, 1024, PARALLEL_STREAMS>);
parallel_streams!(zero_copy_full_sync_parallel_streams, ChannelUniZeroCopyFullSync<u32, 1024, PARALLEL_STREAMS>);
/// Generates a test for `send_with_async()`: `buffer_size()` producer tasks are
/// spawned, all parked on a shared `RwLock` used as a latch inside their async
/// setter closures. The test first asserts that nothing leaked through while the
/// latch was closed, then releases it; the receiver's accumulated sum must then
/// equal the Gauss sum of 1..=series_size, proving every async send completed.
macro_rules! async_sending {
    ($fn_name: tt, $uni_channel_type: ty) => {
        #[cfg_attr(not(doc),tokio::test)]
        async fn $fn_name() {
            let channel = Arc::new(<$uni_channel_type>::new("async sending"));
            let series_size = channel.buffer_size();
            // sum of 1..=series_size -- exact division, as the buffer sizes used below are even
            let expected_sum = (1 + series_size) * (series_size / 2);
            let observed_sum = Arc::new(AtomicU32::new(0));
            let (mut stream, _stream_id) = channel.create_stream();
            let cloned_observed_sum = observed_sum.clone();
            // consumer: accumulates every received number until the streams are ended
            let receiver_task = tokio::spawn(async move {
                while let Some(received_number) = exec_future(stream.next(), "receiving an item produced by `send_with_async()`", 1.0, false).await {
                    use std::borrow::Borrow; cloned_observed_sum.fetch_add(*received_number.borrow(), Relaxed);
                }
            });
            // the latch: while this test holds the write lock, every producer blocks on `.read()`
            let latch = Arc::new(tokio::sync::RwLock::new(()));
            let cloned_latch = latch.clone();
            let lock = cloned_latch.write().await;
            for i in 0..series_size {
                let latch = Arc::clone(&latch);
                let channel = Arc::clone(&channel);
                tokio::spawn(async move {
                    channel.send_with_async(move |slot| {
                        async move {
                            // parks here until the test drops the write lock below
                            _ = latch.read().await;
                            *slot = i+1;
                            slot
                        }
                    }).await.expect_ok("couldn't send");
                });
            }
            // once the latch opens, give producers ~900ms to finish, then close the
            // streams so `receiver_task`'s `while let` loop can end
            tokio::spawn(async move {
                _ = latch.read().await;
                let timeout = Duration::from_millis(900);
                tokio::time::sleep(timeout).await;
                channel.gracefully_end_all_streams(timeout).await;
            });
            assert_eq!(observed_sum.load(Relaxed), 0, "Sanity check failed: no items should have been received until the latch is released");
            drop(lock);
            receiver_task.await.expect("Error waiting for the receiver task to finish");
            assert_eq!(observed_sum.load(Relaxed), expected_sum, "Async sending is not working");
        }
    }
}
async_sending!(movable_atomic_async_sending, movable::atomic::Atomic<u32, 1024, 1>);
async_sending!(movable_crossbeam_async_sending, movable::crossbeam::Crossbeam<u32, 1024, 1>);
async_sending!(movable_full_sync_async_sending, movable::full_sync::FullSync<u32, 1024, 1>);
async_sending!(zero_copy_atomic_async_sending, ChannelUniZeroCopyAtomic<u32, 1024, 1>);
async_sending!(zero_copy_full_sync_async_sending, ChannelUniZeroCopyFullSync<u32, 1024, 1>);
/// Generates a test for the channel's "allocation semantics" API: all slots are
/// reserved in bulk with `.reserve_slot()`, then filled and published with
/// `.try_send_reserved()`. Two full-buffer passes verify that published slots
/// are recycled and become reservable again.
macro_rules! allocation_semantics_alloc_and_send {
    ($fn_name: tt, $uni_channel_type: ty) => {
        #[cfg_attr(not(doc),tokio::test)]
        async fn $fn_name() {
            let channel = Arc::new(<$uni_channel_type>::new("allocation_semantics_alloc_and_send"));
            let series_size = channel.buffer_size();
            // Gauss sum of 1..=series_size -- exact, as the buffer sizes used below are even
            let expected_sum = (1 + series_size) * (series_size / 2);
            let observed_sum = Arc::new(AtomicU32::new(0));
            let (mut stream, _stream_id) = channel.create_stream();
            let cloned_observed_sum = observed_sum.clone();
            // consumer: accumulates every received number
            tokio::spawn(async move {
                // the operation name had no interpolations, so the `format!()` wrapper was dropped (clippy::useless_format)
                while let Some(received_number) = exec_future(stream.next(), "receiving an item produced by `.reserve_slot()` / `.send_reserved()`", 1.0, false).await {
                    use std::borrow::Borrow; cloned_observed_sum.fetch_add(*received_number.borrow(), Relaxed);
                }
            });
            // reserves the whole buffer, then fills & publishes each slot with 1..=buffer_size
            let perform_pass = |pass_n| {
                let reserved_slots: Vec<&mut u32> = (0..channel.buffer_size())
                    .map(|i| channel.reserve_slot().unwrap_or_else(|| panic!("We ran out of slots at #{i}, pass {pass_n}")))
                    .collect();
                for (i, slot) in reserved_slots.into_iter().enumerate() {
                    *slot = (i+1) as u32;
                    assert!(channel.try_send_reserved(slot), "couldn't send a reserved slot on the first try -- we should, as there is no concurrency in place");
                }
            };
            let timeout = Duration::from_millis(10);
            perform_pass(1);
            tokio::time::sleep(timeout).await;
            assert_eq!(observed_sum.load(Relaxed), expected_sum, "1st pass didn't produce the expected sum of items -- meaning not all items were produced/received");
            perform_pass(2);
            tokio::time::sleep(timeout).await;
            assert_eq!(observed_sum.load(Relaxed), 2*expected_sum, "2nd pass (+ 1st pass) didn't produce the expected sum of items -- meaning, on the 2nd pass, not all items were produced/received");
        }
    }
}
allocation_semantics_alloc_and_send!(movable_atomic_allocation_semantics_alloc_and_send, movable::atomic::Atomic<u32, 1024, 1>);
allocation_semantics_alloc_and_send!(zero_copy_atomic_allocation_semantics_alloc_and_send, ChannelUniZeroCopyAtomic<u32, 1024, 1>);
allocation_semantics_alloc_and_send!(zero_copy_full_sync_allocation_semantics_alloc_and_send, ChannelUniZeroCopyFullSync<u32, 1024, 1>);
/// Generates a test for cancelling slot reservations: on every publish, extra
/// slots are reserved and then given back via `.try_cancel_slot_reserve()`
/// (in reverse order), while only the first reserved slot is actually sent.
/// The receiver's accumulated sum proves no items were lost nor duplicated.
macro_rules! allocation_semantics_alloc_and_give_up {
    ($fn_name: tt, $uni_channel_type: ty) => {
        #[cfg_attr(not(doc),tokio::test)]
        async fn $fn_name() {
            let channel = Arc::new(<$uni_channel_type>::new("allocation_semantics_alloc_and_give_up"));
            // intentionally a bit larger than the buffer, to exercise slot recycling
            let series_size = channel.buffer_size() + 2;
            // Gauss sum of 1..=series_size -- exact, as `series_size` is even for the sizes used below
            let expected_sum = (1 + series_size) * (series_size / 2);
            let observed_sum = Arc::new(AtomicU32::new(0));
            let (mut stream, _stream_id) = channel.create_stream();
            let cloned_observed_sum = observed_sum.clone();
            // consumer: accumulates every received number
            tokio::spawn(async move {
                // the operation name had no interpolations, so the `format!()` wrapper was dropped (clippy::useless_format)
                while let Some(received_number) = exec_future(stream.next(), "receiving an item produced by `.reserve_slot()` / `.send_reserved()`", 1.0, false).await {
                    use std::borrow::Borrow; cloned_observed_sum.fetch_add(*received_number.borrow(), Relaxed);
                }
            });
            // reserves `n` slots, sends `val` through the first and cancels the remaining
            // reservations in reverse order (the order in which cancellation is guaranteed to succeed)
            let publish = |pass, n, val| {
                debug_assert!(n >= 2, "`n` should be, at least, 2");
                let mut reserved_slots: Vec<&mut u32> = (0..n)
                    .map(|i| channel.reserve_slot().unwrap_or_else(|| panic!("We ran out of slots at #{val} (i={i}), pass {pass}, pending_items={}", channel.pending_items_count())))
                    .collect();
                let slot = reserved_slots.remove(0);
                *slot = val;
                assert!(channel.try_send_reserved(slot), "couldn't send a reserved slot on the first try -- we should, as there is no concurrency in place");
                for slot in reserved_slots.into_iter().rev() {
                    assert!(channel.try_cancel_slot_reserve(slot), "couldn't cancel a slot reservation on the first try -- we should, as there is no concurrency in place & we are cancelling reservations in the reversed order");
                }
            };
            let timeout = Duration::from_millis(10);
            for i in 0..series_size {
                publish(1, 2, i+1);
                // near buffer exhaustion, let the consumer drain so new reservations can succeed
                if i >= channel.buffer_size()-2 {
                    channel.flush(Duration::from_millis(1)).await;
                }
            }
            // typo fixed in both messages below: "withing" -> "within"
            assert_eq!(channel.flush(timeout).await, 0, "Couldn't flush pending items within {timeout:?} @ pass #1");
            assert_eq!(observed_sum.load(Relaxed), expected_sum, "1st pass didn't produce the expected sum of items -- meaning not all items were produced/received");
            for i in 0..series_size {
                publish(2, channel.buffer_size(), i+1);
                channel.flush(Duration::from_millis(1)).await;
            }
            assert_eq!(channel.flush(timeout).await, 0, "Couldn't flush pending items within {timeout:?} @ pass #2");
            assert_eq!(observed_sum.load(Relaxed), 2*expected_sum, "2nd pass (+ 1st pass) didn't produce the expected sum of items -- meaning, on the 2nd pass, not all items were produced/received");
        }
    }
}
allocation_semantics_alloc_and_give_up!(movable_atomic_allocation_semantics_alloc_and_give_up, movable::atomic::Atomic<u32, 1024, 1>);
allocation_semantics_alloc_and_give_up!(zero_copy_atomic_allocation_semantics_alloc_and_give_up, ChannelUniZeroCopyAtomic<u32, 1024, 1>);
allocation_semantics_alloc_and_give_up!(zero_copy_full_sync_allocation_semantics_alloc_and_give_up, ChannelUniZeroCopyFullSync<u32, 1024, 1>);
// Measures sustained throughput of every channel flavor under three scenarios:
// producer & consumer on the same task; on different tasks of the same thread;
// and on different tasks & threads. `#[ignore]`d by default -- run explicitly
// (ideally in release mode, where `FACTOR` is much larger) to collect numbers.
#[cfg_attr(not(doc),tokio::test(flavor="multi_thread", worker_threads=2))]
#[ignore] async fn performance_measurements() {
    const BUFFER_SIZE: usize = 1<<14;
    // release builds push FACTOR*BUFFER_SIZE items; debug builds use a much smaller factor
    #[cfg(not(debug_assertions))]
    const FACTOR: u32 = 1024;
    #[cfg(debug_assertions)]
    const FACTOR: u32 = 20;
    // scenario 1: ping-pong -- each item is sent and immediately consumed by the same task
    macro_rules! profile_same_task_same_thread_channel {
        ($channel: expr, $profiling_name: literal, $count: expr) => {
            let channel = $channel;
            let profiling_name = $profiling_name;
            let count = $count;
            let (mut stream, _stream_id) = channel.create_stream();
            print!("{} (same task / same thread): ", profiling_name);
            // flush so the progress line shows before the (long) measurement
            std::io::stdout().flush().unwrap();
            let start = Instant::now();
            for e in 0..count {
                channel.send_with(|slot| *slot = e).expect_ok("couldn't send");
                if let Some(consumed_e) = stream.next().await {
                    assert_eq!(consumed_e, e, "{profiling_name}: produced and consumed items differ");
                } else {
                    panic!("{profiling_name}: Stream didn't provide expected item {e}");
                }
            }
            let elapsed = start.elapsed();
            println!("{:12.2}/s -- {} items processed in {:?}",
                count as f64 / elapsed.as_secs_f64(),
                count,
                elapsed);
        }
    }
    // scenario 2: producer & consumer are separate futures `join!`ed on the same thread;
    // the producer yields (rather than spins) when the buffer is full
    macro_rules! profile_different_task_same_thread_channel {
        ($channel: expr, $profiling_name: literal, $count: expr) => {
            let channel = $channel;
            let profiling_name = $profiling_name;
            let count = $count;
            let (mut stream, _stream_id) = channel.create_stream();
            let sender_future = async {
                for e in 0..count {
                    channel.send_with(|slot| *slot = e)
                        .retry_with_async(|setter| core::future::ready(channel.send_with(setter)))
                        .yielding_forever()
                        .await; }
                // signals the receiver that no more items are coming
                channel.gracefully_end_all_streams(Duration::from_secs(1)).await;
            };
            let receiver_future = async {
                let mut counter = 0;
                while let Some(_e) = stream.next().await {
                    counter += 1;
                    if counter == count {
                        break;
                    }
                }
            };
            print!("{} (different task / same thread): ", profiling_name);
            std::io::stdout().flush().unwrap();
            let start = Instant::now();
            tokio::join!(sender_future, receiver_future);
            let elapsed = start.elapsed();
            println!("{:12.2}/s -- {} items processed in {:?}",
                count as f64 / elapsed.as_secs_f64(),
                count,
                elapsed);
        }
    }
    // scenario 3: the producer runs on a dedicated blocking thread (spinning when the
    // buffer is full) while the consumer runs as a spawned async task
    macro_rules! profile_different_task_different_thread_channel {
        ($channel: expr, $profiling_name: literal, $count: expr) => {
            let channel = $channel;
            let profiling_name = $profiling_name;
            let count = $count;
            let (mut stream, _stream_id) = channel.create_stream();
            let sender_task = tokio::task::spawn_blocking(move || {
                for e in 0..count {
                    channel.send_with(|slot| *slot = e)
                        .retry_with(|setter| channel.send_with(setter))
                        .spinning_forever();
                }
                channel.cancel_all_streams();
            });
            // generous 300s timeout: release-mode runs push tens of millions of items
            let receiver_task = tokio::spawn(
                exec_future(async move {
                    let mut counter = 0;
                    while let Some(_e) = stream.next().await {
                        counter += 1;
                        if counter == count {
                            break;
                        }
                    }
                },
                format!("Different Task & Thread consumer for '{}'", $profiling_name),
                300.0,
                false)
            );
            print!("{} (different task / different thread): ", profiling_name);
            std::io::stdout().flush().unwrap();
            let start = Instant::now();
            let (receiver_result, sender_result) = tokio::join!(receiver_task, sender_task);
            receiver_result.expect("receiver task");
            sender_result.expect("sender task");
            let elapsed = start.elapsed();
            println!("{:12.2}/s -- {} items processed in {:?}",
                count as f64 / elapsed.as_secs_f64(),
                count,
                elapsed);
        }
    }
    println!();
    // every channel flavor is measured under all three scenarios
    profile_same_task_same_thread_channel!(movable::atomic::Atomic::<u32, BUFFER_SIZE, 1>::new(""), "Movable Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_same_thread_channel!(movable::atomic::Atomic::<u32, BUFFER_SIZE, 1>::new(""), "Movable Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_different_thread_channel!(movable::atomic::Atomic::<u32, BUFFER_SIZE, 1>::new(""), "Movable Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_same_task_same_thread_channel!(movable::crossbeam::Crossbeam::<u32, BUFFER_SIZE, 1>::new(""), "Movable Crossbeam ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_same_thread_channel!(movable::crossbeam::Crossbeam::<u32, BUFFER_SIZE, 1>::new(""), "Movable Crossbeam ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_different_thread_channel!(movable::crossbeam::Crossbeam::<u32, BUFFER_SIZE, 1>::new(""), "Movable Crossbeam ", FACTOR*BUFFER_SIZE as u32);
    profile_same_task_same_thread_channel!(movable::full_sync::FullSync::<u32, BUFFER_SIZE, 1>::new(""), "Movable FullSync ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_same_thread_channel!(movable::full_sync::FullSync::<u32, BUFFER_SIZE, 1>::new(""), "Movable FullSync ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_different_thread_channel!(movable::full_sync::FullSync::<u32, BUFFER_SIZE, 1>::new(""), "Movable FullSync ", FACTOR*BUFFER_SIZE as u32);
    profile_same_task_same_thread_channel!(ChannelUniZeroCopyAtomic::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_same_thread_channel!(ChannelUniZeroCopyAtomic::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_different_thread_channel!(ChannelUniZeroCopyAtomic::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy Atomic ", FACTOR*BUFFER_SIZE as u32);
    profile_same_task_same_thread_channel!(ChannelUniZeroCopyFullSync::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy FullSync ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_same_thread_channel!(ChannelUniZeroCopyFullSync::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy FullSync ", FACTOR*BUFFER_SIZE as u32);
    profile_different_task_different_thread_channel!(ChannelUniZeroCopyFullSync::<u32, BUFFER_SIZE, 1>::new(""), "Zero-Copy FullSync ", FACTOR*BUFFER_SIZE as u32);
}
/// Awaits `fut`, enforcing a timeout of `timeout_secs` seconds.
/// Returns the future's output on success; on timeout, logs a message and
/// panics with it. When `verbose` is set, the resolved value is printed
/// alongside `operation_name`.
async fn exec_future<Output: Debug,
                     FutureType: Future<Output=Output>,
                     IntoString: Into<String>>
                    (fut: FutureType, operation_name: IntoString, timeout_secs: f64, verbose: bool) -> Output {
    let operation_name = operation_name.into();
    let timeout = Duration::from_secs_f64(timeout_secs);
    let resolved = match tokio::time::timeout(timeout, fut).await {
        Err(_elapsed) => {
            let msg = format!("\"{operation_name}\" has TIMED OUT: more than {:?} had passed while waiting the Future to complete", timeout);
            println!("{}", msg);
            panic!("{}", msg);
        },
        Ok(output) => output,
    };
    if verbose {
        println!("{operation_name}: {:?}", resolved);
    }
    resolved
}
}