use std::fs::File;
use std::io::{self, BufRead, BufReader, IsTerminal};
use std::time::Instant;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use clap::{Parser, Subcommand};
use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, ContentArrangement, Table};
use rustyline::completion::{Completer, Pair};
use rustyline::error::ReadlineError;
use rustyline::highlight::Highlighter;
use rustyline::hint::Hinter;
use rustyline::history::DefaultHistory;
use rustyline::validate::Validator;
use rustyline::{Config, Context, EditMode, Editor, Helper};
use oxibase::api::{Database, Transaction as ApiTransaction};
use oxibase::common::version::version;
use oxibase::Value;
#[cfg(feature = "cli")]
mod workspace;
/// Version string in `MAJOR.MINOR.PATCH` form, assembled at compile time from
/// the Cargo package metadata. It is handed to clap as the `--version` text.
const VERSION: &str = concat!(
env!("CARGO_PKG_VERSION_MAJOR"),
".",
env!("CARGO_PKG_VERSION_MINOR"),
".",
env!("CARGO_PKG_VERSION_PATCH")
);
// Top-level command-line arguments for the `oxibase` binary.
//
// NOTE: plain `//` comments are used on purpose in this struct. clap's derive
// macro turns `///` doc comments into user-visible help text, so documenting
// with `///` would change the `--help` output.
#[derive(Parser, Debug)]
#[command(name = "oxibase")]
#[command(author = "Oxibase Contributors")]
#[command(version = VERSION)]
#[command(about = "High-performance embedded SQL database with MVCC")]
#[command(
    long_about = "Oxibase is a high-performance embedded SQL database with MVCC transactions.\n\
                  This CLI provides an interactive interface to execute SQL queries and manage your database.\n\n\
                  PERSISTENCE DSN PARAMETERS:\n\
                  file:///path/to/db?param=value&param2=value2\n\n\
                  sync=none|normal|full WAL sync mode (default: normal)\n\
                  snapshot_interval=SECS Snapshot interval in seconds (default: 300)\n\
                  keep_snapshots=COUNT Number of snapshots to keep (default: 5)\n\
                  wal_max_size=BYTES Max WAL file size before rotation (default: 67108864)\n\
                  wal_buffer_size=BYTES WAL buffer size (default: 65536)\n\
                  wal_flush_trigger=BYTES Buffer size to trigger flush (default: 32768)\n\
                  commit_batch_size=COUNT Commits to batch before sync (default: 100)\n\
                  sync_interval_ms=MS Min time between syncs (default: 10)\n\
                  compression=on|off Enable/disable all compression (default: on)\n\
                  wal_compression=on|off WAL compression only (default: on)\n\
                  snapshot_compression=on|off Snapshot compression only (default: on)\n\
                  compression_threshold=BYTES Min size to compress (default: 64)\n\n\
                  EXAMPLES:\n\
                  oxibase repl -d memory:// In-memory database\n\
                  oxibase repl -d file:///tmp/mydb Persistent database\n\
                  oxibase repl -d file:///tmp/mydb?sync=full Maximum durability\n\
                  oxibase repl -d file:///tmp/mydb?sync=none&compression=off Maximum performance\n\
                  oxibase repl -d file:///tmp/mydb --profile durable Use durable preset\n\
                  oxibase repl -d file:///tmp/mydb --sync full --compression off"
)]
struct Args {
    // Optional subcommand; when absent, `main` uses an in-memory database.
    #[command(subcommand)]
    command: Option<Commands>,
    // Emit query results as JSON instead of a rendered table.
    #[arg(short = 'j', long = "json", default_value = "false")]
    json_output: bool,
    // Suppress informational output such as connection info and row counts.
    #[arg(short = 'q', long = "quiet", default_value = "false")]
    quiet: bool,
    // Maximum number of rows rendered per result set; 0 disables truncation.
    #[arg(short = 'l', long = "limit", default_value = "40")]
    limit: usize,
    // SQL text to execute non-interactively; the process exits afterwards.
    #[arg(short = 'e', long = "execute")]
    execute: Option<String>,
    // Path of a SQL script to execute non-interactively.
    #[arg(short = 'f', long = "file")]
    file: Option<String>,
    // WAL sync mode (none|normal|full); overrides the profile's sync setting.
    #[arg(short = 's', long = "sync", value_name = "MODE")]
    sync_mode: Option<String>,
    // Persistence preset (fast|normal|durable), translated into DSN parameters.
    #[arg(short = 'p', long = "profile", value_name = "PROFILE")]
    persistence_profile: Option<String>,
    // Snapshot interval in seconds, forwarded as `snapshot_interval`.
    #[arg(long = "snapshot-interval", value_name = "SECONDS")]
    snapshot_interval: Option<u32>,
    // Number of snapshots to retain, forwarded as `keep_snapshots`.
    #[arg(long = "keep-snapshots", value_name = "COUNT")]
    keep_snapshots: Option<u32>,
    // Maximum WAL size in megabytes; converted to bytes when the DSN is built.
    #[arg(long = "wal-max-size", value_name = "MB")]
    wal_max_size: Option<u32>,
    // Compression switch (on|off and common boolean spellings).
    #[arg(long = "compression", value_name = "on|off")]
    compression: Option<String>,
    // Per-query timeout in milliseconds; 0 means no timeout is applied.
    #[arg(short = 't', long = "timeout", value_name = "MS", default_value = "0")]
    timeout_ms: u64,
}
// Subcommands of the `oxibase` binary.
//
// NOTE: `//` comments are used here instead of `///` because clap's derive
// macro would otherwise surface doc comments as help text.
#[derive(Subcommand, Debug)]
enum Commands {
// Default workflow: `main` falls through to the execute / file / piped /
// interactive handling using the given database DSN.
Repl {
#[arg(short = 'd', long = "db", default_value = "memory://")]
db_path: String,
},
// Serves the database over a TCP listener bound to `host:port`
// (only compiled with the `server` feature).
#[cfg(feature = "server")]
Serve {
#[arg(short = 'd', long = "db", default_value = "memory://")]
db_path: String,
#[arg(short = 'p', long = "port", default_value = "8080")]
port: u16,
#[arg(long = "host", default_value = "127.0.0.1")]
host: String,
},
// Runs `workspace::install` against the opened database and exits
// (only compiled with the `cli` feature).
#[cfg(feature = "cli")]
InstallWorkspace {
#[arg(short = 'd', long = "db", default_value = "memory://")]
db_path: String,
},
}
/// Upper-case SQL keywords offered by tab completion. The completer matches
/// them against the upper-cased word under the cursor and inserts the
/// lower-cased keyword followed by a space.
const SQL_KEYWORDS: &[&str] = &[
"SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "DROP", "ALTER", "TABLE", "INDEX", "VIEW",
"FROM", "WHERE", "JOIN", "INNER", "LEFT", "RIGHT", "ON", "GROUP BY", "ORDER BY", "HAVING",
"LIMIT", "OFFSET", "INTO", "VALUES", "SET", "BEGIN", "COMMIT", "ROLLBACK", "SHOW", "DESCRIBE",
"EXPLAIN",
];
/// REPL meta-commands offered by tab completion. They are matched
/// case-sensitively against the word under the cursor.
const CLI_COMMANDS: &[&str] = &["help", "exit", "quit", "\\q", "\\h", "\\?"];
/// rustyline helper for the SQL prompt. It keeps a database handle so the
/// completer can look up schema and table names.
struct SqlHelper {
    db: Database,
}

