scouter_dataframe/parquet/bifrost/
explain.rs1use crate::parquet::bifrost::query::QueryExecutionMetadata;
2use datafusion::logical_expr::LogicalPlan;
3use datafusion::physical_plan::displayable;
4use datafusion::physical_plan::ExecutionPlan;
5
/// Replaces storage locations in rendered plan text with `<storage-path>`.
///
/// The input is cut into tokens after every whitespace character and after
/// each of `,`, `[`, `]`, `(` and `)`. A token is redacted when, ignoring its
/// trailing delimiter, it either
///
/// * starts with an object-store URI scheme (`s3://`, `s3a://`, `gs://`,
///   `gcs://`, `az://`, `adl://`, `azure://`, `abfs://`, `abfss://`,
///   `file://`), or
/// * starts with `/` and ends in `.parquet` or contains `/datasets/`.
///
/// A redacted token keeps its trailing delimiter, so the surrounding
/// punctuation and spacing of the plan are preserved. Everything else is
/// copied through byte-for-byte.
///
/// Only tokens that *begin* with a location are recognised; a location glued
/// to a preceding `key=` or quote character is left as-is.
pub fn sanitize_plan_text(text: &str) -> String {
    // Text written in place of every redacted location.
    const PLACEHOLDER: &str = "<storage-path>";

    // URI prefixes that mark a token as a storage location. Alias schemes for
    // the same stores are listed too so they cannot slip through unredacted.
    const URI_PREFIXES: &[&str] = &[
        "s3://",
        "s3a://",
        "gs://",
        "gcs://",
        "az://",
        "adl://",
        "azure://",
        "abfs://",
        "abfss://",
        "file://",
    ];

    // Single definition of "token boundary", shared by the split and the trim
    // so the two can never drift apart.
    fn is_delimiter(c: char) -> bool {
        c.is_whitespace() || matches!(c, ',' | '[' | ']' | '(' | ')')
    }

    let mut result = String::with_capacity(text.len());
    for token in text.split_inclusive(is_delimiter) {
        // `candidate` is a prefix of `token`; whatever follows it is the
        // delimiter that must survive redaction.
        let candidate = token.trim_end_matches(is_delimiter);

        let has_uri_scheme = URI_PREFIXES
            .iter()
            .any(|prefix| candidate.starts_with(*prefix));
        let is_local_data_path = candidate.starts_with('/')
            && (candidate.ends_with(".parquet") || candidate.contains("/datasets/"));

        if has_uri_scheme || is_local_data_path {
            result.push_str(PLACEHOLDER);
            result.push_str(&token[candidate.len()..]);
        } else {
            result.push_str(token);
        }
    }
    result
}
40
/// One node of a query-plan tree, together with all of its inputs.
///
/// Built recursively from a DataFusion logical or physical plan; the whole
/// tree is serializable through `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PlanNode {
    /// Short label identifying the kind of operator this node represents.
    pub node_type: String,
    /// Rendered text of this node as produced by the plan's display helpers.
    pub description: String,
    /// Extra key/value attributes attached to the node.
    pub fields: Vec<PlanNodeField>,
    /// Input plans of this node, converted the same way.
    pub children: Vec<PlanNode>,
    /// Runtime metrics for the node; `None` when the plan exposes none.
    pub metrics: Option<PlanNodeMetrics>,
}
50
/// A single named attribute attached to a [`PlanNode`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct PlanNodeField {
    /// Attribute name.
    pub key: String,
    /// Attribute value, stored as text.
    pub value: String,
}
56
/// Runtime metrics captured for one physical plan node.
///
/// Every value is optional: a field is `None` when the plan's metrics do not
/// provide the corresponding measurement.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PlanNodeMetrics {
    /// Number of rows the node produced.
    pub output_rows: Option<u64>,
    /// Elapsed compute time in milliseconds.
    pub elapsed_ms: Option<f64>,
    /// Sum of the node's `bytes_scanned` metric.
    pub bytes_scanned: Option<u64>,
    /// Sum of the node's `spilled_bytes` metric.
    pub spill_bytes: Option<u64>,
}
64
/// Bundle of everything produced when explaining a query: both plans as
/// trees and as text, plus optional execution metadata.
pub struct ExplainResult {
    /// Logical plan as a [`PlanNode`] tree.
    pub logical_plan: PlanNode,
    /// Physical plan as a [`PlanNode`] tree.
    pub physical_plan: PlanNode,
    /// Logical plan in textual form.
    // NOTE(review): presumably the rendered plan text; the producer is not
    // visible in this part of the file - confirm against the caller.
    pub logical_plan_text: String,
    /// Physical plan in textual form.
    // NOTE(review): presumably the rendered plan text; the producer is not
    // visible in this part of the file - confirm against the caller.
    pub physical_plan_text: String,
    /// Metadata about the query execution, when available.
    pub execution_metadata: Option<QueryExecutionMetadata>,
}
73
74pub fn logical_plan_to_tree(plan: &LogicalPlan) -> PlanNode {
76 let node_type = format!("{:?}", plan)
77 .split('(')
78 .next()
79 .unwrap_or("Unknown")
80 .to_string();
81 let description = plan.display().to_string();
82
83 let children: Vec<PlanNode> = plan
84 .inputs()
85 .iter()
86 .map(|p| logical_plan_to_tree(p))
87 .collect();
88
89 PlanNode {
90 node_type,
91 description,
92 fields: vec![],
93 children,
94 metrics: None,
95 }
96}
97
98pub fn physical_plan_to_tree(plan: &dyn ExecutionPlan) -> PlanNode {
100 let node_type = plan.name().to_string();
101 let description = displayable(plan).one_line().to_string();
102
103 let children: Vec<PlanNode> = plan
104 .children()
105 .iter()
106 .map(|p| physical_plan_to_tree(p.as_ref()))
107 .collect();
108
109 let metrics = plan.metrics().map(|m| {
111 let output_rows = m.output_rows();
112 let elapsed = m.elapsed_compute();
113 PlanNodeMetrics {
114 output_rows: output_rows.map(|r| r as u64),
115 elapsed_ms: elapsed.map(|ns| ns as f64 / 1_000_000.0),
116 bytes_scanned: m.sum_by_name("bytes_scanned").map(|v| v.as_usize() as u64),
117 spill_bytes: m.sum_by_name("spilled_bytes").map(|v| v.as_usize() as u64),
118 }
119 });
120
121 PlanNode {
122 node_type,
123 description,
124 fields: vec![],
125 children,
126 metrics,
127 }
128}