datafusion_dft/cli/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17//! [`CliApp`]: Command Line User Interface
18
19use crate::args::DftArgs;
20use crate::execution::{local_benchmarks::LocalBenchmarkStats, AppExecution};
21use color_eyre::eyre::eyre;
22use color_eyre::Result;
23use datafusion::arrow::array::{RecordBatch, RecordBatchWriter};
24use datafusion::arrow::datatypes::SchemaRef;
25use datafusion::arrow::util::pretty::pretty_format_batches;
26use datafusion::arrow::{csv, json};
27use datafusion::sql::parser::DFParser;
28use futures::{Stream, StreamExt};
29use log::info;
30use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
31use std::error::Error;
32use std::fs::File;
33use std::io::Write;
34use std::path::{Path, PathBuf};
35#[cfg(feature = "flightsql")]
36use {crate::execution::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::IntoRequest};
37
/// Header row written at the top of a freshly created local benchmark results CSV.
///
/// Columns are `query`, `runs`, then `min`, `max`, `mean`, `median` and
/// `percent_of_total` for each of logical planning, physical planning, execution and
/// total. The literal is split with `\` line continuations (which drop the newline and
/// the following indentation), one stage per line, so a misspelled column stands out.
const LOCAL_BENCHMARK_HEADER_ROW: &str = "query,runs,\
    logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,\
    physical_planning_min,physical_planning_max,physical_planning_mean,physical_planning_median,physical_planning_percent_of_total,\
    execution_min,execution_max,execution_mean,execution_median,execution_percent_of_total,\
    total_min,total_max,total_mean,total_median,total_percent_of_total";

/// Header row written at the top of a freshly created FlightSQL benchmark results CSV.
///
/// Columns are `query`, `runs`, then `min`, `max`, `mean`, `median` and
/// `percent_of_total` for each of `get_flight_info`, `ttfb`, `do_get` and total.
#[cfg(feature = "flightsql")]
const FLIGHTSQL_BENCHMARK_HEADER_ROW: &str = "query,runs,\
    get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,\
    ttfb_min,ttfb_max,ttfb_mean,ttfb_median,ttfb_percent_of_total,\
    do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,\
    total_min,total_max,total_mean,total_median,total_percent_of_total";