impl SqlHelper {
    /// Creates a helper that queries `db` when completing table names.
    fn new(db: Database) -> Self {
        SqlHelper { db }
    }
}

// Hinting, highlighting and validation rely on rustyline's default trait
// methods; only completion (implemented separately) is customised.
impl Hinter for SqlHelper {
    type Hint = String;
}

impl Highlighter for SqlHelper {}

impl Validator for SqlHelper {}

impl Helper for SqlHelper {}
impl SqlHelper {
    /// Returns the byte offset where the word ending at `pos` begins.
    ///
    /// The offset is advanced by the UTF-8 width of the preceding whitespace
    /// character, so multi-byte whitespace (for example a no-break space)
    /// never yields an index inside a character.
    fn word_start(line: &str, pos: usize) -> usize {
        line[..pos]
            .char_indices()
            .rev()
            .find(|(_, c)| c.is_whitespace())
            .map_or(0, |(idx, c)| idx + c.len_utf8())
    }

    /// Reports whether the last word of `before_word` is a keyword that is
    /// followed by a table name (`FROM`, `INTO`, `UPDATE`, `JOIN`, `TABLE`).
    fn expects_table_name(before_word: &str) -> bool {
        let prev_word = before_word
            .trim_end()
            .rsplit(char::is_whitespace)
            .next()
            .unwrap_or("");
        matches!(
            prev_word.to_uppercase().as_str(),
            "FROM" | "INTO" | "UPDATE" | "JOIN" | "TABLE"
        )
    }

    /// Appends schema and table candidates matching `word` to `out`.
    ///
    /// Names come from `information_schema.tables`. A failing catalog query
    /// is ignored, so completion simply offers nothing in that case.
    fn push_table_candidates(&self, word: &str, out: &mut Vec<Pair>) {
        let sql = "SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema != 'system' OR table_schema IS NULL";
        let Ok(rows) = self.db.query(sql, ()) else {
            return;
        };

        let word_lower = word.to_lowercase();
        // `schema.table` input is completed against a single schema.
        let qualified = word_lower.split_once('.');
        // Schema suggestions are deduplicated by the lower-cased name, which
        // is also what is displayed.
        let mut seen_schemas = std::collections::HashSet::new();

        for row in rows.flatten() {
            let schema = match row.get_value(0) {
                Some(Value::Text(s)) => s.to_string().to_lowercase(),
                // Rows without a schema are listed under "public".
                _ => "public".to_string(),
            };
            let table = match row.get_value(1) {
                Some(Value::Text(t)) => t.to_string().to_lowercase(),
                _ => continue,
            };

            match qualified {
                Some((schema_prefix, table_prefix)) => {
                    if schema == schema_prefix && table.starts_with(table_prefix) {
                        out.push(Pair {
                            replacement: format!("{}.{} ", schema, table),
                            display: table,
                        });
                    }
                }
                None => {
                    if schema.starts_with(&word_lower) && seen_schemas.insert(schema.clone()) {
                        let schema_dot = format!("{}.", schema);
                        out.push(Pair {
                            display: schema_dot.clone(),
                            replacement: schema_dot,
                        });
                    }
                    if table.starts_with(&word_lower) {
                        out.push(Pair {
                            replacement: format!("{} ", table),
                            display: table,
                        });
                    }
                }
            }
        }
    }
}

impl Completer for SqlHelper {
    type Candidate = Pair;

    /// Completes the word ending at `pos`.
    ///
    /// After `FROM`, `INTO`, `UPDATE`, `JOIN` or `TABLE` the candidates are
    /// schema and table names read from the database. Otherwise they are SQL
    /// keywords and CLI commands, with nothing offered for an empty word.
    /// The returned offset is where the replacement text starts.
    fn complete(
        &self,
        line: &str,
        pos: usize,
        _ctx: &Context<'_>,
    ) -> rustyline::Result<(usize, Vec<Pair>)> {
        let word_start = Self::word_start(line, pos);
        let word = &line[word_start..pos];
        let mut candidates = Vec::new();

        if Self::expects_table_name(&line[..word_start]) {
            self.push_table_candidates(word, &mut candidates);
            return Ok((word_start, candidates));
        }

        if word.is_empty() {
            return Ok((pos, candidates));
        }

        let word_upper = word.to_uppercase();
        for keyword in SQL_KEYWORDS {
            if keyword.starts_with(&word_upper) {
                let kw_lower = keyword.to_lowercase();
                candidates.push(Pair {
                    replacement: format!("{} ", kw_lower),
                    display: kw_lower,
                });
            }
        }
        for cmd in CLI_COMMANDS {
            if cmd.starts_with(word) {
                let cmd_lower = cmd.to_lowercase();
                candidates.push(Pair {
                    display: cmd_lower.clone(),
                    replacement: cmd_lower,
                });
            }
        }

        Ok((word_start, candidates))
    }
}
/// State of the interactive REPL session.
struct Cli {
/// Database handle used for statements executed outside a transaction.
db: Database,
/// Handle of the explicit transaction opened with `BEGIN`, if any.
tx: Option<ApiTransaction>,
/// Whether the session considers itself inside an explicit transaction;
/// drives the `[TXN]` prompt and routes queries through `tx`.
in_transaction: bool,
/// Print results as JSON instead of a table.
json_output: bool,
/// Maximum number of rows rendered per result set (0 = unlimited).
limit: usize,
/// Quiet flag supplied on the command line; stored but not read by the
/// REPL code visible here, hence the `dead_code` allowance.
#[allow(dead_code)]
quiet: bool,
/// Query timeout in milliseconds, applied outside transactions (0 = none).
timeout_ms: u64,
/// Line editor providing history and tab completion.
editor: Editor<SqlHelper, DefaultHistory>,
/// Text of the statement accumulated so far across input lines.
current_query: String,
/// True while a statement is awaiting its terminating `;`.
in_multi_line: bool,
}
impl Cli {
fn new(
db: Database,
json_output: bool,
limit: usize,
quiet: bool,
timeout_ms: u64,
) -> io::Result<Self> {
let config = Config::builder()
.history_ignore_space(true)
.edit_mode(EditMode::Emacs)
.build();
let mut editor =
Editor::with_config(config).map_err(|e| io::Error::other(e.to_string()))?;
editor.set_helper(Some(SqlHelper::new(db.clone())));
if let Some(home) = dirs::home_dir() {
let history_file = home.join(".oxibase_history");
let _ = editor.load_history(&history_file);
}
Ok(Self {
db,
tx: None,
in_transaction: false,
json_output,
limit,
quiet,
timeout_ms,
editor,
current_query: String::new(),
in_multi_line: false,
})
}
/// Returns the ANSI-coloured prompt for the current state: `->` marks a
/// continuation line and a `[TXN]` prefix marks an open transaction.
fn get_prompt(&self) -> &'static str {
    match (self.in_multi_line, self.in_transaction) {
        (true, true) => "\x1b[1;33m[TXN]->\x1b[0m ",
        (true, false) => "\x1b[1;36m->\x1b[0m ",
        (false, true) => "\x1b[1;33m[TXN]>\x1b[0m ",
        (false, false) => "\x1b[1;36m>\x1b[0m ",
    }
}
/// Executes one statement and prints either its elapsed time (stdout) or the
/// error it produced (stderr).
fn run_timed(&mut self, sql: &str) {
    let started = Instant::now();
    match self.execute_query(sql) {
        Ok(()) => println!("\x1b[1;32mQuery executed in {:?}\x1b[0m", started.elapsed()),
        Err(e) => eprintln!("\x1b[1;31mError:\x1b[0m {}", e),
    }
}

