use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Display, Write};
use std::num::ParseIntError;
use std::sync::OnceLock;
use auto_impl::auto_impl;
use slotmap::{Key, SecondaryMap, SlotMap};
pub use super::graphviz::{HydroDot, escape_dot};
pub use super::json::HydroJson;
pub use super::mermaid::{HydroMermaid, escape_mermaid};
use crate::compile::ir::backtrace::Backtrace;
use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
use crate::location::dynamic::LocationId;
use crate::location::{LocationKey, LocationType};
/// Label attached to a node of the rendered graph.
///
/// A label is either a fixed string or an operator name paired with the
/// expressions that were passed to that operator.
#[derive(Debug, Clone)]
pub enum NodeLabel {
/// A plain label that is displayed exactly as stored.
Static(String),
/// An operator name with its expression arguments; displayed as
/// `op_name(expr, expr, ...)`.
WithExprs {
/// Name of the operator.
op_name: String,
/// Expressions shown, comma-separated, inside the parentheses.
exprs: Vec<DebugExpr>,
},
}
impl NodeLabel {
pub fn static_label(s: String) -> Self {
Self::Static(s)
}
pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
Self::WithExprs { op_name, exprs }
}
}
impl Display for NodeLabel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Static(s) => write!(f, "{}", s),
Self::WithExprs { op_name, exprs } => {
if exprs.is_empty() {
write!(f, "{}()", op_name)
} else {
let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
write!(f, "{}({})", op_name, expr_strs.join(", "))
}
}
}
}
}
/// Shared state for graph writers that emit indented text.
pub struct IndentedGraphWriter<'a, W> {
/// Underlying sink that receives the output.
pub write: W,
/// Number of spaces written before each line by `writeln_indented`.
pub indent: usize,
/// Rendering options for this writer.
pub config: HydroWriteConfig<'a>,
}
impl<'a, W> IndentedGraphWriter<'a, W> {
pub fn new(write: W) -> Self {
Self {
write,
indent: 0,
config: HydroWriteConfig::default(),
}
}
pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
Self {
write,
indent: 0,
config,
}
}
}
impl<W: Write> IndentedGraphWriter<'_, W> {
    /// Writes `content` followed by a newline, preceded by `self.indent` spaces.
    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
        // Padding an empty string to `width` produces exactly `width` spaces.
        let width = self.indent;
        writeln!(self.write, "{:width$}{}", "", content, width = width)
    }
}
/// Error type for graph writing; an alias for [`std::fmt::Error`].
pub type GraphWriteError = std::fmt::Error;
/// Backend interface for emitting a Hydro graph in a concrete output format.
///
/// `write_graph_structure` drives an implementation in this order: the
/// prologue, one node definition per node, then (when location groups are
/// enabled) a start / nodes / end sequence per location, then every edge,
/// and finally the epilogue.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
/// Error type returned by every writer method.
type Err: Error;
/// Called once before anything else is written.
fn write_prologue(&mut self) -> Result<(), Self::Err>;
/// Called once per node with its label, category, optional location and
/// optional backtrace.
fn write_node_definition(
&mut self,
node_id: VizNodeKey,
node_label: &NodeLabel,
node_type: HydroNodeType,
location_key: Option<LocationKey>,
location_type: Option<LocationType>,
backtrace: Option<&Backtrace>,
) -> Result<(), Self::Err>;
/// Called once per edge with its property set and optional label.
fn write_edge(
&mut self,
src_id: VizNodeKey,
dst_id: VizNodeKey,
edge_properties: &HashSet<HydroEdgeProp>,
label: Option<&str>,
) -> Result<(), Self::Err>;
/// Opens the group for one location.
fn write_location_start(
&mut self,
location_key: LocationKey,
location_type: LocationType,
) -> Result<(), Self::Err>;
/// Called for each node belonging to the currently open location group.
fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;
/// Closes the group opened by the matching `write_location_start`.
fn write_location_end(&mut self) -> Result<(), Self::Err>;
/// Called once after everything else has been written.
fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
/// Helpers for converting [`HydroNodeType`] values to display names.
pub mod node_type_utils {
    use super::HydroNodeType;

