datafusion_dft/execution/
local_benchmarks.rs1use std::time::Duration;
21
/// Summary statistics for a set of recorded durations.
#[derive(Debug, Clone, PartialEq)]
pub struct DurationsSummary {
    /// Shortest recorded duration.
    pub min: Duration,
    /// Longest recorded duration.
    pub max: Duration,
    /// Mean of the recorded durations.
    pub mean: Duration,
    /// Median of the recorded durations.
    pub median: Duration,
    /// Share of the overall total time taken by these durations, expressed
    /// as a percentage.
    pub percent_of_total: f64,
}
31
32impl DurationsSummary {
33 pub fn to_csv_fields(&self) -> String {
34 format!(
35 "{},{},{},{},{:.2}",
36 self.min.as_millis(),
37 self.max.as_millis(),
38 self.mean.as_millis(),
39 self.median.as_millis(),
40 self.percent_of_total,
41 )
42 }
43}
44
45impl std::fmt::Display for DurationsSummary {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 writeln!(f, "Min: {:?}", self.min)?;
48 writeln!(f, "Max: {:?}", self.max)?;
49 writeln!(f, "Median: {:?}", self.median)?;
50 writeln!(f, "Mean: {:?} ({:.2}%)", self.mean, self.percent_of_total)
51 }
52}
53
/// Measurements collected across the runs of one benchmarked query.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LocalBenchmarkStats {
    /// Text of the benchmarked query.
    query: String,
    /// Number of benchmark runs.
    runs: usize,
    /// Row counts recorded for the runs; they are expected to agree.
    rows: Vec<usize>,
    /// Time spent on logical planning.
    logical_planning_durations: Vec<Duration>,
    /// Time spent on physical planning.
    physical_planning_durations: Vec<Duration>,
    /// Time spent on execution.
    execution_durations: Vec<Duration>,
    /// Total time; the other phases are reported as a share of this.
    total_durations: Vec<Duration>,
}
65
66impl LocalBenchmarkStats {
67 pub fn new(
68 query: String,
69 rows: Vec<usize>,
70 logical_planning_durations: Vec<Duration>,
71 physical_planning_durations: Vec<Duration>,
72 execution_durations: Vec<Duration>,
73 total_durations: Vec<Duration>,
74 ) -> Self {
75 let runs = logical_planning_durations.len();
76 Self {
77 query,
78 runs,
79 rows,
80 logical_planning_durations,
81 physical_planning_durations,
82 execution_durations,
83 total_durations,
84 }
85 }
86
87 fn summarize(&self, durations: &[Duration]) -> DurationsSummary {
88 if durations.is_empty() {
89 return DurationsSummary {
90 min: Duration::from_secs(0),
91 max: Duration::from_secs(0),
92 mean: Duration::from_secs(0),
93 median: Duration::from_secs(0),
94 percent_of_total: 0.0,
95 };
96 }
97 let mut sorted = durations.to_vec();
98 sorted.sort();
99 let len = sorted.len();
100 let min = *sorted.first().unwrap();
101 let max = *sorted.last().unwrap();
102 let mean = sorted.iter().sum::<Duration>() / len as u32;
103 let median = sorted[len / 2];
104 let this_total = durations.iter().map(|d| d.as_nanos()).sum::<u128>();
105 let total_duration = self
106 .total_durations
107 .iter()
108 .map(|d| d.as_nanos())
109 .sum::<u128>();
110 let percent_of_total = (this_total as f64 / total_duration as f64) * 100.0;
111 DurationsSummary {
112 min,
113 max,
114 mean,
115 median,
116 percent_of_total,
117 }
118 }
119
120 pub fn to_summary_csv_row(&self) -> String {
121 let mut csv = String::new();
122 let logical_planning_summary = self.summarize(&self.logical_planning_durations);
123 let physical_planning_summary = self.summarize(&self.physical_planning_durations);
124 let execution_summary = self.summarize(&self.execution_durations);
125 let total_summary = self.summarize(&self.total_durations);
126
127 csv.push_str(&self.query);
128 csv.push(',');
129 csv.push_str(&self.runs.to_string());
130 csv.push(',');
131 csv.push_str(logical_planning_summary.to_csv_fields().as_str());
132 csv.push(',');
133 csv.push_str(physical_planning_summary.to_csv_fields().as_str());
134 csv.push(',');
135 csv.push_str(execution_summary.to_csv_fields().as_str());
136 csv.push(',');
137 csv.push_str(total_summary.to_csv_fields().as_str());
138 csv
139 }
140}
141
/// Returns `true` when every element of `arr` has the same value.
///
/// Empty and single-element slices count as "all the same".
pub fn is_all_same(arr: &[usize]) -> bool {
    match arr.split_first() {
        Some((first, rest)) => rest.iter().all(|value| value == first),
        None => true,
    }
}
145
146impl std::fmt::Display for LocalBenchmarkStats {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 writeln!(f)?;
149 writeln!(f, "----------------------------")?;
150 writeln!(f, "Benchmark Stats ({} runs)", self.runs)?;
151 writeln!(f, "----------------------------")?;
152 writeln!(f, "{}", self.query)?;
153 writeln!(f, "----------------------------")?;
154 if is_all_same(&self.rows) {
155 writeln!(f, "Row counts match across runs")?;
156 } else {
157 writeln!(f, "\x1b[31mRow counts differ across runs\x1b[0m")?;
158 }
159 writeln!(f, "----------------------------")?;
160 writeln!(f)?;
161
162 let logical_planning_summary = self.summarize(&self.logical_planning_durations);
163 writeln!(f, "Logical Planning")?;
164 writeln!(f, "{}", logical_planning_summary)?;
165
166 let physical_planning_summary = self.summarize(&self.physical_planning_durations);
167 writeln!(f, "Physical Planning")?;
168 writeln!(f, "{}", physical_planning_summary)?;
169
170 let execution_summary = self.summarize(&self.execution_durations);
171 writeln!(f, "Execution")?;
172 writeln!(f, "{}", execution_summary)?;
173
174 let total_summary = self.summarize(&self.total_durations);
175 writeln!(f, "Total")?;
176 writeln!(f, "{}", total_summary)
177 }
178}