/// Runs the interactive read-eval-print loop until the user exits.
///
/// Input lines are accumulated until the text ends with `;`, then split into
/// statements and executed one by one. Bare `BEGIN`/`COMMIT`/`ROLLBACK` lines
/// run immediately without a terminator. Leaving the loop via `exit`, Ctrl+C
/// or Ctrl+D rolls back any open transaction. History is saved to
/// `~/.oxibase_history` on the way out (best effort).
fn run(&mut self) -> io::Result<()> {
    println!("Enter SQL commands, 'help' for assistance, or 'exit' to quit.");
    println!("Use Up/Down arrows for history, Ctrl+R to search history.");
    if self.json_output {
        println!("JSON output mode enabled.");
    }
    println!();

    loop {
        let prompt = self.get_prompt();
        let raw_line = match self.editor.readline(prompt) {
            Ok(text) => text,
            Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
                if self.in_transaction {
                    eprintln!("\n\x1b[1;33mWarning: Exiting with active transaction. Rolling back...\x1b[0m");
                    let _ = self.rollback_transaction();
                }
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        };
        let line = raw_line.trim();

        // Meta-commands and blank-line skipping only apply at the start of a
        // statement; inside a multi-line statement everything is SQL text.
        if !self.in_multi_line {
            if line.is_empty() {
                continue;
            }
            let lowered = line.to_lowercase();
            if matches!(lowered.as_str(), "exit" | "quit" | "\\q") {
                if self.in_transaction {
                    eprintln!("\x1b[1;33mWarning: Exiting with active transaction. Rolling back...\x1b[0m");
                    let _ = self.rollback_transaction();
                }
                break;
            }
            if matches!(lowered.as_str(), "help" | "\\h" | "\\?") {
                self.print_help();
                continue;
            }
        }

        // Transaction control keywords execute right away, no `;` required.
        let upper_line = line.to_uppercase();
        let is_txn_control = matches!(upper_line.as_str(), "BEGIN" | "COMMIT" | "ROLLBACK")
            || upper_line.starts_with("BEGIN ");
        if is_txn_control {
            let _ = self.editor.add_history_entry(line);
            self.run_timed(line);
            continue;
        }

        if !self.current_query.is_empty() {
            self.current_query.push('\n');
        }
        self.current_query.push_str(line);

        let full_query = self.current_query.trim().to_string();
        if !full_query.ends_with(';') {
            // Statement not terminated yet: keep reading continuation lines.
            self.in_multi_line = true;
            continue;
        }

        let _ = self.editor.add_history_entry(&full_query);
        self.in_multi_line = false;
        for stmt in split_sql_statements(&full_query) {
            let stmt = stmt.trim();
            if !stmt.is_empty() {
                self.run_timed(stmt);
            }
        }
        self.current_query.clear();
    }

    if let Some(home) = dirs::home_dir() {
        let history_file = home.join(".oxibase_history");
        let _ = self.editor.save_history(&history_file);
    }
    Ok(())
}
/// Dispatches a single statement entered in the REPL.
///
/// Handles the `HELP` meta-command and transaction control
/// (`BEGIN…`, `COMMIT`, `ROLLBACK`) locally. Everything else is classified
/// by keyword as either row-returning (rendered as a result set) or as a
/// write (reported as an affected-row count).
///
/// Command keywords are matched with any trailing `;` removed, so
/// `COMMIT;` behaves like `COMMIT` instead of being forwarded to the engine
/// as SQL.
///
/// # Errors
/// Returns the engine's or the transaction helper's error rendered as text.
fn execute_query(&mut self, query: &str) -> Result<(), String> {
    let upper_query = query.to_uppercase();
    let upper_query = upper_query.trim();
    // Keyword form used only for command matching; the SQL handed to the
    // engine below is the caller's original text.
    let command = upper_query.trim_end_matches(';').trim_end();

    match command {
        "HELP" | "\\H" | "\\?" => {
            self.print_help();
            return Ok(());
        }
        "COMMIT" => return self.commit_transaction(),
        "ROLLBACK" => return self.rollback_transaction(),
        _ => {}
    }
    if upper_query.starts_with("BEGIN") {
        return self.begin_transaction();
    }

    // Heuristic: statements that produce rows go through the query path.
    let returns_rows = upper_query.starts_with("SELECT")
        || upper_query.starts_with("WITH")
        || upper_query.starts_with("SHOW")
        || upper_query.starts_with("DESCRIBE")
        || upper_query.starts_with("DESC ")
        || upper_query.starts_with("EXPLAIN")
        || (upper_query.starts_with("PRAGMA") && !upper_query.contains('='))
        || upper_query.contains(" RETURNING ")
        || upper_query.ends_with(" RETURNING");

    if returns_rows {
        self.execute_read_query(query)
    } else {
        self.execute_write_query(query)
    }
}
/// Opens an explicit transaction and switches the session into transaction
/// mode.
///
/// # Errors
/// Fails if a transaction is already open or the database refuses to start
/// one.
fn begin_transaction(&mut self) -> Result<(), String> {
    if self.in_transaction {
        return Err("already in a transaction".to_string());
    }
    match self.db.begin() {
        Ok(handle) => {
            self.tx = Some(handle);
            self.in_transaction = true;
            println!("\x1b[1;32mTransaction started\x1b[0m");
            Ok(())
        }
        Err(e) => Err(e.to_string()),
    }
}
/// Commits the open transaction and leaves transaction mode.
///
/// The transaction handle is consumed whether or not the commit succeeds, so
/// the session leaves transaction mode in both cases. Otherwise a failed
/// commit would leave `in_transaction` set with no handle behind it.
///
/// # Errors
/// Fails if no transaction is open or if the commit itself fails.
fn commit_transaction(&mut self) -> Result<(), String> {
    if !self.in_transaction {
        return Err("not in a transaction".to_string());
    }
    let outcome = self
        .tx
        .take()
        .map(|mut tx| tx.commit().map_err(|e| e.to_string()));
    // The handle is gone now; keep the flag in sync before reporting errors.
    self.in_transaction = false;
    if let Some(result) = outcome {
        result?;
    }
    println!("\x1b[1;32mTransaction committed\x1b[0m");
    Ok(())
}
/// Rolls back the open transaction and leaves transaction mode.
///
/// The transaction handle is consumed whether or not the rollback succeeds,
/// so the session leaves transaction mode in both cases. Otherwise a failed
/// rollback would leave `in_transaction` set with no handle behind it.
///
/// # Errors
/// Fails if no transaction is open or if the rollback itself fails.
fn rollback_transaction(&mut self) -> Result<(), String> {
    if !self.in_transaction {
        return Err("not in a transaction".to_string());
    }
    let outcome = self
        .tx
        .take()
        .map(|mut tx| tx.rollback().map_err(|e| e.to_string()));
    // The handle is gone now; keep the flag in sync before reporting errors.
    self.in_transaction = false;
    if let Some(result) = outcome {
        result?;
    }
    println!("\x1b[1;33mTransaction rolled back\x1b[0m");
    Ok(())
}
fn execute_read_query(&mut self, query: &str) -> Result<(), String> {
let rows_result = if self.in_transaction {
if let Some(ref mut tx) = self.tx {
tx.query(query, ()).map_err(|e| e.to_string())?
} else {
return Err("Transaction not available".to_string());
}
} else if self.timeout_ms > 0 {
self.db
.query_with_timeout(query, (), self.timeout_ms)
.map_err(|e| e.to_string())?
} else {
self.db.query(query, ()).map_err(|e| e.to_string())?
};
let columns: Vec<String> = rows_result.columns().to_vec();
let mut all_rows: Vec<Vec<Value>> = Vec::new();
for row_result in rows_result {
let row = row_result.map_err(|e| e.to_string())?;
let mut values = Vec::new();
for i in 0..row.len() {
values.push(row.get_value(i).cloned().unwrap_or(Value::null_unknown()));
}
all_rows.push(values);
}
let row_count = all_rows.len();
if self.json_output {
self.output_json(&columns, &all_rows, row_count)?;
} else {
self.output_table(&columns, &all_rows, row_count)?;
}
Ok(())
}
fn execute_write_query(&mut self, query: &str) -> Result<(), String> {
let rows_affected = if self.in_transaction {
if let Some(ref mut tx) = self.tx {
tx.execute(query, ()).map_err(|e| e.to_string())?
} else {
return Err("Transaction not available".to_string());
}
} else if self.timeout_ms > 0 {
self.db
.execute_with_timeout(query, (), self.timeout_ms)
.map_err(|e| e.to_string())?
} else {
self.db.execute(query, ()).map_err(|e| e.to_string())?
};
if self.json_output {
println!(r#"{{"rows_affected":{}}}"#, rows_affected);
} else {
let row_text = if rows_affected == 1 { "row" } else { "rows" };
println!("\x1b[1;32m{} {} affected\x1b[0m", rows_affected, row_text);
}
Ok(())
}
fn output_json(
&self,
columns: &[String],
rows: &[Vec<Value>],
row_count: usize,
) -> Result<(), String> {
let json_rows: Vec<Vec<serde_json::Value>> = rows
.iter()
.map(|row| row.iter().map(value_to_json).collect())
.collect();
let result = serde_json::json!({
"columns": columns,
"rows": json_rows,
"count": row_count
});
println!(
"{}",
serde_json::to_string(&result).map_err(|e| e.to_string())?
);
Ok(())
}
/// Renders a result set as a UTF-8 table followed by a row-count footer.
///
/// When `limit` is non-zero and exceeded, only the first `limit / 2` and the
/// last `limit - limit / 2` rows are shown. A marker row in between states
/// how many rows were hidden.
fn output_table(
    &self,
    columns: &[String],
    rows: &[Vec<Value>],
    row_count: usize,
) -> Result<(), String> {
    let truncated = self.limit > 0 && row_count > self.limit;

    let mut table = Table::new();
    table
        .load_preset(UTF8_FULL_CONDENSED)
        .set_content_arrangement(ContentArrangement::Dynamic);
    table.set_header(columns.iter().map(Cell::new));

    if !truncated {
        for row in rows {
            table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
        }
    } else {
        let top_rows = self.limit / 2;
        let bottom_rows = self.limit - top_rows;

        for row in rows.iter().take(top_rows) {
            table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
        }

        // The marker row carries the message in the middle column.
        let message = format!("... ({} more rows) ...", row_count - self.limit);
        let middle = columns.len() / 2;
        let marker_row: Vec<Cell> = (0..columns.len())
            .map(|idx| {
                if idx == middle {
                    Cell::new(&message)
                } else {
                    Cell::new("")
                }
            })
            .collect();
        table.add_row(marker_row);

        let tail_start = row_count.saturating_sub(bottom_rows).max(top_rows);
        for row in rows.iter().skip(tail_start) {
            table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
        }
    }

    println!("{table}");

    let row_text = if row_count == 1 { "row" } else { "rows" };
    if truncated {
        println!(
            "\x1b[1;32m{} {} in set (showing {})\x1b[0m",
            row_count, row_text, self.limit
        );
    } else {
        println!("\x1b[1;32m{} {} in set\x1b[0m", row_count, row_text);
    }
    Ok(())
}
/// Prints the REPL help screen (SQL commands, transaction commands, special
/// commands and keyboard shortcuts) to stdout.
fn print_help(&self) {
    // One entry per output line; empty entries produce blank lines.
    const HELP_LINES: &[&str] = &[
        "\x1b[1mOxibase SQL CLI Commands:\x1b[0m",
        "",
        " \x1b[1;33mSQL Commands:\x1b[0m",
        " SELECT ... Execute a SELECT query",
        " INSERT ... Insert data into a table",
        " UPDATE ... Update data in a table",
        " DELETE ... Delete data from a table",
        " CREATE TABLE ... Create a new table",
        " CREATE INDEX ... Create an index on a column",
        " SHOW TABLES List all tables",
        " SHOW CREATE TABLE ... Show CREATE TABLE statement for a table",
        " SHOW INDEXES FROM ... Show indexes for a table",
        "",
        " \x1b[1;33mTransaction Commands:\x1b[0m",
        " BEGIN Start a new transaction",
        " COMMIT Commit the current transaction",
        " ROLLBACK Rollback the current transaction",
        "",
        " \x1b[1;33mSpecial Commands:\x1b[0m",
        " exit, quit, \\q Exit the CLI",
        " help, \\h, \\? Show this help message",
        "",
        " \x1b[1;33mKeyboard Shortcuts:\x1b[0m",
        " Up/Down arrow keys Navigate command history",
        " Ctrl+R Search command history",
        " Ctrl+A Move cursor to beginning of line",
        " Ctrl+E Move cursor to end of line",
        " Ctrl+W Delete word before cursor",
        " Ctrl+U Delete from cursor to beginning of line",
        " Ctrl+K Delete from cursor to end of line",
        " Ctrl+L Clear screen",
        "",
    ];
    for help_line in HELP_LINES {
        println!("{}", help_line);
    }
}
}
/// Builds the final database DSN by appending persistence parameters derived
/// from the command-line flags.
///
/// Only `file://` DSNs are modified; any other DSN is returned unchanged.
/// Parameters are appended in the order `sync`, `snapshot_interval`,
/// `keep_snapshots`, `wal_max_size`, `compression`. They are joined with `&`
/// and attached with `?`, or with `&` when the DSN already has a query
/// string.
///
/// An explicit `--sync` always replaces the sync setting implied by
/// `--profile`. Unrecognised profile, sync or compression values produce a
/// warning on stderr and add no parameter.
fn build_dsn(db_path: &str, args: &Args) -> String {
    let mut dsn = db_path.to_string();
    if !dsn.starts_with("file://") {
        return dsn;
    }

    // Sync setting implied by the profile; "normal" adds nothing.
    let mut sync_param: Option<&'static str> = None;
    if let Some(ref profile) = args.persistence_profile {
        match profile.to_lowercase().as_str() {
            "fast" => sync_param = Some("sync=none"),
            "durable" => sync_param = Some("sync=full"),
            "normal" => {}
            _ => {
                eprintln!(
                    "Warning: Unknown profile '{}', using 'normal'. Valid: fast, normal, durable",
                    profile
                );
            }
        }
    }

    // An explicit --sync discards the profile's choice, even when invalid.
    if let Some(ref sync) = args.sync_mode {
        sync_param = match sync.to_lowercase().as_str() {
            "none" | "off" => Some("sync=none"),
            "normal" => Some("sync=normal"),
            "full" => Some("sync=full"),
            _ => {
                eprintln!(
                    "Warning: Unknown sync mode '{}', using 'normal'. Valid: none, normal, full",
                    sync
                );
                None
            }
        };
    }

    let mut params: Vec<String> = Vec::new();
    if let Some(sync) = sync_param {
        params.push(sync.to_string());
    }
    if let Some(interval) = args.snapshot_interval {
        params.push(format!("snapshot_interval={}", interval));
    }
    if let Some(count) = args.keep_snapshots {
        params.push(format!("keep_snapshots={}", count));
    }
    if let Some(mb) = args.wal_max_size {
        // The flag is given in megabytes; the DSN parameter is in bytes.
        params.push(format!("wal_max_size={}", u64::from(mb) * 1024 * 1024));
    }
    if let Some(ref comp) = args.compression {
        match comp.to_lowercase().as_str() {
            "on" | "true" | "1" | "yes" => params.push("compression=on".to_string()),
            "off" | "false" | "0" | "no" => params.push("compression=off".to_string()),
            _ => {
                eprintln!(
                    "Warning: Unknown compression value '{}', using 'on'. Valid: on, off",
                    comp
                );
            }
        }
    }

    if !params.is_empty() {
        let separator = if dsn.contains('?') { "&" } else { "?" };
        dsn.push_str(separator);
        dsn.push_str(&params.join("&"));
    }
    dsn
}
fn print_persistence_info(args: &Args) {
let sync_mode = if let Some(ref sync) = args.sync_mode {
sync.to_lowercase()
} else if let Some(ref profile) = args.persistence_profile {
match profile.to_lowercase().as_str() {
"fast" => "none".to_string(),
"durable" => "full".to_string(),
_ => "normal".to_string(),
}
} else {
"normal".to_string()
};
let sync_desc = match sync_mode.as_str() {
"none" | "off" => "none (fastest, less durable)",
"full" => "full (slowest, most durable)",
_ => "normal (balanced)",
};
println!("Persistence: WAL sync mode = {}", sync_desc);
if let Some(interval) = args.snapshot_interval {
println!("Persistence: Snapshot interval = {}s", interval);
}
if let Some(count) = args.keep_snapshots {
println!("Persistence: Keep snapshots = {}", count);
}
if let Some(mb) = args.wal_max_size {
println!("Persistence: WAL max size = {}MB", mb);
}
if let Some(ref comp) = args.compression {
println!("Persistence: Compression = {}", comp);
}
}
/// Entry point: parses arguments, installs the tracing pipeline, opens the
/// database, then runs exactly one of: the subcommand handler, `--execute`,
/// `--file`, piped stdin, or the interactive REPL.
///
/// Failures to open the database or to run the selected mode are printed to
/// stderr and end the process with exit code 1.
fn main() {
let args = Args::parse();
// Bounded channel (10000 entries) between the internal log layer and the
// log flusher started once the database is open.
let (log_tx, log_rx) = crossbeam_channel::bounded(10000);
let internal_layer = oxibase::common::logging::InternalLogLayer::new(log_tx);
// Console logging: JSON-formatted events written to stderr.
let fmt_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(true)
.with_thread_ids(true)
.with_level(true)
.with_writer(std::io::stderr);
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::Layer;
// The console filter comes from the default env-filter variable and falls
// back to "error" when that is unset or invalid.
let console_filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("error"));
// Same channel arrangement for system traces and metrics.
let (trace_tx, trace_rx) = crossbeam_channel::bounded(10000);
let trace_layer = oxibase::common::tracing::SystemTraceLayer::new(trace_tx);
let (metrics_tx, metrics_rx) = crossbeam_channel::bounded(10000);
let metrics_layer = oxibase::common::metrics::SystemMetricsLayer::new(metrics_tx);
let registry = tracing_subscriber::registry()
.with(LevelFilter::INFO)
.with(fmt_layer.with_filter(console_filter))
.with(internal_layer)
.with(trace_layer)
.with(metrics_layer);
// Optional OpenTelemetry export, enabled by OTEL_EXPORTER_OTLP_ENDPOINT.
// A failure to build the exporter panics via `expect`.
if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
use opentelemetry_otlp::WithExportConfig;
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.expect("Failed to initialize OTLP exporter");
let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.build();
use opentelemetry::trace::TracerProvider;
let tracer = provider.tracer("oxibase");
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
registry.with(telemetry_layer).init();
} else {
registry.init();
}
// Resolve the DSN from the subcommand; without one, use an in-memory DB.
let (db_path, is_serve) = match &args.command {
Some(Commands::Repl { db_path }) => (db_path.clone(), false),
#[cfg(feature = "server")]
Some(Commands::Serve { db_path, .. }) => (db_path.clone(), true),
#[cfg(feature = "cli")]
Some(Commands::InstallWorkspace { db_path }) => (db_path.clone(), false),
None => ("memory://".to_string(), false), };
// Fold persistence flags into the DSN (only affects `file://` DSNs).
let db_path = build_dsn(&db_path, &args);
// NOTE(review): this prints the banner unless serving with --quiet, so
// --quiet alone does not suppress it outside `serve` — confirm intended.
if !is_serve || !args.quiet {
println!("Oxibase v{}", version());
}
// Guard over the three flusher threads. On drop it sets each flusher's
// flag to true and joins its thread (presumably the flag is the flusher's
// stop signal — confirm in the `start_*_flusher` implementations).
struct TelemetryFlushers {
log_flusher: Option<(
std::sync::Arc<std::sync::atomic::AtomicBool>,
std::thread::JoinHandle<()>,
)>,
trace_flusher: Option<(
std::sync::Arc<std::sync::atomic::AtomicBool>,
std::thread::JoinHandle<()>,
)>,
metrics_flusher: Option<(
std::sync::Arc<std::sync::atomic::AtomicBool>,
std::thread::JoinHandle<()>,
)>,
}
impl Drop for TelemetryFlushers {
fn drop(&mut self) {
if let Some((flag, handle)) = self.log_flusher.take() {
flag.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = handle.join();
}
if let Some((flag, handle)) = self.trace_flusher.take() {
flag.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = handle.join();
}
if let Some((flag, handle)) = self.metrics_flusher.take() {
flag.store(true, std::sync::atomic::Ordering::Relaxed);
let _ = handle.join();
}
}
}
let db = match Database::open(&db_path) {
Ok(db) => db,
Err(e) => {
eprintln!("Error opening database: {}", e);
std::process::exit(1);
}
};
// Start the flushers with the receiving ends of the channels created above.
let log_flusher = oxibase::common::logging::start_log_flusher(db.engine().clone(), log_rx);
let trace_flusher =
oxibase::common::tracing::start_trace_flusher(db.engine().clone(), trace_rx);
let metrics_flusher =
oxibase::common::metrics::start_metrics_flusher(db.engine().clone(), metrics_rx);
// Held until `main` returns. `std::process::exit` does not run destructors,
// so the error exits below skip this guard's Drop.
let _flushers = TelemetryFlushers {
log_flusher: Some(log_flusher),
trace_flusher: Some(trace_flusher),
metrics_flusher: Some(metrics_flusher),
};
if !args.quiet {
println!("Connected to database: {}", db_path);
if db_path.starts_with("file://") {
print_persistence_info(&args);
}
}
// Subcommands that fully handle the run return from `main` here; `repl`
// and "no subcommand" fall through to the modes below.
match args.command {
#[cfg(feature = "server")]
Some(Commands::Serve { port, host, .. }) => {
println!("Server starting on {}:{}...", host, port);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to build tokio runtime");
rt.block_on(async {
let app = oxibase::server::create_router(db);
let addr = format!("{}:{}", host, port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.expect("Failed to bind to port");
println!("Listening on {}", addr);
axum::serve(listener, app).await.expect("Server failed");
});
return;
}
#[cfg(feature = "cli")]
Some(Commands::InstallWorkspace { .. }) => {
workspace::install(&db);
return;
}
_ => {
}
}
// Mode 1: a single SQL string given with -e/--execute.
if let Some(ref sql) = args.execute {
if let Err(e) = execute_query_with_options(
&db,
sql,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
) {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
// Mode 2: a SQL script given with -f/--file.
if let Some(ref filename) = args.file {
if let Err(e) = execute_from_file(
&db,
filename,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
) {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
// Mode 3: stdin is not a terminal, so read statements from the pipe.
let is_pipe = !std::io::stdin().is_terminal();
if is_pipe {
if let Err(e) = execute_piped_input(
&db,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
) {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
// Mode 4: interactive REPL.
let mut cli = match Cli::new(
db,
args.json_output,
args.limit,
args.quiet,
args.timeout_ms,
) {
Ok(cli) => cli,
Err(e) => {
eprintln!("Error initializing CLI: {}", e);
std::process::exit(1);
}
};
if let Err(e) = cli.run() {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
/// Settings shared by the script-style runners (file and piped input).
#[derive(Clone, Copy)]
struct ScriptRunOptions {
    json_output: bool,
    quiet: bool,
    row_limit: usize,
    timeout_ms: u64,
    /// Print "Query executed in …" after each successful statement, unless
    /// JSON or quiet output is selected.
    report_timing: bool,
}

/// Splits one accumulated batch of script text into statements and executes
/// them in order. A failing statement is reported on stderr and does not
/// stop the remaining statements.
fn run_script_batch(db: &Database, batch: &str, opts: ScriptRunOptions) {
    let batch = batch.trim();
    if batch.is_empty() {
        return;
    }
    for stmt in split_sql_statements(batch) {
        let stmt = stmt.trim();
        if stmt.is_empty() {
            continue;
        }
        let start = Instant::now();
        match execute_query_with_options(
            db,
            stmt,
            opts.json_output,
            opts.quiet,
            opts.row_limit,
            opts.timeout_ms,
        ) {
            Err(e) => eprintln!("Error: {}", e),
            Ok(()) => {
                if opts.report_timing && !opts.json_output && !opts.quiet {
                    println!("Query executed in {:?}", start.elapsed());
                }
            }
        }
    }
}

/// Reads SQL text line by line from `reader` and executes it in batches.
///
/// Lines starting with `#` or `--`, and single-line `/* … */` comments, are
/// skipped. The remaining lines are accumulated, and the batch is executed
/// at each blank line and again at end of input.
///
/// # Errors
/// Returns `"<read_error_prefix>: <io error>"` if a line cannot be read.
/// Statement failures are printed and do not produce an error.
fn run_script_lines<R: BufRead>(
    reader: R,
    read_error_prefix: &str,
    db: &Database,
    opts: ScriptRunOptions,
) -> Result<(), String> {
    let mut pending = String::new();
    for line_result in reader.lines() {
        let line = line_result.map_err(|e| format!("{}: {}", read_error_prefix, e))?;
        let trimmed = line.trim();

        let is_comment = trimmed.starts_with('#')
            || trimmed.starts_with("--")
            || (trimmed.starts_with("/*") && trimmed.ends_with("*/"));
        if is_comment {
            continue;
        }

        if trimmed.is_empty() && !pending.is_empty() {
            // A blank line ends the current batch.
            run_script_batch(db, &pending, opts);
            pending.clear();
        } else {
            pending.push_str(&line);
            pending.push('\n');
        }
    }
    if !pending.is_empty() {
        run_script_batch(db, &pending, opts);
    }
    Ok(())
}

/// Executes the SQL script stored in `filename`.
///
/// Statements run in order; a failing statement is reported on stderr and
/// the script continues.
///
/// # Errors
/// Fails if the file cannot be opened or read.
fn execute_from_file(
    db: &Database,
    filename: &str,
    json_output: bool,
    quiet: bool,
    row_limit: usize,
    timeout_ms: u64,
) -> Result<(), String> {
    let file =
        File::open(filename).map_err(|e| format!("Error opening file {}: {}", filename, e))?;
    let opts = ScriptRunOptions {
        json_output,
        quiet,
        row_limit,
        timeout_ms,
        report_timing: false,
    };
    run_script_lines(BufReader::new(file), "Error reading file", db, opts)
}

/// Executes SQL statements read from standard input (piped mode).
///
/// Works like [`execute_from_file`], and also prints the elapsed time after
/// each successful statement unless JSON or quiet output is selected.
///
/// # Errors
/// Fails if standard input cannot be read.
fn execute_piped_input(
    db: &Database,
    json_output: bool,
    quiet: bool,
    row_limit: usize,
    timeout_ms: u64,
) -> Result<(), String> {
    let stdin = io::stdin();
    let opts = ScriptRunOptions {
        json_output,
        quiet,
        row_limit,
        timeout_ms,
        report_timing: true,
    };
    run_script_lines(stdin.lock(), "Error reading input", db, opts)
}
/// Executes one statement in non-interactive mode (`-e`, `-f`, piped input).
///
/// `HELP` prints the help text and `EXIT`/`QUIT` return an
/// "exit requested" error. Other statements may carry inline parameters
/// after a ` -- PARAMS: ` marker. They are classified by keyword as either
/// row-returning (printed as JSON or a plain table) or as a write (reported
/// as an affected-row count). `timeout_ms == 0` disables the timeout.
///
/// # Errors
/// Returns the engine error as text, or "exit requested".
fn execute_query_with_options(
    db: &Database,
    query: &str,
    json_output: bool,
    quiet: bool,
    row_limit: usize,
    timeout_ms: u64,
) -> Result<(), String> {
    let upper_query = query.to_uppercase();
    let upper_query = upper_query.trim();

    if matches!(upper_query, "HELP" | "\\H" | "\\?") {
        print_help_main();
        return Ok(());
    }
    if matches!(upper_query, "EXIT" | "QUIT" | "\\Q") {
        return Err("exit requested".to_string());
    }

    let (sql, params) = parse_params(query);

    // Heuristic: statements that produce rows go through the query path.
    const READ_PREFIXES: [&str; 6] = ["SELECT", "WITH", "SHOW", "DESCRIBE", "DESC ", "EXPLAIN"];
    let returns_rows = READ_PREFIXES
        .iter()
        .any(|prefix| upper_query.starts_with(*prefix))
        || (upper_query.starts_with("PRAGMA") && !upper_query.contains('='))
        || upper_query.contains(" RETURNING ")
        || upper_query.ends_with(" RETURNING");

    if !returns_rows {
        let rows_affected = if timeout_ms > 0 {
            db.execute_with_timeout(&sql, params, timeout_ms)
                .map_err(|e| e.to_string())?
        } else {
            db.execute(&sql, params).map_err(|e| e.to_string())?
        };
        if json_output {
            println!(r#"{{"rows_affected":{}}}"#, rows_affected);
        } else if !quiet {
            println!("{} rows affected", rows_affected);
        }
        return Ok(());
    }

    let rows_result = if timeout_ms > 0 {
        db.query_with_timeout(&sql, params, timeout_ms)
            .map_err(|e| e.to_string())?
    } else {
        db.query(&sql, params).map_err(|e| e.to_string())?
    };

    let columns: Vec<String> = rows_result.columns().to_vec();
    let mut all_rows: Vec<Vec<Value>> = Vec::new();
    for row_result in rows_result {
        let row = row_result.map_err(|e| e.to_string())?;
        // A missing cell is rendered as an untyped NULL.
        let values: Vec<Value> = (0..row.len())
            .map(|idx| row.get_value(idx).cloned().unwrap_or(Value::null_unknown()))
            .collect();
        all_rows.push(values);
    }

    let row_count = all_rows.len();
    if json_output {
        output_json(&columns, &all_rows, row_count)?;
    } else {
        output_table(&columns, &all_rows, row_count, row_limit, quiet)?;
    }
    Ok(())
}
/// Splits a statement into its SQL text and optional inline parameters.
///
/// Parameters follow a ` -- PARAMS: ` marker as a comma-separated list, e.g.
/// `SELECT * FROM t WHERE id = $1 -- PARAMS: 42`. The SQL part is trimmed and
/// stripped of trailing semicolons. Only the text between the first and
/// second marker is read as parameters. An empty parameter list yields no
/// parameters rather than a single empty-text one.
fn parse_params(query: &str) -> (String, Vec<Value>) {
    let mut parts = query.split(" -- PARAMS: ");
    // `split` always yields at least one item; the fallback is defensive.
    let sql = parts
        .next()
        .unwrap_or("")
        .trim()
        .trim_end_matches(';')
        .to_string();

    let params = match parts.next().map(str::trim) {
        Some(list) if !list.is_empty() => list
            .split(',')
            .map(|raw| convert_param_value(raw.trim()))
            .collect(),
        _ => Vec::new(),
    };
    (sql, params)
}

/// Converts one textual parameter into a [`Value`].
///
/// Tried in order: integer (`i64`), float (only when the text contains a
/// `.`), the exact literals `true`/`false`, case-insensitive `null`, and
/// finally plain text.
fn convert_param_value(value: &str) -> Value {
    if let Ok(int) = value.parse::<i64>() {
        return Value::Integer(int);
    }
    // The '.' requirement keeps words such as "inf" or "nan" as text.
    if value.contains('.') {
        if let Ok(float) = value.parse::<f64>() {
            return Value::Float(float);
        }
    }
    match value {
        "true" => Value::Boolean(true),
        "false" => Value::Boolean(false),
        _ if value.eq_ignore_ascii_case("null") => Value::null_unknown(),
        _ => Value::text(value),
    }
}
/// Prints a result set to stdout as one line of JSON shaped like
/// `{"columns": [...], "rows": [[...], ...], "count": N}`.
///
/// # Errors
/// Returns the serialisation error as text.
fn output_json(columns: &[String], rows: &[Vec<Value>], row_count: usize) -> Result<(), String> {
    let mut json_rows: Vec<Vec<serde_json::Value>> = Vec::with_capacity(rows.len());
    for row in rows {
        json_rows.push(row.iter().map(value_to_json).collect());
    }

    let payload = serde_json::json!({
        "columns": columns,
        "rows": json_rows,
        "count": row_count
    });

    let rendered = serde_json::to_string(&payload).map_err(|e| e.to_string())?;
    println!("{}", rendered);
    Ok(())
}
fn output_table(
columns: &[String],
rows: &[Vec<Value>],
row_count: usize,
row_limit: usize,
quiet: bool,
) -> Result<(), String> {
for (i, column) in columns.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", column);
}
println!();
for (i, _) in columns.iter().enumerate() {
if i > 0 {
print!("-+-");
}
print!("----");
}
println!();
if row_limit == 0 || row_count <= row_limit {
for row in rows {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
if !quiet {
println!("{} rows in set", row_count);
}
} else {
let top_rows = row_limit / 2;
let bottom_rows = row_limit - top_rows;
for row in rows.iter().take(top_rows) {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
let hidden_rows = row_count - row_limit;
println!();
println!(" \x1b[2m... ({} more rows) ...\x1b[0m", hidden_rows);
println!();
let start_idx = row_count.saturating_sub(bottom_rows).max(top_rows);
for row in rows.iter().skip(start_idx) {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
if !quiet {
println!("{} rows in set (showing {})", row_count, row_limit);
}
}
Ok(())
}
/// Renders a `Value` as the text shown in table output.
///
/// Floats with no fractional part keep one decimal place (`2.0`); other
/// floats are rounded to four decimals with trailing zeros, and then a
/// trailing `.`, removed. Timestamps use the pattern `%Y-%m-%dT%H:%M:%SZ`.
fn format_value(value: &Value) -> String {
    match value {
        Value::Null(_) => String::from("NULL"),
        Value::Integer(int) => int.to_string(),
        Value::Float(float) => {
            let number = *float;
            if number.trunc() == number {
                format!("{:.1}", number)
            } else {
                let fixed = format!("{:.4}", number);
                fixed
                    .trim_end_matches('0')
                    .trim_end_matches('.')
                    .to_string()
            }
        }
        Value::Text(text) => text.to_string(),
        Value::Boolean(flag) => flag.to_string(),
        Value::Timestamp(stamp) => stamp.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
        Value::Json(doc) => doc.to_string(),
    }
}
/// Converts a database `Value` into the `serde_json::Value` used by the
/// JSON output mode.
///
/// Timestamps become strings in the pattern `%Y-%m-%dT%H:%M:%SZ`; JSON
/// column values are emitted as their string form, not re-parsed.
fn value_to_json(value: &Value) -> serde_json::Value {
    match value {
        Value::Null(_) => serde_json::Value::Null,
        Value::Boolean(flag) => serde_json::Value::from(*flag),
        Value::Integer(int) => serde_json::Value::from(*int),
        Value::Float(float) => serde_json::Value::from(*float),
        Value::Timestamp(stamp) => {
            let rendered = stamp.format("%Y-%m-%dT%H:%M:%SZ").to_string();
            serde_json::Value::String(rendered)
        }
        Value::Text(text) => serde_json::json!(text.as_ref()),
        Value::Json(doc) => serde_json::json!(doc.as_ref()),
    }
}
/// Splits a SQL script into individual statements on top-level semicolons.
///
/// A `;` ends a statement only when it is outside single-quoted text,
/// double-quoted text, `-- ` line comments and `/* ... */` block comments.
/// The separator itself is not included in the output.
///
/// Details worth knowing:
/// * Comment text is dropped from the output. The newline that ends a line
///   comment is kept; a block comment is removed without a replacement.
/// * `--` starts a line comment only when followed by a space, tab, newline,
///   carriage return, NUL, or the end of input.
/// * A quote character preceded by a backslash does not open or close a
///   quoted region.
/// * A quote character of the other kind inside a quoted region is ordinary
///   text: `'5" pipe'` is one complete string literal and `"it's"` is one
///   complete identifier.
/// * Statements are returned untrimmed, and empty statements between
///   consecutive semicolons are kept. Only a trailing empty remainder is
///   omitted.
fn split_sql_statements(input: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current_statement = String::new();
    let mut in_single_quotes = false;
    let mut in_double_quotes = false;
    let mut in_line_comment = false;
    let mut in_block_comment = false;

    // Collected up front because the scanner needs one-character lookbehind
    // and up to two characters of lookahead.
    let chars: Vec<char> = input.chars().collect();
    let mut i = 0;

    while i < chars.len() {
        let ch = chars[i];
        let next = chars.get(i + 1).copied();

        if in_line_comment {
            // Keep the terminating newline so adjacent tokens stay separated.
            if ch == '\n' {
                in_line_comment = false;
                current_statement.push(ch);
            }
            i += 1;
            continue;
        }

        if in_block_comment {
            if ch == '*' && next == Some('/') {
                in_block_comment = false;
                i += 2;
            } else {
                i += 1;
            }
            continue;
        }

        let in_quotes = in_single_quotes || in_double_quotes;

        if !in_quotes && ch == '-' && next == Some('-') {
            let starts_comment = match chars.get(i + 2) {
                None => true,
                Some(&after) => matches!(after, '\0' | ' ' | '\t' | '\n' | '\r'),
            };
            if starts_comment {
                in_line_comment = true;
                i += 2;
                continue;
            }
        }

        if !in_quotes && ch == '/' && next == Some('*') {
            in_block_comment = true;
            i += 2;
            continue;
        }

        // A quote only toggles its own state, and only while the other kind
        // of quoting is not active; otherwise a `"` inside a string literal
        // (or a `'` inside a quoted identifier) would leave the scanner
        // permanently "in quotes" and hide every later semicolon.
        let escaped = i > 0 && chars[i - 1] == '\\';
        if !escaped {
            if ch == '\'' && !in_double_quotes {
                in_single_quotes = !in_single_quotes;
            } else if ch == '"' && !in_single_quotes {
                in_double_quotes = !in_double_quotes;
            }
        }

        if ch == ';' && !in_single_quotes && !in_double_quotes {
            statements.push(std::mem::take(&mut current_statement));
        } else {
            current_statement.push(ch);
        }
        i += 1;
    }

    if !current_statement.is_empty() {
        statements.push(current_statement);
    }
    statements
}
/// Prints the top-level help text for the interactive shell to stdout.
fn print_help_main() {
    // One entry per output line; empty entries are blank separator lines.
    const HELP_LINES: &[&str] = &[
        "Oxibase SQL CLI",
        "",
        " SQL Commands:",
        " SELECT ... Execute a SELECT query",
        " INSERT ... Insert data into a table",
        " UPDATE ... Update data in a table",
        " DELETE ... Delete data from a table",
        " CREATE TABLE ... Create a new table",
        " CREATE INDEX ... Create an index on a column",
        " SHOW TABLES List all tables",
        " SHOW CREATE TABLE ... Show CREATE TABLE statement for a table",
        " SHOW INDEXES FROM ... Show indexes for a table",
        "",
        " Transaction Commands:",
        " BEGIN Start a new transaction",
        " COMMIT Commit the current transaction",
        " ROLLBACK Rollback the current transaction",
        "",
        " Special Commands:",
        " help, \\h, \\? Show this help message",
        "",
    ];

    for line in HELP_LINES {
        println!("{}", line);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Keyword completion against an empty in-memory database. Every case
    /// places the cursor at the end of the input line.
    #[test]
    fn test_sql_helper_keyword_completion() {
        let helper = SqlHelper::new(Database::open_in_memory().unwrap());
        let history = DefaultHistory::new();
        let ctx = Context::new(&history);
        let complete_at_end = |line: &str| helper.complete(line, line.len(), &ctx).unwrap();

        // Empty input offers nothing.
        let (start, found) = complete_at_end("");
        assert_eq!(start, 0);
        assert!(found.is_empty());

        // Inputs that must resolve to exactly one keyword:
        // (input, expected replacement start, expected candidate).
        let single_matches = [
            ("SEL", 0, "select"),
            ("crea", 0, "create"),
            ("CREATE tab", 7, "table"),
            ("he", 0, "help"),
        ];
        for (line, expected_start, expected) in single_matches {
            let (start, found) = complete_at_end(line);
            assert_eq!(start, expected_start);
            assert_eq!(found.len(), 1);
            assert_eq!(found[0].display, expected);
        }

        // An ambiguous prefix lists every matching keyword.
        let (start, found) = complete_at_end("C");
        assert_eq!(start, 0);
        for keyword in ["create", "commit"] {
            assert!(found.iter().any(|c| c.display == keyword));
        }
    }

    /// Table and schema name completion after FROM / UPDATE / INSERT INTO,
    /// with one user table present.
    #[test]
    fn test_sql_helper_context_aware_completion() {
        let db = Database::open_in_memory().unwrap();
        db.execute("CREATE TABLE my_awesome_table (id INTEGER)", ())
            .unwrap();
        let helper = SqlHelper::new(db);
        let history = DefaultHistory::new();
        let ctx = Context::new(&history);
        let complete_at_end = |line: &str| helper.complete(line, line.len(), &ctx).unwrap();

        // (input, expected replacement start, candidate that must be offered).
        let cases = [
            ("SELECT * FROM my_a", 14, "my_awesome_table"),
            ("UPDATE my", 7, "my_awesome_table"),
            ("INSERT INTO my", 12, "my_awesome_table"),
            ("SELECT * FROM ", 14, "my_awesome_table"),
            ("SELECT * FROM pub", 14, "public."),
        ];
        for (line, expected_start, expected) in cases {
            let (start, found) = complete_at_end(line);
            assert_eq!(start, expected_start);
            assert!(found.iter().any(|c| c.display == expected));
        }

        // A schema-qualified prefix keeps the schema in the replacement text.
        let (start, found) = complete_at_end("SELECT * FROM public.my_");
        assert_eq!(start, 14);
        assert!(found.iter().any(|c| c.display == "my_awesome_table"));
        assert!(found
            .iter()
            .any(|c| c.replacement == "public.my_awesome_table "));
    }
}