use {
super::common::*,
crate::scheduler::lanes::*,
macro_rules_attribute::apply,
std::{
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering,
},
},
time::Duration,
},
};
// Queues a batch of yielding tasks on the default lane, starts the workers,
// and checks that every task both starts and finishes within the timeouts.
//
// Each task increments `started`, yields to the executor a fixed number of
// times, then increments `completed`. Note that the assertions only verify
// that all tasks eventually start and complete; on their own they do not
// prove that task executions overlapped in time.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_multiple_workers_execute_concurrently() {
    // Number of tasks queued before the workers are spawned.
    const NUM_TASKS: usize = 10;
    // How many times each task yields between its start and completion marks.
    const YIELDS_PER_TASK: usize = 1000;

    let (scheduler, _conn) = test_scheduler();
    let started = Arc::new(AtomicUsize::new(0));
    let completed = Arc::new(AtomicUsize::new(0));

    for _ in 0..NUM_TASKS {
        let task_started = Arc::clone(&started);
        let task_completed = Arc::clone(&completed);
        scheduler.queue(
            |_control| {
                async move {
                    task_started.fetch_add(1, Ordering::SeqCst);
                    for _ in 0..YIELDS_PER_TASK {
                        futures_lite::future::yield_now().await;
                    }
                    task_completed.fetch_add(1, Ordering::SeqCst);
                    None
                }
            },
            DEFAULT_LANE,
        );
    }

    // Tasks are queued first; workers are only spawned afterwards.
    scheduler.spawn_workers();

    let all_started = wait_for(
        || started.load(Ordering::SeqCst) == NUM_TASKS,
        Duration::from_secs(5),
    )
    .await;
    // Report the observed counter so a timeout failure is diagnosable.
    assert!(
        all_started,
        "Not all tasks started: started={}/{}",
        started.load(Ordering::SeqCst),
        NUM_TASKS
    );

    let all_completed = wait_for(
        || completed.load(Ordering::SeqCst) == NUM_TASKS,
        Duration::from_secs(10),
    )
    .await;
    assert!(
        all_completed,
        "Not all tasks completed: completed={}/{}",
        completed.load(Ordering::SeqCst),
        NUM_TASKS
    );
}
// Queues five yielding tasks on each of `SYNC_LANE`, `DEFAULT_LANE`, and
// `ASYNC_LANE1`, spawns the workers, and expects each lane's completion
// counter to reach five within five seconds.
//
// Every task yields to the executor 1000 times before incrementing the
// counter that belongs to its lane. On failure the message reports the
// per-lane counts that were observed.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_workers_handle_multiple_lanes() {
    let (scheduler, _conn) = test_scheduler();

    // One completion counter per lane under test.
    let sync_completed = Arc::new(AtomicUsize::new(0));
    let default_completed = Arc::new(AtomicUsize::new(0));
    let async_completed = Arc::new(AtomicUsize::new(0));

    for _ in 0..5 {
        // Per-iteration handles that get moved into the queued tasks.
        let sync_counter = Arc::clone(&sync_completed);
        let default_counter = Arc::clone(&default_completed);
        let async_counter = Arc::clone(&async_completed);

        // Queue order within a round: sync, then default, then async.
        scheduler.queue(
            |_control| async move {
                for _ in 0..1000 {
                    futures_lite::future::yield_now().await;
                }
                sync_counter.fetch_add(1, Ordering::SeqCst);
                None
            },
            SYNC_LANE,
        );

        scheduler.queue(
            |_control| async move {
                for _ in 0..1000 {
                    futures_lite::future::yield_now().await;
                }
                default_counter.fetch_add(1, Ordering::SeqCst);
                None
            },
            DEFAULT_LANE,
        );

        scheduler.queue(
            |_control| async move {
                for _ in 0..1000 {
                    futures_lite::future::yield_now().await;
                }
                async_counter.fetch_add(1, Ordering::SeqCst);
                None
            },
            ASYNC_LANE1,
        );
    }

    scheduler.spawn_workers();

    // True once every lane has recorded all five completions; checks the
    // counters in sync, default, async order and stops at the first miss.
    let every_lane_done = || {
        [&sync_completed, &default_completed, &async_completed]
            .iter()
            .all(|counter| counter.load(Ordering::SeqCst) == 5)
    };

    let finished_in_time = wait_for(every_lane_done, Duration::from_secs(5)).await;
    assert!(
        finished_in_time,
        "Not all tasks completed: sync={}, default={}, async={}",
        sync_completed.load(Ordering::SeqCst),
        default_completed.load(Ordering::SeqCst),
        async_completed.load(Ordering::SeqCst)
    );
}