use crate::cx::Cx;
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
Disconnected(T),
}
impl<T> std::fmt::Display for SendError<T> {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Disconnected(_) => write!(f, "sending on a closed oneshot channel"),
}
}
}
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
Closed,
Cancelled,
PolledAfterCompletion,
}
impl std::fmt::Display for RecvError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Closed => write!(f, "receiving on a closed oneshot channel"),
Self::Cancelled => write!(f, "receive operation cancelled"),
Self::PolledAfterCompletion => write!(f, "oneshot recv future polled after completion"),
}
}
}
impl std::error::Error for RecvError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Closed,
}
impl std::fmt::Display for TryRecvError {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Empty => write!(f, "oneshot channel is empty"),
Self::Closed => write!(f, "oneshot channel is closed"),
}
}
}
impl std::error::Error for TryRecvError {}
#[derive(Debug)]
struct OneShotInner<T> {
value: Option<T>,
sender_consumed: bool,
receiver_dropped: bool,
permit_outstanding: bool,
waker: Option<Waker>,
waker_id: Option<u64>,
next_waiter_id: u64,
}
impl<T> OneShotInner<T> {
#[inline]
fn new() -> Self {
Self {
value: None,
sender_consumed: false,
receiver_dropped: false,
permit_outstanding: false,
waker: None,
waker_id: None,
next_waiter_id: 0,
}
}
#[inline]
fn is_closed(&self) -> bool {
self.sender_consumed && !self.permit_outstanding && self.value.is_none()
}
#[inline]
fn is_ready(&self) -> bool {
self.value.is_some()
}
#[inline]
fn clear_waker(&mut self) {
self.waker = None;
self.waker_id = None;
}
#[inline]
fn take_waker(&mut self) -> Option<Waker> {
self.waker_id = None;
self.waker.take()
}
}
#[inline]
#[must_use]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Mutex::new(OneShotInner::new()));
(
Sender {
inner: Arc::clone(&inner),
},
Receiver { inner },
)
}
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Mutex<OneShotInner<T>>>,
}
impl<T> Sender<T> {
#[must_use]
#[inline]
pub fn reserve(self, cx: &Cx) -> SendPermit<T> {
cx.trace("oneshot::reserve creating permit");
{
let mut inner = self.inner.lock();
inner.sender_consumed = true;
inner.permit_outstanding = true;
}
SendPermit {
inner: Arc::clone(&self.inner),
sent: false,
}
}
#[inline]
pub fn send(self, cx: &Cx, value: T) -> Result<(), SendError<T>> {
let permit = self.reserve(cx);
permit.send(value)
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.lock().receiver_dropped
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let waker = {
let mut inner = self.inner.lock();
if inner.sender_consumed {
None
} else {
inner.sender_consumed = true;
inner.take_waker()
}
};
if let Some(waker) = waker {
waker.wake();
}
}
}
#[derive(Debug)]
pub struct SendPermit<T> {
inner: Arc<Mutex<OneShotInner<T>>>,
sent: bool,
}
impl<T> SendPermit<T> {
#[inline]
pub fn send(mut self, value: T) -> Result<(), SendError<T>> {
let (result, waker) = {
let mut inner = self.inner.lock();
if inner.receiver_dropped {
inner.permit_outstanding = false;
inner.clear_waker();
drop(inner);
(Err(value), None)
} else {
inner.value = Some(value);
inner.permit_outstanding = false;
let waker = inner.take_waker();
drop(inner);
(Ok(()), waker)
}
};
if let Some(waker) = waker {
waker.wake();
}
self.sent = true;
result.map_err(SendError::Disconnected)
}
#[inline]
pub fn abort(mut self) {
let waker = {
let mut inner = self.inner.lock();
inner.permit_outstanding = false;
inner.take_waker()
};
self.sent = true; if let Some(waker) = waker {
waker.wake();
}
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.lock().receiver_dropped
}
}
impl<T> Drop for SendPermit<T> {
fn drop(&mut self) {
if !self.sent {
let waker = {
let mut inner = self.inner.lock();
inner.permit_outstanding = false;
inner.take_waker()
};
if let Some(waker) = waker {
waker.wake();
}
}
}
}
pub(crate) struct RecvUninterruptibleFuture<'a, T> {
receiver: &'a mut Receiver<T>,
waiter_id: Option<u64>,
completed: bool,
}
impl<T> RecvUninterruptibleFuture<'_, T> {
#[must_use]
#[inline]
pub(crate) fn receiver_finished(&self) -> bool {
self.completed || self.receiver.is_ready() || self.receiver.is_closed()
}
}
impl<T> Future for RecvUninterruptibleFuture<'_, T> {
type Output = Result<T, RecvError>;
#[inline]
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if this.completed {
return Poll::Ready(Err(RecvError::PolledAfterCompletion));
}
let mut inner = this.receiver.inner.lock();
if let Some(value) = inner.value.take() {
inner.clear_waker();
this.waiter_id = None;
this.completed = true;
drop(inner);
return Poll::Ready(Ok(value));
}
if inner.is_closed() {
inner.clear_waker();
this.waiter_id = None;
this.completed = true;
drop(inner);
return Poll::Ready(Err(RecvError::Closed));
}
if let Some(my_id) = this.waiter_id {
if inner.waker_id == Some(my_id) {
if let Some(existing) = &inner.waker {
if !existing.will_wake(ctx.waker()) {
inner.waker = Some(ctx.waker().clone());
}
} else {
inner.waker = Some(ctx.waker().clone());
}
} else {
let waiter_id = inner.next_waiter_id;
inner.next_waiter_id = inner.next_waiter_id.wrapping_add(1);
inner.waker = Some(ctx.waker().clone());
inner.waker_id = Some(waiter_id);
this.waiter_id = Some(waiter_id);
}
} else {
let waiter_id = inner.next_waiter_id;
inner.next_waiter_id = inner.next_waiter_id.wrapping_add(1);
inner.waker = Some(ctx.waker().clone());
inner.waker_id = Some(waiter_id);
this.waiter_id = Some(waiter_id);
}
drop(inner);
Poll::Pending
}
}
impl<T> Drop for RecvUninterruptibleFuture<'_, T> {
fn drop(&mut self) {
{
let mut inner = self.receiver.inner.lock();
if self
.waiter_id
.is_some_and(|waiter_id| inner.waker_id == Some(waiter_id))
{
inner.clear_waker();
}
}
self.waiter_id = None;
}
}
pub struct RecvFuture<'a, T> {
receiver: &'a mut Receiver<T>,
cx: &'a Cx,
waiter_id: Option<u64>,
completed: bool,
}
impl<T> RecvFuture<'_, T> {
#[must_use]
#[allow(dead_code)] #[inline]
pub(crate) fn receiver_finished(&self) -> bool {
self.completed || self.receiver.is_ready() || self.receiver.is_closed()
}
}
impl<T> Future for RecvFuture<'_, T> {
type Output = Result<T, RecvError>;
#[inline]
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if this.completed {
return Poll::Ready(Err(RecvError::PolledAfterCompletion));
}
let mut inner = this.receiver.inner.lock();
if let Some(value) = inner.value.take() {
inner.clear_waker();
this.waiter_id = None;
this.completed = true;
drop(inner);
this.cx.trace("oneshot::recv received value");
return Poll::Ready(Ok(value));
}
if inner.is_closed() {
inner.clear_waker();
this.waiter_id = None;
this.completed = true;
drop(inner);
this.cx.trace("oneshot::recv channel closed");
return Poll::Ready(Err(RecvError::Closed));
}
if this.cx.checkpoint().is_err() {
if this
.waiter_id
.is_some_and(|waiter_id| inner.waker_id == Some(waiter_id))
{
inner.clear_waker();
}
this.waiter_id = None;
this.completed = true;
drop(inner);
this.cx.trace("oneshot::recv cancelled while waiting");
return Poll::Ready(Err(RecvError::Cancelled));
}
if let Some(my_id) = this.waiter_id {
if inner.waker_id == Some(my_id) {
if let Some(existing) = &inner.waker {
if !existing.will_wake(ctx.waker()) {
inner.waker = Some(ctx.waker().clone());
}
} else {
inner.waker = Some(ctx.waker().clone());
}
} else {
let waiter_id = inner.next_waiter_id;
inner.next_waiter_id = inner.next_waiter_id.wrapping_add(1);
inner.waker = Some(ctx.waker().clone());
inner.waker_id = Some(waiter_id);
this.waiter_id = Some(waiter_id);
}
} else {
let waiter_id = inner.next_waiter_id;
inner.next_waiter_id = inner.next_waiter_id.wrapping_add(1);
inner.waker = Some(ctx.waker().clone());
inner.waker_id = Some(waiter_id);
this.waiter_id = Some(waiter_id);
}
drop(inner);
Poll::Pending
}
}
impl<T> Drop for RecvFuture<'_, T> {
fn drop(&mut self) {
{
let mut inner = self.receiver.inner.lock();
if self
.waiter_id
.is_some_and(|waiter_id| inner.waker_id == Some(waiter_id))
{
inner.clear_waker();
}
}
self.waiter_id = None;
}
}
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Mutex<OneShotInner<T>>>,
}
impl<T> Receiver<T> {
#[inline]
#[must_use]
pub fn recv<'a>(&'a mut self, cx: &'a Cx) -> RecvFuture<'a, T> {
RecvFuture {
receiver: self,
cx,
waiter_id: None,
completed: false,
}
}
#[must_use]
#[inline]
pub(crate) fn recv_uninterruptible(&mut self) -> RecvUninterruptibleFuture<'_, T> {
RecvUninterruptibleFuture {
receiver: self,
waiter_id: None,
completed: false,
}
}
#[inline]
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut inner = self.inner.lock();
if let Some(value) = inner.value.take() {
inner.clear_waker();
drop(inner);
return Ok(value);
}
if inner.is_closed() {
inner.clear_waker();
drop(inner);
return Err(TryRecvError::Closed);
}
Err(TryRecvError::Empty)
}
#[inline]
#[must_use]
pub fn is_ready(&self) -> bool {
self.inner.lock().is_ready()
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.lock().is_closed()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let _value = {
let mut inner = self.inner.lock();
inner.receiver_dropped = true;
inner.clear_waker();
inner.value.take()
};
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use proptest::prelude::*;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[derive(Debug)]
struct NonClone(i32);
#[derive(Debug)]
struct TestNoopWaker;
impl std::task::Wake for TestNoopWaker {
fn wake(self: std::sync::Arc<Self>) {}
}
struct CountWaker(Arc<AtomicUsize>);
impl std::task::Wake for CountWaker {
fn wake(self: std::sync::Arc<Self>) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
fn counting_waker(counter: Arc<AtomicUsize>) -> Waker {
Waker::from(Arc::new(CountWaker(counter)))
}
#[derive(Debug, Clone, Copy)]
enum SendScenario {
LiveNoWaiter,
LivePendingWaiter,
ReceiverDropped,
}
fn send_scenario_strategy() -> impl Strategy<Value = SendScenario> {
prop_oneof![
Just(SendScenario::LiveNoWaiter),
Just(SendScenario::LivePendingWaiter),
Just(SendScenario::ReceiverDropped),
]
}
fn send_path_signature(
reserve_first: bool,
scenario: SendScenario,
value: i32,
) -> (
bool,
Option<i32>,
usize,
&'static str,
Option<i32>,
bool,
bool,
) {
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let wake_counter = Arc::new(AtomicUsize::new(0));
let (send_ok, disconnected_value, recv_state, recv_value) = match scenario {
SendScenario::LiveNoWaiter => {
let send_result = if reserve_first {
tx.reserve(&cx).send(value)
} else {
tx.send(&cx, value)
};
let (send_ok, disconnected_value) = match send_result {
Ok(()) => (true, None),
Err(SendError::Disconnected(v)) => (false, Some(v)),
};
let (recv_state, recv_value) = match rx.try_recv() {
Ok(v) => ("value", Some(v)),
Err(TryRecvError::Empty) => ("empty", None),
Err(TryRecvError::Closed) => ("closed", None),
};
(send_ok, disconnected_value, recv_state, recv_value)
}
SendScenario::LivePendingWaiter => {
let recv_waker = counting_waker(Arc::clone(&wake_counter));
let mut task_cx = Context::from_waker(&recv_waker);
let mut fut = Box::pin(rx.recv(&cx));
assert!(matches!(fut.as_mut().poll(&mut task_cx), Poll::Pending));
let send_result = if reserve_first {
tx.reserve(&cx).send(value)
} else {
tx.send(&cx, value)
};
let (send_ok, disconnected_value) = match send_result {
Ok(()) => (true, None),
Err(SendError::Disconnected(v)) => (false, Some(v)),
};
let (recv_state, recv_value) = match fut.as_mut().poll(&mut task_cx) {
Poll::Ready(Ok(v)) => ("value", Some(v)),
Poll::Ready(Err(RecvError::Closed)) => ("closed", None),
Poll::Ready(Err(RecvError::Cancelled)) => ("cancelled", None),
Poll::Ready(Err(RecvError::PolledAfterCompletion)) => ("repoll", None),
Poll::Pending => ("pending", None),
};
drop(fut);
(send_ok, disconnected_value, recv_state, recv_value)
}
SendScenario::ReceiverDropped => {
drop(rx);
let send_result = if reserve_first {
tx.reserve(&cx).send(value)
} else {
tx.send(&cx, value)
};
let (send_ok, disconnected_value) = match send_result {
Ok(()) => (true, None),
Err(SendError::Disconnected(v)) => (false, Some(v)),
};
(send_ok, disconnected_value, "receiver-dropped", None)
}
};
let inner = inner.lock();
(
send_ok,
disconnected_value,
wake_counter.load(Ordering::SeqCst),
recv_state,
recv_value,
inner.waker.is_none(),
inner.is_closed(),
)
}
#[test]
fn basic_send_recv() {
init_test("basic_send_recv");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
tx.send(&cx, 42).expect("send should succeed");
let value = block_on(rx.recv(&cx)).expect("recv should succeed");
crate::assert_with_log!(value == 42, "recv value", 42, value);
crate::test_complete!("basic_send_recv");
}
proptest! {
#[test]
fn metamorphic_send_matches_reserve_send_atomicity(
scenario in send_scenario_strategy(),
value in any::<i16>(),
) {
let value = i32::from(value);
let direct_signature = send_path_signature(false, scenario, value);
let reserved_signature = send_path_signature(true, scenario, value);
prop_assert_eq!(
direct_signature,
reserved_signature,
"oneshot convenience send must match explicit reserve().send() semantics",
);
match scenario {
SendScenario::LiveNoWaiter => {
prop_assert!(direct_signature.0, "live receiver should accept the send");
prop_assert_eq!(direct_signature.2, 0, "no waiter means no wakeup");
prop_assert_eq!(direct_signature.3, "value");
prop_assert_eq!(direct_signature.4, Some(value));
prop_assert!(direct_signature.5, "terminal receive path clears stale waker");
prop_assert!(direct_signature.6, "channel should be closed after value is consumed");
}
SendScenario::LivePendingWaiter => {
prop_assert!(direct_signature.0, "live pending waiter should accept the send");
prop_assert_eq!(direct_signature.2, 1, "pending waiter should be woken exactly once");
prop_assert_eq!(direct_signature.3, "value");
prop_assert_eq!(direct_signature.4, Some(value));
prop_assert!(direct_signature.5, "recv completion clears the waiter slot");
prop_assert!(direct_signature.6, "channel should be closed after waiter consumes the value");
}
SendScenario::ReceiverDropped => {
prop_assert!(!direct_signature.0, "dropped receiver must reject the send");
prop_assert_eq!(direct_signature.1, Some(value), "disconnected send returns ownership of the value");
prop_assert_eq!(direct_signature.2, 0, "no receiver means no wakeup");
prop_assert_eq!(direct_signature.3, "receiver-dropped");
prop_assert!(direct_signature.5, "disconnected send path clears any stale waker");
prop_assert!(direct_signature.6, "sender-consumed disconnected channel is closed");
}
}
}
}
#[test]
fn reserve_then_send() {
init_test("reserve_then_send");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let permit = tx.reserve(&cx);
permit.send(42).expect("send should succeed");
let value = block_on(rx.recv(&cx)).expect("recv should succeed");
crate::assert_with_log!(value == 42, "recv value", 42, value);
crate::test_complete!("reserve_then_send");
}
#[test]
fn reserve_then_abort() {
init_test("reserve_then_abort");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let permit = tx.reserve(&cx);
permit.abort();
let err = rx.try_recv();
crate::assert_with_log!(
matches!(err, Err(TryRecvError::Closed)),
"try_recv closed",
"Err(Closed)",
format!("{:?}", err)
);
crate::test_complete!("reserve_then_abort");
}
#[test]
fn permit_drop_is_abort() {
init_test("permit_drop_is_abort");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
{
let _permit = tx.reserve(&cx);
}
let err = rx.try_recv();
crate::assert_with_log!(
matches!(err, Err(TryRecvError::Closed)),
"try_recv closed",
"Err(Closed)",
format!("{:?}", err)
);
crate::test_complete!("permit_drop_is_abort");
}
#[test]
fn sender_dropped_without_send() {
init_test("sender_dropped_without_send");
let (tx, mut rx) = channel::<i32>();
drop(tx);
let err = rx.try_recv();
crate::assert_with_log!(
matches!(err, Err(TryRecvError::Closed)),
"try_recv closed",
"Err(Closed)",
format!("{:?}", err)
);
crate::test_complete!("sender_dropped_without_send");
}
#[test]
fn receiver_dropped_before_send() {
init_test("receiver_dropped_before_send");
let cx = test_cx();
let (tx, rx) = channel::<i32>();
drop(rx);
let closed = tx.is_closed();
crate::assert_with_log!(closed, "sender closed", true, closed);
let err = tx.send(&cx, 42);
crate::assert_with_log!(
matches!(err, Err(SendError::Disconnected(42))),
"send disconnected",
"Err(Disconnected(42))",
format!("{:?}", err)
);
crate::test_complete!("receiver_dropped_before_send");
}
#[test]
fn receiver_drop_clears_leftover_waiter_state() {
init_test("receiver_drop_clears_leftover_waiter_state");
let (_tx, rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
{
let mut guard = inner.lock();
guard.waker = Some(std::task::Waker::noop().clone());
guard.waker_id = Some(7);
}
drop(rx);
let guard = inner.lock();
crate::assert_with_log!(
guard.receiver_dropped,
"receiver marked dropped",
true,
guard.receiver_dropped
);
crate::assert_with_log!(
guard.waker.is_none(),
"receiver drop clears leftover waker",
true,
guard.waker.is_none()
);
crate::assert_with_log!(
guard.waker_id.is_none(),
"receiver drop clears waiter identity",
true,
guard.waker_id.is_none()
);
drop(guard);
crate::test_complete!("receiver_drop_clears_leftover_waiter_state");
}
#[test]
fn try_recv_empty() {
init_test("try_recv_empty");
let (tx, mut rx) = channel::<i32>();
let err = rx.try_recv();
crate::assert_with_log!(
matches!(err, Err(TryRecvError::Empty)),
"try_recv empty",
"Err(Empty)",
format!("{:?}", err)
);
drop(tx);
crate::test_complete!("try_recv_empty");
}
#[test]
fn try_recv_ready() {
init_test("try_recv_ready");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
tx.send(&cx, 42).expect("send should succeed");
let value = rx.try_recv().expect("try_recv should succeed");
crate::assert_with_log!(value == 42, "try_recv value", 42, value);
crate::test_complete!("try_recv_ready");
}
#[test]
fn is_ready_and_is_closed() {
init_test("is_ready_and_is_closed");
let cx = test_cx();
let (tx, rx) = channel::<i32>();
let ready = rx.is_ready();
crate::assert_with_log!(!ready, "not ready", false, ready);
let closed = rx.is_closed();
crate::assert_with_log!(!closed, "not closed", false, closed);
tx.send(&cx, 42).expect("send should succeed");
let ready = rx.is_ready();
crate::assert_with_log!(ready, "ready after send", true, ready);
let closed = rx.is_closed();
crate::assert_with_log!(!closed, "still open", false, closed);
crate::test_complete!("is_ready_and_is_closed");
}
#[test]
fn sender_is_closed() {
init_test("sender_is_closed");
let (tx, rx) = channel::<i32>();
let closed = tx.is_closed();
crate::assert_with_log!(!closed, "tx open", false, closed);
drop(rx);
let closed = tx.is_closed();
crate::assert_with_log!(closed, "tx closed", true, closed);
crate::test_complete!("sender_is_closed");
}
#[test]
fn send_error_display() {
init_test("send_error_display");
let err = SendError::Disconnected(42);
let text = err.to_string();
crate::assert_with_log!(
text == "sending on a closed oneshot channel",
"display",
"sending on a closed oneshot channel",
text
);
crate::test_complete!("send_error_display");
}
#[test]
fn recv_error_display() {
init_test("recv_error_display");
let text = RecvError::Closed.to_string();
crate::assert_with_log!(
text == "receiving on a closed oneshot channel",
"display",
"receiving on a closed oneshot channel",
text
);
let cancelled = RecvError::Cancelled.to_string();
crate::assert_with_log!(
cancelled == "receive operation cancelled",
"cancelled display",
"receive operation cancelled",
cancelled
);
let polled_after_completion = RecvError::PolledAfterCompletion.to_string();
crate::assert_with_log!(
polled_after_completion == "oneshot recv future polled after completion",
"polled-after-completion display",
"oneshot recv future polled after completion",
polled_after_completion
);
crate::test_complete!("recv_error_display");
}
#[test]
fn try_recv_error_display() {
init_test("try_recv_error_display");
let empty = TryRecvError::Empty.to_string();
crate::assert_with_log!(
empty == "oneshot channel is empty",
"empty display",
"oneshot channel is empty",
empty
);
let closed = TryRecvError::Closed.to_string();
crate::assert_with_log!(
closed == "oneshot channel is closed",
"closed display",
"oneshot channel is closed",
closed
);
crate::test_complete!("try_recv_error_display");
}
#[test]
fn value_is_moved_not_cloned() {
init_test("value_is_moved_not_cloned");
let cx = test_cx();
let (tx, mut rx) = channel::<NonClone>();
tx.send(&cx, NonClone(42)).expect("send should succeed");
let value = block_on(rx.recv(&cx)).expect("recv should succeed");
crate::assert_with_log!(value.0 == 42, "value", 42, value.0);
crate::test_complete!("value_is_moved_not_cloned");
}
#[test]
fn permit_send_returns_error_with_value() {
init_test("permit_send_returns_error_with_value");
let cx = test_cx();
let (tx, rx) = channel::<i32>();
drop(rx);
let permit = tx.reserve(&cx);
let err = permit.send(42);
crate::assert_with_log!(
matches!(err, Err(SendError::Disconnected(42))),
"permit send disconnected",
"Err(Disconnected(42))",
format!("{:?}", err)
);
crate::test_complete!("permit_send_returns_error_with_value");
}
#[test]
fn recv_with_cancel_pending() {
init_test("recv_with_cancel_pending");
let cx = test_cx();
cx.set_cancel_requested(true);
let (tx, mut rx) = channel::<i32>();
tx.send(&cx, 42).expect("send should succeed");
let result = block_on(rx.recv(&cx));
crate::assert_with_log!(result.is_ok(), "recv ok", true, result.is_ok());
let value = result.unwrap();
crate::assert_with_log!(value == 42, "recv value", 42, value);
crate::test_complete!("recv_with_cancel_pending");
}
#[test]
fn recv_cancel_during_wait() {
init_test("recv_cancel_during_wait");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
cx.set_cancel_requested(true);
let err = block_on(rx.recv(&cx));
crate::assert_with_log!(
matches!(err, Err(RecvError::Cancelled)),
"recv cancelled",
"Err(Cancelled)",
format!("{:?}", err)
);
drop(tx);
crate::test_complete!("recv_cancel_during_wait");
}
#[test]
fn recv_cancel_after_pending_clears_registered_waker() {
init_test("recv_cancel_after_pending_clears_registered_waker");
let cx = test_cx();
let (_tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let first_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first_poll, Poll::Pending),
"first poll pending",
true,
matches!(first_poll, Poll::Pending)
);
let registered_before_cancel = {
let inner = inner.lock();
inner.waker.is_some()
};
crate::assert_with_log!(
registered_before_cancel,
"waker registered before cancel",
true,
registered_before_cancel
);
cx.set_cancel_requested(true);
let cancelled = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(cancelled, Poll::Ready(Err(RecvError::Cancelled))),
"recv cancelled",
"Ready(Err(Cancelled))",
format!("{cancelled:?}")
);
let registered_after_cancel = {
let inner = inner.lock();
inner.waker.is_some()
};
crate::assert_with_log!(
!registered_after_cancel,
"waker cleared on cancel",
false,
registered_after_cancel
);
let repoll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(repoll, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
"cancelled recv repoll fails closed",
"Ready(Err(PolledAfterCompletion))",
format!("{repoll:?}")
);
crate::test_complete!("recv_cancel_after_pending_clears_registered_waker");
}
#[test]
fn recv_value_ready_clears_stale_waker() {
init_test("recv_value_ready_clears_stale_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let first = fut.as_mut().poll(&mut task_cx);
assert!(matches!(first, Poll::Pending));
assert!(
inner.lock().waker.is_some(),
"waker should be registered after Pending"
);
tx.send(&cx, 99).unwrap();
let second = fut.as_mut().poll(&mut task_cx);
assert!(
matches!(second, Poll::Ready(Ok(99))),
"should receive value"
);
assert!(
inner.lock().waker.is_none(),
"waker should be cleared after successful recv"
);
let third = fut.as_mut().poll(&mut task_cx);
assert!(
matches!(third, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
"repoll after value should fail closed"
);
crate::test_complete!("recv_value_ready_clears_stale_waker");
}
#[test]
fn recv_closed_clears_stale_waker() {
init_test("recv_closed_clears_stale_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let first = fut.as_mut().poll(&mut task_cx);
assert!(matches!(first, Poll::Pending));
assert!(inner.lock().waker.is_some());
drop(tx);
let second = fut.as_mut().poll(&mut task_cx);
assert!(
matches!(second, Poll::Ready(Err(RecvError::Closed))),
"should get Closed"
);
assert!(
inner.lock().waker.is_none(),
"waker should be cleared after Closed recv"
);
let third = fut.as_mut().poll(&mut task_cx);
assert!(
matches!(third, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
"repoll after close should fail closed"
);
crate::test_complete!("recv_closed_clears_stale_waker");
}
#[test]
fn recv_uninterruptible_repoll_after_value_fails_closed() {
init_test("recv_uninterruptible_repoll_after_value_fails_closed");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
tx.send(&cx, 7).expect("send should succeed");
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv_uninterruptible());
let first = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(7))),
"uninterruptible recv gets value",
"Ready(Ok(7))",
format!("{first:?}")
);
let second = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
"uninterruptible recv repoll fails closed",
"Ready(Err(PolledAfterCompletion))",
format!("{second:?}")
);
crate::test_complete!("recv_uninterruptible_repoll_after_value_fails_closed");
}
#[test]
fn recv_uninterruptible_repoll_after_closed_fails_closed() {
init_test("recv_uninterruptible_repoll_after_closed_fails_closed");
let (tx, mut rx) = channel::<i32>();
drop(tx);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv_uninterruptible());
let first = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first, Poll::Ready(Err(RecvError::Closed))),
"uninterruptible recv closes",
"Ready(Err(Closed))",
format!("{first:?}")
);
let second = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(RecvError::PolledAfterCompletion))),
"uninterruptible closed repoll fails closed",
"Ready(Err(PolledAfterCompletion))",
format!("{second:?}")
);
crate::test_complete!("recv_uninterruptible_repoll_after_closed_fails_closed");
}
#[test]
fn try_recv_value_ready_clears_stale_waker() {
init_test("try_recv_value_ready_clears_stale_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let first = fut.as_mut().poll(&mut task_cx);
assert!(matches!(first, Poll::Pending));
assert!(inner.lock().waker.is_some());
drop(fut);
tx.send(&cx, 99).unwrap();
let value = rx.try_recv().unwrap();
crate::assert_with_log!(value == 99, "try_recv value", 99, value);
assert!(
inner.lock().waker.is_none(),
"waker should be cleared after try_recv Ok"
);
crate::test_complete!("try_recv_value_ready_clears_stale_waker");
}
#[test]
fn try_recv_closed_clears_stale_waker() {
init_test("try_recv_closed_clears_stale_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let first = fut.as_mut().poll(&mut task_cx);
assert!(matches!(first, Poll::Pending));
assert!(inner.lock().waker.is_some());
drop(fut);
drop(tx);
let closed = rx.try_recv();
assert!(matches!(closed, Err(TryRecvError::Closed)));
assert!(
inner.lock().waker.is_none(),
"waker should be cleared after try_recv Closed"
);
crate::test_complete!("try_recv_closed_clears_stale_waker");
}
#[test]
fn permit_send_receiver_dropped_clears_waker() {
init_test("permit_send_receiver_dropped_clears_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
let mut fut = Box::pin(rx.recv(&cx));
let poll = fut.as_mut().poll(&mut task_cx);
assert!(matches!(poll, Poll::Pending));
drop(fut);
assert!(
tx.inner.lock().waker.is_none(),
"RecvFuture::Drop should clear stale waker"
);
drop(rx);
let permit = tx.reserve(&cx);
let result = permit.send(42);
assert!(matches!(result, Err(SendError::Disconnected(42))));
crate::test_complete!("permit_send_receiver_dropped_clears_waker");
}
#[test]
fn sender_drop_on_poisoned_mutex_does_not_panic() {
init_test("sender_drop_on_poisoned_mutex_does_not_panic");
let (tx, _rx) = channel::<i32>();
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _guard = tx.inner.lock();
panic!("intentional poison");
}));
drop(tx);
crate::test_complete!("sender_drop_on_poisoned_mutex_does_not_panic");
}
#[test]
fn permit_drop_on_poisoned_mutex_does_not_panic() {
init_test("permit_drop_on_poisoned_mutex_does_not_panic");
let cx = test_cx();
let (tx, _rx) = channel::<i32>();
let permit = tx.reserve(&cx);
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _guard = permit.inner.lock();
panic!("intentional poison");
}));
drop(permit);
crate::test_complete!("permit_drop_on_poisoned_mutex_does_not_panic");
}
#[test]
fn receiver_drop_on_poisoned_mutex_does_not_panic() {
init_test("receiver_drop_on_poisoned_mutex_does_not_panic");
let (tx, rx) = channel::<i32>();
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _guard = tx.inner.lock();
panic!("intentional poison");
}));
drop(rx);
drop(tx);
crate::test_complete!("receiver_drop_on_poisoned_mutex_does_not_panic");
}
#[test]
fn recv_future_drop_clears_stale_waker() {
init_test("recv_future_drop_clears_stale_waker");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut task_cx = Context::from_waker(&waker);
{
let mut fut = Box::pin(rx.recv(&cx));
let poll = fut.as_mut().poll(&mut task_cx);
assert!(matches!(poll, Poll::Pending));
assert!(
inner.lock().waker.is_some(),
"waker registered after Pending"
);
}
assert!(
inner.lock().waker.is_none(),
"waker cleared after RecvFuture drop"
);
tx.send(&cx, 99).unwrap();
let value = rx.try_recv().unwrap();
crate::assert_with_log!(value == 99, "recv after drop", 99, value);
crate::test_complete!("recv_future_drop_clears_stale_waker");
}
#[test]
fn recv_returns_value_even_when_cancelled() {
init_test("recv_returns_value_even_when_cancelled");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
tx.send(&cx, 77).unwrap();
cx.set_cancel_requested(true);
let result = block_on(rx.recv(&cx));
let ok = matches!(result, Ok(77));
crate::assert_with_log!(ok, "value over cancel", true, ok);
crate::test_complete!("recv_returns_value_even_when_cancelled");
}
#[test]
fn is_closed_after_permit_abort() {
init_test("is_closed_after_permit_abort");
let cx = test_cx();
let (tx, rx) = channel::<i32>();
let permit = tx.reserve(&cx);
let closed_during_permit = rx.is_closed();
crate::assert_with_log!(
!closed_during_permit,
"not closed during permit",
false,
closed_during_permit
);
permit.abort();
let closed_after_abort = rx.is_closed();
crate::assert_with_log!(
closed_after_abort,
"closed after abort",
true,
closed_after_abort
);
crate::test_complete!("is_closed_after_permit_abort");
}
#[test]
fn try_recv_returns_empty_while_permit_outstanding() {
init_test("try_recv_returns_empty_while_permit_outstanding");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let permit = tx.reserve(&cx);
let result = rx.try_recv();
let empty_ok = matches!(result, Err(TryRecvError::Empty));
crate::assert_with_log!(empty_ok, "empty while permit outstanding", true, empty_ok);
permit.send(42).unwrap();
let value = rx.try_recv().unwrap();
crate::assert_with_log!(value == 42, "value after send", 42, value);
crate::test_complete!("try_recv_returns_empty_while_permit_outstanding");
}
#[test]
fn sender_drop_wakes_pending_receiver() {
init_test("sender_drop_wakes_pending_receiver");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let notify_count = Arc::new(AtomicUsize::new(0));
let poll_waker = counting_waker(Arc::clone(¬ify_count));
let mut task_cx = Context::from_waker(&poll_waker);
let mut fut = Box::pin(rx.recv(&cx));
let poll = fut.as_mut().poll(&mut task_cx);
assert!(matches!(poll, Poll::Pending));
drop(tx);
let notifications = notify_count.load(Ordering::SeqCst);
crate::assert_with_log!(notifications == 1, "woken once", 1usize, notifications);
let result = fut.as_mut().poll(&mut task_cx);
let closed_ok = matches!(result, Poll::Ready(Err(RecvError::Closed)));
crate::assert_with_log!(closed_ok, "closed after sender drop", true, closed_ok);
crate::test_complete!("sender_drop_wakes_pending_receiver");
}
#[test]
fn dropping_stale_recv_future_does_not_clear_new_waiter() {
init_test("dropping_stale_recv_future_does_not_clear_new_waiter");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let wake_counter_1 = Arc::new(AtomicUsize::new(0));
let wake_counter_2 = Arc::new(AtomicUsize::new(0));
let recv_waker_1 = counting_waker(Arc::clone(&wake_counter_1));
let recv_waker_2 = counting_waker(Arc::clone(&wake_counter_2));
let mut task_cx_1 = Context::from_waker(&recv_waker_1);
let mut fut_1 = Box::pin(rx.recv(&cx));
let poll_1 = fut_1.as_mut().poll(&mut task_cx_1);
crate::assert_with_log!(
matches!(poll_1, Poll::Pending),
"first recv pending",
true,
matches!(poll_1, Poll::Pending)
);
drop(fut_1);
let mut task_cx_2 = Context::from_waker(&recv_waker_2);
let mut fut_2 = Box::pin(rx.recv(&cx));
let poll_2 = fut_2.as_mut().poll(&mut task_cx_2);
crate::assert_with_log!(
matches!(poll_2, Poll::Pending),
"second recv pending",
true,
matches!(poll_2, Poll::Pending)
);
tx.send(&cx, 5).expect("send should succeed");
let wake_count_1 = wake_counter_1.load(Ordering::SeqCst);
let wake_count_2 = wake_counter_2.load(Ordering::SeqCst);
crate::assert_with_log!(
wake_count_1 == 0,
"stale waiter not woken",
0usize,
wake_count_1
);
crate::assert_with_log!(
wake_count_2 == 1,
"active waiter woken once",
1usize,
wake_count_2
);
let result = fut_2.as_mut().poll(&mut task_cx_2);
crate::assert_with_log!(
matches!(result, Poll::Ready(Ok(5))),
"active future receives value",
"Ready(Ok(5))",
format!("{result:?}")
);
crate::test_complete!("dropping_stale_recv_future_does_not_clear_new_waiter");
}
#[test]
fn permit_abort_wakes_pending_receiver_and_returns_closed() {
init_test("permit_abort_wakes_pending_receiver_and_returns_closed");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let wake_counter = Arc::new(AtomicUsize::new(0));
let recv_waker = counting_waker(Arc::clone(&wake_counter));
let mut task_cx = Context::from_waker(&recv_waker);
let mut fut = Box::pin(rx.recv(&cx));
let first_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first_poll, Poll::Pending),
"recv pending before abort",
true,
matches!(first_poll, Poll::Pending)
);
let permit = tx.reserve(&cx);
permit.abort();
let wake_count = wake_counter.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 1, "receiver woken once", 1usize, wake_count);
let second_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(second_poll, Poll::Ready(Err(RecvError::Closed))),
"recv closed after abort",
"Ready(Err(Closed))",
format!("{second_poll:?}")
);
crate::test_complete!("permit_abort_wakes_pending_receiver_and_returns_closed");
}
#[test]
fn dropping_permit_wakes_pending_receiver_and_returns_closed() {
init_test("dropping_permit_wakes_pending_receiver_and_returns_closed");
let cx = test_cx();
let (tx, mut rx) = channel::<i32>();
let wake_counter = Arc::new(AtomicUsize::new(0));
let recv_waker = counting_waker(Arc::clone(&wake_counter));
let mut task_cx = Context::from_waker(&recv_waker);
let mut fut = Box::pin(rx.recv(&cx));
let first_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first_poll, Poll::Pending),
"recv pending before permit drop",
true,
matches!(first_poll, Poll::Pending)
);
let permit = tx.reserve(&cx);
drop(permit);
let wake_count = wake_counter.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 1, "receiver woken once", 1usize, wake_count);
let second_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(second_poll, Poll::Ready(Err(RecvError::Closed))),
"recv closed after permit drop",
"Ready(Err(Closed))",
format!("{second_poll:?}")
);
crate::test_complete!("dropping_permit_wakes_pending_receiver_and_returns_closed");
}
#[test]
fn recv_repoll_same_waker_keeps_waiter_identity() {
init_test("recv_repoll_same_waker_keeps_waiter_identity");
let cx = test_cx();
let (_tx, mut rx) = channel::<i32>();
let inner = Arc::clone(&rx.inner);
let recv_waker = counting_waker(Arc::new(AtomicUsize::new(0)));
let mut task_cx = Context::from_waker(&recv_waker);
let mut fut = Box::pin(rx.recv(&cx));
let first_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(first_poll, Poll::Pending),
"first poll pending",
true,
matches!(first_poll, Poll::Pending)
);
let first_waiter_id = inner.lock().waker_id;
let second_poll = fut.as_mut().poll(&mut task_cx);
crate::assert_with_log!(
matches!(second_poll, Poll::Pending),
"second poll pending",
true,
matches!(second_poll, Poll::Pending)
);
let second_waiter_id = inner.lock().waker_id;
crate::assert_with_log!(
first_waiter_id == second_waiter_id,
"same waker keeps waiter identity",
first_waiter_id,
second_waiter_id
);
crate::test_complete!("recv_repoll_same_waker_keeps_waiter_identity");
}
}