use {
super::common::*,
crate::scheduler::lanes::*,
macro_rules_attribute::apply,
std::{
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering,
},
},
time::Duration,
},
};
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_sync_lane_highest_priority() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
let yield_count = Arc::new(AtomicUsize::new(0));
for i in 0..100 {
let order = execution_order.clone();
let yields = yield_count.clone();
scheduler.queue(
move |_ctx| {
async move {
for _ in 0..5 {
yields.fetch_add(1, Ordering::SeqCst);
futures_lite::future::yield_now().await;
}
order.lock().push(format!("default_{}", i));
None
}
},
DEFAULT_LANE,
);
}
scheduler.spawn_workers();
assert!(
wait_for(
|| yield_count.load(Ordering::SeqCst) > 50,
Duration::from_secs(5)
)
.await,
"Default tasks should start yielding"
);
let order = execution_order.clone();
let sync_yields = yield_count.clone();
scheduler.queue(
|_control| {
async move {
for _ in 0..10 {
sync_yields.fetch_add(1, Ordering::SeqCst);
futures_lite::future::yield_now().await;
}
order.lock().push("sync_task".to_string());
None
}
},
SYNC_LANE,
);
assert!(
wait_for(
|| execution_order.lock().contains(&"sync_task".to_string()),
Duration::from_secs(5)
)
.await,
"Sync task did not execute"
);
assert!(
wait_for(
|| execution_order.lock().len() == 101,
Duration::from_secs(5)
)
.await,
"All tasks should complete"
);
let total_yields = yield_count.load(Ordering::SeqCst);
let expected_yields = (100 * 5) + 10;
assert_eq!(
total_yields, expected_yields,
"Expected {} yields (100 tasks * 5 + sync task * 10), got {}",
expected_yields, total_yields
);
let order = execution_order.lock();
let sync_pos = order
.iter()
.position(|s| s == "sync_task")
.expect("sync task should be in order");
assert!(
sync_pos < 100,
"Sync task should execute before all default tasks finish, but was at position {}",
sync_pos
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_priority_ordering() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
let order = execution_order.clone();
scheduler.queue(
|_control| {
async move {
order.lock().push("speculative");
None
}
},
SPECULATIVE_LANE,
);
let order = execution_order.clone();
scheduler.queue(
|_control| {
async move {
order.lock().push("default");
None
}
},
DEFAULT_LANE,
);
let order = execution_order.clone();
scheduler.queue(
|_control| {
async move {
order.lock().push("sync");
None
}
},
SYNC_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(|| execution_order.lock().len() == 3, Duration::from_secs(5))
.await,
"Not all tasks completed"
);
let order = execution_order.lock();
let sync_pos = order.iter().position(|s| *s == "sync").unwrap();
let default_pos = order.iter().position(|s| *s == "default").unwrap();
let spec_pos = order.iter().position(|s| *s == "speculative").unwrap();
assert!(
sync_pos < default_pos,
"Sync should execute before default: sync={}, default={}",
sync_pos,
default_pos
);
assert!(
default_pos < spec_pos,
"Default should execute before speculative: default={}, spec={}",
default_pos,
spec_pos
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_same_lane_fairness() {
let (scheduler, _conn) = test_scheduler();
let completed = Arc::new(AtomicUsize::new(0));
let num_tasks = 10;
for _ in 0..num_tasks {
let completed_clone = completed.clone();
scheduler.queue(
|_control| {
async move {
for _ in 0..1000 {
futures_lite::future::yield_now().await;
}
completed_clone.fetch_add(1, Ordering::SeqCst);
None
}
},
DEFAULT_LANE,
);
}
scheduler.spawn_workers();
assert!(
wait_for(
|| completed.load(Ordering::SeqCst) == num_tasks,
Duration::from_secs(5)
)
.await,
"Not all same-lane tasks completed (starvation): {} / {}",
completed.load(Ordering::SeqCst),
num_tasks
);
}