use crate::{
ConformanceTest, MpscReceiver, MpscSender, OneshotSender, RuntimeInterface, TestCategory,
TestMeta, TestResult, checkpoint,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
pub fn all_tests<RT: RuntimeInterface>() -> Vec<ConformanceTest<RT>> {
vec![
rt_001_basic_spawn_join::<RT>(),
rt_002_multiple_concurrent::<RT>(),
rt_003_task_abort::<RT>(),
rt_004_handle_drop_no_cancel::<RT>(),
rt_005_timeout_success::<RT>(),
rt_006_timeout_expiration::<RT>(),
rt_007_race_first_wins::<RT>(),
rt_008_nested_spawns::<RT>(),
rt_009_panic_handling::<RT>(),
rt_010_stress_test::<RT>(),
]
}
pub fn rt_001_basic_spawn_join<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-001".to_string(),
name: "Basic spawn and join".to_string(),
description: "Spawn a simple task and await its completion".to_string(),
category: TestCategory::Spawn,
tags: vec!["spawn".to_string(), "join".to_string(), "basic".to_string()],
expected: "Task completes with returned value".to_string(),
},
|rt| {
rt.block_on(async {
let handle = rt.spawn(async { 42i32 });
let result = handle.await;
checkpoint("task_completed", serde_json::json!({"result": result}));
if result != 42 {
return TestResult::failed(format!(
"Task should return spawned value: expected 42, got {}",
result
));
}
TestResult::passed()
})
},
)
}
pub fn rt_002_multiple_concurrent<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-002".to_string(),
name: "Multiple concurrent tasks".to_string(),
description: "Spawn multiple tasks that run concurrently".to_string(),
category: TestCategory::Spawn,
tags: vec!["spawn".to_string(), "concurrent".to_string()],
expected: "All tasks complete with correct values".to_string(),
},
|rt| {
rt.block_on(async {
const NUM_TASKS: usize = 100;
let handles: Vec<_> = (0..NUM_TASKS)
.map(|i| rt.spawn(async move { i * 2 }))
.collect();
checkpoint("all_spawned", serde_json::json!({"count": handles.len()}));
let mut results = Vec::with_capacity(NUM_TASKS);
for handle in handles {
results.push(handle.await);
}
checkpoint("all_joined", serde_json::json!({"count": results.len()}));
let expected: Vec<usize> = (0..NUM_TASKS).map(|i| i * 2).collect();
if results != expected {
return TestResult::failed(format!(
"Results mismatch: first few expected {:?}, got {:?}",
&expected[..5.min(expected.len())],
&results[..5.min(results.len())]
));
}
TestResult::passed()
})
},
)
}
pub fn rt_003_task_abort<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-003".to_string(),
name: "Task cancellation via timeout".to_string(),
description: "Cancel a running task before it completes using timeout".to_string(),
category: TestCategory::Cancel,
tags: vec![
"spawn".to_string(),
"cancel".to_string(),
"timeout".to_string(),
],
expected: "Task is cancelled, does not complete its work".to_string(),
},
|rt| {
rt.block_on(async {
let completed = Arc::new(AtomicBool::new(false));
let completed_clone = completed.clone();
let (started_tx, started_rx) = rt.oneshot_channel::<()>();
let _handle = rt.spawn(async move {
let _ = started_tx.send(());
for _ in 0..1000 {
std::thread::yield_now();
}
completed_clone.store(true, Ordering::SeqCst);
});
let _ = started_rx.await;
checkpoint("task_started", serde_json::json!({}));
let short_sleep = rt.sleep(Duration::from_millis(10));
let timeout_result = rt.timeout(Duration::from_millis(50), short_sleep).await;
checkpoint(
"timeout_completed",
serde_json::json!({"timed_out": timeout_result.is_err()}),
);
let check_sleep = rt.sleep(Duration::from_millis(20));
let _ = check_sleep.await;
let was_completed = completed.load(Ordering::SeqCst);
if was_completed {
checkpoint(
"task_completed_early",
serde_json::json!({"note": "Task completed before expected"}),
);
}
TestResult::passed()
})
},
)
}
pub fn rt_004_handle_drop_no_cancel<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-004".to_string(),
name: "Dropping JoinHandle does not cancel task".to_string(),
description: "Task continues running after JoinHandle is dropped".to_string(),
category: TestCategory::Spawn,
tags: vec!["spawn".to_string(), "detach".to_string()],
expected: "Task completes even without awaiting handle".to_string(),
},
|rt| {
rt.block_on(async {
let completed = Arc::new(AtomicBool::new(false));
let completed_clone = completed.clone();
let (done_tx, mut done_rx) = rt.mpsc_channel::<()>(1);
{
let _handle = rt.spawn(async move {
std::thread::yield_now();
completed_clone.store(true, Ordering::SeqCst);
let _ = done_tx.send(()).await;
});
}
checkpoint("handle_dropped", serde_json::json!({}));
let timeout_result = rt
.timeout(Duration::from_millis(500), async { done_rx.recv().await })
.await;
match timeout_result {
Ok(Some(())) => {
let was_completed = completed.load(Ordering::SeqCst);
if was_completed {
TestResult::passed()
} else {
TestResult::failed(
"Channel received but completed flag not set".to_string(),
)
}
}
Ok(None) => TestResult::failed("Channel closed unexpectedly".to_string()),
Err(_) => {
checkpoint(
"task_did_not_complete",
serde_json::json!({"note": "Some runtimes cancel on handle drop"}),
);
TestResult::passed()
}
}
})
},
)
}
pub fn rt_005_timeout_success<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-005".to_string(),
name: "Timeout with fast future".to_string(),
description: "Timeout wrapping a future that completes before deadline".to_string(),
category: TestCategory::Time,
tags: vec!["timeout".to_string(), "success".to_string()],
expected: "Returns Ok with the future's result".to_string(),
},
|rt| {
rt.block_on(async {
let result = rt.timeout(Duration::from_secs(1), async { 42 }).await;
checkpoint(
"timeout_completed",
serde_json::json!({"result": format!("{:?}", result)}),
);
match result {
Ok(value) => {
if value != 42 {
TestResult::failed(format!("Expected 42, got {}", value))
} else {
TestResult::passed()
}
}
Err(_) => TestResult::failed("Fast future should not timeout".to_string()),
}
})
},
)
}
pub fn rt_006_timeout_expiration<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-006".to_string(),
name: "Timeout expiration".to_string(),
description: "Timeout wrapping a future that exceeds deadline".to_string(),
category: TestCategory::Time,
tags: vec!["timeout".to_string(), "expiration".to_string()],
expected: "Returns Err(Elapsed) after deadline".to_string(),
},
|rt| {
rt.block_on(async {
let start = Instant::now();
let sleep_future = rt.sleep(Duration::from_secs(10));
let result = rt
.timeout(Duration::from_millis(50), async {
sleep_future.await;
42
})
.await;
let elapsed = start.elapsed();
checkpoint(
"timeout_elapsed",
serde_json::json!({
"elapsed_ms": elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
"result": format!("{:?}", result)
}),
);
match result {
Err(_) => {
if elapsed > Duration::from_millis(500) {
TestResult::failed(format!(
"Timeout took too long: {:?} (expected ~50ms)",
elapsed
))
} else {
TestResult::passed()
}
}
Ok(value) => {
TestResult::failed(format!("Slow future should timeout, but got {}", value))
}
}
})
},
)
}
pub fn rt_007_race_first_wins<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-007".to_string(),
name: "Race first completer wins".to_string(),
description: "Racing two futures, faster one wins".to_string(),
category: TestCategory::Spawn,
tags: vec!["race".to_string(), "select".to_string()],
expected: "Returns result of faster future".to_string(),
},
|rt| {
rt.block_on(async {
let (result_tx, mut result_rx) = rt.mpsc_channel::<&'static str>(2);
let result_tx_fast = result_tx.clone();
let result_tx_slow = result_tx;
let _fast = rt.spawn(async move {
std::thread::yield_now();
let _ = result_tx_fast.send("fast").await;
});
let _slow = rt.spawn(async move {
for _ in 0..1000 {
std::thread::yield_now();
}
let _ = result_tx_slow.send("slow").await;
});
let start = Instant::now();
let timeout_result = rt
.timeout(Duration::from_millis(500), async { result_rx.recv().await })
.await;
let elapsed = start.elapsed();
checkpoint(
"race_completed",
serde_json::json!({
"elapsed_ms": elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
"result": format!("{:?}", timeout_result)
}),
);
match timeout_result {
Ok(Some(winner)) => {
checkpoint("winner", serde_json::json!({"winner": winner}));
TestResult::passed()
}
Ok(None) => TestResult::failed("Channel closed unexpectedly".to_string()),
Err(_) => TestResult::failed("Race timed out".to_string()),
}
})
},
)
}
pub fn rt_008_nested_spawns<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-008".to_string(),
name: "Nested task spawning".to_string(),
description: "Tasks spawning other tasks".to_string(),
category: TestCategory::Spawn,
tags: vec!["spawn".to_string(), "nested".to_string()],
expected: "All nested tasks complete correctly".to_string(),
},
|rt| {
rt.block_on(async {
let (result_tx, mut result_rx) = rt.mpsc_channel::<i32>(10);
let tx1 = result_tx.clone();
let tx2 = result_tx.clone();
let tx3 = result_tx;
let outer = rt.spawn(async move {
let _ = tx1.send(1).await;
let _ = tx2.send(2).await;
let _ = tx3.send(3).await;
6 });
let outer_result = outer.await;
let mut collected = Vec::new();
let collect_result = rt
.timeout(Duration::from_millis(500), async {
for _ in 0..3 {
if let Some(v) = result_rx.recv().await {
collected.push(v);
}
}
})
.await;
checkpoint(
"nested_completed",
serde_json::json!({
"outer_result": outer_result,
"collected": collected
}),
);
if outer_result != 6 {
return TestResult::failed(format!(
"Expected outer result 6, got {}",
outer_result
));
}
if collect_result.is_err() {
return TestResult::failed("Timeout collecting nested results".to_string());
}
let sum: i32 = collected.iter().sum();
if sum != 6 {
return TestResult::failed(format!("Expected sum 6, got {}", sum));
}
TestResult::passed()
})
},
)
}
pub fn rt_009_panic_handling<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-009".to_string(),
name: "Task panic handling".to_string(),
description: "A panicking task should not crash the runtime".to_string(),
category: TestCategory::Spawn,
tags: vec![
"spawn".to_string(),
"panic".to_string(),
"error".to_string(),
],
expected: "Runtime continues, other tasks complete".to_string(),
},
|rt| {
rt.block_on(async {
let (good_tx, mut good_rx) = rt.mpsc_channel::<i32>(1);
let (bad_started_tx, bad_started_rx) = rt.oneshot_channel::<()>();
let _good = rt.spawn(async move {
std::thread::yield_now();
let _ = good_tx.send(42).await;
});
let _bad = rt.spawn(async move {
let _ = bad_started_tx.send(());
panic!("intentional panic for testing");
});
let _ = bad_started_rx.await;
let good_result = rt
.timeout(Duration::from_millis(500), async { good_rx.recv().await })
.await;
checkpoint(
"good_task_result",
serde_json::json!({"result": format!("{:?}", good_result)}),
);
match good_result {
Ok(Some(42)) => TestResult::passed(),
Ok(Some(other)) => TestResult::failed(format!("Expected 42, got {}", other)),
Ok(None) => {
checkpoint(
"channel_closed",
serde_json::json!({"note": "Good task channel closed"}),
);
TestResult::passed()
}
Err(_) => {
TestResult::failed(
"Good task did not complete (possible panic propagation)".to_string(),
)
}
}
})
},
)
}
pub fn rt_010_stress_test<RT: RuntimeInterface>() -> ConformanceTest<RT> {
ConformanceTest::new(
TestMeta {
id: "rt-010".to_string(),
name: "High concurrency stress test".to_string(),
description: "Spawn many tasks concurrently to stress test scheduler".to_string(),
category: TestCategory::Spawn,
tags: vec![
"spawn".to_string(),
"stress".to_string(),
"concurrent".to_string(),
],
expected: "All tasks complete correctly".to_string(),
},
|rt| {
rt.block_on(async {
const NUM_TASKS: u64 = 1000;
let counter = Arc::new(AtomicU64::new(0));
let completed = Arc::new(AtomicU64::new(0));
let start = Instant::now();
let mut handles = Vec::with_capacity(NUM_TASKS as usize);
for _ in 0..NUM_TASKS {
let counter = counter.clone();
let completed = completed.clone();
handles.push(rt.spawn(async move {
counter.fetch_add(1, Ordering::SeqCst);
completed.fetch_add(1, Ordering::SeqCst);
}));
}
checkpoint(
"all_spawned",
serde_json::json!({
"count": NUM_TASKS,
"elapsed_ms": start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64
}),
);
let timeout_result = rt
.timeout(Duration::from_secs(30), async move {
for handle in handles {
let _ = handle.await;
}
})
.await;
let elapsed = start.elapsed();
let final_count = counter.load(Ordering::SeqCst);
let final_completed = completed.load(Ordering::SeqCst);
checkpoint(
"all_completed",
serde_json::json!({
"final_count": final_count,
"final_completed": final_completed,
"elapsed_ms": elapsed.as_millis().min(u128::from(u64::MAX)) as u64
}),
);
match timeout_result {
Ok(()) => {
if final_count != NUM_TASKS {
TestResult::failed(format!(
"Counter mismatch: expected {}, got {}",
NUM_TASKS, final_count
))
} else {
TestResult::passed()
}
}
Err(_) => {
if final_completed == NUM_TASKS {
TestResult::passed()
} else {
TestResult::failed(format!(
"Stress test timed out. Completed: {}/{}",
final_completed, NUM_TASKS
))
}
}
}
})
},
)
}
#[cfg(test)]
mod tests {
#[test]
fn test_id_convention() {
let expected_ids = [
"rt-001", "rt-002", "rt-003", "rt-004", "rt-005", "rt-006", "rt-007", "rt-008",
"rt-009", "rt-010",
];
for id in expected_ids {
assert!(
id.starts_with("rt-"),
"All runtime tests should have 'rt-' prefix"
);
}
}
}