use crate::cx::Cx;
use crate::util::{Arena, ArenaIndex};
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
/// Error produced when a broadcast send (or reservation) cannot proceed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendError<T> {
    /// The channel is closed; the payload is whatever the caller tried to hand over.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // There is only one variant, so an irrefutable pattern is enough.
        let Self::Closed(_) = self;
        f.write_str("sending on a closed broadcast channel")
    }
}

impl<T> std::error::Error for SendError<T> where T: std::fmt::Debug {}
/// Errors reported by an asynchronous broadcast receive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// The receiver fell behind; the payload is the number of messages it missed.
    Lagged(u64),
    /// The channel is closed.
    Closed,
    /// The receive operation was cancelled.
    Cancelled,
    /// The receive future was polled again after it had already completed.
    PolledAfterCompletion,
}

impl std::fmt::Display for RecvError {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the lag variant needs formatting; the rest are fixed strings.
        let text = match self {
            Self::Lagged(n) => return write!(f, "receiver lagged by {n} messages"),
            Self::Closed => "broadcast channel closed",
            Self::Cancelled => "receive operation cancelled",
            Self::PolledAfterCompletion => "broadcast receive future polled after completion",
        };
        f.write_str(text)
    }
}

impl std::error::Error for RecvError {}
/// Errors reported by a non-blocking broadcast receive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No message is available right now.
    Empty,
    /// The receiver fell behind; the payload is the number of messages it missed.
    Lagged(u64),
    /// The channel is closed.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the lag variant needs formatting; the rest are fixed strings.
        let text = match self {
            Self::Lagged(n) => return write!(f, "receiver lagged by {n} messages"),
            Self::Empty => "broadcast channel empty",
            Self::Closed => "broadcast channel closed",
        };
        f.write_str(text)
    }
}

impl std::error::Error for TryRecvError {}
/// Mutable channel state; it lives behind the mutex in `Channel::inner`.
#[derive(Debug)]
struct Shared<T> {
    /// Retained messages, oldest at the front.
    buffer: VecDeque<Slot<T>>,
    /// Maximum number of retained messages; a send into a full buffer evicts the front slot.
    capacity: usize,
    /// Number of messages committed so far; also the index assigned to the next message.
    total_sent: u64,
    /// Wakers registered by receive operations that returned `Poll::Pending`.
    wakers: Arena<Waker>,
}
/// One retained message together with its position in the send sequence.
#[derive(Debug)]
struct Slot<T> {
    /// The message payload; receivers get clones of it.
    msg: T,
    /// Value of `Shared::total_sent` at the moment this message was pushed.
    index: u64,
}
/// State shared (via `Arc`) by every `Sender` and `Receiver` of one broadcast channel.
struct Channel<T> {
    /// Number of live `Sender` handles; receivers treat zero as "closed".
    sender_count: AtomicUsize,
    /// Number of live `Receiver` handles; senders treat zero as "closed".
    receiver_count: AtomicUsize,
    /// Buffer, counters and waiters, guarded by a mutex.
    inner: Mutex<Shared<T>>,
}
impl<T: std::fmt::Debug> std::fmt::Debug for Channel<T> {
    /// Prints only the `inner` field; the handle counters are elided and the output is
    /// marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Channel");
        out.field("inner", &self.inner);
        out.finish_non_exhaustive()
    }
}
/// Creates a broadcast channel retaining at most `capacity` messages and returns its
/// first sender/receiver pair.
///
/// The returned receiver starts at index 0, i.e. it observes every message sent afterwards.
///
/// # Panics
/// Panics if `capacity` is zero.
#[must_use]
#[inline]
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    assert!(capacity > 0, "capacity must be non-zero");
    let state = Shared {
        buffer: VecDeque::with_capacity(capacity),
        capacity,
        total_sent: 0,
        wakers: Arena::new(),
    };
    // Both counters start at one: the handles returned below.
    let shared_channel = Arc::new(Channel {
        sender_count: AtomicUsize::new(1),
        receiver_count: AtomicUsize::new(1),
        inner: Mutex::new(state),
    });
    let sender = Sender {
        channel: Arc::clone(&shared_channel),
    };
    (
        sender,
        Receiver {
            channel: shared_channel,
            next_index: 0,
        },
    )
}
/// Sending half of a broadcast channel.
///
/// Cloning increments the channel's sender count; dropping the last sender wakes all
/// registered receive waiters.
#[derive(Debug)]
pub struct Sender<T> {
    /// Channel state shared with all other handles.
    channel: Arc<Channel<T>>,
}
impl<T: Clone> Sender<T> {
    /// Reserves the right to publish one message.
    ///
    /// A failing `cx.checkpoint()` is only traced; it does not fail the reservation.
    ///
    /// # Errors
    /// Returns `SendError::Closed(())` when the receiver count is zero.
    #[inline]
    pub fn reserve(&self, cx: &Cx) -> Result<SendPermit<'_, T>, SendError<()>> {
        if cx.checkpoint().is_err() {
            cx.trace("broadcast::reserve called with cancel pending");
        }
        match self.channel.receiver_count.load(Ordering::Acquire) {
            0 => Err(SendError::Closed(())),
            _ => Ok(SendPermit { sender: self }),
        }
    }

    /// Reserves and immediately publishes `msg`, returning the value of `SendPermit::send`.
    ///
    /// # Errors
    /// Returns `SendError::Closed(msg)`, handing the message back, when the reservation fails.
    #[inline]
    pub fn send(&self, cx: &Cx, msg: T) -> Result<usize, SendError<T>> {
        match self.reserve(cx) {
            Ok(permit) => Ok(permit.send(msg)),
            Err(SendError::Closed(())) => Err(SendError::Closed(msg)),
        }
    }

    /// Current number of live receivers.
    #[must_use]
    #[inline]
    pub fn receiver_count(&self) -> usize {
        self.channel.receiver_count.load(Ordering::Acquire)
    }

    /// Number of messages currently retained in the buffer.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        let state = self.channel.inner.lock();
        state.buffer.len()
    }

    /// Returns `true` when no messages are retained.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.channel.inner.lock().buffer.is_empty()
    }

    /// Creates a receiver that only observes messages sent after this call.
    #[must_use]
    #[inline]
    pub fn subscribe(&self) -> Receiver<T> {
        let mut state = self.channel.inner.lock();
        let previous = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
        // Coming back from zero receivers: whatever is still buffered is taken out so it
        // can be dropped once the lock is released.
        let stale = if previous == 0 {
            Some(std::mem::take(&mut state.buffer))
        } else {
            None
        };
        let next_index = state.total_sent;
        drop(state);
        let receiver = Receiver {
            channel: Arc::clone(&self.channel),
            next_index,
        };
        drop(stale);
        receiver
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
Self {
channel: Arc::clone(&self.channel),
}
}
}
impl<T> Drop for Sender<T> {
    /// Decrements the sender count; the last sender to go wakes every registered waiter.
    fn drop(&mut self) {
        let count_before = self.channel.sender_count.fetch_sub(1, Ordering::AcqRel);
        if count_before == 1 {
            // Collect the wakers under the lock, invoke them after it is released.
            let parked: SmallVec<[Waker; 4]> = {
                let mut state = self.channel.inner.lock();
                state.wakers.drain_values().collect()
            };
            parked.into_iter().for_each(Waker::wake);
        }
    }
}
/// Permission to publish one message, obtained from `Sender::reserve`.
#[must_use = "SendPermit must be consumed via send()"]
pub struct SendPermit<'a, T> {
    /// The sender this permit was reserved on.
    sender: &'a Sender<T>,
}
impl<T: Clone> SendPermit<'_, T> {
    /// Publishes `msg` and wakes every registered receive waiter.
    ///
    /// When the buffer is full the oldest message is evicted first.
    ///
    /// Returns the receiver count observed when the message was committed, or `0` if no
    /// receiver is left, in which case the message is discarded and the channel state is
    /// left as it was.
    ///
    /// Message destructors (the evicted message, or `msg` itself when it is rejected)
    /// never run while the channel mutex is held.
    #[inline]
    pub fn send(self, msg: T) -> usize {
        let channel = &self.sender.channel;
        let mut inner = channel.inner.lock();
        if channel.receiver_count.load(Ordering::Acquire) == 0 {
            // Release the lock explicitly before `msg` goes out of scope.
            drop(inner);
            return 0;
        }
        let popped = if inner.buffer.len() == inner.capacity {
            inner.buffer.pop_front()
        } else {
            None
        };
        let index = inner.total_sent;
        inner.buffer.push_back(Slot { msg, index });
        // Receivers decrement the counter without taking the lock, so it may have reached
        // zero since the first check; re-read it before committing.
        let live_receivers = channel.receiver_count.load(Ordering::Acquire);
        if live_receivers == 0 {
            // Roll back: take the new slot out again and restore the evicted one.
            let rejected = inner.buffer.pop_back();
            if let Some(slot) = popped {
                inner.buffer.push_front(slot);
            }
            // Drop the rejected message only after unlocking, mirroring the commit path.
            drop(inner);
            drop(rejected);
            return 0;
        }
        inner.total_sent += 1;
        let wakers_to_wake: SmallVec<[Waker; 4]> = inner.wakers.drain_values().collect();
        // Unlock before running user code: the evicted message's destructor and the wakers.
        drop(inner);
        drop(popped);
        for waker in wakers_to_wake {
            waker.wake();
        }
        live_receivers
    }
}
/// Receiving half of a broadcast channel; each receiver keeps its own read cursor.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Channel state shared with all other handles.
    channel: Arc<Channel<T>>,
    /// Index (in send order) of the next message this receiver will read.
    next_index: u64,
}
impl<T> Receiver<T> {
    /// Removes the waker registered under `waiter` (if any) and resets `waiter` to `None`.
    ///
    /// The channel lock is only taken when there is a registration to remove.
    pub(crate) fn clear_waiter_registration(&self, waiter: &mut Option<ArenaIndex>) {
        let Some(token) = waiter.take() else {
            return;
        };
        self.channel.inner.lock().wakers.remove(token);
    }
}
impl<T: Clone> Receiver<T> {
    /// Attempts to read the next message without waiting.
    ///
    /// On success the message is cloned out of the buffer and the cursor advances by one.
    ///
    /// # Errors
    /// - `TryRecvError::Lagged(n)`: the cursor pointed before the oldest retained message;
    ///   it is moved to that message and `n` is the number of messages skipped.
    /// - `TryRecvError::Closed`: nothing is readable and the sender count is zero.
    /// - `TryRecvError::Empty`: nothing is readable yet.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let inner = self.channel.inner.lock();
        // Index of the oldest retained message; for an empty buffer, the index the next
        // sent message will receive.
        let earliest = inner.buffer.front().map_or(inner.total_sent, |s| s.index);
        if self.next_index < earliest {
            let missed = earliest - self.next_index;
            self.next_index = earliest;
            return Err(TryRecvError::Lagged(missed));
        }
        // Position of the cursor relative to the front of the buffer.
        let delta = self.next_index.saturating_sub(earliest);
        if let Ok(offset) = usize::try_from(delta) {
            if let Some(slot) = inner.buffer.get(offset) {
                let msg = slot.msg.clone();
                self.next_index += 1;
                return Ok(msg);
            }
        }
        if self.channel.sender_count.load(Ordering::Acquire) == 0 {
            Err(TryRecvError::Closed)
        } else {
            Err(TryRecvError::Empty)
        }
    }
    /// Returns a future that resolves to the next message or a `RecvError`.
    #[inline]
    pub fn recv<'a>(&'a mut self, cx: &'a Cx) -> Recv<'a, T> {
        Recv {
            receiver: self,
            cx,
            waiter: None,
            completed: false,
        }
    }
    /// Polls for the next message, keeping the caller-owned `waiter` registration in sync.
    ///
    /// `waiter` holds the arena token of this operation's registered waker, if any. Every
    /// `Poll::Ready` path removes the registration; the `Poll::Pending` path makes sure a
    /// waker equivalent to `task_cx.waker()` is registered.
    ///
    /// Checks, in order: cancellation via `cx.checkpoint()`, lag, a readable message,
    /// channel closure (sender count zero).
    #[inline]
    pub(crate) fn poll_recv_with_waiter(
        &mut self,
        cx: &Cx,
        task_cx: &Context<'_>,
        waiter: &mut Option<ArenaIndex>,
    ) -> Poll<Result<T, RecvError>> {
        if cx.checkpoint().is_err() {
            cx.trace("broadcast::recv cancelled");
            self.clear_waiter_registration(waiter);
            return Poll::Ready(Err(RecvError::Cancelled));
        }
        let mut inner = self.channel.inner.lock();
        // Index of the oldest retained message; for an empty buffer, the index the next
        // sent message will receive.
        let earliest = inner.buffer.front().map_or(inner.total_sent, |s| s.index);
        if self.next_index < earliest {
            // The cursor points at evicted messages: skip ahead and report how many.
            let missed = earliest - self.next_index;
            self.next_index = earliest;
            if let Some(token) = waiter.take() {
                inner.wakers.remove(token);
            }
            return Poll::Ready(Err(RecvError::Lagged(missed)));
        }
        let delta = self.next_index.saturating_sub(earliest);
        if let Ok(offset) = usize::try_from(delta) {
            if let Some(slot) = inner.buffer.get(offset) {
                let msg = slot.msg.clone();
                self.next_index += 1;
                if let Some(token) = waiter.take() {
                    inner.wakers.remove(token);
                }
                return Poll::Ready(Ok(msg));
            }
        }
        // The sender count is read while the lock is held; a dropping last sender takes
        // the same lock before draining the wakers.
        if self.channel.sender_count.load(Ordering::Acquire) == 0 {
            if let Some(token) = waiter.take() {
                inner.wakers.remove(token);
            }
            return Poll::Ready(Err(RecvError::Closed));
        }
        let current_waker = task_cx.waker();
        if let Some(token) = *waiter {
            if let Some(waker) = inner.wakers.get_mut(token) {
                // Still registered: only replace the stored waker if it would wake a
                // different task.
                if !waker.will_wake(current_waker) {
                    waker.clone_from(current_waker);
                }
            } else {
                // The token no longer resolves (senders drain the arena when they wake
                // waiters): register afresh.
                let token = inner.wakers.insert(current_waker.clone());
                *waiter = Some(token);
            }
        } else {
            let token = inner.wakers.insert(current_waker.clone());
            *waiter = Some(token);
        }
        drop(inner);
        Poll::Pending
    }
}
/// Future returned by `Receiver::recv`.
///
/// It resolves to the next message or a `RecvError`. Once it has completed, further polls
/// yield `RecvError::PolledAfterCompletion`. Dropping it removes any waker it registered.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T> {
    /// Receiver whose cursor this operation advances.
    receiver: &'a mut Receiver<T>,
    /// Capability context consulted for cancellation on every poll.
    cx: &'a Cx,
    /// Token of the waker registered by the most recent pending poll, if any.
    waiter: Option<ArenaIndex>,
    /// Set once a poll has returned `Poll::Ready`.
    completed: bool,
}
impl<T> Recv<'_, T> {
fn clear_waiter_registration(&mut self) {
self.receiver.clear_waiter_registration(&mut self.waiter);
}
}
impl<T: Clone> Future for Recv<'_, T> {
    type Output = Result<T, RecvError>;

    /// Delegates to `Receiver::poll_recv_with_waiter` and latches completion, so that any
    /// poll after the first ready result reports `RecvError::PolledAfterCompletion`.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        if this.completed {
            return Poll::Ready(Err(RecvError::PolledAfterCompletion));
        }
        match this
            .receiver
            .poll_recv_with_waiter(this.cx, ctx, &mut this.waiter)
        {
            Poll::Pending => Poll::Pending,
            finished => {
                this.completed = true;
                finished
            }
        }
    }
}
impl<T> Drop for Recv<'_, T> {
    /// Unregisters this future's waker so an abandoned receive leaves nothing behind.
    fn drop(&mut self) {
        self.receiver.clear_waiter_registration(&mut self.waiter);
    }
}
impl<T> Clone for Receiver<T> {
    /// Produces a receiver on the same channel that starts at this receiver's cursor,
    /// and bumps the receiver count.
    fn clone(&self) -> Self {
        let Self {
            channel,
            next_index,
        } = self;
        channel.receiver_count.fetch_add(1, Ordering::Relaxed);
        Self {
            channel: Arc::clone(channel),
            next_index: *next_index,
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Decrements the receiver count; when it reaches zero the retained messages are
    /// taken out of the buffer and dropped after the lock is released.
    fn drop(&mut self) {
        if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) != 1 {
            return;
        }
        let orphaned = {
            let mut state = self.channel.inner.lock();
            // Re-check under the lock: a concurrent `subscribe` may have revived the channel.
            let still_unobserved = self.channel.receiver_count.load(Ordering::Acquire) == 0;
            let taken = if still_unobserved {
                Some(std::mem::take(&mut state.buffer))
            } else {
                None
            };
            taken
        };
        drop(orphaned);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conformance::{ConformanceTarget, LabRuntimeTarget, TestConfig};
use crate::runtime::yield_now;
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use serde_json::Value;
use std::future::Future;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::task::{Context, Poll, Waker};
/// Common test prologue: initialises test logging and marks the start of the phase `name`.
fn init_test(name: &str) {
    crate::test_utils::init_test_logging();
    crate::test_phase!(name);
}
/// Builds a `Cx` for tests from arena index (0, 0) region/task ids and an infinite budget.
fn test_cx() -> Cx {
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    let task = TaskId::from_arena(ArenaIndex::new(0, 0));
    Cx::new(region, task, Budget::INFINITE)
}
/// Drives `f` to completion on the current thread by polling it in a loop with a no-op
/// waker, yielding the thread between polls.
fn block_on<F: Future>(f: F) -> F::Output {
    let mut task_cx = Context::from_waker(Waker::noop());
    let mut fut = Box::pin(f);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut task_cx) {
            return output;
        }
        std::thread::yield_now();
    }
}
/// Waker backend that counts how many times it has been woken.
#[derive(Debug)]
struct CountingWaker {
    wakes: AtomicUsize,
}

impl CountingWaker {
    /// Creates a shared counter starting at zero.
    fn new() -> Arc<Self> {
        let wakes = AtomicUsize::new(0);
        Arc::new(Self { wakes })
    }

