Skip to main content

laminar_sql/translator/
dag_planner.rs

1//! DAG EXPLAIN formatter.
2//!
3//! Converts a `StreamingDag` into a human-readable topology description
4//! suitable for `EXPLAIN DAG` output.
5
6use std::fmt;
7use std::fmt::Write;
8
9use laminar_core::dag::{DagChannelType, StreamingDag};
10
/// Structured output from `EXPLAIN DAG`.
///
/// Contains both human-readable text and structured data for programmatic
/// consumption. The `Display` implementation writes `topology_text` as-is.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DagExplainOutput {
    /// Number of nodes in the DAG.
    pub node_count: usize,
    /// Number of edges in the DAG.
    pub edge_count: usize,
    /// Source node names.
    pub sources: Vec<String>,
    /// Execution order as node names.
    pub execution_order: Vec<String>,
    /// Shared stages (nodes with fan-out > 1) and their consumer counts.
    pub shared_stages: Vec<(String, usize)>,
    /// Formatted topology text for display.
    pub topology_text: String,
}

impl fmt::Display for DagExplainOutput {
    /// Writes the pre-rendered topology text verbatim.
    ///
    /// Width, fill and precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.topology_text)
    }
}
36
37/// Formats a DAG topology into an `EXPLAIN DAG` output.
38///
39/// # Example output
40///
41/// ```text
42/// DAG Topology (5 nodes, 4 edges)
43/// ================================
44/// Sources: [trades]
45/// Execution order: trades -> ohlc_1s -> ohlc_1m -> vwap, anomaly
46/// Shared stages: ohlc_1s (2 consumers)
47/// Channel types:
48///   trades -> ohlc_1s: SPSC
49///   ohlc_1s -> vwap: SPMC
50///   ohlc_1s -> anomaly: SPMC
51/// ```
52#[must_use]
53pub fn format_dag_explain(dag: &StreamingDag) -> DagExplainOutput {
54    let node_count = dag.node_count();
55    let edge_count = dag.edge_count();
56
57    // Source names
58    let sources: Vec<String> = dag
59        .sources()
60        .iter()
61        .filter_map(|&id| dag.node_name(id))
62        .collect();
63
64    // Execution order names
65    let execution_order: Vec<String> = dag
66        .execution_order()
67        .iter()
68        .filter_map(|&id| dag.node_name(id))
69        .collect();
70
71    // Shared stages
72    let shared_stages: Vec<(String, usize)> = dag
73        .shared_stages()
74        .iter()
75        .filter_map(|(id, meta)| dag.node_name(*id).map(|name| (name, meta.consumer_count)))
76        .collect();
77
78    // Build text output
79    let mut text = String::new();
80    let _ = writeln!(
81        text,
82        "DAG Topology ({node_count} nodes, {edge_count} edges)"
83    );
84    text.push_str(&"=".repeat(40));
85    text.push('\n');
86
87    // Sources line
88    let _ = writeln!(text, "Sources: [{}]", sources.join(", "));
89
90    // Execution order line
91    let _ = writeln!(text, "Execution order: {}", execution_order.join(" -> "));
92
93    // Shared stages
94    if !shared_stages.is_empty() {
95        let shared_strs: Vec<String> = shared_stages
96            .iter()
97            .map(|(name, count)| format!("{name} ({count} consumers)"))
98            .collect();
99        let _ = writeln!(text, "Shared stages: {}", shared_strs.join(", "));
100    }
101
102    // Channel types
103    text.push_str("Channel types:\n");
104    for edge in dag.edges().values() {
105        let src_name = dag.node_name(edge.source).unwrap_or_default();
106        let tgt_name = dag.node_name(edge.target).unwrap_or_default();
107        let ch_type = match edge.channel_type {
108            DagChannelType::Spsc => "SPSC",
109            DagChannelType::Spmc => "SPMC",
110            DagChannelType::Mpsc => "MPSC",
111        };
112        let _ = writeln!(text, "  {src_name} -> {tgt_name}: {ch_type}");
113    }
114
115    DagExplainOutput {
116        node_count,
117        edge_count,
118        sources,
119        execution_order,
120        shared_stages,
121        topology_text: text,
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use laminar_core::dag::{DagBuilder, StreamingDag};
    use std::sync::Arc;

    /// Single non-nullable `Int64` column used as the schema of every test node.
    fn int_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )]))
    }

    /// Builds `trades -> ohlc_1s`, fanning out to `vwap` and `anomaly`,
    /// with a dedicated sink behind each branch.
    fn fan_out_dag() -> StreamingDag {
        let schema = int_schema();
        DagBuilder::new()
            .source("trades", schema.clone())
            .operator("ohlc_1s", schema.clone())
            .connect("trades", "ohlc_1s")
            .fan_out("ohlc_1s", |b| {
                b.branch("vwap", schema.clone())
                    .branch("anomaly", schema.clone())
            })
            .sink_for("vwap", "sink_vwap", schema.clone())
            .sink_for("anomaly", "sink_anomaly", schema.clone())
            .build()
            .expect("fan-out DAG should build")
    }

    #[test]
    fn test_explain_dag_output() {
        let dag = fan_out_dag();
        let output = format_dag_explain(&dag);

        // Structured data
        assert_eq!(output.node_count, 6);
        assert_eq!(output.edge_count, 5);
        assert_eq!(output.sources, vec!["trades".to_string()]);
        for name in ["trades", "ohlc_1s", "vwap", "anomaly"] {
            assert!(
                output.execution_order.iter().any(|n| n == name),
                "execution order is missing {name}"
            );
        }

        // Shared stage: ohlc_1s feeds both branches.
        assert!(!output.shared_stages.is_empty());
        let ohlc_consumers = output
            .shared_stages
            .iter()
            .find(|(name, _)| name == "ohlc_1s")
            .map(|&(_, count)| count);
        assert_eq!(ohlc_consumers, Some(2));

        // Text output
        assert!(output.topology_text.contains("DAG Topology"));
        assert!(output.topology_text.contains("6 nodes"));
        assert!(output.topology_text.contains("5 edges"));
        assert!(output.topology_text.contains("Sources: [trades]"));
        assert!(output.topology_text.contains("ohlc_1s (2 consumers)"));
        assert!(output.topology_text.contains("Channel types:"));
        assert!(output.topology_text.contains("SPMC"));

        // Display renders the topology text verbatim.
        assert_eq!(output.to_string(), output.topology_text);
    }

    #[test]
    fn test_explain_dag_is_repeatable() {
        let dag = fan_out_dag();
        let first = format_dag_explain(&dag);
        let second = format_dag_explain(&dag);
        assert_eq!(first.topology_text, second.topology_text);
        assert_eq!(first.shared_stages, second.shared_stages);
        assert_eq!(first.execution_order, second.execution_order);
    }

    #[test]
    fn test_explain_dag_linear() {
        let schema = int_schema();
        let dag = DagBuilder::new()
            .source("src", schema.clone())
            .operator("op", schema.clone())
            .sink_for("op", "snk", schema.clone())
            .connect("src", "op")
            .build()
            .expect("linear DAG should build");

        let output = format_dag_explain(&dag);
        assert_eq!(output.node_count, 3);
        assert_eq!(output.edge_count, 2);
        assert!(output.shared_stages.is_empty());
        assert!(!output.topology_text.contains("Shared stages:"));
        assert!(output.topology_text.contains("SPSC"));
    }
}