    /// Single source of truth pairing each node type with its display name.
    const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
        (HydroNodeType::Source, "Source"),
        (HydroNodeType::Transform, "Transform"),
        (HydroNodeType::Join, "Join"),
        (HydroNodeType::Aggregation, "Aggregation"),
        (HydroNodeType::Network, "Network"),
        (HydroNodeType::Sink, "Sink"),
        (HydroNodeType::Tee, "Tee"),
        (HydroNodeType::NonDeterministic, "NonDeterministic"),
    ];

    /// Returns the display name of `node_type`, or `"Unknown"` when the type
    /// has no entry in the table.
    pub fn to_string(node_type: HydroNodeType) -> &'static str {
        for &(candidate, name) in NODE_TYPE_DATA {
            if candidate == node_type {
                return name;
            }
        }
        "Unknown"
    }

    /// Returns every `(node type, display name)` pair in table order.
    pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
        NODE_TYPE_DATA.iter().copied().collect()
    }
}
/// Category of a graph node, chosen by the graph builder per operator kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HydroNodeType {
/// Nodes that introduce data (sources, singleton sources, external inputs, cycle sources).
Source,
/// Single-input operators such as map/filter, and placeholder nodes.
Transform,
/// Two-input operators such as joins, cross products and differences.
Join,
/// Operators such as fold, reduce, scan and sort.
Aggregation,
/// Network send/receive nodes.
Network,
/// Terminal nodes created for roots.
Sink,
/// Tee and partition nodes.
Tee,
/// Operators marked as non-deterministic (e.g. batch, observe-non-det).
NonDeterministic,
}
/// Property tag attached to an edge; an edge carries a set of these and the
/// set determines how the edge is styled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HydroEdgeProp {
/// The collection flowing over the edge is bounded.
Bounded,
/// The collection flowing over the edge is unbounded.
Unbounded,
/// Elements are totally ordered.
TotalOrder,
/// Elements have no guaranteed order.
NoOrder,
/// The collection is keyed.
Keyed,
/// The collection is a stream.
Stream,
/// The collection is a keyed singleton.
KeyedSingleton,
/// The collection is a keyed stream.
KeyedStream,
/// The collection is a singleton.
Singleton,
/// The collection is an optional.
Optional,
/// The edge connects nodes whose root locations differ.
Network,
/// NOTE(review): not produced anywhere in this file; presumably marks cycle edges — confirm.
Cycle,
}
/// Renderer-independent visual description of an edge, computed by
/// `get_unified_edge_style` from the edge's property set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedEdgeStyle {
/// Solid / dotted / dashed stroke.
pub line_pattern: LinePattern,
/// Stroke width (defaults to 1).
pub line_width: u8,
/// Shape drawn at the destination end of the edge.
pub arrowhead: ArrowheadStyle,
/// Plain line or line with hash marks.
pub line_style: LineStyle,
/// Optional halo drawn around the line.
pub halo: HaloStyle,
/// Whether the line is drawn wavy.
pub waviness: WavinessStyle,
/// Whether the line is animated.
pub animation: AnimationStyle,
/// Colour as a `#rrggbb` string (defaults to `#666666`).
pub color: &'static str,
}
/// Stroke pattern of an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinePattern {
/// Continuous line; used for non-network edges.
Solid,
/// Dotted line; not selected by `get_unified_edge_style` in this file.
Dotted,
/// Dashed line; used for network edges.
Dashed,
}
/// Arrowhead shape of an edge; encodes the collection type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrowheadStyle {
/// Used for stream, keyed-stream and keyed-singleton edges.
TriangleFilled,
/// Used for singleton edges.
CircleFilled,
/// Used for optional edges.
DiamondOpen,
/// Used when no collection-type property is present.
Default,
}
/// Decoration of an edge line; encodes keyedness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineStyle {
/// Plain line; used for edges without the `Keyed` property.
Single,
/// Line with hash marks; used for edges with the `Keyed` property.
HashMarks,
}
/// Halo drawn around an edge; encodes boundedness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HaloStyle {
/// No halo; used for edges without the `Unbounded` property.
None,
/// Light-blue halo; used for edges with the `Unbounded` property.
LightBlue,
}
/// Waviness of an edge line; encodes ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WavinessStyle {
/// Straight line; the default, and used for totally ordered edges.
None,
/// Wavy line; used for edges with the `NoOrder` property.
Wavy,
}
/// Animation of an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnimationStyle {
/// Not animated; used for non-network edges.
Static,
/// Animated; used for network edges.
Animated,
}
impl Default for UnifiedEdgeStyle {
fn default() -> Self {
Self {
line_pattern: LinePattern::Solid,
line_width: 1,
arrowhead: ArrowheadStyle::Default,
line_style: LineStyle::Single,
halo: HaloStyle::None,
waviness: WavinessStyle::None,
animation: AnimationStyle::Static,
color: "#666666",
}
}
}
pub fn get_unified_edge_style(
edge_properties: &HashSet<HydroEdgeProp>,
src_location: Option<usize>,
dst_location: Option<usize>,
) -> UnifiedEdgeStyle {
let mut style = UnifiedEdgeStyle::default();
let is_network = edge_properties.contains(&HydroEdgeProp::Network)
|| (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
if is_network {
style.line_pattern = LinePattern::Dashed;
style.animation = AnimationStyle::Animated;
} else {
style.line_pattern = LinePattern::Solid;
style.animation = AnimationStyle::Static;
}
if edge_properties.contains(&HydroEdgeProp::Unbounded) {
style.halo = HaloStyle::LightBlue;
} else {
style.halo = HaloStyle::None;
}
if edge_properties.contains(&HydroEdgeProp::Stream) {
style.arrowhead = ArrowheadStyle::TriangleFilled;
style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
style.arrowhead = ArrowheadStyle::TriangleFilled;
style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
style.arrowhead = ArrowheadStyle::TriangleFilled;
style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
style.arrowhead = ArrowheadStyle::CircleFilled;
style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
style.arrowhead = ArrowheadStyle::DiamondOpen;
style.color = "#6b7280"; }
if edge_properties.contains(&HydroEdgeProp::Keyed) {
style.line_style = LineStyle::HashMarks; } else {
style.line_style = LineStyle::Single;
}
if edge_properties.contains(&HydroEdgeProp::NoOrder) {
style.waviness = WavinessStyle::Wavy;
} else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
style.waviness = WavinessStyle::None;
}
style
}
/// Derives the edge property set for a collection kind.
///
/// Every kind contributes its collection-type tag, a boundedness tag and an
/// ordering tag; keyed kinds additionally contribute `Keyed`:
/// - `Stream`: `Stream` + bound + order.
/// - `KeyedStream`: `KeyedStream` + `Keyed` + bound + value order.
/// - `Singleton`: `Singleton` + bound + `TotalOrder`.
/// - `Optional`: `Optional` + bound + `TotalOrder`.
/// - `KeyedSingleton`: `KeyedSingleton` + `Keyed` + bound + `TotalOrder`.
pub fn extract_edge_properties_from_collection_kind(
    collection_kind: &crate::compile::ir::CollectionKind,
) -> HashSet<HydroEdgeProp> {
    use crate::compile::ir::CollectionKind;
    let mut properties = HashSet::new();
    match collection_kind {
        CollectionKind::Stream { bound, order, .. } => {
            properties.insert(HydroEdgeProp::Stream);
            add_bound_property(&mut properties, bound);
            add_order_property(&mut properties, order);
        }
        CollectionKind::KeyedStream {
            bound, value_order, ..
        } => {
            properties.insert(HydroEdgeProp::KeyedStream);
            properties.insert(HydroEdgeProp::Keyed);
            add_bound_property(&mut properties, bound);
            add_order_property(&mut properties, value_order);
        }
        CollectionKind::Singleton { bound, .. } => {
            properties.insert(HydroEdgeProp::Singleton);
            add_singleton_bound_property(&mut properties, bound);
            // Single-valued collections are tagged as totally ordered.
            properties.insert(HydroEdgeProp::TotalOrder);
        }
        CollectionKind::Optional { bound, .. } => {
            properties.insert(HydroEdgeProp::Optional);
            add_bound_property(&mut properties, bound);
            properties.insert(HydroEdgeProp::TotalOrder);
        }
        CollectionKind::KeyedSingleton { bound, .. } => {
            // Tag with the keyed variant (mirroring `KeyedStream` above) so that
            // the dedicated `KeyedSingleton` styling branch is selected instead
            // of the plain `Singleton` one.
            properties.insert(HydroEdgeProp::KeyedSingleton);
            properties.insert(HydroEdgeProp::Keyed);
            add_keyed_singleton_bound_property(&mut properties, bound);
            properties.insert(HydroEdgeProp::TotalOrder);
        }
    }
    properties
}
/// Inserts `Bounded` or `Unbounded` into `properties` according to `bound`.
fn add_bound_property(
    properties: &mut HashSet<HydroEdgeProp>,
    bound: &crate::compile::ir::BoundKind,
) {
    use crate::compile::ir::BoundKind;
    let tag = match bound {
        BoundKind::Bounded => HydroEdgeProp::Bounded,
        BoundKind::Unbounded => HydroEdgeProp::Unbounded,
    };
    properties.insert(tag);
}
/// Inserts the boundedness tag for a singleton: `Bounded` stays `Bounded`,
/// while both `Monotonic` and `Unbounded` are tagged `Unbounded`.
fn add_singleton_bound_property(
    properties: &mut HashSet<HydroEdgeProp>,
    bound: &crate::compile::ir::SingletonBoundKind,
) {
    use crate::compile::ir::SingletonBoundKind;
    let tag = match bound {
        SingletonBoundKind::Bounded => HydroEdgeProp::Bounded,
        SingletonBoundKind::Monotonic | SingletonBoundKind::Unbounded => HydroEdgeProp::Unbounded,
    };
    properties.insert(tag);
}
/// Inserts the boundedness tag for a keyed singleton: only `Bounded` is
/// tagged `Bounded`; `BoundedValue`, `MonotonicValue` and `Unbounded` are all
/// tagged `Unbounded`.
fn add_keyed_singleton_bound_property(
    properties: &mut HashSet<HydroEdgeProp>,
    bound: &crate::compile::ir::KeyedSingletonBoundKind,
) {
    use crate::compile::ir::KeyedSingletonBoundKind;
    let tag = match bound {
        KeyedSingletonBoundKind::Bounded => HydroEdgeProp::Bounded,
        KeyedSingletonBoundKind::BoundedValue
        | KeyedSingletonBoundKind::MonotonicValue
        | KeyedSingletonBoundKind::Unbounded => HydroEdgeProp::Unbounded,
    };
    properties.insert(tag);
}
/// Inserts `TotalOrder` or `NoOrder` into `properties` according to `order`.
fn add_order_property(
    properties: &mut HashSet<HydroEdgeProp>,
    order: &crate::compile::ir::StreamOrder,
) {
    use crate::compile::ir::StreamOrder;
    let tag = match order {
        StreamOrder::TotalOrder => HydroEdgeProp::TotalOrder,
        StreamOrder::NoOrder => HydroEdgeProp::NoOrder,
    };
    properties.insert(tag);
}
/// Returns `true` when the two locations have different roots.
pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
    let src_root = src_location.root();
    let dst_root = dst_location.root();
    src_root != dst_root
}
/// Adds the `Network` tag to `properties` when the edge from `src_location`
/// to `dst_location` is a network edge; otherwise leaves the set untouched.
pub fn add_network_edge_tag(
    properties: &mut HashSet<HydroEdgeProp>,
    src_location: &LocationId,
    dst_location: &LocationId,
) {
    if !is_network_edge(src_location, dst_location) {
        return;
    }
    properties.insert(HydroEdgeProp::Network);
}
/// Options controlling how a graph is rendered.
#[derive(Debug, Clone, Copy)]
pub struct HydroWriteConfig<'a> {
/// Not read in this file; presumably consumed by the individual writers — verify there.
pub show_metadata: bool,
/// When `true`, nodes are grouped by location in the output.
pub show_location_groups: bool,
/// Not read in this file; presumably selects shortened node labels in the writers — verify there.
pub use_short_labels: bool,
/// Display names for locations, keyed by location.
pub location_names: &'a SecondaryMap<LocationKey, String>,
}
impl Default for HydroWriteConfig<'_> {
fn default() -> Self {
static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
Self {
show_metadata: false,
show_location_groups: true,
use_short_labels: true, location_names: EMPTY.get_or_init(SecondaryMap::new),
}
}
}
/// A node of the graph being rendered.
#[derive(Clone)]
pub struct HydroGraphNode {
/// Text shown for the node.
pub label: NodeLabel,
/// Category of the node.
pub node_type: HydroNodeType,
/// Location the node belongs to, if any.
pub location_key: Option<LocationKey>,
/// Backtrace recorded for the operator that produced the node, if any.
pub backtrace: Option<Backtrace>,
}
slotmap::new_key_type! {
/// Slot-map key identifying a node of a [`HydroGraphStructure`].
pub struct VizNodeKey;
}
impl Display for VizNodeKey {
    /// Writes `viz` followed by the `Debug` form of the underlying key data.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let key_data = self.data();
        write!(f, "viz{:?}", key_data)
    }
}
impl std::str::FromStr for VizNodeKey {
    /// `None` when the text is not of the form `viz<idx>v<ver>`;
    /// `Some(err)` when either number fails to parse.
    type Err = Option<ParseIntError>;

