#![allow(clippy::all)]
#![allow(dead_code)]
use crate::cx::Cx;
use crate::types::cancel::CancelReason;
use crate::types::{Budget, Outcome, RegionId, TaskId};
use crate::util::{ArenaIndex, DetRng};
use proptest::prelude::*;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
/// A scripted future used to exercise race/cancel/drain behavior in tests.
///
/// It stays `Pending` for a configurable number of polls, can be configured to
/// panic during execution or during drain, and reports every lifecycle event
/// to a shared `GlobalTestState` so tests can assert on ordering and counts.
struct TestFuture {
// Identifier reported in every global-state callback.
id: u32,
// Value yielded on completion (and on the drain path after cancellation).
value: i32,
// Remaining polls that return `Pending` before the future completes.
polls_to_complete: AtomicU32,
// When true, the next non-cancelled poll panics (execution panic).
should_panic: bool,
// When true, the first poll after cancellation panics (drain panic).
should_panic_during_drain: bool,
// Set by `cancel`; switches `poll` onto the drain path.
cancelled: AtomicBool,
// Reason recorded at cancellation time.
cancel_reason: parking_lot::Mutex<Option<CancelReason>>,
// Set once the future returns `Ready` through the normal path.
completed: AtomicBool,
// Set once the future returns `Ready` through the drain path.
drained: AtomicBool,
// Shared event log for cross-future assertions.
global_state: Arc<GlobalTestState>,
}
impl TestFuture {
fn new(
id: u32,
value: i32,
polls_to_complete: u32,
global_state: Arc<GlobalTestState>,
) -> Self {
Self {
id,
value,
polls_to_complete: AtomicU32::new(polls_to_complete),
should_panic: false,
should_panic_during_drain: false,
cancelled: AtomicBool::new(false),
cancel_reason: parking_lot::Mutex::new(None),
completed: AtomicBool::new(false),
drained: AtomicBool::new(false),
global_state,
}
}
fn with_panic(mut self) -> Self {
self.should_panic = true;
self
}
fn with_drain_panic(mut self) -> Self {
self.should_panic_during_drain = true;
self
}
fn cancel(&self, reason: CancelReason) {
self.cancelled.store(true, Ordering::SeqCst);
*self.cancel_reason.lock() = Some(reason);
self.global_state.on_future_cancelled(self.id);
}
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
}
impl Future for TestFuture {
    type Output = i32;

    /// Advances the scripted future by one poll.
    ///
    /// Order of checks: cancelled futures take the drain path (possibly
    /// panicking), then an execution panic fires if configured, and finally
    /// the pending-poll countdown decides between `Pending` and `Ready`.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.get_mut();
        me.global_state.on_future_polled(me.id);

        // Drain path: a cancelled future resolves (or panics) on its next poll.
        if me.is_cancelled() {
            if me.should_panic_during_drain {
                me.global_state.on_future_drain_panic(me.id);
                panic!("drain panic in future {}", me.id);
            }
            me.drained.store(true, Ordering::SeqCst);
            me.global_state.on_future_drained(me.id);
            return Poll::Ready(me.value);
        }

        if me.should_panic {
            me.global_state.on_future_execution_panic(me.id);
            panic!("execution panic in future {}", me.id);
        }

        // `poll` has exclusive access through `&mut self`, so this separate
        // load + store cannot race with another poll of the same future.
        match me.polls_to_complete.load(Ordering::SeqCst) {
            0 => {
                me.completed.store(true, Ordering::SeqCst);
                me.global_state.on_future_completed(me.id, me.value);
                Poll::Ready(me.value)
            }
            remaining => {
                me.polls_to_complete.store(remaining - 1, Ordering::SeqCst);
                Poll::Pending
            }
        }
    }
}
/// Shared, append-only event log recorded by every `TestFuture` in a run.
///
/// Each `Vec` preserves the order in which events were reported, which the
/// metamorphic relations use to check drain ordering and determinism.
#[derive(Debug, Default)]
struct GlobalTestState {
// Ids in the order futures were polled (one entry per poll).
polled_futures: parking_lot::Mutex<Vec<u32>>,
// (id, value) pairs in completion order.
completed_futures: parking_lot::Mutex<Vec<(u32, i32)>>,
// Ids in cancellation order.
cancelled_futures: parking_lot::Mutex<Vec<u32>>,
// Ids in drain order (Ready via the post-cancel path).
drained_futures: parking_lot::Mutex<Vec<u32>>,
// Ids of futures that panicked during normal execution.
execution_panics: parking_lot::Mutex<Vec<u32>>,
// Ids of futures that panicked while draining.
drain_panics: parking_lot::Mutex<Vec<u32>>,
// Total number of polls across all futures.
total_polls: AtomicUsize,
// Number of race operations started (one per simulation).
race_operations: AtomicUsize,
}
impl GlobalTestState {
    /// Creates a fresh, empty event log behind an `Arc` for sharing.
    fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Records one poll of future `id` and bumps the global poll counter.
    fn on_future_polled(&self, id: u32) {
        self.polled_futures.lock().push(id);
        self.total_polls.fetch_add(1, Ordering::SeqCst);
    }

