use fibre::error::RecvError;
use fibre::mpmc;
use std::{
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
sync::Arc,
thread,
time::Duration,
};
mod common_async;
fn main() {
let capacity = 4;
let num_producers = 2;
let num_consumers = 2;
let messages_per_producer = 3;
let total_messages = num_producers * messages_per_producer;
println!("--- MPMC: Sync Senders, Sync Receivers ---");
{
let (tx, rx) = mpmc::bounded::<String>(capacity);
let received_count = Arc::new(AtomicUsize::new(0));
let all_sent = Arc::new(AtomicBool::new(false));
let mut producer_handles = Vec::new();
for p_id in 0..num_producers {
let tx_clone = tx.clone();
producer_handles.push(thread::spawn(move || {
for m_id in 0..messages_per_producer {
let msg = format!("SyncMPMC-P{}-M{}", p_id, m_id);
println!("[Sync Sender {}] Sending: {}", p_id, msg);
if tx_clone.send(msg).is_err() {
break;
}
thread::sleep(Duration::from_millis(10));
}
}));
}
drop(tx);
let mut consumer_handles = Vec::new();
for c_id in 0..num_consumers {
let rx_clone = rx.clone();
let received_count_clone = Arc::clone(&received_count);
let all_sent_clone = Arc::clone(&all_sent);
consumer_handles.push(thread::spawn(move || {
loop {
match rx_clone.recv() {
Ok(value) => {
println!("[Sync Receiver {}] Received: {}", c_id, value);
received_count_clone.fetch_add(1, Ordering::Relaxed);
}
Err(RecvError::Disconnected) => {
if received_count_clone.load(Ordering::Relaxed) < total_messages
&& !all_sent_clone.load(Ordering::Relaxed)
{
println!("[Sync Receiver {}] Disconnected prematurely.", c_id);
} else {
println!("[Sync Receiver {}] Disconnected (expected).", c_id);
}
break;
}
}
}
}));
}
drop(rx);
for handle in producer_handles {
handle.join().unwrap();
}
all_sent.store(true, Ordering::Relaxed); for handle in consumer_handles {
handle.join().unwrap();
}
assert_eq!(received_count.load(Ordering::Relaxed), total_messages);
}
println!("\n--- MPMC: Async Senders, Async Receivers ---");
common_async::run_async(async {
let (tx, rx) = mpmc::bounded_async::<String>(capacity);
let received_count = Arc::new(AtomicUsize::new(0));
let mut producer_handles = Vec::new();
for p_id in 0..num_producers {
let tx_clone = tx.clone();
producer_handles.push(tokio::spawn(async move {
for m_id in 0..messages_per_producer {
let msg = format!("AsyncMPMC-P{}-M{}", p_id, m_id);
println!("[Async Sender {}] Sending: {}", p_id, msg);
if tx_clone.send(msg).await.is_err() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
}));
}
drop(tx);
let mut consumer_handles = Vec::new();
for c_id in 0..num_consumers {
let rx_clone = rx.clone();
let received_count_clone = Arc::clone(&received_count);
consumer_handles.push(tokio::spawn(async move {
while let Ok(value) = rx_clone.recv().await {
println!("[Async Receiver {}] Received: {}", c_id, value);
received_count_clone.fetch_add(1, Ordering::Relaxed);
}
println!("[Async Receiver {}] Disconnected.", c_id);
}));
}
drop(rx);
for handle in producer_handles {
handle.await.unwrap();
}
for handle in consumer_handles {
handle.await.unwrap();
}
assert_eq!(received_count.load(Ordering::Relaxed), total_messages);
});
println!("\n--- MPMC: Sync Senders (Threads) to Async Receivers ---");
common_async::run_async(async {
let (tx_async, rx_async) = mpmc::bounded_async::<String>(capacity);
let received_count = Arc::new(AtomicUsize::new(0));
let mut producer_handles = Vec::new();
for p_id in 0..num_producers {
let tx_sync_converted = tx_async.clone().to_sync(); producer_handles.push(thread::spawn(move || {
for m_id in 0..messages_per_producer {
let msg = format!("SyncToAsyncMPMC-P{}-M{}", p_id, m_id);
if tx_sync_converted.send(msg).is_err() {
break;
}
}
}));
}
drop(tx_async);
let mut consumer_handles = Vec::new();
for _c_id in 0..num_consumers {
let rx_clone = rx_async.clone();
let received_count_clone = Arc::clone(&received_count);
consumer_handles.push(tokio::spawn(async move {
while let Ok(_value) = rx_clone.recv().await {
received_count_clone.fetch_add(1, Ordering::Relaxed);
}
}));
}
drop(rx_async);
for handle in producer_handles {
handle.join().unwrap();
}
for handle in consumer_handles {
handle.await.unwrap();
}
assert_eq!(received_count.load(Ordering::Relaxed), total_messages);
});
println!("\n--- MPMC: Async Senders to Sync Receivers ---");
{
let (tx_async, rx_async) = mpmc::bounded_async::<String>(capacity);
let received_count = Arc::new(AtomicUsize::new(0));
let producers_finished_count = Arc::new(AtomicUsize::new(0));
let mut os_thread_handles: Vec<thread::JoinHandle<()>> = Vec::new();
for p_id in 0..num_producers {
let tx_clone = tx_async.clone();
let producers_finished_clone = producers_finished_count.clone();
os_thread_handles.push(thread::spawn(move || {
common_async::block_on_tokio_task(async move {
for m_id in 0..messages_per_producer {
let msg = format!("AsyncToSyncMPMC-P{}-M{}", p_id, m_id);
println!("[Async Sender {} in OS Thread] Sending: {}", p_id, msg);
if tx_clone.send(msg).await.is_err() {
println!(
"[Async Sender {} in OS Thread] Receiver side closed.",
p_id
);
break;
}
tokio::time::sleep(Duration::from_millis(5)).await; }
producers_finished_clone.fetch_add(1, Ordering::Relaxed);
println!("[Async Sender {} in OS Thread] Done sending.", p_id);
});
}));
}
let mut consumer_handles = Vec::new();
for c_id in 0..num_consumers {
let rx_sync_converted = rx_async.clone().to_sync(); let received_count_clone = Arc::clone(&received_count);
let producers_finished_count_clone = Arc::clone(&producers_finished_count); consumer_handles.push(thread::spawn(move || {
loop {
match rx_sync_converted.recv() {
Ok(value) => {
println!("[Sync Receiver {}] Received: {}", c_id, value);
received_count_clone.fetch_add(1, Ordering::Relaxed);
}
Err(RecvError::Disconnected) => {
if producers_finished_count_clone.load(Ordering::Relaxed) == num_producers {
println!("[Sync Receiver {}] Disconnected (all producers finished).", c_id);
} else {
println!("[Sync Receiver {}] Disconnected (producers might not be done, or channel issue).", c_id);
}
break;
}
}
}
}));
}
drop(rx_async);
for handle in os_thread_handles {
handle.join().unwrap();
}
drop(tx_async);
for handle in consumer_handles {
handle.join().unwrap();
}
assert_eq!(
received_count.load(Ordering::Relaxed),
total_messages,
"Mismatch in received items count"
);
assert_eq!(
producers_finished_count.load(Ordering::Relaxed),
num_producers,
"Not all producers signaled completion"
);
println!("[Main Thread] Async Senders to Sync Receivers example finished.");
}
}