1use crate::args::DftArgs;
20use crate::execution::{local_benchmarks::LocalBenchmarkStats, AppExecution};
21use color_eyre::eyre::eyre;
22use color_eyre::Result;
23use datafusion::arrow::array::{RecordBatch, RecordBatchWriter};
24use datafusion::arrow::datatypes::SchemaRef;
25use datafusion::arrow::util::pretty::pretty_format_batches;
26use datafusion::arrow::{csv, json};
27use datafusion::sql::parser::DFParser;
28use futures::{Stream, StreamExt};
29use log::info;
30use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
31use std::error::Error;
32use std::fs::File;
33use std::io::Write;
34use std::path::{Path, PathBuf};
35#[cfg(feature = "flightsql")]
36use {crate::execution::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::IntoRequest};
37
/// Header row written as the first line of a freshly created `save` file by local
/// (execution-context) benchmarks.
///
/// Layout: `query`, `runs`, then min / max / mean / median / percent-of-total for each of
/// logical planning, physical planning, execution and total time — 22 columns.
// NOTE(review): must line up with `LocalBenchmarkStats::to_summary_csv_row` (defined
// elsewhere); confirm both sides whenever either changes.
const LOCAL_BENCHMARK_HEADER_ROW: &str = concat!(
    "query,runs,",
    "logical_planning_min,logical_planning_max,logical_planning_mean,",
    "logical_planning_median,logical_planning_percent_of_total,",
    "physical_planning_min,physical_planning_max,physical_planning_mean,",
    "physical_planning_median,physical_planning_percent_of_total,",
    "execution_min,execution_max,execution_mean,execution_median,execution_percent_of_total,",
    "total_min,total_max,total_mean,total_median,total_percent_of_total",
);

