kapot_cli/
exec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution functions
19
20use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use kapot::prelude::{KapotContext, Result};
27use rustyline::error::ReadlineError;
28use rustyline::Editor;
29
30use crate::{
31    command::{Command, OutputFormat},
32    helper::CliHelper,
33    print_options::PrintOptions,
34};
35
36/// run and execute SQL statements and commands from a file, against a context with the given print options
37pub async fn exec_from_lines(
38    ctx: &KapotContext,
39    reader: &mut BufReader<File>,
40    print_options: &PrintOptions,
41) {
42    let mut query = "".to_owned();
43
44    for line in reader.lines() {
45        match line {
46            Ok(line) if line.starts_with("--") => {
47                continue;
48            }
49            Ok(line) => {
50                let line = line.trim_end();
51                query.push_str(line);
52                if line.ends_with(';') {
53                    match exec_and_print(ctx, print_options, query).await {
54                        Ok(_) => {}
55                        Err(err) => println!("{err:?}"),
56                    }
57
58                    #[allow(clippy::assigning_clones)]
59                    {
60                        query = "".to_owned();
61                    }
62                } else {
63                    query.push('\n');
64                }
65            }
66            _ => {
67                break;
68            }
69        }
70    }
71
72    // run the left over query if the last statement doesn't contain ‘;’
73    if !query.is_empty() {
74        match exec_and_print(ctx, print_options, query).await {
75            Ok(_) => {}
76            Err(err) => println!("{err:?}"),
77        }
78    }
79}
80
81pub async fn exec_from_files(files: Vec<String>, context: &KapotContext, print_options: &PrintOptions) {
82    let files = files
83        .into_iter()
84        .map(|file_path| File::open(file_path).unwrap())
85        .collect::<Vec<_>>();
86
87    for file in files {
88        let mut reader = BufReader::new(file);
89        exec_from_lines(context, &mut reader, print_options).await;
90    }
91}
92
93/// run and execute SQL statements and commands against a context with the given print options
94pub async fn execute_from_repl(ctx: &KapotContext, print_options: &mut PrintOptions) {
95
96    let mut rl = Editor::new().expect("created editor");
97    
98    rl.set_helper(Some(CliHelper::new(
99        &ctx.context()
100            .task_ctx()
101            .session_config()
102            .options()
103            .sql_parser
104            .dialect,
105        print_options.color,
106    )));
107    rl.load_history(".history").ok();
108
109    let mut print_options = print_options.clone();
110
111    loop {
112        match rl.readline("❯ ") {
113            Ok(line) if line.starts_with('\\') => {
114                rl.add_history_entry(line.trim_end()).unwrap();
115                
116                let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
117
118                if let Ok(cmd) = &command[1..].parse::<Command>() {
119                    match cmd {
120                        Command::Quit => break,
121                        Command::OutputFormat(subcommand) => {
122                            if let Some(subcommand) = subcommand {
123                                if let Ok(command) = subcommand.parse::<OutputFormat>() {
124                                    if let Err(e) =
125                                        command.execute(&mut print_options).await
126                                    {
127                                        eprintln!("{e}")
128                                    }
129                                } else {
130                                    eprintln!(
131                                        "'\\{}' is not a valid command",
132                                        &line[1..]
133                                    );
134                                }
135                            } else {
136                                println!("Output format is {:?}.", print_options.format);
137                            }
138                        }
139                        _ => {
140                            if let Err(e) = cmd.execute(ctx, &mut print_options).await {
141                                eprintln!("{e}")
142                            }
143                        }
144                    }
145                } else {
146                    eprintln!("'\\{}' is not a valid command", &line[1..]);
147                }
148            }
149
150            Ok(line) => {
151                rl.add_history_entry(line.trim_end()).unwrap();
152                match exec_and_print(ctx, &print_options, line).await {
153                    Ok(_) => {}
154                    Err(err) => eprintln!("{err:?}"),
155                }
156            }
157            Err(ReadlineError::Interrupted) => {
158                println!("^C");
159                continue;
160            }
161            Err(ReadlineError::Eof) => {
162                println!("\\q");
163                break;
164            }
165            Err(err) => {
166                eprintln!("Unknown error happened {err:?}");
167                break;
168            }
169        }
170    }
171
172    rl.save_history(".history").ok();
173}
174
175async fn exec_and_print(ctx: &KapotContext, print_options: &PrintOptions, sql: String) -> Result<()> {
176    let now = Instant::now();
177    let df = ctx.sql(&sql).await?;
178    let schema = Arc::new(df.schema().as_arrow().clone());
179    let results = df.collect().await?;
180    print_options.print_batches(schema, &results, now)?;
181
182    Ok(())
183}