datafusion_cli/
command.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Command within CLI
19
20use crate::cli_context::CliSessionContext;
21use crate::exec::{exec_and_print, exec_from_lines};
22use crate::functions::{display_all_functions, Function};
23use crate::print_format::PrintFormat;
24use crate::print_options::PrintOptions;
25use clap::ValueEnum;
26use datafusion::arrow::array::{ArrayRef, StringArray};
27use datafusion::arrow::datatypes::{DataType, Field, Schema};
28use datafusion::arrow::record_batch::RecordBatch;
29use datafusion::common::instant::Instant;
30use datafusion::common::{exec_datafusion_err, exec_err};
31use datafusion::error::Result;
32use std::fs::File;
33use std::io::BufReader;
34use std::str::FromStr;
35use std::sync::Arc;
36
37/// Command
38#[derive(Debug)]
39pub enum Command {
40    Quit,
41    Help,
42    ListTables,
43    DescribeTableStmt(String),
44    ListFunctions,
45    Include(Option<String>),
46    SearchFunctions(String),
47    QuietMode(Option<bool>),
48    OutputFormat(Option<String>),
49    ObjectStoreProfileMode(Option<String>),
50}
51
52pub enum OutputFormat {
53    ChangeFormat(String),
54}
55
56impl Command {
57    pub async fn execute(
58        &self,
59        ctx: &dyn CliSessionContext,
60        print_options: &mut PrintOptions,
61    ) -> Result<()> {
62        match self {
63            Self::Help => {
64                let now = Instant::now();
65                let command_batch = all_commands_info();
66                let schema = command_batch.schema();
67                let num_rows = command_batch.num_rows();
68                let task_ctx = ctx.task_ctx();
69                let config = &task_ctx.session_config().options().format;
70                print_options.print_batches(
71                    schema,
72                    &[command_batch],
73                    now,
74                    num_rows,
75                    config,
76                )
77            }
78            Self::ListTables => {
79                exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
80            }
81            Self::DescribeTableStmt(name) => {
82                exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {name}"))
83                    .await
84            }
85            Self::Include(filename) => {
86                if let Some(filename) = filename {
87                    let file = File::open(filename).map_err(|e| {
88                        exec_datafusion_err!("Error opening {filename:?} {e}")
89                    })?;
90                    exec_from_lines(ctx, &mut BufReader::new(file), print_options)
91                        .await?;
92                    Ok(())
93                } else {
94                    exec_err!("Required filename argument is missing")
95                }
96            }
97            Self::QuietMode(quiet) => {
98                if let Some(quiet) = quiet {
99                    print_options.quiet = *quiet;
100                    println!(
101                        "Quiet mode set to {}",
102                        if print_options.quiet { "true" } else { "false" }
103                    );
104                } else {
105                    println!(
106                        "Quiet mode is {}",
107                        if print_options.quiet { "true" } else { "false" }
108                    );
109                }
110                Ok(())
111            }
112            Self::Quit => exec_err!("Unexpected quit, this should be handled outside"),
113            Self::ListFunctions => display_all_functions(),
114            Self::SearchFunctions(function) => {
115                if let Ok(func) = function.parse::<Function>() {
116                    let details = func.function_details()?;
117                    println!("{details}");
118                    Ok(())
119                } else {
120                    exec_err!("{function} is not a supported function")
121                }
122            }
123            Self::OutputFormat(_) => exec_err!(
124                "Unexpected change output format, this should be handled outside"
125            ),
126            Self::ObjectStoreProfileMode(mode) => {
127                if let Some(mode) = mode {
128                    let profile_mode = mode
129                        .parse()
130                        .map_err(|_|
131                            exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, summary, trace")
132                        )?;
133                    print_options
134                        .instrumented_registry
135                        .set_instrument_mode(profile_mode);
136                    println!(
137                        "ObjectStore Profile mode set to {}",
138                        print_options.instrumented_registry.instrument_mode()
139                    );
140                } else {
141                    println!(
142                        "ObjectStore Profile mode is {}",
143                        print_options.instrumented_registry.instrument_mode()
144                    );
145                }
146
147                Ok(())
148            }
149        }
150    }
151
152    fn get_name_and_description(&self) -> (&'static str, &'static str) {
153        match self {
154            Self::Quit => ("\\q", "quit datafusion-cli"),
155            Self::ListTables => ("\\d", "list tables"),
156            Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
157            Self::Help => ("\\?", "help"),
158            Self::Include(_) => {
159                ("\\i filename", "reads input from the specified filename")
160            }
161            Self::ListFunctions => ("\\h", "function list"),
162            Self::SearchFunctions(_) => ("\\h function", "search function"),
163            Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
164            Self::OutputFormat(_) => {
165                ("\\pset [NAME [VALUE]]", "set table output option\n(format)")
166            }
167            Self::ObjectStoreProfileMode(_) => (
168                "\\object_store_profiling (disabled|summary|trace)",
169                "print or set object store profile mode",
170            ),
171        }
172    }
173}
174
175const ALL_COMMANDS: [Command; 10] = [
176    Command::ListTables,
177    Command::DescribeTableStmt(String::new()),
178    Command::Quit,
179    Command::Help,
180    Command::Include(Some(String::new())),
181    Command::ListFunctions,
182    Command::SearchFunctions(String::new()),
183    Command::QuietMode(None),
184    Command::OutputFormat(None),
185    Command::ObjectStoreProfileMode(None),
186];
187
188fn all_commands_info() -> RecordBatch {
189    let schema = Arc::new(Schema::new(vec![
190        Field::new("Command", DataType::Utf8, false),
191        Field::new("Description", DataType::Utf8, false),
192    ]));
193    let (names, description): (Vec<&str>, Vec<&str>) = ALL_COMMANDS
194        .into_iter()
195        .map(|c| c.get_name_and_description())
196        .unzip();
197    RecordBatch::try_new(
198        schema,
199        [names, description]
200            .into_iter()
201            .map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
202            .collect::<Vec<_>>(),
203    )
204    .expect("This should not fail")
205}
206
207impl FromStr for Command {
208    type Err = ();
209
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
212            (a, Some(b))
213        } else {
214            (s, None)
215        };
216        Ok(match (c, arg) {
217            ("q", None) => Self::Quit,
218            ("d", None) => Self::ListTables,
219            ("d", Some(name)) => Self::DescribeTableStmt(name.into()),
220            ("?", None) => Self::Help,
221            ("h", None) => Self::ListFunctions,
222            ("h", Some(function)) => Self::SearchFunctions(function.into()),
223            ("i", None) => Self::Include(None),
224            ("i", Some(filename)) => Self::Include(Some(filename.to_owned())),
225            ("quiet", Some("true" | "t" | "yes" | "y" | "on")) => {
226                Self::QuietMode(Some(true))
227            }
228            ("quiet", Some("false" | "f" | "no" | "n" | "off")) => {
229                Self::QuietMode(Some(false))
230            }
231            ("quiet", None) => Self::QuietMode(None),
232            ("pset", Some(subcommand)) => {
233                Self::OutputFormat(Some(subcommand.to_string()))
234            }
235            ("pset", None) => Self::OutputFormat(None),
236            ("object_store_profiling", Some(mode)) => {
237                Self::ObjectStoreProfileMode(Some(mode.to_string()))
238            }
239            ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None),
240            _ => return Err(()),
241        })
242    }
243}
244
245impl FromStr for OutputFormat {
246    type Err = ();
247
248    fn from_str(s: &str) -> Result<Self, Self::Err> {
249        let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
250            (a, Some(b))
251        } else {
252            (s, None)
253        };
254        Ok(match (c, arg) {
255            ("format", Some(format)) => Self::ChangeFormat(format.to_string()),
256            _ => return Err(()),
257        })
258    }
259}
260
261impl OutputFormat {
262    pub async fn execute(&self, print_options: &mut PrintOptions) -> Result<()> {
263        match self {
264            Self::ChangeFormat(format) => {
265                if let Ok(format) = format.parse::<PrintFormat>() {
266                    print_options.format = format;
267                    println!("Output format is {:?}.", print_options.format);
268                    Ok(())
269                } else {
270                    exec_err!(
271                        "{:?} is not a valid format type [possible values: {:?}]",
272                        format,
273                        PrintFormat::value_variants()
274                    )
275                }
276            }
277        }
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use datafusion::prelude::SessionContext;
284
285    use crate::{
286        object_storage::instrumented::{
287            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
288        },
289        print_options::MaxRows,
290    };
291
292    use super::*;
293
294    #[tokio::test]
295    async fn command_execute_profile_mode() {
296        let ctx = SessionContext::new();
297
298        let mut print_options = PrintOptions {
299            format: PrintFormat::Automatic,
300            quiet: false,
301            maxrows: MaxRows::Unlimited,
302            color: true,
303            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
304        };
305
306        let mut cmd: Command = "object_store_profiling"
307            .parse()
308            .expect("expected parse to succeed");
309        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
310        assert_eq!(
311            print_options.instrumented_registry.instrument_mode(),
312            InstrumentedObjectStoreMode::default()
313        );
314
315        cmd = "object_store_profiling summary"
316            .parse()
317            .expect("expected parse to succeed");
318        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
319        assert_eq!(
320            print_options.instrumented_registry.instrument_mode(),
321            InstrumentedObjectStoreMode::Summary
322        );
323
324        cmd = "object_store_profiling trace"
325            .parse()
326            .expect("expected parse to succeed");
327        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
328        assert_eq!(
329            print_options.instrumented_registry.instrument_mode(),
330            InstrumentedObjectStoreMode::Trace
331        );
332
333        cmd = "object_store_profiling does_not_exist"
334            .parse()
335            .expect("expected parse to succeed");
336        assert!(cmd.execute(&ctx, &mut print_options).await.is_err());
337    }
338}