1use crate::cli_context::CliSessionContext;
21use crate::exec::{exec_and_print, exec_from_lines};
22use crate::functions::{display_all_functions, Function};
23use crate::print_format::PrintFormat;
24use crate::print_options::PrintOptions;
25use clap::ValueEnum;
26use datafusion::arrow::array::{ArrayRef, StringArray};
27use datafusion::arrow::datatypes::{DataType, Field, Schema};
28use datafusion::arrow::record_batch::RecordBatch;
29use datafusion::common::instant::Instant;
30use datafusion::common::{exec_datafusion_err, exec_err};
31use datafusion::error::Result;
32use std::fs::File;
33use std::io::BufReader;
34use std::str::FromStr;
35use std::sync::Arc;
36
/// A backslash meta-command understood by the CLI (the text after the
/// leading `\`), as opposed to a SQL statement.
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` so that
/// parsed commands can be compared directly (for example in tests) and
/// duplicated without re-parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Command {
    /// `\q` — quit the CLI.
    Quit,
    /// `\?` — show the help table.
    Help,
    /// `\d` — list tables.
    ListTables,
    /// `\d name` — describe the named table.
    DescribeTableStmt(String),
    /// `\h` — list functions.
    ListFunctions,
    /// `\i filename` — read input from the given file; `None` when the
    /// filename argument was omitted.
    Include(Option<String>),
    /// `\h function` — look up a single function by name.
    SearchFunctions(String),
    /// `\quiet (true|false)?` — set quiet mode, or print it when `None`.
    QuietMode(Option<bool>),
    /// `\pset [NAME [VALUE]]` — set an output option; carries the raw
    /// sub-command text, if any.
    OutputFormat(Option<String>),
    /// `\object_store_profiling (disabled|summary|trace)` — set the object
    /// store profile mode, or print it when `None`.
    ObjectStoreProfileMode(Option<String>),
}
51
/// A sub-command of `\pset` that changes how results are printed.
///
/// Derives `Debug` (matching `Command`) plus `Clone`, `PartialEq` and `Eq`
/// so values can be logged, compared and copied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputFormat {
    /// `format VALUE` — switch the print format to the one named by `VALUE`.
    ChangeFormat(String),
}
55
56impl Command {
57 pub async fn execute(
58 &self,
59 ctx: &dyn CliSessionContext,
60 print_options: &mut PrintOptions,
61 ) -> Result<()> {
62 match self {
63 Self::Help => {
64 let now = Instant::now();
65 let command_batch = all_commands_info();
66 let schema = command_batch.schema();
67 let num_rows = command_batch.num_rows();
68 let task_ctx = ctx.task_ctx();
69 let config = &task_ctx.session_config().options().format;
70 print_options.print_batches(
71 schema,
72 &[command_batch],
73 now,
74 num_rows,
75 config,
76 )
77 }
78 Self::ListTables => {
79 exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
80 }
81 Self::DescribeTableStmt(name) => {
82 exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {name}"))
83 .await
84 }
85 Self::Include(filename) => {
86 if let Some(filename) = filename {
87 let file = File::open(filename).map_err(|e| {
88 exec_datafusion_err!("Error opening {filename:?} {e}")
89 })?;
90 exec_from_lines(ctx, &mut BufReader::new(file), print_options)
91 .await?;
92 Ok(())
93 } else {
94 exec_err!("Required filename argument is missing")
95 }
96 }
97 Self::QuietMode(quiet) => {
98 if let Some(quiet) = quiet {
99 print_options.quiet = *quiet;
100 println!(
101 "Quiet mode set to {}",
102 if print_options.quiet { "true" } else { "false" }
103 );
104 } else {
105 println!(
106 "Quiet mode is {}",
107 if print_options.quiet { "true" } else { "false" }
108 );
109 }
110 Ok(())
111 }
112 Self::Quit => exec_err!("Unexpected quit, this should be handled outside"),
113 Self::ListFunctions => display_all_functions(),
114 Self::SearchFunctions(function) => {
115 if let Ok(func) = function.parse::<Function>() {
116 let details = func.function_details()?;
117 println!("{details}");
118 Ok(())
119 } else {
120 exec_err!("{function} is not a supported function")
121 }
122 }
123 Self::OutputFormat(_) => exec_err!(
124 "Unexpected change output format, this should be handled outside"
125 ),
126 Self::ObjectStoreProfileMode(mode) => {
127 if let Some(mode) = mode {
128 let profile_mode = mode
129 .parse()
130 .map_err(|_|
131 exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, summary, trace")
132 )?;
133 print_options
134 .instrumented_registry
135 .set_instrument_mode(profile_mode);
136 println!(
137 "ObjectStore Profile mode set to {}",
138 print_options.instrumented_registry.instrument_mode()
139 );
140 } else {
141 println!(
142 "ObjectStore Profile mode is {}",
143 print_options.instrumented_registry.instrument_mode()
144 );
145 }
146
147 Ok(())
148 }
149 }
150 }
151
152 fn get_name_and_description(&self) -> (&'static str, &'static str) {
153 match self {
154 Self::Quit => ("\\q", "quit datafusion-cli"),
155 Self::ListTables => ("\\d", "list tables"),
156 Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
157 Self::Help => ("\\?", "help"),
158 Self::Include(_) => {
159 ("\\i filename", "reads input from the specified filename")
160 }
161 Self::ListFunctions => ("\\h", "function list"),
162 Self::SearchFunctions(_) => ("\\h function", "search function"),
163 Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
164 Self::OutputFormat(_) => {
165 ("\\pset [NAME [VALUE]]", "set table output option\n(format)")
166 }
167 Self::ObjectStoreProfileMode(_) => (
168 "\\object_store_profiling (disabled|summary|trace)",
169 "print or set object store profile mode",
170 ),
171 }
172 }
173}
174
175const ALL_COMMANDS: [Command; 10] = [
176 Command::ListTables,
177 Command::DescribeTableStmt(String::new()),
178 Command::Quit,
179 Command::Help,
180 Command::Include(Some(String::new())),
181 Command::ListFunctions,
182 Command::SearchFunctions(String::new()),
183 Command::QuietMode(None),
184 Command::OutputFormat(None),
185 Command::ObjectStoreProfileMode(None),
186];
187
188fn all_commands_info() -> RecordBatch {
189 let schema = Arc::new(Schema::new(vec![
190 Field::new("Command", DataType::Utf8, false),
191 Field::new("Description", DataType::Utf8, false),
192 ]));
193 let (names, description): (Vec<&str>, Vec<&str>) = ALL_COMMANDS
194 .into_iter()
195 .map(|c| c.get_name_and_description())
196 .unzip();
197 RecordBatch::try_new(
198 schema,
199 [names, description]
200 .into_iter()
201 .map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
202 .collect::<Vec<_>>(),
203 )
204 .expect("This should not fail")
205}
206
207impl FromStr for Command {
208 type Err = ();
209
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
212 (a, Some(b))
213 } else {
214 (s, None)
215 };
216 Ok(match (c, arg) {
217 ("q", None) => Self::Quit,
218 ("d", None) => Self::ListTables,
219 ("d", Some(name)) => Self::DescribeTableStmt(name.into()),
220 ("?", None) => Self::Help,
221 ("h", None) => Self::ListFunctions,
222 ("h", Some(function)) => Self::SearchFunctions(function.into()),
223 ("i", None) => Self::Include(None),
224 ("i", Some(filename)) => Self::Include(Some(filename.to_owned())),
225 ("quiet", Some("true" | "t" | "yes" | "y" | "on")) => {
226 Self::QuietMode(Some(true))
227 }
228 ("quiet", Some("false" | "f" | "no" | "n" | "off")) => {
229 Self::QuietMode(Some(false))
230 }
231 ("quiet", None) => Self::QuietMode(None),
232 ("pset", Some(subcommand)) => {
233 Self::OutputFormat(Some(subcommand.to_string()))
234 }
235 ("pset", None) => Self::OutputFormat(None),
236 ("object_store_profiling", Some(mode)) => {
237 Self::ObjectStoreProfileMode(Some(mode.to_string()))
238 }
239 ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None),
240 _ => return Err(()),
241 })
242 }
243}
244
245impl FromStr for OutputFormat {
246 type Err = ();
247
248 fn from_str(s: &str) -> Result<Self, Self::Err> {
249 let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
250 (a, Some(b))
251 } else {
252 (s, None)
253 };
254 Ok(match (c, arg) {
255 ("format", Some(format)) => Self::ChangeFormat(format.to_string()),
256 _ => return Err(()),
257 })
258 }
259}
260
261impl OutputFormat {
262 pub async fn execute(&self, print_options: &mut PrintOptions) -> Result<()> {
263 match self {
264 Self::ChangeFormat(format) => {
265 if let Ok(format) = format.parse::<PrintFormat>() {
266 print_options.format = format;
267 println!("Output format is {:?}.", print_options.format);
268 Ok(())
269 } else {
270 exec_err!(
271 "{:?} is not a valid format type [possible values: {:?}]",
272 format,
273 PrintFormat::value_variants()
274 )
275 }
276 }
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use datafusion::prelude::SessionContext;

    use crate::{
        object_storage::instrumented::{
            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
        },
        print_options::MaxRows,
    };

    use super::*;

    /// Parses `input` as a [`Command`], panicking when it is not recognised.
    fn parse_command(input: &str) -> Command {
        input.parse().expect("expected parse to succeed")
    }

    /// Exercises `object_store_profiling`: query only, each explicit mode,
    /// and an invalid mode that parses but fails on execution.
    #[tokio::test]
    async fn command_execute_profile_mode() {
        let ctx = SessionContext::new();

        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: false,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // Without an argument the command only reports the mode, which must
        // still be the default.
        let report = parse_command("object_store_profiling");
        assert!(report.execute(&ctx, &mut print_options).await.is_ok());
        assert_eq!(
            print_options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::default()
        );

        // Each explicit mode must be reflected by the registry afterwards.
        let transitions = [
            (
                "object_store_profiling summary",
                InstrumentedObjectStoreMode::Summary,
            ),
            (
                "object_store_profiling trace",
                InstrumentedObjectStoreMode::Trace,
            ),
        ];
        for (input, expected) in transitions {
            let cmd = parse_command(input);
            assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
            assert_eq!(
                print_options.instrumented_registry.instrument_mode(),
                expected
            );
        }

        // An unknown mode is accepted by the parser but rejected on execution.
        let invalid = parse_command("object_store_profiling does_not_exist");
        assert!(invalid.execute(&ctx, &mut print_options).await.is_err());
    }
}