use std::collections::BTreeMap;
use crate::ctprof::{CtprofSnapshot, ThreadState};
use super::{
AffinitySummary, AggRule, Aggregated, CTPROF_METRICS, CtprofMetricDef, DiffRow, GroupBy,
ThreadGroup,
pattern::{cgroup_normalize_skeleton, pattern_key, tighten_group},
};
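/// Sum `smaps_rollup` memory counters across threads, keyed by pcomm pattern key
/// (or by `pcomm[tgid]` when thread-name normalization is disabled). Threads with
/// no smaps_rollup data are skipped.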
pub fn collect_smaps_rollup(
snap: &CtprofSnapshot,
no_thread_normalize: bool,
) -> BTreeMap<String, BTreeMap<String, u64>> {
collect_smaps_rollup_inner(snap, no_thread_normalize, false, &[], None)
}
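/// Like [`collect_smaps_rollup`], but keys are NUL-separated `cgroup\0pcomm` pairs,
/// with cgroup paths flattened against `flatten` and optionally remapped through
/// `cgroup_key_map`, so callers can group results per cgroup.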
pub fn collect_smaps_rollup_hierarchical(
snap: &CtprofSnapshot,
no_thread_normalize: bool,
flatten: &[glob::Pattern],
cgroup_key_map: Option<&BTreeMap<String, String>>,
) -> BTreeMap<String, BTreeMap<String, u64>> {
collect_smaps_rollup_inner(snap, no_thread_normalize, true, flatten, cgroup_key_map)
}
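/// Shared implementation behind the two public smaps_rollup collectors above.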
fn collect_smaps_rollup_inner(
snap: &CtprofSnapshot,
no_thread_normalize: bool,
compound_cgroup: bool,
flatten: &[glob::Pattern],
cgroup_key_map: Option<&BTreeMap<String, String>>,
) -> BTreeMap<String, BTreeMap<String, u64>> {
let mut out: BTreeMap<String, BTreeMap<String, u64>> = BTreeMap::new();
for t in &snap.threads {
if t.smaps_rollup_kb.is_empty() {
continue;
}
let pcomm_key = if no_thread_normalize {
format!("{}[{}]", t.pcomm, t.tgid)
} else {
pattern_key(&t.pcomm)
};
let key = if compound_cgroup {
let cg = flatten_cgroup_path(&t.cgroup, flatten);
let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
Some(k) => k.clone(),
None => cg,
};
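            // NUL byte separates the cgroup key from the pcomm key in the compound key.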
format!("{cg_key}\x00{pcomm_key}")
} else {
pcomm_key
};
let entry = out.entry(key).or_default();
for (k, b) in t.smaps_rollup_bytes() {
entry
.entry(k.clone())
.and_modify(|v| *v = v.saturating_add(b.0))
.or_insert(b.0);
}
}
out
}
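/// Build a map from every (flattened) cgroup path seen in either snapshot to a
/// normalized group key: paths whose normalized skeletons collide are tightened
/// together via `tighten_group`, while paths with a unique skeleton keep the
/// skeleton itself as their key.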
pub fn build_cgroup_key_map(
baseline: &CtprofSnapshot,
candidate: &CtprofSnapshot,
flatten: &[glob::Pattern],
) -> BTreeMap<String, String> {
use std::collections::BTreeSet;
let mut paths: BTreeSet<String> = BTreeSet::new();
for snap in [baseline, candidate] {
for t in &snap.threads {
paths.insert(flatten_cgroup_path(&t.cgroup, flatten));
}
for k in snap.cgroup_stats.keys() {
paths.insert(flatten_cgroup_path(k, flatten));
}
}
let entries: Vec<(String, String, String, Vec<String>)> = paths
.into_iter()
.map(|p| {
let (skeleton, post_l1, tokens) = cgroup_normalize_skeleton(&p);
(p, skeleton, post_l1, tokens)
})
.collect();
let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
for (idx, (_, skel, _, _)) in entries.iter().enumerate() {
groups.entry(skel.clone()).or_default().push(idx);
}
let mut tightened: Vec<String> = vec![String::new(); entries.len()];
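    // Singleton skeleton groups keep the skeleton as-is; groups with two or more
    // members share the key produced by `tighten_group`.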
for (skeleton, indices) in &groups {
if indices.len() < 2 {
for &i in indices {
tightened[i] = skeleton.clone();
}
} else {
let post_l1_paths: Vec<String> =
indices.iter().map(|&i| entries[i].2.clone()).collect();
let member_tokens: Vec<Vec<String>> =
indices.iter().map(|&i| entries[i].3.clone()).collect();
let key = tighten_group(&post_l1_paths, &member_tokens);
for &i in indices {
tightened[i] = key.clone();
}
}
}
let mut out: BTreeMap<String, String> = BTreeMap::new();
for (i, (orig, _, _, _)) in entries.into_iter().enumerate() {
out.insert(orig, tightened[i].clone());
}
out
}
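/// Assemble a [`DiffRow`] for a single metric of a single group, computing absolute
/// and relative deltas between the baseline and candidate aggregates when both are
/// numeric (the relative delta is omitted for a near-zero baseline).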
#[allow(clippy::too_many_arguments)]
pub(super) fn build_row(
key: &str,
display_key: &str,
n_a: usize,
n_b: usize,
metric: &'static CtprofMetricDef,
a: Aggregated,
b: Aggregated,
uptime_pct: Option<f64>,
) -> DiffRow {
let (delta, delta_pct) = match (a.numeric(), b.numeric()) {
(Some(va), Some(vb)) => {
let d = vb - va;
let pct = if va.abs() > f64::EPSILON {
Some(d / va)
} else {
None
};
(Some(d), pct)
}
_ => (None, None),
};
DiffRow {
group_key: key.to_string(),
thread_count_a: n_a,
thread_count_b: n_b,
uptime_pct,
metric_name: metric.name,
metric_ladder: metric.rule.ladder(),
baseline: a,
candidate: b,
delta,
delta_pct,
display_key: display_key.to_string(),
sort_by_cell: None,
sort_by_delta: None,
}
}
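/// Bucket the snapshot's threads according to `group_by` and aggregate every metric
/// in [`CTPROF_METRICS`] for each bucket. For comm/pcomm grouping, a name is collapsed
/// to its pattern key only when at least two threads share that pattern (counts come
/// from `pattern_counts` or are computed locally).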
pub fn build_groups(
snap: &CtprofSnapshot,
group_by: GroupBy,
flatten: &[glob::Pattern],
pattern_counts: Option<&BTreeMap<String, usize>>,
cgroup_key_map: Option<&BTreeMap<String, String>>,
no_thread_normalize: bool,
) -> BTreeMap<String, ThreadGroup> {
let pattern_field: Option<fn(&ThreadState) -> &str> = match (group_by, no_thread_normalize) {
(GroupBy::Comm, false) => Some(|t: &ThreadState| t.comm.as_str()),
(GroupBy::Pcomm, false) => Some(|t: &ThreadState| t.pcomm.as_str()),
_ => None,
};
let local_counts: Option<BTreeMap<String, usize>> = match (pattern_field, pattern_counts) {
(Some(field), None) => {
let mut counts: BTreeMap<String, usize> = BTreeMap::new();
for t in &snap.threads {
*counts.entry(pattern_key(field(t))).or_insert(0) += 1;
}
Some(counts)
}
_ => None,
};
let counts_ref: Option<&BTreeMap<String, usize>> = pattern_counts.or(local_counts.as_ref());
let mut buckets: BTreeMap<String, Vec<&ThreadState>> = BTreeMap::new();
for t in &snap.threads {
let key = match group_by {
GroupBy::All => {
let cg = flatten_cgroup_path(&t.cgroup, flatten);
let cg_key = match cgroup_key_map.and_then(|m| m.get(&cg)) {
Some(k) => k.clone(),
None => cg,
};
let pcomm_key = if no_thread_normalize {
t.pcomm.clone()
} else {
pattern_key(&t.pcomm)
};
let comm_key = if no_thread_normalize {
t.comm.clone()
} else {
pattern_key(&t.comm)
};
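                // NUL bytes separate cgroup, pcomm, and comm in the compound group key.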
format!("{cg_key}\x00{pcomm_key}\x00{comm_key}")
}
GroupBy::Pcomm | GroupBy::Comm => match pattern_field {
Some(field) => {
let name = field(t);
let pk = pattern_key(name);
let counts = counts_ref.expect("pattern_counts seeded for Pcomm/Comm");
if counts.get(&pk).copied().unwrap_or(0) >= 2 {
pk
} else {
name.to_string()
}
}
None => {
if group_by == GroupBy::Pcomm {
t.pcomm.clone()
} else {
t.comm.clone()
}
}
},
GroupBy::CommExact => t.comm.clone(),
GroupBy::Cgroup => {
let post_flatten = flatten_cgroup_path(&t.cgroup, flatten);
match cgroup_key_map.and_then(|m| m.get(&post_flatten)) {
Some(k) => k.clone(),
None => post_flatten,
}
}
};
buckets.entry(key).or_default().push(t);
}
let mut out = BTreeMap::new();
for (key, threads) in buckets {
let mut metrics = BTreeMap::new();
for m in CTPROF_METRICS {
metrics.insert(m.name.to_string(), aggregate(m.rule, &threads));
}
let cgroup_stats = if group_by == GroupBy::Cgroup {
threads
.first()
.and_then(|t| snap.cgroup_stats.get(&t.cgroup).cloned())
} else {
None
};
let members: Vec<String> = match pattern_field {
Some(field) => {
let mut v: Vec<String> = threads.iter().map(|t| field(t).to_string()).collect();
v.sort();
v.dedup();
v
}
None => Vec::new(),
};
let valid_starts: Vec<u64> = threads
.iter()
.map(|t| t.start_time_clock_ticks)
.filter(|&t| t > 0)
.collect();
let avg_start_ticks = if valid_starts.is_empty() {
0
} else {
valid_starts.iter().sum::<u64>() / valid_starts.len() as u64
};
out.insert(
key.clone(),
ThreadGroup {
key,
thread_count: threads.len(),
metrics,
cgroup_stats,
members,
avg_start_ticks,
},
);
}
out
}
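/// Tally categorical values and report the distribution as [`Aggregated::Mode`].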
fn mode_aggregate(
total: usize,
items: impl IntoIterator<Item = crate::metric_types::CategoricalString>,
) -> Aggregated {
let mut tallies: BTreeMap<String, usize> = BTreeMap::new();
for item in items {
*tallies.entry(item.0).or_insert(0) += 1;
}
Aggregated::Mode { tallies, total }
}
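/// Apply a metric's [`AggRule`] across the threads of one group: sums for monotonic
/// counters, maxima for peaks and gauges, min/max ranges for ordinals, mode tallies
/// for categorical fields, and a CPU-affinity summary.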
pub fn aggregate(rule: AggRule, threads: &[&ThreadState]) -> Aggregated {
use crate::metric_types::{CategoricalString, Maxable, Rangeable, Summable};
match rule {
AggRule::SumCount(f) => {
let s = crate::metric_types::MonotonicCount::sum_across(threads.iter().map(|t| f(t)));
Aggregated::Sum(s.0)
}
AggRule::SumNs(f) => {
let s = crate::metric_types::MonotonicNs::sum_across(threads.iter().map(|t| f(t)));
Aggregated::Sum(s.0)
}
AggRule::SumTicks(f) => {
let s = crate::metric_types::ClockTicks::sum_across(threads.iter().map(|t| f(t)));
Aggregated::Sum(s.0)
}
AggRule::SumBytes(f) => {
let s = crate::metric_types::Bytes::sum_across(threads.iter().map(|t| f(t)));
Aggregated::Sum(s.0)
}
AggRule::MaxPeak(f) => {
let m = crate::metric_types::PeakNs::max_across(threads.iter().map(|t| f(t)));
Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
}
AggRule::MaxPeakBytes(f) => {
let m = crate::metric_types::PeakBytes::max_across(threads.iter().map(|t| f(t)));
Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
}
AggRule::MaxGaugeNs(f) => {
let m = crate::metric_types::GaugeNs::max_across(threads.iter().map(|t| f(t)));
Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
}
AggRule::MaxGaugeCount(f) => {
let m = crate::metric_types::GaugeCount::max_across(threads.iter().map(|t| f(t)));
Aggregated::Max(m.map(|v| v.0).unwrap_or(0))
}
AggRule::RangeI32(f) => {
match crate::metric_types::OrdinalI32::range_across(threads.iter().map(|t| f(t))) {
Some(r) => {
let (min, max) = r.into_tuple();
Aggregated::OrdinalRange {
min: i64::from(min.0),
max: i64::from(max.0),
}
}
None => Aggregated::OrdinalRange { min: 0, max: 0 },
}
}
AggRule::RangeU32(f) => {
match crate::metric_types::OrdinalU32::range_across(threads.iter().map(|t| f(t))) {
Some(r) => {
let (min, max) = r.into_tuple();
Aggregated::OrdinalRange {
min: i64::from(min.0),
max: i64::from(max.0),
}
}
None => Aggregated::OrdinalRange { min: 0, max: 0 },
}
}
AggRule::Mode(f) => mode_aggregate(threads.len(), threads.iter().map(|t| f(t))),
AggRule::ModeChar(f) => mode_aggregate(
threads.len(),
threads.iter().map(|t| CategoricalString(f(t).to_string())),
),
AggRule::ModeBool(f) => mode_aggregate(
threads.len(),
threads.iter().map(|t| CategoricalString(f(t).to_string())),
),
AggRule::Affinity(f) => {
let mut seen: Vec<Vec<u32>> = Vec::new();
let mut min_cpus = usize::MAX;
let mut max_cpus = 0usize;
for t in threads {
let cpus = f(t).0;
min_cpus = min_cpus.min(cpus.len());
max_cpus = max_cpus.max(cpus.len());
if !seen.iter().any(|s| s == &cpus) {
seen.push(cpus);
}
}
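            // With no threads the MAX sentinel was never lowered; clamp min_cpus to 0.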
if threads.is_empty() {
min_cpus = 0;
}
let uniform = if seen.len() == 1 {
seen.into_iter().next()
} else {
None
};
Aggregated::Affinity(AffinitySummary {
min_cpus,
max_cpus,
uniform,
})
}
}
}
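/// Collapse a cgroup path onto the first flatten pattern that matches it; paths that
/// match no pattern are returned unchanged.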
pub fn flatten_cgroup_path(path: &str, patterns: &[glob::Pattern]) -> String {
for p in patterns {
if p.matches(path) {
return p.as_str().to_string();
}
}
path.to_string()
}
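/// Compile user-supplied glob strings into patterns, silently skipping any that fail
/// to parse.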
pub fn compile_flatten_patterns(raw: &[String]) -> Vec<glob::Pattern> {
raw.iter()
.filter_map(|s| glob::Pattern::new(s).ok())
.collect()
}