// datafusion_app/local_benchmarks.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Statistics and progress-reporting types for local query benchmarks.

20use std::time::Duration;
21
/// Summary statistics (min / max / mean / median) over a set of durations,
/// together with their share of an overall total.
#[derive(Debug)]
pub struct DurationsSummary {
    pub min: Duration,
    pub max: Duration,
    pub mean: Duration,
    pub median: Duration,
    /// Share of the overall total, expressed in percent.
    pub percent_of_total: f64,
}

impl DurationsSummary {
    /// Returns the summary as five comma-separated CSV fields:
    /// `min,max,mean,median` in whole milliseconds (sub-millisecond parts are
    /// truncated), followed by `percent_of_total` with two decimal places.
    pub fn to_csv_fields(&self) -> String {
        let millis: Vec<String> = [self.min, self.max, self.mean, self.median]
            .iter()
            .map(|d| d.as_millis().to_string())
            .collect();
        format!("{},{:.2}", millis.join(","), self.percent_of_total)
    }
}

impl std::fmt::Display for DurationsSummary {
    /// Writes one `Label: value` line each for Min, Max, Median and Mean (in
    /// that order), with the percent-of-total appended to the Mean line. Every
    /// line, including the last, ends with a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Min: {:?}\nMax: {:?}\nMedian: {:?}\nMean: {:?} ({:.2}%)\n",
            self.min, self.max, self.median, self.mean, self.percent_of_total
        )
    }
}
53
/// Benchmark execution mode.
///
/// The derived `Default` is [`BenchmarkMode::Serial`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BenchmarkMode {
    /// Serial execution (the default).
    #[default]
    Serial,
    /// Concurrent execution; the payload is the number of concurrent tasks.
    Concurrent(usize),
}

impl std::fmt::Display for BenchmarkMode {
    /// Renders `serial` or `concurrent(N)`. This text is also what goes into
    /// the mode column of the summary CSV row.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BenchmarkMode::Serial => write!(f, "serial"),
            BenchmarkMode::Concurrent(n) => write!(f, "concurrent({})", n),
        }
    }
}
75
76/// Contains stats for all runs of a benchmarked query and provides methods for aggregating
77#[derive(Debug, Default)]
78pub struct LocalBenchmarkStats {
79    query: String,
80    runs: usize,
81    mode: BenchmarkMode,
82    rows: Vec<usize>,
83    logical_planning_durations: Vec<Duration>,
84    physical_planning_durations: Vec<Duration>,
85    execution_durations: Vec<Duration>,
86    total_durations: Vec<Duration>,
87}
88
89impl LocalBenchmarkStats {
90    pub fn new(
91        query: String,
92        rows: Vec<usize>,
93        mode: BenchmarkMode,
94        logical_planning_durations: Vec<Duration>,
95        physical_planning_durations: Vec<Duration>,
96        execution_durations: Vec<Duration>,
97        total_durations: Vec<Duration>,
98    ) -> Self {
99        let runs = logical_planning_durations.len();
100        Self {
101            query,
102            runs,
103            mode,
104            rows,
105            logical_planning_durations,
106            physical_planning_durations,
107            execution_durations,
108            total_durations,
109        }
110    }
111
112    fn summarize(&self, durations: &[Duration]) -> DurationsSummary {
113        if durations.is_empty() {
114            return DurationsSummary {
115                min: Duration::from_secs(0),
116                max: Duration::from_secs(0),
117                mean: Duration::from_secs(0),
118                median: Duration::from_secs(0),
119                percent_of_total: 0.0,
120            };
121        }
122        let mut sorted = durations.to_vec();
123        sorted.sort();
124        let len = sorted.len();
125        let min = *sorted.first().unwrap();
126        let max = *sorted.last().unwrap();
127        let mean = sorted.iter().sum::<Duration>() / len as u32;
128        let median = sorted[len / 2];
129        let this_total = durations.iter().map(|d| d.as_nanos()).sum::<u128>();
130        let total_duration = self
131            .total_durations
132            .iter()
133            .map(|d| d.as_nanos())
134            .sum::<u128>();
135        let percent_of_total = (this_total as f64 / total_duration as f64) * 100.0;
136        DurationsSummary {
137            min,
138            max,
139            mean,
140            median,
141            percent_of_total,
142        }
143    }
144
145    pub fn to_summary_csv_row(&self) -> String {
146        let mut csv = String::new();
147        let logical_planning_summary = self.summarize(&self.logical_planning_durations);
148        let physical_planning_summary = self.summarize(&self.physical_planning_durations);
149        let execution_summary = self.summarize(&self.execution_durations);
150        let total_summary = self.summarize(&self.total_durations);
151
152        csv.push_str(&self.query);
153        csv.push(',');
154        csv.push_str(&self.runs.to_string());
155        csv.push(',');
156        csv.push_str(logical_planning_summary.to_csv_fields().as_str());
157        csv.push(',');
158        csv.push_str(physical_planning_summary.to_csv_fields().as_str());
159        csv.push(',');
160        csv.push_str(execution_summary.to_csv_fields().as_str());
161        csv.push(',');
162        csv.push_str(total_summary.to_csv_fields().as_str());
163        csv.push(',');
164        csv.push_str(&self.mode.to_string());
165        csv
166    }
167}
168
/// Returns `true` when every element of `arr` is equal. This is vacuously
/// true for empty and single-element slices.
pub fn is_all_same(arr: &[usize]) -> bool {
    // Comparing neighbours is enough: equality is transitive.
    arr.windows(2).all(|pair| pair[0] == pair[1])
}
172
173impl std::fmt::Display for LocalBenchmarkStats {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        writeln!(f)?;
176        writeln!(f, "----------------------------")?;
177        writeln!(f, "Benchmark Stats ({} runs, {})", self.runs, self.mode)?;
178        writeln!(f, "----------------------------")?;
179        writeln!(f, "{}", self.query)?;
180        writeln!(f, "----------------------------")?;
181        if is_all_same(&self.rows) {
182            writeln!(f, "Row counts match across runs")?;
183        } else {
184            writeln!(f, "\x1b[31mRow counts differ across runs\x1b[0m")?;
185        }
186        writeln!(f, "----------------------------")?;
187        writeln!(f)?;
188
189        let logical_planning_summary = self.summarize(&self.logical_planning_durations);
190        writeln!(f, "Logical Planning")?;
191        writeln!(f, "{}", logical_planning_summary)?;
192
193        let physical_planning_summary = self.summarize(&self.physical_planning_durations);
194        writeln!(f, "Physical Planning")?;
195        writeln!(f, "{}", physical_planning_summary)?;
196
197        let execution_summary = self.summarize(&self.execution_durations);
198        writeln!(f, "Execution")?;
199        writeln!(f, "{}", execution_summary)?;
200
201        let total_summary = self.summarize(&self.total_durations);
202        writeln!(f, "Total")?;
203        writeln!(f, "{}", total_summary)
204    }
205}
206
/// Receives progress notifications while a benchmark executes.
///
/// The `Send + Sync` bound lets a single reporter be shared across threads.
pub trait BenchmarkProgressReporter: Send + Sync {
    /// Invoked after each iteration finishes. The arguments are the number of
    /// iterations `completed` so far, the `total` number of iterations, and
    /// how long the most recent iteration took (`last_duration`).
    fn on_iteration_complete(&self, completed: usize, total: usize, last_duration: Duration);

    /// Invoked when the benchmark completes.
    fn finish(&self);
}