use {
super::common::*,
crate::{
scheduler::{
lanes::{
ASYNC_LANE1,
DEFAULT_LANE,
RPC_LANE_3,
RPC_LANE_HIGH_IDX,
RPC_LANE_LOW_IDX,
SYNC_LANE,
},
task::LaburnumTask,
},
connect::lsp::ClientId,
},
macro_rules_attribute::apply,
std::{
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering,
},
},
time::Duration,
},
};
// Queues three RPC tasks back to back and checks how many tasks end up in each
// of the RPC lane queues at RPC_LANE_HIGH_IDX, +1, +2 and RPC_LANE_LOW_IDX
// (called lanes 7, 8, 9 and 10 in the assertion messages).
//
// Only queue lengths are inspected and this test never calls `spawn_workers`,
// so the tasks carry no-op bodies.
#[test]
fn rpc_queue_bubbles_up_oldest_tasks() {
    let (scheduler, _conn) = test_scheduler();

    // One factory for all three tasks; they differ only in queueing order.
    let make_task = || {
        LaburnumTask::new(
            scheduler.clone(),
            |_ctx| async { None },
            RPC_LANE_3,
            ClientId::INTERNAL,
        )
    };

    // A is queued first (oldest), C is queued last (newest).
    let task_a = make_task();
    let task_b = make_task();
    let task_c = make_task();
    scheduler.queue_rpc_task(task_a);
    scheduler.queue_rpc_task(task_b);
    scheduler.queue_rpc_task(task_c);

    let lane_7_len = scheduler.lane_queues[RPC_LANE_HIGH_IDX].len();
    let lane_8_len = scheduler.lane_queues[RPC_LANE_HIGH_IDX + 1].len();
    let lane_9_len = scheduler.lane_queues[RPC_LANE_HIGH_IDX + 2].len();
    let lane_10_len = scheduler.lane_queues[RPC_LANE_LOW_IDX].len();

    assert_eq!(
        lane_7_len, 0,
        "Lane 7 (highest priority) should be empty - needs 4+ insertions to fill"
    );
    assert_eq!(
        lane_8_len, 1,
        "Lane 8 should have oldest task (A) after 3 insertions"
    );
    assert_eq!(
        lane_9_len, 1,
        "Lane 9 should have middle task (B) after 3 insertions"
    );
    assert_eq!(
        lane_10_len, 1,
        "Lane 10 (lowest priority) should have newest task (C)"
    );
}
// A single freshly queued RPC task must land in the queue at RPC_LANE_LOW_IDX
// and leave the queue at RPC_LANE_HIGH_IDX empty.
#[test]
fn rpc_queue_inserts_at_lowest_priority() {
    let (scheduler, _conn) = test_scheduler();

    scheduler.queue_rpc_task(LaburnumTask::new(
        scheduler.clone(),
        |_ctx| async { None },
        RPC_LANE_3,
        ClientId::INTERNAL,
    ));

    let lowest = &scheduler.lane_queues[RPC_LANE_LOW_IDX];
    assert_eq!(
        lowest.len(),
        1,
        "New RPC task should be in lowest priority lane (lane 10)"
    );

    let highest = &scheduler.lane_queues[RPC_LANE_HIGH_IDX];
    assert!(
        highest.is_empty(),
        "Highest priority lane (lane 7) should be empty for single insertion"
    );
}
// Ten producer threads each queue one hundred RPC tasks concurrently.
// Afterwards the RPC lane queues together must hold every queued task.
#[test]
fn rpc_queue_rotation_is_thread_safe() {
    const PRODUCERS: usize = 10;
    const TASKS_PER_PRODUCER: usize = 100;

    let (scheduler, _conn) = test_scheduler();
    let scheduler = Arc::new(scheduler);

    // Start every producer before joining any of them so they really overlap.
    let mut producers = Vec::with_capacity(PRODUCERS);
    for _ in 0..PRODUCERS {
        let sched = Arc::clone(&scheduler);
        producers.push(std::thread::spawn(move || {
            (0..TASKS_PER_PRODUCER).for_each(|_| {
                let task = LaburnumTask::new(
                    (*sched).clone(),
                    |_ctx| async { None },
                    RPC_LANE_3,
                    ClientId::INTERNAL,
                );
                sched.queue_rpc_task(task);
            });
        }));
    }
    for producer in producers {
        producer.join().unwrap();
    }

    // Sum the lengths of all RPC lane queues.
    let mut total: usize = 0;
    for idx in RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX {
        total += scheduler.lane_queues[idx].len();
    }
    assert_eq!(total, 1000, "All 1000 RPC tasks should be queued");
}
// Queues an RPC task and then a SYNC_LANE task, and checks that scanning the
// lane queues in index order yields the SYNC_LANE task first.
//
// The scan walks whatever queues `lane_queues` actually holds instead of a
// fixed index range, so it cannot index out of bounds and needs no update if
// the number of lanes changes.
#[test]
fn sync_lane_preempts_rpc_lanes() {
    let (scheduler, _conn) = test_scheduler();

    // The RPC task is queued first so that insertion order alone would favour it.
    let rpc_task = LaburnumTask::new(
        scheduler.clone(),
        |_ctx| async { None },
        RPC_LANE_3,
        ClientId::INTERNAL,
    );
    scheduler.queue_rpc_task(rpc_task);

    let sync_task = LaburnumTask::new(
        scheduler.clone(),
        |_ctx| async { None },
        SYNC_LANE,
        ClientId::INTERNAL,
    );
    scheduler.queue_task(sync_task);

    // Pop from the first non-empty queue, lowest index first, and record the
    // lane of the task found there.
    let first_task_lane = scheduler
        .lane_queues
        .iter()
        .find_map(|queue| queue.pop().ok())
        .map(|task| task.lane);

    assert_eq!(
        first_task_lane,
        Some(SYNC_LANE),
        "Sync task should be stolen before RPC tasks"
    );
}
// Queues five RPC tasks and checks the resulting per-lane queue lengths:
// two in the RPC_LANE_HIGH_IDX queue and one in each of the other three
// inspected queues.
#[test]
fn rpc_queue_maintains_fifo_within_priority() {
    let (scheduler, _conn) = test_scheduler();

    (0..5).for_each(|_| {
        let task = LaburnumTask::new(
            scheduler.clone(),
            |_ctx| async { None },
            RPC_LANE_3,
            ClientId::INTERNAL,
        );
        scheduler.queue_rpc_task(task);
    });

    // (queue index, expected length, failure message)
    let expectations = [
        (RPC_LANE_HIGH_IDX, 2, "Lane 7 should have 2 oldest tasks"),
        (RPC_LANE_HIGH_IDX + 1, 1, "Lane 8 should have 1 task"),
        (RPC_LANE_HIGH_IDX + 2, 1, "Lane 9 should have 1 task"),
        (RPC_LANE_LOW_IDX, 1, "Lane 10 should have 1 newest task"),
    ];
    for &(idx, expected_len, message) in expectations.iter() {
        assert_eq!(scheduler.lane_queues[idx].len(), expected_len, "{}", message);
    }
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn worker_steals_rpc_tasks_oldest_first() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
for i in 0..4 {
let order = execution_order.clone();
let task = LaburnumTask::new(
scheduler.clone(),
move |_ctx| {
let order = order.clone();
async move {
order.lock().push(i);
None
}
},
RPC_LANE_3,
ClientId::INTERNAL,
);
scheduler.queue_rpc_task(task);
}
scheduler.spawn_workers();
assert!(
wait_for(|| execution_order.lock().len() == 4, Duration::from_secs(5))
.await,
"Not all RPC tasks completed"
);
let order = execution_order.lock().clone();
assert_eq!(
order,
vec![0, 1, 2, 3],
"RPC tasks should execute oldest first (FIFO via bubble-up)"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn worker_processes_standard_lanes_before_rpc() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
let order = execution_order.clone();
let rpc_task = LaburnumTask::new(
scheduler.clone(),
move |_ctx| {
let order = order.clone();
async move {
order.lock().push("rpc");
None
}
},
RPC_LANE_3,
ClientId::INTERNAL,
);
scheduler.queue_rpc_task(rpc_task);
let order = execution_order.clone();
scheduler.queue(
move |_control| {
let order = order.clone();
async move {
order.lock().push("default");
None
}
},
DEFAULT_LANE,
);
let order = execution_order.clone();
scheduler.queue(
move |_control| {
let order = order.clone();
async move {
order.lock().push("sync");
None
}
},
SYNC_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(|| execution_order.lock().len() == 3, Duration::from_secs(5))
.await,
"Not all tasks completed"
);
let order = execution_order.lock().clone();
assert_eq!(order[0], "sync", "SYNC_LANE should execute first");
assert_eq!(
order[1], "default",
"DEFAULT_LANE should execute before RPC"
);
assert_eq!(order[2], "rpc", "RPC should execute last");
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn worker_processes_rpc_before_async_lanes() {
let (scheduler, _conn) = test_scheduler();
let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
let order = execution_order.clone();
scheduler.queue(
move |_control| {
let order = order.clone();
async move {
order.lock().push("async");
None
}
},
ASYNC_LANE1,
);
let order = execution_order.clone();
let rpc_task = LaburnumTask::new(
scheduler.clone(),
move |_ctx| {
let order = order.clone();
async move {
order.lock().push("rpc");
None
}
},
RPC_LANE_3,
ClientId::INTERNAL,
);
scheduler.queue_rpc_task(rpc_task);
scheduler.spawn_workers();
assert!(
wait_for(|| execution_order.lock().len() == 2, Duration::from_secs(5))
.await,
"Not all tasks completed"
);
let order = execution_order.lock().clone();
assert_eq!(order[0], "rpc", "RPC should execute before ASYNC_LANE");
assert_eq!(order[1], "async", "ASYNC_LANE should execute after RPC");
}
// Runs two RPC tasks ('A' and 'B') that each record their label and yield,
// four times over, then checks that:
//   * eight polls were recorded in total, four per task, and
//   * neither task was recorded more than twice in a row.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn rpc_task_requeue_interleaves_fairly() {
    let (scheduler, _conn) = test_scheduler();
    let poll_sequence = Arc::new(parking_lot::Mutex::new(Vec::new()));
    let completed = Arc::new(AtomicUsize::new(0));

    // Both tasks are built from the same template; only the label differs.
    // All tasks are constructed first and queued afterwards, 'A' before 'B'.
    let tasks: Vec<_> = "AB"
        .chars()
        .map(|label| {
            let seq = poll_sequence.clone();
            let done = completed.clone();
            LaburnumTask::new(
                scheduler.clone(),
                move |_ctx| {
                    let seq = seq.clone();
                    let done = done.clone();
                    async move {
                        for _ in 0..4 {
                            // The lock guard is released before the yield point.
                            seq.lock().push(label);
                            futures_lite::future::yield_now().await;
                        }
                        done.fetch_add(1, Ordering::SeqCst);
                        None
                    }
                },
                RPC_LANE_3,
                ClientId::INTERNAL,
            )
        })
        .collect();
    for task in tasks {
        scheduler.queue_rpc_task(task);
    }

    scheduler.spawn_workers();

    let both_done = wait_for(
        || completed.load(Ordering::SeqCst) == 2,
        Duration::from_secs(5),
    )
    .await;
    assert!(both_done, "Not all RPC tasks completed");

    let sequence = poll_sequence.lock().clone();
    assert_eq!(
        sequence.len(),
        8,
        "Expected 8 polls total (4 per task), got {}",
        sequence.len()
    );

    let polls_of = |label: char| sequence.iter().filter(|&&c| c == label).count();
    assert_eq!(polls_of('A'), 4, "Task A should be polled 4 times");
    assert_eq!(polls_of('B'), 4, "Task B should be polled 4 times");

    // Longest run of identical neighbouring labels in the recorded sequence.
    let mut run_len = 1;
    let mut max_consecutive = 1;
    for pair in sequence.windows(2) {
        if pair[0] == pair[1] {
            run_len += 1;
            max_consecutive = max_consecutive.max(run_len);
        } else {
            run_len = 1;
        }
    }
    assert!(
        max_consecutive <= 2,
        "Tasks should interleave fairly. Max consecutive same-task polls: {}. Sequence: {:?}",
        max_consecutive,
        sequence
    );
}
// Five threads queue RPC tasks while five other threads pop from the RPC lane
// queues at the same time. Once every thread has finished, the tasks popped
// plus the tasks still queued must equal the number of tasks pushed.
#[test]
fn rpc_queue_concurrent_push_and_pop() {
    const PUSHER_THREADS: usize = 5;
    const PUSHES_PER_THREAD: usize = 100;
    const POPPER_THREADS: usize = 5;
    const POP_SWEEPS: usize = 200;

    let (scheduler, _conn) = test_scheduler();
    let scheduler = Arc::new(scheduler);
    let total_tasks = PUSHER_THREADS * PUSHES_PER_THREAD;
    let stolen = Arc::new(AtomicUsize::new(0));

    // All pushers are started before any popper, and nothing is joined until
    // every thread is running.
    let mut push_handles = Vec::with_capacity(PUSHER_THREADS);
    for _ in 0..PUSHER_THREADS {
        let sched = Arc::clone(&scheduler);
        push_handles.push(std::thread::spawn(move || {
            for _ in 0..PUSHES_PER_THREAD {
                sched.queue_rpc_task(LaburnumTask::new(
                    (*sched).clone(),
                    |_ctx| async { None },
                    RPC_LANE_3,
                    ClientId::INTERNAL,
                ));
            }
        }));
    }

    let mut pop_handles = Vec::with_capacity(POPPER_THREADS);
    for _ in 0..POPPER_THREADS {
        let sched = Arc::clone(&scheduler);
        let stolen = Arc::clone(&stolen);
        pop_handles.push(std::thread::spawn(move || {
            let mut local_stolen = 0;
            for _ in 0..POP_SWEEPS {
                // One sweep: try a single pop on each RPC lane queue and count
                // the pops that succeeded.
                local_stolen += (RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX)
                    .filter(|&idx| sched.lane_queues[idx].pop().is_ok())
                    .count();
                std::thread::yield_now();
            }
            // Publish the per-thread tally once, at the end.
            stolen.fetch_add(local_stolen, Ordering::SeqCst);
        }));
    }

    for handle in push_handles.into_iter().chain(pop_handles) {
        handle.join().unwrap();
    }

    let mut remaining: usize = 0;
    for idx in RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX {
        remaining += scheduler.lane_queues[idx].len();
    }
    let total_stolen = stolen.load(Ordering::SeqCst);

    assert_eq!(
        remaining + total_stolen,
        total_tasks,
        "All {} tasks should be accounted for: {} remaining + {} stolen",
        total_tasks,
        remaining,
        total_stolen
    );
}
// Queues 100 RPC tasks that each record their id when polled, lets four
// threads pop and poll them concurrently, drains whatever is left on the
// calling thread, and checks that every queued id was recorded.
#[test]
fn rpc_queue_no_task_loss_under_contention() {
    let (scheduler, _conn) = test_scheduler();
    let scheduler = Arc::new(scheduler);

    // Ids we expect to see; only this thread touches the set, so it needs no
    // lock or reference counting.
    let mut expected_ids = std::collections::HashSet::new();
    // Ids recorded by the task bodies; shared with the tasks, hence Arc + Mutex.
    let completed_ids =
        Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new()));

    for i in 0..100 {
        expected_ids.insert(i);
        let completed = completed_ids.clone();
        let task = LaburnumTask::new(
            (*scheduler).clone(),
            move |_ctx| {
                let completed = completed.clone();
                async move {
                    completed.lock().insert(i);
                    None
                }
            },
            RPC_LANE_3,
            ClientId::INTERNAL,
        );
        scheduler.queue_rpc_task(task);
    }

    // Four competing consumers: each makes 50 sweeps over the RPC lane queues,
    // popping at most one task per queue per sweep and polling it once.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let sched = scheduler.clone();
            std::thread::spawn(move || {
                for _ in 0..50 {
                    for idx in RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX {
                        if let Ok(task) = sched.lane_queues[idx].pop() {
                            let _ = task.poll_once();
                        }
                    }
                    std::thread::yield_now();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Drain anything the consumer threads did not get to.
    for idx in RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX {
        while let Ok(task) = scheduler.lane_queues[idx].pop() {
            let _ = task.poll_once();
        }
    }

    let completed = completed_ids.lock().clone();
    assert_eq!(
        expected_ids,
        completed,
        "All queued tasks should complete. Missing: {:?}",
        expected_ids.difference(&completed).collect::<Vec<_>>()
    );
}