#![allow(clippy::all)]
#![allow(dead_code)]
use crate::runtime::RuntimeState;
use crate::runtime::scheduler::three_lane::{PreemptionMetrics, ThreeLaneScheduler};
use crate::sync::ContendedMutex;
use crate::types::{TaskId, Time};
use std::collections::HashMap;
use std::sync::Arc;
/// Tunable parameters for the lane-pressure scaling scenarios in this file.
#[derive(Debug, Clone)]
pub struct LanePressureConfig {
    /// Task count at scale factor 1. `run_pressure_scaling_scenario` multiplies
    /// this by the scale factor and then splits the product across the lanes
    /// according to `lane_mix_ratios` (so, despite the name, it acts as a
    /// total rather than a per-lane count).
    pub base_tasks_per_lane: usize,
    /// Scale factors the `verify_*` functions iterate over, in order.
    pub scaling_factors: Vec<usize>,
    /// Target share of tasks per lane, indexed `[cancel, timed, ready]`.
    pub lane_mix_ratios: [f64; 3],
    /// Cancel-streak limit handed to the scheduler constructor; the invariance
    /// check also uses `cancel_streak_limit + 1` as its ready-stall bound.
    pub cancel_streak_limit: usize,
    /// Largest tolerated absolute gap between an observed dispatch ratio and
    /// the matching entry of `lane_mix_ratios`.
    pub max_fairness_deviation: f64,
    /// NOTE(review): not read anywhere in the visible part of this file;
    /// presumably a per-task work duration in nanoseconds — confirm.
    pub work_duration_ns: u64,
    /// NOTE(review): not read anywhere in the visible part of this file;
    /// presumably an RNG seed — confirm.
    pub seed: u64,
}

