fibre 0.5.5

High-performance, safe, memory-efficient sync/async channels built for real-time, low-overhead communication in concurrent Rust applications.
Documentation
use super::*;
use crate::mpsc::{bounded, bounded_async};
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[test]
fn sync_send_recv() {
  let (tx, rx) = bounded(2);
  tx.send(1).unwrap();
  tx.send(2).unwrap();
  assert!(tx.is_full());
  assert_eq!(rx.recv().unwrap(), 1);
  assert_eq!(rx.recv().unwrap(), 2);
  assert!(rx.is_empty());
}

#[test]
fn sync_try_send_full() {
  let (tx, rx) = bounded(1);
  tx.try_send(10).unwrap();
  assert!(tx.is_full());
  assert_eq!(tx.try_send(20), Err(TrySendError::Full(20)));
  drop(rx);
  assert_eq!(tx.try_send(30), Err(TrySendError::Closed(30)));
}

#[test]
fn sync_send_blocks() {
  let (tx, rx) = bounded(1);
  tx.send(1).unwrap();

  let send_handle = thread::spawn(move || {
    tx.send(2).unwrap(); // This should block
  });

  thread::sleep(Duration::from_millis(100));
  assert!(!send_handle.is_finished(), "Send should have blocked");

  assert_eq!(rx.recv().unwrap(), 1);
  send_handle.join().expect("Send thread panicked");
  assert_eq!(rx.recv().unwrap(), 2);
}

#[test]
fn sync_receiver_drop() {
  let (tx, rx) = bounded(1);
  tx.send(1).unwrap();
  drop(rx);
  // The permit for item 1 should have been released.
  // The channel is now closed to the sender.
  assert!(tx.is_closed());
  assert_eq!(tx.send(2), Err(SendError::Closed));
}

#[cfg(not(miri))]
#[tokio::test]
async fn async_send_recv() {
  let (tx, rx) = bounded_async(2);
  tx.send(1).await.unwrap();
  tx.send(2).await.unwrap();
  assert!(tx.is_full());

  assert_eq!(rx.recv().await.unwrap(), 1);
  assert_eq!(rx.recv().await.unwrap(), 2);
  assert!(rx.is_empty());
}

#[cfg(not(miri))]
#[tokio::test]
async fn async_send_waits() {
  let (tx, rx) = bounded_async(1);
  tx.send(1).await.unwrap();

  let send_task = tokio::spawn(async move {
    tx.send(2).await.unwrap();
  });

  tokio::time::sleep(Duration::from_millis(50)).await;
  assert!(!send_task.is_finished(), "Send task should be waiting");

  assert_eq!(rx.recv().await.unwrap(), 1);
  send_task.await.unwrap();
  assert_eq!(rx.recv().await.unwrap(), 2);
}

#[cfg(not(miri))]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mixed_sync_send_async_recv() {
  // `bounded` returns a sync pair.
  let (tx_sync, rx_sync) = bounded(2);
  // Convert the receiver to its async version.
  let rx_async = rx_sync.to_async();

  // Use a blocking thread for the sync sender
  let send_handle = thread::spawn(move || {
    tx_sync.send(100).unwrap();
    tx_sync.send(200).unwrap();
  });

  // Now we can `.await` on the async receiver.
  assert_eq!(rx_async.recv().await.unwrap(), 100);
  assert_eq!(rx_async.recv().await.unwrap(), 200);

  send_handle.join().unwrap();
}

#[test]
fn mixed_async_send_sync_recv() {
  // `bounded` returns a sync pair.
  let (tx_sync, rx_sync) = bounded(2);
  // Convert the sender to its async version.
  let tx_async = tx_sync.to_async();

  let rt = tokio::runtime::Builder::new_multi_thread()
    .build()
    .unwrap();
  rt.block_on(async {
    // Now we can `.await` on the async sender.
    tx_async.send(100).await.unwrap();
    tx_async.send(200).await.unwrap();
  });

  // The original sync receiver works as expected.
  assert_eq!(rx_sync.recv().unwrap(), 100);
  assert_eq!(rx_sync.recv().unwrap(), 200);
}

// --- New Intensity and Correctness Tests ---

