use serde::Serialize;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
/// Upper bound on distinct (waker, wakee) pairs tracked before pruning kicks in.
const DEFAULT_MAX_RELATIONSHIPS: usize = 10_000;
/// Relationships with fewer wakeups than this are dropped first when over the limit.
const DEFAULT_MIN_WAKEUP_COUNT: u32 = 10;
/// Size of the per-relationship sliding window of latency samples (used for percentiles).
const MAX_LATENCY_SAMPLES: usize = 100;
/// Tracks directed waker -> wakee wakeup relationships together with their
/// latency and CPU-locality statistics.
pub struct WakerWakeeAnalyzer {
    /// Per-pair stats keyed by (waker_pid, wakee_pid).
    relationships: HashMap<(u32, u32), RelationshipStats>,
    /// Wakeups keyed by wakee PID, waiting for the wakee to be seen running.
    pending_wakeups: HashMap<u32, PendingWakeup>,
    /// Optional CPU topology used to classify same/cross LLC and NUMA node wakeups.
    topology: Option<Arc<scx_utils::Topology>>,
    /// Hard cap on the number of tracked relationships before pruning.
    max_relationships: usize,
    /// Minimum wakeup count that shields a relationship from the first pruning pass.
    min_wakeup_count: u32,
    /// Whether events are currently being recorded.
    enabled: bool,
}
/// Accumulated statistics for one directed (waker -> wakee) relationship.
#[derive(Clone, Debug, Serialize)]
pub struct RelationshipStats {
    /// PID of the task issuing the wakeups.
    pub waker_pid: u32,
    /// Command name of the waker, captured when the pair was first recorded.
    pub waker_comm: String,
    /// PID of the task being woken.
    pub wakee_pid: u32,
    /// Command name of the wakee, captured when the pair was first recorded.
    pub wakee_comm: String,
    /// Total number of completed wakeups observed for this pair.
    pub wakeup_count: u64,
    /// Timestamp (ns) of the event that created this record.
    pub first_seen_ns: u64,
    /// Timestamp (ns) of the most recent completed wakeup.
    pub last_seen_ns: u64,
    /// Recent latency samples (µs), bounded sliding window; not serialized.
    #[serde(skip)]
    latency_samples: Vec<u64>,
    /// Minimum observed latency (µs); u64::MAX until the first sample arrives.
    pub min_latency_us: u64,
    /// Maximum observed latency (µs).
    pub max_latency_us: u64,
    /// Running sum of all latency samples (µs), used for the mean.
    pub total_latency_us: u64,
    /// Number of latency samples folded into the aggregates.
    pub sample_count: u64,
    /// Wakeups where waker and wakee CPU were identical.
    pub same_cpu_count: u64,
    /// Wakeups where waker and wakee CPU differed.
    pub cross_cpu_count: u64,
    /// Occurrence count per (waker_cpu, wakee_cpu) pair; not serialized.
    #[serde(skip)]
    pub cpu_pairs: HashMap<(u32, u32), u64>,
    /// Wakeups within the same last-level cache domain (topology required).
    pub same_llc_count: u64,
    /// Wakeups crossing LLC domains (topology required).
    pub cross_llc_count: u64,
    /// Wakeups within the same NUMA node (topology required).
    pub same_node_count: u64,
    /// Wakeups crossing NUMA nodes (topology required).
    pub cross_node_count: u64,
}
/// A wakeup that has been observed but whose wakee has not yet been seen running.
#[derive(Clone)]
struct PendingWakeup {
    /// PID of the waking task.
    waker_pid: u32,
    /// Command name of the waking task.
    waker_comm: String,
    /// CPU associated with the wakeup event.
    waker_cpu: u32,
    /// Timestamp (ns) of the wakeup event; latency is measured from here.
    timestamp_ns: u64,
}
/// All relationships involving a single PID, split by role.
#[derive(Clone, Debug, Serialize)]
pub struct RelationshipsByPid {
    /// The PID the query was made for.
    pub pid: u32,
    /// Relationships where this PID is the waker.
    pub as_waker: Vec<RelationshipStats>,
    /// Relationships where this PID is the wakee.
    pub as_wakee: Vec<RelationshipStats>,
}
/// A pair of tasks that wake each other in both directions (A -> B and B -> A).
#[derive(Clone, Debug, Serialize)]
pub struct BidirectionalRelationship {
    /// PID of the first task (always the smaller PID of the pair).
    pub pid_a: u32,
    /// Command name of the first task.
    pub comm_a: String,
    /// PID of the second task.
    pub pid_b: u32,
    /// Command name of the second task.
    pub comm_b: String,
    /// How often A woke B.
    pub a_wakes_b_count: u64,
    /// Mean A -> B wakeup latency in µs.
    pub a_wakes_b_avg_latency_us: u64,
    /// How often B woke A.
    pub b_wakes_a_count: u64,
    /// Mean B -> A wakeup latency in µs.
    pub b_wakes_a_avg_latency_us: u64,
    /// Human-readable classification of the interaction pattern.
    pub pattern_description: String,
}
impl WakerWakeeAnalyzer {
    /// Creates a disabled analyzer with the default capacity limits.
    pub fn new() -> Self {
        Self {
            relationships: HashMap::new(),
            pending_wakeups: HashMap::new(),
            topology: None,
            max_relationships: DEFAULT_MAX_RELATIONSHIPS,
            min_wakeup_count: DEFAULT_MIN_WAKEUP_COUNT,
            enabled: false,
        }
    }