    /// Records that future `id` completed normally with `value`.
    fn on_future_completed(&self, id: u32, value: i32) {
        self.completed_futures.lock().push((id, value));
    }

    /// Records that future `id` was cancelled.
    fn on_future_cancelled(&self, id: u32) {
        self.cancelled_futures.lock().push(id);
    }

    /// Records that future `id` drained (became Ready after cancellation).
    fn on_future_drained(&self, id: u32) {
        self.drained_futures.lock().push(id);
    }

    /// Records a panic raised on the normal execution path of future `id`.
    fn on_future_execution_panic(&self, id: u32) {
        self.execution_panics.lock().push(id);
    }

    /// Records a panic raised on the drain path of future `id`.
    fn on_future_drain_panic(&self, id: u32) {
        self.drain_panics.lock().push(id);
    }

    /// Counts one started race operation.
    fn on_race_operation(&self) {
        self.race_operations.fetch_add(1, Ordering::SeqCst);
    }

    /// Snapshot of the drain order observed so far.
    fn drain_order(&self) -> Vec<u32> {
        self.drained_futures.lock().clone()
    }

    /// Ids that were cancelled but never drained.
    fn get_undrained_losers(&self) -> Vec<u32> {
        let mut losers = self.cancelled_futures.lock().clone();
        let drained = self.drained_futures.lock();
        losers.retain(|id| !drained.contains(id));
        losers
    }

    /// True when every cancelled future has also drained.
    fn all_cancelled_futures_drained(&self) -> bool {
        self.get_undrained_losers().is_empty()
    }

