datafusion-cli 51.0.0

Command Line Client for DataFusion query engine.
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Command within CLI

use crate::cli_context::CliSessionContext;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
use crate::print_options::PrintOptions;
use clap::ValueEnum;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::instant::Instant;
use datafusion::common::{exec_datafusion_err, exec_err};
use datafusion::error::Result;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;

/// Command
#[derive(Debug)]
pub enum Command {
    Quit,
    Help,
    ListTables,
    DescribeTableStmt(String),
    ListFunctions,
    Include(Option<String>),
    SearchFunctions(String),
    QuietMode(Option<bool>),
    OutputFormat(Option<String>),
    ObjectStoreProfileMode(Option<String>),
}

pub enum OutputFormat {
    ChangeFormat(String),
}

impl Command {
    pub async fn execute(
        &self,
        ctx: &dyn CliSessionContext,
        print_options: &mut PrintOptions,
    ) -> Result<()> {
        match self {
            Self::Help => {
                let now = Instant::now();
                let command_batch = all_commands_info();
                let schema = command_batch.schema();
                let num_rows = command_batch.num_rows();
                let task_ctx = ctx.task_ctx();
                let config = &task_ctx.session_config().options().format;
                print_options.print_batches(
                    schema,
                    &[command_batch],
                    now,
                    num_rows,
                    config,
                )
            }
            Self::ListTables => {
                exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
            }
            Self::DescribeTableStmt(name) => {
                exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {name}"))
                    .await
            }
            Self::Include(filename) => {
                if let Some(filename) = filename {
                    let file = File::open(filename).map_err(|e| {
                        exec_datafusion_err!("Error opening {filename:?} {e}")
                    })?;
                    exec_from_lines(ctx, &mut BufReader::new(file), print_options)
                        .await?;
                    Ok(())
                } else {
                    exec_err!("Required filename argument is missing")
                }
            }
            Self::QuietMode(quiet) => {
                if let Some(quiet) = quiet {
                    print_options.quiet = *quiet;
                    println!(
                        "Quiet mode set to {}",
                        if print_options.quiet { "true" } else { "false" }
                    );
                } else {
                    println!(
                        "Quiet mode is {}",
                        if print_options.quiet { "true" } else { "false" }
                    );
                }
                Ok(())
            }
            Self::Quit => exec_err!("Unexpected quit, this should be handled outside"),
            Self::ListFunctions => display_all_functions(),
            Self::SearchFunctions(function) => {
                if let Ok(func) = function.parse::<Function>() {
                    let details = func.function_details()?;
                    println!("{details}");
                    Ok(())
                } else {
                    exec_err!("{function} is not a supported function")
                }
            }
            Self::OutputFormat(_) => exec_err!(
                "Unexpected change output format, this should be handled outside"
            ),
            Self::ObjectStoreProfileMode(mode) => {
                if let Some(mode) = mode {
                    let profile_mode = mode
                        .parse()
                        .map_err(|_|
                            exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, summary, trace")
                        )?;
                    print_options
                        .instrumented_registry
                        .set_instrument_mode(profile_mode);
                    println!(
                        "ObjectStore Profile mode set to {}",
                        print_options.instrumented_registry.instrument_mode()
                    );
                } else {
                    println!(
                        "ObjectStore Profile mode is {}",
                        print_options.instrumented_registry.instrument_mode()
                    );
                }

                Ok(())
            }
        }
    }

    fn get_name_and_description(&self) -> (&'static str, &'static str) {
        match self {
            Self::Quit => ("\\q", "quit datafusion-cli"),
            Self::ListTables => ("\\d", "list tables"),
            Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
            Self::Help => ("\\?", "help"),
            Self::Include(_) => {
                ("\\i filename", "reads input from the specified filename")
            }
            Self::ListFunctions => ("\\h", "function list"),
            Self::SearchFunctions(_) => ("\\h function", "search function"),
            Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
            Self::OutputFormat(_) => {
                ("\\pset [NAME [VALUE]]", "set table output option\n(format)")
            }
            Self::ObjectStoreProfileMode(_) => (
                "\\object_store_profiling (disabled|summary|trace)",
                "print or set object store profile mode",
            ),
        }
    }
}

