use crate::{
ConformanceTest, OneshotSender, RuntimeInterface, TestCategory, TestMeta, TestResult,
checkpoint,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
pub fn all_tests<RT: RuntimeInterface + Sync>() -> Vec<ConformanceTest<RT>> {
vec![
cancel_001_timeout_cancels_task::<RT>(),
cancel_002_cleanup_before_return::<RT>(),
cancel_003_nested_timeout::<RT>(),
cancel_004_race_loser_drain::<RT>(),
cancel_005_no_interference_completed::<RT>(),
cancel_006_multiple_timeouts::<RT>(),
cancel_007_cancel_propagates_via_drop::<RT>(),
]
}
pub fn cancel_001_timeout_cancels_task<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-001".to_string(),
name: "Timeout cancels running task".to_string(),
description: "A task sleeping longer than its timeout is cancelled".to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"timeout".to_string(),
"observable".to_string(),
],
expected: "Task is cancelled; post-sleep flag not set".to_string(),
},
|rt| {
rt.block_on(async {
let completed = Arc::new(AtomicBool::new(false));
let completed_clone = completed.clone();
let long_sleep = rt.sleep(Duration::from_millis(500));
let result = rt
.timeout(Duration::from_millis(50), async move {
long_sleep.await;
completed_clone.store(true, Ordering::SeqCst);
42
})
.await;
let was_completed = completed.load(Ordering::SeqCst);
checkpoint(
"timeout_result",
serde_json::json!({
"timed_out": result.is_err(),
"task_completed": was_completed
}),
);
if result.is_ok() {
return TestResult::failed("Timeout should have expired before task completed");
}
if was_completed {
return TestResult::failed(
"Task should have been cancelled; post-sleep flag should not be set",
);
}
TestResult::passed()
})
},
)
}
pub fn cancel_002_cleanup_before_return<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-002".to_string(),
name: "Cleanup ordering on cancellation".to_string(),
description: "After timeout, task has started but not completed its terminal step"
.to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"cleanup".to_string(),
"ordering".to_string(),
],
expected: "Task started (flag set) but did not finish (terminal flag unset)"
.to_string(),
},
|rt| {
rt.block_on(async {
let started = Arc::new(AtomicBool::new(false));
let finished = Arc::new(AtomicBool::new(false));
let started_c = started.clone();
let finished_c = finished.clone();
let long_sleep = rt.sleep(Duration::from_millis(500));
let _ = rt
.timeout(Duration::from_millis(50), async move {
started_c.store(true, Ordering::SeqCst);
long_sleep.await;
finished_c.store(true, Ordering::SeqCst);
})
.await;
let did_start = started.load(Ordering::SeqCst);
let did_finish = finished.load(Ordering::SeqCst);
checkpoint(
"cleanup_state",
serde_json::json!({
"started": did_start,
"finished": did_finish
}),
);
if !did_start {
return TestResult::failed("Task should have started before cancellation");
}
if did_finish {
return TestResult::failed(
"Task should not have finished; cancellation should prevent terminal step",
);
}
TestResult::passed()
})
},
)
}
pub fn cancel_003_nested_timeout<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-003".to_string(),
name: "Nested timeout composition".to_string(),
description: "Inner (tighter) timeout fires before outer timeout".to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"timeout".to_string(),
"nested".to_string(),
],
expected: "Inner timeout fires; outer timeout does not".to_string(),
},
|rt| {
rt.block_on(async {
let start = Instant::now();
let inner_sleep = rt.sleep(Duration::from_millis(300));
let inner_timeout = rt.timeout(Duration::from_millis(50), inner_sleep);
let outer_result = rt.timeout(Duration::from_millis(500), inner_timeout).await;
let elapsed = start.elapsed();
checkpoint(
"nested_timeout",
serde_json::json!({
"elapsed_ms": elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
"outer_timed_out": outer_result.is_err(),
"inner_timed_out": matches!(&outer_result, Ok(Err(_)))
}),
);
match outer_result {
Ok(Err(_inner_timeout)) => {
if elapsed > Duration::from_millis(400) {
return TestResult::failed(format!(
"Inner timeout took too long: {:?}",
elapsed
));
}
TestResult::passed()
}
Ok(Ok(_)) => {
TestResult::failed("Inner future should have timed out, but completed")
}
Err(_) => TestResult::failed("Outer timeout fired unexpectedly"),
}
})
},
)
}
pub fn cancel_004_race_loser_drain<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-004".to_string(),
name: "Race loser is drained".to_string(),
description: "In a race, the losing branch does not complete its work".to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"race".to_string(),
"drain".to_string(),
"invariant".to_string(),
],
expected: "Timeout fires; loser's terminal flag is not set".to_string(),
},
|rt| {
rt.block_on(async {
let loser_terminal = Arc::new(AtomicBool::new(false));
let loser_terminal_clone = loser_terminal.clone();
let loser_sleep = rt.sleep(Duration::from_millis(500));
let result = rt
.timeout(Duration::from_millis(50), async move {
let _winner_value = 42;
loser_sleep.await;
loser_terminal_clone.store(true, Ordering::SeqCst);
_winner_value
})
.await;
let loser_did_complete = loser_terminal.load(Ordering::SeqCst);
checkpoint(
"race_drain",
serde_json::json!({
"timed_out": result.is_err(),
"loser_completed": loser_did_complete
}),
);
if result.is_ok() {
return TestResult::failed(
"Timeout should have fired during the loser's long operation",
);
}
if loser_did_complete {
return TestResult::failed(
"Loser's terminal work should have been drained by timeout",
);
}
TestResult::passed()
})
},
)
}
pub fn cancel_005_no_interference_completed<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-005".to_string(),
name: "Timeout no-op on completed task".to_string(),
description: "A task that finishes before its timeout returns normally".to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"timeout".to_string(),
"no-op".to_string(),
],
expected: "Task result is returned without modification".to_string(),
},
|rt| {
rt.block_on(async {
let result = rt
.timeout(Duration::from_millis(500), async { 42i32 })
.await;
checkpoint(
"completed_before_timeout",
serde_json::json!({
"result": format!("{:?}", result)
}),
);
match result {
Ok(value) => {
if value != 42 {
return TestResult::failed(format!(
"Expected 42, got {}; timeout corrupted result",
value
));
}
TestResult::passed()
}
Err(_) => {
TestResult::failed("Task completed immediately but timeout fired anyway")
}
}
})
},
)
}
pub fn cancel_006_multiple_timeouts<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-006".to_string(),
name: "Multiple timeouts resolve independently".to_string(),
description: "Each timeout resolves correctly regardless of prior timeouts".to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"timeout".to_string(),
"independence".to_string(),
],
expected: "Short/medium timeouts fire; long timeout does not".to_string(),
},
|rt| {
rt.block_on(async {
let start1 = Instant::now();
let sleep1 = rt.sleep(Duration::from_millis(500));
let r1 = rt.timeout(Duration::from_millis(30), sleep1).await;
let elapsed1 = start1.elapsed();
let start2 = Instant::now();
let sleep2 = rt.sleep(Duration::from_millis(500));
let r2 = rt.timeout(Duration::from_millis(80), sleep2).await;
let elapsed2 = start2.elapsed();
let start3 = Instant::now();
let sleep3 = rt.sleep(Duration::from_millis(10));
let r3 = rt.timeout(Duration::from_millis(500), sleep3).await;
let elapsed3 = start3.elapsed();
checkpoint(
"timeout_results",
serde_json::json!({
"short": {"timed_out": r1.is_err(), "elapsed_ms": elapsed1.as_millis().min(u128::from(u64::MAX)) as u64},
"medium": {"timed_out": r2.is_err(), "elapsed_ms": elapsed2.as_millis().min(u128::from(u64::MAX)) as u64},
"long": {"timed_out": r3.is_err(), "elapsed_ms": elapsed3.as_millis().min(u128::from(u64::MAX)) as u64}
}),
);
if r1.is_ok() {
return TestResult::failed("Short timeout (30ms) should have fired");
}
if r2.is_ok() {
return TestResult::failed("Medium timeout (80ms) should have fired");
}
if r3.is_err() {
return TestResult::failed(
"Long timeout (500ms) should NOT have fired (task completes in 10ms)",
);
}
if elapsed1 > Duration::from_millis(200) {
return TestResult::failed(format!(
"Short timeout took too long: {:?}",
elapsed1
));
}
if elapsed2 > Duration::from_millis(300) {
return TestResult::failed(format!(
"Medium timeout took too long: {:?}",
elapsed2
));
}
TestResult::passed()
})
},
)
}
pub fn cancel_007_cancel_propagates_via_drop<RT: RuntimeInterface + Sync>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "cancel-007".to_string(),
name: "Cancellation propagates via resource drop".to_string(),
description: "Cancelled future drops owned resources, observable by receivers"
.to_string(),
category: TestCategory::Cancel,
tags: vec![
"cancel".to_string(),
"propagation".to_string(),
"drop".to_string(),
"invariant".to_string(),
],
expected: "Receivers observe channel closure after sender's future is cancelled"
.to_string(),
},
|rt| {
rt.block_on(async {
let (tx1, rx1) = rt.oneshot_channel::<i32>();
let (tx2, rx2) = rt.oneshot_channel::<i32>();
let (tx3, rx3) = rt.oneshot_channel::<i32>();
let long_sleep = rt.sleep(Duration::from_millis(500));
let timeout_result = rt
.timeout(Duration::from_millis(50), async move {
long_sleep.await;
let _ = tx1.send(1);
let _ = tx2.send(2);
let _ = tx3.send(3);
})
.await;
checkpoint(
"timeout_fired",
serde_json::json!({
"timed_out": timeout_result.is_err()
}),
);
if timeout_result.is_ok() {
return TestResult::failed("Timeout should have fired before sends executed");
}
let r1 = rx1.await;
let r2 = rx2.await;
let r3 = rx3.await;
let all_closed = r1.is_err() && r2.is_err() && r3.is_err();
checkpoint(
"propagation_result",
serde_json::json!({
"rx1_closed": r1.is_err(),
"rx2_closed": r2.is_err(),
"rx3_closed": r3.is_err(),
"all_propagated": all_closed
}),
);
if r1.is_ok() {
return TestResult::failed(
"Receiver 1 should observe closure (sender dropped by cancellation)",
);
}
if r2.is_ok() {
return TestResult::failed(
"Receiver 2 should observe closure (sender dropped by cancellation)",
);
}
if r3.is_ok() {
return TestResult::failed(
"Receiver 3 should observe closure (sender dropped by cancellation)",
);
}
TestResult::passed()
})
},
)
}