#[cfg(not(miri))]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn high_contention_async_mpsc() {
  const NUM_SENDERS: usize = 100;
  const MSGS_PER_SENDER: usize = 10;
  const CAPACITY: usize = 10;
  let total_msgs = NUM_SENDERS * MSGS_PER_SENDER;

  let (tx, rx) = bounded_async(CAPACITY);
  let mut handles = Vec::new();

  // Spawn many sender tasks
  for i in 0..NUM_SENDERS {
    let tx_clone = tx.clone();
    let handle = tokio::spawn(async move {
      for j in 0..MSGS_PER_SENDER {
        let val = i * MSGS_PER_SENDER + j;
        tx_clone.send(val).await.unwrap();
      }
    });
    handles.push(handle);
  }
  // Drop the original sender
  drop(tx);

  // Spawn a receiver task
  let receiver_handle = tokio::spawn(async move {
    let mut received_values = HashSet::new();
    let mut count = 0;
    while let Ok(val) = rx.recv().await {
      assert!(received_values.insert(val), "Duplicate value received!");
      count += 1;
    }
    assert!(rx.is_closed(), "Receiver should be closed at the end");
    assert_eq!(count, total_msgs);
    received_values
  });

  // Wait for all senders to complete
  for handle in handles {
    handle.await.unwrap();
  }

  // Wait for the receiver to complete and verify the sum
  let received_set = receiver_handle.await.unwrap();
  let expected_sum: usize = (0..total_msgs).sum();
  let actual_sum: usize = received_set.iter().sum();
  assert_eq!(
    actual_sum, expected_sum,
    "Sum of received values is incorrect"
  );
}

#[cfg(not(miri))]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn high_contention_mixed_sync_async() {
  const NUM_ASYNC_SENDERS: usize = 50;
  const NUM_SYNC_SENDERS: usize = 50;
  const MSGS_PER_SENDER: usize = 10;
  const CAPACITY: usize = 15;
  let total_msgs = (NUM_ASYNC_SENDERS + NUM_SYNC_SENDERS) * MSGS_PER_SENDER;

  let (tx, rx_sync) = bounded(CAPACITY);
  let rx = rx_sync.to_async(); // Use an async receiver

  let mut handles = Vec::new();

  // Spawn async senders
  for i in 0..NUM_ASYNC_SENDERS {
    let tx_clone = tx.clone().to_async();
    handles.push(tokio::spawn(async move {
      for j in 0..MSGS_PER_SENDER {
        let val = i * MSGS_PER_SENDER + j;
        tx_clone.send(val).await.unwrap();
      }
    }));
  }

  // Spawn sync senders in blocking threads
  let mut thread_handles = Vec::new();
  for i in 0..NUM_SYNC_SENDERS {
    let tx_clone = tx.clone();
    thread_handles.push(thread::spawn(move || {
      for j in 0..MSGS_PER_SENDER {
        let val = (i + NUM_ASYNC_SENDERS) * MSGS_PER_SENDER + j;
        tx_clone.send(val).unwrap();
      }
    }));
  }

  drop(tx); // Drop the original sender

  // Receive all messages
  let mut received_count = 0;
  let mut received_sum: usize = 0;
  while let Ok(val) = rx.recv().await {
    received_sum += val;
    received_count += 1;
  }

  // Await all tasks and threads
  for handle in handles {
    handle.await.unwrap();
  }
  for handle in thread_handles {
    handle.join().unwrap();
  }

  assert_eq!(received_count, total_msgs);
  let expected_sum: usize = (0..total_msgs).sum();
  assert_eq!(received_sum, expected_sum);
  assert!(rx.is_closed());
}

#[cfg(not(miri))]
#[tokio::test]
async fn sender_unblocks_when_receiver_dropped() {
  let (tx, rx) = bounded_async(1);

  // Fill the channel
  tx.send("first").await.unwrap();
  assert!(tx.is_full());

  // This send will wait because the channel is full
  let waiter_handle = tokio::spawn(async move {
    let result = tx.send("second").await;
    assert!(
      matches!(result, Err(SendError::Closed)),
      "Sender should receive a Closed error"
    );
  });

  // Give the waiter time to block on the full channel
  tokio::time::sleep(Duration::from_millis(50)).await;
  assert!(!waiter_handle.is_finished(), "Sender should be blocked");

  // Drop the receiver, which should unblock the waiting sender
  drop(rx);

  // The waiter should now complete with an error
  waiter_handle.await.unwrap();
}

