use crate::Cx;
use crate::channel::oneshot::{self, RecvError, SendError, TryRecvError};
use proptest::prelude::*;
use std::future::Future;
use std::task::{Context, Poll};
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestValue {
id: u64,
data: String,
sequence: u32,
}
impl TestValue {
fn new(id: u64, data: impl Into<String>, sequence: u32) -> Self {
Self {
id,
data: data.into(),
sequence,
}
}
}
fn create_test_context(_region_id: u32, _task_id: u32) -> Cx {
Cx::for_testing()
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = std::task::Waker::noop().clone();
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[test]
fn mr1_permit_drop_equivalence() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100
)| {
let value = TestValue::new(test_id, data, sequence);
let result1 = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx = create_test_context(1, 1);
{
let _permit: oneshot::SendPermit<TestValue> = tx.reserve(&cx).expect("reserve 1");
}
match rx.recv(&cx).await {
Err(RecvError::Closed) => Ok(()),
other => Err(format!("Expected Closed, got {:?}", other)),
}
});
let result2 = block_on(async {
let (tx, mut rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 2);
drop(tx);
match rx.recv(&cx).await {
Err(RecvError::Closed) => Ok(()),
other => Err(format!("Expected Closed, got {:?}", other)),
}
});
prop_assert_eq!(&result1, &result2,
"Permit drop should be equivalent to sender drop for value {:?}", value);
prop_assert!(result1.is_ok(), "Path 1 should result in Closed");
prop_assert!(result2.is_ok(), "Path 2 should result in Closed");
});
}
#[test]
fn mr2_send_atomicity() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100,
drop_receiver_first in prop::bool::ANY
)| {
let value = TestValue::new(test_id, data, sequence);
let result = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx = create_test_context(1, 1);
if drop_receiver_first {
drop(rx);
match tx.send(&cx, value.clone()) {
Err(SendError::Disconnected(returned_value)
| SendError::Cancelled(returned_value)) => {
if returned_value == value {
Ok(("send_failed", Some(value.clone())))
} else {
Err(format!("Send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Send should have failed when receiver dropped".to_string()),
}
} else {
match tx.send(&cx, value.clone()) {
Ok(()) => {
match rx.recv(&cx).await {
Ok(received_value) => {
if received_value == value {
Ok(("send_success", Some(received_value)))
} else {
Err(format!("Received different value: expected {:?}, got {:?}",
value, received_value))
}
}
Err(e) => Err(format!("Recv failed after successful send: {:?}", e)),
}
}
Err(SendError::Disconnected(_) | SendError::Cancelled(_)) => {
Err("Unexpected SendError::Disconnected or Cancelled".to_string())
}
}
}
});
prop_assert!(result.is_ok(), "Send atomicity violated: {:?}", result);
if let Ok(("send_success", Some(expected_value))) = &result {
let (tx2, mut rx2) = oneshot::channel();
let cx2 = create_test_context(2, 2);
let atomic_check = block_on(async {
tx2.send(&cx2, expected_value.clone()).expect("second send should work");
match rx2.try_recv() {
Ok(received) => {
if received == *expected_value {
Ok(())
} else {
Err(format!("try_recv got different value: {:?} vs {:?}",
received, expected_value))
}
}
Err(TryRecvError::Empty) => Err("try_recv returned Empty after send".to_string()),
Err(TryRecvError::Closed) => Err("try_recv returned Closed after send".to_string()),
}
});
prop_assert!(atomic_check.is_ok(), "try_recv atomicity check failed: {:?}", atomic_check);
}
});
}
#[test]
fn mr3_receiver_drop_detection() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100,
use_reserve in prop::bool::ANY
)| {
let value = TestValue::new(test_id, data, sequence);
let result = block_on(async {
let (tx, rx) = oneshot::channel();
let cx = create_test_context(1, 1);
drop(rx);
if use_reserve {
let permit = tx.reserve(&cx).expect("reserve 3");
if !permit.is_closed() {
return Err("permit.is_closed() should return true after receiver drop".to_string());
}
match permit.send(value.clone()) {
Err(SendError::Disconnected(returned_value) | SendError::Cancelled(returned_value)) => {
if returned_value == value {
Ok(("reserve_disconnected", returned_value))
} else {
Err(format!("Reserve+send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Reserve+send should have failed when receiver dropped".to_string()),
}
} else {
match tx.send(&cx, value.clone()) {
Err(SendError::Disconnected(returned_value) | SendError::Cancelled(returned_value)) => {
if returned_value == value {
Ok(("direct_disconnected", returned_value))
} else {
Err(format!("Direct send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Direct send should have failed when receiver dropped".to_string()),
}
}
});
prop_assert!(result.is_ok(), "Receiver drop detection failed: {:?}", result);
if let Ok((method, returned_value)) = result {
prop_assert_eq!(returned_value, value,
"Method {} did not return identical value", method);
}
});
}
#[test]
fn mr4_cancel_invariant_preservation() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100
)| {
let value = TestValue::new(test_id, data, sequence);
let (tx_before_cancel, mut rx_before_cancel) = oneshot::channel();
let cx_send = create_test_context(1, 1);
let cx_recv = create_test_context(1, 2);
tx_before_cancel
.send(&cx_send, value.clone())
.expect("receiver is alive");
cx_recv.set_cancel_requested(true);
let recv_after_cancel = block_on(rx_before_cancel.recv(&cx_recv));
prop_assert_eq!(recv_after_cancel, Ok(value.clone()));
let (tx_after_cancel, mut rx_after_cancel) = oneshot::channel();
let cancel_cx = create_test_context(2, 1);
cancel_cx.set_cancel_requested(true);
let cancelled = block_on(rx_after_cancel.recv(&cancel_cx));
prop_assert_eq!(cancelled, Err(RecvError::Cancelled));
let cx_send_after_cancel = create_test_context(2, 2);
tx_after_cancel
.send(&cx_send_after_cancel, value.clone())
.expect("receiver remains alive after recv cancellation");
let retry_cx = create_test_context(2, 3);
let retried = block_on(rx_after_cancel.recv(&retry_cx));
prop_assert_eq!(retried, Ok(value.clone()));
let (tx_after_drop, rx_after_drop) = oneshot::channel();
let cx_send_after_drop = create_test_context(3, 1);
drop(rx_after_drop);
match tx_after_drop.send(&cx_send_after_drop, value.clone()) {
Err(SendError::Disconnected(returned_value) | SendError::Cancelled(returned_value)) => {
prop_assert_eq!(returned_value, value);
}
Ok(()) => prop_assert!(false, "send should fail when receiver is dropped"),
}
});
}
#[test]
fn mr_composite_abort_vs_send_fail_equivalence() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100
)| {
let value = TestValue::new(test_id, data, sequence);
let result1: Result<&str, String> = block_on(async {
let (tx, rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 1);
let permit = tx.reserve(&cx).expect("reserve 5");
drop(rx); permit.abort();
Ok("aborted")
});
let result2: Result<&str, String> = block_on(async {
let (tx, rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 2);
let permit = tx.reserve(&cx).expect("reserve 5");
drop(rx);
match permit.send(value.clone()) {
Err(SendError::Disconnected(returned_value) | SendError::Cancelled(returned_value)) => {
if returned_value == value {
Ok("send_failed")
} else {
Err("Send failed but returned wrong value".to_string())
}
}
Ok(()) => Err("Send should have failed".to_string()),
}
});
prop_assert!(result1.is_ok(), "Abort path failed: {:?}", result1);
prop_assert!(result2.is_ok(), "Send-fail path failed: {:?}", result2);
let (tx3, mut rx3) = oneshot::channel::<TestValue>();
let (tx4, rx4) = oneshot::channel::<TestValue>();
let cx3 = create_test_context(3, 3);
let cx4 = create_test_context(4, 4);
let permit3 = tx3.reserve(&cx3).expect("reserve 3");
permit3.abort();
let recv_result3 = block_on(rx3.recv(&cx3));
drop(rx4);
let _send_result4 = tx4.send(&cx4, value);
prop_assert!(matches!(recv_result3, Err(RecvError::Closed)),
"Abort should result in Closed error, got {:?}", recv_result3);
});
}
#[test]
fn mr_schedule_exploration() {
proptest!(|(
test_id in 0u64..100, // Smaller range for faster testing
data in "[a-zA-Z]{1,10}",
sequence in 0u32..10
)| {
let value = TestValue::new(test_id, data, sequence);
for schedule_seed in 0..3 {
let result = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx1 = create_test_context(1, 1);
let cx2 = create_test_context(1, 2);
let send_future = async {
tx.send(&cx1, value.clone())
};
let recv_future = async {
rx.recv(&cx2).await
};
let send_result = send_future.await;
let recv_result = recv_future.await;
(send_result, recv_result)
});
match result {
(Ok(()), Ok(received_value)) => {
prop_assert_eq!(&received_value, &value,
"Received value should match sent value for seed {}", schedule_seed);
}
(Err(SendError::Disconnected(returned_value)), Err(_)) => {
prop_assert_eq!(&returned_value, &value,
"Failed send should return original value for seed {}", schedule_seed);
}
other => {
prop_assert!(false, "Unexpected result combination for seed {}: {:?}",
schedule_seed, other);
}
}
}
});
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
#[test]
fn integration_all_mrs_together() {
let value = TestValue::new(42, "test_data", 1);
let mr1_result = block_on(async {
let (tx, mut rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 1);
{
let _permit: oneshot::SendPermit<TestValue> = tx.reserve(&cx).expect("reserve 1");
}
rx.recv(&cx).await
});
assert!(matches!(mr1_result, Err(RecvError::Closed)));
let (tx2, mut rx2) = oneshot::channel();
let cx2 = create_test_context(2, 2);
let send_result = tx2.send(&cx2, value.clone());
assert!(send_result.is_ok());
let recv_result = block_on(rx2.recv(&cx2));
assert_eq!(recv_result.unwrap(), value);
let (tx3, rx3) = oneshot::channel();
let cx3 = create_test_context(3, 3);
drop(rx3);
let send_result3 = tx3.send(&cx3, value.clone());
match send_result3 {
Err(SendError::Disconnected(returned)) => assert_eq!(returned, value),
_ => panic!("Should have returned disconnected error"),
}
}
}