1use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use datafusion::common::Result;
27use datafusion::prelude::SessionContext;
28use rustyline::error::ReadlineError;
29use rustyline::Editor;
30
31use crate::{
32 command::{Command, OutputFormat},
33 helper::CliHelper,
34 print_options::PrintOptions,
35};
36
37pub async fn exec_from_lines(
39 ctx: &SessionContext,
40 reader: &mut BufReader<File>,
41 print_options: &PrintOptions,
42) {
43 let mut query = "".to_owned();
44
45 for line in reader.lines() {
46 match line {
47 Ok(line) if line.starts_with("--") => {
48 continue;
49 }
50 Ok(line) => {
51 let line = line.trim_end();
52 query.push_str(line);
53 if line.ends_with(';') {
54 match exec_and_print(ctx, print_options, query).await {
55 Ok(_) => {}
56 Err(err) => println!("{err:?}"),
57 }
58
59 #[allow(clippy::assigning_clones)]
60 {
61 query = "".to_owned();
62 }
63 } else {
64 query.push('\n');
65 }
66 }
67 _ => {
68 break;
69 }
70 }
71 }
72
73 if !query.is_empty() {
75 match exec_and_print(ctx, print_options, query).await {
76 Ok(_) => {}
77 Err(err) => println!("{err:?}"),
78 }
79 }
80}
81
82pub async fn exec_from_files(
83 files: Vec<String>,
84 ctx: &SessionContext,
85 print_options: &PrintOptions,
86) {
87 let files = files
88 .into_iter()
89 .map(|file_path| File::open(file_path).unwrap())
90 .collect::<Vec<_>>();
91 for file in files {
92 let mut reader = BufReader::new(file);
93 exec_from_lines(ctx, &mut reader, print_options).await;
94 }
95}
96
97pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
99 let mut rl = Editor::new().expect("created editor");
100 rl.set_helper(Some(CliHelper::new(
101 &ctx.task_ctx().session_config().options().sql_parser.dialect,
102 print_options.color,
103 )));
104 rl.load_history(".history").ok();
105
106 let mut print_options = print_options.clone();
107
108 loop {
109 match rl.readline("❯ ") {
110 Ok(line) if line.starts_with('\\') => {
111 rl.add_history_entry(line.trim_end()).unwrap();
112 let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
113 if let Ok(cmd) = &command[1..].parse::<Command>() {
114 match cmd {
115 Command::Quit => break,
116 Command::OutputFormat(subcommand) => {
117 if let Some(subcommand) = subcommand {
118 if let Ok(command) = subcommand.parse::<OutputFormat>() {
119 if let Err(e) =
120 command.execute(&mut print_options).await
121 {
122 eprintln!("{e}")
123 }
124 } else {
125 eprintln!(
126 "'\\{}' is not a valid command",
127 &line[1..]
128 );
129 }
130 } else {
131 println!("Output format is {:?}.", print_options.format);
132 }
133 }
134 _ => {
135 if let Err(e) = cmd.execute(ctx, &mut print_options).await {
136 eprintln!("{e}")
137 }
138 }
139 }
140 } else {
141 eprintln!("'\\{}' is not a valid command", &line[1..]);
142 }
143 }
144 Ok(line) => {
145 rl.add_history_entry(line.trim_end()).unwrap();
146 match exec_and_print(ctx, &print_options, line).await {
147 Ok(_) => {}
148 Err(err) => eprintln!("{err:?}"),
149 }
150 }
151 Err(ReadlineError::Interrupted) => {
152 println!("^C");
153 continue;
154 }
155 Err(ReadlineError::Eof) => {
156 println!("\\q");
157 break;
158 }
159 Err(err) => {
160 eprintln!("Unknown error happened {err:?}");
161 break;
162 }
163 }
164 }
165
166 rl.save_history(".history").ok();
167}
168
169async fn exec_and_print(
170 ctx: &SessionContext,
171 print_options: &PrintOptions,
172 sql: String,
173) -> Result<()> {
174 let now = Instant::now();
175 let df = ctx.sql(&sql).await?;
176 let schema = Arc::new(df.schema().as_arrow().clone());
177 let results = df.collect().await?;
178 print_options.print_batches(schema, &results, now)?;
179
180 Ok(())
181}