datafusion_dft/execution/
local_benchmarks.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Summary statistics for queries benchmarked locally with DataFusion.
19
20use std::time::Duration;
21
/// Summary statistics computed over a set of measured durations.
///
/// All fields are plain values, so the summary is cheap to copy and compare.
/// The default value is an all-zero summary.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct DurationsSummary {
    /// Shortest measured duration.
    pub min: Duration,
    /// Longest measured duration.
    pub max: Duration,
    /// Arithmetic mean of the measured durations.
    pub mean: Duration,
    /// Median of the measured durations.
    pub median: Duration,
    /// Share of the overall total time, expressed as a percentage
    /// (e.g. `50.0` means half of the total).
    pub percent_of_total: f64,
}
31
32impl DurationsSummary {
33    pub fn to_csv_fields(&self) -> String {
34        format!(
35            "{},{},{},{},{:.2}",
36            self.min.as_millis(),
37            self.max.as_millis(),
38            self.mean.as_millis(),
39            self.median.as_millis(),
40            self.percent_of_total,
41        )
42    }
43}
44
45impl std::fmt::Display for DurationsSummary {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        writeln!(f, "Min: {:?}", self.min)?;
48        writeln!(f, "Max: {:?}", self.max)?;
49        writeln!(f, "Median: {:?}", self.median)?;
50        writeln!(f, "Mean: {:?} ({:.2}%)", self.mean, self.percent_of_total)
51    }
52}
53
/// Per-run measurements collected while benchmarking a single query.
///
/// Raw samples are stored here; aggregation into [`DurationsSummary`] values
/// happens on demand when the stats are rendered.
#[derive(Default, Debug)]
pub struct LocalBenchmarkStats {
    /// The query text that was benchmarked.
    query: String,
    /// Number of benchmark runs (derived from the logical planning samples in `new`).
    runs: usize,
    /// Row count observed for each run; the report checks these for consistency.
    rows: Vec<usize>,
    /// Per-run time spent in logical planning.
    logical_planning_durations: Vec<Duration>,
    /// Per-run time spent in physical planning.
    physical_planning_durations: Vec<Duration>,
    /// Per-run time spent executing.
    execution_durations: Vec<Duration>,
    /// Per-run end-to-end time; used as the denominator for percentages.
    total_durations: Vec<Duration>,
}
65
66impl LocalBenchmarkStats {
67    pub fn new(
68        query: String,
69        rows: Vec<usize>,
70        logical_planning_durations: Vec<Duration>,
71        physical_planning_durations: Vec<Duration>,
72        execution_durations: Vec<Duration>,
73        total_durations: Vec<Duration>,
74    ) -> Self {
75        let runs = logical_planning_durations.len();
76        Self {
77            query,
78            runs,
79            rows,
80            logical_planning_durations,
81            physical_planning_durations,
82            execution_durations,
83            total_durations,
84        }
85    }
86
87    fn summarize(&self, durations: &[Duration]) -> DurationsSummary {
88        if durations.is_empty() {
89            return DurationsSummary {
90                min: Duration::from_secs(0),
91                max: Duration::from_secs(0),
92                mean: Duration::from_secs(0),
93                median: Duration::from_secs(0),
94                percent_of_total: 0.0,
95            };
96        }
97        let mut sorted = durations.to_vec();
98        sorted.sort();
99        let len = sorted.len();
100        let min = *sorted.first().unwrap();
101        let max = *sorted.last().unwrap();
102        let mean = sorted.iter().sum::<Duration>() / len as u32;
103        let median = sorted[len / 2];
104        let this_total = durations.iter().map(|d| d.as_nanos()).sum::<u128>();
105        let total_duration = self
106            .total_durations
107            .iter()
108            .map(|d| d.as_nanos())
109            .sum::<u128>();
110        let percent_of_total = (this_total as f64 / total_duration as f64) * 100.0;
111        DurationsSummary {
112            min,
113            max,
114            mean,
115            median,
116            percent_of_total,
117        }
118    }
119
120    pub fn to_summary_csv_row(&self) -> String {
121        let mut csv = String::new();
122        let logical_planning_summary = self.summarize(&self.logical_planning_durations);
123        let physical_planning_summary = self.summarize(&self.physical_planning_durations);
124        let execution_summary = self.summarize(&self.execution_durations);
125        let total_summary = self.summarize(&self.total_durations);
126
127        csv.push_str(&self.query);
128        csv.push(',');
129        csv.push_str(&self.runs.to_string());
130        csv.push(',');
131        csv.push_str(logical_planning_summary.to_csv_fields().as_str());
132        csv.push(',');
133        csv.push_str(physical_planning_summary.to_csv_fields().as_str());
134        csv.push(',');
135        csv.push_str(execution_summary.to_csv_fields().as_str());
136        csv.push(',');
137        csv.push_str(total_summary.to_csv_fields().as_str());
138        csv
139    }
140}
141
/// Returns `true` when every element of `arr` equals every other.
///
/// Empty and single-element slices trivially satisfy this.
pub fn is_all_same(arr: &[usize]) -> bool {
    arr.windows(2).all(|pair| pair[0] == pair[1])
}
145
146impl std::fmt::Display for LocalBenchmarkStats {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        writeln!(f)?;
149        writeln!(f, "----------------------------")?;
150        writeln!(f, "Benchmark Stats ({} runs)", self.runs)?;
151        writeln!(f, "----------------------------")?;
152        writeln!(f, "{}", self.query)?;
153        writeln!(f, "----------------------------")?;
154        if is_all_same(&self.rows) {
155            writeln!(f, "Row counts match across runs")?;
156        } else {
157            writeln!(f, "\x1b[31mRow counts differ across runs\x1b[0m")?;
158        }
159        writeln!(f, "----------------------------")?;
160        writeln!(f)?;
161
162        let logical_planning_summary = self.summarize(&self.logical_planning_durations);
163        writeln!(f, "Logical Planning")?;
164        writeln!(f, "{}", logical_planning_summary)?;
165
166        let physical_planning_summary = self.summarize(&self.physical_planning_durations);
167        writeln!(f, "Physical Planning")?;
168        writeln!(f, "{}", physical_planning_summary)?;
169
170        let execution_summary = self.summarize(&self.execution_durations);
171        writeln!(f, "Execution")?;
172        writeln!(f, "{}", execution_summary)?;
173
174        let total_summary = self.summarize(&self.total_durations);
175        writeln!(f, "Total")?;
176        writeln!(f, "{}", total_summary)
177    }
178}