pub mod arena;
pub mod bpf_map;
pub mod bpf_prog;
pub mod bpf_syscall;
pub mod btf_offsets;
pub mod btf_render;
pub mod debug_capture;
pub mod dmesg_scx;
pub mod dump;
pub mod guest;
pub mod idr;
pub mod live_host_kernel;
pub mod perf_counters;
pub mod reader;
pub mod reproducer_gen;
pub mod runnable_scan;
pub mod scx_walker;
pub mod sdt_alloc;
pub mod symbols;
pub mod task_enrichment;
pub mod timeline;
#[cfg(test)]
mod tests;
#[cfg(test)]
pub(crate) mod test_util;
/// Newtype for a physical address derived from a CR3 value (name suggests a
/// guest page-table root PA — NOTE(review): confirm at use sites).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct Cr3Pa(pub u64);
/// Newtype for a page-offset-style base address (presumably the guest kernel's
/// PAGE_OFFSET — NOTE(review): verify against callers).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct PageOffset(pub u64);
/// Newtype for a kernel virtual address. Carries `Ord` so KVAs can be sorted
/// and range-scanned; `Display` renders as `0x…` hex.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub(crate) struct Kva(pub u64);
impl std::fmt::Display for Kva {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:#x}", self.0)
}
}
/// Upper bound on a believable per-CPU local DSQ depth; readings above this
/// mark the whole sample as garbage (see `sample_looks_valid`).
pub const DSQ_PLAUSIBILITY_CEILING: u32 = 10_000;
/// Difference between two cumulative counter readings, clamped to zero so a
/// counter reset (last < first) never yields a negative delta.
pub(crate) fn counter_delta(last: i64, first: i64) -> i64 {
    let diff = last - first;
    if diff < 0 { 0 } else { diff }
}
/// Number of scheduler ticks used to size `vcpu_preemption_threshold_ns`.
const PREEMPTION_TICK_MULTIPLE: u64 = 10;
/// Fallback CONFIG_HZ when it cannot be read from the guest kernel or host.
const DEFAULT_HZ: u64 = 250;
/// Stall threshold in nanoseconds: `PREEMPTION_TICK_MULTIPLE` scheduler ticks
/// at the HZ resolved for `kernel_path` (see `guest_kernel_hz`).
pub(crate) fn vcpu_preemption_threshold_ns(kernel_path: Option<&std::path::Path>) -> u64 {
    let nanos_per_tick = 1_000_000_000u64 / guest_kernel_hz(kernel_path);
    PREEMPTION_TICK_MULTIPLE * nanos_per_tick
}
/// Determines the guest kernel's CONFIG_HZ.
///
/// With a kernel path: try the IKCONFIG embedded in the vmlinux image, then a
/// `.config` near the kernel image, then warn and use `DEFAULT_HZ` (the host's
/// config is deliberately not consulted for a guest kernel). With no kernel
/// path: try the host's `/boot/config-$(uname -r)`, then `DEFAULT_HZ`.
pub(crate) fn guest_kernel_hz(kernel_path: Option<&std::path::Path>) -> u64 {
    let Some(kp) = kernel_path else {
        // No guest kernel known: the host's boot config is the best guess.
        return read_hz_from_boot_config().unwrap_or(DEFAULT_HZ);
    };
    if let Some(hz) = find_vmlinux(kp).and_then(|vmlinux| read_hz_from_ikconfig(&vmlinux)) {
        return hz;
    }
    if let Some(hz) = read_hz_from_kernel_dir(kp) {
        return hz;
    }
    tracing::warn!(
        kernel = %kp.display(),
        default_hz = DEFAULT_HZ,
        "guest_kernel_hz: no IKCONFIG or .config alongside \
         kernel; falling back to DEFAULT_HZ rather than host \
         /boot/config (tick-dependent thresholds may be \
         conservative)"
    );
    DEFAULT_HZ
}
use crate::vmm::find_vmlinux;
/// Marker that `read_hz_from_ikconfig` scans for; the bytes following it are
/// gunzipped to recover the embedded kernel config.
const IKCONFIG_MAGIC: &[u8] = b"IKCFG_ST";
/// ELF sections to retain — NOTE(review): presumably used when slimming a
/// vmlinux image for caching; confirm at the consumer before extending.
pub(crate) const VMLINUX_KEEP_SECTIONS: &[&[u8]] = &[
    b".rodata",
];
/// Extracts CONFIG_HZ from the IKCONFIG blob embedded in a vmlinux image.
///
/// Scans the raw file for `IKCONFIG_MAGIC`, gunzips everything after it, and
/// parses the resulting config text. `None` on any read/decode/parse failure.
fn read_hz_from_ikconfig(vmlinux_path: &std::path::Path) -> Option<u64> {
    let image = std::fs::read(vmlinux_path).ok()?;
    let magic_at = memchr::memmem::find(&image, IKCONFIG_MAGIC)?;
    let payload = image.get(magic_at + IKCONFIG_MAGIC.len()..)?;
    if payload.is_empty() {
        // Magic sat at the very end of the file; nothing to decompress.
        return None;
    }
    let mut decoder = flate2::read::GzDecoder::new(std::io::Cursor::new(payload));
    let mut config = String::new();
    std::io::Read::read_to_string(&mut decoder, &mut config).ok()?;
    parse_config_hz(&config)
}
/// Looks for a `.config` in up to four directory levels above the kernel
/// image and parses CONFIG_HZ out of the first one found.
fn read_hz_from_kernel_dir(kernel_path: &std::path::Path) -> Option<u64> {
    // ancestors() yields the path itself first; skip it so the search starts
    // at the kernel image's parent directory, matching the old walk-up loop.
    for dir in kernel_path.ancestors().skip(1).take(4) {
        let candidate = dir.join(".config");
        if candidate.exists() {
            // First existing .config wins, even if it lacks CONFIG_HZ.
            let contents = std::fs::read_to_string(&candidate).ok()?;
            return parse_config_hz(&contents);
        }
    }
    None
}
/// Reads CONFIG_HZ from the host's `/boot/config-$(uname -r)`, if present.
fn read_hz_from_boot_config() -> Option<u64> {
    let uname = rustix::system::uname();
    let release = uname.release().to_str().ok()?;
    let contents = std::fs::read_to_string(format!("/boot/config-{release}")).ok()?;
    parse_config_hz(&contents)
}
/// Pulls the numeric value off the first `CONFIG_HZ=` line in a kernel config.
///
/// `CONFIG_HZ_250=y` style lines do not match the prefix; a malformed value on
/// the first matching line yields `None` (later lines are not consulted).
fn parse_config_hz(config: &str) -> Option<u64> {
    config
        .lines()
        .filter_map(|raw| raw.trim().strip_prefix("CONFIG_HZ="))
        .next()
        .and_then(|val| val.parse().ok())
}
/// A sample is plausible only if no CPU reports a local DSQ depth above
/// `DSQ_PLAUSIBILITY_CEILING`; absurd readings invalidate the whole sample.
pub fn sample_looks_valid(sample: &MonitorSample) -> bool {
    !sample
        .cpus
        .iter()
        .any(|cpu| cpu.local_dsq_depth > DSQ_PLAUSIBILITY_CEILING)
}
/// Locates a vmlinux/BTF image for tests, honoring the kernel id supplied via
/// `crate::ktstr_kernel_env()`.
///
/// Reports a test skip (and returns `None`) when nothing can be resolved.
#[cfg(test)]
pub fn find_test_vmlinux() -> Option<std::path::PathBuf> {
    use crate::kernel_path::KernelId;
    let raw = crate::ktstr_kernel_env();
    // Map the env value to a kernel directory string: explicit paths pass
    // through, version/cache-key ids resolve via the kernel cache, and
    // range/git ids are not resolvable in a test context.
    let resolved_dir: Option<String> = match raw.as_deref().map(KernelId::parse) {
        Some(KernelId::Path(p)) => p.into_os_string().into_string().ok(),
        Some(id @ (KernelId::Version(_) | KernelId::CacheKey(_))) => {
            crate::cli::resolve_cached_kernel(&id, "ktstr test")
                .ok()
                .and_then(|p| p.into_os_string().into_string().ok())
        }
        Some(KernelId::Range { .. }) | Some(KernelId::Git { .. }) => None,
        None => None,
    };
    let result = crate::kernel_path::resolve_btf(resolved_dir.as_deref());
    if result.is_none() {
        crate::report::test_skip(format!("no vmlinux found; {}", crate::KTSTR_KERNEL_HINT));
    }
    result
}
/// Full result of a monitor run: raw samples plus the derived summary.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct MonitorReport {
    /// Snapshots in capture order.
    pub samples: Vec<MonitorSample>,
    /// Aggregates computed from `samples`.
    pub summary: MonitorSummary,
    /// Stall threshold used for this run; 0 makes consumers derive one from
    /// HZ (see `MonitorThresholds::evaluate`).
    pub preemption_threshold_ns: u64,
    /// Optional watchdog jiffies comparison (internal; shape may change).
    #[doc(hidden)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watchdog_observation: Option<WatchdogObservation>,
    // NOTE(review): presumably the guest kernel's PAGE_OFFSET (cf. the
    // `PageOffset` newtype) — confirm at the writer before relying on it.
    #[doc(hidden)]
    #[serde(default)]
    pub page_offset: u64,
}
/// Expected vs. observed jiffies advance, for watchdog cross-checking.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WatchdogObservation {
    pub expected_jiffies: u64,
    pub observed_jiffies: u64,
}
/// Tracks the longest run of consecutive threshold violations seen so far.
///
/// `worst_*` capture the longest run observed: its length, the violating
/// value at the point the run last grew, and the sample index it ended at.
#[derive(Debug, Clone, Default)]
pub(crate) struct SustainedViolationTracker {
    consecutive: usize,
    worst_run: usize,
    worst_value: f64,
    worst_at: usize,
}
impl SustainedViolationTracker {
    /// Feeds one observation; a non-violation resets the current run but
    /// leaves the recorded worst run intact.
    pub(crate) fn record(&mut self, violated: bool, value: f64, at: usize) {
        if !violated {
            self.consecutive = 0;
            return;
        }
        self.consecutive += 1;
        if self.consecutive > self.worst_run {
            self.worst_run = self.consecutive;
            self.worst_value = value;
            self.worst_at = at;
        }
    }
    /// True once any run reached `threshold` consecutive violations.
    pub(crate) fn sustained(&self, threshold: usize) -> bool {
        self.worst_run >= threshold
    }
}
/// All per-CPU snapshots captured at one monitoring instant.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct MonitorSample {
    /// Milliseconds elapsed at capture time; used as the denominator for
    /// rate calculations between samples.
    pub elapsed_ms: u64,
    /// One entry per observed CPU (presumably indexed by CPU id — confirm
    /// against the reader that fills this in).
    pub cpus: Vec<CpuSnapshot>,
    /// Optional per-BPF-program runtime stats captured with this sample.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prog_stats: Option<Vec<bpf_prog::ProgRuntimeStats>>,
}
impl MonitorSample {
    /// Creates a sample with no BPF program stats attached.
    pub fn new(elapsed_ms: u64, cpus: Vec<CpuSnapshot>) -> Self {
        Self {
            elapsed_ms,
            cpus,
            prog_stats: None,
        }
    }
    /// Ratio between the busiest and idlest CPU's `nr_running`.
    ///
    /// Returns 1.0 for an empty sample; the divisor is clamped to 1 so a
    /// fully idle CPU cannot cause a division by zero.
    pub fn imbalance_ratio(&self) -> f64 {
        if self.cpus.is_empty() {
            return 1.0;
        }
        let (min_nr, max_nr) = self
            .cpus
            .iter()
            .fold((u32::MAX, 0u32), |(lo, hi), cpu| {
                (lo.min(cpu.nr_running), hi.max(cpu.nr_running))
            });
        max_nr as f64 / min_nr.max(1) as f64
    }
    /// Sums `f` over every CPU that carries event counters; `None` when no
    /// CPU has counters at all (so "no data" is distinct from a zero sum).
    pub fn sum_event_field(&self, f: fn(&ScxEventCounters) -> i64) -> Option<i64> {
        let mut total: Option<i64> = None;
        for cpu in &self.cpus {
            if let Some(ev) = &cpu.event_counters {
                *total.get_or_insert(0) += f(ev);
            }
        }
        total
    }
}
/// One CPU's runqueue state at a sample instant.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CpuSnapshot {
    /// Runnable tasks on this CPU's runqueue.
    pub nr_running: u32,
    /// Runnable tasks under sched_ext management (presumably; confirm in the
    /// reader that fills this snapshot).
    pub scx_nr_running: u32,
    /// Depth of the CPU's local dispatch queue; compared against
    /// `DSQ_PLAUSIBILITY_CEILING` and `MonitorThresholds::max_local_dsq_depth`.
    pub local_dsq_depth: u32,
    /// Runqueue clock reading, used for stall detection between samples.
    pub rq_clock: u64,
    /// Raw scx flags word (semantics live in the reader).
    pub scx_flags: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_counters: Option<ScxEventCounters>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedstat: Option<RqSchedstat>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub vcpu_cpu_time_ns: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub vcpu_perf: Option<perf_counters::VcpuPerfSample>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sched_domains: Option<Vec<SchedDomainSnapshot>>,
}
/// Per-runqueue scheduler statistics.
/// NOTE(review): field names mirror kernel rq schedstats; exact units and
/// semantics come from the reader and are not visible here.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct RqSchedstat {
    pub run_delay: u64,
    pub pcount: u64,
    pub yld_count: u32,
    pub sched_count: u32,
    pub sched_goidle: u32,
    pub ttwu_count: u32,
    pub ttwu_local: u32,
}
/// Snapshot of one scheduling domain on one CPU.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SchedDomainSnapshot {
    pub level: i32,
    pub name: String,
    pub flags: i32,
    pub span_weight: u32,
    pub balance_interval: u32,
    pub nr_balance_failed: u32,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub newidle_call: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub newidle_success: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub newidle_ratio: Option<u32>,
    pub max_newidle_lb_cost: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub stats: Option<SchedDomainStats>,
}
/// Load-balancing counters for a sched domain; the `lb_*` arrays are indexed
/// by CPU idle type (hence `btf_offsets::CPU_MAX_IDLE_TYPES`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SchedDomainStats {
    pub lb_count: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_failed: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_balanced: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_imbalance_load: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_imbalance_util: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_imbalance_task: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_imbalance_misfit: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_gained: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_hot_gained: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_nobusyg: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub lb_nobusyq: [u32; btf_offsets::CPU_MAX_IDLE_TYPES],
    pub alb_count: u32,
    pub alb_failed: u32,
    pub alb_pushed: u32,
    pub sbe_count: u32,
    pub sbe_balanced: u32,
    pub sbe_pushed: u32,
    pub sbf_count: u32,
    pub sbf_balanced: u32,
    pub sbf_pushed: u32,
    pub ttwu_wake_remote: u32,
    pub ttwu_move_affine: u32,
    pub ttwu_move_balance: u32,
}
/// Cumulative sched_ext event counters; raw counts whose first-to-last deltas
/// are computed by `MonitorSummary::compute_event_deltas`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ScxEventCounters {
    pub select_cpu_fallback: i64,
    pub dispatch_local_dsq_offline: i64,
    pub dispatch_keep_last: i64,
    pub enq_skip_exiting: i64,
    pub enq_skip_migration_disabled: i64,
    pub reenq_immed: i64,
    pub reenq_local_repeat: i64,
    pub refill_slice_dfl: i64,
    pub bypass_duration: i64,
    pub bypass_dispatch: i64,
    pub bypass_activate: i64,
    pub insert_not_owned: i64,
    pub sub_bypass_dispatch: i64,
}
/// Aggregates derived from a run's samples (see `MonitorSummary::from_samples*`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct MonitorSummary {
    /// All captured samples, including ones excluded from averages as invalid.
    pub total_samples: usize,
    pub max_imbalance_ratio: f64,
    pub max_local_dsq_depth: u32,
    /// True if any consecutive valid sample pair showed a stalled CPU.
    pub stall_detected: bool,
    pub avg_imbalance_ratio: f64,
    pub avg_nr_running: f64,
    pub avg_local_dsq_depth: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_deltas: Option<ScxEventDeltas>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedstat_deltas: Option<SchedstatDeltas>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prog_stats_deltas: Option<Vec<ProgStatsDelta>>,
}
/// Per-BPF-program run count / runtime delta over the sampled span.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ProgStatsDelta {
    pub name: String,
    pub cnt: u64,
    pub nsecs: u64,
    pub nsecs_per_call: f64,
}
/// First-to-last schedstat deltas; `*_rate` fields are per second of covered
/// wall-clock time.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SchedstatDeltas {
    pub total_run_delay: u64,
    pub run_delay_rate: f64,
    pub total_pcount: u64,
    pub total_sched_count: u64,
    pub sched_count_rate: f64,
    pub total_yld_count: u64,
    pub total_sched_goidle: u64,
    pub total_ttwu_count: u64,
    pub total_ttwu_local: u64,
}
/// First-to-last scx event counter deltas; rates are per second, and
/// `max_fallback_burst` is the largest single-interval fallback delta.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ScxEventDeltas {
    pub total_fallback: i64,
    pub fallback_rate: f64,
    pub max_fallback_burst: i64,
    pub total_dispatch_offline: i64,
    pub total_dispatch_keep_last: i64,
    pub keep_last_rate: f64,
    pub total_enq_skip_exiting: i64,
    pub total_enq_skip_migration_disabled: i64,
    pub total_reenq_immed: i64,
    pub total_reenq_local_repeat: i64,
    pub total_refill_slice_dfl: i64,
    pub total_bypass_duration: i64,
    pub total_bypass_dispatch: i64,
    pub total_bypass_activate: i64,
    pub total_insert_not_owned: i64,
    pub total_sub_bypass_dispatch: i64,
}
impl MonitorSummary {
    /// Summarizes `samples` with the stall threshold derived from HZ.
    pub fn from_samples(samples: &[MonitorSample]) -> Self {
        Self::from_samples_with_threshold(samples, 0)
    }
    /// Builds the summary. A zero `preemption_threshold_ns` means "derive one
    /// via `vcpu_preemption_threshold_ns(None)`".
    ///
    /// Empty or implausible samples (see `sample_looks_valid`) are excluded
    /// from maxima/averages and from stall detection, but still count toward
    /// `total_samples`.
    pub fn from_samples_with_threshold(
        samples: &[MonitorSample],
        preemption_threshold_ns: u64,
    ) -> Self {
        if samples.is_empty() {
            return Self::default();
        }
        let mut max_imbalance_ratio: f64 = 1.0;
        let mut max_local_dsq_depth: u32 = 0;
        let mut sum_imbalance_ratio: f64 = 0.0;
        let mut sum_nr_running: f64 = 0.0;
        let mut sum_local_dsq_depth: f64 = 0.0;
        let mut valid_sample_count: usize = 0;
        let mut total_cpu_readings: usize = 0;
        for sample in samples {
            if sample.cpus.is_empty() || !sample_looks_valid(sample) {
                continue;
            }
            valid_sample_count += 1;
            for cpu in &sample.cpus {
                max_local_dsq_depth = max_local_dsq_depth.max(cpu.local_dsq_depth);
                sum_nr_running += cpu.nr_running as f64;
                sum_local_dsq_depth += cpu.local_dsq_depth as f64;
                total_cpu_readings += 1;
            }
            let ratio = sample.imbalance_ratio();
            sum_imbalance_ratio += ratio;
            if ratio > max_imbalance_ratio {
                max_imbalance_ratio = ratio;
            }
        }
        // Imbalance averages over valid samples; nr_running/DSQ averages over
        // individual per-CPU readings.
        let avg_imbalance_ratio = if valid_sample_count > 0 {
            sum_imbalance_ratio / valid_sample_count as f64
        } else {
            0.0
        };
        let avg_nr_running = if total_cpu_readings > 0 {
            sum_nr_running / total_cpu_readings as f64
        } else {
            0.0
        };
        let avg_local_dsq_depth = if total_cpu_readings > 0 {
            sum_local_dsq_depth / total_cpu_readings as f64
        } else {
            0.0
        };
        let threshold = if preemption_threshold_ns > 0 {
            preemption_threshold_ns
        } else {
            vcpu_preemption_threshold_ns(None)
        };
        // Stall scan: compare consecutive *valid* samples pairwise per CPU;
        // one stalled CPU anywhere flips the flag and ends the scan.
        let mut stall_detected = false;
        let valid_samples: Vec<&MonitorSample> = samples
            .iter()
            .filter(|s| !s.cpus.is_empty() && sample_looks_valid(s))
            .collect();
        for w in valid_samples.windows(2) {
            let prev = w[0];
            let curr = w[1];
            // CPU counts can differ between samples; only compare the overlap.
            let cpu_count = prev.cpus.len().min(curr.cpus.len());
            for cpu in 0..cpu_count {
                if reader::is_cpu_stalled(&prev.cpus[cpu], &curr.cpus[cpu], threshold) {
                    stall_detected = true;
                    break;
                }
            }
            if stall_detected {
                break;
            }
        }
        let event_deltas = Self::compute_event_deltas(samples);
        let schedstat_deltas = Self::compute_schedstat_deltas(samples);
        let prog_stats_deltas = Self::compute_prog_stats_deltas(samples);
        Self {
            total_samples: samples.len(),
            max_imbalance_ratio,
            max_local_dsq_depth,
            stall_detected,
            avg_imbalance_ratio,
            avg_nr_running,
            avg_local_dsq_depth,
            event_deltas,
            schedstat_deltas,
            prog_stats_deltas,
        }
    }
    /// First-to-last deltas of the scx event counters plus per-second rates.
    /// Uses the earliest and latest samples that carry any counters; `None`
    /// when no sample has them.
    fn compute_event_deltas(samples: &[MonitorSample]) -> Option<ScxEventDeltas> {
        let has_events = |s: &MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
        let first = samples.iter().find(|s| has_events(s))?;
        let last = samples.iter().rev().find(|s| has_events(s))?;
        // counter_delta clamps negative deltas (counter resets) to zero.
        let total_fallback = counter_delta(
            last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
            first
                .sum_event_field(|e| e.select_cpu_fallback)
                .unwrap_or(0),
        );
        let total_keep_last = counter_delta(
            last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
            first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
        );
        let duration_ms = last.elapsed_ms.saturating_sub(first.elapsed_ms);
        let duration_secs = duration_ms as f64 / 1000.0;
        let fallback_rate = if duration_secs > 0.0 {
            total_fallback as f64 / duration_secs
        } else {
            0.0
        };
        let keep_last_rate = if duration_secs > 0.0 {
            total_keep_last as f64 / duration_secs
        } else {
            0.0
        };
        // Largest single-interval fallback jump across the whole run.
        let mut max_fallback_burst: i64 = 0;
        for w in samples.windows(2) {
            let prev_sum = w[0].sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0);
            let curr_sum = w[1].sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0);
            let delta = counter_delta(curr_sum, prev_sum);
            if delta > max_fallback_burst {
                max_fallback_burst = delta;
            }
        }
        // Generic first-to-last delta for the remaining counters.
        let delta = |f: fn(&ScxEventCounters) -> i64| -> i64 {
            counter_delta(
                last.sum_event_field(f).unwrap_or(0),
                first.sum_event_field(f).unwrap_or(0),
            )
        };
        Some(ScxEventDeltas {
            total_fallback,
            fallback_rate,
            max_fallback_burst,
            total_dispatch_offline: delta(|e| e.dispatch_local_dsq_offline),
            total_dispatch_keep_last: total_keep_last,
            keep_last_rate,
            total_enq_skip_exiting: delta(|e| e.enq_skip_exiting),
            total_enq_skip_migration_disabled: delta(|e| e.enq_skip_migration_disabled),
            total_reenq_immed: delta(|e| e.reenq_immed),
            total_reenq_local_repeat: delta(|e| e.reenq_local_repeat),
            total_refill_slice_dfl: delta(|e| e.refill_slice_dfl),
            total_bypass_duration: delta(|e| e.bypass_duration),
            total_bypass_dispatch: delta(|e| e.bypass_dispatch),
            total_bypass_activate: delta(|e| e.bypass_activate),
            total_insert_not_owned: delta(|e| e.insert_not_owned),
            total_sub_bypass_dispatch: delta(|e| e.sub_bypass_dispatch),
        })
    }
    /// First-to-last schedstat deltas (summed across CPUs) plus per-second
    /// rates. `None` when no sample carries schedstat data. Subtractions are
    /// saturating so counter resets can't underflow.
    fn compute_schedstat_deltas(samples: &[MonitorSample]) -> Option<SchedstatDeltas> {
        let has_schedstat = |s: &MonitorSample| s.cpus.iter().any(|c| c.schedstat.is_some());
        let first = samples.iter().find(|s| has_schedstat(s))?;
        let last = samples.iter().rev().find(|s| has_schedstat(s))?;
        let sum_field = |s: &MonitorSample, f: fn(&RqSchedstat) -> u64| -> u64 {
            s.cpus
                .iter()
                .filter_map(|c| c.schedstat.as_ref().map(&f))
                .sum()
        };
        // Widens u32 counters to u64 so cross-CPU sums can't overflow.
        let sum_field_u32 = |s: &MonitorSample, f: fn(&RqSchedstat) -> u32| -> u64 {
            s.cpus
                .iter()
                .filter_map(|c| c.schedstat.as_ref().map(|ss| f(ss) as u64))
                .sum()
        };
        let total_run_delay =
            sum_field(last, |ss| ss.run_delay).saturating_sub(sum_field(first, |ss| ss.run_delay));
        let total_pcount =
            sum_field(last, |ss| ss.pcount).saturating_sub(sum_field(first, |ss| ss.pcount));
        let total_sched_count = sum_field_u32(last, |ss| ss.sched_count)
            .saturating_sub(sum_field_u32(first, |ss| ss.sched_count));
        let total_yld_count = sum_field_u32(last, |ss| ss.yld_count)
            .saturating_sub(sum_field_u32(first, |ss| ss.yld_count));
        let total_sched_goidle = sum_field_u32(last, |ss| ss.sched_goidle)
            .saturating_sub(sum_field_u32(first, |ss| ss.sched_goidle));
        let total_ttwu_count = sum_field_u32(last, |ss| ss.ttwu_count)
            .saturating_sub(sum_field_u32(first, |ss| ss.ttwu_count));
        let total_ttwu_local = sum_field_u32(last, |ss| ss.ttwu_local)
            .saturating_sub(sum_field_u32(first, |ss| ss.ttwu_local));
        let duration_ms = last.elapsed_ms.saturating_sub(first.elapsed_ms);
        let duration_secs = duration_ms as f64 / 1000.0;
        let run_delay_rate = if duration_secs > 0.0 {
            total_run_delay as f64 / duration_secs
        } else {
            0.0
        };
        let sched_count_rate = if duration_secs > 0.0 {
            total_sched_count as f64 / duration_secs
        } else {
            0.0
        };
        Some(SchedstatDeltas {
            total_run_delay,
            run_delay_rate,
            total_pcount,
            total_sched_count,
            sched_count_rate,
            total_yld_count,
            total_sched_goidle,
            total_ttwu_count,
            total_ttwu_local,
        })
    }
    /// Per-program deltas between the first and last samples that carry
    /// `prog_stats`, matched by program name. Programs that appear only in
    /// the last sample get their full counts (first value defaults to 0).
    fn compute_prog_stats_deltas(samples: &[MonitorSample]) -> Option<Vec<ProgStatsDelta>> {
        let first = samples.iter().find(|s| s.prog_stats.is_some())?;
        let last = samples.iter().rev().find(|s| s.prog_stats.is_some())?;
        let first_progs = first.prog_stats.as_ref()?;
        let last_progs = last.prog_stats.as_ref()?;
        let first_by_name: std::collections::HashMap<&str, &bpf_prog::ProgRuntimeStats> =
            first_progs.iter().map(|p| (p.name.as_str(), p)).collect();
        let deltas: Vec<ProgStatsDelta> = last_progs
            .iter()
            .map(|lp| {
                let fp = first_by_name.get(lp.name.as_str()).copied();
                let cnt = lp.cnt.saturating_sub(fp.map_or(0, |p| p.cnt));
                let nsecs = lp.nsecs.saturating_sub(fp.map_or(0, |p| p.nsecs));
                let nsecs_per_call = if cnt > 0 {
                    nsecs as f64 / cnt as f64
                } else {
                    0.0
                };
                ProgStatsDelta {
                    name: lp.name.clone(),
                    cnt,
                    nsecs,
                    nsecs_per_call,
                }
            })
            .collect();
        if deltas.is_empty() {
            None
        } else {
            Some(deltas)
        }
    }
}
/// Pass/fail limits applied to a `MonitorReport` by `evaluate`.
#[derive(Debug, Clone, Copy)]
pub struct MonitorThresholds {
    /// Max allowed busiest/idlest `nr_running` ratio per sample.
    pub max_imbalance_ratio: f64,
    /// Max allowed per-CPU local DSQ depth.
    pub max_local_dsq_depth: u32,
    /// Whether sustained rq_clock stalls fail the verdict.
    pub fail_on_stall: bool,
    /// Consecutive violating samples/intervals required before failing.
    pub sustained_samples: usize,
    /// Max `select_cpu_fallback` events per second.
    pub max_fallback_rate: f64,
    /// Max `dispatch_keep_last` events per second.
    pub max_keep_last_rate: f64,
}
impl MonitorThresholds {
    /// Compile-time default limits (also exposed through `Default`).
    pub const DEFAULT: MonitorThresholds = MonitorThresholds {
        max_imbalance_ratio: 4.0,
        max_local_dsq_depth: 50,
        fail_on_stall: true,
        sustained_samples: 5,
        // Rates are events per second (see `evaluate`'s interval math).
        max_fallback_rate: 200.0,
        max_keep_last_rate: 100.0,
    };
}
impl Default for MonitorThresholds {
fn default() -> Self {
Self::DEFAULT
}
}
/// Outcome of `MonitorThresholds::evaluate`.
#[derive(Debug, Clone)]
pub struct MonitorVerdict {
    /// True when no violation was sustained (or the data was empty/invalid).
    pub passed: bool,
    /// One human-readable line per sustained violation.
    pub details: Vec<String>,
    /// One-line overall result.
    pub summary: String,
}
impl MonitorThresholds {
    /// Judges a monitor report against these thresholds.
    ///
    /// Empty reports and data that never initialized pass vacuously. Each
    /// violation class (imbalance, DSQ depth, per-CPU stalls, fallback and
    /// keep_last rates) must persist for `sustained_samples` consecutive
    /// samples/intervals before it counts as a failure.
    pub fn evaluate(&self, report: &MonitorReport) -> MonitorVerdict {
        let mut details = Vec::new();
        if report.samples.is_empty() {
            return MonitorVerdict {
                passed: true,
                details: vec![],
                summary: "no monitor samples".into(),
            };
        }
        if !Self::data_looks_valid(&report.samples) {
            return MonitorVerdict {
                passed: true,
                details: vec![],
                summary: "monitor data not yet initialized".into(),
            };
        }
        let mut imbalance = SustainedViolationTracker::default();
        let mut dsq = SustainedViolationTracker::default();
        let mut worst_dsq_cpu = 0usize;
        for (i, sample) in report.samples.iter().enumerate() {
            if sample.cpus.is_empty() {
                // Empty samples break any in-progress violation run.
                imbalance.record(false, 0.0, i);
                dsq.record(false, 0.0, i);
                continue;
            }
            let ratio = sample.imbalance_ratio();
            imbalance.record(ratio > self.max_imbalance_ratio, ratio, i);
            // Find this sample's deepest over-threshold local DSQ (if any).
            let mut dsq_violated = false;
            let mut sample_worst_depth = 0u32;
            let mut sample_worst_cpu = 0usize;
            for (cpu_idx, cpu) in sample.cpus.iter().enumerate() {
                if cpu.local_dsq_depth > self.max_local_dsq_depth
                    && cpu.local_dsq_depth > sample_worst_depth
                {
                    dsq_violated = true;
                    sample_worst_depth = cpu.local_dsq_depth;
                    sample_worst_cpu = cpu_idx;
                }
            }
            dsq.record(dsq_violated, sample_worst_depth as f64, i);
            // Remember which CPU produced the tracker's current worst value.
            // Exact f64 equality is fine here: worst_value was stored from
            // this same cast when the tracker just updated (ties pick the
            // most recent sample's CPU).
            if dsq_violated && dsq.worst_value == sample_worst_depth as f64 {
                worst_dsq_cpu = sample_worst_cpu;
            }
        }
        let mut failed = false;
        if imbalance.sustained(self.sustained_samples) {
            failed = true;
            details.push(format!(
                "imbalance ratio {:.1} exceeded threshold {:.1} for {} consecutive samples (ending at sample {})",
                imbalance.worst_value,
                self.max_imbalance_ratio,
                imbalance.worst_run,
                imbalance.worst_at,
            ));
        }
        if dsq.sustained(self.sustained_samples) {
            failed = true;
            details.push(format!(
                "local DSQ depth {} on cpu{} exceeded threshold {} for {} consecutive samples (ending at sample {})",
                dsq.worst_value as u32,
                worst_dsq_cpu,
                self.max_local_dsq_depth,
                dsq.worst_run,
                dsq.worst_at,
            ));
        }
        if self.fail_on_stall {
            // 0 in the report means "not recorded"; re-derive from HZ.
            let threshold = if report.preemption_threshold_ns > 0 {
                report.preemption_threshold_ns
            } else {
                vcpu_preemption_threshold_ns(None)
            };
            let num_cpus = report
                .samples
                .iter()
                .map(|s| s.cpus.len())
                .max()
                .unwrap_or(0);
            // One tracker per CPU: a stall must persist on the *same* CPU.
            let mut stall: Vec<SustainedViolationTracker> =
                vec![SustainedViolationTracker::default(); num_cpus];
            for i in 1..report.samples.len() {
                let prev = &report.samples[i - 1];
                let curr = &report.samples[i];
                let cpu_count = prev.cpus.len().min(curr.cpus.len());
                #[allow(clippy::needless_range_loop)]
                for cpu in 0..cpu_count {
                    let is_stall =
                        reader::is_cpu_stalled(&prev.cpus[cpu], &curr.cpus[cpu], threshold);
                    stall[cpu].record(is_stall, curr.cpus[cpu].rq_clock as f64, i);
                }
            }
            #[allow(clippy::needless_range_loop)]
            for cpu in 0..num_cpus {
                if stall[cpu].sustained(self.sustained_samples) {
                    failed = true;
                    details.push(format!(
                        "rq_clock stall on cpu{} for {} consecutive samples (ending at sample {}, clock={})",
                        cpu,
                        stall[cpu].worst_run,
                        stall[cpu].worst_at,
                        stall[cpu].worst_value as u64,
                    ));
                }
            }
        }
        // Event-rate checks, computed per interval between adjacent samples.
        let mut fallback_rate = SustainedViolationTracker::default();
        let mut keep_last_rate = SustainedViolationTracker::default();
        for i in 1..report.samples.len() {
            let prev = &report.samples[i - 1];
            let curr = &report.samples[i];
            let interval_s = curr.elapsed_ms.saturating_sub(prev.elapsed_ms) as f64 / 1000.0;
            if interval_s <= 0.0 {
                // Zero-length interval: no rate can be formed; reset runs.
                fallback_rate.record(false, 0.0, i);
                keep_last_rate.record(false, 0.0, i);
                continue;
            }
            // Missing counters on either side also resets the run.
            if let (Some(prev_fb), Some(curr_fb)) = (
                prev.sum_event_field(|e| e.select_cpu_fallback),
                curr.sum_event_field(|e| e.select_cpu_fallback),
            ) {
                let rate = (curr_fb - prev_fb) as f64 / interval_s;
                fallback_rate.record(rate > self.max_fallback_rate, rate, i);
            } else {
                fallback_rate.record(false, 0.0, i);
            }
            if let (Some(prev_kl), Some(curr_kl)) = (
                prev.sum_event_field(|e| e.dispatch_keep_last),
                curr.sum_event_field(|e| e.dispatch_keep_last),
            ) {
                let rate = (curr_kl - prev_kl) as f64 / interval_s;
                keep_last_rate.record(rate > self.max_keep_last_rate, rate, i);
            } else {
                keep_last_rate.record(false, 0.0, i);
            }
        }
        if fallback_rate.sustained(self.sustained_samples) {
            failed = true;
            details.push(format!(
                "fallback rate {:.1}/s exceeded threshold {:.1}/s for {} consecutive intervals (ending at sample {})",
                fallback_rate.worst_value,
                self.max_fallback_rate,
                fallback_rate.worst_run,
                fallback_rate.worst_at,
            ));
        }
        if keep_last_rate.sustained(self.sustained_samples) {
            failed = true;
            details.push(format!(
                "keep_last rate {:.1}/s exceeded threshold {:.1}/s for {} consecutive intervals (ending at sample {})",
                keep_last_rate.worst_value,
                self.max_keep_last_rate,
                keep_last_rate.worst_run,
                keep_last_rate.worst_at,
            ));
        }
        let summary = if failed {
            format!("monitor FAILED: {} violation(s)", details.len())
        } else {
            "monitor OK".into()
        };
        MonitorVerdict {
            passed: !failed,
            details,
            summary,
        }
    }
    /// Sanity filter before judging: rejects reports containing implausible
    /// samples, and reports whose rq_clock is identical across more than one
    /// reading (a sign the monitor never started ticking).
    fn data_looks_valid(samples: &[MonitorSample]) -> bool {
        let mut first_clock: Option<u64> = None;
        let mut all_clocks_same = true;
        for sample in samples {
            if !sample_looks_valid(sample) {
                return false;
            }
            for cpu in &sample.cpus {
                match first_clock {
                    None => first_clock = Some(cpu.rq_clock),
                    Some(fc) => {
                        if cpu.rq_clock != fc {
                            all_clocks_same = false;
                        }
                    }
                }
            }
        }
        if first_clock.is_some() && all_clocks_same {
            // A single reading can't prove anything; only flag when several
            // readings all show the exact same clock.
            let total_readings: usize = samples.iter().map(|s| s.cpus.len()).sum();
            if total_readings > 1 {
                return false;
            }
        }
        true
    }
}