    /// Creates a disabled analyzer with caller-supplied limits: the maximum
    /// number of tracked relationships and the minimum wakeup count that
    /// protects a relationship from the first pruning pass.
    pub fn with_limits(max_relationships: usize, min_wakeup_count: u32) -> Self {
        Self {
            relationships: HashMap::new(),
            pending_wakeups: HashMap::new(),
            topology: None,
            max_relationships,
            min_wakeup_count,
            enabled: false,
        }
    }

    /// Installs CPU topology so wakeups can be classified by LLC / NUMA locality.
    pub fn set_topology(&mut self, topology: Arc<scx_utils::Topology>) {
        self.topology = Some(topology);
    }

    /// Begins recording wakeup events.
    pub fn start(&mut self) {
        self.enabled = true;
    }

    /// Stops recording; previously accumulated data is retained.
    pub fn stop(&mut self) {
        self.enabled = false;
    }

    /// Returns whether the analyzer is currently recording.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Discards all accumulated relationships and in-flight wakeups.
    pub fn reset(&mut self) {
        self.relationships.clear();
        self.pending_wakeups.clear();
    }

    /// Records that `waker_pid` woke `wakee_pid`. The wakeup is held pending
    /// until the wakee is observed running (see [`Self::record_wakee_run`]);
    /// a later wakeup for the same wakee overwrites any pending one.
    ///
    /// NOTE(review): `pending_wakeups` is only drained when the wakee runs or
    /// on `reset()`; wakees that never run leave entries behind — confirm the
    /// upstream event flow bounds this map.
    pub fn record_wakeup(
        &mut self,
        wakee_pid: u32,
        waker_pid: u32,
        waker_comm: &str,
        waker_cpu: u32,
        timestamp_ns: u64,
    ) {
        if !self.enabled {
            return;
        }
        self.pending_wakeups.insert(
            wakee_pid,
            PendingWakeup {
                waker_pid,
                waker_comm: waker_comm.to_string(),
                waker_cpu,
                timestamp_ns,
            },
        );
    }

    /// Records that `wakee_pid` started running on `wakee_cpu`, completing any
    /// pending wakeup for it: folds the wakeup-to-run latency, CPU placement,
    /// and (if topology is set) LLC/NUMA locality into the pair's stats.
    pub fn record_wakee_run(
        &mut self,
        wakee_pid: u32,
        wakee_comm: &str,
        wakee_cpu: u32,
        timestamp_ns: u64,
    ) {
        if !self.enabled {
            return;
        }
        if let Some(wakeup) = self.pending_wakeups.remove(&wakee_pid) {
            // Out-of-order timestamps would otherwise underflow; clamp to 0.
            let latency_ns = timestamp_ns.saturating_sub(wakeup.timestamp_ns);
            let latency_us = latency_ns / 1000;
            let key = (wakeup.waker_pid, wakee_pid);
            let stats = self.relationships.entry(key).or_insert_with(|| {
                RelationshipStats::new(
                    wakeup.waker_pid,
                    wakeup.waker_comm.clone(),
                    wakee_pid,
                    wakee_comm.to_string(),
                    timestamp_ns,
                )
            });
            stats.wakeup_count += 1;
            stats.last_seen_ns = timestamp_ns;
            stats.record_latency(latency_us);
            let same_cpu = wakeup.waker_cpu == wakee_cpu;
            if same_cpu {
                stats.same_cpu_count += 1;
            } else {
                stats.cross_cpu_count += 1;
            }
            *stats
                .cpu_pairs
                .entry((wakeup.waker_cpu, wakee_cpu))
                .or_insert(0) += 1;
            // Locality classification requires both CPUs to exist in the topology.
            if let Some(ref topo) = self.topology {
                if let (Some(waker_cpu_info), Some(wakee_cpu_info)) = (
                    topo.all_cpus.get(&(wakeup.waker_cpu as usize)),
                    topo.all_cpus.get(&(wakee_cpu as usize)),
                ) {
                    if waker_cpu_info.llc_id == wakee_cpu_info.llc_id {
                        stats.same_llc_count += 1;
                    } else {
                        stats.cross_llc_count += 1;
                    }
                    if waker_cpu_info.node_id == wakee_cpu_info.node_id {
                        stats.same_node_count += 1;
                    } else {
                        stats.cross_node_count += 1;
                    }
                }
            }
            self.enforce_relationship_limit();
        }
    }

