use crate::cli_context::CliSessionContext;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
use crate::print_options::PrintOptions;
use clap::ValueEnum;
use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::instant::Instant;
use datafusion::common::{exec_datafusion_err, exec_err};
use datafusion::error::Result;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;
/// A meta-command understood by the CLI.
///
/// Values are produced by the [`FromStr`] implementation and run through
/// `Command::execute`. The help text shows each command with a leading
/// backslash (for example `\q`); the parser itself matches the name without it.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that parsed commands can be
/// copied and compared directly (for instance in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Command {
    /// `\q` — quit. `execute` reports an error for this variant because it is
    /// expected to be handled by the caller.
    Quit,
    /// `\?` — print the table of available commands.
    Help,
    /// `\d` — list tables (runs `SHOW TABLES`).
    ListTables,
    /// `\d name` — describe a table (runs `SHOW COLUMNS FROM name`).
    DescribeTableStmt(String),
    /// `\h` — list functions.
    ListFunctions,
    /// `\i filename` — read input from a file; `None` when the filename
    /// argument was omitted, which `execute` reports as an error.
    Include(Option<String>),
    /// `\h function` — show details for a single function.
    SearchFunctions(String),
    /// `\quiet (true|false)?` — set quiet mode, or print it when `None`.
    QuietMode(Option<bool>),
    /// `\pset [NAME [VALUE]]` — change an output option. `execute` reports an
    /// error for this variant because it is expected to be handled by the caller.
    OutputFormat(Option<String>),
    /// `\object_store_profiling (disabled|summary|trace)` — set the object
    /// store profile mode, or print it when `None`.
    ObjectStoreProfileMode(Option<String>),
}
/// An output-option change request.
///
/// Parsed by the [`FromStr`] implementation from text of the form
/// `format <value>`; presumably that text is the argument carried by
/// `Command::OutputFormat` (confirm against the caller).
///
/// `Debug` is derived for consistency with [`Command`]; `Clone`, `PartialEq`
/// and `Eq` allow values to be copied and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputFormat {
    /// Switch the print format to the named one. The name is validated when
    /// the request is executed, not when it is parsed.
    ChangeFormat(String),
}
impl Command {
pub async fn execute(
&self,
ctx: &dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
match self {
Self::Help => {
let now = Instant::now();
let command_batch = all_commands_info();
let schema = command_batch.schema();
let num_rows = command_batch.num_rows();
let task_ctx = ctx.task_ctx();
let config = &task_ctx.session_config().options().format;
print_options.print_batches(
schema,
&[command_batch],
now,
num_rows,
config,
)
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
}
Self::DescribeTableStmt(name) => {
exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {name}"))
.await
}
Self::Include(filename) => {
if let Some(filename) = filename {
let file = File::open(filename).map_err(|e| {
exec_datafusion_err!("Error opening {filename:?} {e}")
})?;
exec_from_lines(ctx, &mut BufReader::new(file), print_options)
.await?;
Ok(())
} else {
exec_err!("Required filename argument is missing")
}
}
Self::QuietMode(quiet) => {
if let Some(quiet) = quiet {
print_options.quiet = *quiet;
println!(
"Quiet mode set to {}",
if print_options.quiet { "true" } else { "false" }
);
} else {
println!(
"Quiet mode is {}",
if print_options.quiet { "true" } else { "false" }
);
}
Ok(())
}
Self::Quit => exec_err!("Unexpected quit, this should be handled outside"),
Self::ListFunctions => display_all_functions(),
Self::SearchFunctions(function) => {
if let Ok(func) = function.parse::<Function>() {
let details = func.function_details()?;
println!("{details}");
Ok(())
} else {
exec_err!("{function} is not a supported function")
}
}
Self::OutputFormat(_) => exec_err!(
"Unexpected change output format, this should be handled outside"
),
Self::ObjectStoreProfileMode(mode) => {
if let Some(mode) = mode {
let profile_mode = mode
.parse()
.map_err(|_|
exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, summary, trace")
)?;
print_options
.instrumented_registry
.set_instrument_mode(profile_mode);
println!(
"ObjectStore Profile mode set to {}",
print_options.instrumented_registry.instrument_mode()
);
} else {
println!(
"ObjectStore Profile mode is {}",
print_options.instrumented_registry.instrument_mode()
);
}
Ok(())
}
}
}
fn get_name_and_description(&self) -> (&'static str, &'static str) {
match self {
Self::Quit => ("\\q", "quit datafusion-cli"),
Self::ListTables => ("\\d", "list tables"),
Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
Self::Help => ("\\?", "help"),
Self::Include(_) => {
("\\i filename", "reads input from the specified filename")
}
Self::ListFunctions => ("\\h", "function list"),
Self::SearchFunctions(_) => ("\\h function", "search function"),
Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
Self::OutputFormat(_) => {
("\\pset [NAME [VALUE]]", "set table output option\n(format)")
}
Self::ObjectStoreProfileMode(_) => (
"\\object_store_profiling (disabled|summary|trace)",
"print or set object store profile mode",
),
}
}
}
/// One placeholder instance of every [`Command`] variant.
///
/// `all_commands_info` iterates this array to build the help table, so the
/// element order here is the row order of that table. The payloads (empty
/// strings, `None`) are placeholders only: `get_name_and_description` ignores
/// them.
const ALL_COMMANDS: [Command; 10] = [
Command::ListTables,
Command::DescribeTableStmt(String::new()),
Command::Quit,
Command::Help,
Command::Include(Some(String::new())),
Command::ListFunctions,
Command::SearchFunctions(String::new()),
Command::QuietMode(None),
Command::OutputFormat(None),
Command::ObjectStoreProfileMode(None),
];
/// Builds the help table: a two-column record batch ("Command",
/// "Description") with one row per entry of `ALL_COMMANDS`, in that order.
///
/// # Panics
///
/// Panics if the record batch cannot be constructed from the schema and the
/// two columns built here.
fn all_commands_info() -> RecordBatch {
    let mut usages: Vec<&str> = Vec::new();
    let mut summaries: Vec<&str> = Vec::new();
    for command in ALL_COMMANDS {
        let (usage, summary) = command.get_name_and_description();
        usages.push(usage);
        summaries.push(summary);
    }

    let command_column = Arc::new(StringArray::from(usages)) as ArrayRef;
    let description_column = Arc::new(StringArray::from(summaries)) as ArrayRef;

    let fields = vec![
        Field::new("Command", DataType::Utf8, false),
        Field::new("Description", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    RecordBatch::try_new(schema, vec![command_column, description_column])
        .expect("This should not fail")
}
impl FromStr for Command {
    type Err = ();

    /// Parses a command from its text form, without the leading backslash.
    ///
    /// The text is split at the first space: the part before it is the command
    /// name and everything after it (possibly empty, possibly containing more
    /// spaces) is the argument. Unknown names, unexpected arguments and
    /// unrecognised `quiet` values yield `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut pieces = s.splitn(2, ' ');
        let name = pieces.next().unwrap_or(s);
        let argument = pieces.next();

        match name {
            // Commands that take no argument at all.
            "q" => match argument {
                None => Ok(Self::Quit),
                Some(_) => Err(()),
            },
            "?" => match argument {
                None => Ok(Self::Help),
                Some(_) => Err(()),
            },
            // Commands whose meaning changes when an argument is present.
            "d" => Ok(argument.map_or(Self::ListTables, |table| {
                Self::DescribeTableStmt(table.to_owned())
            })),
            "h" => Ok(argument.map_or(Self::ListFunctions, |function| {
                Self::SearchFunctions(function.to_owned())
            })),
            // Commands that simply carry their optional argument along.
            "i" => Ok(Self::Include(argument.map(str::to_owned))),
            "pset" => Ok(Self::OutputFormat(argument.map(str::to_owned))),
            "object_store_profiling" => {
                Ok(Self::ObjectStoreProfileMode(argument.map(str::to_owned)))
            }
            // `quiet` accepts only a fixed set of boolean spellings.
            "quiet" => match argument {
                None => Ok(Self::QuietMode(None)),
                Some("true" | "t" | "yes" | "y" | "on") => Ok(Self::QuietMode(Some(true))),
                Some("false" | "f" | "no" | "n" | "off") => Ok(Self::QuietMode(Some(false))),
                Some(_) => Err(()),
            },
            _ => Err(()),
        }
    }
}
impl FromStr for OutputFormat {
    type Err = ();

    /// Parses text of the form `format <value>`.
    ///
    /// The text is split at the first space; the option name must be exactly
    /// `format` and everything after that space becomes the requested format
    /// name. Anything else (including a bare `format`) yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.split_once(' ') {
            Some(("format", value)) => Ok(Self::ChangeFormat(value.to_owned())),
            _ => Err(()),
        }
    }
}
impl OutputFormat {
    /// Applies this request to `print_options`.
    ///
    /// For [`OutputFormat::ChangeFormat`], the carried name is parsed as a
    /// [`PrintFormat`]; on success the format is stored in `print_options`
    /// and a confirmation line is printed.
    ///
    /// # Errors
    ///
    /// Returns an execution error listing the possible values when the name
    /// is not a valid [`PrintFormat`].
    pub async fn execute(&self, print_options: &mut PrintOptions) -> Result<()> {
        // Single-variant enum, so the destructuring is irrefutable.
        let Self::ChangeFormat(format) = self;

        match format.parse::<PrintFormat>() {
            Ok(parsed) => {
                print_options.format = parsed;
                println!("Output format is {:?}.", print_options.format);
                Ok(())
            }
            Err(_) => exec_err!(
                "{:?} is not a valid format type [possible values: {:?}]",
                format,
                PrintFormat::value_variants()
            ),
        }
    }
}
#[cfg(test)]
mod tests {
    use datafusion::prelude::SessionContext;

