use std::collections::HashMap;
use std::time::Duration;
use hydro_deploy::Deployment;
use hydro_lang::FlowBuilder;
use hydro_lang::builder::RewriteIrFlowBuilder;
use hydro_lang::builder::deploy::DeployResult;
use hydro_lang::deploy::HydroDeploy;
use hydro_lang::deploy::deploy_graph::DeployCrateWrapper;
use hydro_lang::internal_constants::{COUNTER_PREFIX, CPU_USAGE_PREFIX};
use hydro_lang::ir::{HydroLeaf, HydroNode, deep_clone, traverse_dfir};
use hydro_lang::location::LocationId;
use hydro_lang::rewrites::persist_pullup::persist_pullup;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::decouple_analysis::decouple_analysis;
use crate::decoupler::Decoupler;
use crate::deploy::ReusableHosts;
use crate::parse_results::{analyze_cluster_results, analyze_send_recv_overheads};
use crate::repair::{cycle_source_to_sink_input, inject_id, remove_counter};
fn insert_counter_node(node: &mut HydroNode, next_stmt_id: &mut usize, duration: syn::Expr) {
match node {
HydroNode::Placeholder
| HydroNode::Unpersist { .. }
| HydroNode::Counter { .. } => {
std::panic!("Unexpected {:?} found in insert_counter_node", node.print_root());
}
HydroNode::Source { metadata, .. }
| HydroNode::CycleSource { metadata, .. }
| HydroNode::Persist { metadata, .. }
| HydroNode::Delta { metadata, .. }
| HydroNode::Chain { metadata, .. } | HydroNode::CrossSingleton { metadata, .. }
| HydroNode::CrossProduct { metadata, .. } | HydroNode::Join { metadata, .. }
| HydroNode::ResolveFutures { metadata, .. }
| HydroNode::ResolveFuturesOrdered { metadata, .. }
| HydroNode::Difference { metadata, .. }
| HydroNode::AntiJoin { metadata, .. }
| HydroNode::FlatMap { metadata, .. }
| HydroNode::Filter { metadata, .. }
| HydroNode::FilterMap { metadata, .. }
| HydroNode::Unique { metadata, .. }
| HydroNode::Scan { metadata, .. }
| HydroNode::Fold { metadata, .. } | HydroNode::Reduce { metadata, .. } | HydroNode::FoldKeyed { metadata, .. }
| HydroNode::ReduceKeyed { metadata, .. }
| HydroNode::Network { metadata, .. }
| HydroNode::ExternalInput { metadata, .. }
=> {
let metadata = metadata.clone();
let node_content = std::mem::replace(node, HydroNode::Placeholder);
let counter = HydroNode::Counter {
tag: next_stmt_id.to_string(),
duration: duration.into(),
input: Box::new(node_content),
metadata: metadata.clone(),
};
*next_stmt_id += 1;
*node = counter;
}
HydroNode::Tee { .. } | HydroNode::Map { .. } | HydroNode::DeferTick { .. } | HydroNode::Enumerate { .. }
| HydroNode::Inspect { .. }
| HydroNode::Sort { .. }
=> {}
}
}
/// Walks every node of `ir` and wraps each instrumentable node in a counter.
///
/// Leaves are visited but not modified. Each node is handed to
/// [`insert_counter_node`] with its own clone of `duration` and the traversal's
/// running statement id.
fn insert_counter(ir: &mut [HydroLeaf], duration: syn::Expr) {
    traverse_dfir(
        ir,
        // Leaves need no instrumentation.
        |_leaf, _leaf_id| {},
        |current, stmt_id| insert_counter_node(current, stmt_id, duration.clone()),
    );
}
/// Subscribes to two filtered views of `process`'s stdout.
///
/// Returns `(cpu_usage, counters)`:
/// - `cpu_usage` is the receiver from `stdout_filter(CPU_USAGE_PREFIX)`.
/// - `counters` is the receiver from `stdout_filter(COUNTER_PREFIX)`.
///
/// The CPU-usage filter is registered first.
async fn track_process_usage_cardinality(
    process: &impl DeployCrateWrapper,
) -> (UnboundedReceiver<String>, UnboundedReceiver<String>) {
    let cpu_usage = process.stdout_filter(CPU_USAGE_PREFIX).await;
    let counters = process.stdout_filter(COUNTER_PREFIX).await;
    (cpu_usage, counters)
}
/// Subscribes to CPU-usage and counter output for every deployed cluster member and process.
///
/// Returns two maps keyed by `(location id, location name, member index)`. The first
/// map holds the CPU-usage receivers and the second holds the counter receivers.
/// Cluster members are indexed by their position in `cluster.members()`; processes
/// always use index `0`. Nodes are subscribed one at a time: clusters first, then
/// processes.
async fn track_cluster_usage_cardinality(
    nodes: &DeployResult<'_, HydroDeploy>,
) -> (
    HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
    HashMap<(LocationId, String, usize), UnboundedReceiver<String>>,
) {
    let mut usage_by_node = HashMap::new();
    let mut cardinality_by_node = HashMap::new();

    for (id, name, cluster) in nodes.get_all_clusters() {
        for (member_idx, member) in cluster.members().iter().enumerate() {
            let (usage_rx, cardinality_rx) = track_process_usage_cardinality(member).await;
            let key = (id.clone(), name.clone(), member_idx);
            usage_by_node.insert(key.clone(), usage_rx);
            cardinality_by_node.insert(key, cardinality_rx);
        }
    }

    for (id, name, process) in nodes.get_all_processes() {
        let (usage_rx, cardinality_rx) = track_process_usage_cardinality(process).await;
        // A process is a single node, so it always occupies index 0.
        let key = (id.clone(), name.clone(), 0_usize);
        usage_by_node.insert(key.clone(), usage_rx);
        cardinality_by_node.insert(key, cardinality_rx);
    }

    (usage_by_node, cardinality_by_node)
}
/// Completes once the running deployment should be stopped.
///
/// With `Some(seconds)`, it sleeps for that many seconds. With `None`, it prints a
/// prompt to stderr and waits for one line from stdin. It also returns immediately
/// if stdin is already at EOF, because `next_line` then yields `Ok(None)`.
///
/// # Panics
///
/// Panics if reading from stdin fails.
async fn wait_for_stop_signal(num_seconds: Option<usize>) {
    match num_seconds {
        // usize -> u64 is lossless on every target with a pointer width of 64 bits or less.
        Some(seconds) => tokio::time::sleep(Duration::from_secs(seconds as u64)).await,
        None => {
            eprintln!("Press enter to stop deployment and analyze results");
            let _ = tokio::io::AsyncBufReadExt::lines(tokio::io::BufReader::new(
                tokio::io::stdin(),
            ))
            .next_line()
            .await
            .unwrap();
        }
    }
}

