use fibre::error::RecvError;
use fibre::spsc;
use std::thread;
mod common_async;
fn main() {
let capacity = 2;
println!("--- SPSC: Sync Sender, Sync Receiver ---");
{
let (tx, rx) = spsc::bounded_sync::<String>(capacity);
let sender_handle = thread::spawn(move || {
for i in 0..5 {
let msg = format!("SyncSPSC-{}", i);
println!("[Sync Send Thread] Sending: {}", msg);
if tx.send(msg).is_err() {
println!("[Sync Send Thread] Receiver dropped.");
break;
}
thread::yield_now();
}
println!("[Sync Send Thread] Done sending.");
});
let receiver_handle = thread::spawn(move || {
for i in 0..5 {
match rx.recv() {
Ok(value) => println!(
"[Sync Recv Thread] Received: {} (expected SyncSPSC-{})",
value, i
),
Err(RecvError::Disconnected) => {
println!("[Sync Recv Thread] Sender dropped.");
break;
}
}
}
println!("[Sync Recv Thread] Done receiving.");
});
sender_handle.join().unwrap();
receiver_handle.join().unwrap();
}
println!("\n--- SPSC: Async Sender, Async Receiver ---");
common_async::run_async(async {
let (tx, rx) = spsc::bounded_async::<String>(capacity);
tokio::spawn(async move {
for i in 0..5 {
let msg = format!("AsyncSPSC-{}", i);
println!("[Async Send Task] Sending: {}", msg);
if tx.send(msg).await.is_err() {
println!("[Async Send Task] Receiver dropped.");
break;
}
tokio::task::yield_now().await;
}
println!("[Async Send Task] Done sending.");
});
for i in 0..5 {
match rx.recv().await {
Ok(value) => println!(
"[Async Recv Task] Received: {} (expected AsyncSPSC-{})",
value, i
),
Err(RecvError::Disconnected) => {
println!("[Async Recv Task] Sender dropped.");
break;
}
}
}
println!("[Async Recv Task] Done receiving.");
});
println!("\n--- SPSC: Sync Sender (Thread) to Async Receiver ---");
common_async::run_async(async {
let (tx_s, rx_s) = spsc::bounded_sync::<String>(capacity);
let rx_a = rx_s.to_async();
let sender_thread = thread::spawn(move || {
for i in 0..3 {
let msg = format!("SyncToAsyncSPSC-{}", i);
println!("[Sync Sender Thread] Sending: {}", msg);
if tx_s.send(msg).is_err() {
break;
}
thread::yield_now();
}
println!("[Sync Sender Thread] Done sending.");
});
println!("[Async Receiver Task] Waiting for messages...");
for i in 0..3 {
match rx_a.recv().await {
Ok(val) => println!(
"[Async Receiver Task] Received: {} (expected SyncToAsyncSPSC-{})",
val, i
),
Err(e) => {
println!("[Async Receiver Task] Error: {:?}", e);
break;
}
}
}
println!("[Async Receiver Task] Done receiving.");
sender_thread.join().unwrap();
});
println!("\n--- SPSC: Async Sender to Sync Receiver ---");
{
let (tx_a, rx_a) = spsc::bounded_async::<String>(capacity);
let rx_s = rx_a.to_sync();
let sender_os_thread_handle = thread::spawn(move || {
common_async::block_on_tokio_task(async move {
for i in 0..3 {
let msg = format!("AsyncToSyncSPSC-{}", i);
println!("[Async Sender Task in OS Thread] Sending: {}", msg);
if tx_a.send(msg).await.is_err() {
break;
}
tokio::task::yield_now().await;
}
println!("[Async Sender Task in OS Thread] Done sending.");
});
});
println!("[Sync Receiver Thread] Waiting for messages...");
for i in 0..3 {
match rx_s.recv() {
Ok(val) => println!(
"[Sync Receiver Thread] Received: {} (expected AsyncToSyncSPSC-{})",
val, i
),
Err(e) => {
println!("[Sync Receiver Thread] Error: {:?}", e);
break;
}
}
}
println!("[Sync Receiver Thread] Done receiving.");
sender_os_thread_handle.join().unwrap();
}
println!("\nSPSC examples finished.");
}