use {
super::common::*,
crate::scheduler::lanes::*,
macro_rules_attribute::apply,
std::{
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{
AtomicBool,
AtomicUsize,
Ordering,
},
},
task::{
Context,
Poll,
},
time::Duration,
},
};
// Smoke test: with workers already running, a single task queued on the
// default lane must execute and flip its completion flag before the timeout.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_runs_to_completion() {
    let (scheduler, _conn) = test_scheduler();
    scheduler.spawn_workers();

    // `done` is observed by the test body; `task_flag` is moved into the task.
    let done = Arc::new(AtomicBool::new(false));
    let task_flag = Arc::clone(&done);

    scheduler.queue(
        |_control| async move {
            task_flag.store(true, Ordering::SeqCst);
            None
        },
        DEFAULT_LANE,
    );

    let finished_in_time =
        wait_for(|| done.load(Ordering::SeqCst), Duration::from_secs(5)).await;
    assert!(finished_in_time, "Task did not complete within timeout");
}
// Queues several tasks *before* any worker is spawned, then starts the
// workers and checks that every task eventually bumps the shared counter.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_multiple_tasks_complete() {
    let (scheduler, _conn) = test_scheduler();
    let expected: usize = 10;
    let finished = Arc::new(AtomicUsize::new(0));

    (0..expected).for_each(|_| {
        let task_counter = Arc::clone(&finished);
        scheduler.queue(
            |_control| async move {
                // Yield repeatedly so the task is suspended and resumed many
                // times before it records its completion.
                for _ in 0..1000 {
                    futures_lite::future::yield_now().await;
                }
                task_counter.fetch_add(1, Ordering::SeqCst);
                None
            },
            DEFAULT_LANE,
        );
    });

    scheduler.spawn_workers();

    let all_done = wait_for(
        || finished.load(Ordering::SeqCst) == expected,
        Duration::from_secs(5),
    )
    .await;
    assert!(
        all_done,
        "Not all tasks completed: {} / {}",
        finished.load(Ordering::SeqCst),
        expected
    );
}
/// A hand-written future that returns `Poll::Pending` a fixed number of times
/// before resolving, used to exercise suspend/resume handling.
struct YieldingFuture {
    /// How many more polls will answer `Poll::Pending`.
    yields_remaining: usize,
    /// Set to `true` by the poll that returns `Poll::Ready`.
    completed: Arc<AtomicBool>,
}

impl Future for YieldingFuture {
    type Output = ();

    /// Consumes one pending "yield" per call; once none are left, marks
    /// `completed` and resolves.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.yields_remaining.checked_sub(1) {
            Some(left) => {
                self.yields_remaining = left;
                // Wake ourselves right away so the executor polls us again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            None => {
                self.completed.store(true, Ordering::SeqCst);
                Poll::Ready(())
            }
        }
    }
}
// A task that returns `Poll::Pending` several times (re-waking itself each
// time) must keep being polled until it finally completes.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_yields_and_resumes() {
    let (scheduler, _conn) = test_scheduler();
    scheduler.spawn_workers();

    let done = Arc::new(AtomicBool::new(false));
    let yielder = YieldingFuture {
        yields_remaining: 5,
        completed: Arc::clone(&done),
    };

    scheduler.queue(
        |_control| async move {
            // The future is moved into the task and driven to completion there.
            yielder.await;
            None
        },
        DEFAULT_LANE,
    );

    let finished_in_time =
        wait_for(|| done.load(Ordering::SeqCst), Duration::from_secs(5)).await;
    assert!(
        finished_in_time,
        "Yielding task did not complete within timeout"
    );
}