    /// Parses `viz<idx>v<ver>`, where both numbers are decimal integers.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let Some((idx_text, ver_text)) = s
            .strip_prefix("viz")
            .and_then(|rest| rest.split_once("v"))
        else {
            return Err(None);
        };
        let idx = idx_text.parse::<u64>()?;
        let ver = ver_text.parse::<u64>()?;
        // The FFI form keeps the version in the upper 32 bits and the index in the lower 32 bits.
        let ffi = (ver << 32) | idx;
        Ok(slotmap::KeyData::from_ffi(ffi).into())
    }
}
impl VizNodeKey {
    /// Fixed key for tests: version `0x8F`, index 1.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000_008F_0000_0001));

    /// Fixed key for tests: version `0x8F`, index 2.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000_008F_0000_0002));
}
/// A directed edge of the graph being rendered.
#[derive(Debug, Clone)]
pub struct HydroGraphEdge {
/// Node the edge starts at.
pub src: VizNodeKey,
/// Node the edge ends at.
pub dst: VizNodeKey,
/// Property tags that determine how the edge is styled.
pub edge_properties: HashSet<HydroEdgeProp>,
/// Optional text shown on the edge.
pub label: Option<String>,
}
/// In-memory graph (nodes, edges, locations) built from the IR before it is
/// handed to a writer.
#[derive(Default)]
pub struct HydroGraphStructure {
/// All nodes, keyed by [`VizNodeKey`].
pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
/// All edges, in insertion order.
pub edges: Vec<HydroGraphEdge>,
/// Type of every location referenced by a node.
pub locations: SecondaryMap<LocationKey, LocationType>,
}
impl HydroGraphStructure {
pub fn new() -> Self {
Self::default()
}
pub fn add_node(
&mut self,
label: NodeLabel,
node_type: HydroNodeType,
location_key: Option<LocationKey>,
) -> VizNodeKey {
self.add_node_with_backtrace(label, node_type, location_key, None)
}
pub fn add_node_with_backtrace(
&mut self,
label: NodeLabel,
node_type: HydroNodeType,
location_key: Option<LocationKey>,
backtrace: Option<Backtrace>,
) -> VizNodeKey {
self.nodes.insert(HydroGraphNode {
label,
node_type,
location_key,
backtrace,
})
}
pub fn add_node_with_metadata(
&mut self,
label: NodeLabel,
node_type: HydroNodeType,
metadata: &HydroIrMetadata,
) -> VizNodeKey {
let location_key = Some(setup_location(self, metadata));
let backtrace = Some(metadata.op.backtrace.clone());
self.add_node_with_backtrace(label, node_type, location_key, backtrace)
}
pub fn add_edge(
&mut self,
src: VizNodeKey,
dst: VizNodeKey,
edge_properties: HashSet<HydroEdgeProp>,
label: Option<String>,
) {
self.edges.push(HydroGraphEdge {
src,
dst,
edge_properties,
label,
});
}
pub fn add_edge_single(
&mut self,
src: VizNodeKey,
dst: VizNodeKey,
edge_type: HydroEdgeProp,
label: Option<String>,
) {
let mut properties = HashSet::new();
properties.insert(edge_type);
self.edges.push(HydroGraphEdge {
src,
dst,
edge_properties: properties,
label,
});
}
pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
self.locations.insert(location_key, location_type);
}
}
/// Returns the lowercased operator name of a printed label: everything
/// before the first `(`, or the whole label when there is no `(`.
pub fn extract_op_name(full_label: String) -> String {
    let op_name = match full_label.find('(') {
        Some(paren_at) => &full_label[..paren_at],
        None => full_label.as_str(),
    };
    op_name.to_lowercase()
}
/// Produces a compact label from a full printed label.
///
/// The operator name is the text before the first `(` (or the whole label
/// when there is no `(`), lowercased. Two operators get special treatment
/// based on what the full label contains:
/// - `source`: `source_iter`, `source_stream`, `external_network` or `spin`
///   (checked in that order), otherwise `source`;
/// - `network`: `network(recv)` if the label mentions `deser`,
///   `network(send)` if it mentions `ser`, otherwise `network`.
///
/// Every other operator is returned as its lowercased name.
pub fn extract_short_label(full_label: &str) -> String {
    // `split` always yields at least one piece, so there is always an operator
    // name; the former truncating fallback branch could never run and has
    // been removed.
    let op_name = full_label.split('(').next().unwrap_or(full_label);
    let base_name = op_name.to_lowercase();

    match base_name.as_str() {
        "source" => {
            let short = if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            };
            short.to_owned()
        }
        "network" => {
            // "deser" must be tested first because it also contains "ser".
            let short = if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            };
            short.to_owned()
        }
        _ => base_name,
    }
}
/// Registers the root location of `metadata` in `structure` and returns its key.
///
/// Panics if the root location has no location type.
fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
    let root_location = metadata.location_id.root();
    let key = root_location.key();
    structure.add_location(key, root_location.location_type().unwrap());
    key
}
fn add_edge_with_metadata(
structure: &mut HydroGraphStructure,
src_id: VizNodeKey,
dst_id: VizNodeKey,
src_metadata: Option<&HydroIrMetadata>,
dst_metadata: Option<&HydroIrMetadata>,
label: Option<String>,
) {
let mut properties = HashSet::new();
if let Some(metadata) = src_metadata {
properties.extend(extract_edge_properties_from_collection_kind(
&metadata.collection_kind,
));
}
if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
add_network_edge_tag(
&mut properties,
&src_meta.location_id,
&dst_meta.location_id,
);
}
if properties.is_empty() {
properties.insert(HydroEdgeProp::Stream);
}
structure.add_edge(src_id, dst_id, properties, label);
}
fn write_graph_structure<W>(
structure: &HydroGraphStructure,
graph_write: W,
config: HydroWriteConfig<'_>,
) -> Result<(), W::Err>
where
W: HydroGraphWrite,
{
let mut graph_write = graph_write;
graph_write.write_prologue()?;
for (node_id, node) in structure.nodes.iter() {
let location_type = node
.location_key
.and_then(|loc_key| structure.locations.get(loc_key))
.copied();
graph_write.write_node_definition(
node_id,
&node.label,
node.node_type,
node.location_key,
location_type,
node.backtrace.as_ref(),
)?;
}
if config.show_location_groups {
let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
for (node_id, node) in structure.nodes.iter() {
if let Some(location_key) = node.location_key {
nodes_by_location
.entry(location_key)
.expect("location was removed")
.or_default()
.push(node_id);
}
}
for (location_key, node_ids) in nodes_by_location.iter() {
if let Some(&location_type) = structure.locations.get(location_key) {
graph_write.write_location_start(location_key, location_type)?;
for &node_id in node_ids.iter() {
graph_write.write_node(node_id)?;
}
graph_write.write_location_end()?;
}
}
}
for edge in structure.edges.iter() {
graph_write.write_edge(
edge.src,
edge.dst,
&edge.edge_properties,
edge.label.as_deref(),
)?;
}
graph_write.write_epilogue()?;
Ok(())
}
impl HydroRoot {
    /// Adds this root, and everything upstream of it, to `structure`.
    ///
    /// Every root variant is drawn as a sink node fed by one edge from its
    /// input. Returns the key of the sink node.
    pub fn build_graph_structure(
        &self,
        structure: &mut HydroGraphStructure,
        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
        config: HydroWriteConfig<'_>,
    ) -> VizNodeKey {
        /// Builds the input sub-graph, then the sink node, then the edge between them.
        fn build_sink_node(
            structure: &mut HydroGraphStructure,
            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
            config: HydroWriteConfig<'_>,
            input: &HydroNode,
            sink_metadata: Option<&HydroIrMetadata>,
            label: NodeLabel,
        ) -> VizNodeKey {
            let upstream_id = input.build_graph_structure(structure, seen_tees, config);

            // Prefer the sink's own metadata; otherwise borrow the input's,
            // except for placeholders.
            let effective_metadata = match (sink_metadata, input) {
                (Some(meta), _) => Some(meta),
                (None, HydroNode::Placeholder) => None,
                (None, other) => Some(other.metadata()),
            };

            let location_key = effective_metadata.map(|meta| setup_location(structure, meta));
            let backtrace = effective_metadata.map(|meta| meta.op.backtrace.clone());
            let sink_id = structure.add_node_with_backtrace(
                label,
                HydroNodeType::Sink,
                location_key,
                backtrace,
            );

            // The edge's properties always come from the input's metadata.
            let upstream_metadata = input.metadata();
            add_edge_with_metadata(
                structure,
                upstream_id,
                sink_id,
                Some(upstream_metadata),
                sink_metadata,
                None,
            );
            sink_id
        }

        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let label = NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]);
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
            HydroRoot::SendExternal {
                to_external_key,
                to_port_id,
                input,
                ..
            } => {
                let label = NodeLabel::with_exprs(
                    format!("send_external({}:{})", to_external_key, to_port_id),
                    vec![],
                );
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
            HydroRoot::DestSink { sink, input, .. } => {
                let label = NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]);
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let label = NodeLabel::static_label(format!("cycle_sink({})", cycle_id));
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let label = NodeLabel::static_label(format!("embedded_output({})", ident));
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
            HydroRoot::Null { input, .. } => {
                let label = NodeLabel::static_label("null".to_owned());
                build_sink_node(structure, seen_tees, config, input, None, label)
            }
        }
    }
}
impl HydroNode {
/// Adds this IR node, and recursively everything upstream of it, to `structure`.
///
/// Inputs are always built before the node itself, so upstream nodes are
/// inserted into the slot map first. `seen_tees` memoises shared (tee /
/// partition) nodes by pointer so that each is added only once.
///
/// Returns the key of the graph node that represents this IR node. For
/// pass-through variants (`YieldConcat`, `BeginAtomic`, `EndAtomic`) that is
/// the key of the inner node, since they add no node of their own.
pub fn build_graph_structure(
&self,
structure: &mut HydroGraphStructure,
seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
config: HydroWriteConfig<'_>,
) -> VizNodeKey {
// Bundles the arguments shared by the single-input helpers below.
struct TransformParams<'a> {
structure: &'a mut HydroGraphStructure,
seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
config: HydroWriteConfig<'a>,
input: &'a HydroNode,
metadata: &'a HydroIrMetadata,
op_name: String,
node_type: HydroNodeType,
}
// Single-input operator whose label is just the operator name.
fn build_simple_transform(params: TransformParams) -> VizNodeKey {
let input_id = params.input.build_graph_structure(
params.structure,
params.seen_tees,
params.config,
);
let node_id = params.structure.add_node_with_metadata(
NodeLabel::Static(params.op_name.to_string()),
params.node_type,
params.metadata,
);
let input_metadata = params.input.metadata();
add_edge_with_metadata(
params.structure,
input_id,
node_id,
Some(input_metadata),
Some(params.metadata),
None,
);
node_id
}
// Single-input operator whose label shows one expression argument.
fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
let input_id = params.input.build_graph_structure(
params.structure,
params.seen_tees,
params.config,
);
let node_id = params.structure.add_node_with_metadata(
NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
params.node_type,
params.metadata,
);
let input_metadata = params.input.metadata();
add_edge_with_metadata(
params.structure,
input_id,
node_id,
Some(input_metadata),
Some(params.metadata),
None,
);
node_id
}
// Single-input operator whose label shows two expression arguments.
fn build_dual_expr_transform(
params: TransformParams,
expr1: &DebugExpr,
expr2: &DebugExpr,
) -> VizNodeKey {
let input_id = params.input.build_graph_structure(
params.structure,
params.seen_tees,
params.config,
);
let node_id = params.structure.add_node_with_metadata(
NodeLabel::with_exprs(
params.op_name.to_string(),
vec![expr1.clone(), expr2.clone()],
),
params.node_type,
params.metadata,
);
let input_metadata = params.input.metadata();
add_edge_with_metadata(
params.structure,
input_id,
node_id,
Some(input_metadata),
Some(params.metadata),
None,
);
node_id
}
// Input-less node of type `Source` with a pre-formatted label.
fn build_source_node(
structure: &mut HydroGraphStructure,
metadata: &HydroIrMetadata,
label: String,
) -> VizNodeKey {
structure.add_node_with_metadata(
NodeLabel::Static(label),
HydroNodeType::Source,
metadata,
)
}
match self {
// Placeholders have no metadata, so the node gets no location.
HydroNode::Placeholder => structure.add_node(
NodeLabel::Static("PLACEHOLDER".to_owned()),
HydroNodeType::Transform,
None,
),
// Source-like variants: the label depends on the kind of source.
HydroNode::Source {
source, metadata, ..
} => {
let label = match source {
HydroSource::Stream(expr) => format!("source_stream({})", expr),
HydroSource::ExternalNetwork() => "external_network()".to_owned(),
HydroSource::Iter(expr) => format!("source_iter({})", expr),
HydroSource::Spin() => "spin()".to_owned(),
HydroSource::ClusterMembers(location_id, _) => {
format!(
"source_stream(cluster_membership_stream({:?}))",
location_id
)
}
HydroSource::Embedded(ident) => {
format!("embedded_input({})", ident)
}
HydroSource::EmbeddedSingleton(ident) => {
format!("embedded_singleton_input({})", ident)
}
};
build_source_node(structure, metadata, label)
}
HydroNode::SingletonSource {
value,
first_tick_only,
metadata,
} => {
let label = if *first_tick_only {
format!("singleton_first_tick({})", value)
} else {
format!("singleton({})", value)
};
build_source_node(structure, metadata, label)
}
HydroNode::ExternalInput {
from_external_key,
from_port_id,
metadata,
..
} => build_source_node(
structure,
metadata,
format!("external_input({}:{})", from_external_key, from_port_id),
),
HydroNode::CycleSource {
cycle_id, metadata, ..
} => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
// Shared node: reuse the graph node if this pointer was already visited.
HydroNode::Tee { inner, metadata } => {
let ptr = inner.as_ptr();
if let Some(&existing_id) = seen_tees.get(&ptr) {
return existing_id;
}
let input_id = inner
.0
.borrow()
.build_graph_structure(structure, seen_tees, config);
let tee_id = structure.add_node_with_metadata(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Tee,
metadata,
);
seen_tees.insert(ptr, tee_id);
// Borrow the inner node again to read its metadata for the edge.
let inner_borrow = inner.0.borrow();
let input_metadata = inner_borrow.metadata();
add_edge_with_metadata(
structure,
input_id,
tee_id,
Some(input_metadata),
Some(metadata),
None,
);
drop(inner_borrow);
tee_id
}
// Handled like `Tee`, including the `Tee` node type and the shared memo table.
HydroNode::Partition {
inner, metadata, ..
} => {
let ptr = inner.as_ptr();
if let Some(&existing_id) = seen_tees.get(&ptr) {
return existing_id;
}
let input_id = inner
.0
.borrow()
.build_graph_structure(structure, seen_tees, config);
let partition_id = structure.add_node_with_metadata(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Tee,
metadata,
);
seen_tees.insert(ptr, partition_id);
let inner_borrow = inner.0.borrow();
let input_metadata = inner_borrow.metadata();
add_edge_with_metadata(
structure,
input_id,
partition_id,
Some(input_metadata),
Some(metadata),
None,
);
drop(inner_borrow);
partition_id
}
HydroNode::ObserveNonDet {
inner, metadata, ..
} => build_simple_transform(TransformParams {
structure,
seen_tees,
config,
input: inner,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::NonDeterministic,
}),
// Single-input operators without expression arguments, typed `Transform`.
HydroNode::Cast { inner, metadata }
| HydroNode::DeferTick {
input: inner,
metadata,
}
| HydroNode::Enumerate {
input: inner,
metadata,
..
}
| HydroNode::Unique {
input: inner,
metadata,
}
| HydroNode::ResolveFutures {
input: inner,
metadata,
}
| HydroNode::ResolveFuturesBlocking {
input: inner,
metadata,
}
| HydroNode::ResolveFuturesOrdered {
input: inner,
metadata,
} => build_simple_transform(TransformParams {
structure,
seen_tees,
config,
input: inner,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::Transform,
}),
HydroNode::Sort {
input: inner,
metadata,
} => build_simple_transform(TransformParams {
structure,
seen_tees,
config,
input: inner,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::Aggregation,
}),
// Single-input operators with one closure argument, typed `Transform`.
HydroNode::Map { f, input, metadata }
| HydroNode::Filter { f, input, metadata }
| HydroNode::FlatMap { f, input, metadata }
| HydroNode::FlatMapStreamBlocking { f, input, metadata }
| HydroNode::FilterMap { f, input, metadata }
| HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
TransformParams {
structure,
seen_tees,
config,
input,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::Transform,
},
f,
),
// Single-input operators with one closure argument, typed `Aggregation`.
HydroNode::Reduce { f, input, metadata }
| HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
TransformParams {
structure,
seen_tees,
config,
input,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::Aggregation,
},
f,
),
// Two-input operators; incoming edges are labelled "left" and "right".
HydroNode::Join {
left,
right,
metadata,
}
| HydroNode::JoinHalf {
left,
right,
metadata,
}
| HydroNode::CrossProduct {
left,
right,
metadata,
}
| HydroNode::CrossSingleton {
left,
right,
metadata,
} => {
let left_id = left.build_graph_structure(structure, seen_tees, config);
let right_id = right.build_graph_structure(structure, seen_tees, config);
let node_id = structure.add_node_with_metadata(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Join,
metadata,
);
let left_metadata = left.metadata();
add_edge_with_metadata(
structure,
left_id,
node_id,
Some(left_metadata),
Some(metadata),
Some("left".to_owned()),
);
let right_metadata = right.metadata();
add_edge_with_metadata(
structure,
right_id,
node_id,
Some(right_metadata),
Some(metadata),
Some("right".to_owned()),
);
node_id
}
// Two-input operators; incoming edges are labelled "pos" and "neg".
HydroNode::Difference {
pos: left,
neg: right,
metadata,
}
| HydroNode::AntiJoin {
pos: left,
neg: right,
metadata,
} => {
let left_id = left.build_graph_structure(structure, seen_tees, config);
let right_id = right.build_graph_structure(structure, seen_tees, config);
let node_id = structure.add_node_with_metadata(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Join,
metadata,
);
let left_metadata = left.metadata();
add_edge_with_metadata(
structure,
left_id,
node_id,
Some(left_metadata),
Some(metadata),
Some("pos".to_owned()),
);
let right_metadata = right.metadata();
add_edge_with_metadata(
structure,
right_id,
node_id,
Some(right_metadata),
Some(metadata),
Some("neg".to_owned()),
);
node_id
}
// Single-input operators with two closure arguments (`init`, `acc`).
HydroNode::Fold {
init,
acc,
input,
metadata,
}
| HydroNode::FoldKeyed {
init,
acc,
input,
metadata,
}
| HydroNode::Scan {
init,
acc,
input,
metadata,
}
| HydroNode::ScanAsyncBlocking {
init,
acc,
input,
metadata,
} => {
let node_type = HydroNodeType::Aggregation;
build_dual_expr_transform(
TransformParams {
structure,
seen_tees,
config,
input,
metadata,
op_name: extract_op_name(self.print_root()),
node_type,
},
init,
acc,
)
}
// Drawn as two nodes: a `Join` node that receives the input and the
// watermark, followed by an `Aggregation` node showing `f`.
HydroNode::ReduceKeyedWatermark {
f,
input,
watermark,
metadata,
} => {
let input_id = input.build_graph_structure(structure, seen_tees, config);
let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
let location_key = Some(setup_location(structure, metadata));
let join_node_id = structure.add_node_with_backtrace(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Join,
location_key,
Some(metadata.op.backtrace.clone()),
);
let input_metadata = input.metadata();
add_edge_with_metadata(
structure,
input_id,
join_node_id,
Some(input_metadata),
Some(metadata),
Some("input".to_owned()),
);
let watermark_metadata = watermark.metadata();
add_edge_with_metadata(
structure,
watermark_id,
join_node_id,
Some(watermark_metadata),
Some(metadata),
Some("watermark".to_owned()),
);
let node_id = structure.add_node_with_backtrace(
NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
HydroNodeType::Aggregation,
location_key,
Some(metadata.op.backtrace.clone()),
);
// The internal join -> aggregation edge uses this node's metadata for both ends.
let join_metadata = metadata; add_edge_with_metadata(
structure,
join_node_id,
node_id,
Some(join_metadata),
Some(metadata),
None,
);
node_id
}
HydroNode::Network {
serialize_fn,
deserialize_fn,
input,
metadata,
..
} => {
let input_id = input.build_graph_structure(structure, seen_tees, config);
// NOTE(review): `_from_location_key` and `to_location_key` are both derived
// from `metadata.location_id.root()`, so they are the same key here.
let _from_location_key = setup_location(structure, metadata);
let root = metadata.location_id.root();
let to_location_key = root.key();
let to_location_type = root.location_type().unwrap();
structure.add_location(to_location_key, to_location_type);
// Label is one of `network()`, `network(send)`, `network(recv)`,
// `network(send + recv)`, depending on which functions are present.
let mut label = "network(".to_owned();
if serialize_fn.is_some() {
label.push_str("send");
}
if deserialize_fn.is_some() {
if serialize_fn.is_some() {
label.push_str(" + ");
}
label.push_str("recv");
}
label.push(')');
let network_id = structure.add_node_with_backtrace(
NodeLabel::Static(label),
HydroNodeType::Network,
Some(to_location_key),
Some(metadata.op.backtrace.clone()),
);
let input_metadata = input.metadata();
add_edge_with_metadata(
structure,
input_id,
network_id,
Some(input_metadata),
Some(metadata),
Some(format!("to {:?}({})", to_location_type, to_location_key)),
);
network_id
}
HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
structure,
seen_tees,
config,
input: inner,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::NonDeterministic,
}),
// Pass-through variants: no node is added; the inner node's key is returned.
HydroNode::YieldConcat { inner, .. } => {
inner.build_graph_structure(structure, seen_tees, config)
}
HydroNode::BeginAtomic { inner, .. } => {
inner.build_graph_structure(structure, seen_tees, config)
}
HydroNode::EndAtomic { inner, .. } => {
inner.build_graph_structure(structure, seen_tees, config)
}
// Two-input operator; incoming edges are labelled "first" and "second".
HydroNode::Chain {
first,
second,
metadata,
} => {
let first_id = first.build_graph_structure(structure, seen_tees, config);
let second_id = second.build_graph_structure(structure, seen_tees, config);
let location_key = Some(setup_location(structure, metadata));
let chain_id = structure.add_node_with_backtrace(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Transform,
location_key,
Some(metadata.op.backtrace.clone()),
);
let first_metadata = first.metadata();
add_edge_with_metadata(
structure,
first_id,
chain_id,
Some(first_metadata),
Some(metadata),
Some("first".to_owned()),
);
let second_metadata = second.metadata();
add_edge_with_metadata(
structure,
second_id,
chain_id,
Some(second_metadata),
Some(metadata),
Some("second".to_owned()),
);
chain_id
}
// Built exactly like `Chain`; only the printed operator name differs.
HydroNode::ChainFirst {
first,
second,
metadata,
} => {
let first_id = first.build_graph_structure(structure, seen_tees, config);
let second_id = second.build_graph_structure(structure, seen_tees, config);
let location_key = Some(setup_location(structure, metadata));
let chain_id = structure.add_node_with_backtrace(
NodeLabel::Static(extract_op_name(self.print_root())),
HydroNodeType::Transform,
location_key,
Some(metadata.op.backtrace.clone()),
);
let first_metadata = first.metadata();
add_edge_with_metadata(
structure,
first_id,
chain_id,
Some(first_metadata),
Some(metadata),
Some("first".to_owned()),
);
let second_metadata = second.metadata();
add_edge_with_metadata(
structure,
second_id,
chain_id,
Some(second_metadata),
Some(metadata),
Some("second".to_owned()),
);
chain_id
}
// The label shows only `duration`; `tag` and `prefix` are not displayed.
HydroNode::Counter {
tag: _,
prefix: _,
duration,
input,
metadata,
} => build_single_expr_transform(
TransformParams {
structure,
seen_tees,
config,
input,
metadata,
op_name: extract_op_name(self.print_root()),
node_type: HydroNodeType::Transform,
},
duration,
),
}
}
}
// Generates `pub fn $render_fn(roots, config) -> String`, which renders the
// IR into a fresh `String` by delegating to the writer function `$writer_fn`.
// The generated function panics if the writer returns an error.
macro_rules! render_hydro_ir {
    ($render_fn:ident, $writer_fn:ident) => {
        pub fn $render_fn(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
            let mut rendered = String::new();
            $writer_fn(&mut rendered, roots, config).unwrap();
            rendered
        }
    };
}
// Generates `pub fn $writer_fn(output, roots, config) -> std::fmt::Result`,
// which wraps `output` in a `$backend` writer built by `$make_backend` and
// writes the graph for `roots` through it.
macro_rules! write_hydro_ir {
    ($writer_fn:ident, $backend:ty, $make_backend:expr) => {
        pub fn $writer_fn(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: HydroWriteConfig<'_>,
        ) -> std::fmt::Result {
            let mut backend: $backend = $make_backend(output, config);
            write_hydro_ir_graph(&mut backend, roots, config)
        }
    };
}
// Mermaid: string renderer plus streaming writer.
render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
write_hydro_ir!(
write_hydro_ir_mermaid,
HydroMermaid<_>,
HydroMermaid::new_with_config
);
// Graphviz DOT: string renderer plus streaming writer.
render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
// JSON: both `render_hydro_ir_hydroscope` and `render_hydro_ir_json` delegate
// to the same JSON writer.
render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
fn write_hydro_ir_graph<W>(
graph_write: W,
roots: &[HydroRoot],
config: HydroWriteConfig<'_>,
) -> Result<(), W::Err>
where
W: HydroGraphWrite,
{
let mut structure = HydroGraphStructure::new();
let mut seen_tees = HashMap::new();
for leaf in roots {
leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
}
write_graph_structure(&structure, graph_write, config)
}