#[cfg(not(miri))]
#[tokio::test]
async fn zero_capacity_channel_async_rendezvous() {
  let (tx, rx) = bounded_async::<i32>(0);
  let completed_send = Arc::new(AtomicUsize::new(0));

  let completed_send_clone = completed_send.clone();
  let sender_handle = tokio::spawn(async move {
    tx.send(1).await.unwrap();
    // This line should only be reached after the receiver has picked up the message
    completed_send_clone.store(1, Ordering::Relaxed);
  });

  // Give the sender a moment to call send() and wait
  tokio::time::sleep(Duration::from_millis(20)).await;
  assert_eq!(
    completed_send.load(Ordering::Relaxed),
    0,
    "Send should not complete before recv"
  );

  // Now receive, which should unblock the sender
  assert_eq!(rx.recv().await.unwrap(), 1);

  // Give the sender task time to run to completion
  tokio::time::sleep(Duration::from_millis(20)).await;
  assert_eq!(
    completed_send.load(Ordering::Relaxed),
    1,
    "Send should have completed after recv"
  );

  sender_handle.await.unwrap();
}

#[test]
fn zero_capacity_channel_sync_rendezvous() {
  let (tx, rx) = bounded::<()>(0);
  let tx_ready = Arc::new(std::sync::Barrier::new(2));
  let send_complete = Arc::new(std::sync::atomic::AtomicBool::new(false));

  let tx_ready_clone = tx_ready.clone();
  let send_complete_clone = send_complete.clone();
  let sender_handle = thread::spawn(move || {
    // Signal that the sender is ready to send
    tx_ready_clone.wait();
    // This will block until the receiver calls recv()
    tx.send(()).unwrap();
    send_complete_clone.store(true, Ordering::Relaxed);
  });

  // Wait for the sender to be ready to send
  tx_ready.wait();
  // Give the sender thread time to block in the send() call
  thread::sleep(Duration::from_millis(50));
  assert!(
    !send_complete.load(Ordering::Relaxed),
    "Send should not complete before recv"
  );

  // This will unblock the sender
  rx.recv().unwrap();

  sender_handle.join().unwrap();
  assert!(
    send_complete.load(Ordering::Relaxed),
    "Send should be complete after recv"
  );
}

// channels/src/mpsc/bounded_tests.rs

#[cfg(not(miri))]
#[tokio::test]
async fn test_bounded_async_receiver_drop_unblocks_all_senders() {
  use std::time::Duration;
  use tokio::time::timeout;

  // Create a bounded async channel with capacity 1
  let (tx, rx) = bounded_async::<i32>(1);

  // 1. Fill the channel to capacity.
  tx.send(100).await.unwrap();

  // 2. Clone and spawn 3 more senders.
  // Since the channel is already full, all 3 tasks will block on the capacity gate.
  let tx2 = tx.clone();
  let h2 = tokio::spawn(async move { tx2.send(200).await });

  let tx3 = tx.clone();
  let h3 = tokio::spawn(async move { tx3.send(300).await });

  let tx4 = tx.clone();
  let h4 = tokio::spawn(async move { tx4.send(400).await });

  // Give the spawned tasks time to run and block on the gate.
  tokio::time::sleep(Duration::from_millis(50)).await;

  // 3. Drop the receiver.
  // This must wake up and unblock ALL waiting senders so they can exit.
  drop(rx);

  // 4. Await all spawned senders with a timeout.
  // If the bug is present, some of the senders will remain pending forever and time out.
  let r2 = timeout(Duration::from_millis(500), h2).await;
  let r3 = timeout(Duration::from_millis(500), h3).await;
  let r4 = timeout(Duration::from_millis(500), h4).await;

  // Unwrap the timeouts (fails the test if a timeout occurred)
  let res2 = r2.expect("Sender 2 timed out (deadlocked)!").unwrap();
  let res3 = r3.expect("Sender 3 timed out (deadlocked)!").unwrap();
  let res4 = r4.expect("Sender 4 timed out (deadlocked)!").unwrap();

  // Ensure all senders received a Closed error rather than successfully sending
  assert!(matches!(res2, Err(SendError::Closed)));
  assert!(matches!(res3, Err(SendError::Closed)));
  assert!(matches!(res4, Err(SendError::Closed)));
}