/// Header row written as the first line of a freshly created `save` file by FlightSQL
/// benchmarks.
///
/// Layout: `query`, `runs`, then min / max / mean / median / percent-of-total for each of
/// `get_flight_info`, `ttfb`, `do_get` and total time — 22 columns.
// NOTE(review): must line up with `FlightSQLBenchmarkStats::to_summary_csv_row` (defined
// elsewhere); confirm both sides whenever either changes.
#[cfg(feature = "flightsql")]
const FLIGHTSQL_BENCHMARK_HEADER_ROW: &str = concat!(
    "query,runs,",
    "get_flight_info_min,get_flight_info_max,get_flight_info_mean,",
    "get_flight_info_median,get_flight_info_percent_of_total,",
    "ttfb_min,ttfb_max,ttfb_mean,ttfb_median,ttfb_percent_of_total,",
    "do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,",
    "total_min,total_max,total_mean,total_median,total_percent_of_total",
);
44
45pub struct CliApp {
47 app_execution: AppExecution,
49 args: DftArgs,
50}
51
52impl CliApp {
53 pub fn new(app_execution: AppExecution, args: DftArgs) -> Self {
54 Self {
55 app_execution,
56 args,
57 }
58 }
59
60 fn validate_args(&self) -> color_eyre::Result<()> {
61 let more_than_one_command_or_file = (self.args.commands.len() > 1
62 || self.args.files.len() > 1)
63 && self.args.output.is_some();
64 if more_than_one_command_or_file {
65 return Err(eyre!(
66 "Output can only be saved for a single file or command"
67 ));
68 }
69
70 Ok(())
71 }
72
73 pub async fn execute_files_or_commands(&self) -> color_eyre::Result<()> {
77 if self.args.run_ddl {
78 self.app_execution.execution_ctx().execute_ddl().await;
79 }
80
81 self.validate_args()?;
82
83 #[cfg(not(feature = "flightsql"))]
84 match (
85 self.args.files.is_empty(),
86 self.args.commands.is_empty(),
87 self.args.flightsql,
88 self.args.bench,
89 self.args.analyze,
90 ) {
91 (_, _, true, _, _) => Err(eyre!(
93 "FLightSQL feature isn't enabled. Reinstall `dft` with `--features=flightsql`"
94 )),
95 (false, false, false, true, _) => {
96 Err(eyre!("Cannot benchmark without a command or file"))
97 }
98 (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
99 (false, false, _, false, _) => Err(eyre!(
100 "Cannot execute both files and commands at the same time"
101 )),
102 (_, _, false, true, true) => Err(eyre!(
103 "The `benchmark` and `analyze` flags are mutually exclusive"
104 )),
105
106 (false, true, _, false, false) => self.execute_files(&self.args.files).await,
108 (true, false, _, false, false) => self.execute_commands(&self.args.commands).await,
109
110 (false, true, _, true, false) => self.benchmark_files(&self.args.files).await,
112 (true, false, _, true, false) => self.benchmark_commands(&self.args.commands).await,
113
114 (false, true, _, false, true) => self.analyze_files(&self.args.files).await,
116 (true, false, _, false, true) => self.analyze_commands(&self.args.commands).await,
117 }
118 #[cfg(feature = "flightsql")]
119 match (
120 self.args.files.is_empty(),
121 self.args.commands.is_empty(),
122 self.args.flightsql,
123 self.args.bench,
124 self.args.analyze,
125 ) {
126 (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
128 (false, false, false, true, _) => {
129 Err(eyre!("Cannot benchmark without a command or file"))
130 }
131 (false, false, _, _, _) => Err(eyre!(
132 "Cannot execute both files and commands at the same time"
133 )),
134 (_, _, _, true, true) => Err(eyre!(
135 "The `benchmark` and `analyze` flags are mutually exclusive"
136 )),
137 (_, _, true, false, true) => Err(eyre!(
138 "The `analyze` flag is not currently supported with FlightSQL"
139 )),
140
141 (true, false, false, false, false) => self.execute_commands(&self.args.commands).await,
143 (false, true, false, false, false) => self.execute_files(&self.args.files).await,
144
145 (false, true, true, false, false) => {
147 self.flightsql_execute_files(&self.args.files).await
148 }
149 (true, false, true, false, false) => {
150 self.flightsql_execute_commands(&self.args.commands).await
151 }
152
153 (false, true, false, true, false) => self.benchmark_files(&self.args.files).await,
155 (false, true, true, true, false) => {
156 self.flightsql_benchmark_files(&self.args.files).await
157 }
158 (true, false, true, true, false) => {
159 self.flightsql_benchmark_commands(&self.args.commands).await
160 }
161 (true, false, false, true, false) => self.benchmark_commands(&self.args.commands).await,
162
163 (true, false, false, false, true) => self.analyze_commands(&self.args.commands).await,
165 (false, true, false, false, true) => self.analyze_files(&self.args.files).await,
166 }
167 }
168
169 async fn execute_files(&self, files: &[PathBuf]) -> Result<()> {
170 info!("Executing files: {:?}", files);
171 for file in files {
172 self.exec_from_file(file).await?
173 }
174
175 Ok(())
176 }
177
178 async fn benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
179 if let Some(run_before_query) = &self.args.run_before {
180 self.app_execution
181 .execution_ctx()
182 .execute_sql_and_discard_results(run_before_query)
183 .await?;
184 }
185 info!("Benchmarking files: {:?}", files);
186 for file in files {
187 let query = std::fs::read_to_string(file)?;
188 let stats = self.benchmark_from_string(&query).await?;
189 println!("{}", stats);
190 }
191 Ok(())
192 }
193
194 async fn analyze_files(&self, files: &[PathBuf]) -> Result<()> {
195 info!("Analyzing files: {:?}", files);
196 for file in files {
197 let query = std::fs::read_to_string(file)?;
198 self.analyze_from_string(&query).await?;
199 }
200 Ok(())
201 }
202
203 #[cfg(feature = "flightsql")]
204 async fn flightsql_execute_files(&self, files: &[PathBuf]) -> color_eyre::Result<()> {
205 info!("Executing FlightSQL files: {:?}", files);
206 for (i, file) in files.iter().enumerate() {
207 let file = std::fs::read_to_string(file)?;
208 self.exec_from_flightsql(file, i).await?;
209 }
210
211 Ok(())
212 }
213
214 #[cfg(feature = "flightsql")]
215 async fn flightsql_benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
216 info!("Benchmarking FlightSQL files: {:?}", files);
217
218 let mut open_opts = std::fs::OpenOptions::new();
219 let mut results_file = if let Some(p) = &self.args.save {
220 if !p.exists() {
221 if let Some(parent) = p.parent() {
222 std::fs::DirBuilder::new().recursive(true).create(parent)?;
223 }
224 };
225 if self.args.append && p.exists() {
226 open_opts.append(true).create(true);
227 Some(open_opts.open(p)?)
228 } else {
229 open_opts.write(true).create(true).truncate(true);
230 let mut file = open_opts.open(p)?;
231 writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
232 Some(file)
233 }
234 } else {
235 None
236 };
237
238 for file in files {
239 let query = std::fs::read_to_string(file)?;
240 let stats = self.flightsql_benchmark_from_string(&query).await?;
241 println!("{}", stats);
242 if let Some(ref mut results_file) = &mut results_file {
243 writeln!(results_file, "{}", stats.to_summary_csv_row())?
244 }
245 }
246
247 Ok(())
248 }
249
250 #[cfg(feature = "flightsql")]
251 async fn exec_from_flightsql(&self, sql: String, i: usize) -> color_eyre::Result<()> {
252 let client = self.app_execution.flightsql_client();
253 let mut guard = client.lock().await;
254 if let Some(client) = guard.as_mut() {
255 let start = if self.args.time {
256 Some(std::time::Instant::now())
257 } else {
258 None
259 };
260 let flight_info = client.execute(sql, None).await?;
261 for endpoint in flight_info.endpoint {
262 if let Some(ticket) = endpoint.ticket {
263 let stream = client.do_get(ticket.into_request()).await?;
264 if let Some(output_path) = &self.args.output {
265 self.output_stream(stream, output_path).await?
266 } else if let Some(start) = start {
267 self.exec_stream(stream).await;
268 let elapsed = start.elapsed();
269 println!("Query {i} executed in {:?}", elapsed);
270 } else {
271 self.print_any_stream(stream).await;
272 }
273 }
274 }
275 } else {
276 println!("No FlightSQL client configured. Add one in `~/.config/dft/config.toml`");
277 }
278
279 Ok(())
280 }
281
282 async fn execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
283 info!("Executing commands: {:?}", commands);
284 for command in commands {
285 self.exec_from_string(command).await?
286 }
287
288 Ok(())
289 }
290
291 async fn benchmark_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
292 if let Some(run_before_query) = &self.args.run_before {
293 self.app_execution
294 .execution_ctx()
295 .execute_sql_and_discard_results(run_before_query)
296 .await?;
297 }
298 info!("Benchmarking commands: {:?}", commands);
299 let mut open_opts = std::fs::OpenOptions::new();
300 let mut file = if let Some(p) = &self.args.save {
301 if !p.exists() {
302 if let Some(parent) = p.parent() {
303 std::fs::DirBuilder::new().recursive(true).create(parent)?;
304 }
305 };
306 if self.args.append && p.exists() {
307 open_opts.append(true).create(true);
308 Some(open_opts.open(p)?)
309 } else {
310 open_opts.write(true).create(true).truncate(true);
311 let mut file = open_opts.open(p)?;
312 writeln!(file, "{}", LOCAL_BENCHMARK_HEADER_ROW)?;
313 Some(file)
314 }
315 } else {
316 None
317 };
318
319 for command in commands {
320 let stats = self.benchmark_from_string(command).await?;
321 println!("{}", stats);
322 if let Some(ref mut file) = &mut file {
323 writeln!(file, "{}", stats.to_summary_csv_row())?;
324 }
325 }
326 Ok(())
327 }
328
329 async fn analyze_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
330 info!("Analyzing commands: {:?}", commands);
331 for command in commands {
332 self.analyze_from_string(command).await?;
333 }
334
335 Ok(())
336 }
337
338 #[cfg(feature = "flightsql")]
339 async fn flightsql_execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
340 info!("Executing FlightSQL commands: {:?}", commands);
341 for (i, command) in commands.iter().enumerate() {
342 self.exec_from_flightsql(command.to_string(), i).await?
343 }
344
345 Ok(())
346 }
347
348 #[cfg(feature = "flightsql")]
349 async fn flightsql_benchmark_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
350 info!("Benchmark FlightSQL commands: {:?}", commands);
351
352 let mut open_opts = std::fs::OpenOptions::new();
353 let mut file = if let Some(p) = &self.args.save {
354 if !p.exists() {
355 if let Some(parent) = p.parent() {
356 std::fs::DirBuilder::new().recursive(true).create(parent)?;
357 }
358 };
359 if self.args.append && p.exists() {
360 open_opts.append(true).create(true);
361 Some(open_opts.open(p)?)
362 } else {
363 open_opts.write(true).create(true).truncate(true);
364 let mut file = open_opts.open(p)?;
365 writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
366 Some(file)
367 }
368 } else {
369 None
370 };
371
372 for command in commands {
373 let stats = self.flightsql_benchmark_from_string(command).await?;
374 println!("{}", stats);
375 if let Some(ref mut file) = &mut file {
376 writeln!(file, "{}", stats.to_summary_csv_row())?
377 }
378 }
379
380 Ok(())
381 }
382
383 async fn exec_from_string(&self, sql: &str) -> Result<()> {
384 let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
385 let statements = DFParser::parse_sql_with_dialect(sql, &dialect)?;
386 let start = if self.args.time {
387 Some(std::time::Instant::now())
388 } else {
389 None
390 };
391 for (i, statement) in statements.into_iter().enumerate() {
392 let stream = self
393 .app_execution
394 .execution_ctx()
395 .execute_statement(statement)
396 .await?;
397 if let Some(output_path) = &self.args.output {
398 self.output_stream(stream, output_path).await?;
399 } else if let Some(start) = start {
400 self.exec_stream(stream).await;
401 let elapsed = start.elapsed();
402 println!("Query {i} executed in {:?}", elapsed);
403 } else {
404 self.print_any_stream(stream).await;
405 }
406 }
407 Ok(())
408 }
409
410 async fn benchmark_from_string(&self, sql: &str) -> Result<LocalBenchmarkStats> {
411 let stats = self
412 .app_execution
413 .execution_ctx()
414 .benchmark_query(sql, self.args.benchmark_iterations)
415 .await?;
416 Ok(stats)
417 }
418
419 async fn analyze_from_string(&self, sql: &str) -> Result<()> {
420 let mut stats = self
421 .app_execution
422 .execution_ctx()
423 .analyze_query(sql)
424 .await?;
425 stats.collect_stats();
426 println!("{}", stats);
427 Ok(())
428 }
429
430 #[cfg(feature = "flightsql")]
431 async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result<FlightSQLBenchmarkStats> {
432 let stats = self
433 .app_execution
434 .flightsql_ctx()
435 .benchmark_query(sql, self.args.benchmark_iterations)
436 .await?;
437 Ok(stats)
438 }
439
440 pub async fn exec_from_file(&self, file: &Path) -> color_eyre::Result<()> {
443 let string = std::fs::read_to_string(file)?;
444
445 self.exec_from_string(&string).await?;
446
447 Ok(())
448 }
449
450 pub async fn execute_and_print_sql(&self, sql: &str) -> color_eyre::Result<()> {
452 let stream = self.app_execution.execution_ctx().execute_sql(sql).await?;
453 self.print_any_stream(stream).await;
454 Ok(())
455 }
456
457 async fn exec_stream<S, E>(&self, mut stream: S)
458 where
459 S: Stream<Item = Result<RecordBatch, E>> + Unpin,
460 E: Error,
461 {
462 while let Some(maybe_batch) = stream.next().await {
463 match maybe_batch {
464 Ok(_) => {}
465 Err(e) => {
466 println!("Error executing SQL: {e}");
467 break;
468 }
469 }
470 }
471 }
472
473 async fn print_any_stream<S, E>(&self, mut stream: S)
474 where
475 S: Stream<Item = Result<RecordBatch, E>> + Unpin,
476 E: Error,
477 {
478 while let Some(maybe_batch) = stream.next().await {
479 match maybe_batch {
480 Ok(batch) => match pretty_format_batches(&[batch]) {
481 Ok(d) => println!("{}", d),
482 Err(e) => println!("Error formatting batch: {e}"),
483 },
484 Err(e) => println!("Error executing SQL: {e}"),
485 }
486 }
487 }
488
489 async fn output_stream<S, E>(&self, mut stream: S, path: &Path) -> Result<()>
490 where
491 S: Stream<Item = Result<RecordBatch, E>> + Unpin,
492 E: Error,
493 {
494 if let Some(Ok(first_batch)) = stream.next().await {
496 let schema = first_batch.schema();
497 let mut writer = path_to_writer(path, schema)?;
498 writer.write(&first_batch)?;
499
500 while let Some(maybe_batch) = stream.next().await {
501 match maybe_batch {
502 Ok(batch) => writer.write(&batch)?,
503 Err(e) => return Err(eyre!("Error executing SQL: {e}")),
504 }
505 }
506 writer.close()?;
507 }
508
509 Ok(())
510 }
511}
512
513#[allow(clippy::large_enum_variant)]
517enum AnyWriter {
518 Csv(csv::writer::Writer<File>),
519 Json(json::writer::LineDelimitedWriter<File>),
520 Parquet(ArrowWriter<File>),
521}
522
523impl AnyWriter {
524 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
525 match self {
526 AnyWriter::Csv(w) => Ok(w.write(batch)?),
527 AnyWriter::Json(w) => Ok(w.write(batch)?),
528 AnyWriter::Parquet(w) => Ok(w.write(batch)?),
529 }
530 }
531
532 fn close(self) -> Result<()> {
533 match self {
534 AnyWriter::Csv(w) => Ok(w.close()?),
535 AnyWriter::Json(w) => Ok(w.close()?),
536 AnyWriter::Parquet(w) => {
537 w.close()?;
538 Ok(())
539 }
540 }
541 }
542}
543
544fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
545 if let Some(extension) = path.extension() {
546 if let Some(e) = extension.to_ascii_lowercase().to_str() {
547 let file = std::fs::File::create(path)?;
548 return match e {
549 "csv" => Ok(AnyWriter::Csv(csv::writer::Writer::new(file))),
550 "json" => Ok(AnyWriter::Json(json::writer::LineDelimitedWriter::new(
551 file,
552 ))),
553 "parquet" => {
554 let props = WriterProperties::default();
555 let writer = ArrowWriter::try_new(file, schema, Some(props))?;
556 Ok(AnyWriter::Parquet(writer))
557 }
558 _ => {
559 return Err(eyre!(
560 "Only 'csv', 'parquet', and 'json' file types can be output"
561 ))
562 }
563 };
564 }
565 }
566 Err(eyre!("Unable to parse extension"))
567}