    /// Number of wake-ups recorded so far.
    fn wake_count(&self) -> usize {
        self.wakes.load(AtomicOrdering::Acquire)
    }
}

impl std::task::Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        // Waking by value counts exactly like waking by reference.
        std::task::Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, AtomicOrdering::AcqRel);
    }
}
/// Shared state used by `GateMsg::Blocking` to park a thread inside a destructor.
#[derive(Debug)]
struct DropBlocker {
    /// Signalled when the blocking drop has been entered.
    entered_tx: std::sync::mpsc::SyncSender<()>,
    /// Receiver the blocking drop waits on; taken by the first drop that uses it.
    release_rx: std::sync::Mutex<Option<std::sync::mpsc::Receiver<()>>>,
    /// Starts at 1; only the drop that observes the value 1 actually blocks.
    armed: AtomicUsize,
}

impl DropBlocker {
    /// Returns the blocker plus the test-side ends: the "entered" receiver and the
    /// "release" sender. Both channels are bounded with capacity 1.
    fn new() -> (
        Arc<Self>,
        std::sync::mpsc::Receiver<()>,
        std::sync::mpsc::SyncSender<()>,
    ) {
        let (entered_tx, entered_rx) = std::sync::mpsc::sync_channel(1);
        let (release_tx, release_rx) = std::sync::mpsc::sync_channel(1);
        let blocker = Self {
            entered_tx,
            release_rx: std::sync::Mutex::new(Some(release_rx)),
            armed: AtomicUsize::new(1),
        };
        (Arc::new(blocker), entered_rx, release_tx)
    }
}
/// Test message whose `Blocking` variant can park the dropping thread.
#[derive(Debug)]
enum GateMsg {
    Blocking(Arc<DropBlocker>),
    Plain(i32),
}

impl Clone for GateMsg {
    /// Clones share the same `DropBlocker`.
    fn clone(&self) -> Self {
        match *self {
            Self::Blocking(ref blocker) => Self::Blocking(Arc::clone(blocker)),
            Self::Plain(value) => Self::Plain(value),
        }
    }
}