    use crate::{
        object_storage::instrumented::{
            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
        },
        print_options::MaxRows,
    };

    use super::*;

    /// Drives `object_store_profiling` through "print", "summary" and "trace",
    /// checking the registry's mode after each step, then verifies that an
    /// unknown mode parses as a command but fails on execution.
    #[tokio::test]
    async fn command_execute_profile_mode() {
        let ctx = SessionContext::new();

        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: false,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // Each step is (command text, mode expected afterwards). The first
        // step only prints, so the mode must still be the default.
        let steps = [
            (
                "object_store_profiling",
                InstrumentedObjectStoreMode::default(),
            ),
            (
                "object_store_profiling summary",
                InstrumentedObjectStoreMode::Summary,
            ),
            (
                "object_store_profiling trace",
                InstrumentedObjectStoreMode::Trace,
            ),
        ];

        for (input, expected_mode) in steps {
            let cmd: Command = input.parse().expect("expected parse to succeed");
            assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
            assert_eq!(
                print_options.instrumented_registry.instrument_mode(),
                expected_mode
            );
        }

        // The mode string is validated at execution time, not at parse time.
        let invalid: Command = "object_store_profiling does_not_exist"
            .parse()
            .expect("expected parse to succeed");
        assert!(invalid.execute(&ctx, &mut print_options).await.is_err());
    }
}