use {
super::common::*,
crate::scheduler::lanes::*,
macro_rules_attribute::apply,
std::{
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering,
},
},
time::Duration,
},
};
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_spawn_child_task() {
let (scheduler, _conn) = test_scheduler();
let parent_executed = Arc::new(AtomicUsize::new(0));
let child_executed = Arc::new(AtomicUsize::new(0));
let child_exec = child_executed.clone();
let parent_exec = parent_executed.clone();
scheduler.queue(
move |ctx| {
parent_exec.fetch_add(1, Ordering::SeqCst);
let child_exec_inner = child_exec.clone();
ctx.spawn_task(
move |_child_ctx| {
async move {
child_exec_inner.fetch_add(1, Ordering::SeqCst);
None
}
},
DEFAULT_LANE,
);
async move { None }
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| parent_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Parent task did not execute"
);
assert!(
wait_for(
|| child_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Child task did not execute"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_spawn_multiple_children() {
let (scheduler, _conn) = test_scheduler();
let parent_executed = Arc::new(AtomicUsize::new(0));
let children_executed = Arc::new(AtomicUsize::new(0));
let parent_exec = parent_executed.clone();
let children_exec = children_executed.clone();
let num_children = 5;
scheduler.queue(
move |ctx| {
parent_exec.fetch_add(1, Ordering::SeqCst);
for _ in 0..num_children {
let child_exec = children_exec.clone();
ctx.spawn_task(
move |_child_ctx| {
async move {
child_exec.fetch_add(1, Ordering::SeqCst);
None
}
},
DEFAULT_LANE,
);
}
async move { None }
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| parent_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Parent task did not execute"
);
assert!(
wait_for(
|| children_executed.load(Ordering::SeqCst) == num_children,
Duration::from_secs(5)
)
.await,
"Not all child tasks executed: {} / {}",
children_executed.load(Ordering::SeqCst),
num_children
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_nested_task_spawning() {
let (scheduler, _conn) = test_scheduler();
let parent_executed = Arc::new(AtomicUsize::new(0));
let child_executed = Arc::new(AtomicUsize::new(0));
let grandchild_executed = Arc::new(AtomicUsize::new(0));
let parent_exec = parent_executed.clone();
let child_exec = child_executed.clone();
let grandchild_exec = grandchild_executed.clone();
scheduler.queue(
move |ctx| {
parent_exec.fetch_add(1, Ordering::SeqCst);
let child_exec_inner = child_exec.clone();
let grandchild_exec_inner = grandchild_exec.clone();
ctx.spawn_task(
move |child_ctx| {
child_exec_inner.fetch_add(1, Ordering::SeqCst);
let grandchild_exec_inner2 = grandchild_exec_inner.clone();
child_ctx.spawn_task(
move |_grandchild_ctx| {
async move {
grandchild_exec_inner2.fetch_add(1, Ordering::SeqCst);
None
}
},
DEFAULT_LANE,
);
async move { None }
},
DEFAULT_LANE,
);
async move { None }
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| parent_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Parent task did not execute"
);
assert!(
wait_for(
|| child_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Child task did not execute"
);
assert!(
wait_for(
|| grandchild_executed.load(Ordering::SeqCst) == 1,
Duration::from_secs(5)
)
.await,
"Grandchild task did not execute"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_spawn_tasks_on_different_lanes() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
let order = execution_order.clone();
scheduler.queue(
move |ctx| {
let order_spec = order.clone();
ctx.spawn_task(
move |_| {
async move {
order_spec.lock().push("speculative");
None
}
},
SPECULATIVE_LANE,
);
let order_sync = order.clone();
ctx.spawn_task(
move |_| {
async move {
order_sync.lock().push("sync");
None
}
},
SYNC_LANE,
);
let order_default = order.clone();
ctx.spawn_task(
move |_| {
async move {
order_default.lock().push("default");
None
}
},
DEFAULT_LANE,
);
async move { None }
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(|| execution_order.lock().len() == 3, Duration::from_secs(5))
.await,
"Not all spawned tasks completed"
);
let order = execution_order.lock();
let sync_pos = order.iter().position(|s| *s == "sync").unwrap();
let default_pos = order.iter().position(|s| *s == "default").unwrap();
let spec_pos = order.iter().position(|s| *s == "speculative").unwrap();
assert!(
sync_pos < default_pos,
"SYNC_LANE should execute before DEFAULT_LANE: sync={}, default={}",
sync_pos,
default_pos
);
assert!(
default_pos < spec_pos,
"DEFAULT_LANE should execute before SPECULATIVE_LANE: default={}, spec={}",
default_pos,
spec_pos
);
}