ballista_cli/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution functions
19
20use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use datafusion::common::Result;
27use datafusion::prelude::SessionContext;
28use rustyline::error::ReadlineError;
29use rustyline::Editor;
30
31use crate::{
32    command::{Command, OutputFormat},
33    helper::CliHelper,
34    print_options::PrintOptions,
35};
36
37/// run and execute SQL statements and commands from a file, against a context with the given print options
38pub async fn exec_from_lines(
39    ctx: &SessionContext,
40    reader: &mut BufReader<File>,
41    print_options: &PrintOptions,
42) {
43    let mut query = "".to_owned();
44    let max_rows = match print_options.maxrows {
45        datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
46        datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
47    };
48
49    for line in reader.lines() {
50        match line {
51            Ok(line) if line.starts_with("--") => {
52                continue;
53            }
54            Ok(line) => {
55                let line = line.trim_end();
56                query.push_str(line);
57                if line.ends_with(';') {
58                    match exec_and_print(ctx, print_options, query, max_rows).await {
59                        Ok(_) => {}
60                        Err(err) => println!("{err:?}"),
61                    }
62
63                    #[allow(clippy::assigning_clones)]
64                    {
65                        query = "".to_owned();
66                    }
67                } else {
68                    query.push('\n');
69                }
70            }
71            _ => {
72                break;
73            }
74        }
75    }
76
77    // run the left over query if the last statement doesn't contain ‘;’
78    if !query.is_empty() {
79        match exec_and_print(ctx, print_options, query, max_rows).await {
80            Ok(_) => {}
81            Err(err) => println!("{err:?}"),
82        }
83    }
84}
85
86pub async fn exec_from_files(
87    files: Vec<String>,
88    ctx: &SessionContext,
89    print_options: &PrintOptions,
90) {
91    let files = files
92        .into_iter()
93        .map(|file_path| File::open(file_path).unwrap())
94        .collect::<Vec<_>>();
95    for file in files {
96        let mut reader = BufReader::new(file);
97        exec_from_lines(ctx, &mut reader, print_options).await;
98    }
99}
100
101/// run and execute SQL statements and commands against a context with the given print options
102pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
103    let mut rl = Editor::new().expect("created editor");
104    rl.set_helper(Some(CliHelper::new(
105        &ctx.task_ctx().session_config().options().sql_parser.dialect,
106        print_options.color,
107    )));
108    rl.load_history(".history").ok();
109
110    let mut print_options = print_options.clone();
111
112    let max_rows = match print_options.maxrows {
113        datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
114        datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
115    };
116
117    loop {
118        match rl.readline("❯ ") {
119            Ok(line) if line.starts_with('\\') => {
120                rl.add_history_entry(line.trim_end()).unwrap();
121                let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
122                if let Ok(cmd) = &command[1..].parse::<Command>() {
123                    match cmd {
124                        Command::Quit => break,
125                        Command::OutputFormat(subcommand) => {
126                            if let Some(subcommand) = subcommand {
127                                if let Ok(command) = subcommand.parse::<OutputFormat>() {
128                                    if let Err(e) =
129                                        command.execute(&mut print_options).await
130                                    {
131                                        eprintln!("{e}")
132                                    }
133                                } else {
134                                    eprintln!(
135                                        "'\\{}' is not a valid command",
136                                        &line[1..]
137                                    );
138                                }
139                            } else {
140                                println!("Output format is {:?}.", print_options.format);
141                            }
142                        }
143                        _ => {
144                            if let Err(e) = cmd.execute(ctx, &mut print_options).await {
145                                eprintln!("{e}")
146                            }
147                        }
148                    }
149                } else {
150                    eprintln!("'\\{}' is not a valid command", &line[1..]);
151                }
152            }
153            Ok(line) => {
154                rl.add_history_entry(line.trim_end()).unwrap();
155                match exec_and_print(ctx, &print_options, line, max_rows).await {
156                    Ok(_) => {}
157                    Err(err) => eprintln!("{err:?}"),
158                }
159            }
160            Err(ReadlineError::Interrupted) => {
161                println!("^C");
162                continue;
163            }
164            Err(ReadlineError::Eof) => {
165                println!("\\q");
166                break;
167            }
168            Err(err) => {
169                eprintln!("Unknown error happened {err:?}");
170                break;
171            }
172        }
173    }
174
175    rl.save_history(".history").ok();
176}
177
178async fn exec_and_print(
179    ctx: &SessionContext,
180    print_options: &PrintOptions,
181    sql: String,
182    row_count: usize,
183) -> Result<()> {
184    let now = Instant::now();
185    let df = ctx.sql(&sql).await?;
186    let schema = Arc::new(df.schema().as_arrow().clone());
187    let results = df.collect().await?;
188    print_options.print_batches(schema, &results, now, row_count, &Default::default())?;
189
190    Ok(())
191}