#![allow(clippy::all)]
#![allow(dead_code)]
use crate::channel::oneshot::{self, RecvError, SendError, TryRecvError};
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{Cx, RegionId, TaskId};
use proptest::prelude::*;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestValue {
id: u64,
data: String,
sequence: u32,
}
impl TestValue {
fn new(id: u64, data: impl Into<String>, sequence: u32) -> Self {
Self {
id,
data: data.into(),
sequence,
}
}
}
fn create_test_context(region_id: u32, task_id: u32) -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(region_id, 0)),
TaskId::from_arena(ArenaIndex::new(task_id, 0)),
Budget::INFINITE,
)
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = Waker::from(std::sync::Arc::new(TestNoopWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[derive(Debug)]
struct TestNoopWaker;
impl std::task::Wake for TestNoopWaker {
fn wake(self: std::sync::Arc<Self>) {}
}
struct CountWaker(Arc<AtomicUsize>);
impl std::task::Wake for CountWaker {
fn wake(self: std::sync::Arc<Self>) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
fn counting_waker(counter: Arc<AtomicUsize>) -> Waker {
Waker::from(Arc::new(CountWaker(counter)))
}
#[test]
fn mr1_permit_drop_equivalence() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100
)| {
let value = TestValue::new(test_id, data, sequence);
let result1 = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx = create_test_context(1, 1);
{
let _permit: oneshot::SendPermit<TestValue> = tx.reserve(&cx);
}
match rx.recv(&cx).await {
Err(RecvError::Closed) => Ok(()),
other => Err(format!("Expected Closed, got {:?}", other)),
}
});
let result2 = block_on(async {
let (tx, mut rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 2);
drop(tx);
match rx.recv(&cx).await {
Err(RecvError::Closed) => Ok(()),
other => Err(format!("Expected Closed, got {:?}", other)),
}
});
prop_assert_eq!(&result1, &result2,
"Permit drop should be equivalent to sender drop for value {:?}", value);
prop_assert!(result1.is_ok(), "Path 1 should result in Closed");
prop_assert!(result2.is_ok(), "Path 2 should result in Closed");
});
}
#[test]
fn mr2_send_atomicity() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100,
drop_receiver_first in prop::bool::ANY
)| {
let value = TestValue::new(test_id, data, sequence);
let result = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx = create_test_context(1, 1);
if drop_receiver_first {
drop(rx);
match tx.send(&cx, value.clone()) {
Err(SendError::Disconnected(returned_value)) => {
if returned_value == value {
Ok(("send_failed", Some(value.clone())))
} else {
Err(format!("Send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Send should have failed when receiver dropped".to_string()),
}
} else {
match tx.send(&cx, value.clone()) {
Ok(()) => {
match rx.recv(&cx).await {
Ok(received_value) => {
if received_value == value {
Ok(("send_success", Some(received_value)))
} else {
Err(format!("Received different value: expected {:?}, got {:?}",
value, received_value))
}
}
Err(e) => Err(format!("Recv failed after successful send: {:?}", e)),
}
}
Err(SendError::Disconnected(_)) => {
Err("Unexpected SendError::Disconnected".to_string())
}
}
}
});
prop_assert!(result.is_ok(), "Send atomicity violated: {:?}", result);
if let Ok(("send_success", Some(expected_value))) = &result {
let (tx2, mut rx2) = oneshot::channel();
let cx2 = create_test_context(2, 2);
let atomic_check = block_on(async {
tx2.send(&cx2, expected_value.clone()).expect("second send should work");
match rx2.try_recv() {
Ok(received) => {
if received == *expected_value {
Ok(())
} else {
Err(format!("try_recv got different value: {:?} vs {:?}",
received, expected_value))
}
}
Err(TryRecvError::Empty) => Err("try_recv returned Empty after send".to_string()),
Err(TryRecvError::Closed) => Err("try_recv returned Closed after send".to_string()),
}
});
prop_assert!(atomic_check.is_ok(), "try_recv atomicity check failed: {:?}", atomic_check);
}
});
}
#[test]
fn mr3_receiver_drop_detection() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100,
use_reserve in prop::bool::ANY
)| {
let value = TestValue::new(test_id, data, sequence);
let result = block_on(async {
let (tx, rx) = oneshot::channel();
let cx = create_test_context(1, 1);
drop(rx);
if use_reserve {
let permit = tx.reserve(&cx);
if !permit.is_closed() {
return Err("permit.is_closed() should return true after receiver drop".to_string());
}
match permit.send(value.clone()) {
Err(SendError::Disconnected(returned_value)) => {
if returned_value == value {
Ok(("reserve_disconnected", returned_value))
} else {
Err(format!("Reserve+send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Reserve+send should have failed when receiver dropped".to_string()),
}
} else {
match tx.send(&cx, value.clone()) {
Err(SendError::Disconnected(returned_value)) => {
if returned_value == value {
Ok(("direct_disconnected", returned_value))
} else {
Err(format!("Direct send returned different value: expected {:?}, got {:?}",
value, returned_value))
}
}
Ok(()) => Err("Direct send should have failed when receiver dropped".to_string()),
}
}
});
prop_assert!(result.is_ok(), "Receiver drop detection failed: {:?}", result);
if let Ok((method, returned_value)) = result {
prop_assert_eq!(returned_value, value,
"Method {} did not return identical value", method);
}
});
}
#[test]
fn mr4_cancel_invariant_preservation() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100,
cancel_delay_ms in 0u64..50,
send_delay_ms in 0u64..50
)| {
let value = TestValue::new(test_id, data, sequence);
let (tx, mut rx) = oneshot::channel();
let cx_send = create_test_context(1, 1);
let cx_recv = create_test_context(1, 2);
let send_result = Arc::new(std::sync::Mutex::new(None));
let recv_result = Arc::new(std::sync::Mutex::new(None));
let cancel_happened = Arc::new(AtomicBool::new(false));
let send_result_clone = Arc::clone(&send_result);
let recv_result_clone = Arc::clone(&recv_result);
let cancel_happened_clone = Arc::clone(&cancel_happened);
let value_for_send = value.clone();
let send_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(send_delay_ms));
let result = block_on(async {
tx.send(&cx_send, value_for_send)
});
*send_result_clone.lock().unwrap() = Some(result);
});
let recv_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(cancel_delay_ms));
cx_recv.set_cancel_requested(true);
cancel_happened_clone.store(true, Ordering::SeqCst);
let result = block_on(async {
rx.recv(&cx_recv).await
});
*recv_result_clone.lock().unwrap() = Some(result);
});
let _ = send_handle.join();
let _ = recv_handle.join();
let send_result = send_result.lock().unwrap().take().unwrap();
let recv_result = recv_result.lock().unwrap().take().unwrap();
let cancel_occurred = cancel_happened.load(Ordering::SeqCst);
match (&send_result, &recv_result) {
(Ok(()), Ok(received_value)) => {
prop_assert_eq!(received_value, &value,
"Received value should match sent value when both succeed");
}
(Ok(()), Err(RecvError::Cancelled)) => {
prop_assert!(cancel_occurred, "Cancel should have been signaled");
}
(Err(SendError::Disconnected(returned_value)), _) => {
prop_assert_eq!(returned_value, &value,
"Failed send should return original value");
prop_assert!(cancel_occurred, "Cancel should have been signaled for failed send");
}
(Ok(()), Err(RecvError::Closed)) => {
prop_assert!(false, "Unexpected combination: send success + recv closed");
}
(Ok(()), Err(RecvError::PolledAfterCompletion)) => {
prop_assert!(false, "Unexpected PolledAfterCompletion error");
}
}
let value_preserved = match (&send_result, &recv_result) {
(Ok(()), Ok(received_value)) => received_value == &value,
(Err(SendError::Disconnected(returned_value)), _) => returned_value == &value,
_ => true, };
prop_assert!(value_preserved,
"Original value must be preserved in success or error path");
});
}
#[test]
fn mr_composite_abort_vs_send_fail_equivalence() {
proptest!(|(
test_id in 0u64..1000,
data in "[a-zA-Z0-9]{1,20}",
sequence in 0u32..100
)| {
let value = TestValue::new(test_id, data, sequence);
let result1: Result<&str, String> = block_on(async {
let (tx, rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 1);
let permit = tx.reserve(&cx);
drop(rx); permit.abort();
Ok("aborted")
});
let result2: Result<&str, String> = block_on(async {
let (tx, rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 2);
let permit = tx.reserve(&cx);
drop(rx);
match permit.send(value.clone()) {
Err(SendError::Disconnected(returned_value)) => {
if returned_value == value {
Ok("send_failed")
} else {
Err(format!("Send failed but returned wrong value"))
}
}
Ok(()) => Err(format!("Send should have failed")),
}
});
prop_assert!(result1.is_ok(), "Abort path failed: {:?}", result1);
prop_assert!(result2.is_ok(), "Send-fail path failed: {:?}", result2);
let (tx3, mut rx3) = oneshot::channel::<TestValue>();
let (tx4, rx4) = oneshot::channel::<TestValue>();
let cx3 = create_test_context(3, 3);
let cx4 = create_test_context(4, 4);
let permit3 = tx3.reserve(&cx3);
permit3.abort();
let recv_result3 = block_on(rx3.recv(&cx3));
drop(rx4);
let _send_result4 = tx4.send(&cx4, value);
prop_assert!(matches!(recv_result3, Err(RecvError::Closed)),
"Abort should result in Closed error, got {:?}", recv_result3);
});
}
#[test]
fn mr_schedule_exploration() {
proptest!(|(
test_id in 0u64..100, // Smaller range for faster testing
data in "[a-zA-Z]{1,10}",
sequence in 0u32..10
)| {
let value = TestValue::new(test_id, data, sequence);
for schedule_seed in 0..3 {
let result = block_on(async {
let (tx, mut rx) = oneshot::channel();
let cx1 = create_test_context(1, 1);
let cx2 = create_test_context(1, 2);
let send_future = async {
tx.send(&cx1, value.clone())
};
let recv_future = async {
rx.recv(&cx2).await
};
let send_result = send_future.await;
let recv_result = recv_future.await;
(send_result, recv_result)
});
match result {
(Ok(()), Ok(received_value)) => {
prop_assert_eq!(&received_value, &value,
"Received value should match sent value for seed {}", schedule_seed);
}
(Err(SendError::Disconnected(returned_value)), Err(_)) => {
prop_assert_eq!(&returned_value, &value,
"Failed send should return original value for seed {}", schedule_seed);
}
other => {
prop_assert!(false, "Unexpected result combination for seed {}: {:?}",
schedule_seed, other);
}
}
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn integration_all_mrs_together() {
let value = TestValue::new(42, "test_data", 1);
let mr1_result = block_on(async {
let (tx, mut rx) = oneshot::channel::<TestValue>();
let cx = create_test_context(1, 1);
{
let _permit: oneshot::SendPermit<TestValue> = tx.reserve(&cx);
}
rx.recv(&cx).await
});
assert!(matches!(mr1_result, Err(RecvError::Closed)));
let (tx2, mut rx2) = oneshot::channel();
let cx2 = create_test_context(2, 2);
let send_result = tx2.send(&cx2, value.clone());
assert!(send_result.is_ok());
let recv_result = block_on(rx2.recv(&cx2));
assert_eq!(recv_result.unwrap(), value);
let (tx3, rx3) = oneshot::channel();
let cx3 = create_test_context(3, 3);
drop(rx3);
let send_result3 = tx3.send(&cx3, value.clone());
match send_result3 {
Err(SendError::Disconnected(returned)) => assert_eq!(returned, value),
_ => panic!("Should have returned disconnected error"),
}
println!("All metamorphic relations verified successfully!");
}
}