    /// Keeps the relationship table at or below `max_relationships`: first
    /// drops entries below `min_wakeup_count`, then, if still over the cap,
    /// keeps the most frequent pairs (ties broken by most recently seen).
    fn enforce_relationship_limit(&mut self) {
        if self.relationships.len() <= self.max_relationships {
            return;
        }
        // Hoist the widening conversion out of the retain closure.
        let min_count = u64::from(self.min_wakeup_count);
        self.relationships
            .retain(|_, stats| stats.wakeup_count >= min_count);
        if self.relationships.len() > self.max_relationships {
            let mut items: Vec<_> = self
                .relationships
                .iter()
                .map(|(k, v)| (*k, v.wakeup_count, v.last_seen_ns))
                .collect();
            // Descending by wakeup count, then by recency. Unstable sort is
            // fine: tie order is already nondeterministic (HashMap iteration).
            items.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| b.2.cmp(&a.2)));
            let to_keep: HashSet<_> = items
                .iter()
                .take(self.max_relationships)
                .map(|(k, _, _)| *k)
                .collect();
            self.relationships.retain(|k, _| to_keep.contains(k));
        }
    }

    /// Returns up to `limit` relationships with the highest wakeup counts.
    pub fn get_top_by_frequency(&self, limit: usize) -> Vec<RelationshipStats> {
        let mut items: Vec<_> = self.relationships.values().cloned().collect();
        items.sort_by_key(|s| std::cmp::Reverse(s.wakeup_count));
        items.truncate(limit);
        items
    }

    /// Returns up to `limit` relationships with the highest average latency.
    pub fn get_top_by_latency(&self, limit: usize) -> Vec<RelationshipStats> {
        let mut items: Vec<_> = self.relationships.values().cloned().collect();
        items.sort_by_key(|s| std::cmp::Reverse(s.avg_latency_us()));
        items.truncate(limit);
        items
    }

    /// Returns up to `limit` relationships ranked by criticality score
    /// (frequency x average latency).
    pub fn get_critical_relationships(&self, limit: usize) -> Vec<RelationshipStats> {
        let mut items: Vec<_> = self
            .relationships
            .values()
            .map(|s| (s.clone(), s.criticality_score()))
            .collect();
        items.sort_by_key(|(_, score)| std::cmp::Reverse(*score));
        items.truncate(limit);
        items.into_iter().map(|(s, _)| s).collect()
    }

    /// Returns every pair of tasks that wake each other in both directions.
    /// Each pair is reported once, with `pid_a` the smaller PID.
    pub fn get_bidirectional_relationships(&self) -> Vec<BidirectionalRelationship> {
        let mut results = Vec::new();
        for ((waker, wakee), stats1) in &self.relationships {
            if let Some(stats2) = self.relationships.get(&(*wakee, *waker)) {
                // Only emit from the direction where waker < wakee so each
                // bidirectional pair appears exactly once.
                if waker < wakee {
                    let pattern = classify_bidirectional_pattern(stats1, stats2);
                    results.push(BidirectionalRelationship {
                        pid_a: *waker,
                        comm_a: stats1.waker_comm.clone(),
                        pid_b: *wakee,
                        comm_b: stats1.wakee_comm.clone(),
                        a_wakes_b_count: stats1.wakeup_count,
                        a_wakes_b_avg_latency_us: stats1.avg_latency_us(),
                        b_wakes_a_count: stats2.wakeup_count,
                        b_wakes_a_avg_latency_us: stats2.avg_latency_us(),
                        pattern_description: pattern,
                    });
                }
            }
        }
        results
    }

    /// Returns all relationships involving `pid`, split by role.
    pub fn get_relationships_for_pid(&self, pid: u32) -> RelationshipsByPid {
        let as_waker: Vec<_> = self
            .relationships
            .iter()
            .filter(|((waker, _), _)| *waker == pid)
            .map(|(_, stats)| stats.clone())
            .collect();
        let as_wakee: Vec<_> = self
            .relationships
            .iter()
            .filter(|((_, wakee), _)| *wakee == pid)
            .map(|(_, stats)| stats.clone())
            .collect();
        RelationshipsByPid {
            pid,
            as_waker,
            as_wakee,
        }
    }

    /// Returns aggregate counters describing the analyzer's current state.
    pub fn get_summary(&self) -> WakerWakeeSummary {
        let total_relationships = self.relationships.len();
        let total_wakeups: u64 = self.relationships.values().map(|s| s.wakeup_count).sum();
        // Count bidirectional pairs directly rather than materializing the full
        // `BidirectionalRelationship` vector (which clones comm strings) just
        // to take its length.
        let bidirectional_count = self
            .relationships
            .keys()
            .filter(|(a, b)| a < b && self.relationships.contains_key(&(*b, *a)))
            .count();
        WakerWakeeSummary {
            enabled: self.enabled,
            total_relationships,
            total_wakeups,
            bidirectional_count,
            pending_wakeups: self.pending_wakeups.len(),
        }
    }
}
impl Default for WakerWakeeAnalyzer {
fn default() -> Self {
Self::new()
}
}
impl RelationshipStats {
    /// Creates a zeroed stats record for a (waker, wakee) pair, stamped with
    /// the timestamp of the event that created it.
    fn new(
        waker_pid: u32,
        waker_comm: String,
        wakee_pid: u32,
        wakee_comm: String,
        timestamp_ns: u64,
    ) -> Self {
        Self {
            waker_pid,
            waker_comm,
            wakee_pid,
            wakee_comm,
            wakeup_count: 0,
            first_seen_ns: timestamp_ns,
            last_seen_ns: timestamp_ns,
            latency_samples: Vec::new(),
            // Sentinel: any real sample replaces this via `min`.
            min_latency_us: u64::MAX,
            max_latency_us: 0,
            total_latency_us: 0,
            sample_count: 0,
            same_cpu_count: 0,
            cross_cpu_count: 0,
            cpu_pairs: HashMap::new(),
            same_llc_count: 0,
            cross_llc_count: 0,
            same_node_count: 0,
            cross_node_count: 0,
        }
    }