/// Deploys `builder` with counters inserted, runs it, and analyzes the measurements
/// to plan how the bottleneck location should be decoupled.
///
/// Steps:
/// 1. Capture a [`RewriteIrFlowBuilder`] from `builder`. Then optimize with
///    `persist_pullup` and insert counters whose `duration` is one second.
/// 2. Keep a deep clone of the optimized IR for analysis. Deploy the flow on hosts
///    from `reusable_hosts`, one entry per `(id, name, num_hosts)` in `clusters` and
///    per `(id, name)` in `processes`.
/// 3. Subscribe to CPU-usage and counter output. Run the deployment for
///    `num_seconds` seconds, or until Enter is pressed when `num_seconds` is `None`.
/// 4. Pass the collected output and `exclude_from_decoupling` to
///    `analyze_cluster_results` to pick the bottleneck. Then remove the counters from
///    the IR, inject ids, and run the send/receive-overhead and decoupling analyses.
///
/// Returns the rewrite builder, the analyzed IR, the resulting [`Decoupler`], and the
/// bottleneck's name and node count as reported by `analyze_cluster_results`.
///
/// # Panics
///
/// Panics if deploying, starting the deployment, or reading stdin fails.
pub async fn deploy_and_analyze<'a>(
    reusable_hosts: &mut ReusableHosts,
    deployment: &mut Deployment,
    builder: FlowBuilder<'a>,
    clusters: &[(usize, String, usize)],
    processes: &[(usize, String)],
    exclude_from_decoupling: Vec<String>,
    num_seconds: Option<usize>,
) -> (
    RewriteIrFlowBuilder<'a>,
    Vec<HydroLeaf>,
    Decoupler,
    String,
    usize,
) {
    let counter_output_duration = syn::parse_quote!(std::time::Duration::from_secs(1));

    // Taken before `builder` is handed to `optimize_with`.
    let rewritten_ir_builder = builder.rewritten_ir_builder();
    let optimized = builder.optimize_with(persist_pullup).optimize_with(|leaf| {
        insert_counter(leaf, counter_output_duration);
    });
    // The analysis mutates its own copy of the IR (counters still included).
    let mut ir = deep_clone(optimized.ir());

    let mut deployable = optimized.into_deploy();
    for (cluster_id, name, num_hosts) in clusters {
        deployable = deployable.with_cluster_id_name(
            *cluster_id,
            name.clone(),
            reusable_hosts.get_cluster_hosts(deployment, name.clone(), *num_hosts),
        );
    }
    for (process_id, name) in processes {
        deployable = deployable.with_process_id_name(
            *process_id,
            name.clone(),
            reusable_hosts.get_process_hosts(deployment, name.clone()),
        );
    }
    let nodes = deployable.deploy(deployment);
    deployment.deploy().await.unwrap();

    // Subscribe before starting (presumably so early output is not missed).
    let (mut usage_out, mut cardinality_out) = track_cluster_usage_cardinality(&nodes).await;
    deployment
        .start_until(wait_for_stop_signal(num_seconds))
        .await
        .unwrap();

    let (bottleneck, bottleneck_name, bottleneck_num_nodes) = analyze_cluster_results(
        &nodes,
        &mut ir,
        &mut usage_out,
        &mut cardinality_out,
        exclude_from_decoupling,
    )
    .await;

    // Counters were only needed for measurement; strip them before decoupling analysis.
    remove_counter(&mut ir);
    inject_id(&mut ir);
    let cycle_source_to_sink_input = cycle_source_to_sink_input(&mut ir);
    let (send_overhead, recv_overhead) = analyze_send_recv_overheads(&mut ir, &bottleneck);
    let (orig_to_decoupled, decoupled_to_orig, place_on_decoupled) = decouple_analysis(
        &mut ir,
        &bottleneck,
        send_overhead,
        recv_overhead,
        &cycle_source_to_sink_input,
    );

    (
        rewritten_ir_builder,
        ir,
        Decoupler {
            output_to_decoupled_machine_after: orig_to_decoupled,
            output_to_original_machine_after: decoupled_to_orig,
            place_on_decoupled_machine: place_on_decoupled,
            orig_location: bottleneck.clone(),
            // NOTE(review): hard-coded placeholder location. Presumably the caller
            // replaces it once the decoupled process exists; confirm.
            decoupled_location: LocationId::Process(0),
        },
        bottleneck_name,
        bottleneck_num_nodes,
    )
}