Skip to main content

scouter_dataframe/parquet/bifrost/
explain.rs

1use crate::parquet::bifrost::query::QueryExecutionMetadata;
2use datafusion::logical_expr::LogicalPlan;
3use datafusion::physical_plan::displayable;
4use datafusion::physical_plan::ExecutionPlan;
5
/// Placeholder written in place of every storage location found in plan text.
const STORAGE_PATH_PLACEHOLDER: &str = "<storage-path>";

/// URI schemes whose presence marks the start of a storage location.
const STORAGE_URI_SCHEMES: [&str; 5] = ["s3://", "gs://", "az://", "abfss://", "file://"];

/// Returns `true` for the characters that separate tokens in plan text.
fn is_plan_delimiter(c: char) -> bool {
    c.is_whitespace() || matches!(c, ',' | '[' | ']' | '(' | ')')
}

/// Returns `true` for characters that may sit directly in front of a storage
/// location inside a token (`key=…`, `key:…`, `{…`, or an opening quote).
fn is_path_opener(c: char) -> bool {
    matches!(c, '=' | ':' | '{' | '"' | '\'')
}

/// Returns `true` for characters that may be glued to the end of a storage
/// location without belonging to it (closing brace or quote).
fn is_path_closer(c: char) -> bool {
    matches!(c, '}' | '"' | '\'')
}

/// Returns `true` when `candidate` starts with a known storage URI scheme, or
/// is an absolute path that ends in `.parquet` or contains `/datasets/`.
fn looks_like_storage_path(candidate: &str) -> bool {
    STORAGE_URI_SCHEMES
        .iter()
        .any(|scheme| candidate.starts_with(scheme))
        || (candidate.starts_with('/')
            && (candidate.ends_with(".parquet") || candidate.contains("/datasets/")))
}

/// Finds the byte offset inside `body` at which a storage location begins.
///
/// The start of the token is checked first so that paths which themselves
/// contain `=` (for example Hive-style `year=2024` partitions) are replaced as
/// a whole. Otherwise every position right after an opener character is tried.
fn storage_path_start(body: &str) -> Option<usize> {
    if looks_like_storage_path(body) {
        return Some(0);
    }
    body.char_indices()
        .filter(|&(_, c)| is_path_opener(c))
        .map(|(idx, c)| idx + c.len_utf8())
        .find(|&start| looks_like_storage_path(&body[start..]))
}

/// Strip object-store URIs and absolute file paths from DataFusion plan text.
///
/// DataFusion's `displayable(plan).indent(true)` and `display_indent()` embed
/// real storage paths (S3 URIs, GCS bucket prefixes, local absolute paths).
/// This function replaces them with `<storage-path>` before the text is returned
/// to API callers, preventing storage topology leaks.
///
/// The text is split into tokens on whitespace and `, [ ] ( )`. A token is
/// sanitized when a storage location starts at the beginning of the token or
/// directly after one of `=`, `:`, `{`, `"`, `'`, so `location=s3://bucket/key`
/// and `"/data/file.parquet"` are covered as well as bare paths. Everything in
/// front of the location, any trailing `}` or quote characters, and the
/// delimiter that ended the token are kept, so the shape of the plan text is
/// preserved.
///
/// Relative paths (no leading `/` and no scheme) are left untouched.
pub fn sanitize_plan_text(text: &str) -> String {
    let mut result = String::with_capacity(text.len());
    for token in text.split_inclusive(is_plan_delimiter) {
        // Token body without the delimiter(s) that terminated it.
        let body = token.trim_end_matches(is_plan_delimiter);
        // Drop closing braces/quotes so the `.parquet` suffix check sees the
        // path itself; they are written back after the placeholder.
        let core = body.trim_end_matches(is_path_closer);

        match storage_path_start(core) {
            Some(start) => {
                result.push_str(&core[..start]);
                result.push_str(STORAGE_PATH_PLACEHOLDER);
                // Closers plus the original delimiter suffix.
                result.push_str(&token[core.len()..]);
            }
            None => result.push_str(token),
        }
    }
    result
}
40
41/// Structured representation of a query plan node for UI rendering.
42#[derive(Debug, Clone, serde::Serialize)]
43pub struct PlanNode {
44    pub node_type: String,
45    pub description: String,
46    pub fields: Vec<PlanNodeField>,
47    pub children: Vec<PlanNode>,
48    pub metrics: Option<PlanNodeMetrics>,
49}
50
51#[derive(Debug, Clone, serde::Serialize)]
52pub struct PlanNodeField {
53    pub key: String,
54    pub value: String,
55}
56
57#[derive(Debug, Clone, serde::Serialize)]
58pub struct PlanNodeMetrics {
59    pub output_rows: Option<u64>,
60    pub elapsed_ms: Option<f64>,
61    pub bytes_scanned: Option<u64>,
62    pub spill_bytes: Option<u64>,
63}
64
65/// Result of an EXPLAIN query.
66pub struct ExplainResult {
67    pub logical_plan: PlanNode,
68    pub physical_plan: PlanNode,
69    pub logical_plan_text: String,
70    pub physical_plan_text: String,
71    pub execution_metadata: Option<QueryExecutionMetadata>,
72}
73
74/// Recursively convert a DataFusion LogicalPlan into a PlanNode tree.
75pub fn logical_plan_to_tree(plan: &LogicalPlan) -> PlanNode {
76    let node_type = format!("{:?}", plan)
77        .split('(')
78        .next()
79        .unwrap_or("Unknown")
80        .to_string();
81    let description = plan.display().to_string();
82
83    let children: Vec<PlanNode> = plan
84        .inputs()
85        .iter()
86        .map(|p| logical_plan_to_tree(p))
87        .collect();
88
89    PlanNode {
90        node_type,
91        description,
92        fields: vec![],
93        children,
94        metrics: None,
95    }
96}
97
98/// Recursively convert a DataFusion ExecutionPlan into a PlanNode tree.
99pub fn physical_plan_to_tree(plan: &dyn ExecutionPlan) -> PlanNode {
100    let node_type = plan.name().to_string();
101    let description = displayable(plan).one_line().to_string();
102
103    let children: Vec<PlanNode> = plan
104        .children()
105        .iter()
106        .map(|p| physical_plan_to_tree(p.as_ref()))
107        .collect();
108
109    // Extract metrics if available (populated after ANALYZE execution)
110    let metrics = plan.metrics().map(|m| {
111        let output_rows = m.output_rows();
112        let elapsed = m.elapsed_compute();
113        PlanNodeMetrics {
114            output_rows: output_rows.map(|r| r as u64),
115            elapsed_ms: elapsed.map(|ns| ns as f64 / 1_000_000.0),
116            bytes_scanned: m.sum_by_name("bytes_scanned").map(|v| v.as_usize() as u64),
117            spill_bytes: m.sum_by_name("spilled_bytes").map(|v| v.as_usize() as u64),
118        }
119    });
120
121    PlanNode {
122        node_type,
123        description,
124        fields: vec![],
125        children,
126        metrics,
127    }
128}