impl Drop for GateMsg {
    /// For `Blocking`: the drop that finds `armed == 1` signals "entered" and then waits
    /// for the release message; every other drop returns immediately.
    fn drop(&mut self) {
        if let Self::Blocking(blocker) = self {
            let was_armed = blocker.armed.fetch_sub(1, AtomicOrdering::AcqRel) == 1;
            if !was_armed {
                return;
            }
            let _ = blocker.entered_tx.send(());
            let pending_release = blocker
                .release_rx
                .lock()
                .expect("drop blocker mutex poisoned")
                .take();
            if let Some(rx) = pending_release {
                rx.recv().expect("drop blocker release recv");
            }
        }
    }
}
/// Two receivers subscribed before any send each observe both messages, in order.
#[test]
fn basic_send_recv() {
    init_test("basic_send_recv");
    let cx = test_cx();
    let (tx, mut rx1) = channel(10);
    let mut rx2 = tx.subscribe();
    tx.send(&cx, 10).expect("send failed");
    tx.send(&cx, 20).expect("send failed");
    let rx1_first = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(rx1_first == 10, "rx1 first", 10, rx1_first);
    let rx1_second = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(rx1_second == 20, "rx1 second", 20, rx1_second);
    let rx2_first = block_on(rx2.recv(&cx)).unwrap();
    crate::assert_with_log!(rx2_first == 10, "rx2 first", 10, rx2_first);
    let rx2_second = block_on(rx2.recv(&cx)).unwrap();
    crate::assert_with_log!(rx2_second == 20, "rx2 second", 20, rx2_second);
    crate::test_complete!("basic_send_recv");
}
/// Overflowing a capacity-2 channel by one message yields `Lagged(1)`, after which the
/// two retained messages are delivered.
#[test]
fn lag_detection() {
    init_test("lag_detection");
    let cx = test_cx();
    let (tx, mut rx) = channel(2);
    tx.send(&cx, 1).unwrap();
    tx.send(&cx, 2).unwrap();
    // The third send evicts message 1.
    tx.send(&cx, 3).unwrap();
    let result = block_on(rx.recv(&cx));
    match result {
        Err(RecvError::Lagged(n)) => {
            crate::assert_with_log!(n == 1, "lagged count", 1, n);
        }
        other => unreachable!("expected lagged, got {other:?}"),
    }
    let second = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(second == 2, "second", 2, second);
    let third = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(third == 3, "third", 3, third);
    crate::test_complete!("lag_detection");
}
/// Sending after the only receiver is dropped fails and hands the message back.
#[test]
fn closed_send() {
    init_test("closed_send");
    let cx = test_cx();
    let (tx, rx) = channel::<i32>(10);
    drop(rx);
    let result = tx.send(&cx, 1);
    crate::assert_with_log!(
        matches!(result, Err(SendError::Closed(1))),
        "send after close",
        "Err(Closed(1))",
        format!("{:?}", result)
    );
    crate::test_complete!("closed_send");
}
/// Receiving on an empty channel whose only sender is gone reports `Closed`.
#[test]
fn closed_recv() {
    init_test("closed_recv");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    drop(tx);
    let result = block_on(rx.recv(&cx));
    crate::assert_with_log!(
        matches!(result, Err(RecvError::Closed)),
        "recv after close",
        "Err(Closed)",
        format!("{:?}", result)
    );
    crate::test_complete!("closed_recv");
}
/// A receiver created by `subscribe` only observes messages sent after it subscribed.
#[test]
fn subscribe_sees_future() {
    init_test("subscribe_sees_future");
    let cx = test_cx();
    let (tx, mut rx1) = channel(10);
    tx.send(&cx, 1).unwrap();
    // rx2 subscribes after message 1 and must not see it.
    let mut rx2 = tx.subscribe();
    tx.send(&cx, 2).unwrap();
    let rx1_first = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(rx1_first == 1, "rx1 first", 1, rx1_first);
    let rx1_second = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(rx1_second == 2, "rx1 second", 2, rx1_second);
    let rx2_first = block_on(rx2.recv(&cx)).unwrap();
    crate::assert_with_log!(rx2_first == 2, "rx2 first", 2, rx2_first);
    crate::test_complete!("subscribe_sees_future");
}
/// `send` returns the number of live receivers and fails once all of them are dropped.
#[test]
fn send_returns_live_receiver_count() {
    init_test("send_returns_live_receiver_count");
    let cx = test_cx();
    let (tx, rx1) = channel::<i32>(10);
    let rx2 = tx.subscribe();
    // Cloned receivers count as well.
    let rx3 = rx2.clone();
    let count = tx.send(&cx, 1).expect("send failed");
    crate::assert_with_log!(count == 3, "receiver count", 3, count);
    drop(rx1);
    let count2 = tx.send(&cx, 2).expect("send failed");
    crate::assert_with_log!(count2 == 2, "receiver count after drop", 2, count2);
    drop(rx2);
    drop(rx3);
    let closed = tx.send(&cx, 3);
    crate::assert_with_log!(
        matches!(closed, Err(SendError::Closed(3))),
        "send closed when no receivers",
        "Err(Closed(3))",
        format!("{:?}", closed)
    );
    crate::test_complete!("send_returns_live_receiver_count");
}
/// A receiver that subscribes while a send is parked in the evicted message's destructor
/// is neither counted by that send nor shown its message.
#[test]
fn send_count_excludes_receivers_that_subscribe_after_commit() {
    init_test("send_count_excludes_receivers_that_subscribe_after_commit");
    let cx = test_cx();
    let (tx, mut rx1) = channel::<GateMsg>(1);
    let (blocker, entered_rx, release_tx) = DropBlocker::new();
    // Fill the capacity-1 buffer with the message whose drop will block.
    tx.send(&cx, GateMsg::Blocking(blocker)).unwrap();
    let tx_thread = tx.clone();
    let handle = std::thread::spawn(move || {
        let cx = test_cx();
        // Evicts the blocking message; this thread then parks inside its destructor.
        tx_thread.send(&cx, GateMsg::Plain(2)).expect("send failed")
    });
    // Wait until the sender thread is inside the blocking drop.
    entered_rx.recv().expect("drop blocker entered");
    let mut rx2 = tx.subscribe();
    release_tx.send(()).expect("drop blocker release send");
    let count = handle.join().expect("sender thread panicked");
    crate::assert_with_log!(
        count == 1,
        "send count excludes late subscriber",
        1usize,
        count
    );
    let lag = block_on(rx1.recv(&cx));
    crate::assert_with_log!(
        matches!(lag, Err(RecvError::Lagged(1))),
        "existing receiver first observes eviction lag",
        true,
        matches!(lag, Err(RecvError::Lagged(1)))
    );
    let got1 = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(
        matches!(got1, GateMsg::Plain(2)),
        "existing receiver sees committed message",
        true,
        matches!(got1, GateMsg::Plain(2))
    );
    tx.send(&cx, GateMsg::Plain(3)).unwrap();
    let got2 = block_on(rx2.recv(&cx)).unwrap();
    crate::assert_with_log!(
        matches!(got2, GateMsg::Plain(3)),
        "late subscriber sees only future message",
        true,
        matches!(got2, GateMsg::Plain(3))
    );
    crate::test_complete!("send_count_excludes_receivers_that_subscribe_after_commit");
}
/// Polling a pending receive twice with the same waker results in a single wake-up when
/// a message is sent.
#[test]
fn recv_waiter_dedup_and_wake_on_send() {
    init_test("recv_waiter_dedup_and_wake_on_send");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    let first_pending = matches!(fut.as_mut().poll(&mut ctx), Poll::Pending);
    crate::assert_with_log!(first_pending, "first poll pending", true, first_pending);
    let second_pending = matches!(fut.as_mut().poll(&mut ctx), Poll::Pending);
    crate::assert_with_log!(second_pending, "second poll pending", true, second_pending);
    tx.send(&cx, 123).expect("send failed");
    // Exactly one wake despite two pending polls.
    let wake_count = wake_state.wake_count();
    crate::assert_with_log!(wake_count == 1, "wake count", 1, wake_count);
    let got = match fut.as_mut().poll(&mut ctx) {
        Poll::Ready(Ok(v)) => v,
        other => {
            unreachable!("expected Ready(Ok), got {other:?}");
        }
    };
    crate::assert_with_log!(got == 123, "received", 123, got);
    crate::test_complete!("recv_waiter_dedup_and_wake_on_send");
}
/// Dropping the last sender wakes a pending receive, which then resolves to `Closed`.
#[test]
fn pending_recv_woken_on_sender_drop_returns_closed() {
    init_test("pending_recv_woken_on_sender_drop_returns_closed");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    let pending = matches!(fut.as_mut().poll(&mut ctx), Poll::Pending);
    crate::assert_with_log!(pending, "poll pending", true, pending);
    drop(tx);
    let wake_count = wake_state.wake_count();
    crate::assert_with_log!(wake_count == 1, "wake count", 1, wake_count);
    let got = match fut.as_mut().poll(&mut ctx) {
        Poll::Ready(Err(e)) => e,
        other => {
            unreachable!("expected Ready(Err), got {other:?}");
        }
    };
    crate::assert_with_log!(
        got == RecvError::Closed,
        "recv closed after sender drop",
        RecvError::Closed,
        got
    );
    crate::test_complete!("pending_recv_woken_on_sender_drop_returns_closed");
}
/// A cancelled receive leaves the cursor untouched: the next receive still gets the
/// first message.
#[test]
fn recv_cancelled_does_not_advance_cursor() {
    init_test("recv_cancelled_does_not_advance_cursor");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    cx.set_cancel_requested(true);
    let cancelled = block_on(rx.recv(&cx));
    crate::assert_with_log!(
        matches!(cancelled, Err(RecvError::Cancelled)),
        "recv cancelled",
        "Err(Cancelled)",
        format!("{:?}", cancelled)
    );
    cx.set_cancel_requested(false);
    tx.send(&cx, 7).expect("send failed");
    let got = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(got == 7, "received after cancel", 7, got);
    crate::test_complete!("recv_cancelled_does_not_advance_cursor");
}
/// A pending receive that is then cancelled removes its waker from the channel.
#[test]
fn recv_cancelled_clears_waiter_registration() {
    init_test("recv_cancelled_clears_waiter_registration");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    crate::assert_with_log!(
        matches!(fut.as_mut().poll(&mut ctx), Poll::Pending),
        "poll pending",
        true,
        true
    );
    // Inspect the channel internals: the pending poll registered one waker.
    let wakers_len = {
        let inner = tx.channel.inner.lock();
        inner.wakers.len()
    };
    crate::assert_with_log!(wakers_len == 1, "one waiter registered", 1usize, wakers_len);
    cx.set_cancel_requested(true);
    let res = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(res, Poll::Ready(Err(RecvError::Cancelled))),
        "cancelled",
        "Ready(Err(Cancelled))",
        format!("{res:?}")
    );
    let cleared = {
        let inner = tx.channel.inner.lock();
        inner.wakers.is_empty()
    };
    crate::assert_with_log!(cleared, "waiter cleared", true, cleared);
    drop(fut);
    cx.set_cancel_requested(false);
    tx.send(&cx, 7).expect("send failed");
    let got = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(got == 7, "received after cancel", 7, got);
    crate::test_complete!("recv_cancelled_clears_waiter_registration");
}
/// Dropping a pending receive future removes its waker from the channel.
#[test]
fn recv_drop_clears_waiter_registration() {
    init_test("recv_drop_clears_waiter_registration");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(rx.recv(&cx));
        crate::assert_with_log!(
            matches!(fut.as_mut().poll(&mut ctx), Poll::Pending),
            "poll pending",
            true,
            true
        );
        let wakers_len = {
            let inner = tx.channel.inner.lock();
            inner.wakers.len()
        };
        crate::assert_with_log!(wakers_len == 1, "one waiter registered", 1usize, wakers_len);
        // `fut` is dropped at the end of this scope.
    }
    let cleared = {
        let inner = tx.channel.inner.lock();
        inner.wakers.is_empty()
    };
    crate::assert_with_log!(cleared, "waiter cleared on drop", true, cleared);
    tx.send(&cx, 7).expect("send failed");
    let got = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(got == 7, "received after drop", 7, got);
    crate::test_complete!("recv_drop_clears_waiter_registration");
}
/// Messages sent through a sender and its clone reach the receiver in send order.
#[test]
fn broadcast_cloned_sender_both_deliver() {
    init_test("broadcast_cloned_sender_both_deliver");
    let cx = test_cx();
    let (tx1, mut rx) = channel(10);
    let tx2 = tx1.clone();
    tx1.send(&cx, 1).unwrap();
    tx2.send(&cx, 2).unwrap();
    let first = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(first == 1, "first", 1, first);
    let second = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(second == 2, "second", 2, second);
    crate::test_complete!("broadcast_cloned_sender_both_deliver");
}
/// Ten sends into a capacity-4 channel: the receiver reports `Lagged(6)` and then reads
/// the four retained messages (6 through 9).
#[test]
fn broadcast_heavy_lag_overwrite() {
    init_test("broadcast_heavy_lag_overwrite");
    let cx = test_cx();
    let (tx, mut rx) = channel(4);
    for i in 0..10 {
        tx.send(&cx, i).unwrap();
    }
    let result = block_on(rx.recv(&cx));
    match result {
        Err(RecvError::Lagged(n)) => {
            crate::assert_with_log!(n == 6, "lagged 6", 6u64, n);
        }
        other => unreachable!("expected lagged, got {other:?}"),
    }
    for expected in 6..10 {
        let got = block_on(rx.recv(&cx)).unwrap();
        crate::assert_with_log!(got == expected, "post-lag msg", expected, got);
    }
    crate::test_complete!("broadcast_heavy_lag_overwrite");
}
/// A cloned receiver starts at the original's current cursor and then advances on its own.
#[test]
fn broadcast_clone_receiver_shares_position() {
    init_test("broadcast_clone_receiver_shares_position");
    let cx = test_cx();
    let (tx, mut rx1) = channel(10);
    tx.send(&cx, 10).unwrap();
    tx.send(&cx, 20).unwrap();
    let first = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(first == 10, "rx1 first", 10, first);
    // Clone after rx1 consumed the first message: rx2 begins at the second one.
    let mut rx2 = rx1.clone();
    let rx1_second = block_on(rx1.recv(&cx)).unwrap();
    crate::assert_with_log!(rx1_second == 20, "rx1 second", 20, rx1_second);
    let rx2_second = block_on(rx2.recv(&cx)).unwrap();
    crate::assert_with_log!(rx2_second == 20, "rx2 second", 20, rx2_second);
    crate::test_complete!("broadcast_clone_receiver_shares_position");
}
/// The two-step `reserve` + `SendPermit::send` path delivers the message.
#[test]
fn broadcast_reserve_then_send() {
    init_test("broadcast_reserve_then_send");
    let cx = test_cx();
    let (tx, mut rx) = channel(10);
    let permit = tx.reserve(&cx).expect("reserve failed");
    let count = permit.send(42);
    crate::assert_with_log!(count == 1, "receiver count", 1usize, count);
    let got = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(got == 42, "received", 42, got);
    crate::test_complete!("broadcast_reserve_then_send");
}
/// The channel stays open while any sender clone is alive and closes when the last one
/// is dropped.
#[test]
fn broadcast_drop_all_senders_closes() {
    init_test("broadcast_drop_all_senders_closes");
    let cx = test_cx();
    let (tx1, mut rx) = channel::<i32>(10);
    let tx2 = tx1.clone();
    drop(tx1);
    tx2.send(&cx, 5).unwrap();
    let got = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(got == 5, "still open", 5, got);
    drop(tx2);
    let result = block_on(rx.recv(&cx));
    crate::assert_with_log!(
        matches!(result, Err(RecvError::Closed)),
        "closed after all senders drop",
        true,
        true
    );
    crate::test_complete!("broadcast_drop_all_senders_closes");
}
/// Fan-out under the lab runtime: one sender task publishes two values and two receiver
/// tasks each read both, recording JSON checkpoints along the way.
#[test]
fn broadcast_fan_out_under_lab_runtime() {
    init_test("broadcast_fan_out_under_lab_runtime");
    let config = TestConfig::new()
        .with_seed(0xBADC_A571)
        .with_tracing(true)
        .with_max_steps(20_000);
    let mut runtime = LabRuntimeTarget::create_runtime(config);
    // Checkpoint log shared by all three tasks.
    let checkpoints = Arc::new(StdMutex::new(Vec::<Value>::new()));
    let ((rx1_first, rx1_second), (rx2_first, rx2_second), checkpoints) =
        LabRuntimeTarget::block_on(&mut runtime, async move {
            let cx = Cx::current().expect("lab runtime should install a current Cx");
            let sender_spawn_cx = cx.clone();
            let rx1_spawn_cx = cx.clone();
            let rx2_spawn_cx = cx.clone();
            // Both receivers exist before anything is sent.
            let (tx, mut rx1) = channel(8);
            let mut rx2 = tx.subscribe();
            let sender_checkpoints = Arc::clone(&checkpoints);
            let sender = LabRuntimeTarget::spawn(&sender_spawn_cx, Budget::INFINITE, {
                let tx = tx.clone();
                let sender_task_cx = sender_spawn_cx.clone();
                async move {
                    yield_now().await;
                    // First value goes through the reserve/permit path.
                    let permit = tx.reserve(&sender_task_cx).expect("reserve should succeed");
                    let first_receivers = permit.send(11);
                    let first = serde_json::json!({
                        "phase": "sent_first",
                        "value": 11,
                        "receivers": first_receivers,
                    });
                    tracing::info!(event = %first, "broadcast_lab_checkpoint");
                    sender_checkpoints.lock().unwrap().push(first);
                    // Second value goes through the one-shot `send` path.
                    let second_receivers =
                        tx.send(&sender_task_cx, 22).expect("send should succeed");
                    let second = serde_json::json!({
                        "phase": "sent_second",
                        "value": 22,
                        "receivers": second_receivers,
                    });
                    tracing::info!(event = %second, "broadcast_lab_checkpoint");
                    sender_checkpoints.lock().unwrap().push(second);
                }
            });
            let rx1_checkpoints = Arc::clone(&checkpoints);
            let rx1_task = LabRuntimeTarget::spawn(&rx1_spawn_cx, Budget::INFINITE, {
                let rx1_task_cx = rx1_spawn_cx.clone();
                async move {
                    let first = rx1.recv(&rx1_task_cx).await.expect("rx1 first receive");
                    let first_event = serde_json::json!({
                        "phase": "rx1_first",
                        "value": first,
                    });
                    tracing::info!(event = %first_event, "broadcast_lab_checkpoint");
                    rx1_checkpoints.lock().unwrap().push(first_event);
                    let second = rx1.recv(&rx1_task_cx).await.expect("rx1 second receive");
                    let second_event = serde_json::json!({
                        "phase": "rx1_second",
                        "value": second,
                    });
                    tracing::info!(event = %second_event, "broadcast_lab_checkpoint");
                    rx1_checkpoints.lock().unwrap().push(second_event);
                    (first, second)
                }
            });
            let rx2_checkpoints = Arc::clone(&checkpoints);
            let rx2_task = LabRuntimeTarget::spawn(&rx2_spawn_cx, Budget::INFINITE, {
                let rx2_task_cx = rx2_spawn_cx.clone();
                async move {
                    let first = rx2.recv(&rx2_task_cx).await.expect("rx2 first receive");
                    let first_event = serde_json::json!({
                        "phase": "rx2_first",
                        "value": first,
                    });
                    tracing::info!(event = %first_event, "broadcast_lab_checkpoint");
                    rx2_checkpoints.lock().unwrap().push(first_event);
                    let second = rx2.recv(&rx2_task_cx).await.expect("rx2 second receive");
                    let second_event = serde_json::json!({
                        "phase": "rx2_second",
                        "value": second,
                    });
                    tracing::info!(event = %second_event, "broadcast_lab_checkpoint");
                    rx2_checkpoints.lock().unwrap().push(second_event);
                    (first, second)
                }
            });
            // Join the three tasks and require each to finish with `Outcome::Ok`.
            let sender_outcome = sender.await;
            crate::assert_with_log!(
                matches!(sender_outcome, crate::types::Outcome::Ok(())),
                "sender task completes successfully",
                true,
                matches!(sender_outcome, crate::types::Outcome::Ok(()))
            );
            let rx1_outcome = rx1_task.await;
            crate::assert_with_log!(
                matches!(rx1_outcome, crate::types::Outcome::Ok(_)),
                "rx1 task completes successfully",
                true,
                matches!(rx1_outcome, crate::types::Outcome::Ok(_))
            );
            let crate::types::Outcome::Ok(rx1_values) = rx1_outcome else {
                panic!("rx1 task should finish successfully");
            };
            let rx2_outcome = rx2_task.await;
            crate::assert_with_log!(
                matches!(rx2_outcome, crate::types::Outcome::Ok(_)),
                "rx2 task completes successfully",
                true,
                matches!(rx2_outcome, crate::types::Outcome::Ok(_))
            );
            let crate::types::Outcome::Ok(rx2_values) = rx2_outcome else {
                panic!("rx2 task should finish successfully");
            };
            (rx1_values, rx2_values, checkpoints.lock().unwrap().clone())
        });
    // Both receivers saw both values in send order.
    assert_eq!((rx1_first, rx1_second), (11, 22));
    assert_eq!((rx2_first, rx2_second), (11, 22));
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "sent_first"),
        "first send checkpoint should be recorded"
    );
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "rx1_second"),
        "rx1 second receive checkpoint should be recorded"
    );
    assert!(
        checkpoints
            .iter()
            .any(|event| event["phase"] == "rx2_second"),
        "rx2 second receive checkpoint should be recorded"
    );
    // The runtime's oracles must report no violations after the run.
    let violations = runtime.oracles.check_all(runtime.now());
    assert!(
        violations.is_empty(),
        "broadcast lab-runtime fan-out test should leave runtime invariants clean: {violations:?}"
    );
}
/// A pending receive polled again after the last sender is dropped resolves to `Closed`.
#[test]
fn recv_closed_clears_waiter_registration() {
    init_test("recv_closed_clears_waiter_registration");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    crate::assert_with_log!(
        matches!(fut.as_mut().poll(&mut ctx), Poll::Pending),
        "poll pending",
        true,
        true
    );
    let wakers_len = {
        let inner = tx.channel.inner.lock();
        inner.wakers.len()
    };
    crate::assert_with_log!(wakers_len == 1, "one waiter registered", 1usize, wakers_len);
    drop(tx);
    let res = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(res, Poll::Ready(Err(RecvError::Closed))),
        "closed",
        "Ready(Err(Closed))",
        format!("{res:?}")
    );
    drop(fut);
    crate::test_complete!("recv_closed_clears_waiter_registration");
}
/// Polling a receive future again after it yielded a value reports
/// `PolledAfterCompletion`.
#[test]
fn recv_second_poll_after_ok_fails_closed() {
    init_test("recv_second_poll_after_ok_fails_closed");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(4);
    tx.send(&cx, 42).expect("send failed");
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    let first = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(first, Poll::Ready(Ok(42))),
        "first poll receives value",
        "Poll::Ready(Ok(42))",
        format!("{first:?}")
    );
    let second = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(second, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
        "second poll fails closed",
        "Poll::Ready(Err(PolledAfterCompletion))",
        format!("{second:?}")
    );
    crate::test_complete!("recv_second_poll_after_ok_fails_closed");
}
/// Polling a receive future again after it reported lag yields `PolledAfterCompletion`;
/// a fresh future continues from the lag-adjusted cursor.
#[test]
fn recv_second_poll_after_lagged_fails_closed() {
    init_test("recv_second_poll_after_lagged_fails_closed");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(2);
    tx.send(&cx, 1).expect("send failed");
    tx.send(&cx, 2).expect("send failed");
    // The third send evicts message 1 from the capacity-2 buffer.
    tx.send(&cx, 3).expect("send failed");
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    {
        let mut fut = std::pin::pin!(rx.recv(&cx));
        let first = fut.as_mut().poll(&mut ctx);
        crate::assert_with_log!(
            matches!(first, Poll::Ready(Err(RecvError::Lagged(1)))),
            "first poll reports lag",
            "Poll::Ready(Err(Lagged(1)))",
            format!("{first:?}")
        );
        let second = fut.as_mut().poll(&mut ctx);
        crate::assert_with_log!(
            matches!(second, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
            "second poll fails closed after lag",
            "Poll::Ready(Err(PolledAfterCompletion))",
            format!("{second:?}")
        );
    }
    let next = block_on(rx.recv(&cx))
        .expect("new recv future should continue from lag-adjusted cursor");
    crate::assert_with_log!(
        next == 2,
        "next recv continues at earliest retained item",
        2,
        next
    );
    crate::test_complete!("recv_second_poll_after_lagged_fails_closed");
}
/// Polling a receive future again after it reported `Closed` yields
/// `PolledAfterCompletion`.
#[test]
fn recv_second_poll_after_closed_fails_closed() {
    init_test("recv_second_poll_after_closed_fails_closed");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(4);
    drop(tx);
    let wake_state = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&wake_state));
    let mut ctx = Context::from_waker(&waker);
    let mut fut = Box::pin(rx.recv(&cx));
    let first = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(first, Poll::Ready(Err(RecvError::Closed))),
        "first poll reports closed",
        "Poll::Ready(Err(Closed))",
        format!("{first:?}")
    );
    let second = fut.as_mut().poll(&mut ctx);
    crate::assert_with_log!(
        matches!(second, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
        "second poll fails closed after close",
        "Poll::Ready(Err(PolledAfterCompletion))",
        format!("{second:?}")
    );
    crate::test_complete!("recv_second_poll_after_closed_fails_closed");
}
/// A `recv` future that already resolved to `Cancelled` must report
/// `PolledAfterCompletion` when it is polled again.
#[test]
fn recv_second_poll_after_cancelled_fails_closed() {
    init_test("recv_second_poll_after_cancelled_fails_closed");
    let cx = test_cx();
    // `_tx` stays alive so the channel is not closed; only cancellation is in play.
    let (_tx, mut rx) = channel::<i32>(4);
    cx.set_cancel_requested(true);
    let counter = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&counter));
    let mut task_cx = Context::from_waker(&waker);
    let mut recv_fut = std::pin::pin!(rx.recv(&cx));
    let cancelled_poll = recv_fut.as_mut().poll(&mut task_cx);
    crate::assert_with_log!(
        matches!(cancelled_poll, Poll::Ready(Err(RecvError::Cancelled))),
        "first poll reports cancelled",
        "Poll::Ready(Err(Cancelled))",
        format!("{cancelled_poll:?}")
    );
    let repeat_poll = recv_fut.as_mut().poll(&mut task_cx);
    crate::assert_with_log!(
        matches!(
            repeat_poll,
            Poll::Ready(Err(RecvError::PolledAfterCompletion))
        ),
        "second poll fails closed after cancellation",
        "Poll::Ready(Err(PolledAfterCompletion))",
        format!("{repeat_poll:?}")
    );
    crate::test_complete!("recv_second_poll_after_cancelled_fails_closed");
}
/// A permit reserved while a receiver existed must deliver to nobody once that
/// receiver is gone: no counter bump, no buffered message, and later sends see `Closed`.
#[test]
fn permit_send_after_last_receiver_drop_is_noop() {
    init_test("permit_send_after_last_receiver_drop_is_noop");
    let cx = test_cx();
    let (tx, rx) = channel::<i32>(4);
    let permit = tx.reserve(&cx).expect("reserve should succeed");
    drop(rx);
    let delivered = permit.send(42);
    crate::assert_with_log!(delivered == 0, "delivered count", 0usize, delivered);
    // Snapshot the shared state in a short scope so the lock is released before sending again.
    let (total_sent, buffer_is_empty) = {
        let shared = tx.channel.inner.lock();
        (shared.total_sent, shared.buffer.is_empty())
    };
    crate::assert_with_log!(
        total_sent == 0,
        "total_sent unchanged",
        0u64,
        total_sent
    );
    crate::assert_with_log!(
        buffer_is_empty,
        "buffer remains empty",
        true,
        buffer_is_empty
    );
    let closed = tx.send(&cx, 7);
    crate::assert_with_log!(
        matches!(closed, Err(SendError::Closed(7))),
        "send sees closed after receiver drop",
        "Err(Closed(7))",
        format!("{closed:?}")
    );
    crate::test_complete!("permit_send_after_last_receiver_drop_is_noop");
}
/// `total_sent` counts every send even though the buffer only keeps the newest
/// `capacity` slots, and the retained slots carry their original indices.
#[test]
fn total_sent_advances_even_when_buffer_evicts() {
    init_test("total_sent_advances_even_when_buffer_evicts");
    let cx = test_cx();
    let (tx, _rx) = channel::<i32>(2);
    for value in 0..10 {
        tx.send(&cx, value).unwrap();
    }
    // Read everything needed under the lock, then release it before asserting.
    let shared = tx.channel.inner.lock();
    let total_sent = shared.total_sent;
    let buffer_len = shared.buffer.len();
    let first_idx = shared.buffer.front().unwrap().index;
    drop(shared);
    crate::assert_with_log!(total_sent == 10, "total_sent", 10u64, total_sent);
    crate::assert_with_log!(buffer_len == 2, "buffer len", 2usize, buffer_len);
    crate::assert_with_log!(first_idx == 8, "first buffer index", 8u64, first_idx);
    crate::test_complete!("total_sent_advances_even_when_buffer_evicts");
}
/// A receiver created via `subscribe` after earlier sends must only observe
/// messages sent after it subscribed.
#[test]
fn subscribe_from_lagged_position_gets_only_future() {
    init_test("subscribe_from_lagged_position_gets_only_future");
    let cx = test_cx();
    let (tx, _rx) = channel::<i32>(4);
    // Five sends into a capacity-4 channel happen before the late subscription.
    for value in 0..5 {
        tx.send(&cx, value).unwrap();
    }
    let mut late_rx = tx.subscribe();
    tx.send(&cx, 99).unwrap();
    let received = block_on(late_rx.recv(&cx)).unwrap();
    crate::assert_with_log!(
        received == 99,
        "subscriber sees only future",
        99,
        received
    );
    crate::test_complete!("subscribe_from_lagged_position_gets_only_future");
}
/// Each receiver tracks its own position: one that keeps reading stays in sync
/// while an idle one reports `Lagged` once its next message has been evicted.
#[test]
fn multiple_receivers_independent_lag() {
    init_test("multiple_receivers_independent_lag");
    let cx = test_cx();
    let (tx, mut active_rx) = channel::<i32>(2);
    let mut idle_rx = tx.subscribe();
    tx.send(&cx, 1).unwrap();
    tx.send(&cx, 2).unwrap();
    let first = block_on(active_rx.recv(&cx)).unwrap();
    crate::assert_with_log!(first == 1, "rx1 reads 1", 1, first);
    // Third send into a capacity-2 channel; only the idle receiver is expected to lag.
    tx.send(&cx, 3).unwrap();
    let second = block_on(active_rx.recv(&cx)).unwrap();
    crate::assert_with_log!(second == 2, "rx1 reads 2", 2, second);
    let idle_result = block_on(idle_rx.recv(&cx));
    let lagged_ok = matches!(idle_result, Err(RecvError::Lagged(1)));
    crate::assert_with_log!(lagged_ok, "rx2 lagged by 1", true, lagged_ok);
    crate::test_complete!("multiple_receivers_independent_lag");
}
/// Sending through a previously reserved permit after every receiver is gone
/// reports zero deliveries and leaves the shared state untouched.
#[test]
fn permit_send_returns_zero_after_all_receivers_drop() {
    init_test("permit_send_returns_zero_after_all_receivers_drop");
    let cx = test_cx();
    let (tx, rx) = channel::<i32>(4);
    let permit = tx.reserve(&cx).expect("reserve");
    drop(rx);
    let count = permit.send(42);
    crate::assert_with_log!(count == 0, "no receivers", 0usize, count);
    // Copy the fields out and release the lock before asserting.
    let shared = tx.channel.inner.lock();
    let total_sent = shared.total_sent;
    let buffer_empty = shared.buffer.is_empty();
    drop(shared);
    crate::assert_with_log!(total_sent == 0, "total_sent", 0u64, total_sent);
    crate::assert_with_log!(buffer_empty, "buffer empty", true, buffer_empty);
    crate::test_complete!("permit_send_returns_zero_after_all_receivers_drop");
}
/// Race test: a permit is reserved while a receiver exists, then the last receiver
/// is dropped while `permit.send` is held back by the channel lock. The send must
/// report zero deliveries and leave `total_sent` and the buffer unchanged.
#[test]
fn permit_send_does_not_commit_if_last_receiver_drops_while_waiting_for_lock() {
init_test("permit_send_does_not_commit_if_last_receiver_drops_while_waiting_for_lock");
let (tx, rx) = channel::<i32>(4);
// Hold the channel's inner lock on the test thread for the whole race window.
// NOTE(review): `reserve` below completes while this guard is held, so it
// presumably does not take this lock; `permit.send` presumably does — confirm.
let lock_guard = tx.channel.inner.lock();
let tx_thread = tx.clone();
// Three one-slot handshake channels sequence the sender thread against this thread.
let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1);
let (go_tx, go_rx) = std::sync::mpsc::sync_channel(1);
let (send_entered_tx, send_entered_rx) = std::sync::mpsc::sync_channel(1);
let handle = std::thread::spawn(move || {
let cx = test_cx();
let permit = tx_thread
.reserve(&cx)
.expect("reserve should succeed before receiver drop");
// Tell the test thread the permit exists, then wait for permission to send.
ready_tx.send(()).expect("ready send");
go_rx.recv().expect("go recv");
// Signalled immediately before calling `permit.send`, not from inside it.
send_entered_tx.send(()).expect("send_entered send");
permit.send(99)
});
ready_rx.recv().expect("ready recv");
go_tx.send(()).expect("go send");
send_entered_rx.recv().expect("send_entered recv");
// Drop the last receiver on its own thread so this thread can keep holding the lock.
let drop_handle = std::thread::spawn(move || {
drop(rx);
});
// Spin until the receiver count reads zero, then let the lock go.
while tx
.channel
.receiver_count
.load(std::sync::atomic::Ordering::Acquire)
> 0
{
std::thread::yield_now();
}
drop(lock_guard);
drop_handle.join().expect("drop thread panicked");
let delivered = handle.join().expect("sender thread panicked");
crate::assert_with_log!(
delivered == 0,
"delivered count after last receiver drop",
0usize,
delivered
);
// Re-lock briefly to confirm nothing was committed to the shared state.
let (total_sent, buffer_empty) = {
let inner = tx.channel.inner.lock();
(inner.total_sent, inner.buffer.is_empty())
};
crate::assert_with_log!(
total_sent == 0,
"total_sent unchanged after lock-contention drop race",
0u64,
total_sent
);
crate::assert_with_log!(
buffer_empty,
"buffer remains empty after lock-contention drop race",
true,
buffer_empty
);
crate::test_complete!(
"permit_send_does_not_commit_if_last_receiver_drops_while_waiting_for_lock"
);
}
/// When `subscribe` is called while the receiver count is zero, the test expects
/// the stale buffered message to be dropped without the channel lock being held,
/// and the new subscriber to see only messages sent afterwards.
#[test]
fn subscribe_reactivating_zero_receivers_drops_stale_buffer_outside_lock() {
init_test("subscribe_reactivating_zero_receivers_drops_stale_buffer_outside_lock");
let cx = test_cx();
let (tx, rx) = channel::<GateMsg>(1);
// NOTE(review): `DropBlocker` presumably signals `entered_rx` when its drop starts
// and blocks until `release_tx` fires — confirm against its definition.
let (blocker, entered_rx, release_tx) = DropBlocker::new();
tx.send(&cx, GateMsg::Blocking(blocker))
.expect("send failed");
// Leak the receiver and force the count to zero by hand, so the buffered message
// stays in place while the channel looks receiver-less.
std::mem::forget(rx); tx.channel.receiver_count.store(0, Ordering::Release);
let tx_thread = tx.clone();
let handle = std::thread::spawn(move || tx_thread.subscribe());
// Wait (bounded to two seconds) for the blocking message's drop to begin.
let drop_started = entered_rx
.recv_timeout(std::time::Duration::from_secs(2))
.is_ok();
// While that drop is still in progress, the channel lock must be acquirable.
let lock_free_during_drop = drop_started && tx.channel.inner.try_lock().is_some();
// Unblock the drop unconditionally so the subscribe thread can finish before asserting.
let _ = release_tx.send(());
let mut rx2 = handle.join().expect("subscribe thread panicked");
crate::assert_with_log!(
drop_started,
"subscribe drops stale buffer",
true,
drop_started
);
crate::assert_with_log!(
lock_free_during_drop,
"subscribe drops stale buffer outside lock",
true,
lock_free_during_drop
);
// The reactivated subscriber must receive a message sent after it subscribed.
tx.send(&cx, GateMsg::Plain(7)).expect("send failed");
let got = block_on(rx2.recv(&cx)).expect("recv failed");
crate::assert_with_log!(
matches!(got, GateMsg::Plain(7)),
"reactivated subscriber sees future message",
true,
matches!(got, GateMsg::Plain(7))
);
crate::test_complete!(
"subscribe_reactivating_zero_receivers_drops_stale_buffer_outside_lock"
);
}
/// With capacity one, each send replaces the previous message: an idle receiver
/// lags by the number of overwritten messages and then reads the newest one.
#[test]
fn capacity_one_overwrites_correctly() {
    init_test("capacity_one_overwrites_correctly");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(1);
    for value in 1..=3 {
        tx.send(&cx, value).unwrap();
    }
    let lag_result = block_on(rx.recv(&cx));
    let lagged_ok = matches!(lag_result, Err(RecvError::Lagged(2)));
    crate::assert_with_log!(lagged_ok, "lagged by 2", true, lagged_ok);
    let newest = block_on(rx.recv(&cx)).unwrap();
    crate::assert_with_log!(newest == 3, "last message", 3, newest);
    crate::test_complete!("capacity_one_overwrites_correctly");
}
/// 32-bit only: a receiver cursor placed just past `u32::MAX` must leave `recv`
/// pending rather than resolving.
/// NOTE(review): presumably guards against a `u64` -> `usize` truncation of the
/// cursor offset — confirm against the `recv` implementation.
#[test]
#[cfg(target_pointer_width = "32")]
fn recv_large_delta_does_not_truncate_offset() {
    init_test("recv_large_delta_does_not_truncate_offset");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(2);
    tx.send(&cx, 7).unwrap();
    // Move the cursor far beyond anything that was sent.
    rx.next_index = u64::from(u32::MAX) + 1;
    let counter = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&counter));
    let mut task_cx = Context::from_waker(&waker);
    let mut recv_fut = std::pin::pin!(rx.recv(&cx));
    let pending = recv_fut.as_mut().poll(&mut task_cx).is_pending();
    crate::assert_with_log!(pending, "poll pending", true, pending);
    crate::test_complete!("recv_large_delta_does_not_truncate_offset");
}
/// Exercises the `Debug`, `Display`, `Clone`, `PartialEq` and `Error` impls of `SendError`.
#[test]
fn send_error_debug_clone_eq_display() {
    let original = SendError::Closed(42);
    let debug_text = format!("{original:?}");
    // The debug form must name both the variant and the payload.
    for needle in ["Closed", "42"] {
        assert!(debug_text.contains(needle), "{debug_text}");
    }
    let display_text = original.to_string();
    assert!(
        display_text.contains("closed broadcast channel"),
        "{display_text}"
    );
    assert_eq!(original.clone(), original);
    let as_error: &dyn std::error::Error = &original;
    assert!(as_error.source().is_none());
}
/// Checks every `RecvError` variant's `Display` text, plus copy, equality and
/// non-empty `Debug` output.
#[test]
fn recv_error_debug_clone_copy_eq_display() {
    // Each variant paired with the exact `Display` text it must produce.
    let cases = [
        (RecvError::Lagged(5), "receiver lagged by 5 messages"),
        (RecvError::Closed, "broadcast channel closed"),
        (RecvError::Cancelled, "receive operation cancelled"),
        (
            RecvError::PolledAfterCompletion,
            "broadcast receive future polled after completion",
        ),
    ];
    for (err, expected) in &cases {
        let (copied, cloned) = (*err, *err);
        assert_eq!(copied, cloned);
        assert!(!format!("{err:?}").is_empty());
        assert_eq!(err.to_string(), *expected);
    }
    assert_ne!(cases[0].0, cases[1].0);
    assert_ne!(cases[1].0, cases[2].0);
}
/// `try_recv` on a channel with a live sender and no messages yields `Empty`.
#[test]
fn try_recv_empty_returns_empty() {
    init_test("broadcast_try_recv_empty");
    let (_tx, mut rx) = channel::<i32>(16);
    let outcome = rx.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
/// `try_recv` hands out a sent message once and then reports `Empty`.
#[test]
fn try_recv_returns_message() {
    init_test("broadcast_try_recv_message");
    let cx = test_cx();
    let (tx, mut rx) = channel(16);
    tx.send(&cx, 42).expect("send");
    let first = rx.try_recv();
    assert_eq!(first, Ok(42));
    let second = rx.try_recv();
    assert_eq!(second, Err(TryRecvError::Empty));
}
/// `try_recv` returns messages in the order they were sent.
#[test]
fn try_recv_fifo_ordering() {
    init_test("broadcast_try_recv_fifo");
    let cx = test_cx();
    let (tx, mut rx) = channel(16);
    tx.send(&cx, 1).expect("send 1");
    tx.send(&cx, 2).expect("send 2");
    tx.send(&cx, 3).expect("send 3");
    for expected in 1..=3 {
        assert_eq!(rx.try_recv(), Ok(expected));
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
/// A message sent before the sender was dropped is still delivered; only after
/// it has been drained does `try_recv` report `Closed`.
#[test]
fn try_recv_closed_after_drain() {
    init_test("broadcast_try_recv_closed");
    let cx = test_cx();
    let (tx, mut rx) = channel(16);
    tx.send(&cx, 99).expect("send");
    drop(tx);
    let buffered = rx.try_recv();
    assert_eq!(buffered, Ok(99));
    let after_drain = rx.try_recv();
    assert_eq!(after_drain, Err(TryRecvError::Closed));
}
/// After overflowing the buffer by one, `try_recv` reports `Lagged(1)` once and
/// then yields the retained messages in order.
#[test]
fn try_recv_lagged_receiver() {
    init_test("broadcast_try_recv_lagged");
    let cx = test_cx();
    let (tx, mut rx) = channel(2);
    tx.send(&cx, 10).expect("send 1");
    tx.send(&cx, 20).expect("send 2");
    tx.send(&cx, 30).expect("send 3");
    let lag = rx.try_recv();
    assert_eq!(lag, Err(TryRecvError::Lagged(1)));
    for expected in [20, 30] {
        assert_eq!(rx.try_recv(), Ok(expected));
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
/// `receiver_count` goes up on `subscribe` and on `Receiver::clone`, and down
/// on every receiver drop.
#[test]
fn receiver_count_tracks_active_receivers() {
    init_test("broadcast_receiver_count");
    let (tx, original) = channel::<i32>(16);
    assert_eq!(tx.receiver_count(), 1);
    let subscribed = tx.subscribe();
    assert_eq!(tx.receiver_count(), 2);
    let cloned = original.clone();
    assert_eq!(tx.receiver_count(), 3);
    drop(subscribed);
    assert_eq!(tx.receiver_count(), 2);
    // Dropping the remaining two receivers brings the count to zero.
    drop(original);
    drop(cloned);
    assert_eq!(tx.receiver_count(), 0);
}
/// `len`/`is_empty` on the sender follow the number of buffered messages.
#[test]
fn len_tracks_buffered_messages() {
    init_test("broadcast_len");
    let cx = test_cx();
    let (tx, mut rx) = channel(16);
    assert_eq!(tx.len(), 0);
    assert!(tx.is_empty());
    // Each send is followed by a length check.
    tx.send(&cx, 1).expect("send 1");
    assert_eq!(tx.len(), 1);
    assert!(!tx.is_empty());
    tx.send(&cx, 2).expect("send 2");
    assert_eq!(tx.len(), 2);
    // One receive must leave at least one message buffered; the exact length
    // is deliberately not pinned here.
    let _ = rx.try_recv();
    let remaining = tx.len();
    assert!(remaining >= 1);
}
/// Once the buffer is full, further sends keep `len` at the capacity.
#[test]
fn len_caps_at_capacity_on_eviction() {
    init_test("broadcast_len_cap");
    let cx = test_cx();
    let (tx, _rx) = channel::<i32>(3);
    tx.send(&cx, 1).expect("send 1");
    tx.send(&cx, 2).expect("send 2");
    tx.send(&cx, 3).expect("send 3");
    let len_when_full = tx.len();
    assert_eq!(len_when_full, 3);
    // A fourth send into a capacity-3 channel must not grow the buffer.
    tx.send(&cx, 4).expect("send 4");
    let len_after_overflow = tx.len();
    assert_eq!(len_after_overflow, 3);
}
/// Checks every `TryRecvError` variant's `Display` text, plus copy, equality,
/// non-empty `Debug` output and the `Error` impl.
#[test]
fn try_recv_error_traits() {
    init_test("broadcast_try_recv_error_traits");
    // Each variant paired with the exact `Display` text it must produce.
    let cases = [
        (TryRecvError::Empty, "broadcast channel empty"),
        (TryRecvError::Lagged(5), "receiver lagged by 5 messages"),
        (TryRecvError::Closed, "broadcast channel closed"),
    ];
    for (err, expected) in &cases {
        let (copied, cloned) = (*err, *err);
        assert_eq!(copied, cloned);
        assert!(!format!("{err:?}").is_empty());
        assert_eq!(err.to_string(), *expected);
    }
    assert_ne!(cases[0].0, cases[1].0);
    assert_ne!(cases[1].0, cases[2].0);
    // Only checks that `source` is callable through the `Error` trait.
    let _ = <TryRecvError as std::error::Error>::source(&cases[0].0);
}
/// Channel state observed after a burst of sends, as captured by
/// `capture_burst_wraparound_snapshot`; compared for equality across runs.
#[derive(Debug, PartialEq, Eq)]
struct BurstWraparoundSnapshot {
/// Count carried by the `Lagged` error from the slow receiver's first `try_recv`.
lagged_by: u64,
/// Values the slow receiver reads after the lag report, until `Empty`.
retained_values: Vec<i32>,
/// `index` of every slot in the channel buffer, front to back.
retained_indices: Vec<u64>,
/// The channel's `total_sent` counter at capture time.
total_sent: u64,
/// `index` of the front buffer slot, or `total_sent` when the buffer is empty.
earliest_index: u64,
}
/// Sends `0, 1, 2, ...` in bursts of the given chunk lengths into a fresh channel
/// of `capacity`, then snapshots what a never-reading ("slow") receiver observes.
///
/// * `burst_chunks` - number of messages sent per burst.
/// * `alternate_senders` - when set, odd values go through a cloned sender.
/// * `drain_fast_receiver_each_chunk` - when set, a second receiver reads every
///   message of a burst right after that burst was sent.
///
/// Returns the snapshot and the sequence read by the fast receiver (empty when
/// no fast receiver was requested).
///
/// Panics if the fast receiver fails to receive, if the slow receiver's first
/// `try_recv` is not `Lagged`, or if draining it hits anything but `Ok`/`Empty`.
fn capture_burst_wraparound_snapshot(
    capacity: usize,
    burst_chunks: &[usize],
    alternate_senders: bool,
    drain_fast_receiver_each_chunk: bool,
) -> (BurstWraparoundSnapshot, Vec<i32>) {
    let cx = test_cx();
    let (tx_primary, mut slow_rx) = channel::<i32>(capacity);
    let tx_secondary = tx_primary.clone();
    let mut fast_rx = if drain_fast_receiver_each_chunk {
        Some(tx_primary.subscribe())
    } else {
        None
    };
    let mut fast_sequence = Vec::new();
    let mut next_value = 0i32;
    for &chunk_len in burst_chunks {
        for _ in 0..chunk_len {
            let use_secondary = alternate_senders && next_value % 2 != 0;
            let sender = if use_secondary {
                &tx_secondary
            } else {
                &tx_primary
            };
            sender.send(&cx, next_value).expect("send burst value");
            next_value += 1;
        }
        // The fast receiver, when present, consumes the whole burst before the next one.
        if let Some(rx) = fast_rx.as_mut() {
            for _ in 0..chunk_len {
                let value = block_on(rx.recv(&cx)).expect("fast receiver keeps up");
                fast_sequence.push(value);
            }
        }
    }
    // Inspect the shared state under the lock; the guard is released at the end of the scope.
    let (total_sent, earliest_index, retained_indices) = {
        let shared = tx_primary.channel.inner.lock();
        let indices = shared
            .buffer
            .iter()
            .map(|slot| slot.index)
            .collect::<Vec<_>>();
        let earliest = shared
            .buffer
            .front()
            .map_or(shared.total_sent, |slot| slot.index);
        (shared.total_sent, earliest, indices)
    };
    let first_attempt = slow_rx.try_recv();
    let Err(TryRecvError::Lagged(lagged_by)) = first_attempt else {
        panic!("expected lagged receiver after burst wraparound, got {first_attempt:?}")
    };
    // Drain the retained suffix until the receiver reports `Empty`.
    let retained_values = std::iter::from_fn(|| match slow_rx.try_recv() {
        Ok(value) => Some(value),
        Err(TryRecvError::Empty) => None,
        Err(other) => panic!("expected retained suffix or empty after lag, got {other:?}"),
    })
    .collect::<Vec<_>>();
    let snapshot = BurstWraparoundSnapshot {
        lagged_by,
        retained_values,
        retained_indices,
        total_sent,
        earliest_index,
    };
    (snapshot, fast_sequence)
}
/// Metamorphic check: for 1..=5 receivers and 2..=10 messages (buffer large
/// enough to avoid eviction), every receiver reads the same sequence, and that
/// sequence equals the send order.
#[test]
fn metamorphic_order_preservation_across_receivers() {
    init_test("metamorphic_order_preservation_across_receivers");
    let cx = test_cx();
    for num_receivers in 1..=5 {
        for num_messages in 2..=10 {
            // Two slots of headroom so no receiver can lag.
            let (tx, first_rx) = channel::<i32>(num_messages + 2);
            let mut receivers = vec![first_rx];
            receivers.extend((1..num_receivers).map(|_| tx.subscribe()));
            let messages: Vec<i32> = (0..num_messages).map(|x| x as i32).collect();
            for &msg in &messages {
                tx.send(&cx, msg).expect("send");
            }
            // Drain each receiver fully, one receiver after the other.
            let sequences: Vec<Vec<i32>> = receivers
                .iter_mut()
                .map(|rx| {
                    (0..num_messages)
                        .map(|_| match block_on(rx.recv(&cx)) {
                            Ok(msg) => msg,
                            Err(e) => panic!("Unexpected recv error: {:?}", e),
                        })
                        .collect()
                })
                .collect();
            let first_sequence = &sequences[0];
            for (i, sequence) in sequences.iter().enumerate() {
                crate::assert_with_log!(
                    sequence == first_sequence,
                    format!(
                        "order preservation rx{} vs rx0 ({}rx, {}msg)",
                        i, num_receivers, num_messages
                    ),
                    first_sequence.clone(),
                    sequence.clone()
                );
            }
            crate::assert_with_log!(
                first_sequence == &messages,
                format!(
                    "order matches send order ({}rx, {}msg)",
                    num_receivers, num_messages
                ),
                messages,
                first_sequence.clone()
            );
        }
    }
    crate::test_complete!("metamorphic_order_preservation_across_receivers");
}
/// Metamorphic check of lag accounting: for every capacity/overrun combination a
/// receiver that never read must report `Lagged(overrun)` and then read exactly
/// the retained suffix of the stream.
///
/// `capacity + overrun` messages (`0, 1, 2, ...`) are sent in total and the
/// channel keeps the newest `capacity` of them, so the retained suffix is the
/// `capacity` values starting at `overrun` — regardless of whether `overrun` is
/// smaller or larger than `capacity`.
#[test]
fn metamorphic_lag_behavior_correctness() {
    init_test("metamorphic_lag_behavior_correctness");
    let cx = test_cx();
    for capacity in 2..=6 {
        for overrun in 1..=8 {
            let (tx, mut rx_slow) = channel::<i32>(capacity);
            let mut rx_fast = tx.subscribe();
            // Fill the buffer exactly; the fast receiver consumes it all.
            for i in 0..capacity {
                tx.send(&cx, i as i32).expect("send");
            }
            for _ in 0..capacity {
                block_on(rx_fast.recv(&cx)).expect("fast recv");
            }
            // Push `overrun` more messages, evicting the oldest `overrun` ones.
            for i in 0..overrun {
                tx.send(&cx, (capacity + i) as i32).expect("send overrun");
            }
            let lag_result = block_on(rx_slow.recv(&cx));
            match lag_result {
                Err(RecvError::Lagged(n)) => {
                    crate::assert_with_log!(
                        n == overrun as u64,
                        format!("lag count (cap={}, overrun={})", capacity, overrun),
                        overrun as u64,
                        n
                    );
                }
                other => panic!("Expected Lagged({}) but got: {:?}", overrun, other),
            }
            // After the lag report the cursor sits on the oldest retained message.
            // The buffer holds the last `capacity` of the `capacity + overrun` values,
            // i.e. `overrun .. overrun + capacity`.
            let retained_count = capacity;
            let first_retained = overrun;
            for i in 0..retained_count {
                let received = block_on(rx_slow.recv(&cx)).expect("post-lag recv");
                let expected = (first_retained + i) as i32;
                crate::assert_with_log!(
                    received == expected,
                    format!(
                        "post-lag message {} (cap={}, overrun={})",
                        i, capacity, overrun
                    ),
                    expected,
                    received
                );
            }
        }
    }
    crate::test_complete!("metamorphic_lag_behavior_correctness");
}
/// Metamorphic check that buffer wraparound is insensitive to how a burst is
/// shaped: sending everything at once, sending in irregular chunks, and sending
/// in irregular chunks through alternating senders with a concurrently draining
/// fast receiver must all leave the slow receiver with the same lag count,
/// retained suffix, slot indices and counters.
///
/// Chunk lengths are kept within `1..=capacity`: the fast receiver drains only
/// after each chunk, so a chunk longer than the capacity would make it lag and
/// trip the helper's "fast receiver keeps up" expectation.
#[test]
fn metamorphic_slot_wraparound_under_burst_preserves_suffix() {
    init_test("metamorphic_slot_wraparound_under_burst_preserves_suffix");
    for capacity in 2..=6usize {
        for wraps in 1..=4usize {
            let total_messages = capacity * (wraps + 1) + 1;
            // Index (and value) of the oldest message that must survive.
            let first_retained = total_messages - capacity;
            let expected_lag = first_retained as u64;
            let expected_suffix: Vec<i32> =
                (first_retained as i32..total_messages as i32).collect();
            let expected_indices: Vec<u64> =
                (first_retained as u64..total_messages as u64).collect();
            // Split the burst into deterministic, irregular chunks.
            let mut remaining = total_messages;
            let mut chunk_seed = wraps + 1;
            let mut irregular_chunks = Vec::new();
            while remaining > 0 {
                // Clamp to `capacity` so a per-chunk draining receiver never lags.
                let next = ((chunk_seed * 3) % (capacity + 2)).clamp(1, capacity);
                let chunk_len = next.min(remaining);
                irregular_chunks.push(chunk_len);
                remaining -= chunk_len;
                chunk_seed += 1;
            }
            let (base, _) =
                capture_burst_wraparound_snapshot(capacity, &[total_messages], false, false);
            let (chunked, _) =
                capture_burst_wraparound_snapshot(capacity, &irregular_chunks, false, false);
            let (perturbed, fast_sequence) =
                capture_burst_wraparound_snapshot(capacity, &irregular_chunks, true, true);
            crate::assert_with_log!(
                base.lagged_by == expected_lag,
                format!("base lag count (cap={}, wraps={})", capacity, wraps),
                expected_lag,
                base.lagged_by
            );
            crate::assert_with_log!(
                base.retained_values == expected_suffix,
                format!("base suffix (cap={}, wraps={})", capacity, wraps),
                expected_suffix.clone(),
                base.retained_values.clone()
            );
            crate::assert_with_log!(
                base.retained_indices == expected_indices,
                format!("base indices (cap={}, wraps={})", capacity, wraps),
                expected_indices.clone(),
                base.retained_indices.clone()
            );
            crate::assert_with_log!(
                base.total_sent == total_messages as u64,
                format!("base total_sent (cap={}, wraps={})", capacity, wraps),
                total_messages as u64,
                base.total_sent
            );
            crate::assert_with_log!(
                base.earliest_index == first_retained as u64,
                format!("base earliest index (cap={}, wraps={})", capacity, wraps),
                first_retained as u64,
                base.earliest_index
            );
            crate::assert_with_log!(
                chunked == base,
                format!(
                    "chunked burst matches base (cap={}, wraps={})",
                    capacity, wraps
                ),
                format!("{base:?}"),
                format!("{chunked:?}")
            );
            crate::assert_with_log!(
                perturbed == base,
                format!(
                    "sender/receiver perturbation matches base (cap={}, wraps={})",
                    capacity, wraps
                ),
                format!("{base:?}"),
                format!("{perturbed:?}")
            );
            // The fast receiver drained after every chunk, so it saw the full stream.
            crate::assert_with_log!(
                fast_sequence == (0..total_messages as i32).collect::<Vec<_>>(),
                format!(
                    "fast receiver keeps full order (cap={}, wraps={})",
                    capacity, wraps
                ),
                (0..total_messages as i32).collect::<Vec<_>>(),
                fast_sequence
            );
        }
    }
    crate::test_complete!("metamorphic_slot_wraparound_under_burst_preserves_suffix");
}
/// Metamorphic check: a receiver subscribed part-way through a stream sees
/// exactly the messages sent after it subscribed, and that sequence is a suffix
/// of what the original receiver sees.
#[test]
fn metamorphic_midstream_subscription_isolation() {
    init_test("metamorphic_midstream_subscription_isolation");
    let cx = test_cx();
    for total_messages in 5..=15 {
        for split_point in 1..total_messages {
            // Headroom of two slots so nothing is evicted.
            let (tx, mut rx_early) = channel::<i32>(total_messages + 2);
            for value in 0..split_point {
                tx.send(&cx, value as i32).expect("send pre");
            }
            let mut rx_late = tx.subscribe();
            for value in split_point..total_messages {
                tx.send(&cx, value as i32).expect("send post");
            }
            // Drain the early receiver completely before touching the late one.
            let early_sequence: Vec<i32> = (0..total_messages)
                .map(|_| block_on(rx_early.recv(&cx)).expect("early recv"))
                .collect();
            let late_sequence: Vec<i32> = (split_point..total_messages)
                .map(|_| block_on(rx_late.recv(&cx)).expect("late recv"))
                .collect();
            let expected_late: Vec<i32> = (split_point as i32..total_messages as i32).collect();
            let expected_early: Vec<i32> = (0..total_messages as i32).collect();
            crate::assert_with_log!(
                early_sequence == expected_early,
                format!(
                    "early receiver sees all (split={}/{})",
                    split_point, total_messages
                ),
                expected_early,
                early_sequence
            );
            crate::assert_with_log!(
                late_sequence == expected_late,
                format!(
                    "late receiver sees subset (split={}/{})",
                    split_point, total_messages
                ),
                expected_late,
                late_sequence
            );
            let early_suffix = &early_sequence[split_point..];
            crate::assert_with_log!(
                late_sequence == early_suffix,
                format!(
                    "late sequence is suffix of early (split={}/{})",
                    split_point, total_messages
                ),
                early_suffix.to_vec(),
                late_sequence
            );
        }
    }
    crate::test_complete!("metamorphic_midstream_subscription_isolation");
}
/// Metamorphic check: for any mix of 1..=4 senders and 1..=4 receivers, every
/// receiver reports `Closed` once all senders are dropped and the buffered
/// messages have been consumed.
#[test]
fn metamorphic_close_propagation() {
    init_test("metamorphic_close_propagation");
    let cx = test_cx();
    for num_senders in 1..=4 {
        for num_receivers in 1..=4 {
            let (first_tx, first_rx) = channel::<i32>(10);
            let mut receivers = vec![first_rx];
            receivers.extend((1..num_receivers).map(|_| first_tx.subscribe()));
            let mut senders = vec![first_tx];
            for _ in 1..num_senders {
                let extra = senders[0].clone();
                senders.push(extra);
            }
            // Spread three messages round-robin over the senders.
            for i in 0..3 {
                senders[i % num_senders].send(&cx, i as i32).expect("send");
            }
            // Drop all but one sender; the channel must stay open.
            while senders.len() > 1 {
                senders.pop();
            }
            for rx in &mut receivers {
                for _ in 0..3 {
                    block_on(rx.recv(&cx)).expect("recv before close");
                }
            }
            // Dropping the final sender closes the channel.
            senders.pop();
            assert!(senders.is_empty());
            for (i, rx) in receivers.iter_mut().enumerate() {
                let result = block_on(rx.recv(&cx));
                crate::assert_with_log!(
                    matches!(result, Err(RecvError::Closed)),
                    format!(
                        "receiver {} closed ({} senders, {} receivers)",
                        i, num_senders, num_receivers
                    ),
                    true,
                    matches!(result, Err(RecvError::Closed))
                );
            }
        }
    }
    crate::test_complete!("metamorphic_close_propagation");
}
/// Metamorphic check: polling one pending `recv` future repeatedly with the same
/// waker must leave at most one registered waker, and a send must wake it at
/// most once. Repeated over five scenarios, each after the first using a freshly
/// subscribed receiver.
#[test]
fn metamorphic_waker_deduplication() {
    init_test("metamorphic_waker_deduplication");
    let cx = test_cx();
    let (tx, mut rx) = channel::<i32>(10);
    let counter = CountingWaker::new();
    let waker = Waker::from(Arc::clone(&counter));
    let mut task_cx = Context::from_waker(&waker);
    for scenario in 0..5 {
        if scenario > 0 {
            // The old receiver is dropped before the new subscription is created.
            drop(rx);
            rx = tx.subscribe();
        }
        let mut pending_recv = Box::pin(rx.recv(&cx));
        // Three polls of the same future with the same waker.
        for _ in 0..3 {
            let _ = pending_recv.as_mut().poll(&mut task_cx);
        }
        let registered = tx.channel.inner.lock().wakers.len();
        crate::assert_with_log!(
            registered <= 1,
            format!("waker dedup scenario {} (single receiver)", scenario),
            "≤ 1".to_string(),
            registered.to_string()
        );
        let wakes_before = counter.wake_count();
        tx.send(&cx, scenario as i32).expect("send");
        let wakes_after = counter.wake_count();
        let wake_delta = wakes_after - wakes_before;
        crate::assert_with_log!(
            wake_delta <= 1,
            format!("wake count scenario {} (single receiver)", scenario),
            "≤ 1".to_string(),
            wake_delta.to_string()
        );
        // Release the future's borrow of `rx`, then consume the message just sent.
        drop(pending_recv);
        block_on(rx.recv(&cx)).expect("cleanup recv");
    }
    crate::test_complete!("metamorphic_waker_deduplication");
}
/// Composite scenario combining ordering, lag and close propagation on one
/// capacity-4 channel with three receivers:
///
/// * `rx_fast` reads the four initial messages and must see them in send order;
/// * `rx_slow` never reads and must report `Lagged` on its first `recv`;
/// * `rx_mid` never reads either and is drained until it reaches `Closed`.
///
/// Ten messages are sent in total before the sender is dropped, so `rx_mid` is
/// behind as well: its drain loop has to skip the lag report (and the retained
/// messages) before the channel can report `Closed`.
#[test]
fn metamorphic_composite_stress_test() {
    init_test("metamorphic_composite_stress_test");
    let cx = test_cx();
    let (tx, mut rx_fast) = channel::<i32>(4);
    let mut rx_slow = tx.subscribe();
    let mut rx_mid = tx.subscribe();
    let initial_messages = vec![10, 20, 30, 40];
    for &msg in &initial_messages {
        tx.send(&cx, msg).expect("send initial");
    }
    let mut fast_sequence = Vec::new();
    for _ in 0..4 {
        fast_sequence.push(block_on(rx_fast.recv(&cx)).expect("fast recv"));
    }
    // Six more sends overrun the buffer for the receivers that have not read yet.
    for i in 0..6 {
        tx.send(&cx, 100 + i).expect("send overrun");
    }
    drop(tx);
    crate::assert_with_log!(
        fast_sequence == initial_messages,
        "composite: initial order preserved",
        initial_messages,
        fast_sequence
    );
    let slow_lag_result = block_on(rx_slow.recv(&cx));
    let got_lag = matches!(slow_lag_result, Err(RecvError::Lagged(_)));
    crate::assert_with_log!(got_lag, "composite: slow receiver lagged", true, got_lag);
    // Drain `rx_mid` to its terminal state. A lag report is not terminal: the
    // receiver continues from the oldest retained message afterwards, so keep
    // reading past it until a different error ends the stream.
    let mid_close_result = loop {
        match block_on(rx_mid.recv(&cx)) {
            Ok(_) | Err(RecvError::Lagged(_)) => {}
            Err(e) => break e,
        }
    };
    crate::assert_with_log!(
        matches!(mid_close_result, RecvError::Closed),
        "composite: mid receiver closed",
        true,
        matches!(mid_close_result, RecvError::Closed)
    );
    crate::test_complete!("metamorphic_composite_stress_test");
}
}