ballista_cli/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution functions
19
20use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use datafusion::common::Result;
27use datafusion::prelude::SessionContext;
28use rustyline::error::ReadlineError;
29use rustyline::Editor;
30
31use crate::{
32    command::{Command, OutputFormat},
33    helper::CliHelper,
34    print_options::PrintOptions,
35};
36
37/// run and execute SQL statements and commands from a file, against a context with the given print options
38pub async fn exec_from_lines(
39    ctx: &SessionContext,
40    reader: &mut BufReader<File>,
41    print_options: &PrintOptions,
42) {
43    let mut query = "".to_owned();
44
45    for line in reader.lines() {
46        match line {
47            Ok(line) if line.starts_with("--") => {
48                continue;
49            }
50            Ok(line) => {
51                let line = line.trim_end();
52                query.push_str(line);
53                if line.ends_with(';') {
54                    match exec_and_print(ctx, print_options, query).await {
55                        Ok(_) => {}
56                        Err(err) => println!("{err:?}"),
57                    }
58
59                    #[allow(clippy::assigning_clones)]
60                    {
61                        query = "".to_owned();
62                    }
63                } else {
64                    query.push('\n');
65                }
66            }
67            _ => {
68                break;
69            }
70        }
71    }
72
73    // run the left over query if the last statement doesn't contain ‘;’
74    if !query.is_empty() {
75        match exec_and_print(ctx, print_options, query).await {
76            Ok(_) => {}
77            Err(err) => println!("{err:?}"),
78        }
79    }
80}
81
82pub async fn exec_from_files(
83    files: Vec<String>,
84    ctx: &SessionContext,
85    print_options: &PrintOptions,
86) {
87    let files = files
88        .into_iter()
89        .map(|file_path| File::open(file_path).unwrap())
90        .collect::<Vec<_>>();
91    for file in files {
92        let mut reader = BufReader::new(file);
93        exec_from_lines(ctx, &mut reader, print_options).await;
94    }
95}
96
97/// run and execute SQL statements and commands against a context with the given print options
98pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
99    let mut rl = Editor::new().expect("created editor");
100    rl.set_helper(Some(CliHelper::new(
101        &ctx.task_ctx().session_config().options().sql_parser.dialect,
102        print_options.color,
103    )));
104    rl.load_history(".history").ok();
105
106    let mut print_options = print_options.clone();
107
108    loop {
109        match rl.readline("❯ ") {
110            Ok(line) if line.starts_with('\\') => {
111                rl.add_history_entry(line.trim_end()).unwrap();
112                let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
113                if let Ok(cmd) = &command[1..].parse::<Command>() {
114                    match cmd {
115                        Command::Quit => break,
116                        Command::OutputFormat(subcommand) => {
117                            if let Some(subcommand) = subcommand {
118                                if let Ok(command) = subcommand.parse::<OutputFormat>() {
119                                    if let Err(e) =
120                                        command.execute(&mut print_options).await
121                                    {
122                                        eprintln!("{e}")
123                                    }
124                                } else {
125                                    eprintln!(
126                                        "'\\{}' is not a valid command",
127                                        &line[1..]
128                                    );
129                                }
130                            } else {
131                                println!("Output format is {:?}.", print_options.format);
132                            }
133                        }
134                        _ => {
135                            if let Err(e) = cmd.execute(ctx, &mut print_options).await {
136                                eprintln!("{e}")
137                            }
138                        }
139                    }
140                } else {
141                    eprintln!("'\\{}' is not a valid command", &line[1..]);
142                }
143            }
144            Ok(line) => {
145                rl.add_history_entry(line.trim_end()).unwrap();
146                match exec_and_print(ctx, &print_options, line).await {
147                    Ok(_) => {}
148                    Err(err) => eprintln!("{err:?}"),
149                }
150            }
151            Err(ReadlineError::Interrupted) => {
152                println!("^C");
153                continue;
154            }
155            Err(ReadlineError::Eof) => {
156                println!("\\q");
157                break;
158            }
159            Err(err) => {
160                eprintln!("Unknown error happened {err:?}");
161                break;
162            }
163        }
164    }
165
166    rl.save_history(".history").ok();
167}
168
169async fn exec_and_print(
170    ctx: &SessionContext,
171    print_options: &PrintOptions,
172    sql: String,
173) -> Result<()> {
174    let now = Instant::now();
175    let df = ctx.sql(&sql).await?;
176    let schema = Arc::new(df.schema().as_arrow().clone());
177    let results = df.collect().await?;
178    print_options.print_batches(schema, &results, now)?;
179
180    Ok(())
181}