use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use smallvec::SmallVec;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use crate::cx::Cx;
use crate::util::{Arena, ArenaIndex};
struct WatchWaiter {
waker: Waker,
queued: Arc<AtomicBool>,
}
impl std::fmt::Debug for WatchWaiter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WatchWaiter")
.field("waker", &self.waker)
.field("queued", &self.queued.load(Ordering::Relaxed))
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
Closed(T),
}
impl<T> std::fmt::Display for SendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Closed(_) => write!(f, "sending on a closed watch channel"),
}
}
}
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
Closed,
Cancelled,
PolledAfterCompletion,
}
impl std::fmt::Display for RecvError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Closed => write!(f, "receiving on a closed watch channel"),
Self::Cancelled => write!(f, "receive operation cancelled"),
Self::PolledAfterCompletion => write!(f, "watch future polled after completion"),
}
}
}
impl std::error::Error for RecvError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ModifyError;
impl std::fmt::Display for ModifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "modifying a closed watch channel")
}
}
impl std::error::Error for ModifyError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WatchTelemetrySnapshot {
pub channel_id: u64,
pub channel_kind: &'static str,
pub capacity: usize,
pub queued_messages: usize,
pub reserved_uncommitted_obligations: usize,
pub send_waiter_count: usize,
pub recv_waiter_count: usize,
pub receiver_count: usize,
pub receiver_health: &'static str,
pub lagged_receiver_count: Option<usize>,
pub cancellation_count: u64,
pub closed: bool,
}
#[derive(Debug)]
struct WatchInner<T> {
value: RwLock<(T, u64)>,
receiver_count: AtomicUsize,
sender_dropped: AtomicBool,
waiters: Mutex<SmallVec<[WatchWaiter; 4]>>,
receiver_versions: Mutex<Arena<u64>>,
cancellation_count: AtomicU64,
}
impl<T> WatchInner<T> {
fn new(initial: T) -> Self {
Self {
value: RwLock::new((initial, 0)),
receiver_count: AtomicUsize::new(1), sender_dropped: AtomicBool::new(false),
waiters: Mutex::new(SmallVec::new()),
receiver_versions: Mutex::new(Arena::new()),
cancellation_count: AtomicU64::new(0),
}
}
fn is_sender_dropped(&self) -> bool {
self.sender_dropped.load(Ordering::Acquire)
}
fn current_version(&self) -> u64 {
self.value.read().1
}
fn insert_receiver_version(&self, version: u64) -> ArenaIndex {
self.receiver_versions.lock().insert(version)
}
fn update_receiver_version(&self, token: ArenaIndex, version: u64) {
if let Some(seen_version) = self.receiver_versions.lock().get_mut(token) {
*seen_version = version;
}
}
fn remove_receiver_version(&self, token: ArenaIndex) {
self.receiver_versions.lock().remove(token);
}
fn lagged_receiver_count(&self, current_version: u64) -> usize {
self.receiver_versions
.lock()
.iter()
.filter(|(_, seen_version)| **seen_version != current_version)
.count()
}
fn recv_waiter_count(&self) -> usize {
self.waiters
.lock()
.iter()
.filter(|entry| {
entry.queued.load(Ordering::Acquire) && Arc::strong_count(&entry.queued) > 1
})
.count()
}
fn record_cancellation(&self) {
self.cancellation_count.fetch_add(1, Ordering::Relaxed);
}
fn telemetry_snapshot(
&self,
channel_id: u64,
receiver_seen_version: Option<u64>,
) -> WatchTelemetrySnapshot {
let current_version = self.current_version();
let receiver_count = self.receiver_count.load(Ordering::Acquire);
let recv_waiter_count = self.recv_waiter_count();
let lagged_receiver_count = self.lagged_receiver_count(current_version);
let sender_dropped = self.is_sender_dropped();
let receiver_has_change =
receiver_seen_version.is_some_and(|seen_version| seen_version != current_version);
let closed = receiver_count == 0 || sender_dropped;
let receiver_health = if receiver_count == 0 {
"receiver_dropped"
} else if receiver_has_change {
"changed"
} else if sender_dropped {
"sender_closed"
} else if recv_waiter_count > 0 {
"waiting"
} else if receiver_seen_version.is_some() {
"unchanged"
} else if lagged_receiver_count > 0 {
"lagged"
} else {
"open"
};
WatchTelemetrySnapshot {
channel_id,
channel_kind: "watch",
capacity: 1,
queued_messages: usize::from(receiver_has_change || lagged_receiver_count > 0),
reserved_uncommitted_obligations: 0,
send_waiter_count: 0,
recv_waiter_count,
receiver_count,
receiver_health,
lagged_receiver_count: Some(lagged_receiver_count),
cancellation_count: self.cancellation_count.load(Ordering::Relaxed),
closed,
}
}
fn wake_all_waiters(&self) {
let waiters: SmallVec<[WatchWaiter; 4]> = {
let mut w = self.waiters.lock();
std::mem::take(&mut *w)
};
for w in waiters {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
}
fn register_waker(&self, waiter: WatchWaiter) {
let mut waiters = self.waiters.lock();
let mut found = false;
waiters.retain_mut(|entry| {
if Arc::strong_count(&entry.queued) <= 1 {
return false;
}
if !found && Arc::ptr_eq(&entry.queued, &waiter.queued) {
if !entry.waker.will_wake(&waiter.waker) {
entry.waker.clone_from(&waiter.waker);
}
found = true;
}
true
});
if !found {
waiters.push(waiter);
}
}
fn refresh_waker(&self, queued: &Arc<AtomicBool>, new_waker: &Waker) -> bool {
let mut waiters = self.waiters.lock();
let mut found = false;
waiters.retain_mut(|entry| {
if Arc::strong_count(&entry.queued) <= 1 {
return false;
}
if !found && Arc::ptr_eq(&entry.queued, queued) {
if !entry.waker.will_wake(new_waker) {
entry.waker.clone_from(new_waker);
}
found = true;
}
true
});
found
}
}
#[inline]
#[must_use]
pub fn channel<T>(initial: T) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(WatchInner::new(initial));
let receiver_token = inner.insert_receiver_version(0);
(
Sender {
inner: Arc::clone(&inner),
},
Receiver {
inner,
seen_version: 0,
receiver_token,
waiter: None,
},
)
}
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<WatchInner<T>>,
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
if self.inner.is_sender_dropped() {
return Err(SendError::Closed(value));
}
let _old_value = {
let mut guard = self.inner.value.write();
let old = std::mem::replace(&mut guard.0, value);
guard.1 = guard.1.wrapping_add(1);
old
};
if self.inner.receiver_count.load(Ordering::Acquire) != 0 {
self.inner.wake_all_waiters();
}
Ok(())
}
pub fn send_modify<F>(&self, f: F) -> Result<(), ModifyError>
where
T: Clone,
F: FnOnce(&mut T),
{
if self.inner.is_sender_dropped() {
return Err(ModifyError);
}
let mut value = {
let guard = self.inner.value.read();
guard.0.clone()
};
f(&mut value);
{
let mut guard = self.inner.value.write();
guard.0 = value;
guard.1 = guard.1.wrapping_add(1);
}
if self.inner.receiver_count.load(Ordering::Acquire) != 0 {
self.inner.wake_all_waiters();
}
Ok(())
}
#[inline]
#[must_use]
pub fn borrow(&self) -> Ref<'_, T> {
Ref {
guard: self.inner.value.read(),
}
}
#[must_use]
pub fn subscribe(&self) -> Receiver<T> {
let (current_version, receiver_token) = {
let guard = self.inner.value.read();
self.inner.receiver_count.fetch_add(1, Ordering::Relaxed);
let receiver_token = self.inner.insert_receiver_version(guard.1);
(guard.1, receiver_token)
};
Receiver {
inner: Arc::clone(&self.inner),
seen_version: current_version,
receiver_token,
waiter: None,
}
}
#[inline]
#[must_use]
pub fn receiver_count(&self) -> usize {
self.inner.receiver_count.load(Ordering::Relaxed)
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.receiver_count.load(Ordering::Acquire) == 0
}
#[must_use]
#[inline]
pub fn telemetry_snapshot(&self, channel_id: u64) -> WatchTelemetrySnapshot {
self.inner.telemetry_snapshot(channel_id, None)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.sender_dropped.store(true, Ordering::Release);
let waiters: SmallVec<[WatchWaiter; 4]> = {
let mut w = self.inner.waiters.lock();
std::mem::take(&mut *w)
};
for w in waiters {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
}
}
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<WatchInner<T>>,
seen_version: u64,
receiver_token: ArenaIndex,
waiter: Option<Arc<AtomicBool>>,
}
impl<T> Receiver<T> {
pub fn changed<'a, 'b, Caps>(&'a mut self, cx: &'b Cx<Caps>) -> ChangedFuture<'a, 'b, T, Caps> {
cx.trace("watch::changed starting wait");
ChangedFuture {
receiver: self,
cx,
completed: false,
}
}
pub(crate) fn poll_changed<Caps>(
&mut self,
cx: &Cx<Caps>,
context: &Context<'_>,
) -> Poll<Result<(), RecvError>> {
if cx.checkpoint().is_err() {
cx.trace("watch::changed cancelled");
self.inner.record_cancellation();
return Poll::Ready(Err(RecvError::Cancelled));
}
let current = self.inner.current_version();
if current != self.seen_version {
self.seen_version = current;
self.inner
.update_receiver_version(self.receiver_token, self.seen_version);
cx.trace("watch::changed received update");
return Poll::Ready(Ok(()));
}
if self.inner.is_sender_dropped() {
let current = self.inner.current_version();
if current != self.seen_version {
self.seen_version = current;
self.inner
.update_receiver_version(self.receiver_token, self.seen_version);
return Poll::Ready(Ok(()));
}
cx.trace("watch::changed sender dropped");
return Poll::Ready(Err(RecvError::Closed));
}
match self.waiter.as_ref() {
Some(w) if !w.load(Ordering::Acquire) => {
w.store(true, Ordering::Release);
self.inner.register_waker(WatchWaiter {
waker: context.waker().clone(),
queued: Arc::clone(w),
});
}
Some(w) => {
if !self.inner.refresh_waker(w, context.waker()) {
self.inner.register_waker(WatchWaiter {
waker: context.waker().clone(),
queued: Arc::clone(w),
});
}
}
None => {
let w = Arc::new(AtomicBool::new(true));
self.inner.register_waker(WatchWaiter {
waker: context.waker().clone(),
queued: Arc::clone(&w),
});
self.waiter = Some(w);
}
}
let current_after_register = self.inner.current_version();
if current_after_register != self.seen_version {
self.seen_version = current_after_register;
self.inner
.update_receiver_version(self.receiver_token, self.seen_version);
cx.trace("watch::changed received update after register");
if let Some(w) = self.waiter.as_ref() {
w.store(false, Ordering::Release);
}
return Poll::Ready(Ok(()));
}
if self.inner.is_sender_dropped() {
if let Some(w) = self.waiter.as_ref() {
w.store(false, Ordering::Release);
}
return Poll::Ready(Err(RecvError::Closed));
}
Poll::Pending
}
#[inline]
#[must_use]
pub fn borrow(&self) -> Ref<'_, T> {
Ref {
guard: self.inner.value.read(),
}
}
#[inline]
#[must_use]
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let guard = self.inner.value.read();
self.seen_version = guard.1;
self.inner
.update_receiver_version(self.receiver_token, self.seen_version);
Ref { guard }
}
#[inline]
#[must_use]
pub fn borrow_and_clone(&self) -> T
where
T: Clone,
{
self.borrow().clone()
}
#[inline]
#[must_use]
pub fn borrow_and_update_clone(&mut self) -> T
where
T: Clone,
{
self.borrow_and_update().clone_inner()
}
#[inline]
pub fn mark_seen(&mut self) {
self.seen_version = self.inner.current_version();
self.inner
.update_receiver_version(self.receiver_token, self.seen_version);
}
#[inline]
#[must_use]
pub fn has_changed(&self) -> bool {
self.inner.current_version() != self.seen_version
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_sender_dropped()
}
#[inline]
#[must_use]
pub fn seen_version(&self) -> u64 {
self.seen_version
}
#[must_use]
#[inline]
pub fn telemetry_snapshot(&self, channel_id: u64) -> WatchTelemetrySnapshot {
self.inner
.telemetry_snapshot(channel_id, Some(self.seen_version))
}
}
pub struct ChangedFuture<'a, 'b, T, Caps = crate::cx::cap::All> {
receiver: &'a mut Receiver<T>,
cx: &'b Cx<Caps>,
completed: bool,
}
impl<T, Caps> Future for ChangedFuture<'_, '_, T, Caps> {
type Output = Result<(), RecvError>;
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.completed {
return Poll::Ready(Err(RecvError::PolledAfterCompletion));
}
match this.receiver.poll_changed(this.cx, context) {
Poll::Ready(result) => {
this.completed = true;
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
}
impl<T, Caps> Drop for ChangedFuture<'_, '_, T, Caps> {
fn drop(&mut self) {
let mut removed_pending_waiter = false;
if let Some(waiter) = self.receiver.waiter.as_ref() {
if waiter.load(Ordering::Acquire) || Arc::strong_count(waiter) > 1 {
waiter.store(false, Ordering::Release);
let mut waiters = self.receiver.inner.waiters.lock();
waiters.retain(|entry| {
let remove = Arc::ptr_eq(&entry.queued, waiter);
removed_pending_waiter |= remove;
!remove && Arc::strong_count(&entry.queued) > 1
});
}
}
if !self.completed && removed_pending_waiter {
self.receiver.inner.record_cancellation();
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let receiver_token = self.inner.insert_receiver_version(self.seen_version);
self.inner.receiver_count.fetch_add(1, Ordering::Relaxed);
Self {
inner: Arc::clone(&self.inner),
seen_version: self.seen_version,
receiver_token,
waiter: None,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.inner.receiver_count.fetch_sub(1, Ordering::Release);
self.inner.remove_receiver_version(self.receiver_token);
if let Some(waiter) = self.waiter.take() {
let mut waiters = self.inner.waiters.lock();
waiters.retain(|entry| {
!Arc::ptr_eq(&entry.queued, &waiter) && Arc::strong_count(&entry.queued) > 1
});
}
}
}
#[derive(Debug)]
pub struct Ref<'a, T> {
guard: RwLockReadGuard<'a, (T, u64)>,
}
impl<T> std::ops::Deref for Ref<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.guard.0
}
}
impl<T: Clone> Ref<'_, T> {
#[must_use]
pub fn clone_inner(&self) -> T {
self.guard.0.clone()
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use std::sync::atomic::AtomicUsize;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn test_cx() -> Cx<crate::cx::cap::All> {
Cx::for_testing()
}
fn poll_ready<F: Future + Unpin>(f: &mut F) -> F::Output {
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
match Pin::new(f).poll(&mut cx) {
Poll::Ready(v) => v,
Poll::Pending => panic!("expected Ready, got Pending"),
}
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = Waker::noop().clone();
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[test]
fn changed_accepts_detached_no_cap_context() {
init_test("changed_accepts_detached_no_cap_context");
let cx = Cx::<crate::cx::cap::None>::detached_cancel_context();
let (tx, mut rx) = channel(0);
tx.send(47).expect("send should succeed");
block_on(rx.changed(&cx)).expect("changed should accept cap::None Cx");
let value = *rx.borrow();
crate::assert_with_log!(value == 47, "watch value", 47, value);
crate::test_complete!("changed_accepts_detached_no_cap_context");
}
#[test]
fn basic_send_recv() {
init_test("basic_send_recv");
let cx = test_cx();
let (tx, mut rx) = channel(0);
tx.send(42).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let value = *rx.borrow();
crate::assert_with_log!(value == 42, "recv value", 42, value);
crate::test_complete!("basic_send_recv");
}
#[test]
fn initial_value_visible() {
init_test("initial_value_visible");
let (tx, rx) = channel(42);
let rx_value = *rx.borrow();
crate::assert_with_log!(rx_value == 42, "rx initial", 42, rx_value);
let tx_value = *tx.borrow();
crate::assert_with_log!(tx_value == 42, "tx initial", 42, tx_value);
crate::test_complete!("initial_value_visible");
}
#[test]
fn multiple_updates() {
init_test("multiple_updates");
let cx = test_cx();
let (tx, mut rx) = channel(0);
for i in 1..=10 {
tx.send(i).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let value = *rx.borrow();
crate::assert_with_log!(value == i, "rx value", i, value);
}
crate::test_complete!("multiple_updates");
}
#[test]
fn latest_value_wins() {
init_test("latest_value_wins");
let (tx, rx) = channel(0);
for i in 1..=100 {
tx.send(i).expect("send failed");
}
let value = *rx.borrow();
crate::assert_with_log!(value == 100, "latest value", 100, value);
crate::test_complete!("latest_value_wins");
}
#[test]
fn send_modify() {
init_test("send_modify");
let cx = test_cx();
let (tx, mut rx) = channel(0);
tx.send_modify(|v| *v = 42).expect("send_modify failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let first = *rx.borrow();
crate::assert_with_log!(first == 42, "after first modify", 42, first);
tx.send_modify(|v| *v += 10).expect("send_modify failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let second = *rx.borrow();
crate::assert_with_log!(second == 52, "after second modify", 52, second);
crate::test_complete!("send_modify");
}
#[test]
fn borrow_and_clone() {
init_test("borrow_and_clone");
let (_tx, rx) = channel(42);
let value: i32 = rx.borrow_and_clone();
crate::assert_with_log!(value == 42, "borrow_and_clone", 42, value);
crate::test_complete!("borrow_and_clone");
}
#[test]
fn mark_seen() {
init_test("mark_seen");
let cx = test_cx();
let (tx, mut rx) = channel(0);
tx.send(1).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "has_changed after send", true, changed);
rx.mark_seen();
let changed = rx.has_changed();
crate::assert_with_log!(!changed, "has_changed after mark", false, changed);
tx.send(2).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let value = *rx.borrow();
crate::assert_with_log!(value == 2, "after second send", 2, value);
crate::test_complete!("mark_seen");
}
#[test]
fn changed_returns_only_on_new_value() {
init_test("changed_returns_only_on_new_value");
let cx = test_cx();
let (tx, mut rx) = channel(0);
tx.send(1).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let changed = rx.has_changed();
crate::assert_with_log!(!changed, "has_changed false", false, changed);
tx.send(2).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "has_changed true", true, changed);
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let value = *rx.borrow();
crate::assert_with_log!(value == 2, "value", 2, value);
crate::test_complete!("changed_returns_only_on_new_value");
}
#[test]
fn multiple_receivers() {
init_test("multiple_receivers");
let cx = test_cx();
let (tx, mut rx1) = channel(0);
let mut rx2 = rx1.clone();
tx.send(42).expect("send failed");
let rx3 = tx.subscribe();
poll_ready(&mut rx1.changed(&cx)).expect("changed failed");
poll_ready(&mut rx2.changed(&cx)).expect("changed failed");
let changed = rx3.has_changed();
crate::assert_with_log!(!changed, "rx3 has_changed", false, changed);
let v1 = *rx1.borrow();
crate::assert_with_log!(v1 == 42, "rx1 value", 42, v1);
let v2 = *rx2.borrow();
crate::assert_with_log!(v2 == 42, "rx2 value", 42, v2);
let v3 = *rx3.borrow();
crate::assert_with_log!(v3 == 42, "rx3 value", 42, v3);
crate::test_complete!("multiple_receivers");
}
#[test]
fn receiver_count() {
init_test("receiver_count");
let (tx, rx1) = channel::<i32>(0);
let count = tx.receiver_count();
crate::assert_with_log!(count == 1, "count 1", 1, count);
let rx2 = rx1.clone();
let count = tx.receiver_count();
crate::assert_with_log!(count == 2, "count 2", 2, count);
let rx3 = tx.subscribe();
let count = tx.receiver_count();
crate::assert_with_log!(count == 3, "count 3", 3, count);
drop(rx1);
let count = tx.receiver_count();
crate::assert_with_log!(count == 2, "count 2 after drop", 2, count);
drop(rx2);
drop(rx3);
let count = tx.receiver_count();
crate::assert_with_log!(count == 0, "count 0", 0, count);
let closed = tx.is_closed();
crate::assert_with_log!(closed, "tx closed", true, closed);
crate::test_complete!("receiver_count");
}
#[test]
fn sender_dropped() {
init_test("sender_dropped");
let cx = test_cx();
let (tx, mut rx) = channel(0);
tx.send(42).expect("send failed");
drop(tx);
let closed = rx.is_closed();
crate::assert_with_log!(closed, "rx closed", true, closed);
poll_ready(&mut rx.changed(&cx)).expect("should see final update");
let value = *rx.borrow();
crate::assert_with_log!(value == 42, "borrow value", 42, value);
let result = poll_ready(&mut rx.changed(&cx));
crate::assert_with_log!(
result.is_err(),
"changed returns error",
true,
result.is_err()
);
crate::test_complete!("sender_dropped");
}
#[test]
fn send_without_receivers_preserves_latest_value() {
init_test("send_without_receivers_preserves_latest_value");
let (tx, rx) = channel(0);
drop(rx);
let closed = tx.is_closed();
crate::assert_with_log!(closed, "tx closed", true, closed);
let result = tx.send(42);
crate::assert_with_log!(
result.is_ok(),
"send still preserves state",
true,
result.is_ok()
);
let rx2 = tx.subscribe();
let value = *rx2.borrow();
crate::assert_with_log!(value == 42, "subscriber sees preserved state", 42, value);
let changed = rx2.has_changed();
crate::assert_with_log!(
!changed,
"new subscriber starts at current version",
false,
changed
);
crate::test_complete!("send_without_receivers_preserves_latest_value");
}
#[test]
fn send_modify_without_receivers_preserves_latest_value() {
init_test("send_modify_without_receivers_preserves_latest_value");
let (tx, rx) = channel(10);
drop(rx);
let result = tx.send_modify(|value| *value += 32);
crate::assert_with_log!(
result.is_ok(),
"send_modify preserves state",
true,
result.is_ok()
);
let rx2 = tx.subscribe();
let value = *rx2.borrow();
crate::assert_with_log!(value == 42, "subscriber sees modified state", 42, value);
crate::test_complete!("send_modify_without_receivers_preserves_latest_value");
}
#[test]
fn version_tracking() {
init_test("version_tracking");
let (_tx, rx) = channel(0);
let version = rx.seen_version();
crate::assert_with_log!(version == 0, "seen_version", 0, version);
crate::test_complete!("version_tracking");
}
#[test]
fn version_wraparound_still_detects_changes() {
init_test("version_wraparound_still_detects_changes");
let cx = test_cx();
let (tx, mut rx) = channel(0_u8);
{
let mut guard = tx.inner.value.write();
guard.1 = u64::MAX - 1;
drop(guard);
}
rx.seen_version = u64::MAX - 1;
tx.send(1).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "has_changed at u64::MAX", true, changed);
poll_ready(&mut rx.changed(&cx)).expect("changed at u64::MAX failed");
let first = *rx.borrow();
crate::assert_with_log!(first == 1, "value at u64::MAX", 1, first);
tx.send(2).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "has_changed after wrap", true, changed);
poll_ready(&mut rx.changed(&cx)).expect("changed after wrap failed");
let second = *rx.borrow();
crate::assert_with_log!(second == 2, "value after wrap", 2, second);
let seen = rx.seen_version();
crate::assert_with_log!(seen == 0, "seen_version wrapped", 0, seen);
crate::test_complete!("version_wraparound_still_detects_changes");
}
#[test]
fn has_changed_reflects_state() {
init_test("has_changed_reflects_state");
let (tx, rx) = channel(0);
let changed = rx.has_changed();
crate::assert_with_log!(!changed, "initial has_changed", false, changed);
tx.send(1).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "has_changed after send", true, changed);
crate::test_complete!("has_changed_reflects_state");
}
#[test]
fn cloned_receiver_inherits_version() {
init_test("cloned_receiver_inherits_version");
let cx = test_cx();
let (tx, mut rx1) = channel(0);
tx.send(1).expect("send failed");
poll_ready(&mut rx1.changed(&cx)).expect("changed failed");
let rx2 = rx1.clone();
let changed = rx2.has_changed();
crate::assert_with_log!(!changed, "rx2 inherits version", false, changed);
crate::test_complete!("cloned_receiver_inherits_version");
}
#[test]
fn subscribe_gets_current_version() {
init_test("subscribe_gets_current_version");
let (tx, _rx) = channel(0);
tx.send(1).expect("send failed");
tx.send(2).expect("send failed");
let rx2 = tx.subscribe();
let changed = rx2.has_changed();
crate::assert_with_log!(!changed, "rx2 no change", false, changed);
let value = *rx2.borrow();
crate::assert_with_log!(value == 2, "rx2 value", 2, value);
crate::test_complete!("subscribe_gets_current_version");
}
#[test]
fn send_error_display() {
init_test("send_error_display");
let err = SendError::Closed(42);
let text = err.to_string();
crate::assert_with_log!(
text == "sending on a closed watch channel",
"display",
"sending on a closed watch channel",
text
);
crate::test_complete!("send_error_display");
}
#[test]
fn recv_error_display() {
init_test("recv_error_display");
let closed_text = RecvError::Closed.to_string();
crate::assert_with_log!(
closed_text == "receiving on a closed watch channel",
"display",
"receiving on a closed watch channel",
closed_text
);
let cancelled_text = RecvError::Cancelled.to_string();
crate::assert_with_log!(
cancelled_text == "receive operation cancelled",
"display",
"receive operation cancelled",
cancelled_text
);
crate::test_complete!("recv_error_display");
}
#[test]
fn ref_deref() {
init_test("ref_deref");
let (_tx, rx) = channel(42);
let r = rx.borrow();
let _: &i32 = &r;
let value = *r;
crate::assert_with_log!(value == 42, "deref", 42, value);
drop(r);
crate::test_complete!("ref_deref");
}
#[test]
fn ref_clone_inner() {
init_test("ref_clone_inner");
let (_tx, rx) = channel(String::from("hello"));
let cloned: String = rx.borrow().clone_inner();
crate::assert_with_log!(cloned == "hello", "clone_inner", "hello", cloned);
crate::test_complete!("ref_clone_inner");
}
#[test]
fn cancel_during_wait_preserves_version() {
init_test("cancel_during_wait_preserves_version");
let cx = test_cx();
cx.set_cancel_requested(true);
let (tx, mut rx) = channel(0);
let result = poll_ready(&mut rx.changed(&cx));
crate::assert_with_log!(
result.is_err(),
"changed error on cancel",
true,
result.is_err()
);
let version = rx.seen_version();
crate::assert_with_log!(version == 0, "seen_version", 0, version);
cx.set_cancel_requested(false);
tx.send(1).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let version = rx.seen_version();
crate::assert_with_log!(version == 1, "seen_version after", 1, version);
crate::test_complete!("cancel_during_wait_preserves_version");
}
#[test]
fn cancel_after_pending_repoll_reuses_waiter_slot() {
init_test("cancel_after_pending_repoll_reuses_waiter_slot");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx.changed(&cx);
let first_poll = Pin::new(&mut future).poll(&mut task_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(waiter_count == 1, "waiter registered", 1, waiter_count);
cx.set_cancel_requested(true);
let cancelled_poll = Pin::new(&mut future).poll(&mut task_cx);
crate::assert_with_log!(
matches!(cancelled_poll, Poll::Ready(Err(RecvError::Cancelled))),
"pending waiter observes cancellation",
"Ready(Err(Cancelled))",
format!("{cancelled_poll:?}")
);
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"cancelled future drop cleans up waiter entry",
0,
waiter_count
);
cx.set_cancel_requested(false);
{
let mut future = rx.changed(&cx);
let repoll = Pin::new(&mut future).poll(&mut task_cx);
crate::assert_with_log!(
repoll.is_pending(),
"recreated future pending",
true,
repoll.is_pending()
);
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 1,
"re-poll re-registers waiter",
1,
waiter_count
);
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"future drop cleans up re-registered waiter",
0,
waiter_count
);
tx.send(1).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed after send");
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"waiters drained after send",
0,
waiter_count
);
crate::test_complete!("cancel_after_pending_repoll_reuses_waiter_slot");
}
#[test]
fn changed_returns_pending_then_ready_after_send() {
init_test("changed_returns_pending_then_ready_after_send");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx.changed(&cx);
let poll_result = Pin::new(&mut future).poll(&mut task_cx);
crate::assert_with_log!(
poll_result.is_pending(),
"first poll pending",
true,
poll_result.is_pending()
);
}
tx.send(42).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed after send");
let value = *rx.borrow();
crate::assert_with_log!(value == 42, "value after send", 42, value);
crate::test_complete!("changed_returns_pending_then_ready_after_send");
}
#[test]
fn sender_drop_wakes_pending_receiver() {
init_test("sender_drop_wakes_pending_receiver");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx.changed(&cx);
let poll_result = Pin::new(&mut future).poll(&mut task_cx);
crate::assert_with_log!(
poll_result.is_pending(),
"pending before drop",
true,
poll_result.is_pending()
);
}
drop(tx);
let result = poll_ready(&mut rx.changed(&cx));
crate::assert_with_log!(
matches!(result, Err(RecvError::Closed)),
"closed after sender drop",
true,
matches!(result, Err(RecvError::Closed))
);
crate::test_complete!("sender_drop_wakes_pending_receiver");
}
#[test]
fn sender_drop_wakes_all_pending_receivers() {
init_test("sender_drop_wakes_all_pending_receivers");
let cx = test_cx();
let (tx, mut rx1) = channel(0);
let mut rx2 = tx.subscribe();
let inner = Arc::clone(&tx.inner);
let wake_count1 = Arc::new(AtomicUsize::new(0));
let waker1 = Waker::from(Arc::new(CountWake {
count: Arc::clone(&wake_count1),
}));
let mut task_cx1 = Context::from_waker(&waker1);
let mut future1 = rx1.changed(&cx);
let first_poll = Pin::new(&mut future1).poll(&mut task_cx1);
crate::assert_with_log!(
first_poll.is_pending(),
"receiver 1 pending before sender drop",
true,
first_poll.is_pending()
);
let wake_count2 = Arc::new(AtomicUsize::new(0));
let waker2 = Waker::from(Arc::new(CountWake {
count: Arc::clone(&wake_count2),
}));
let mut task_cx2 = Context::from_waker(&waker2);
let mut future2 = rx2.changed(&cx);
let second_poll = Pin::new(&mut future2).poll(&mut task_cx2);
crate::assert_with_log!(
second_poll.is_pending(),
"receiver 2 pending before sender drop",
true,
second_poll.is_pending()
);
let waiter_count = inner.waiters.lock().len();
crate::assert_with_log!(waiter_count == 2, "two waiters registered", 2, waiter_count);
drop(tx);
let woken1 = wake_count1.load(Ordering::SeqCst);
crate::assert_with_log!(woken1 > 0, "receiver 1 woken on close", "> 0", woken1);
let woken2 = wake_count2.load(Ordering::SeqCst);
crate::assert_with_log!(woken2 > 0, "receiver 2 woken on close", "> 0", woken2);
let waiter_count = inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"close drains all waiters",
0,
waiter_count
);
let result1 = Pin::new(&mut future1).poll(&mut task_cx1);
crate::assert_with_log!(
matches!(result1, Poll::Ready(Err(RecvError::Closed))),
"receiver 1 sees closed",
"Ready(Err(Closed))",
format!("{result1:?}")
);
let result2 = Pin::new(&mut future2).poll(&mut task_cx2);
crate::assert_with_log!(
matches!(result2, Poll::Ready(Err(RecvError::Closed))),
"receiver 2 sees closed",
"Ready(Err(Closed))",
format!("{result2:?}")
);
crate::test_complete!("sender_drop_wakes_all_pending_receivers");
}
#[test]
fn no_unbounded_waker_growth() {
init_test("no_unbounded_waker_growth");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx.changed(&cx);
for _ in 0..100 {
let result = Pin::new(&mut future).poll(&mut task_cx);
assert!(result.is_pending());
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 1,
"waiter count after repeated polls (future alive)",
1,
waiter_count
);
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"waiter cleaned up after future drop",
0,
waiter_count
);
tx.send(42).expect("send failed");
poll_ready(&mut rx.changed(&cx)).expect("changed failed");
let value = *rx.borrow();
crate::assert_with_log!(value == 42, "value after send", 42, value);
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"waiter count after drain",
0,
waiter_count
);
crate::test_complete!("no_unbounded_waker_growth");
}
#[test]
fn cancel_and_recreate_bounded_waiters() {
init_test("cancel_and_recreate_bounded_waiters");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
for _ in 0..50 {
let mut future = rx.changed(&cx);
let result = Pin::new(&mut future).poll(&mut task_cx);
assert!(result.is_pending());
let waiter_count = tx.inner.waiters.lock().len();
assert!(waiter_count <= 1, "at most 1 waiter while future alive");
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"all waiter entries cleaned up after future drops",
0,
waiter_count
);
tx.send(1).expect("send failed");
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(waiter_count == 0, "all drained after send", 0, waiter_count);
crate::test_complete!("cancel_and_recreate_bounded_waiters");
}
#[test]
fn dropped_receiver_waiter_is_pruned_on_next_registration() {
init_test("dropped_receiver_waiter_is_pruned_on_next_registration");
let cx = test_cx();
let (tx, mut rx1) = channel(0);
let mut rx2 = tx.subscribe();
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx1.changed(&cx);
let result = Pin::new(&mut future).poll(&mut task_cx);
assert!(result.is_pending());
}
drop(rx1);
{
let mut future = rx2.changed(&cx);
let result = Pin::new(&mut future).poll(&mut task_cx);
assert!(result.is_pending());
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 1,
"rx2 waiter registered while future alive",
1,
waiter_count
);
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"rx2 waiter cleaned up after future drop",
0,
waiter_count
);
crate::test_complete!("dropped_receiver_waiter_is_pruned_on_next_registration");
}
#[test]
fn dropped_receiver_eagerly_removes_pending_waiter() {
init_test("dropped_receiver_eagerly_removes_pending_waiter");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let waker = Waker::noop();
let mut task_cx = Context::from_waker(waker);
{
let mut future = rx.changed(&cx);
let result = Pin::new(&mut future).poll(&mut task_cx);
assert!(result.is_pending());
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(waiter_count == 1, "waiter registered", 1, waiter_count);
}
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"waiter cleaned by future drop",
0,
waiter_count
);
drop(rx);
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"waiter removed on receiver drop",
0,
waiter_count
);
let receiver_count = tx.receiver_count();
crate::assert_with_log!(
receiver_count == 0,
"receiver count after drop",
0,
receiver_count
);
crate::test_complete!("dropped_receiver_eagerly_removes_pending_waiter");
}
#[test]
fn completed_future_drop_cleans_false_flag_waiter_entry() {
init_test("completed_future_drop_cleans_false_flag_waiter_entry");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let wake_count = Arc::new(AtomicUsize::new(0));
let waiter = Arc::new(AtomicBool::new(false));
let waiter_waker = Waker::from(Arc::new(CountWake {
count: Arc::clone(&wake_count),
}));
tx.inner.register_waker(WatchWaiter {
waker: waiter_waker,
queued: Arc::clone(&waiter),
});
rx.waiter = Some(Arc::clone(&waiter));
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 1,
"stale waiter entry present before drop",
1,
waiter_count
);
crate::assert_with_log!(
!waiter.load(Ordering::Acquire),
"queued flag already cleared before drop",
false,
waiter.load(Ordering::Acquire)
);
let future = ChangedFuture {
receiver: &mut rx,
cx: &cx,
completed: true,
};
drop(future);
let waiter_count = tx.inner.waiters.lock().len();
crate::assert_with_log!(
waiter_count == 0,
"completed future drop removes stale waiter entry",
0,
waiter_count
);
let wake_total = wake_count.load(Ordering::SeqCst);
crate::assert_with_log!(
wake_total == 0,
"drop does not spuriously wake",
0,
wake_total
);
crate::test_complete!("completed_future_drop_cleans_false_flag_waiter_entry");
}
struct CountWake {
count: Arc<AtomicUsize>,
}
impl std::task::Wake for CountWake {
fn wake(self: Arc<Self>) {
self.count.fetch_add(1, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn changed_updates_waiter_waker_on_repoll() {
init_test("changed_updates_waiter_waker_on_repoll");
let cx = test_cx();
let (tx, mut rx) = channel(0);
let mut future = rx.changed(&cx);
let first_count = Arc::new(AtomicUsize::new(0));
let first_waker = Waker::from(Arc::new(CountWake {
count: Arc::clone(&first_count),
}));
let mut first_cx = Context::from_waker(&first_waker);
let first_poll = Pin::new(&mut future).poll(&mut first_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
let second_count = Arc::new(AtomicUsize::new(0));
let second_waker = Waker::from(Arc::new(CountWake {
count: Arc::clone(&second_count),
}));
let mut second_cx = Context::from_waker(&second_waker);
let second_poll = Pin::new(&mut future).poll(&mut second_cx);
crate::assert_with_log!(
second_poll.is_pending(),
"second poll pending",
true,
second_poll.is_pending()
);
tx.send(1).expect("send failed");
let second_wake_count = second_count.load(Ordering::SeqCst);
crate::assert_with_log!(
second_wake_count > 0,
"latest waker notified",
"> 0",
second_wake_count
);
let first_wake_count = first_count.load(Ordering::SeqCst);
crate::assert_with_log!(
first_wake_count == 0,
"stale waker not notified",
0,
first_wake_count
);
poll_ready(&mut future).expect("changed should complete after send");
crate::test_complete!("changed_updates_waiter_waker_on_repoll");
}
#[test]
fn shutdown_signal_pattern() {
init_test("shutdown_signal_pattern");
let cx = test_cx();
let (shutdown_tx, mut shutdown_rx) = channel(false);
let initial = *shutdown_rx.borrow();
crate::assert_with_log!(!initial, "initial false", false, initial);
shutdown_tx.send(true).expect("send failed");
poll_ready(&mut shutdown_rx.changed(&cx)).expect("changed failed");
let value = *shutdown_rx.borrow();
crate::assert_with_log!(value, "shutdown true", true, value);
crate::test_complete!("shutdown_signal_pattern");
}
#[test]
fn sender_drop_sets_sender_dropped_atomically() {
init_test("sender_drop_sets_sender_dropped_atomically");
let (tx, rx) = channel::<i32>(0);
let dropped = tx.inner.sender_dropped.load(Ordering::Acquire);
crate::assert_with_log!(!dropped, "sender not dropped yet", false, dropped);
drop(tx);
let dropped = rx.inner.sender_dropped.load(Ordering::Acquire);
crate::assert_with_log!(dropped, "sender dropped after drop", true, dropped);
crate::test_complete!("sender_drop_sets_sender_dropped_atomically");
}
#[test]
fn receiver_drop_decrements_count_atomically() {
init_test("receiver_drop_decrements_count_atomically");
let (tx, rx) = channel::<i32>(0);
let count = tx.inner.receiver_count.load(Ordering::Acquire);
crate::assert_with_log!(count == 1, "initial count", 1usize, count);
drop(rx);
let count = tx.inner.receiver_count.load(Ordering::Acquire);
crate::assert_with_log!(count == 0, "count after drop", 0usize, count);
crate::test_complete!("receiver_drop_decrements_count_atomically");
}
#[test]
fn subscribe_version_is_consistent_with_send() {
init_test("subscribe_version_is_consistent_with_send");
let (tx, _rx) = channel(0i32);
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
let pre_version = tx.inner.current_version();
let rx2 = tx.subscribe();
let post_version = tx.inner.current_version();
crate::assert_with_log!(
rx2.seen_version == pre_version,
"subscribe version matches current",
pre_version,
rx2.seen_version
);
crate::assert_with_log!(
pre_version == post_version,
"no concurrent version change",
pre_version,
post_version
);
assert!(!rx2.has_changed());
tx.send(4).unwrap();
assert!(rx2.has_changed());
crate::test_complete!("subscribe_version_is_consistent_with_send");
}
#[test]
fn subscribe_under_read_lock_blocks_concurrent_send() {
init_test("subscribe_under_read_lock_blocks_concurrent_send");
let (tx, _rx) = channel(0i32);
let guard = tx.inner.value.read();
let version_under_lock = guard.1;
tx.inner.receiver_count.fetch_add(1, Ordering::Relaxed);
let count = tx.inner.receiver_count.load(Ordering::Acquire);
crate::assert_with_log!(count == 2, "count bumped under lock", 2usize, count);
let version_still = tx.inner.current_version();
crate::assert_with_log!(
version_still == version_under_lock,
"version stable under read lock",
version_under_lock,
version_still
);
tx.inner.receiver_count.fetch_sub(1, Ordering::Release);
drop(guard);
crate::test_complete!("subscribe_under_read_lock_blocks_concurrent_send");
}
#[test]
fn watch_send_error_debug_clone_copy_eq() {
let e = SendError::Closed(42);
let dbg = format!("{e:?}");
assert!(dbg.contains("Closed"), "{dbg}");
let copied: SendError<i32> = e;
let cloned = e;
assert_eq!(copied, cloned);
}
#[test]
fn watch_recv_error_debug_clone_copy_eq() {
let e = RecvError::Closed;
let dbg = format!("{e:?}");
assert!(dbg.contains("Closed"), "{dbg}");
let copied: RecvError = e;
let cloned = e;
assert_eq!(copied, cloned);
assert_ne!(e, RecvError::Cancelled);
}
#[test]
fn modify_error_debug_clone_copy_eq() {
let e = ModifyError;
let dbg = format!("{e:?}");
assert!(dbg.contains("ModifyError"), "{dbg}");
let copied: ModifyError = e;
let cloned = e;
assert_eq!(copied, cloned);
}
#[test]
fn modify_error_display_matches_closed_sender_semantics() {
init_test("modify_error_display_matches_closed_sender_semantics");
let text = ModifyError.to_string();
crate::assert_with_log!(
text == "modifying a closed watch channel",
"display",
"modifying a closed watch channel",
text
);
crate::test_complete!("modify_error_display_matches_closed_sender_semantics");
}
#[test]
fn metamorphic_borrow_and_update_consistency() {
init_test("metamorphic_borrow_and_update_consistency");
let _cx = test_cx();
let (tx, mut rx) = channel(0u64);
{
let initial = rx.borrow_and_update();
crate::assert_with_log!(*initial == 0, "initial value", 0u64, *initial);
}
crate::assert_with_log!(
rx.seen_version == 0,
"initial seen version",
0u64,
rx.seen_version
);
tx.send(42).expect("send failed");
{
let value1 = rx.borrow_and_update();
crate::assert_with_log!(*value1 == 42, "after send(42)", 42u64, *value1);
}
crate::assert_with_log!(
rx.seen_version == 1,
"version after first send",
1u64,
rx.seen_version
);
for i in 1..10 {
let val = 100 + i;
tx.send(val).expect("send failed");
{
let observed = rx.borrow_and_update();
crate::assert_with_log!(
*observed == val,
&format!("sequence send {} value", i),
val,
*observed
);
}
crate::assert_with_log!(
rx.seen_version == i + 1,
&format!("sequence send {} version", i),
i + 1,
rx.seen_version
);
}
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
tx.send(999).expect("send failed");
{
let val1 = rx.borrow_and_update();
let val2 = rx2.borrow_and_update();
let val3 = rx3.borrow_and_update();
crate::assert_with_log!(*val1 == 999, "rx1 sees latest", 999u64, *val1);
crate::assert_with_log!(*val2 == 999, "rx2 sees latest", 999u64, *val2);
crate::assert_with_log!(*val3 == 999, "rx3 sees latest", 999u64, *val3);
}
tx.send(1234).expect("send failed");
{
let val_update = rx.borrow_and_update();
crate::assert_with_log!(
*val_update == 1234,
"borrow_and_update value",
1234u64,
*val_update
);
}
{
let val_borrow = rx.borrow();
crate::assert_with_log!(
*val_borrow == 1234,
"subsequent borrow value",
1234u64,
*val_borrow
);
}
let version_after_borrow = rx.seen_version;
crate::assert_with_log!(
version_after_borrow == 12,
"version unchanged by borrow",
12u64,
version_after_borrow
);
crate::test_complete!("metamorphic_borrow_and_update_consistency");
}
#[test]
fn metamorphic_receiver_isolation() {
init_test("metamorphic_receiver_isolation");
let _cx = test_cx();
let (tx, rx1_base) = channel(0u32);
let mut rx1 = rx1_base;
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
let init1 = rx1.seen_version();
let init2 = rx2.seen_version();
let init3 = rx3.seen_version();
crate::assert_with_log!(init1 == 0, "rx1 initial version", 0u64, init1);
crate::assert_with_log!(init2 == init3, "rx2 rx3 same start version", init2, init3);
tx.send(100).expect("send failed");
{
let val1 = rx1.borrow_and_update();
crate::assert_with_log!(*val1 == 100, "rx1 observes send", 100u32, *val1);
}
let _rx1_version = rx1.seen_version();
let rx2_version_before = rx2.seen_version();
crate::assert_with_log!(
rx2_version_before == init2,
"rx2 version independent of rx1",
init2,
rx2_version_before
);
{
let val2 = rx2.borrow_and_update();
crate::assert_with_log!(*val2 == 100, "rx2 observes same value", 100u32, *val2);
}
let _rx2_version_after = rx2.seen_version();
let rx3_version_before = rx3.seen_version();
crate::assert_with_log!(
rx3_version_before == init3,
"rx3 version independent of rx1/rx2",
init3,
rx3_version_before
);
tx.send(200).expect("send failed");
tx.send(300).expect("send failed");
{
let val1_latest = rx1.borrow_and_update();
crate::assert_with_log!(
*val1_latest == 300,
"rx1 sees latest after multiple sends",
300u32,
*val1_latest
);
}
{
let val3_first = rx3.borrow_and_update();
crate::assert_with_log!(
*val3_first == 300,
"rx3 sees latest on first observation",
300u32,
*val3_first
);
}
let v1 = rx1.seen_version();
let v2 = rx2.seen_version();
let v3 = rx3.seen_version();
crate::assert_with_log!(v1 == 3, "rx1 latest version", 3u64, v1);
crate::assert_with_log!(v3 == 3, "rx3 latest version", 3u64, v3);
crate::assert_with_log!(v2 == 1, "rx2 independent version", 1u64, v2);
tx.send(400).expect("send failed");
let has_changed1 = rx1.has_changed();
let has_changed2 = rx2.has_changed();
let has_changed3 = rx3.has_changed();
crate::assert_with_log!(has_changed1, "rx1 has changes", true, has_changed1);
crate::assert_with_log!(has_changed2, "rx2 has changes", true, has_changed2);
crate::assert_with_log!(has_changed3, "rx3 has changes", true, has_changed3);
rx1.mark_seen();
let has_changed1_after = rx1.has_changed();
let has_changed2_after = rx2.has_changed();
let has_changed3_after = rx3.has_changed();
crate::assert_with_log!(
!has_changed1_after,
"rx1 no changes after mark",
false,
has_changed1_after
);
crate::assert_with_log!(
has_changed2_after,
"rx2 still has changes",
true,
has_changed2_after
);
crate::assert_with_log!(
has_changed3_after,
"rx3 still has changes",
true,
has_changed3_after
);
crate::test_complete!("metamorphic_receiver_isolation");
}
#[test]
fn borrow_and_update_acknowledges_the_snapshot_it_returns() {
init_test("borrow_and_update_acknowledges_the_snapshot_it_returns");
let (tx, mut rx) = channel(10u32);
tx.send(20).expect("send failed");
let current_version = tx.inner.current_version();
let snapshot_value = {
let snapshot = rx.borrow_and_update();
*snapshot
};
crate::assert_with_log!(
snapshot_value == 20,
"snapshot value",
20u32,
snapshot_value
);
crate::assert_with_log!(
rx.seen_version() == current_version,
"borrow_and_update aligns seen version",
current_version,
rx.seen_version()
);
let changed = rx.has_changed();
crate::assert_with_log!(
!changed,
"no unread change after snapshot-aligned ack",
false,
changed
);
tx.send(30).expect("send failed");
let changed = rx.has_changed();
crate::assert_with_log!(changed, "new send becomes visible", true, changed);
crate::test_complete!("borrow_and_update_acknowledges_the_snapshot_it_returns");
}
#[test]
fn metamorphic_borrow_and_update_clone_matches_explicit_snapshot_clone() {
init_test("metamorphic_borrow_and_update_clone_matches_explicit_snapshot_clone");
let (tx, mut rx_explicit) = channel(1u32);
let mut rx_clone = tx.subscribe();
tx.send(10).expect("send failed");
tx.send(20).expect("send failed");
let current_version = tx.inner.current_version();
let explicit_value = {
let snapshot = rx_explicit.borrow_and_update();
snapshot.clone_inner()
};
let clone_value = rx_clone.borrow_and_update_clone();
crate::assert_with_log!(
explicit_value == clone_value,
"clone helper matches explicit snapshot clone",
explicit_value,
clone_value
);
crate::assert_with_log!(
explicit_value == 20,
"both paths observe latest unread value",
20u32,
explicit_value
);
crate::assert_with_log!(
rx_explicit.seen_version() == current_version,
"explicit path acknowledges current version",
current_version,
rx_explicit.seen_version()
);
crate::assert_with_log!(
rx_clone.seen_version() == current_version,
"clone helper acknowledges current version",
current_version,
rx_clone.seen_version()
);
let explicit_changed = rx_explicit.has_changed();
let clone_changed = rx_clone.has_changed();
crate::assert_with_log!(
!explicit_changed,
"explicit path has no duplicate change after acknowledgement",
false,
explicit_changed
);
crate::assert_with_log!(
!clone_changed,
"clone helper has no duplicate change after acknowledgement",
false,
clone_changed
);
tx.send(30).expect("send failed");
let explicit_next = {
let snapshot = rx_explicit.borrow_and_update();
snapshot.clone_inner()
};
let clone_next = rx_clone.borrow_and_update_clone();
let next_version = tx.inner.current_version();
crate::assert_with_log!(
explicit_next == clone_next,
"next send remains aligned across both acknowledgement paths",
explicit_next,
clone_next
);
crate::assert_with_log!(
explicit_next == 30,
"next send value observed by both paths",
30u32,
explicit_next
);
crate::assert_with_log!(
rx_explicit.seen_version() == next_version,
"explicit path advances on next send",
next_version,
rx_explicit.seen_version()
);
crate::assert_with_log!(
rx_clone.seen_version() == next_version,
"clone helper advances on next send",
next_version,
rx_clone.seen_version()
);
crate::test_complete!(
"metamorphic_borrow_and_update_clone_matches_explicit_snapshot_clone"
);
}
#[test]
fn mark_seen_acknowledges_latest_version_not_prior_borrow_snapshot() {
init_test("mark_seen_acknowledges_latest_version_not_prior_borrow_snapshot");
let (tx, mut rx) = channel(1u32);
tx.send(2).expect("send failed");
let borrowed_snapshot = rx.borrow().clone_inner();
crate::assert_with_log!(
borrowed_snapshot == 2,
"borrowed snapshot before later send",
2u32,
borrowed_snapshot
);
tx.send(3).expect("send failed");
rx.mark_seen();
let seen_version = rx.seen_version();
let current_version = tx.inner.current_version();
crate::assert_with_log!(
seen_version == current_version,
"mark_seen advances to current version",
current_version,
seen_version
);
let changed = rx.has_changed();
crate::assert_with_log!(
!changed,
"mark_seen cleared both pending versions",
false,
changed
);
let latest = *rx.borrow();
crate::assert_with_log!(
latest == 3,
"latest borrow reflects post-mark version",
3u32,
latest
);
crate::test_complete!("mark_seen_acknowledges_latest_version_not_prior_borrow_snapshot");
}
#[test]
fn metamorphic_subscription_snapshot_ordering() {
init_test("metamorphic_subscription_snapshot_ordering");
let cx = test_cx();
let (tx, mut rx1) = channel(0u32);
tx.send(10).expect("send failed");
let rx1_snapshot = rx1.borrow().clone_inner();
crate::assert_with_log!(
rx1_snapshot == 10,
"existing receiver sees first value via borrow",
10u32,
rx1_snapshot
);
let mut rx2 = tx.subscribe();
crate::assert_with_log!(
*rx2.borrow() == 10,
"new subscriber borrows current snapshot immediately",
10u32,
*rx2.borrow()
);
crate::assert_with_log!(
!rx2.has_changed(),
"new subscriber starts caught up to current version",
false,
rx2.has_changed()
);
tx.send(20).expect("send failed");
tx.send(30).expect("send failed");
let mut rx2_changed = rx2.changed(&cx);
let rx2_change = poll_ready(&mut rx2_changed);
drop(rx2_changed);
crate::assert_with_log!(
rx2_change.is_ok(),
"subscriber observes burst as a single pending change",
true,
rx2_change.is_ok()
);
crate::assert_with_log!(
*rx2.borrow() == 30,
"subscriber lands on latest burst value",
30u32,
*rx2.borrow()
);
let rx1_latest = {
let snapshot = rx1.borrow_and_update();
*snapshot
};
crate::assert_with_log!(
rx1_latest == 30,
"older receiver also lands on latest burst value",
30u32,
rx1_latest
);
crate::assert_with_log!(
!rx1.has_changed(),
"borrow_and_update fully acknowledges latest snapshot",
false,
rx1.has_changed()
);
let mut rx2_pending = rx2.changed(&cx);
let pending_waker = Waker::noop();
let mut pending_cx = Context::from_waker(pending_waker);
let pending_poll = Pin::new(&mut rx2_pending).poll(&mut pending_cx);
crate::assert_with_log!(
matches!(pending_poll, Poll::Pending),
"subscriber receives no duplicate notification after acknowledging burst",
true,
matches!(pending_poll, Poll::Pending)
);
drop(rx2_pending);
tx.send(40).expect("send failed");
crate::assert_with_log!(
rx1.has_changed(),
"next send is visible to older receiver after prior acknowledgement",
true,
rx1.has_changed()
);
crate::assert_with_log!(
rx2.has_changed(),
"next send is visible to subscriber after prior acknowledgement",
true,
rx2.has_changed()
);
crate::test_complete!("metamorphic_subscription_snapshot_ordering");
}
#[test]
fn metamorphic_changed_exactness() {
init_test("metamorphic_changed_exactness");
let cx = test_cx();
let (tx, mut rx) = channel(0i32);
let poll_changed = |rx: &mut Receiver<i32>| -> Result<(), RecvError> {
let mut future = rx.changed(&cx);
poll_ready(&mut future)
};
let initial_version = rx.seen_version();
crate::assert_with_log!(
initial_version == 0,
"initial version",
0u64,
initial_version
);
tx.send(1).expect("send failed");
let change1 = poll_changed(&mut rx);
crate::assert_with_log!(
change1.is_ok(),
"first changed() succeeds",
true,
change1.is_ok()
);
let version_after_change = rx.seen_version();
crate::assert_with_log!(
version_after_change == 1,
"version updated after changed()",
1u64,
version_after_change
);
crate::assert_with_log!(
!rx.has_changed(),
"no changes after changed()",
false,
rx.has_changed()
);
let mut change_count = 1; for i in 2..=5 {
tx.send(i).expect("send failed");
let change = poll_changed(&mut rx);
crate::assert_with_log!(
change.is_ok(),
&format!("changed() {} succeeds", i),
true,
change.is_ok()
);
change_count += 1;
let version = rx.seen_version();
crate::assert_with_log!(
version == i as u64,
&format!("version {} after send {}", i, i),
i as u64,
version
);
}
crate::assert_with_log!(
change_count == 5,
"exactly 5 changes for 5 sends",
5,
change_count
);
for i in 10..15 {
tx.send(i).expect("send failed");
}
let rapid_change = poll_changed(&mut rx);
crate::assert_with_log!(
rapid_change.is_ok(),
"coalesced rapid burst detected",
true,
rapid_change.is_ok()
);
let final_version = rx.seen_version();
crate::assert_with_log!(
final_version == 10,
"rapid burst advances to latest version",
10u64,
final_version
);
crate::assert_with_log!(
*rx.borrow() == 14,
"rapid burst exposes latest value",
14i32,
*rx.borrow()
);
let mut pending_after_burst = rx.changed(&cx);
let burst_waker = Waker::noop();
let mut burst_cx = Context::from_waker(burst_waker);
let burst_poll = Pin::new(&mut pending_after_burst).poll(&mut burst_cx);
crate::assert_with_log!(
matches!(burst_poll, Poll::Pending),
"no second notification after coalesced burst",
true,
matches!(burst_poll, Poll::Pending)
);
drop(pending_after_burst);
tx.send(999).expect("send failed");
tx.send(999).expect("send failed"); tx.send(999).expect("send failed");
let duplicate_burst = poll_changed(&mut rx);
crate::assert_with_log!(
duplicate_burst.is_ok(),
"duplicate-value burst detected",
true,
duplicate_burst.is_ok()
);
crate::assert_with_log!(
rx.seen_version() == 13,
"duplicate sends still advance version",
13u64,
rx.seen_version()
);
let mut pending_after_duplicates = rx.changed(&cx);
let duplicate_waker = Waker::noop();
let mut duplicate_cx = Context::from_waker(duplicate_waker);
let duplicate_poll = Pin::new(&mut pending_after_duplicates).poll(&mut duplicate_cx);
crate::assert_with_log!(
matches!(duplicate_poll, Poll::Pending),
"duplicate burst also coalesces to one notification",
true,
matches!(duplicate_poll, Poll::Pending)
);
drop(pending_after_duplicates);
let final_value = rx.borrow_and_update();
crate::assert_with_log!(
*final_value == 999,
"final value correct",
999i32,
*final_value
);
crate::test_complete!("metamorphic_changed_exactness");
}
#[test]
fn metamorphic_closed_sender_behavior() {
init_test("metamorphic_closed_sender_behavior");
let cx = test_cx();
let (tx, mut rx1) = channel(0u8);
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
crate::assert_with_log!(
!rx1.is_closed(),
"rx1 initially open",
false,
rx1.is_closed()
);
crate::assert_with_log!(
!rx2.is_closed(),
"rx2 initially open",
false,
rx2.is_closed()
);
crate::assert_with_log!(
!rx3.is_closed(),
"rx3 initially open",
false,
rx3.is_closed()
);
tx.send(42).expect("send failed");
{
let val1 = rx1.borrow_and_update();
let val2 = rx2.borrow_and_update();
let val3 = rx3.borrow_and_update();
crate::assert_with_log!(*val1 == 42, "rx1 sees value", 42u8, *val1);
crate::assert_with_log!(*val2 == 42, "rx2 sees value", 42u8, *val2);
crate::assert_with_log!(*val3 == 42, "rx3 sees value", 42u8, *val3);
}
drop(tx);
crate::assert_with_log!(
rx1.is_closed(),
"rx1 closed after drop",
true,
rx1.is_closed()
);
crate::assert_with_log!(
rx2.is_closed(),
"rx2 closed after drop",
true,
rx2.is_closed()
);
crate::assert_with_log!(
rx3.is_closed(),
"rx3 closed after drop",
true,
rx3.is_closed()
);
let mut future1 = rx1.changed(&cx);
let result1 = poll_ready(&mut future1);
drop(future1);
crate::assert_with_log!(
matches!(result1, Err(RecvError::Closed)),
"rx1 changed() returns Closed",
true,
matches!(result1, Err(RecvError::Closed))
);
let mut future2 = rx2.changed(&cx);
let result2 = poll_ready(&mut future2);
drop(future2);
crate::assert_with_log!(
matches!(result2, Err(RecvError::Closed)),
"rx2 changed() returns Closed",
true,
matches!(result2, Err(RecvError::Closed))
);
let mut future3 = rx3.changed(&cx);
let result3 = poll_ready(&mut future3);
drop(future3);
crate::assert_with_log!(
matches!(result3, Err(RecvError::Closed)),
"rx3 changed() returns Closed",
true,
matches!(result3, Err(RecvError::Closed))
);
{
let final1 = rx1.borrow();
let final2 = rx2.borrow();
let final3 = rx3.borrow();
crate::assert_with_log!(*final1 == 42, "rx1 final value readable", 42u8, *final1);
crate::assert_with_log!(*final2 == 42, "rx2 final value readable", 42u8, *final2);
crate::assert_with_log!(*final3 == 42, "rx3 final value readable", 42u8, *final3);
}
let (tx2, mut rx4) = channel(100i32);
let mut rx5 = tx2.subscribe();
tx2.send(200).expect("send failed");
{
let val4 = rx4.borrow_and_update();
crate::assert_with_log!(*val4 == 200, "rx4 initial value", 200i32, *val4);
}
crate::assert_with_log!(
rx5.has_changed(),
"rx5 has pending changes",
true,
rx5.has_changed()
);
drop(tx2);
let mut future5 = rx5.changed(&cx);
let result5 = poll_ready(&mut future5);
drop(future5);
crate::assert_with_log!(
matches!(result5, Ok(())),
"rx5 receives final unseen update before Closed",
true,
matches!(result5, Ok(()))
);
let mut future5_closed = rx5.changed(&cx);
let result5_closed = poll_ready(&mut future5_closed);
drop(future5_closed);
crate::assert_with_log!(
matches!(result5_closed, Err(RecvError::Closed)),
"rx5 returns Closed after draining final value",
true,
matches!(result5_closed, Err(RecvError::Closed))
);
{
let final5 = rx5.borrow();
crate::assert_with_log!(
*final5 == 200,
"rx5 can still read last value",
200i32,
*final5
);
}
crate::test_complete!("metamorphic_closed_sender_behavior");
}
#[test]
fn mr_borrow_and_update_equivalence_after_shutdown() {
init_test("mr_borrow_and_update_equivalence_after_shutdown");
let (tx, mut rx) = channel(42u32);
tx.send(100).expect("send failed");
let final_value = 100u32;
drop(tx);
let value1 = *rx.borrow_and_update();
let value2 = *rx.borrow_and_update();
let value3 = *rx.borrow_and_update();
crate::assert_with_log!(
value1 == final_value && value2 == final_value && value3 == final_value,
"equivalent values after shutdown",
(final_value, final_value, final_value),
(value1, value2, value3)
);
crate::assert_with_log!(
value1 == value2 && value2 == value3,
"all calls return same value",
value1,
(value2, value3)
);
crate::test_complete!("mr_borrow_and_update_equivalence_after_shutdown");
}
#[test]
fn mr_version_monotonicity_after_shutdown() {
init_test("mr_version_monotonicity_after_shutdown");
let (tx, mut rx) = channel(0u32);
tx.send(1).expect("send failed");
tx.send(2).expect("send failed");
let version_before_drop = tx.inner.current_version();
drop(tx);
let mut versions = Vec::new();
for i in 0..5 {
let _value = rx.borrow_and_update();
drop(_value);
let version = rx.seen_version();
versions.push(version);
if i > 0 {
crate::assert_with_log!(
version >= versions[i - 1],
&format!("version monotonic at call {}", i),
versions[i - 1],
version
);
}
}
let final_version = versions.last().copied().unwrap();
crate::assert_with_log!(
final_version == version_before_drop,
"final version matches pre-drop version",
version_before_drop,
final_version
);
crate::test_complete!("mr_version_monotonicity_after_shutdown");
}
#[test]
fn mr_receiver_isolation_after_shutdown() {
init_test("mr_receiver_isolation_after_shutdown");
let (tx, mut rx1) = channel(10u32);
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
tx.send(200).expect("send failed");
drop(tx);
let value1a = *rx1.borrow_and_update();
let version1a = rx1.seen_version();
let value2a = *rx2.borrow_and_update();
let version2a = rx2.seen_version();
let value3a = *rx3.borrow_and_update();
let version3a = rx3.seen_version();
let (tx, mut rx1) = channel(10u32);
let mut rx2 = tx.subscribe();
let mut rx3 = tx.subscribe();
tx.send(200).expect("send failed");
drop(tx);
let value3b = *rx3.borrow_and_update();
let version3b = rx3.seen_version();
let value1b = *rx1.borrow_and_update();
let version1b = rx1.seen_version();
let value2b = *rx2.borrow_and_update();
let version2b = rx2.seen_version();
crate::assert_with_log!(
(value1a, value2a, value3a) == (value1b, value2b, value3b),
"values independent of call order",
(value1a, value2a, value3a),
(value1b, value2b, value3b)
);
crate::assert_with_log!(
(version1a, version2a, version3a) == (version1b, version2b, version3b),
"versions independent of call order",
(version1a, version2a, version3a),
(version1b, version2b, version3b)
);
crate::test_complete!("mr_receiver_isolation_after_shutdown");
}
#[test]
fn mr_state_consistency_borrow_vs_borrow_and_update_after_shutdown() {
init_test("mr_state_consistency_borrow_vs_borrow_and_update_after_shutdown");
let (tx, mut rx1) = channel(5u32);
let rx2 = tx.subscribe();
tx.send(300).expect("send failed");
drop(tx);
let value_update = *rx1.borrow_and_update();
let value_borrow = *rx2.borrow();
crate::assert_with_log!(
value_update == value_borrow,
"borrow_and_update and borrow return same value after shutdown",
value_update,
value_borrow
);
crate::assert_with_log!(
value_update == 300 && value_borrow == 300,
"both see final sent value",
300u32,
(value_update, value_borrow)
);
crate::test_complete!("mr_state_consistency_borrow_vs_borrow_and_update_after_shutdown");
}
#[test]
fn send_modify_deadlock_prevention() {
init_test("send_modify_deadlock_prevention");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime
.state
.create_root_region(crate::types::Budget::INFINITE);
let (_tx1, rx1) = channel(0u32);
let (tx2, rx2) = channel(String::from("initial"));
let (task_id, _task_handle) = runtime
.state
.create_task(region, crate::types::Budget::INFINITE, async move {
let result = tx2.send_modify(|s| {
let current_value = *rx1.borrow();
*s = format!("modified_with_{}", current_value);
});
result.map_err(|_| {
crate::error::Error::cancelled(&crate::types::CancelReason::default())
})
})
.unwrap();
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
let final_value = rx2.borrow();
crate::assert_with_log!(
*final_value == "modified_with_0",
"send_modify closure executed without deadlock",
"modified_with_0",
&*final_value
);
crate::test_complete!("send_modify_deadlock_prevention");
}
#[test]
fn audit_concurrent_send_during_borrow_and_update() {
init_test("audit_concurrent_send_during_borrow_and_update");
let (tx, mut rx) = channel::<u32>(0);
tx.send(42).unwrap();
let borrowed = rx.borrow_and_update();
let observed_value = *borrowed;
drop(borrowed);
assert_eq!(
observed_value, 42,
"borrow_and_update should observe sent value"
);
assert_eq!(
rx.seen_version, 1,
"seen_version should be updated to version of observed value"
);
tx.send(100).unwrap();
tx.send(200).unwrap();
tx.send(300).unwrap();
let latest = *rx.borrow_and_update();
assert_eq!(
latest, 300,
"borrow_and_update should see latest value after rapid sends"
);
tx.send(500).unwrap();
let concurrent_borrow = rx.borrow_and_update();
let concurrent_value = *concurrent_borrow;
drop(concurrent_borrow);
tx.send(600).unwrap();
assert_eq!(
concurrent_value, 500,
"concurrent borrow should see consistent value"
);
assert!(
rx.has_changed(),
"receiver should detect new value after marking previous as seen"
);
let initial_version = rx.seen_version;
tx.send(700).unwrap(); tx.send(800).unwrap(); tx.send(900).unwrap();
let final_value = *rx.borrow_and_update();
assert_eq!(
final_value, 900,
"should see final value after rapid sequence"
);
assert!(
rx.seen_version > initial_version,
"seen_version should advance"
);
let (tx2, mut rx2) = channel::<String>("initial".to_string());
tx2.send("step1".to_string()).unwrap();
tx2.send("step2".to_string()).unwrap();
tx2.send("step3".to_string()).unwrap();
let final_state = rx2.borrow_and_update();
assert_eq!(
*final_state, "step3",
"should observe latest consistent state"
);
drop(final_state);
assert!(
!rx2.has_changed(),
"no changes should remain after observing latest"
);
crate::test_complete!("audit_concurrent_send_during_borrow_and_update");
}
#[test]
fn audit_receiver_count_immediate_decrement() {
init_test("audit_receiver_count_immediate_decrement");
let (tx, rx1) = channel::<u32>(0);
let rx2 = rx1.clone();
let rx3 = tx.subscribe();
let rx4 = tx.subscribe();
assert_eq!(tx.receiver_count(), 4, "initial receiver count");
std::thread::scope(|s| {
let tx_ref = &tx;
let handle1 = s.spawn(|| {
drop(rx2);
tx_ref.receiver_count()
});
let handle2 = s.spawn(|| {
drop(rx3);
tx_ref.receiver_count()
});
drop(rx4);
let main_count = tx.receiver_count();
let count1 = handle1.join().unwrap();
let count2 = handle2.join().unwrap();
assert!(count1 <= 3, "thread1 saw decremented count: {}", count1);
assert!(count2 <= 3, "thread2 saw decremented count: {}", count2);
assert!(
main_count <= 3,
"main thread saw decremented count: {}",
main_count
);
let final_count = tx.receiver_count();
assert_eq!(final_count, 1, "final count after concurrent drops");
});
tx.send(42).unwrap();
let value = *rx1.borrow();
assert_eq!(value, 42, "remaining receiver still functional");
drop(rx1);
assert_eq!(
tx.receiver_count(),
0,
"count zero after dropping all receivers"
);
crate::test_complete!("audit_receiver_count_immediate_decrement");
}
#[test]
fn audit_watch_no_buffering_latest_only() {
init_test("audit_watch_no_buffering_latest_only");
let cx = test_cx();
let (tx, mut rx) = channel(0u32);
let initial_version = rx.seen_version;
crate::assert_with_log!(
initial_version == 0,
"receiver starts at initial version",
0,
initial_version
);
crate::assert_with_log!(
*rx.borrow() == 0,
"receiver sees initial value",
0,
*rx.borrow()
);
for i in 1..=5 {
tx.send(i).expect("send should succeed");
}
let current_value = *rx.borrow();
crate::assert_with_log!(
current_value == 5,
"receiver sees only latest value, intermediate values lost",
5,
current_value
);
let changed_result = block_on(rx.changed(&cx));
crate::assert_with_log!(
changed_result.is_ok(),
"changed() succeeds for version jump",
true,
changed_result.is_ok()
);
crate::assert_with_log!(
rx.seen_version == 5,
"receiver version jumps directly to latest",
5,
rx.seen_version
);
crate::assert_with_log!(
*rx.borrow() == 5,
"receiver sees latest value after changed",
5,
*rx.borrow()
);
for i in 6..=1000 {
tx.send(i).expect("send should succeed");
}
let final_value = *rx.borrow();
crate::assert_with_log!(
final_value == 1000,
"receiver sees latest of many rapid updates",
1000,
final_value
);
let changed_result = block_on(rx.changed(&cx));
crate::assert_with_log!(
changed_result.is_ok(),
"changed() succeeds for large version jump",
true,
changed_result.is_ok()
);
crate::assert_with_log!(
rx.seen_version == 1000,
"receiver version jumps directly to 1000",
1000,
rx.seen_version
);
crate::test_complete!("audit_watch_no_buffering_latest_only");
}
#[test]
fn audit_send_modify_panic_safe_semantics() {
init_test("audit_send_modify_panic_safe_semantics");
let (tx, mut rx) = channel(42);
crate::assert_with_log!(*rx.borrow() == 42, "initial value is 42", 42, *rx.borrow());
let modify_result = tx.send_modify(|x| *x += 1);
crate::assert_with_log!(
modify_result.is_ok(),
"successful modify returns Ok",
true,
modify_result.is_ok()
);
crate::assert_with_log!(
*rx.borrow() == 43,
"value updated to 43 after successful modify",
43,
*rx.borrow()
);
let panic_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tx.send_modify(|x| {
*x = 999; panic!("intentional panic during modify");
})
}));
crate::assert_with_log!(
panic_result.is_err(),
"modify closure panic was caught",
true,
panic_result.is_err()
);
crate::assert_with_log!(
*rx.borrow() == 43,
"value unchanged after panic (panic-safe)",
43,
*rx.borrow()
);
let post_panic_result = tx.send_modify(|x| *x += 10);
crate::assert_with_log!(
post_panic_result.is_ok(),
"Sender remains usable after panic (no poisoning)",
true,
post_panic_result.is_ok()
);
crate::assert_with_log!(
*rx.borrow() == 53,
"value updated to 53 after post-panic modify",
53,
*rx.borrow()
);
for i in 1..=3 {
let _panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tx.send_modify(|_x| panic!("panic cycle {}", i))
}));
let recovery_result = tx.send_modify(|x| *x += 1);
crate::assert_with_log!(
recovery_result.is_ok(),
&format!("Sender recovers after panic cycle {}", i),
true,
recovery_result.is_ok()
);
}
let final_value = *rx.borrow();
crate::assert_with_log!(
final_value == 56, "final value correct after multiple panic cycles",
56,
final_value
);
let cx = test_cx();
let changed_future = rx.changed(&cx);
tx.send_modify(|x| *x = 100)
.expect("final modify should work");
poll_ready(&mut Box::pin(changed_future))
.expect("changed future should observe the final send_modify");
crate::assert_with_log!(
*rx.borrow() == 100,
"receiver notifications work after panic recovery",
100,
*rx.borrow()
);
crate::test_complete!("audit_send_modify_panic_safe_semantics");
}
}