1use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use kapot::prelude::{KapotContext, Result};
27use rustyline::error::ReadlineError;
28use rustyline::Editor;
29
30use crate::{
31 command::{Command, OutputFormat},
32 helper::CliHelper,
33 print_options::PrintOptions,
34};
35
36pub async fn exec_from_lines(
38 ctx: &KapotContext,
39 reader: &mut BufReader<File>,
40 print_options: &PrintOptions,
41) {
42 let mut query = "".to_owned();
43
44 for line in reader.lines() {
45 match line {
46 Ok(line) if line.starts_with("--") => {
47 continue;
48 }
49 Ok(line) => {
50 let line = line.trim_end();
51 query.push_str(line);
52 if line.ends_with(';') {
53 match exec_and_print(ctx, print_options, query).await {
54 Ok(_) => {}
55 Err(err) => println!("{err:?}"),
56 }
57
58 #[allow(clippy::assigning_clones)]
59 {
60 query = "".to_owned();
61 }
62 } else {
63 query.push('\n');
64 }
65 }
66 _ => {
67 break;
68 }
69 }
70 }
71
72 if !query.is_empty() {
74 match exec_and_print(ctx, print_options, query).await {
75 Ok(_) => {}
76 Err(err) => println!("{err:?}"),
77 }
78 }
79}
80
81pub async fn exec_from_files(files: Vec<String>, context: &KapotContext, print_options: &PrintOptions) {
82 let files = files
83 .into_iter()
84 .map(|file_path| File::open(file_path).unwrap())
85 .collect::<Vec<_>>();
86
87 for file in files {
88 let mut reader = BufReader::new(file);
89 exec_from_lines(context, &mut reader, print_options).await;
90 }
91}
92
93pub async fn execute_from_repl(ctx: &KapotContext, print_options: &mut PrintOptions) {
95
96 let mut rl = Editor::new().expect("created editor");
97
98 rl.set_helper(Some(CliHelper::new(
99 &ctx.context()
100 .task_ctx()
101 .session_config()
102 .options()
103 .sql_parser
104 .dialect,
105 print_options.color,
106 )));
107 rl.load_history(".history").ok();
108
109 let mut print_options = print_options.clone();
110
111 loop {
112 match rl.readline("❯ ") {
113 Ok(line) if line.starts_with('\\') => {
114 rl.add_history_entry(line.trim_end()).unwrap();
115
116 let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
117
118 if let Ok(cmd) = &command[1..].parse::<Command>() {
119 match cmd {
120 Command::Quit => break,
121 Command::OutputFormat(subcommand) => {
122 if let Some(subcommand) = subcommand {
123 if let Ok(command) = subcommand.parse::<OutputFormat>() {
124 if let Err(e) =
125 command.execute(&mut print_options).await
126 {
127 eprintln!("{e}")
128 }
129 } else {
130 eprintln!(
131 "'\\{}' is not a valid command",
132 &line[1..]
133 );
134 }
135 } else {
136 println!("Output format is {:?}.", print_options.format);
137 }
138 }
139 _ => {
140 if let Err(e) = cmd.execute(ctx, &mut print_options).await {
141 eprintln!("{e}")
142 }
143 }
144 }
145 } else {
146 eprintln!("'\\{}' is not a valid command", &line[1..]);
147 }
148 }
149
150 Ok(line) => {
151 rl.add_history_entry(line.trim_end()).unwrap();
152 match exec_and_print(ctx, &print_options, line).await {
153 Ok(_) => {}
154 Err(err) => eprintln!("{err:?}"),
155 }
156 }
157 Err(ReadlineError::Interrupted) => {
158 println!("^C");
159 continue;
160 }
161 Err(ReadlineError::Eof) => {
162 println!("\\q");
163 break;
164 }
165 Err(err) => {
166 eprintln!("Unknown error happened {err:?}");
167 break;
168 }
169 }
170 }
171
172 rl.save_history(".history").ok();
173}
174
175async fn exec_and_print(ctx: &KapotContext, print_options: &PrintOptions, sql: String) -> Result<()> {
176 let now = Instant::now();
177 let df = ctx.sql(&sql).await?;
178 let schema = Arc::new(df.schema().as_arrow().clone());
179 let results = df.collect().await?;
180 print_options.print_batches(schema, &results, now)?;
181
182 Ok(())
183}