use std::{
io::{self, BufRead, Write},
marker::PhantomData,
path::PathBuf,
};
use derive_more::{Display, Error, From};
use itertools::Itertools;
use owo_colors::{OwoColorize, colors::css::Gray};
use tabled::{builder::Builder as TabledBuilder, settings::Style as TabledStyle};
use tempest_engine::{
Engine, EngineError,
catalog::{CatalogState, schema::{TypeId, VariantId}},
config::EngineConfig,
query::QueryResult,
types::TempestValue,
};
use tempest_io::Io;
use tempest_tql::ParseError;
use crate::stdio::Stdio;
#[macro_use]
extern crate tracing;
pub mod stdio;
#[derive(Debug, Display, Error, From)]
pub enum ReplError {
Io(io::Error),
Engine(EngineError),
}
const REPL_INTRODUCTION: &str = r"This is the Tempest REPL: Read, Evaluate, Print, Loop.
You can type in some special commands, starting with a dot,
or you can enter in valid TQL statements!";
fn format_value(val: &TempestValue, catalog: &CatalogState) -> String {
format_value_inner(val, catalog, false)
}
fn format_value_inner(val: &TempestValue, catalog: &CatalogState, quoted: bool) -> String {
match val {
TempestValue::String(s) if quoted => format!("{:?}", s.as_ref()),
TempestValue::Enum { type_id, variant_id, fields } => {
let type_schema = catalog.get_type(TypeId(*type_id));
let type_prefix = type_schema.map(|ts| match ts.database_id() {
Some(db_id) => format!("{}.{}", catalog.databases[&db_id].name, ts.name()),
None => ts.name().to_string(),
}).unwrap_or_else(|| type_id.to_string());
let variant_name = type_schema
.and_then(|ts| ts.as_enum())
.and_then(|e| e.variants.get(&VariantId(*variant_id)))
.map(|v| v.name.to_string())
.unwrap_or_else(|| variant_id.to_string());
if fields.is_empty() {
format!("{}.{}", type_prefix, variant_name)
} else {
let args: Vec<_> = fields.iter().map(|f| format_value_inner(f, catalog, true)).collect();
format!("{}.{}({})", type_prefix, variant_name, args.join(", "))
}
}
other => format!("{}", other),
}
}
pub struct Repl<I: Io, S: Stdio> {
data_dir: PathBuf,
config: EngineConfig,
stdio: S,
_marker: PhantomData<I>,
}
impl<I: Io, S: Stdio> Repl<I, S> {
pub fn new(data_dir: PathBuf, config: EngineConfig, stdio: S) -> Self {
Self {
data_dir,
config,
stdio,
_marker: PhantomData,
}
}
fn explain_command(&mut self, name: &str, description: &str) -> io::Result<()> {
writeln!(
self.stdio.stdout(),
"{} {}",
name.bright_green(),
description.fg::<Gray>()
)
}
fn show_help(&mut self) -> io::Result<()> {
writeln!(
self.stdio.stdout(),
"{}",
"List of available commands:".bright_green()
)?;
self.explain_command(".help | .h", "show this menu")?;
self.explain_command(".clear | .c", "clear the screen")?;
self.explain_command(".quit | .q", "terminate the REPL session")?;
self.explain_command(".databases | .dbs", "list all databases")?;
self.explain_command(
".tables <database>",
"list all tables, optionally scoped to a database",
)?;
self.explain_command(
".types <database>",
"list all types, optionally scoped to a database",
)?;
Ok(())
}
fn clear_screen(&mut self) -> io::Result<()> {
write!(self.stdio.stdout(), "\x1b[2J\x1b[H")
}
fn list_databases(&mut self, catalog: &CatalogState) -> io::Result<()> {
let mut builder = TabledBuilder::new();
builder.push_record(["database", "tables"].map(|s| format!("{}", s.bold())));
for db in catalog.databases.values() {
builder.push_record([db.name.to_string(), db.tables.len().to_string()]);
}
if catalog.databases.is_empty() {
builder.push_record([""]);
}
let mut table = builder.build();
table.with(TabledStyle::rounded());
writeln!(self.stdio.stdout(), "{table}")
}
fn list_types(&mut self, catalog: &CatalogState, database: Option<&str>) -> io::Result<()> {
let mut builder = TabledBuilder::new();
builder.push_record(["database", "type", "fields"].map(|s| format!("{}", s.bold())));
let types: Vec<_> = if let Some(database) = database {
catalog.types_in_database(database).collect()
} else {
catalog
.types
.iter()
.chain(catalog.global_types.iter())
.map(|(tid, schema)| (*tid, schema))
.collect()
};
if types.is_empty() {
builder.push_record([""]);
}
for (_, type_schema) in types {
use tempest_engine::catalog::schema::TypeSchema;
let db_name = match type_schema.database_id() {
Some(db_id) => catalog.databases[&db_id].name.to_string(),
None => "(global)".to_string(),
};
let members = match type_schema {
TypeSchema::Struct(s) => s.fields.values().map(|f| f.name.to_string()).join(", "),
TypeSchema::Enum(e) => {
format!("enum {{ {} }}", e.variants.values().map(|v| v.name.to_string()).join(", "))
}
};
builder.push_record([
db_name,
type_schema.name().to_string(),
members,
]);
}
let mut table = builder.build();
table.with(TabledStyle::rounded());
writeln!(self.stdio.stdout(), "{table}")
}
fn list_tables(&mut self, catalog: &CatalogState, database: Option<&str>) -> io::Result<()> {
let mut builder = TabledBuilder::new();
builder.push_record(
["database", "table", "type", "columns", "primary key"].map(|s| format!("{}", s.bold())),
);
let tables: Vec<_> = if let Some(database) = database {
catalog.tables_in_database(database).collect()
} else {
catalog
.tables
.iter()
.map(|(tid, schema)| (*tid, schema))
.collect()
};
if tables.is_empty() {
builder.push_record([""]);
}
for (_, table_schema) in tables {
let database = &catalog.databases[&table_schema.database_id].name;
let type_schema = catalog.get_type(table_schema.type_id);
let type_name = type_schema.map(|ts| match ts.database_id() {
Some(db_id) => format!("{}.{}", catalog.databases[&db_id].name, ts.name()),
None => ts.name().to_string(),
}).unwrap_or_else(|| "?".to_string());
let columns = type_schema
.and_then(|ts| ts.as_struct())
.map(|s| s.fields.len())
.unwrap_or(0);
builder.push_record(vec![
database.to_string(),
table_schema.name.to_string(),
type_name,
columns.to_string(),
table_schema
.primary_key
.iter()
.map(|path| catalog.pk_path_name(path, table_schema))
.join(", "),
]);
}
let mut table = builder.build();
table.with(TabledStyle::rounded());
writeln!(self.stdio.stdout(), "{table}")
}
fn print_query_results(&mut self, results: Vec<QueryResult>, catalog: &CatalogState) -> io::Result<()> {
for res in results {
match res {
QueryResult::Rows { columns, rows } => {
let mut builder = TabledBuilder::new();
builder.push_record(columns.iter().map(|col| format!("{}", col.bold())));
for row in rows.iter().map(|row| row.iter().map(|v| format_value(v, catalog))) {
builder.push_record(row);
}
if rows.is_empty() {
builder.push_record([""]);
}
let mut table = builder.build();
table.with(TabledStyle::rounded());
writeln!(self.stdio.stdout(), "{table}")?;
}
QueryResult::Empty => {}
QueryResult::RowsChanged(n) => {
writeln!(
self.stdio.stdout(),
"{}",
format!("Rows changed: {}", n).bold().green()
)?;
}
}
}
Ok(())
}
fn print_parse_errors(&mut self, source: &str, errors: &[ParseError]) -> io::Result<()> {
use ariadne::{Color, Label, Report, ReportKind, Source};
for error in errors {
Report::build(ReportKind::Error, ("<repl>", error.span.clone()))
.with_message("parse error")
.with_label(
Label::new(("<repl>", error.span.clone()))
.with_message(format!("{}", error.kind))
.with_color(Color::Red),
)
.finish()
.write(("<repl>", Source::from(source)), self.stdio.stdout())?;
}
Ok(())
}
pub async fn run(&mut self) -> Result<(), ReplError> {
let mut engine = Engine::<I>::open(self.data_dir.clone(), self.config.clone()).await?;
debug!("starting repl shell");
writeln!(
self.stdio.stdout(),
"{}",
"-- TempestDB REPL --".bright_cyan().bold()
)?;
writeln!(self.stdio.stdout(), "{}", REPL_INTRODUCTION.bright_cyan())?;
self.show_help()?;
let mut buf = String::new();
let mut interrupts = 0;
loop {
buf.clear();
self.stdio.stdout().flush()?;
match self.stdio.stdin().read_line(&mut buf) {
Ok(0) => break, Ok(_) => interrupts = 0,
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
interrupts += 1;
if interrupts >= 2 {
break;
}
writeln!(self.stdio.stdout(), "(press Ctrl-C again to exit)")?;
continue;
}
Err(e) => return Err(e.into()),
}
let cmd = buf.trim();
if cmd.len() == 0 {
continue;
}
if cmd.starts_with(".") {
let args: Vec<_> = cmd.split_whitespace().collect();
match args[0] {
".help" | ".h" => self.show_help()?,
".clear" | ".c" => self.clear_screen()?,
".quit" | ".q" => break,
".databases" | ".dbs" => self.list_databases(engine.catalog())?,
".types" => self.list_types(engine.catalog(), args.get(1).copied())?,
".tables" => self.list_tables(engine.catalog(), args.get(1).copied())?,
_ => {
writeln!(
self.stdio.stdout(),
"{} `{}`",
"unknown command:".bright_red(),
cmd
)?;
writeln!(
self.stdio.stdout(),
"{}",
"type .help to show available commands"
.bright_green()
.italic()
)?;
}
}
} else {
match engine.execute(cmd).await {
Ok(results) => self.print_query_results(results, engine.catalog())?,
Err(err) => {
writeln!(
self.stdio.stdout(),
"{}",
"Failed to execute query:".bright_red().bold()
)?;
match err {
EngineError::Parse(parse_errors) => {
self.print_parse_errors(cmd, &parse_errors)?;
}
_ => writeln!(
self.stdio.stdout(),
"{}",
format!("{}", err).bright_red()
)?,
}
}
}
}
self.stdio.push_history(cmd);
}
self.stdio.stdout().flush()?;
Ok(())
}
}
pub async fn repl<I: Io, S: Stdio>(
data_dir: PathBuf,
config: EngineConfig,
stdio: S,
) -> Result<(), ReplError> {
Repl::<I, S>::new(data_dir, config, stdio).run().await
}