use uuid::Uuid;
use crate::common::protocols::{DirectRequest, EngineType, MockEngineArgs};
use crate::scheduler::vllm::{RequestStatus, VllmCore};
/// Builds the small engine configuration used by the tests that vary only the
/// engine type.
///
/// Every call uses 4-token blocks, 6 GPU blocks, at most 4 sequences, a
/// 16-token batch budget, chunked prefill enabled, prefix caching disabled and
/// a speedup ratio of `0.0`.
///
/// # Panics
///
/// Panics if the builder fails to produce a `MockEngineArgs`.
fn engine_args(engine_type: EngineType) -> MockEngineArgs {
    let built = MockEngineArgs::builder()
        .engine_type(engine_type)
        // KV-cache geometry.
        .block_size(4)
        .num_gpu_blocks(6)
        // Per-pass scheduling limits.
        .max_num_seqs(Some(4))
        .max_num_batched_tokens(Some(16))
        // Feature switches.
        .enable_chunked_prefill(true)
        .enable_prefix_caching(false)
        .speedup_ratio(0.0)
        .build();
    built.unwrap()
}
/// Submits one request to `core`.
///
/// The request carries the token ids yielded by `tokens`, uses `max_output` as
/// its `max_output_tokens`, is tagged with `uuid`, targets `dp_rank` 0 and has
/// no arrival timestamp.
fn receive(core: &mut VllmCore, uuid: Uuid, tokens: std::ops::Range<u32>, max_output: usize) {
    let request = DirectRequest {
        uuid: Some(uuid),
        tokens: tokens.collect(),
        max_output_tokens: max_output,
        arrival_timestamp_ms: None,
        dp_rank: 0,
    };
    core.receive(request);
}
/// With the `Trtllm` engine type, one pass over two queued requests (8 prompt
/// tokens and up to 8 output tokens each) must leave only the first one
/// running, keep the second one waiting, and report zero preemptions.
#[test]
fn admits_only_what_fits_to_completion() {
    let first = Uuid::from_u128(1);
    let second = Uuid::from_u128(2);

    let mut core = VllmCore::new(engine_args(EngineType::Trtllm));
    receive(&mut core, first, 0..8, 8);
    receive(&mut core, second, 100..108, 8);

    let mut collector = crate::replay::TraceCollector::default();
    let pass = core.execute_pass(&mut collector, 0.0);

    // Snapshot the running set so the comparison reads as a plain Vec check.
    let running_now: Vec<_> = core.state().running.iter().copied().collect();
    assert_eq!(
        running_now,
        vec![first],
        "only r1 fits its to-completion reservation under no-evict"
    );

    let second_is_queued = core.state().waiting.contains(&second);
    assert!(
        second_is_queued,
        "r2 must remain waiting (no skip-ahead admission)"
    );
    assert_eq!(
        core.state().requests.get(&second).unwrap().status,
        RequestStatus::Waiting,
    );

    let preemptions = pass.mocker_metrics.vllm_preemptions_total;
    assert_eq!(preemptions, 0, "no-evict policy must never preempt");
}
/// With the `Vllm` engine type, the same two-request workload as the TRT-LLM
/// admission test must have both requests running after a single pass.
#[test]
fn vllm_admits_optimistically_unlike_trtllm() {
    let (r1, r2) = (Uuid::from_u128(1), Uuid::from_u128(2));

    let mut core = VllmCore::new(engine_args(EngineType::Vllm));
    // Submit in order: r1 first, then r2, each with 8 prompt tokens.
    for (id, prompt) in [(r1, 0..8), (r2, 100..108)] {
        receive(&mut core, id, prompt, 8);
    }

    let mut collector = crate::replay::TraceCollector::default();
    core.execute_pass(&mut collector, 0.0);

    let running: Vec<_> = core.state().running.iter().copied().collect();
    let both_admitted = [r1, r2].iter().all(|id| running.contains(id));
    assert!(
        both_admitted,
        "vLLM admits both requests optimistically, got {running:?}"
    );
}
/// Runs two requests (4 prompt tokens, up to 12 output tokens each) against a
/// `Trtllm` engine with 4 GPU blocks of 4 tokens and an 8-token batch budget.
///
/// Within at most 300 passes both requests must complete, and no pass may
/// report a non-zero `vllm_preemptions_total`.
#[test]
fn preemption_inducing_workload_never_preempts() {
    let args = MockEngineArgs::builder()
        .engine_type(EngineType::Trtllm)
        .block_size(4)
        .num_gpu_blocks(4)
        .max_num_batched_tokens(Some(8))
        .max_num_seqs(Some(4))
        .enable_chunked_prefill(true)
        .enable_prefix_caching(false)
        .speedup_ratio(0.0)
        .build()
        .unwrap();

    let r1 = Uuid::from_u128(1);
    let r2 = Uuid::from_u128(2);
    let mut core = VllmCore::new(args);
    receive(&mut core, r1, 0..4, 12);
    receive(&mut core, r2, 100..104, 12);

    let mut collector = crate::replay::TraceCollector::default();
    let mut clock_ms = 0.0;
    let mut completed = 0usize;
    let mut max_preemptions = 0u64;

    // Bounded drive loop: stop early once no request is tracked any more.
    let mut passes_left = 300;
    while passes_left > 0 && !core.state().requests.is_empty() {
        passes_left -= 1;

        let outcome = core.execute_pass(&mut collector, clock_ms);
        // Always advance the clock by at least 1 ms per pass.
        clock_ms = outcome.end_ms.max(clock_ms + 1.0);

        let finished_this_pass = outcome
            .output_signals
            .iter()
            .filter(|signal| signal.completed)
            .count();
        completed += finished_this_pass;

        let reported = outcome.mocker_metrics.vllm_preemptions_total;
        max_preemptions = max_preemptions.max(reported);
    }

    assert!(
        core.state().requests.is_empty(),
        "both requests should complete; {} left",
        core.state().requests.len()
    );
    assert_eq!(completed, 2, "both requests should finish");
    assert_eq!(max_preemptions, 0, "GUARANTEED_NO_EVICT must never preempt");
}
/// Queues 64 requests (1096 prompt tokens, up to 7000 output tokens each) on a
/// `Trtllm` engine with 7319 GPU blocks of 32 tokens, then runs 40 passes.
///
/// Afterwards exactly 28 requests must be running (the figure the assertion
/// message labels as the hardware value), the remaining ones must still be
/// waiting, and no pass may have reported preemptions.
#[test]
fn no_evict_admission_cap_matches_hardware() {
    let args = MockEngineArgs::builder()
        .engine_type(EngineType::Trtllm)
        .block_size(32)
        .num_gpu_blocks(7319)
        .max_num_seqs(Some(256))
        .max_num_batched_tokens(Some(8192))
        .enable_chunked_prefill(true)
        .enable_prefix_caching(false)
        .speedup_ratio(0.0)
        .build()
        .unwrap();
    let mut core = VllmCore::new(args);

    // Request ids are 1..=64; each prompt occupies its own token-id range
    // starting at id * 100_000.
    for seq in 1..=64u32 {
        let base = seq * 100_000;
        let id = Uuid::from_u128(u128::from(seq));
        receive(&mut core, id, base..(base + 1096), 7000);
    }

    let mut collector = crate::replay::TraceCollector::default();
    let mut clock_ms = 0.0;
    let mut max_preemptions = 0u64;
    for _ in 0..40 {
        let outcome = core.execute_pass(&mut collector, clock_ms);
        max_preemptions = max_preemptions.max(outcome.mocker_metrics.vllm_preemptions_total);
        clock_ms = outcome.end_ms.max(clock_ms + 1.0);
    }

    let running = core.state().running.len();
    let waiting = core.state().waiting.len();
    eprintln!(
        "no-evict cap: running={running} waiting={waiting} max_preemptions={max_preemptions} (hardware=28)"
    );

    assert_eq!(max_preemptions, 0, "GUARANTEED_NO_EVICT must never preempt");
    assert_eq!(running, 28, "mocker admission cap must match hardware (28)");
    let accounted_for = running + waiting;
    assert_eq!(accounted_for, 64, "the rest must stay queued, not dropped");
}
/// Drives `core` until it tracks no more requests or 100 passes have run,
/// whichever comes first.
///
/// Uses a fresh trace collector and a clock that starts at 0 ms and advances
/// by at least 1 ms per pass. Returns how many output signals were flagged
/// `completed` across all passes.
fn drain(core: &mut VllmCore) -> usize {
    let mut collector = crate::replay::TraceCollector::default();
    let mut clock_ms = 0.0;
    let mut finished = 0usize;

    let mut passes_left = 100;
    while passes_left > 0 && !core.state().requests.is_empty() {
        passes_left -= 1;

        let outcome = core.execute_pass(&mut collector, clock_ms);
        finished += outcome
            .output_signals
            .iter()
            .filter(|signal| signal.completed)
            .count();
        clock_ms = outcome.end_ms.max(clock_ms + 1.0);
    }

    finished
}
/// Builds a `Trtllm` configuration with a deliberately tiny cache: 4 GPU
/// blocks of 4 tokens each.
///
/// The remaining settings are at most 4 sequences, a 64-token batch budget,
/// chunked prefill enabled, prefix caching disabled and a speedup ratio of
/// `0.0`.
///
/// # Panics
///
/// Panics if the builder fails to produce a `MockEngineArgs`.
fn capacity_args() -> MockEngineArgs {
    let built = MockEngineArgs::builder()
        .engine_type(EngineType::Trtllm)
        // KV-cache geometry.
        .block_size(4)
        .num_gpu_blocks(4)
        // Per-pass scheduling limits.
        .max_num_seqs(Some(4))
        .max_num_batched_tokens(Some(64))
        // Feature switches.
        .enable_chunked_prefill(true)
        .enable_prefix_caching(false)
        .speedup_ratio(0.0)
        .build();
    built.unwrap()
}
/// Submits a single request with 4 prompt tokens and 40 requested output
/// tokens to an engine built from `capacity_args()`.
///
/// The request must be tracked right after submission, must be reported as
/// completed exactly once while draining, and must leave no tracked requests
/// behind.
#[test]
fn enqueue_clamps_excess_output_to_capacity() {
    let r1 = Uuid::from_u128(1);
    let mut core = VllmCore::new(capacity_args());
    receive(&mut core, r1, 0..4, 40);

    let accepted = core.state().requests.contains_key(&r1);
    assert!(
        accepted,
        "r1 fits after clamping and is admitted, not rejected"
    );

    let completed = drain(&mut core);
    assert_eq!(completed, 1, "clamped r1 runs to completion");

    let fully_drained = core.state().requests.is_empty();
    assert!(fully_drained, "queue fully drains");
}
/// Exercises three requests on a `Trtllm` engine with prefix caching enabled
/// and 8 GPU blocks of 4 tokens:
///
/// * `holder`: 4 prompt tokens, up to 12 output tokens;
/// * `seeder`: prompt tokens `100..108`, up to 4 output tokens;
/// * `reuser`: the same prompt tokens as `seeder`, up to 12 output tokens.
///
/// The first time the loop sees the seeder gone while the holder is still
/// tracked, the reuser must be neither running nor in any state other than
/// `Waiting`. The test also requires that this window was observed, that all
/// requests drain within 400 passes, and that no pass reported preemptions.
#[test]
fn inactive_cached_prefix_not_discounted_keeps_request_waiting() {
    let args = MockEngineArgs::builder()
        .engine_type(EngineType::Trtllm)
        .block_size(4)
        .num_gpu_blocks(8)
        .max_num_batched_tokens(Some(64))
        .max_num_seqs(Some(8))
        .enable_chunked_prefill(true)
        .enable_prefix_caching(true)
        .speedup_ratio(0.0)
        .build()
        .unwrap();
    let mut core = VllmCore::new(args);

    let holder = Uuid::from_u128(1);
    let seeder = Uuid::from_u128(2);
    let reuser = Uuid::from_u128(3);
    receive(&mut core, holder, 0..4, 12);
    receive(&mut core, seeder, 100..108, 4);
    receive(&mut core, reuser, 100..108, 12);

    let mut collector = crate::replay::TraceCollector::default();
    let mut clock_ms = 0.0;
    let mut max_preemptions = 0u64;
    let mut checked = false;

    for _ in 0..400 {
        if core.state().requests.is_empty() {
            break;
        }

        // One-shot check, performed before the pass that follows the moment
        // the seeder disappears while the holder is still tracked.
        if !checked {
            let seeder_gone = !core.state().requests.contains_key(&seeder);
            if seeder_gone && core.state().requests.contains_key(&holder) {
                let reuser_running = core.state().running.contains(&reuser);
                assert!(
                    !reuser_running,
                    "reuser hits the seeder's INACTIVE prefix; un-discounted it needs 5 > 4 free, \
                     so it must wait while the holder holds capacity"
                );
                assert_eq!(
                    core.state().requests.get(&reuser).map(|r| r.status),
                    Some(RequestStatus::Waiting),
                );
                checked = true;
            }
        }

        let outcome = core.execute_pass(&mut collector, clock_ms);
        max_preemptions = max_preemptions.max(outcome.mocker_metrics.vllm_preemptions_total);
        clock_ms = outcome.end_ms.max(clock_ms + 1.0);
    }

    assert!(
        checked,
        "test must observe the seeder-done / holder-running window"
    );
    assert!(
        core.state().requests.is_empty(),
        "reuser is admitted once the holder frees capacity; all requests drain"
    );
    assert_eq!(max_preemptions, 0, "GUARANTEED_NO_EVICT must never preempt");
}