1use std::fs::File;
21use std::io::prelude::*;
22use std::io::BufReader;
23use std::sync::Arc;
24use std::time::Instant;
25
26use datafusion::common::Result;
27use datafusion::prelude::SessionContext;
28use rustyline::error::ReadlineError;
29use rustyline::Editor;
30
31use crate::{
32 command::{Command, OutputFormat},
33 helper::CliHelper,
34 print_options::PrintOptions,
35};
36
37pub async fn exec_from_lines(
39 ctx: &SessionContext,
40 reader: &mut BufReader<File>,
41 print_options: &PrintOptions,
42) {
43 let mut query = "".to_owned();
44 let max_rows = match print_options.maxrows {
45 datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
46 datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
47 };
48
49 for line in reader.lines() {
50 match line {
51 Ok(line) if line.starts_with("--") => {
52 continue;
53 }
54 Ok(line) => {
55 let line = line.trim_end();
56 query.push_str(line);
57 if line.ends_with(';') {
58 match exec_and_print(ctx, print_options, query, max_rows).await {
59 Ok(_) => {}
60 Err(err) => println!("{err:?}"),
61 }
62
63 #[allow(clippy::assigning_clones)]
64 {
65 query = "".to_owned();
66 }
67 } else {
68 query.push('\n');
69 }
70 }
71 _ => {
72 break;
73 }
74 }
75 }
76
77 if !query.is_empty() {
79 match exec_and_print(ctx, print_options, query, max_rows).await {
80 Ok(_) => {}
81 Err(err) => println!("{err:?}"),
82 }
83 }
84}
85
86pub async fn exec_from_files(
87 files: Vec<String>,
88 ctx: &SessionContext,
89 print_options: &PrintOptions,
90) {
91 let files = files
92 .into_iter()
93 .map(|file_path| File::open(file_path).unwrap())
94 .collect::<Vec<_>>();
95 for file in files {
96 let mut reader = BufReader::new(file);
97 exec_from_lines(ctx, &mut reader, print_options).await;
98 }
99}
100
101pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
103 let mut rl = Editor::new().expect("created editor");
104 rl.set_helper(Some(CliHelper::new(
105 &ctx.task_ctx().session_config().options().sql_parser.dialect,
106 print_options.color,
107 )));
108 rl.load_history(".history").ok();
109
110 let mut print_options = print_options.clone();
111
112 let max_rows = match print_options.maxrows {
113 datafusion_cli::print_options::MaxRows::Unlimited => usize::MAX,
114 datafusion_cli::print_options::MaxRows::Limited(max_rows) => max_rows,
115 };
116
117 loop {
118 match rl.readline("❯ ") {
119 Ok(line) if line.starts_with('\\') => {
120 rl.add_history_entry(line.trim_end()).unwrap();
121 let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
122 if let Ok(cmd) = &command[1..].parse::<Command>() {
123 match cmd {
124 Command::Quit => break,
125 Command::OutputFormat(subcommand) => {
126 if let Some(subcommand) = subcommand {
127 if let Ok(command) = subcommand.parse::<OutputFormat>() {
128 if let Err(e) =
129 command.execute(&mut print_options).await
130 {
131 eprintln!("{e}")
132 }
133 } else {
134 eprintln!(
135 "'\\{}' is not a valid command",
136 &line[1..]
137 );
138 }
139 } else {
140 println!("Output format is {:?}.", print_options.format);
141 }
142 }
143 _ => {
144 if let Err(e) = cmd.execute(ctx, &mut print_options).await {
145 eprintln!("{e}")
146 }
147 }
148 }
149 } else {
150 eprintln!("'\\{}' is not a valid command", &line[1..]);
151 }
152 }
153 Ok(line) => {
154 rl.add_history_entry(line.trim_end()).unwrap();
155 match exec_and_print(ctx, &print_options, line, max_rows).await {
156 Ok(_) => {}
157 Err(err) => eprintln!("{err:?}"),
158 }
159 }
160 Err(ReadlineError::Interrupted) => {
161 println!("^C");
162 continue;
163 }
164 Err(ReadlineError::Eof) => {
165 println!("\\q");
166 break;
167 }
168 Err(err) => {
169 eprintln!("Unknown error happened {err:?}");
170 break;
171 }
172 }
173 }
174
175 rl.save_history(".history").ok();
176}
177
178async fn exec_and_print(
179 ctx: &SessionContext,
180 print_options: &PrintOptions,
181 sql: String,
182 row_count: usize,
183) -> Result<()> {
184 let now = Instant::now();
185 let df = ctx.sql(&sql).await?;
186 let schema = Arc::new(df.schema().as_arrow().clone());
187 let results = df.collect().await?;
188 print_options.print_batches(schema, &results, now, row_count, &Default::default())?;
189
190 Ok(())
191}