    /// Folds one latency sample (µs) into the running aggregates and the
    /// bounded sliding window used for percentile estimates.
    fn record_latency(&mut self, latency_us: u64) {
        // Saturate rather than wrap if the running sum ever overflows u64.
        self.total_latency_us = self.total_latency_us.saturating_add(latency_us);
        self.sample_count += 1;
        self.min_latency_us = self.min_latency_us.min(latency_us);
        self.max_latency_us = self.max_latency_us.max(latency_us);
        self.latency_samples.push(latency_us);
        // Keep only the most recent MAX_LATENCY_SAMPLES entries. remove(0) is
        // O(window), but the window is small (100), so this is acceptable.
        if self.latency_samples.len() > MAX_LATENCY_SAMPLES {
            self.latency_samples.remove(0);
        }
    }

    /// Mean wakeup-to-run latency in µs across all samples, or 0 if none.
    pub fn avg_latency_us(&self) -> u64 {
        if self.sample_count > 0 {
            self.total_latency_us / self.sample_count
        } else {
            0
        }
    }

    /// Heuristic importance score: wakeup frequency times average latency.
    /// Saturates instead of wrapping on u64 overflow.
    pub fn criticality_score(&self) -> u64 {
        self.wakeup_count.saturating_mul(self.avg_latency_us())
    }

    /// Percentage of wakeups where waker and wakee ran on the same CPU
    /// (0.0 if no CPU-classified wakeups were recorded).
    pub fn same_cpu_percentage(&self) -> f64 {
        let total = self.same_cpu_count + self.cross_cpu_count;
        if total > 0 {
            (self.same_cpu_count as f64 / total as f64) * 100.0
        } else {
            0.0
        }
    }