impl Default for LanePressureConfig {
    /// Baseline configuration: 20 tasks, scale factors 1/2/4/8, and a
    /// 30 % cancel / 0 % timed / 70 % ready mix.
    fn default() -> Self {
        let scaling_factors = vec![1, 2, 4, 8];
        let lane_mix_ratios = [0.3, 0.0, 0.7];
        Self {
            base_tasks_per_lane: 20,
            scaling_factors,
            lane_mix_ratios,
            cancel_streak_limit: 16,
            max_fairness_deviation: 0.15,
            work_duration_ns: 1_000_000,
            seed: 42,
        }
    }
}
/// Per-task bookkeeping recorded by `run_pressure_scaling_scenario`.
#[derive(Debug, Clone)]
pub struct ScalingTestTask {
    /// Identifier the task was injected under.
    pub task_id: TaskId,
    /// Lane the task was injected into: 0 = cancel, 1 = timed, 2 = ready.
    pub lane: usize,
    /// Zero-based position of the task in the overall injection sequence.
    pub injection_order: usize,
    /// `Some(Time::from_nanos(0))` for timed-lane tasks, `None` for the others.
    pub execution_window: Option<Time>,
    /// Timestamp written each time the task is dispatched (the last dispatch
    /// wins); `None` if the task was never dispatched.
    pub completion_time: Option<Time>,
    /// Number of times the task was handed out by a worker.
    pub poll_count: u32,
    /// Initialised to `false`; nothing in the visible part of this file sets it.
    pub was_cancelled: bool,
}
/// Everything `run_pressure_scaling_scenario` observed during one run.
#[derive(Debug, Clone)]
pub struct ScalingTestResults {
    /// Copy of the configuration the run was started with.
    pub config: LanePressureConfig,
    /// Scale factor applied to the configured base task count.
    pub scaling_factor: usize,
    /// One trace per injected task, in injection order.
    pub task_traces: Vec<ScalingTestTask>,
    /// Metrics whose cancel/timed/ready dispatch counters mirror
    /// `lane_dispatch_counts`; all remaining fields keep their defaults.
    pub preemption_metrics: PreemptionMetrics,
    /// Wall-clock nanoseconds measured around the drain loop.
    pub total_runtime_ns: u64,
    /// Number of drain-loop iterations executed.
    pub scheduler_cycles: u64,
    /// Dispatches observed per lane, indexed `[cancel, timed, ready]`.
    pub lane_dispatch_counts: [u64; 3],
    /// Longest run of consecutive non-ready dispatches seen between two
    /// ready-lane dispatches (or after the last one).
    pub max_ready_stall_cycles: u64,
}
/// Summary statistics derived from a `ScalingTestResults` by
/// `extract_fairness_certificate`. Per-lane arrays are indexed
/// `[cancel, timed, ready]`.
#[derive(Debug, Clone, PartialEq)]
pub struct FairnessCertificate {
    /// Fraction of all dispatches that went to each lane (all zeros when
    /// nothing was dispatched).
    pub lane_dispatch_ratios: [f64; 3],
    /// Copy of the run's `max_ready_stall_cycles`.
    pub max_ready_stall_bound: u64,
    /// Mean completion latency per lane; `0.0` for a lane with no samples.
    pub avg_completion_latency: [f64; 3],
    /// 95th-percentile completion latency per lane; `0.0` for a lane with no
    /// samples.
    pub p95_completion_latency: [f64; 3],
    /// Largest absolute difference between an observed dispatch ratio and the
    /// configured mix ratio for the same lane.
    pub fairness_deviation: f64,
}
impl ScalingTestResults {
    /// Condenses the raw run data into a [`FairnessCertificate`].
    ///
    /// * Dispatch ratios are each lane's share of all dispatches, or all zeros
    ///   when nothing was dispatched.
    /// * A task contributes a latency sample only if it has a completion time
    ///   and its lane index is below 3. The sample is the completion time in
    ///   nanoseconds minus `injection_order * 1000`.
    /// * The fairness deviation is the largest absolute gap between an observed
    ///   ratio and the configured `lane_mix_ratios` entry for the same lane.
    pub fn extract_fairness_certificate(&self) -> FairnessCertificate {
        let dispatched: u64 = self.lane_dispatch_counts.iter().sum();
        let lane_dispatch_ratios: [f64; 3] = match dispatched {
            0 => [0.0, 0.0, 0.0],
            total => self
                .lane_dispatch_counts
                .map(|count| count as f64 / total as f64),
        };

        // Bucket the latency samples by lane.
        let mut samples: [Vec<f64>; 3] = Default::default();
        for task in &self.task_traces {
            if task.lane >= 3 {
                continue;
            }
            if let Some(finished_at) = task.completion_time {
                let injected_at = task.injection_order as f64 * 1000.0;
                samples[task.lane].push(finished_at.as_nanos() as f64 - injected_at);
            }
        }

        let [cancel, timed, ready] = &samples;
        let avg_completion_latency = [
            calculate_avg(cancel),
            calculate_avg(timed),
            calculate_avg(ready),
        ];
        let p95_completion_latency = [
            calculate_percentile(cancel, 0.95),
            calculate_percentile(timed, 0.95),
            calculate_percentile(ready, 0.95),
        ];

        let fairness_deviation = lane_dispatch_ratios
            .iter()
            .zip(self.config.lane_mix_ratios.iter())
            .map(|(observed, target)| (observed - target).abs())
            .fold(0.0, f64::max);

        FairnessCertificate {
            lane_dispatch_ratios,
            max_ready_stall_bound: self.max_ready_stall_cycles,
            avg_completion_latency,
            p95_completion_latency,
            fairness_deviation,
        }
    }
}
/// Arithmetic mean of `values`; an empty slice yields `0.0`.
fn calculate_avg(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => values.iter().sum::<f64>() / count as f64,
    }
}
/// Returns the sample of `values` at the given percentile rank.
///
/// A copy of the input is sorted ascending and the element at index
/// `floor((len - 1) * percentile)` is returned; no interpolation is done.
///
/// * An empty slice yields `0.0`.
/// * `percentile` is expected in `0.0..=1.0`. Values above `1.0` select the
///   largest sample; negative or NaN values select the smallest, because the
///   float-to-integer cast saturates to 0.
/// * Samples are ordered with [`f64::total_cmp`], so NaN inputs cannot hand the
///   sort an inconsistent comparison (positive NaN sorts after every number).
fn calculate_percentile(values: &[f64], percentile: f64) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let mut sorted_values = values.to_vec();
    sorted_values.sort_unstable_by(f64::total_cmp);
    let last = sorted_values.len() - 1;
    // Clamp so an out-of-range percentile can never index past the end.
    let index = ((last as f64 * percentile) as usize).min(last);
    sorted_values[index]
}
pub fn run_pressure_scaling_scenario(
config: &LanePressureConfig,
scaling_factor: usize,
) -> ScalingTestResults {
let state = Arc::new(ContendedMutex::new(
"test_runtime_state",
RuntimeState::new(),
));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(
1, &state,
config.cancel_streak_limit,
);
let mut task_traces = Vec::new();
let mut lane_dispatch_counts = [0u64; 3];
let base_total = config.base_tasks_per_lane;
let scaled_total = base_total * scaling_factor;
let cancel_count = (scaled_total as f64 * config.lane_mix_ratios[0]) as usize;
let timed_count = (scaled_total as f64 * config.lane_mix_ratios[1]) as usize;
let ready_count = (scaled_total as f64 * config.lane_mix_ratios[2]) as usize;
let task_counts = [cancel_count, timed_count, ready_count];
let total_injected: usize = task_counts.iter().sum();
let mut lane_by_task: HashMap<TaskId, usize> = HashMap::with_capacity(total_injected);
let mut injection_order: usize = 0;
for (lane_idx, &count) in task_counts.iter().enumerate() {
for _ in 0..count {
let task_id = TaskId::new_for_test(injection_order as u32, 0);
lane_by_task.insert(task_id, lane_idx);
match lane_idx {
0 => scheduler.inject_cancel(task_id, 100),
1 => {
scheduler.inject_timed(task_id, Time::from_nanos(0));
}
_ => scheduler.inject_ready(task_id, 100),
}
task_traces.push(ScalingTestTask {
task_id,
lane: lane_idx,
injection_order,
execution_window: if lane_idx == 1 {
Some(Time::from_nanos(0))
} else {
None
},
completion_time: None,
poll_count: 0,
was_cancelled: false,
});
injection_order += 1;
}
}
let mut workers = scheduler.take_workers();
let start_time = std::time::Instant::now();
let mut scheduler_cycles: u64 = 0;
let mut max_ready_stall_cycles: u64 = 0;
let mut current_ready_stall: u64 = 0;
let drain_cap = total_injected.saturating_mul(8) + 16;
while scheduler_cycles < drain_cap as u64 {
let mut progressed = false;
for worker in workers.iter_mut() {
if let Some(task_id) = worker.next_task() {
progressed = true;
if let Some(&lane) = lane_by_task.get(&task_id) {
lane_dispatch_counts[lane] += 1;
if lane == 2 {
max_ready_stall_cycles = max_ready_stall_cycles.max(current_ready_stall);
current_ready_stall = 0;
} else {
current_ready_stall = current_ready_stall.saturating_add(1);
}
if let Some(trace) = task_traces.iter_mut().find(|t| t.task_id == task_id) {
trace.poll_count = trace.poll_count.saturating_add(1);
trace.completion_time = Some(Time::from_nanos(
start_time.elapsed().as_nanos() as u64
+ trace.injection_order as u64 * 1000,
));
}
}
}
}
scheduler_cycles += 1;
if !progressed {
break;
}
}
max_ready_stall_cycles = max_ready_stall_cycles.max(current_ready_stall);
let total_runtime_ns = start_time.elapsed().as_nanos() as u64;
let preemption_metrics = PreemptionMetrics {
cancel_dispatches: lane_dispatch_counts[0],
timed_dispatches: lane_dispatch_counts[1],
ready_dispatches: lane_dispatch_counts[2],
..Default::default()
};
ScalingTestResults {
config: config.clone(),
scaling_factor,
task_traces,
preemption_metrics,
total_runtime_ns,
scheduler_cycles,
lane_dispatch_counts,
max_ready_stall_cycles,
}
}
/// Runs the scenario at every configured scale factor and checks that the
/// fairness properties hold at each one.
///
/// For each scale factor, in order, the run's fairness certificate must satisfy:
/// * `fairness_deviation <= config.max_fairness_deviation`, and
/// * `max_ready_stall_bound <= config.cancel_streak_limit + 1`.
///
/// # Errors
///
/// Returns a message describing the first violated bound, or a message saying
/// there is nothing to verify when `config.scaling_factors` is empty
/// (previously that case panicked on an `unwrap`).
pub fn verify_proportional_pressure_scaling_invariance(
    config: &LanePressureConfig,
) -> Result<(), String> {
    if config.scaling_factors.is_empty() {
        return Err("No scaling factors configured; nothing to verify".to_string());
    }
    let stall_invariant_bound: u64 = (config.cancel_streak_limit as u64).saturating_add(1);
    for &scaling_factor in &config.scaling_factors {
        let certificate =
            run_pressure_scaling_scenario(config, scaling_factor).extract_fairness_certificate();
        if certificate.fairness_deviation > config.max_fairness_deviation {
            return Err(format!(
                "Fairness deviation {} exceeds limit {} at scale factor {}",
                certificate.fairness_deviation, config.max_fairness_deviation, scaling_factor
            ));
        }
        if certificate.max_ready_stall_bound > stall_invariant_bound {
            return Err(format!(
                "Ready stall bound {} exceeds cancel_streak_limit+1 ({}) at scale factor {}",
                certificate.max_ready_stall_bound, stall_invariant_bound, scaling_factor
            ));
        }
    }
    Ok(())
}
/// Checks, for every configured scale factor, that each lane's observed
/// dispatch ratio stays within `max_fairness_deviation` of its configured
/// `lane_mix_ratios` entry.
///
/// # Errors
///
/// Returns a message naming the first lane and scale factor whose deviation
/// exceeds the limit.
pub fn verify_mix_ratio_preservation(config: &LanePressureConfig) -> Result<(), String> {
    let tolerance = config.max_fairness_deviation;
    for scale in config.scaling_factors.iter().copied() {
        let certificate =
            run_pressure_scaling_scenario(config, scale).extract_fairness_certificate();
        let per_lane = config
            .lane_mix_ratios
            .iter()
            .zip(certificate.lane_dispatch_ratios.iter())
            .enumerate();
        for (lane, (&expected, &observed)) in per_lane {
            let deviation = (observed - expected).abs();
            if deviation > tolerance {
                return Err(format!(
                    "Lane {} dispatch ratio {} deviates from expected {} by {} at scale factor {}",
                    lane, observed, expected, deviation, scale
                ));
            }
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;

    /// The default configuration must pass the scaling-invariance check.
    #[test]
    fn test_proportional_pressure_scaling_invariance() {
        let config = LanePressureConfig::default();
        if let Err(e) = verify_proportional_pressure_scaling_invariance(&config) {
            panic!("Proportional pressure scaling invariance violated: {}", e);
        }
    }

    /// The default configuration must keep every lane near its target ratio.
    #[test]
    fn test_mix_ratio_preservation() {
        let config = LanePressureConfig::default();
        if let Err(e) = verify_mix_ratio_preservation(&config) {
            panic!("Mix ratio preservation violated: {}", e);
        }
    }

    /// A small run at scale factor 2 stays within the fairness and stall limits.
    #[test]
    fn test_small_scale_factors() {
        let config = LanePressureConfig {
            scaling_factors: vec![1, 2],
            base_tasks_per_lane: 10,
            ..LanePressureConfig::default()
        };
        let certificate = run_pressure_scaling_scenario(&config, 2).extract_fairness_certificate();
        assert!(certificate.fairness_deviation <= config.max_fairness_deviation);
        assert!(certificate.max_ready_stall_bound <= config.cancel_streak_limit as u64 * 2);
    }

    /// Sanity-checks the value ranges of an extracted certificate.
    #[test]
    fn test_fairness_certificate_extraction() {
        let config = LanePressureConfig::default();
        let certificate = run_pressure_scaling_scenario(&config, 1).extract_fairness_certificate();
        let ratios_in_unit_interval = certificate
            .lane_dispatch_ratios
            .iter()
            .all(|ratio| (0.0..=1.0).contains(ratio));
        assert!(ratios_in_unit_interval);
        assert!(certificate.fairness_deviation >= 0.0);
        assert!(certificate.max_ready_stall_bound > 0);
    }
}