    /// Aggregates all counters into an owned snapshot for assertions.
    fn get_stats(&self) -> TestStats {
        TestStats {
            total_polls: self.total_polls.load(Ordering::SeqCst),
            completed_count: self.completed_futures.lock().len(),
            cancelled_count: self.cancelled_futures.lock().len(),
            drained_count: self.drained_futures.lock().len(),
            execution_panics_count: self.execution_panics.lock().len(),
            drain_panics_count: self.drain_panics.lock().len(),
            race_operations_count: self.race_operations.load(Ordering::SeqCst),
        }
    }
}
/// Point-in-time snapshot of the counters held by `GlobalTestState`.
#[derive(Debug, Clone, PartialEq)]
struct TestStats {
// Total polls across all futures.
total_polls: usize,
// Futures that completed normally.
completed_count: usize,
// Futures that were cancelled.
cancelled_count: usize,
// Cancelled futures that subsequently drained.
drained_count: usize,
// Panics on the execution path.
execution_panics_count: usize,
// Panics on the drain path.
drain_panics_count: usize,
// Race operations started.
race_operations_count: usize,
}
/// Result of one seeded race simulation; `Eq` so replays with the same seed
/// can be compared wholesale for determinism (MR5).
#[derive(Debug, Clone, PartialEq, Eq)]
struct SeededDrainSimulation {
// Whether the designated winner reached completion.
winner_completed: bool,
// Whether every cancelled loser drained.
all_cancelled_futures_drained: bool,
// Loser ids in the order they drained.
drain_order: Vec<u32>,
// Total polls consumed by the simulation.
total_polls: usize,
}
/// Builds a `Cx` for the given region/task arena slots with an infinite
/// budget (generation 0 for both arena indices).
fn create_test_context(region_id: u32, task_id: u32) -> Cx {
    let region = RegionId::from_arena(ArenaIndex::new(region_id, 0));
    let task = TaskId::from_arena(ArenaIndex::new(task_id, 0));
    Cx::new(region, task, Budget::INFINITE)
}
/// Drives `f` to completion on the current thread by busy-polling with a
/// no-op waker, yielding to the OS scheduler between pending polls.
fn block_on<F: Future>(f: F) -> F::Output {
    let waker = Waker::from(Arc::new(TestWaker));
    let mut task_cx = Context::from_waker(&waker);
    let mut fut = Box::pin(f);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut task_cx) {
            return output;
        }
        // Pending: back off briefly and poll again (no real wake-ups arrive).
        std::thread::yield_now();
    }
}
/// No-op waker: the harness polls in a loop, so wake notifications are
/// intentionally ignored.
#[derive(Debug)]
struct TestWaker;
impl std::task::Wake for TestWaker {
fn wake(self: Arc<Self>) {
// Deliberately empty — see type-level comment.
}
}
/// Polls `future` exactly once with a fresh no-op waker and returns the
/// resulting `Poll` state.
fn poll_test_future(future: &mut TestFuture) -> Poll<i32> {
    let waker = Waker::from(Arc::new(TestWaker));
    let mut task_cx = Context::from_waker(&waker);
    Pin::new(future).poll(&mut task_cx)
}
/// Returns the indices `0..branch_count` in a deterministic pseudo-random
/// order drawn from `rng` (one shuffle's worth of RNG state is consumed).
fn next_seeded_order(branch_count: usize, rng: &mut DetRng) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..branch_count).collect();
    rng.shuffle(&mut indices);
    indices
}
/// Runs one deterministic race simulation: `branch_count` futures are polled
/// in seeded-random rounds until the winner completes, every loser is then
/// cancelled, and cancelled losers are drained in a seeded-random order.
///
/// The winner needs exactly 2 polls (1 pending + 1 ready); loser `i` takes
/// `loser_poll_counts[i % len]` pending polls. All RNG use flows through
/// `seed`, so equal seeds must reproduce identical results (exploited by MR5).
///
/// NOTE(review): indexes `loser_poll_counts`, so it must be non-empty —
/// all current callers guarantee this.
fn run_seeded_loser_drain_simulation(
branch_count: usize,
winner_index: usize,
loser_poll_counts: &[u32],
seed: u64,
) -> SeededDrainSimulation {
let global_state = GlobalTestState::new();
let mut rng = DetRng::new(seed);
let mut futures = Vec::with_capacity(branch_count);
for i in 0..branch_count {
// Winner: 1 pending poll; losers: cycle through the supplied counts.
let polls = if i == winner_index {
1
} else {
loser_poll_counts[i % loser_poll_counts.len()]
};
futures.push(TestFuture::new(
i as u32,
(i * 10) as i32,
polls,
global_state.clone(),
));
}
global_state.on_race_operation();
let mut winner_completed = false;
// Poll rounds: each round visits every live future once, in seeded order.
while !winner_completed {
for index in next_seeded_order(branch_count, &mut rng) {
let future = &mut futures[index];
// Skip futures that already finished or are awaiting drain.
if future.completed.load(Ordering::SeqCst) || future.is_cancelled() {
continue;
}
if let Poll::Ready(_) = poll_test_future(future) {
if index == winner_index {
winner_completed = true;
// Winner decided: cancel every other branch (even ones
// that happened to complete earlier in this round).
for (loser_index, other) in futures.iter().enumerate() {
if loser_index != index {
other.cancel(CancelReason::race_loser());
}
}
break;
}
}
}
}
// Drain phase: visit losers in a fresh seeded order; each cancelled,
// undrained loser must resolve on its very next poll.
let mut drain_indices = (0..branch_count)
.filter(|index| *index != winner_index)
.collect::<Vec<_>>();
rng.shuffle(&mut drain_indices);
for index in drain_indices {
let future = &mut futures[index];
if future.is_cancelled() && !future.drained.load(Ordering::SeqCst) {
let poll = poll_test_future(future);
debug_assert!(poll.is_ready(), "cancelled futures must drain immediately");
}
}
let stats = global_state.get_stats();
SeededDrainSimulation {
winner_completed,
all_cancelled_futures_drained: global_state.all_cancelled_futures_drained(),
drain_order: global_state.drain_order(),
total_polls: stats.total_polls,
}
}
/// MR1: after a seeded race, the winner must have completed, every cancelled
/// loser must have drained, and total polls must stay within a fixed budget.
///
/// Budget derivation: the winner finishes within 2 scheduling rounds
/// (2 * branch_count polls at most, since the round breaks on the winner),
/// plus at most one drain poll per loser, giving 3 * branch_count - 1.
fn mr1_loser_drain_completeness(
    branch_count: usize,
    winner_index: usize,
    loser_poll_counts: Vec<u32>,
    seed: u64,
) -> bool {
    let sim =
        run_seeded_loser_drain_simulation(branch_count, winner_index, &loser_poll_counts, seed);
    let poll_budget = branch_count * 3 - 1;
    let result = sim.winner_completed
        && sim.all_cancelled_futures_drained
        && sim.total_polls <= poll_budget;
    crate::assert_with_log!(
        result,
        "MR1: winner completes and every cancelled loser drains within the bounded poll budget",
        true,
        result
    );
    result
}
/// MR2: a loser that panics while draining must not poison the race itself.
///
/// The winner is driven to completion, then each loser is cancelled and
/// drained inside its own `catch_unwind`, so drain panics are absorbed.
/// The relation holds iff the outer closure (winner + drains) did not panic.
fn mr2_panic_isolation(
branch_count: usize,
winner_index: usize,
panic_loser_indices: Vec<usize>,
_seed: u64,
) -> bool {
let global_state = GlobalTestState::new();
let mut futures = Vec::new();
for i in 0..branch_count {
// Winner resolves after 1 pending poll; losers after 5.
let mut future = TestFuture::new(
i as u32,
(i * 10) as i32,
if i == winner_index { 1 } else { 5 },
global_state.clone(),
);
// Selected losers panic on their drain poll.
if panic_loser_indices.contains(&i) {
future = future.with_drain_panic();
}
futures.push(future);
}
global_state.on_race_operation();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
// Drive only the winner until it completes.
let winner_value = loop {
let waker = Waker::from(Arc::new(TestWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(&mut futures[winner_index]);
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(val) => break val,
Poll::Pending => {}
}
};
// Cancel and drain each loser; any drain panic is confined to the
// inner catch_unwind and must not escape to the outer closure.
for (i, future) in futures.iter_mut().enumerate() {
if i != winner_index {
future.cancel(CancelReason::race_loser());
let _drain_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let waker = Waker::from(Arc::new(TestWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(future);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(_) => break,
Poll::Pending => {}
}
}
}));
}
}
winner_value
}));
// Isolation held iff the outer closure ran to completion.
let race_completed = result.is_ok();
let isolation_maintained = race_completed;
crate::assert_with_log!(
isolation_maintained,
"MR2: Panic isolation maintained",
true,
isolation_maintained
);
isolation_maintained
}
/// MR3: when the race settles, the winner's outcome is `Ok` and every loser's
/// outcome is `Cancelled` — no branch is left without an outcome and no
/// loser sneaks in an `Ok`.
///
/// Winner needs 2 polls; losers need 11, so round-robin polling guarantees
/// the winner resolves first.
fn mr3_branch_outcome_consistency(branch_count: usize, winner_index: usize, _seed: u64) -> bool {
let global_state = GlobalTestState::new();
let mut futures = Vec::new();
for i in 0..branch_count {
let future = TestFuture::new(
i as u32,
(i * 10) as i32,
if i == winner_index { 1 } else { 10 }, global_state.clone(),
);
futures.push(future);
}
global_state.on_race_operation();
// One slot per branch; filled in as outcomes are decided.
let mut outcomes: Vec<Option<Outcome<i32, ()>>> = vec![None; branch_count];
let mut winner_completed = false;
while !winner_completed {
for (i, future) in futures.iter_mut().enumerate() {
if outcomes[i].is_none() && !future.is_cancelled() {
let waker = Waker::from(Arc::new(TestWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(future);
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(val) => {
outcomes[i] = Some(Outcome::Ok(val));
if i == winner_index {
winner_completed = true;
// Record a Cancelled outcome for every loser.
for (j, other) in futures.iter().enumerate() {
if j != i {
other.cancel(CancelReason::race_loser());
outcomes[j] =
Some(Outcome::Cancelled(CancelReason::race_loser()));
}
}
}
break;
}
Poll::Pending => {}
}
}
}
}
let winner_outcome_correct = matches!(outcomes[winner_index], Some(Outcome::Ok(_)));
let losers_cancelled = (0..branch_count)
.filter(|&i| i != winner_index)
.all(|i| matches!(outcomes[i], Some(Outcome::Cancelled(_))));
let consistency_maintained = winner_outcome_correct && losers_cancelled;
crate::assert_with_log!(
consistency_maintained,
"MR3: Branch outcome consistency maintained",
true,
consistency_maintained
);
consistency_maintained
}
/// MR4: after a race-wide cancellation, every branch ends with exactly one
/// outcome — either `Ok` (completed before the cut-off) or `Cancelled` —
/// and at least one branch was actually cancelled.
///
/// Branches are polled round-robin for up to `cancel_after_polls` total
/// polls (each future needs 21 polls to finish), then all unresolved
/// branches are cancelled and drained.
fn mr4_cancel_propagation_consistency(
branch_count: usize,
cancel_after_polls: u32,
_seed: u64,
) -> bool {
let global_state = GlobalTestState::new();
let mut futures = Vec::new();
for i in 0..branch_count {
let future = TestFuture::new(i as u32, (i * 10) as i32, 20, global_state.clone()); futures.push(future);
}
global_state.on_race_operation();
let mut outcomes: Vec<Option<Outcome<i32, ()>>> = vec![None; branch_count];
let mut total_polls = 0;
let race_cancel_reason = CancelReason::user("test race cancellation");
// Phase 1: poll until the budget is spent or some branch completes.
while total_polls < cancel_after_polls {
let mut any_completed = false;
for (i, future) in futures.iter_mut().enumerate() {
if outcomes[i].is_none() && !future.is_cancelled() {
let waker = Waker::from(Arc::new(TestWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(future);
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(val) => {
outcomes[i] = Some(Outcome::Ok(val));
any_completed = true;
}
Poll::Pending => {}
}
total_polls += 1;
if total_polls >= cancel_after_polls {
break;
}
}
}
if any_completed {
break;
}
}
// Phase 2: cancel every branch that has no outcome yet.
for (i, future) in futures.iter_mut().enumerate() {
if outcomes[i].is_none() {
future.cancel(race_cancel_reason.clone());
outcomes[i] = Some(Outcome::Cancelled(race_cancel_reason.clone()));
}
}
// Phase 3: drain all cancelled branches to completion.
for future in &mut futures {
if future.is_cancelled() {
let waker = Waker::from(Arc::new(TestWaker));
let mut cx = Context::from_waker(&waker);
let mut pinned = Pin::new(future);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(_) => break,
Poll::Pending => {}
}
}
}
}
let cancelled_count = outcomes
.iter()
.filter(|outcome| matches!(outcome, Some(Outcome::Cancelled(_))))
.count();
let completed_count = outcomes
.iter()
.filter(|outcome| matches!(outcome, Some(Outcome::Ok(_))))
.count();
// Every branch has exactly one outcome and at least one was cancelled.
let propagation_correct =
cancelled_count > 0 && (cancelled_count + completed_count == branch_count);
crate::assert_with_log!(
propagation_correct,
"MR4: Cancel propagation consistency maintained",
true,
propagation_correct
);
propagation_correct
}
/// MR5: replaying the simulation with the same seed must reproduce the same
/// result wholesale — in particular the loser drain order — and the winner
/// must never appear among the drained ids.
fn mr5_seed_replay_preserves_loser_drain_order(
    branch_count: usize,
    winner_index: usize,
    loser_poll_counts: Vec<u32>,
    seed: u64,
) -> bool {
    let first =
        run_seeded_loser_drain_simulation(branch_count, winner_index, &loser_poll_counts, seed);
    let second =
        run_seeded_loser_drain_simulation(branch_count, winner_index, &loser_poll_counts, seed);
    // Whole-struct equality plus structural checks on the drain order.
    let deterministic = first == second
        && first.drain_order.len() == branch_count - 1
        && !first.drain_order.contains(&(winner_index as u32));
    crate::assert_with_log!(
        deterministic,
        "MR5: replaying a fixed seed preserves loser drain order",
        true,
        deterministic
    );
    deterministic
}
#[cfg(test)]
mod tests {
use super::*;
// Property tests: each MR helper asserts internally via assert_with_log!
// and also returns a bool that prop_assert! re-checks. winner_index is
// drawn from the full range and reduced modulo branch_count to stay valid.
proptest! {
#[test]
fn test_mr1_loser_drain_completeness(
branch_count in 2usize..=8,
winner_index in 0usize..8,
loser_polls in prop::collection::vec(1u32..10, 1..8),
seed in any::<u64>(),
) {
let winner_index = winner_index % branch_count;
prop_assert!(mr1_loser_drain_completeness(
branch_count,
winner_index,
loser_polls,
seed
));
}
#[test]
fn test_mr2_panic_isolation(
branch_count in 2usize..=6,
winner_index in 0usize..6,
panic_indices in prop::collection::vec(0usize..6, 0..3),
seed in any::<u64>(),
) {
let winner_index = winner_index % branch_count;
// Keep only in-range loser indices; the winner never drain-panics.
let panic_indices: Vec<usize> = panic_indices.into_iter()
.filter(|&i| i < branch_count && i != winner_index)
.collect();
prop_assert!(mr2_panic_isolation(
branch_count,
winner_index,
panic_indices,
seed
));
}
#[test]
fn test_mr3_branch_outcome_consistency(
branch_count in 2usize..=8,
winner_index in 0usize..8,
seed in any::<u64>(),
) {
let winner_index = winner_index % branch_count;
prop_assert!(mr3_branch_outcome_consistency(
branch_count,
winner_index,
seed
));
}
#[test]
fn test_mr4_cancel_propagation_consistency(
branch_count in 2usize..=8,
cancel_after_polls in 1u32..10,
seed in any::<u64>(),
) {
prop_assert!(mr4_cancel_propagation_consistency(
branch_count,
cancel_after_polls,
seed
));
}
#[test]
fn test_mr5_seed_replay_preserves_loser_drain_order(
branch_count in 2usize..=8,
winner_index in 0usize..8,
loser_polls in prop::collection::vec(1u32..10, 1..8),
seed in any::<u64>(),
) {
let winner_index = winner_index % branch_count;
prop_assert!(mr5_seed_replay_preserves_loser_drain_order(
branch_count,
winner_index,
loser_polls,
seed
));
}
}
// Runs all five relations once on a fixed configuration.
#[test]
fn test_race_metamorphic_integration() {
let global_state = GlobalTestState::new();
let branch_count = 4;
let winner_index = 1; let seed = 12345;
// NOTE(review): this `futures` vector is constructed but never polled;
// each MR helper builds its own futures internally. Dead setup — kept
// to preserve behavior; consider removing.
let mut futures = Vec::new();
for i in 0..branch_count {
let polls = if i == winner_index { 2 } else { 8 + i as u32 };
let mut future =
TestFuture::new(i as u32, (i * 100) as i32, polls, global_state.clone());
if i == 3 {
future = future.with_drain_panic();
}
futures.push(future);
}
assert!(mr1_loser_drain_completeness(
branch_count,
winner_index,
vec![8, 9, 10],
seed
));
assert!(mr2_panic_isolation(
branch_count,
winner_index,
vec![3],
seed
));
assert!(mr3_branch_outcome_consistency(
branch_count,
winner_index,
seed
));
assert!(mr4_cancel_propagation_consistency(branch_count, 5, seed));
assert!(mr5_seed_replay_preserves_loser_drain_order(
branch_count,
winner_index,
vec![8, 9, 10],
seed
));
}
// Boundary configurations: minimum/maximum branch counts, first/last
// winner, tiny and oversized cancel budgets.
#[test]
fn test_race_edge_cases() {
assert!(mr1_loser_drain_completeness(2, 0, vec![5], 1111));
assert!(mr3_branch_outcome_consistency(2, 1, 2222));
assert!(mr1_loser_drain_completeness(8, 3, vec![2, 4, 6], 3333));
assert!(mr3_branch_outcome_consistency(8, 7, 4444));
assert!(mr4_cancel_propagation_consistency(5, 1, 5555));
assert!(mr4_cancel_propagation_consistency(3, 15, 6666));
assert!(mr5_seed_replay_preserves_loser_drain_order(
5,
2,
vec![3, 4, 5],
7777
));
}
}