    /// Percentage of topology-classified wakeups that stayed within one LLC
    /// (0.0 if no topology data was recorded).
    pub fn same_llc_percentage(&self) -> f64 {
        let total = self.same_llc_count + self.cross_llc_count;
        if total > 0 {
            (self.same_llc_count as f64 / total as f64) * 100.0
        } else {
            0.0
        }
    }

    /// p50/p95/p99 latency estimates over the recent-sample window.
    pub fn get_percentiles(&self) -> LatencyPercentiles {
        let mut samples = self.latency_samples.clone();
        samples.sort_unstable();
        let p50 = percentile(&samples, 50);
        let p95 = percentile(&samples, 95);
        let p99 = percentile(&samples, 99);
        LatencyPercentiles { p50, p95, p99 }
    }
}
/// Latency percentile estimates in microseconds.
#[derive(Clone, Debug, Serialize)]
pub struct LatencyPercentiles {
    /// Median latency (µs).
    pub p50: u64,
    /// 95th-percentile latency (µs).
    pub p95: u64,
    /// 99th-percentile latency (µs).
    pub p99: u64,
}
/// Aggregate counters describing the analyzer's current state.
#[derive(Clone, Debug, Serialize)]
pub struct WakerWakeeSummary {
    /// Whether the analyzer is currently recording.
    pub enabled: bool,
    /// Number of distinct (waker, wakee) pairs tracked.
    pub total_relationships: usize,
    /// Total completed wakeups across all pairs.
    pub total_wakeups: u64,
    /// Number of pairs that wake each other in both directions.
    pub bidirectional_count: usize,
    /// Wakeups still waiting for their wakee to be seen running.
    pub pending_wakeups: usize,
}
/// Nearest-rank percentile lookup on an already-sorted slice.
///
/// Returns 0 for an empty slice; `p` is expected in `0..=100`. The computed
/// rank is clamped to the last element so `p == 100` stays in bounds.
fn percentile(sorted_samples: &[u64], p: usize) -> u64 {
    match sorted_samples {
        [] => 0,
        samples => {
            let rank = samples.len() * p / 100;
            samples
                .get(rank)
                .copied()
                .unwrap_or_else(|| samples[samples.len() - 1])
        }
    }
}
/// Describes the shape of a two-way wakeup relationship based on how lopsided
/// the per-direction wakeup counts are.
fn classify_bidirectional_pattern(
    stats1: &RelationshipStats,
    stats2: &RelationshipStats,
) -> String {
    // Ratio of the busier direction to the quieter one (always >= 1.0).
    let hi = stats1.wakeup_count.max(stats2.wakeup_count) as f64;
    let lo = stats1.wakeup_count.min(stats2.wakeup_count) as f64;
    let ratio = hi / lo;
    match ratio {
        r if r < 1.2 => "Balanced ping-pong (likely mutex/condvar)".to_string(),
        r if r < 2.0 => "Slightly imbalanced bidirectional wakeups".to_string(),
        r => format!("Asymmetric bidirectional ({:.1}:1 ratio)", r),
    }
}
/// Extracts `(wakee_pid, waker_pid, waker_comm, waker_cpu, timestamp_ns)` from
/// a `sched_wakeup` / `sched_wakeup_new` event JSON object.
///
/// Returns `None` for any other event type, or when a field is missing or has
/// the wrong JSON type.
pub fn extract_wakeup_info(json: &Value) -> Option<(u32, u32, String, u32, u64)> {
    match json.get("type")?.as_str()? {
        "sched_wakeup" | "sched_wakeup_new" => {}
        _ => return None,
    }
    Some((
        json.get("pid")?.as_u64()? as u32,
        json.get("waker_pid")?.as_u64()? as u32,
        json.get("waker_comm")?.as_str()?.to_string(),
        json.get("cpu")?.as_u64()? as u32,
        json.get("timestamp")?.as_u64()?,
    ))
}
/// Extracts `(wakee_pid, wakee_comm, wakee_cpu, timestamp_ns)` from a
/// `sched_switch` event JSON object, reading the incoming (`next_*`) task.
///
/// Returns `None` for any other event type, or when a field is missing or has
/// the wrong JSON type.
pub fn extract_wakee_run_info(json: &Value) -> Option<(u32, String, u32, u64)> {
    if json.get("type")?.as_str()? != "sched_switch" {
        return None;
    }
    Some((
        json.get("next_pid")?.as_u64()? as u32,
        json.get("next_comm")?.as_str()?.to_string(),
        json.get("cpu")?.as_u64()? as u32,
        json.get("timestamp")?.as_u64()?,
    ))
}