// laminar_sql/translator/dag_planner.rs
use std::fmt;
use std::fmt::Write;

use laminar_core::dag::{DagChannelType, StreamingDag};
10
/// Structured result of explaining a streaming DAG.
///
/// Carries the individual facts gathered from the DAG (counts and node
/// names) together with a pre-rendered, human-readable report in
/// `topology_text`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DagExplainOutput {
    /// Number of nodes reported by the DAG.
    pub node_count: usize,
    /// Number of edges reported by the DAG.
    pub edge_count: usize,
    /// Names of the DAG's source nodes.
    pub sources: Vec<String>,
    /// Node names in the order given by the DAG's execution ordering.
    pub execution_order: Vec<String>,
    /// Shared stages as `(node name, consumer count)` pairs.
    pub shared_stages: Vec<(String, usize)>,
    /// Multi-line text rendering of the topology.
    pub topology_text: String,
}
30
31impl fmt::Display for DagExplainOutput {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 write!(f, "{}", self.topology_text)
34 }
35}
36
37#[must_use]
53pub fn format_dag_explain(dag: &StreamingDag) -> DagExplainOutput {
54 let node_count = dag.node_count();
55 let edge_count = dag.edge_count();
56
57 let sources: Vec<String> = dag
59 .sources()
60 .iter()
61 .filter_map(|&id| dag.node_name(id))
62 .collect();
63
64 let execution_order: Vec<String> = dag
66 .execution_order()
67 .iter()
68 .filter_map(|&id| dag.node_name(id))
69 .collect();
70
71 let shared_stages: Vec<(String, usize)> = dag
73 .shared_stages()
74 .iter()
75 .filter_map(|(id, meta)| dag.node_name(*id).map(|name| (name, meta.consumer_count)))
76 .collect();
77
78 let mut text = String::new();
80 let _ = writeln!(
81 text,
82 "DAG Topology ({node_count} nodes, {edge_count} edges)"
83 );
84 text.push_str(&"=".repeat(40));
85 text.push('\n');
86
87 let _ = writeln!(text, "Sources: [{}]", sources.join(", "));
89
90 let _ = writeln!(text, "Execution order: {}", execution_order.join(" -> "));
92
93 if !shared_stages.is_empty() {
95 let shared_strs: Vec<String> = shared_stages
96 .iter()
97 .map(|(name, count)| format!("{name} ({count} consumers)"))
98 .collect();
99 let _ = writeln!(text, "Shared stages: {}", shared_strs.join(", "));
100 }
101
102 text.push_str("Channel types:\n");
104 for edge in dag.edges().values() {
105 let src_name = dag.node_name(edge.source).unwrap_or_default();
106 let tgt_name = dag.node_name(edge.target).unwrap_or_default();
107 let ch_type = match edge.channel_type {
108 DagChannelType::Spsc => "SPSC",
109 DagChannelType::Spmc => "SPMC",
110 DagChannelType::Mpsc => "MPSC",
111 };
112 let _ = writeln!(text, " {src_name} -> {tgt_name}: {ch_type}");
113 }
114
115 DagExplainOutput {
116 node_count,
117 edge_count,
118 sources,
119 execution_order,
120 shared_stages,
121 topology_text: text,
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use laminar_core::dag::DagBuilder;
    use std::sync::Arc;

    /// Builds the single-column schema (`value`: non-nullable `Int64`) that
    /// every node in these tests uses.
    fn int_schema() -> Arc<Schema> {
        let value_field = Field::new("value", DataType::Int64, false);
        Arc::new(Schema::new(vec![value_field]))
    }

    /// Fan-out topology: `trades -> ohlc_1s -> {vwap, anomaly}`, each branch
    /// with its own sink. Checks the structured fields and the rendered text.
    #[test]
    fn test_explain_dag_output() {
        let schema = int_schema();
        let dag = DagBuilder::new()
            .source("trades", schema.clone())
            .operator("ohlc_1s", schema.clone())
            .connect("trades", "ohlc_1s")
            .fan_out("ohlc_1s", |b| {
                b.branch("vwap", schema.clone())
                    .branch("anomaly", schema.clone())
            })
            .sink_for("vwap", "sink_vwap", schema.clone())
            .sink_for("anomaly", "sink_anomaly", schema.clone())
            .build()
            .unwrap();

        let output = format_dag_explain(&dag);

        // Structured fields.
        assert_eq!(output.node_count, 6);
        assert_eq!(output.edge_count, 5);
        assert_eq!(output.sources, ["trades"]);
        for expected in ["trades", "ohlc_1s", "vwap"] {
            assert!(
                output.execution_order.iter().any(|name| name == expected),
                "execution order should contain {expected}"
            );
        }

        // `ohlc_1s` feeds both branches, so it must be reported as shared
        // with exactly two consumers.
        assert!(!output.shared_stages.is_empty());
        let (_, consumers) = output
            .shared_stages
            .iter()
            .find(|(name, _)| name == "ohlc_1s")
            .expect("ohlc_1s should be listed as a shared stage");
        assert_eq!(*consumers, 2);

        // Rendered text.
        for fragment in [
            "DAG Topology",
            "6 nodes",
            "5 edges",
            "Sources: [trades]",
            "Channel types:",
            "SPMC",
        ] {
            assert!(
                output.topology_text.contains(fragment),
                "topology text should contain {fragment:?}"
            );
        }

        // `Display` goes through the same rendered text.
        let display = format!("{output}");
        assert!(display.contains("DAG Topology"));
    }

    /// Linear topology: `src -> op -> snk`. No stage is shared and the
    /// rendered text mentions an SPSC channel.
    #[test]
    fn test_explain_dag_linear() {
        let schema = int_schema();
        let dag = DagBuilder::new()
            .source("src", schema.clone())
            .operator("op", schema.clone())
            .sink_for("op", "snk", schema.clone())
            .connect("src", "op")
            .build()
            .unwrap();

        let output = format_dag_explain(&dag);

        assert_eq!(output.node_count, 3);
        assert_eq!(output.edge_count, 2);
        assert!(output.shared_stages.is_empty());
        assert!(output.topology_text.contains("SPSC"));
    }
}