use super::*;
use futures::{
FutureExt,
channel::{mpsc, oneshot},
executor::block_on,
future,
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
};
use std::{
cell::RefCell,
collections::{BTreeSet, HashSet},
pin::Pin,
rc::Rc,
sync::Arc,
task::{Context, Poll, Waker},
};
#[test]
fn test_foreground_executor_spawn() {
let result = TestScheduler::once(async |scheduler| {
let task = scheduler.foreground().spawn(async move { 42 });
task.await
});
assert_eq!(result, 42);
}
#[test]
fn test_background_executor_spawn() {
TestScheduler::once(async |scheduler| {
let task = scheduler.background().spawn(async move { 42 });
let result = task.await;
assert_eq!(result, 42);
});
}
#[test]
fn test_scheduler_drops_with_stalled_detached_foreground_task() {
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let weak_scheduler = Arc::downgrade(&scheduler);
let (sender, receiver) = oneshot::channel::<()>();
scheduler
.foreground()
.spawn(async move {
receiver.await.ok();
})
.detach();
scheduler.run();
drop(scheduler);
assert!(weak_scheduler.upgrade().is_none());
drop(sender);
}
#[test]
fn test_scheduler_drops_with_stalled_detached_background_task() {
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let weak_scheduler = Arc::downgrade(&scheduler);
let (sender, receiver) = oneshot::channel::<()>();
scheduler
.background()
.spawn(async move {
receiver.await.ok();
})
.detach();
scheduler.run();
drop(scheduler);
assert!(weak_scheduler.upgrade().is_none());
drop(sender);
}
#[test]
fn test_foreground_ordering() {
let mut traces = HashSet::new();
TestScheduler::many(if cfg!(miri) { 5 } else { 100 }, async |scheduler| {
#[derive(Hash, PartialEq, Eq)]
struct TraceEntry {
session: usize,
task: usize,
}
let trace = Rc::new(RefCell::new(Vec::new()));
let foreground_1 = scheduler.foreground();
for task in 0..10 {
foreground_1
.spawn({
let trace = trace.clone();
async move {
trace.borrow_mut().push(TraceEntry { session: 0, task });
}
})
.detach();
}
let foreground_2 = scheduler.foreground();
for task in 0..10 {
foreground_2
.spawn({
let trace = trace.clone();
async move {
trace.borrow_mut().push(TraceEntry { session: 1, task });
}
})
.detach();
}
scheduler.run();
assert_eq!(
trace
.borrow()
.iter()
.filter(|entry| entry.session == 0)
.map(|entry| entry.task)
.collect::<Vec<_>>(),
(0..10).collect::<Vec<_>>()
);
assert_eq!(
trace
.borrow()
.iter()
.filter(|entry| entry.session == 1)
.map(|entry| entry.task)
.collect::<Vec<_>>(),
(0..10).collect::<Vec<_>>()
);
traces.insert(trace.take());
});
assert!(traces.len() > 1, "Expected at least two traces");
}
#[test]
fn test_timer_ordering() {
TestScheduler::many(1, async |scheduler| {
let background = scheduler.background();
let futures = FuturesUnordered::new();
futures.push(
async {
background.timer(Duration::from_millis(100)).await;
2
}
.boxed(),
);
futures.push(
async {
background.timer(Duration::from_millis(50)).await;
1
}
.boxed(),
);
futures.push(
async {
background.timer(Duration::from_millis(150)).await;
3
}
.boxed(),
);
assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
});
}
#[test]
fn test_foreground_task_can_hold_mut_borrow_across_await() {
TestScheduler::once(async |scheduler| {
let foreground = scheduler.foreground();
let (sender, mut receiver) = mpsc::unbounded::<()>();
foreground
.spawn(async move {
receiver.next().await;
})
.detach();
scheduler.run();
sender.unbounded_send(()).unwrap();
scheduler.run();
});
}
#[test]
fn test_send_from_bg_to_fg() {
TestScheduler::once(async |scheduler| {
let foreground = scheduler.foreground();
let background = scheduler.background();
let (sender, receiver) = oneshot::channel::<i32>();
background
.spawn(async move {
sender.send(42).unwrap();
})
.detach();
let task = foreground.spawn(async move { receiver.await.unwrap() });
let result = task.await;
assert_eq!(result, 42);
});
}
#[test]
fn test_randomize_order() {
let mut deterministic_results = HashSet::new();
for seed in 0..10 {
let config = TestSchedulerConfig {
seed,
randomize_order: false,
..Default::default()
};
let order = block_on(capture_execution_order(config));
assert_eq!(order.len(), 6);
deterministic_results.insert(order);
}
assert_eq!(
deterministic_results.len(),
1,
"Deterministic mode should always produce same execution order"
);
let mut randomized_results = HashSet::new();
for seed in 0..20 {
let config = TestSchedulerConfig::with_seed(seed);
let order = block_on(capture_execution_order(config));
assert_eq!(order.len(), 6);
randomized_results.insert(order);
}
assert!(
randomized_results.len() > 1,
"Randomized mode should produce multiple different orders"
);
}
async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
let scheduler = Arc::new(TestScheduler::new(config));
let foreground = scheduler.foreground();
let background = scheduler.background();
let (sender, receiver) = mpsc::unbounded::<String>();
for i in 0..3 {
let mut sender = sender.clone();
foreground
.spawn(async move {
sender.send(format!("fg-{}", i)).await.ok();
})
.detach();
}
for i in 0..3 {
let mut sender = sender.clone();
background
.spawn(async move {
sender.send(format!("bg-{}", i)).await.ok();
})
.detach();
}
drop(sender); scheduler.run();
receiver.collect().await
}
#[test]
fn test_block() {
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let (tx, rx) = oneshot::channel();
let _ = scheduler
.background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
#[test]
#[should_panic(expected = "Parking forbidden.")]
fn test_parking_panics() {
let config = TestSchedulerConfig {
capture_pending_traces: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
scheduler.foreground().block_on(async {
let (_tx, rx) = oneshot::channel::<()>();
rx.await.unwrap(); });
}
#[test]
fn test_block_with_parking() {
let config = TestSchedulerConfig {
allow_parking: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
let (tx, rx) = oneshot::channel();
let _ = scheduler
.background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
#[test]
fn test_helper_methods() {
let result = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
let background = scheduler.background();
background.spawn(async { 42 }).await
});
assert_eq!(result, 42);
let results = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
let background = scheduler.background();
background.spawn(async { 10 }).await
});
assert_eq!(results, vec![10, 10, 10]);
}
#[test]
fn test_many_with_arbitrary_seed() {
for seed in [0u64, 1, 5, 42] {
let mut seeds_seen = Vec::new();
let iterations = 3usize;
for current_seed in seed..seed + iterations as u64 {
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(
current_seed,
)));
let captured_seed = current_seed;
scheduler
.foreground()
.block_on(async { seeds_seen.push(captured_seed) });
scheduler.run();
}
assert_eq!(
seeds_seen,
(seed..seed + iterations as u64).collect::<Vec<_>>(),
"Expected {iterations} iterations starting at seed {seed}"
);
}
}
#[test]
fn test_block_with_timeout() {
TestScheduler::once(async |scheduler| {
let foreground = scheduler.foreground();
let future = future::ready(42);
let output = foreground.block_with_timeout(Duration::from_millis(100), future);
assert_eq!(output.ok(), Some(42));
});
TestScheduler::once(async |scheduler| {
scheduler.set_timeout_ticks(0..=0);
let foreground = scheduler.foreground();
let future = future::pending::<()>();
let output = foreground.block_with_timeout(Duration::from_millis(50), future);
assert!(output.is_err(), "future should not have finished");
});
let mut results = BTreeSet::new();
TestScheduler::many(if cfg!(miri) { 5 } else { 100 }, async |scheduler| {
let task = scheduler.background().spawn(async move {
Yield { polls: 10 }.await;
42
});
let output = scheduler
.foreground()
.block_with_timeout(Duration::from_millis(50), task);
results.insert(output.ok());
});
assert_eq!(
results.into_iter().collect::<Vec<_>>(),
if cfg!(miri) {
vec![Some(42)]
} else {
vec![None, Some(42)]
}
);
TestScheduler::once(async |scheduler| {
scheduler.set_timeout_ticks(0..=0);
let background = scheduler.background();
let task = background.spawn({
let scheduler = scheduler.clone();
async move {
scheduler.timer(Duration::from_millis(100)).await;
123
}
});
let timed_out = scheduler
.foreground()
.block_with_timeout(Duration::from_millis(50), task);
assert!(
timed_out.is_err(),
"expected timeout before advancing the clock enough for the timer"
);
let mut task = timed_out.err().unwrap();
scheduler.advance_clock(Duration::from_millis(100));
scheduler.run();
let output = scheduler.foreground().block_on(&mut task);
assert_eq!(output, 123);
});
}
#[test]
fn test_block_does_not_progress_same_session_foreground() {
let mut task2_made_progress_once = false;
TestScheduler::many(if cfg!(miri) { 5 } else { 1000 }, async |scheduler| {
let foreground1 = scheduler.foreground();
let foreground2 = scheduler.foreground();
let task1 = foreground1.spawn(async move {});
let task2 = foreground2.spawn(async move {});
foreground1.block_on(async {
scheduler.yield_random().await;
assert!(!task1.is_ready());
task2_made_progress_once |= task2.is_ready();
});
task1.await;
task2.await;
});
assert!(
task2_made_progress_once,
"Expected task from different foreground executor to make progress (at least once)"
);
}
struct Yield {
polls: usize,
}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.polls -= 1;
if self.polls == 0 {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[test]
fn test_nondeterministic_wake_detection() {
let config = TestSchedulerConfig {
allow_parking: false,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
struct SendWakerToThread {
waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
}
impl Future for SendWakerToThread {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(tx) = self.waker_tx.take() {
tx.send(cx.waker().clone()).ok();
}
Poll::Ready(())
}
}
let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
scheduler.foreground().block_on(SendWakerToThread {
waker_tx: Some(waker_tx),
});
let handle = std::thread::spawn(move || {
if let Ok(waker) = waker_rx.recv() {
waker.wake();
}
});
handle.join().ok();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
scheduler.end_test();
}));
assert!(result.is_err(), "Expected end_test to panic");
let panic_payload = result.unwrap_err();
let panic_message = panic_payload
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic_payload.downcast_ref::<&str>().copied())
.unwrap_or("<unknown panic>");
assert!(
panic_message.contains("Your test is not deterministic"),
"Expected panic message to contain non-determinism error, got: {}",
panic_message
);
}
#[test]
fn test_nondeterministic_wake_allowed_with_parking() {
let config = TestSchedulerConfig {
allow_parking: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
struct WakeFromExternalThread {
waker_sent: bool,
waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
}
impl Future for WakeFromExternalThread {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.waker_sent {
self.waker_sent = true;
if let Some(tx) = self.waker_tx.take() {
tx.send(cx.waker().clone()).ok();
}
Poll::Pending
} else {
Poll::Ready(())
}
}
}
let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
std::thread::spawn(move || {
if let Ok(waker) = waker_rx.recv() {
waker.wake();
}
});
scheduler.foreground().block_on(WakeFromExternalThread {
waker_sent: false,
waker_tx: Some(waker_tx),
});
}
#[test]
fn test_nondeterministic_waker_drop_detection() {
let config = TestSchedulerConfig {
allow_parking: false,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
struct SendWakerToThread {
waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
}
impl Future for SendWakerToThread {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(tx) = self.waker_tx.take() {
tx.send(cx.waker().clone()).ok();
}
Poll::Ready(())
}
}
let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
scheduler.foreground().block_on(SendWakerToThread {
waker_tx: Some(waker_tx),
});
let handle = std::thread::spawn(move || {
if let Ok(waker) = waker_rx.recv() {
drop(waker);
}
});
handle.join().ok();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
scheduler.end_test();
}));
assert!(result.is_err(), "Expected end_test to panic");
let panic_payload = result.unwrap_err();
let panic_message = panic_payload
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic_payload.downcast_ref::<&str>().copied())
.unwrap_or("<unknown panic>");
assert!(
panic_message.contains("Your test is not deterministic"),
"Expected panic message to contain non-determinism error, got: {}",
panic_message
);
}
#[test]
fn test_background_priority_scheduling() {
use parking_lot::Mutex;
let mut high_before_low_count = 0;
let iterations = if cfg!(miri) { 5 } else { 100 };
for seed in 0..iterations {
let config = TestSchedulerConfig::with_seed(seed);
let scheduler = Arc::new(TestScheduler::new(config));
let background = scheduler.background();
let execution_order = Arc::new(Mutex::new(Vec::new()));
for i in 0..3 {
let order = execution_order.clone();
background
.spawn_with_priority(Priority::Low, async move {
order.lock().push(format!("low-{}", i));
})
.detach();
}
for i in 0..3 {
let order = execution_order.clone();
background
.spawn_with_priority(Priority::High, async move {
order.lock().push(format!("high-{}", i));
})
.detach();
}
scheduler.run();
let order = execution_order.lock();
let high_in_first_half = order
.iter()
.take(3)
.filter(|s| s.starts_with("high"))
.count();
if high_in_first_half >= 2 {
high_before_low_count += 1;
}
}
assert!(
high_before_low_count > iterations / 2,
"Expected high priority tasks to run before low priority tasks more often. \
Got {} out of {} iterations",
high_before_low_count,
iterations
);
}
#[test]
fn test_spawn_dedicated_basic_round_trip() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|_executor| async { 42 })
.await
});
assert_eq!(result, 42);
}
#[test]
fn test_spawn_dedicated_not_send_future() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|_executor| async move {
let state = Rc::new(RefCell::new(0_i32));
for _ in 0..5 {
*state.borrow_mut() += 1;
}
*state.borrow()
})
.await
});
assert_eq!(result, 5);
}
#[test]
fn test_spawn_dedicated_send_closure_captures() {
use parking_lot::Mutex;
let observed = TestScheduler::once(async |scheduler| {
let shared = Arc::new(Mutex::new(0_i32));
let shared_for_closure = shared.clone();
let returned = scheduler
.background()
.spawn_dedicated(move |_executor| {
let local = shared_for_closure;
async move {
*local.lock() = 7;
}
})
.await;
let _: () = returned;
*shared.lock()
});
assert_eq!(observed, 7);
}
#[test]
fn test_spawn_dedicated_inner_spawn_local() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|executor| async move {
let inner = Rc::new(RefCell::new(0_i32));
let inner_for_child = inner.clone();
let child = executor.spawn(async move {
*inner_for_child.borrow_mut() = 99;
*inner_for_child.borrow()
});
child.await
})
.await
});
assert_eq!(result, 99);
}
#[test]
fn test_spawn_dedicated_determinism_under_many() {
use parking_lot::Mutex;
let outcomes = TestScheduler::many(if cfg!(miri) { 4 } else { 20 }, async |scheduler| {
let trace = Arc::new(Mutex::new(Vec::<u32>::new()));
let background = scheduler.background();
let mut tasks = Vec::new();
for id in 0..4_u32 {
let trace = trace.clone();
let task = background.spawn_dedicated(move |executor| async move {
for step in 0..3 {
trace.lock().push(id * 100 + step);
executor.spawn(async {}).await;
}
id
});
tasks.push(task);
}
let mut outputs = Vec::new();
for task in tasks {
outputs.push(task.await);
}
(trace.lock().clone(), outputs)
});
let outcomes_replay = TestScheduler::many(if cfg!(miri) { 4 } else { 20 }, async |scheduler| {
let trace = Arc::new(Mutex::new(Vec::<u32>::new()));
let background = scheduler.background();
let mut tasks = Vec::new();
for id in 0..4_u32 {
let trace = trace.clone();
let task = background.spawn_dedicated(move |executor| async move {
for step in 0..3 {
trace.lock().push(id * 100 + step);
executor.spawn(async {}).await;
}
id
});
tasks.push(task);
}
let mut outputs = Vec::new();
for task in tasks {
outputs.push(task.await);
}
(trace.lock().clone(), outputs)
});
assert_eq!(
outcomes, outcomes_replay,
"per-seed outcomes should be reproducible"
);
let any_interleaved = outcomes.iter().any(|(trace, _)| {
trace
.windows(2)
.any(|window| window[0] / 100 != window[1] / 100)
});
assert!(
any_interleaved,
"expected at least one seed to interleave dedicated tasks"
);
}
#[test]
fn test_spawn_dedicated_dropping_task_cancels_future() {
use parking_lot::Mutex;
let counter_after = TestScheduler::once(async |scheduler| {
let counter = Arc::new(Mutex::new(0_u32));
let (resume_tx, resume_rx) = oneshot::channel::<()>();
let task = {
let counter = counter.clone();
scheduler
.background()
.spawn_dedicated(move |_executor| async move {
*counter.lock() = 1;
let _ = resume_rx.await;
*counter.lock() = 2;
})
};
scheduler.run();
assert_eq!(*counter.lock(), 1);
drop(task);
let _ = resume_tx.send(());
scheduler.run();
*counter.lock()
});
assert_eq!(
counter_after, 1,
"dropping the dedicated task must cancel the root future before its second write"
);
}
#[test]
fn test_spawn_dedicated_detached_child_runs_after_root_completes() {
use parking_lot::Mutex;
let child_ran = TestScheduler::once(async |scheduler| {
let child_ran = Arc::new(Mutex::new(false));
let task = {
let child_ran = child_ran.clone();
scheduler
.background()
.spawn_dedicated(move |executor| async move {
executor
.spawn(async move {
*child_ran.lock() = true;
})
.detach();
})
};
task.await;
scheduler.run();
*child_ran.lock()
});
assert!(
child_ran,
"detached child must complete after the root, not be cancelled with it"
);
}