use std::collections::HashMap;
use hydro_lang::builder::deploy::DeployResult;
use hydro_lang::deploy::HydroDeploy;
use hydro_lang::deploy::deploy_graph::DeployCrateWrapper;
use hydro_lang::ir::{HydroLeaf, HydroNode, traverse_dfir};
use hydro_lang::location::LocationId;
use regex::Regex;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::rewrites::{NetworkType, get_network_type};
/// Extracts a CPU utilisation percentage from a usage report.
///
/// Scans `measurement` for every `Total <digits>.<digits>%` occurrence and returns the number
/// from the last one. Returns `0.0` when there is no such occurrence.
pub fn parse_cpu_usage(measurement: String) -> f64 {
    let total_pattern = Regex::new(r"Total (\d+\.\d+)%").unwrap();
    match total_pattern.captures_iter(&measurement).last() {
        // The capture is `\d+\.\d+`, which always parses as an f64.
        Some(caps) => caps[1].parse::<f64>().unwrap(),
        None => 0f64,
    }
}
/// Parses folded `perf` stack output into per-operator shares of the sampled CPU time.
///
/// Each non-blank line of `file` is a stack followed by a space and a sample count. The count
/// is taken after the *last* space, so spaces earlier in the stack are tolerated. The last
/// frame that matches the operator pattern `op_<n>v<n>__<name>__[send|recv]<id>` is credited
/// with the line's samples. Lines with no matching frame count as "unidentified".
///
/// Returns a map keyed by `(operator id, is_network_recv)`. Each value is that key's fraction
/// of all samples. A summary of the unidentified samples is printed. If there are no samples
/// at all, a warning is printed instead and no division by zero occurs.
///
/// # Panics
/// Panics if a non-blank line has no space, or if its trailing token is not a number.
fn parse_perf(file: String) -> HashMap<(usize, bool), f64> {
    let operator_regex = Regex::new(r"op_\d+v\d+__(.*?)__(send)?(recv)?(\d+)").unwrap();
    let mut total_samples = 0f64;
    let mut unidentified_samples = 0f64;
    let mut samples_per_id: HashMap<(usize, bool), f64> = HashMap::new();

    for line in file.lines() {
        let line = line.trim_end();
        // Blank lines carry no samples; previously they caused a panic.
        if line.is_empty() {
            continue;
        }
        let (stack, count) = line
            .rsplit_once(' ')
            .unwrap_or_else(|| panic!("folded perf line has no sample count: {line:?}"));
        let n_samples: f64 = count.parse().unwrap_or_else(|err| {
            panic!("invalid sample count {count:?} in perf line {line:?}: {err}")
        });

        // The count token contains no `__`, so matching only the stack is equivalent to
        // matching the whole line.
        if let Some(cap) = operator_regex.captures_iter(stack).last() {
            let id = cap[4].parse::<usize>().unwrap();
            // Group 3 can only ever match the literal "recv".
            let is_network_recv = cap.get(3).is_some();
            *samples_per_id.entry((id, is_network_recv)).or_insert(0.0) += n_samples;
        } else {
            unidentified_samples += n_samples;
        }
        total_samples += n_samples;
    }

    if total_samples > 0.0 {
        println!(
            "Out of {} samples, {} were unidentified, {}%",
            total_samples,
            unidentified_samples,
            unidentified_samples / total_samples * 100.0
        );
        for share in samples_per_id.values_mut() {
            *share /= total_samples;
        }
    } else {
        // Avoid printing NaN and turning zero-count entries into 0/0.
        println!("Warning: no perf samples found.");
    }
    samples_per_id
}
/// Copies the operator-side CPU usage for statement `next_stmt_id` onto `leaf`'s metadata.
///
/// Looks up the key `(next_stmt_id, false)` in `id_to_usage`. If there is no entry, the leaf is
/// left untouched.
fn inject_perf_leaf(
    leaf: &mut HydroLeaf,
    id_to_usage: &HashMap<(usize, bool), f64>,
    next_stmt_id: &mut usize,
) {
    let key = (*next_stmt_id, false);
    match id_to_usage.get(&key) {
        Some(&usage) => leaf.metadata_mut().cpu_usage = Some(usage),
        None => {}
    }
}
/// Copies CPU usage for statement `next_stmt_id` from `id_to_usage` onto `node`'s metadata.
///
/// The `(id, false)` entry, if present, becomes `cpu_usage` for any node kind. For
/// `HydroNode::Network` nodes, the `(id, true)` entry, if present, additionally becomes
/// `network_recv_cpu_usage`.
fn inject_perf_node(
    node: &mut HydroNode,
    id_to_usage: &HashMap<(usize, bool), f64>,
    next_stmt_id: &mut usize,
) {
    let stmt_id = *next_stmt_id;

    if let Some(&usage) = id_to_usage.get(&(stmt_id, false)) {
        node.metadata_mut().cpu_usage = Some(usage);
    }

    // Samples attributed to the receive half are recorded only on network nodes.
    let HydroNode::Network { metadata, .. } = node else {
        return;
    };
    if let Some(&usage) = id_to_usage.get(&(stmt_id, true)) {
        metadata.network_recv_cpu_usage = Some(usage);
    }
}
/// Attaches CPU-usage measurements from folded `perf` output to the nodes and leaves of `ir`.
///
/// `folded_data` is parsed by `parse_perf`. Usages are then applied during a `traverse_dfir`
/// walk, keyed by the statement id the traversal supplies.
///
/// Invalid UTF-8 in `folded_data` no longer panics. Such bytes are replaced with U+FFFD.
/// ASCII bytes are never altered by lossy decoding, so the fixed parts of the operator
/// pattern and the numeric ids still match exactly as before. Valid input is not copied.
pub fn inject_perf(ir: &mut [HydroLeaf], folded_data: Vec<u8>) {
    let folded = String::from_utf8(folded_data)
        .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned());
    let id_to_usage = parse_perf(folded);
    traverse_dfir(
        ir,
        |leaf, next_stmt_id| {
            inject_perf_leaf(leaf, &id_to_usage, next_stmt_id);
        },
        |node, next_stmt_id| {
            inject_perf_node(node, &id_to_usage, next_stmt_id);
        },
    );
}
pub fn parse_counter_usage(measurement: String) -> (usize, usize) {
let regex = Regex::new(r"\((\d+)\): (\d+)").unwrap();
let matches = regex.captures_iter(&measurement).last().unwrap();
let op_id = matches[1].parse::<usize>().unwrap();
let count = matches[2].parse::<usize>().unwrap();
(op_id, count)
}
/// Sets the cardinality metadata of `node`, the statement numbered `next_stmt_id`.
///
/// A measured count in `op_to_count` always takes precedence. Without one, the cardinality is
/// copied from upstream for the operators this function treats as cardinality-preserving:
/// - `Tee` copies from its shared inner node.
/// - `Map`, `DeferTick`, `Enumerate`, `Inspect`, `Sort` and `Counter` copy from their input.
///
/// Every other node is left unchanged.
fn inject_count_node(
    node: &mut HydroNode,
    next_stmt_id: &mut usize,
    op_to_count: &HashMap<usize, usize>,
) {
    if let Some(&measured) = op_to_count.get(next_stmt_id) {
        node.metadata_mut().cardinality = Some(measured);
        return;
    }

    match node {
        HydroNode::Tee { inner, metadata, .. } => {
            let upstream = inner.0.borrow().metadata().cardinality;
            metadata.cardinality = upstream;
        }
        HydroNode::Map { input, metadata, .. }
        | HydroNode::DeferTick { input, metadata, .. }
        | HydroNode::Enumerate { input, metadata, .. }
        | HydroNode::Inspect { input, metadata, .. }
        | HydroNode::Sort { input, metadata, .. }
        | HydroNode::Counter { input, metadata, .. } => {
            let upstream = input.metadata().cardinality;
            metadata.cardinality = upstream;
        }
        _ => {}
    }
}
/// Walks `ir` and fills in node cardinalities from `op_to_count`, keyed by statement id.
///
/// Leaves are left unchanged; each node is handled by `inject_count_node`.
pub fn inject_count(ir: &mut [HydroLeaf], op_to_count: &HashMap<usize, usize>) {
    traverse_dfir(
        ir,
        |_leaf, _stmt_id| (),
        |node, stmt_id| inject_count_node(node, stmt_id, op_to_count),
    );
}
/// Injects the profiling data gathered from `process` into `ir`.
///
/// If `process` has tracing results, this happens in two steps:
/// 1. Its folded perf data is attached as CPU usage (`inject_perf`).
/// 2. Counter lines are read from `node_cardinality` until the channel yields `None`.
///    The resulting per-operator counts are attached as cardinalities (`inject_count`).
///    For duplicate operator ids, the later line wins.
///
/// Without tracing results nothing happens and `node_cardinality` is not read.
/// `_node_usage` is currently unused.
pub async fn analyze_process_results(
    process: &impl DeployCrateWrapper,
    ir: &mut [HydroLeaf],
    _node_usage: f64,
    node_cardinality: &mut UnboundedReceiver<String>,
) {
    let Some(perf_results) = process.tracing_results().await else {
        return;
    };
    inject_perf(ir, perf_results.folded_data);

    let mut counts_by_op = HashMap::new();
    while let Some(line) = node_cardinality.recv().await {
        let (op_id, count) = parse_counter_usage(line);
        counts_by_op.insert(op_id, count);
    }
    inject_count(ir, &counts_by_op);
}
/// Profiles every deployed cluster and identifies the bottleneck cluster.
///
/// For each cluster yielded by `nodes.get_all_clusters()`:
/// - One CPU-usage report per member is read from `usage_out` (via `get_usage`).
/// - The busiest member (the lowest index on ties) has its profiling results injected into
///   `ir` via `analyze_process_results`, using its channel in `cardinality_out`.
///
/// Among clusters whose name is not in `exclude_from_decoupling`, the bottleneck is the one
/// whose busiest member has the strictly highest usage; the earliest such cluster wins ties.
/// An eligible cluster can be selected even if it reported exactly 0% usage. Previously that
/// case left nothing selected and panicked on an unwrap.
///
/// Returns the bottleneck's `(id, name, member count)`.
///
/// # Panics
/// - If no cluster outside `exclude_from_decoupling` has at least one member.
/// - If `usage_out` or `cardinality_out` has no entry for a member that is read.
pub async fn analyze_cluster_results(
    nodes: &DeployResult<'_, HydroDeploy>,
    ir: &mut [HydroLeaf],
    usage_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
    cardinality_out: &mut HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
    exclude_from_decoupling: Vec<String>,
) -> (LocationId, String, usize) {
    // Busiest eligible cluster so far: (id, name, member count, usage of its busiest member).
    // `None` until the first eligible cluster is seen, so a 0% cluster can still be chosen.
    let mut bottleneck: Option<(LocationId, String, usize, f64)> = None;

    for (id, name, cluster) in nodes.get_all_clusters() {
        println!("Analyzing cluster {:?}: {}", id, name);

        // Busiest member of this cluster as (usage, member index); ties keep the lower index.
        let mut max_usage: Option<(f64, usize)> = None;
        for idx in 0..cluster.members().len() {
            let usage =
                get_usage(usage_out.get_mut(&(id.clone(), name.clone(), idx)).unwrap()).await;
            println!("Node {} usage: {}", idx, usage);
            if max_usage.map_or(true, |(prev_usage, _)| usage > prev_usage) {
                max_usage = Some((usage, idx));
            }
        }

        // A cluster without members has nothing to analyze.
        let Some((usage, idx)) = max_usage else {
            continue;
        };

        let node_cardinality = cardinality_out
            .get_mut(&(id.clone(), name.clone(), idx))
            .unwrap();
        analyze_process_results(
            cluster.members().get(idx).unwrap(),
            ir,
            usage,
            node_cardinality,
        )
        .await;

        let beats_current = bottleneck
            .as_ref()
            .map_or(true, |(_, _, _, best_usage)| usage > *best_usage);
        if beats_current && !exclude_from_decoupling.contains(&name) {
            bottleneck = Some((id.clone(), name.clone(), cluster.members().len(), usage));
            println!("The bottleneck is {}", name);
        }
    }

    let (id, name, size, _) = bottleneck.expect(
        "analyze_cluster_results: no eligible cluster found \
         (all clusters were excluded from decoupling, had no members, or none exist)",
    );
    (id, name, size)
}
/// Waits for the next usage report on `usage_out` and returns its CPU percentage.
///
/// The report is parsed by `parse_cpu_usage`, which yields `0.0` if it contains no `Total`
/// figure.
///
/// # Panics
/// Panics if the channel is closed before a report arrives.
pub async fn get_usage(usage_out: &mut UnboundedReceiver<String>) -> f64 {
    parse_cpu_usage(usage_out.recv().await.unwrap())
}
/// Folds `node`'s per-element network overheads into the running maxima.
///
/// `get_network_type` classifies the node relative to `location`'s root.
/// - Sending nodes (`Send`/`SendRecv`): the overhead is the node's `cpu_usage` divided by the
///   cardinality of its first input.
/// - Receiving nodes (`Recv`/`SendRecv`): the overhead is `network_recv_cpu_usage` divided by
///   the node's own cardinality.
///
/// Each computed overhead is printed, and it raises `max_send_overhead` / `max_recv_overhead`
/// when larger. Nodes are skipped when a measurement is missing, when a sender has no input,
/// or when the relevant cardinality is zero. A zero cardinality would otherwise produce an
/// infinite (or NaN) ratio that swamps the maximum.
fn analyze_overheads_node(
    node: &mut HydroNode,
    _next_stmt_id: &mut usize,
    max_send_overhead: &mut f64,
    max_recv_overhead: &mut f64,
    location: &LocationId,
) {
    let metadata = node.metadata();
    let network_type = get_network_type(node, location.root().raw_id());

    if matches!(
        network_type,
        Some(NetworkType::Send) | Some(NetworkType::SendRecv)
    ) {
        // A sender without inputs is treated like a missing measurement rather than a panic.
        let input_cardinality = node
            .input_metadata()
            .first()
            .and_then(|input| input.cardinality);
        if let (Some(cpu_usage), Some(cardinality)) = (metadata.cpu_usage, input_cardinality) {
            if cardinality > 0 {
                let overhead = cpu_usage / cardinality as f64;
                println!("New send overhead: {}", overhead);
                if overhead > *max_send_overhead {
                    *max_send_overhead = overhead;
                }
            }
        }
    }

    if matches!(
        network_type,
        Some(NetworkType::Recv) | Some(NetworkType::SendRecv)
    ) {
        if let (Some(cardinality), Some(cpu_usage)) =
            (metadata.cardinality, metadata.network_recv_cpu_usage)
        {
            if cardinality > 0 {
                let overhead = cpu_usage / cardinality as f64;
                println!("New receive overhead: {}", overhead);
                if overhead > *max_recv_overhead {
                    *max_recv_overhead = overhead;
                }
            }
        }
    }
}
/// Walks `ir` and returns the largest per-element network overheads as `(send, receive)`.
///
/// Each node is evaluated by `analyze_overheads_node` relative to `location`. Both maxima
/// start at `0.0`; a warning is printed for each one that is still `0.0` after the walk.
pub fn analyze_send_recv_overheads(ir: &mut [HydroLeaf], location: &LocationId) -> (f64, f64) {
    let (mut send_max, mut recv_max) = (0.0, 0.0);

    traverse_dfir(
        ir,
        |_leaf, _stmt_id| (),
        |node, stmt_id| {
            analyze_overheads_node(node, stmt_id, &mut send_max, &mut recv_max, location)
        },
    );

    let no_send = send_max == 0.0;
    let no_recv = recv_max == 0.0;
    if no_send {
        println!("Warning: No send overhead found.");
    }
    if no_recv {
        println!("Warning: No receive overhead found.");
    }
    (send_max, recv_max)
}