const ALL_COMMANDS: [Command; 10] = [
    Command::ListTables,
    Command::DescribeTableStmt(String::new()),
    Command::Quit,
    Command::Help,
    Command::Include(Some(String::new())),
    Command::ListFunctions,
    Command::SearchFunctions(String::new()),
    Command::QuietMode(None),
    Command::OutputFormat(None),
    Command::ObjectStoreProfileMode(None),
];

fn all_commands_info() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("Command", DataType::Utf8, false),
        Field::new("Description", DataType::Utf8, false),
    ]));
    let (names, description): (Vec<&str>, Vec<&str>) = ALL_COMMANDS
        .into_iter()
        .map(|c| c.get_name_and_description())
        .unzip();
    RecordBatch::try_new(
        schema,
        [names, description]
            .into_iter()
            .map(|i| Arc::new(StringArray::from(i)) as ArrayRef)
            .collect::<Vec<_>>(),
    )
    .expect("This should not fail")
}

impl FromStr for Command {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
            (a, Some(b))
        } else {
            (s, None)
        };
        Ok(match (c, arg) {
            ("q", None) => Self::Quit,
            ("d", None) => Self::ListTables,
            ("d", Some(name)) => Self::DescribeTableStmt(name.into()),
            ("?", None) => Self::Help,
            ("h", None) => Self::ListFunctions,
            ("h", Some(function)) => Self::SearchFunctions(function.into()),
            ("i", None) => Self::Include(None),
            ("i", Some(filename)) => Self::Include(Some(filename.to_owned())),
            ("quiet", Some("true" | "t" | "yes" | "y" | "on")) => {
                Self::QuietMode(Some(true))
            }
            ("quiet", Some("false" | "f" | "no" | "n" | "off")) => {
                Self::QuietMode(Some(false))
            }
            ("quiet", None) => Self::QuietMode(None),
            ("pset", Some(subcommand)) => {
                Self::OutputFormat(Some(subcommand.to_string()))
            }
            ("pset", None) => Self::OutputFormat(None),
            ("object_store_profiling", Some(mode)) => {
                Self::ObjectStoreProfileMode(Some(mode.to_string()))
            }
            ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None),
            _ => return Err(()),
        })
    }
}

impl FromStr for OutputFormat {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (c, arg) = if let Some((a, b)) = s.split_once(' ') {
            (a, Some(b))
        } else {
            (s, None)
        };
        Ok(match (c, arg) {
            ("format", Some(format)) => Self::ChangeFormat(format.to_string()),
            _ => return Err(()),
        })
    }
}

impl OutputFormat {
    pub async fn execute(&self, print_options: &mut PrintOptions) -> Result<()> {
        match self {
            Self::ChangeFormat(format) => {
                if let Ok(format) = format.parse::<PrintFormat>() {
                    print_options.format = format;
                    println!("Output format is {:?}.", print_options.format);
                    Ok(())
                } else {
                    exec_err!(
                        "{:?} is not a valid format type [possible values: {:?}]",
                        format,
                        PrintFormat::value_variants()
                    )
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use datafusion::prelude::SessionContext;

    use crate::{
        object_storage::instrumented::{
            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
        },
        print_options::MaxRows,
    };

    use super::*;

    #[tokio::test]
    async fn command_execute_profile_mode() {
        let ctx = SessionContext::new();

        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: false,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        let mut cmd: Command = "object_store_profiling"
            .parse()
            .expect("expected parse to succeed");
        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
        assert_eq!(
            print_options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::default()
        );

        cmd = "object_store_profiling summary"
            .parse()
            .expect("expected parse to succeed");
        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
        assert_eq!(
            print_options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::Summary
        );

        cmd = "object_store_profiling trace"
            .parse()
            .expect("expected parse to succeed");
        assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
        assert_eq!(
            print_options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::Trace
        );

        cmd = "object_store_profiling does_not_exist"
            .parse()
            .expect("expected parse to succeed");
        assert!(cmd.execute(&ctx, &mut print_options).await.is_err());
    }
}