44
45/// Encapsulates the command line interface
46pub struct CliApp {
47    /// Execution context for running queries
48    app_execution: AppExecution,
49    args: DftArgs,
50}
51
52impl CliApp {
53    pub fn new(app_execution: AppExecution, args: DftArgs) -> Self {
54        Self {
55            app_execution,
56            args,
57        }
58    }
59
60    fn validate_args(&self) -> color_eyre::Result<()> {
61        let more_than_one_command_or_file = (self.args.commands.len() > 1
62            || self.args.files.len() > 1)
63            && self.args.output.is_some();
64        if more_than_one_command_or_file {
65            return Err(eyre!(
66                "Output can only be saved for a single file or command"
67            ));
68        }
69
70        Ok(())
71    }
72
73    /// Execute the provided sql, which was passed as an argument from CLI.
74    ///
75    /// Optionally, use the FlightSQL client for execution.
76    pub async fn execute_files_or_commands(&self) -> color_eyre::Result<()> {
77        if self.args.run_ddl {
78            self.app_execution.execution_ctx().execute_ddl().await;
79        }
80
81        self.validate_args()?;
82
83        #[cfg(not(feature = "flightsql"))]
84        match (
85            self.args.files.is_empty(),
86            self.args.commands.is_empty(),
87            self.args.flightsql,
88            self.args.bench,
89            self.args.analyze,
90        ) {
91            // Error cases
92            (_, _, true, _, _) => Err(eyre!(
93                "FLightSQL feature isn't enabled. Reinstall `dft` with `--features=flightsql`"
94            )),
95            (false, false, false, true, _) => {
96                Err(eyre!("Cannot benchmark without a command or file"))
97            }
98            (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
99            (false, false, _, false, _) => Err(eyre!(
100                "Cannot execute both files and commands at the same time"
101            )),
102            (_, _, false, true, true) => Err(eyre!(
103                "The `benchmark` and `analyze` flags are mutually exclusive"
104            )),
105
106            // Execution cases
107            (false, true, _, false, false) => self.execute_files(&self.args.files).await,
108            (true, false, _, false, false) => self.execute_commands(&self.args.commands).await,
109
110            // Benchmark cases
111            (false, true, _, true, false) => self.benchmark_files(&self.args.files).await,
112            (true, false, _, true, false) => self.benchmark_commands(&self.args.commands).await,
113
114            // Analyze cases
115            (false, true, _, false, true) => self.analyze_files(&self.args.files).await,
116            (true, false, _, false, true) => self.analyze_commands(&self.args.commands).await,
117        }
118        #[cfg(feature = "flightsql")]
119        match (
120            self.args.files.is_empty(),
121            self.args.commands.is_empty(),
122            self.args.flightsql,
123            self.args.bench,
124            self.args.analyze,
125        ) {
126            // Error cases
127            (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
128            (false, false, false, true, _) => {
129                Err(eyre!("Cannot benchmark without a command or file"))
130            }
131            (false, false, _, _, _) => Err(eyre!(
132                "Cannot execute both files and commands at the same time"
133            )),
134            (_, _, _, true, true) => Err(eyre!(
135                "The `benchmark` and `analyze` flags are mutually exclusive"
136            )),
137            (_, _, true, false, true) => Err(eyre!(
138                "The `analyze` flag is not currently supported with FlightSQL"
139            )),
140
141            // Execution cases
142            (true, false, false, false, false) => self.execute_commands(&self.args.commands).await,
143            (false, true, false, false, false) => self.execute_files(&self.args.files).await,
144
145            // FlightSQL execution cases
146            (false, true, true, false, false) => {
147                self.flightsql_execute_files(&self.args.files).await
148            }
149            (true, false, true, false, false) => {
150                self.flightsql_execute_commands(&self.args.commands).await
151            }
152
153            // Benchmark cases
154            (false, true, false, true, false) => self.benchmark_files(&self.args.files).await,
155            (false, true, true, true, false) => {
156                self.flightsql_benchmark_files(&self.args.files).await
157            }
158            (true, false, true, true, false) => {
159                self.flightsql_benchmark_commands(&self.args.commands).await
160            }
161            (true, false, false, true, false) => self.benchmark_commands(&self.args.commands).await,
162
163            // Analyze cases
164            (true, false, false, false, true) => self.analyze_commands(&self.args.commands).await,
165            (false, true, false, false, true) => self.analyze_files(&self.args.files).await,
166        }
167    }
168
169    async fn execute_files(&self, files: &[PathBuf]) -> Result<()> {
170        info!("Executing files: {:?}", files);
171        for file in files {
172            self.exec_from_file(file).await?
173        }
174
175        Ok(())
176    }
177
178    async fn benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
179        if let Some(run_before_query) = &self.args.run_before {
180            self.app_execution
181                .execution_ctx()
182                .execute_sql_and_discard_results(run_before_query)
183                .await?;
184        }
185        info!("Benchmarking files: {:?}", files);
186        for file in files {
187            let query = std::fs::read_to_string(file)?;
188            let stats = self.benchmark_from_string(&query).await?;
189            println!("{}", stats);
190        }
191        Ok(())
192    }
193
194    async fn analyze_files(&self, files: &[PathBuf]) -> Result<()> {
195        info!("Analyzing files: {:?}", files);
196        for file in files {
197            let query = std::fs::read_to_string(file)?;
198            self.analyze_from_string(&query).await?;
199        }
200        Ok(())
201    }
202
203    #[cfg(feature = "flightsql")]
204    async fn flightsql_execute_files(&self, files: &[PathBuf]) -> color_eyre::Result<()> {
205        info!("Executing FlightSQL files: {:?}", files);
206        for (i, file) in files.iter().enumerate() {
207            let file = std::fs::read_to_string(file)?;
208            self.exec_from_flightsql(file, i).await?;
209        }
210
211        Ok(())
212    }
213
214    #[cfg(feature = "flightsql")]
215    async fn flightsql_benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
216        info!("Benchmarking FlightSQL files: {:?}", files);
217
218        let mut open_opts = std::fs::OpenOptions::new();
219        let mut results_file = if let Some(p) = &self.args.save {
220            if !p.exists() {
221                if let Some(parent) = p.parent() {
222                    std::fs::DirBuilder::new().recursive(true).create(parent)?;
223                }
224            };
225            if self.args.append && p.exists() {
226                open_opts.append(true).create(true);
227                Some(open_opts.open(p)?)
228            } else {
229                open_opts.write(true).create(true).truncate(true);
230                let mut file = open_opts.open(p)?;
231                writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
232                Some(file)
233            }
234        } else {
235            None
236        };
237
238        for file in files {
239            let query = std::fs::read_to_string(file)?;
240            let stats = self.flightsql_benchmark_from_string(&query).await?;
241            println!("{}", stats);
242            if let Some(ref mut results_file) = &mut results_file {
243                writeln!(results_file, "{}", stats.to_summary_csv_row())?
244            }
245        }
246
247        Ok(())
248    }
249
250    #[cfg(feature = "flightsql")]
251    async fn exec_from_flightsql(&self, sql: String, i: usize) -> color_eyre::Result<()> {
252        let client = self.app_execution.flightsql_client();
253        let mut guard = client.lock().await;
254        if let Some(client) = guard.as_mut() {
255            let start = if self.args.time {
256                Some(std::time::Instant::now())
257            } else {
258                None
259            };
260            let flight_info = client.execute(sql, None).await?;
261            for endpoint in flight_info.endpoint {
262                if let Some(ticket) = endpoint.ticket {
263                    let stream = client.do_get(ticket.into_request()).await?;
264                    if let Some(output_path) = &self.args.output {
265                        self.output_stream(stream, output_path).await?
266                    } else if let Some(start) = start {
267                        self.exec_stream(stream).await;
268                        let elapsed = start.elapsed();
269                        println!("Query {i} executed in {:?}", elapsed);
270                    } else {
271                        self.print_any_stream(stream).await;
272                    }
273                }
274            }
275        } else {
276            println!("No FlightSQL client configured.  Add one in `~/.config/dft/config.toml`");
277        }
278
279        Ok(())
280    }
281
282    async fn execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
283        info!("Executing commands: {:?}", commands);
284        for command in commands {
285            self.exec_from_string(command).await?
286        }
287
288        Ok(())
289    }
290
291    async fn benchmark_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
292        if let Some(run_before_query) = &self.args.run_before {
293            self.app_execution
294                .execution_ctx()
295                .execute_sql_and_discard_results(run_before_query)
296                .await?;
297        }
298        info!("Benchmarking commands: {:?}", commands);
299        let mut open_opts = std::fs::OpenOptions::new();
300        let mut file = if let Some(p) = &self.args.save {
301            if !p.exists() {
302                if let Some(parent) = p.parent() {
303                    std::fs::DirBuilder::new().recursive(true).create(parent)?;
304                }
305            };
306            if self.args.append && p.exists() {
307                open_opts.append(true).create(true);
308                Some(open_opts.open(p)?)
309            } else {
310                open_opts.write(true).create(true).truncate(true);
311                let mut file = open_opts.open(p)?;
312                writeln!(file, "{}", LOCAL_BENCHMARK_HEADER_ROW)?;
313                Some(file)
314            }
315        } else {
316            None
317        };
318
319        for command in commands {
320            let stats = self.benchmark_from_string(command).await?;
321            println!("{}", stats);
322            if let Some(ref mut file) = &mut file {
323                writeln!(file, "{}", stats.to_summary_csv_row())?;
324            }
325        }
326        Ok(())
327    }
328
329    async fn analyze_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
330        info!("Analyzing commands: {:?}", commands);
331        for command in commands {
332            self.analyze_from_string(command).await?;
333        }
334
335        Ok(())
336    }
337
338    #[cfg(feature = "flightsql")]
339    async fn flightsql_execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
340        info!("Executing FlightSQL commands: {:?}", commands);
341        for (i, command) in commands.iter().enumerate() {
342            self.exec_from_flightsql(command.to_string(), i).await?
343        }
344
345        Ok(())
346    }
347
348    #[cfg(feature = "flightsql")]
349    async fn flightsql_benchmark_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
350        info!("Benchmark FlightSQL commands: {:?}", commands);
351
352        let mut open_opts = std::fs::OpenOptions::new();
353        let mut file = if let Some(p) = &self.args.save {
354            if !p.exists() {
355                if let Some(parent) = p.parent() {
356                    std::fs::DirBuilder::new().recursive(true).create(parent)?;
357                }
358            };
359            if self.args.append && p.exists() {
360                open_opts.append(true).create(true);
361                Some(open_opts.open(p)?)
362            } else {
363                open_opts.write(true).create(true).truncate(true);
364                let mut file = open_opts.open(p)?;
365                writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
366                Some(file)
367            }
368        } else {
369            None
370        };
371
372        for command in commands {
373            let stats = self.flightsql_benchmark_from_string(command).await?;
374            println!("{}", stats);
375            if let Some(ref mut file) = &mut file {
376                writeln!(file, "{}", stats.to_summary_csv_row())?
377            }
378        }
379
380        Ok(())
381    }
382
383    async fn exec_from_string(&self, sql: &str) -> Result<()> {
384        let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
385        let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
386        let start = if self.args.time {
387            Some(std::time::Instant::now())
388        } else {
389            None
390        };
391        for (i, statement) in statements.into_iter().enumerate() {
392            let stream = self
393                .app_execution
394                .execution_ctx()
395                .execute_statement(statement)
396                .await?;
397            if let Some(output_path) = &self.args.output {
398                self.output_stream(stream, output_path).await?;
399            } else if let Some(start) = start {
400                self.exec_stream(stream).await;
401                let elapsed = start.elapsed();
402                println!("Query {i} executed in {:?}", elapsed);
403            } else {
404                self.print_any_stream(stream).await;
405            }
406        }
407        Ok(())
408    }
409
410    async fn benchmark_from_string(&self, sql: &str) -> Result<LocalBenchmarkStats> {
411        let stats = self
412            .app_execution
413            .execution_ctx()
414            .benchmark_query(sql, self.args.benchmark_iterations)
415            .await?;
416        Ok(stats)
417    }
418
419    async fn analyze_from_string(&self, sql: &str) -> Result<()> {
420        let mut stats = self
421            .app_execution
422            .execution_ctx()
423            .analyze_query(sql)
424            .await?;
425        stats.collect_stats();
426        println!("{}", stats);
427        Ok(())
428    }
429
430    #[cfg(feature = "flightsql")]
431    async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result<FlightSQLBenchmarkStats> {
432        let stats = self
433            .app_execution
434            .flightsql_ctx()
435            .benchmark_query(sql, self.args.benchmark_iterations)
436            .await?;
437        Ok(stats)
438    }
439
440    /// run and execute SQL statements and commands from a file, against a context
441    /// with the given print options
442    pub async fn exec_from_file(&self, file: &Path) -> color_eyre::Result<()> {
443        let string = std::fs::read_to_string(file)?;
444
445        self.exec_from_string(&string).await?;
446
447        Ok(())
448    }
449
450    /// executes a sql statement and prints the result to stdout
451    pub async fn execute_and_print_sql(&self, sql: &str) -> color_eyre::Result<()> {
452        let stream = self.app_execution.execution_ctx().execute_sql(sql).await?;
453        self.print_any_stream(stream).await;
454        Ok(())
455    }
456
457    async fn exec_stream<S, E>(&self, mut stream: S)
458    where
459        S: Stream<Item = Result<RecordBatch, E>> + Unpin,
460        E: Error,
461    {
462        while let Some(maybe_batch) = stream.next().await {
463            match maybe_batch {
464                Ok(_) => {}
465                Err(e) => {
466                    println!("Error executing SQL: {e}");
467                    break;
468                }
469            }
470        }
471    }
472
473    async fn print_any_stream<S, E>(&self, mut stream: S)
474    where
475        S: Stream<Item = Result<RecordBatch, E>> + Unpin,
476        E: Error,
477    {
478        while let Some(maybe_batch) = stream.next().await {
479            match maybe_batch {
480                Ok(batch) => match pretty_format_batches(&[batch]) {
481                    Ok(d) => println!("{}", d),
482                    Err(e) => println!("Error formatting batch: {e}"),
483                },
484                Err(e) => println!("Error executing SQL: {e}"),
485            }
486        }
487    }
488
489    async fn output_stream<S, E>(&self, mut stream: S, path: &Path) -> Result<()>
490    where
491        S: Stream<Item = Result<RecordBatch, E>> + Unpin,
492        E: Error,
493    {
494        // We get the schema from the first batch and use that for creating the writer
495        if let Some(Ok(first_batch)) = stream.next().await {
496            let schema = first_batch.schema();
497            let mut writer = path_to_writer(path, schema)?;
498            writer.write(&first_batch)?;
499
500            while let Some(maybe_batch) = stream.next().await {
501                match maybe_batch {
502                    Ok(batch) => writer.write(&batch)?,
503                    Err(e) => return Err(eyre!("Error executing SQL: {e}")),
504                }
505            }
506            writer.close()?;
507        }
508
509        Ok(())
510    }
511}
512
513/// We use an Enum for this because of limitations with using trait objects and the `close` method
514/// on a writer taking `self` as an argument which requires a size for the trait object which is
515/// not known at compile time.
516#[allow(clippy::large_enum_variant)]
517enum AnyWriter {
518    Csv(csv::writer::Writer<File>),
519    Json(json::writer::LineDelimitedWriter<File>),
520    Parquet(ArrowWriter<File>),
521}
522
523impl AnyWriter {
524    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
525        match self {
526            AnyWriter::Csv(w) => Ok(w.write(batch)?),
527            AnyWriter::Json(w) => Ok(w.write(batch)?),
528            AnyWriter::Parquet(w) => Ok(w.write(batch)?),
529        }
530    }
531
532    fn close(self) -> Result<()> {
533        match self {
534            AnyWriter::Csv(w) => Ok(w.close()?),
535            AnyWriter::Json(w) => Ok(w.close()?),
536            AnyWriter::Parquet(w) => {
537                w.close()?;
538                Ok(())
539            }
540        }
541    }
542}
543
544fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
545    if let Some(extension) = path.extension() {
546        if let Some(e) = extension.to_ascii_lowercase().to_str() {
547            let file = std::fs::File::create(path)?;
548            return match e {
549                "csv" => Ok(AnyWriter::Csv(csv::writer::Writer::new(file))),
550                "json" => Ok(AnyWriter::Json(json::writer::LineDelimitedWriter::new(
551                    file,
552                ))),
553                "parquet" => {
554                    let props = WriterProperties::default();
555                    let writer = ArrowWriter::try_new(file, schema, Some(props))?;
556                    Ok(AnyWriter::Parquet(writer))
557                }
558                _ => {
559                    return Err(eyre!(
560                        "Only 'csv', 'parquet', and 'json' file types can be output"
561                    ))
562                }
563            };
564        }
565    }
566    Err(eyre!("Unable to parse extension"))
567}