use std::fs::File;
use std::io::{self, BufRead, BufReader, IsTerminal};
use std::time::Instant;
use clap::Parser;
use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, ContentArrangement, Table};
use rustyline::error::ReadlineError;
use rustyline::history::DefaultHistory;
use rustyline::{Config, DefaultEditor, EditMode, Editor};
use stoolap::api::{Database, Transaction as ApiTransaction};
use stoolap::common::version::{MAJOR, MINOR, PATCH};
use stoolap::Value;
const VERSION: &str = concat!(
env!("CARGO_PKG_VERSION_MAJOR"),
".",
env!("CARGO_PKG_VERSION_MINOR"),
".",
env!("CARGO_PKG_VERSION_PATCH")
);
#[derive(Parser, Debug)]
#[command(name = "stoolap")]
#[command(author = "Stoolap Contributors")]
#[command(version = VERSION)]
#[command(about = "High-performance embedded SQL database with MVCC")]
#[command(
long_about = "Stoolap is a high-performance embedded SQL database with MVCC transactions.\n\
This CLI provides an interactive interface to execute SQL queries and manage your database.\n\n\
PERSISTENCE DSN PARAMETERS:\n\
file:///path/to/db?param=value¶m2=value2\n\n\
sync_mode=none|normal|full Fsync mode (default: normal)\n\
none: no fsync, durable at checkpoint only\n\
normal: fsync every 1 second, DDL fsyncs immediately\n\
full: fsync on every write\n\
checkpoint_interval=SECS Checkpoint interval in seconds (default: 60)\n\
compact_threshold=COUNT Sub-target volumes per table before merging (default: 4)\n\
compression=on|off WAL + volume LZ4 compression (default: on)\n\
wal_compression=on|off WAL compression only (default: on)\n\
volume_compression=on|off Volume LZ4 compression only (default: on)\n\
keep_snapshots=COUNT Backup snapshots per table (default: 3)\n\
checkpoint_on_close=on|off Checkpoint on shutdown (default: on)\n\
wal_max_size=BYTES Max WAL file size before rotation (default: 67108864)\n\
wal_buffer_size=BYTES WAL buffer size (default: 65536)\n\
wal_flush_trigger=BYTES Buffer size to trigger flush (default: 32768)\n\
commit_batch_size=COUNT Commits to batch before sync (default: 100)\n\
sync_interval_ms=MS Min time between syncs in normal mode (default: 1000)\n\
compression_threshold=BYTES Min size to compress (default: 64)\n\
target_volume_rows=ROWS Target rows per cold volume (default: 1048576)\n\n\
EXAMPLES:\n\
stoolap -d memory:// In-memory database\n\
stoolap -d file:///tmp/mydb Persistent database\n\
stoolap -d file:///tmp/mydb?sync_mode=full Maximum durability\n\
stoolap -d file:///tmp/mydb?sync_mode=none&compression=off Max performance\n\
stoolap -d file:///tmp/mydb --profile durable Use durable preset\n\
stoolap -d file:///tmp/mydb --sync full --compression off\n\n\
BACKUP & RESTORE:\n\
stoolap -d file:///tmp/mydb --snapshot Create backup snapshot\n\
stoolap -d file:///tmp/mydb --restore Restore from latest snapshot\n\
stoolap -d file:///tmp/mydb --restore 20260315-100000 Restore specific snapshot\n\
stoolap -d file:///tmp/mydb --reset-volumes --restore Recovery from corrupted volumes"
)]
struct Args {
#[arg(short = 'd', long = "db", default_value = "memory://")]
db_path: String,
#[arg(short = 'j', long = "json", default_value = "false")]
json_output: bool,
#[arg(short = 'q', long = "quiet", default_value = "false")]
quiet: bool,
#[arg(short = 'l', long = "limit", default_value = "40")]
limit: usize,
#[arg(short = 'e', long = "execute")]
execute: Option<String>,
#[arg(short = 'f', long = "file")]
file: Option<String>,
#[arg(short = 's', long = "sync", value_name = "MODE")]
sync_mode: Option<String>,
#[arg(short = 'p', long = "profile", value_name = "PROFILE")]
persistence_profile: Option<String>,
#[arg(long = "checkpoint-interval", value_name = "SECONDS")]
checkpoint_interval: Option<u32>,
#[arg(long = "compact-threshold", value_name = "COUNT")]
compact_threshold: Option<u32>,
#[arg(long = "wal-max-size", value_name = "MB")]
wal_max_size: Option<u32>,
#[arg(long = "compression", value_name = "on|off")]
compression: Option<String>,
#[arg(long = "keep-snapshots", value_name = "COUNT")]
keep_snapshots: Option<u32>,
#[arg(long = "no-checkpoint-on-close")]
no_checkpoint_on_close: bool,
#[arg(long = "restore", value_name = "TIMESTAMP", num_args = 0..=1, default_missing_value = "")]
restore: Option<String>,
#[arg(long = "snapshot")]
snapshot: bool,
#[arg(long = "reset-volumes")]
reset_volumes: bool,
#[arg(short = 't', long = "timeout", value_name = "MS", default_value = "0")]
timeout_ms: u64,
}
struct Cli {
db: Database,
tx: Option<ApiTransaction>,
in_transaction: bool,
json_output: bool,
limit: usize,
#[allow(dead_code)]
quiet: bool,
timeout_ms: u64,
editor: Editor<(), DefaultHistory>,
current_query: String,
in_multi_line: bool,
}
impl Cli {
fn new(
db: Database,
json_output: bool,
limit: usize,
quiet: bool,
timeout_ms: u64,
) -> io::Result<Self> {
let config = Config::builder()
.history_ignore_space(true)
.edit_mode(EditMode::Emacs)
.build();
let mut editor =
DefaultEditor::with_config(config).map_err(|e| io::Error::other(e.to_string()))?;
if let Some(home) = dirs::home_dir() {
let history_file = home.join(".stoolap_history");
let _ = editor.load_history(&history_file);
}
Ok(Self {
db,
tx: None,
in_transaction: false,
json_output,
limit,
quiet,
timeout_ms,
editor,
current_query: String::new(),
in_multi_line: false,
})
}
fn get_prompt(&self) -> &'static str {
if self.in_multi_line {
if self.in_transaction {
"\x1b[1;33m[TXN]->\x1b[0m "
} else {
"\x1b[1;36m->\x1b[0m "
}
} else if self.in_transaction {
"\x1b[1;33m[TXN]>\x1b[0m "
} else {
"\x1b[1;36m>\x1b[0m "
}
}
fn run(&mut self) -> io::Result<()> {
println!("Stoolap v{}.{}.{}", MAJOR, MINOR, PATCH);
println!("Enter SQL commands, 'help' for assistance, or 'exit' to quit.");
println!("Use Up/Down arrows for history, Ctrl+R to search history.");
if self.json_output {
println!("JSON output mode enabled.");
}
println!();
loop {
let prompt = self.get_prompt();
match self.editor.readline(prompt) {
Ok(line) => {
let line = line.trim();
if !self.in_multi_line && line.is_empty() {
continue;
}
if !self.in_multi_line {
match line.to_lowercase().as_str() {
"exit" | "quit" | "\\q" => {
if self.in_transaction {
eprintln!("\x1b[1;33mWarning: Exiting with active transaction. Rolling back...\x1b[0m");
let _ = self.rollback_transaction();
}
break;
}
"help" | "\\h" | "\\?" => {
self.print_help();
continue;
}
_ => {}
}
}
let upper_line = line.to_uppercase();
if upper_line == "BEGIN"
|| upper_line == "COMMIT"
|| upper_line == "ROLLBACK"
|| upper_line.starts_with("BEGIN ")
{
let _ = self.editor.add_history_entry(line);
let start = Instant::now();
if let Err(e) = self.execute_query(line) {
eprintln!("\x1b[1;31mError:\x1b[0m {}", e);
} else {
println!("\x1b[1;32mQuery executed in {:?}\x1b[0m", start.elapsed());
}
continue;
}
if !self.current_query.is_empty() {
self.current_query.push('\n');
}
self.current_query.push_str(line);
let full_query = self.current_query.trim().to_string();
if full_query.ends_with(';') {
let history_entry = full_query.replace('\n', "\\n");
let _ = self.editor.add_history_entry(&history_entry);
self.in_multi_line = false;
let statements = split_sql_statements(&full_query);
for stmt in statements {
let stmt = stmt.trim();
if stmt.is_empty() {
continue;
}
let start = Instant::now();
if let Err(e) = self.execute_query(stmt) {
eprintln!("\x1b[1;31mError:\x1b[0m {}", e);
} else {
println!(
"\x1b[1;32mQuery executed in {:?}\x1b[0m",
start.elapsed()
);
}
}
self.current_query.clear();
} else {
self.in_multi_line = true;
}
}
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
if self.in_transaction {
eprintln!("\n\x1b[1;33mWarning: Exiting with active transaction. Rolling back...\x1b[0m");
let _ = self.rollback_transaction();
}
break;
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
if let Some(home) = dirs::home_dir() {
let history_file = home.join(".stoolap_history");
let _ = self.editor.save_history(&history_file);
}
Ok(())
}
fn execute_query(&mut self, query: &str) -> Result<(), String> {
let upper_query = query.to_uppercase();
let upper_query = upper_query.trim();
match upper_query {
"HELP" | "\\H" | "\\?" => {
self.print_help();
return Ok(());
}
_ => {}
}
if upper_query.starts_with("BEGIN") {
return self.begin_transaction();
} else if upper_query == "COMMIT" {
return self.commit_transaction();
} else if upper_query == "ROLLBACK" {
return self.rollback_transaction();
}
if upper_query.starts_with("SELECT")
|| upper_query.starts_with("WITH")
|| upper_query.starts_with("SHOW")
|| upper_query.starts_with("DESCRIBE")
|| upper_query.starts_with("DESC ")
|| upper_query.starts_with("EXPLAIN")
|| upper_query.starts_with("VACUUM")
|| (upper_query.starts_with("PRAGMA") && !upper_query.contains('='))
|| upper_query.contains(" RETURNING ")
|| upper_query.ends_with(" RETURNING")
{
self.execute_read_query(query)
} else {
self.execute_write_query(query)
}
}
fn begin_transaction(&mut self) -> Result<(), String> {
if self.in_transaction {
return Err("already in a transaction".to_string());
}
let tx = self.db.begin().map_err(|e| e.to_string())?;
self.tx = Some(tx);
self.in_transaction = true;
println!("\x1b[1;32mTransaction started\x1b[0m");
Ok(())
}
fn commit_transaction(&mut self) -> Result<(), String> {
if !self.in_transaction {
return Err("not in a transaction".to_string());
}
if let Some(mut tx) = self.tx.take() {
tx.commit().map_err(|e| e.to_string())?;
}
self.in_transaction = false;
println!("\x1b[1;32mTransaction committed\x1b[0m");
Ok(())
}
fn rollback_transaction(&mut self) -> Result<(), String> {
if !self.in_transaction {
return Err("not in a transaction".to_string());
}
if let Some(mut tx) = self.tx.take() {
tx.rollback().map_err(|e| e.to_string())?;
}
self.in_transaction = false;
println!("\x1b[1;33mTransaction rolled back\x1b[0m");
Ok(())
}
fn execute_read_query(&mut self, query: &str) -> Result<(), String> {
let rows_result = if self.in_transaction {
if let Some(ref mut tx) = self.tx {
tx.query(query, ()).map_err(|e| e.to_string())?
} else {
return Err("Transaction not available".to_string());
}
} else if self.timeout_ms > 0 {
self.db
.query_with_timeout(query, (), self.timeout_ms)
.map_err(|e| e.to_string())?
} else {
self.db.query(query, ()).map_err(|e| e.to_string())?
};
let columns: Vec<String> = rows_result.columns().to_vec();
let mut all_rows: Vec<Vec<Value>> = Vec::new();
for row_result in rows_result {
let row = row_result.map_err(|e| e.to_string())?;
let mut values = Vec::new();
for i in 0..row.len() {
values.push(row.get_value(i).cloned().unwrap_or(Value::null_unknown()));
}
all_rows.push(values);
}
let row_count = all_rows.len();
if self.json_output {
self.output_json(&columns, &all_rows, row_count)?;
} else {
self.output_table(&columns, &all_rows, row_count)?;
}
Ok(())
}
fn execute_write_query(&mut self, query: &str) -> Result<(), String> {
let rows_affected = if self.in_transaction {
if let Some(ref mut tx) = self.tx {
tx.execute(query, ()).map_err(|e| e.to_string())?
} else {
return Err("Transaction not available".to_string());
}
} else if self.timeout_ms > 0 {
self.db
.execute_with_timeout(query, (), self.timeout_ms)
.map_err(|e| e.to_string())?
} else {
self.db.execute(query, ()).map_err(|e| e.to_string())?
};
if self.json_output {
println!(r#"{{"rows_affected":{}}}"#, rows_affected);
} else {
let row_text = if rows_affected == 1 { "row" } else { "rows" };
println!("\x1b[1;32m{} {} affected\x1b[0m", rows_affected, row_text);
}
Ok(())
}
fn output_json(
&self,
columns: &[String],
rows: &[Vec<Value>],
row_count: usize,
) -> Result<(), String> {
let json_rows: Vec<Vec<serde_json::Value>> = rows
.iter()
.map(|row| row.iter().map(value_to_json).collect())
.collect();
let result = serde_json::json!({
"columns": columns,
"rows": json_rows,
"count": row_count
});
println!(
"{}",
serde_json::to_string(&result).map_err(|e| e.to_string())?
);
Ok(())
}
fn output_explain_plan(&self, rows: &[Vec<Value>], row_count: usize) -> Result<(), String> {
println!();
for row in rows {
if let Some(Value::Text(line)) = row.first() {
println!("{}", colorize_plan_line(line));
}
}
println!();
let line_text = if row_count == 1 { "line" } else { "lines" };
println!("\x1b[2m({} {} in plan)\x1b[0m", row_count, line_text);
Ok(())
}
fn output_table(
&self,
columns: &[String],
rows: &[Vec<Value>],
row_count: usize,
) -> Result<(), String> {
if columns.len() == 1 && columns[0] == "plan" && is_explain_output(rows) {
return self.output_explain_plan(rows, row_count);
}
let mut table = Table::new();
table
.load_preset(UTF8_FULL_CONDENSED)
.set_content_arrangement(ContentArrangement::Dynamic);
table.set_header(columns.iter().map(Cell::new));
if self.limit > 0 && row_count > self.limit {
let top_rows = self.limit / 2;
let bottom_rows = self.limit - top_rows;
for row in rows.iter().take(top_rows) {
table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
}
let hidden_rows = row_count - self.limit;
let mut truncation_row: Vec<Cell> = Vec::new();
let message = format!("... ({} more rows) ...", hidden_rows);
for (i, _) in columns.iter().enumerate() {
if i == columns.len() / 2 {
truncation_row.push(Cell::new(&message));
} else {
truncation_row.push(Cell::new(""));
}
}
table.add_row(truncation_row);
let start_idx = row_count.saturating_sub(bottom_rows).max(top_rows);
for row in rows.iter().skip(start_idx) {
table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
}
} else {
for row in rows {
table.add_row(row.iter().map(|v| Cell::new(format_value(v))));
}
}
println!("{table}");
let row_text = if row_count == 1 { "row" } else { "rows" };
if self.limit > 0 && row_count > self.limit {
println!(
"\x1b[1;32m{} {} in set (showing {})\x1b[0m",
row_count, row_text, self.limit
);
} else {
println!("\x1b[1;32m{} {} in set\x1b[0m", row_count, row_text);
}
Ok(())
}
fn print_help(&self) {
println!("\x1b[1mStoolap SQL CLI Commands:\x1b[0m");
println!();
println!(" \x1b[1;33mSQL Commands:\x1b[0m");
println!(" SELECT ... Execute a SELECT query");
println!(" INSERT ... Insert data into a table");
println!(" UPDATE ... Update data in a table");
println!(" DELETE ... Delete data from a table");
println!(" CREATE TABLE ... Create a new table");
println!(" CREATE INDEX ... Create an index on a column");
println!(" ALTER TABLE ... Modify table schema");
println!(" SHOW TABLES List all tables");
println!(" SHOW CREATE TABLE ... Show CREATE TABLE statement for a table");
println!(" SHOW INDEXES FROM ... Show indexes for a table");
println!();
println!(" \x1b[1;33mTransaction Commands:\x1b[0m");
println!(" BEGIN Start a new transaction");
println!(" COMMIT Commit the current transaction");
println!(" ROLLBACK Rollback the current transaction");
println!();
println!(" \x1b[1;33mPRAGMA Commands:\x1b[0m");
println!(" PRAGMA CHECKPOINT Seal hot data to volumes, compact, truncate WAL");
println!(" PRAGMA SNAPSHOT Create backup snapshot (.bin files)");
println!(" PRAGMA RESTORE Restore from latest backup snapshot");
println!(
" PRAGMA key=value Set configuration (sync_mode, checkpoint_interval, ...)"
);
println!(" PRAGMA key Show current configuration value");
println!();
println!(" \x1b[1;33mSpecial Commands:\x1b[0m");
println!(" exit, quit, \\q Exit the CLI");
println!(" help, \\h, \\? Show this help message");
println!();
println!(" \x1b[1;33mKeyboard Shortcuts:\x1b[0m");
println!(" Up/Down arrow keys Navigate command history");
println!(" Ctrl+R Search command history");
println!(" Ctrl+A Move cursor to beginning of line");
println!(" Ctrl+E Move cursor to end of line");
println!(" Ctrl+W Delete word before cursor");
println!(" Ctrl+U Delete from cursor to beginning of line");
println!(" Ctrl+K Delete from cursor to end of line");
println!(" Ctrl+L Clear screen");
println!();
}
}
fn build_dsn(args: &Args) -> String {
let mut dsn = args.db_path.clone();
if !dsn.starts_with("file://") {
return dsn;
}
let mut params = Vec::new();
if let Some(ref profile) = args.persistence_profile {
match profile.to_lowercase().as_str() {
"fast" => {
params.push("sync=none".to_string());
}
"durable" => {
params.push("sync=full".to_string());
}
"normal" => {
}
_ => {
eprintln!(
"Warning: Unknown profile '{}', using 'normal'. Valid: fast, normal, durable",
profile
);
}
}
}
if let Some(ref sync) = args.sync_mode {
params.retain(|p| !p.starts_with("sync="));
match sync.to_lowercase().as_str() {
"none" | "off" => params.push("sync=none".to_string()),
"normal" => params.push("sync=normal".to_string()),
"full" => params.push("sync=full".to_string()),
_ => {
eprintln!(
"Warning: Unknown sync mode '{}', using 'normal'. Valid: none, normal, full",
sync
);
}
}
}
if let Some(interval) = args.checkpoint_interval {
params.push(format!("checkpoint_interval={}", interval));
}
if let Some(count) = args.compact_threshold {
params.push(format!("compact_threshold={}", count));
}
if let Some(mb) = args.wal_max_size {
params.push(format!("wal_max_size={}", mb as u64 * 1024 * 1024));
}
if let Some(ref comp) = args.compression {
match comp.to_lowercase().as_str() {
"on" | "true" | "1" | "yes" => params.push("compression=on".to_string()),
"off" | "false" | "0" | "no" => params.push("compression=off".to_string()),
_ => {
eprintln!(
"Warning: Unknown compression value '{}', using 'on'. Valid: on, off",
comp
);
}
}
}
if let Some(count) = args.keep_snapshots {
params.push(format!("keep_snapshots={}", count));
}
if args.no_checkpoint_on_close {
params.push("checkpoint_on_close=off".to_string());
}
if !params.is_empty() {
let separator = if dsn.contains('?') { "&" } else { "?" };
dsn.push_str(separator);
dsn.push_str(¶ms.join("&"));
}
dsn
}
fn print_persistence_info(args: &Args) {
let sync_mode = if let Some(ref sync) = args.sync_mode {
sync.to_lowercase()
} else if let Some(ref profile) = args.persistence_profile {
match profile.to_lowercase().as_str() {
"fast" => "none".to_string(),
"durable" => "full".to_string(),
_ => "normal".to_string(),
}
} else {
"normal".to_string()
};
let sync_desc = match sync_mode.as_str() {
"none" | "off" => "none (fastest, less durable)",
"full" => "full (slowest, most durable)",
_ => "normal (balanced)",
};
println!("Persistence: WAL sync mode = {}", sync_desc);
if let Some(interval) = args.checkpoint_interval {
println!("Persistence: Checkpoint interval = {}s", interval);
}
if let Some(count) = args.compact_threshold {
println!("Persistence: Compact threshold = {}", count);
}
if let Some(count) = args.keep_snapshots {
println!("Persistence: Keep snapshots = {}", count);
}
if let Some(mb) = args.wal_max_size {
println!("Persistence: WAL max size = {}MB", mb);
}
if let Some(ref comp) = args.compression {
println!("Persistence: Compression = {}", comp);
}
if args.no_checkpoint_on_close {
println!("Persistence: Checkpoint on close = off");
}
}
fn main() {
let args = Args::parse();
let db_path = build_dsn(&args);
let db_dir = if let Some(path) = db_path.strip_prefix("file://") {
Some(std::path::PathBuf::from(
path.split('?').next().unwrap_or(path),
))
} else {
None
};
if args.reset_volumes {
if let Some(ref dir) = db_dir {
let vol_dir = dir.join("volumes");
if vol_dir.exists() {
match std::fs::remove_dir_all(&vol_dir) {
Ok(()) => {
if !args.quiet {
eprintln!("Removed volumes/ directory for recovery");
}
}
Err(e) => {
eprintln!("Error removing volumes/ directory: {}", e);
std::process::exit(1);
}
}
}
let wal_dir = dir.join("wal");
if wal_dir.exists() {
match std::fs::remove_dir_all(&wal_dir) {
Ok(()) => {
if !args.quiet {
eprintln!("Removed wal/ directory for recovery");
}
}
Err(e) => {
eprintln!("Error removing wal/ directory: {}", e);
std::process::exit(1);
}
}
}
let lock_file = dir.join("db.lock");
if lock_file.exists() {
let _ = std::fs::remove_file(&lock_file);
}
} else if !args.quiet {
eprintln!("Warning: --reset-volumes only applies to file:// databases");
}
}
if args.restore.is_some() {
let dir = match &db_dir {
Some(d) => d,
None => {
eprintln!("Error: --restore requires a file:// database");
std::process::exit(1);
}
};
let snapshot_dir = dir.join("snapshots");
if !snapshot_dir.exists() {
eprintln!("Error: No snapshots/ directory found in {:?}", dir);
eprintln!(
"Create a backup first with: stoolap -d {} --snapshot",
db_path
);
std::process::exit(1);
}
let mut table_count = 0u32;
if let Ok(entries) = std::fs::read_dir(&snapshot_dir) {
for entry in entries.flatten() {
if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
let table_name = entry.file_name().to_string_lossy().to_string();
let has_bin = std::fs::read_dir(entry.path())
.ok()
.map(|e| {
e.flatten().any(|f| {
f.path()
.extension()
.map(|ext| ext == "bin")
.unwrap_or(false)
})
})
.unwrap_or(false);
if has_bin {
table_count += 1;
if !args.quiet {
eprintln!("[restore] Found snapshot for table '{}'", table_name);
}
}
}
}
}
if table_count == 0 {
eprintln!("Error: No snapshot .bin files found in {:?}", snapshot_dir);
std::process::exit(1);
}
if let Some(ref ts) = args.restore {
if !ts.is_empty() {
if !args.quiet {
eprintln!("[restore] Restoring to timestamp: {}", ts);
}
let db = match Database::open(&db_path) {
Ok(db) => db,
Err(e) => {
eprintln!("Error opening database: {}", e);
std::process::exit(1);
}
};
match db.restore_snapshot(Some(ts)) {
Ok(msg) => {
println!("{}", msg);
}
Err(e) => {
eprintln!("Error restoring snapshot: {}", e);
db.close().unwrap_or(());
std::process::exit(1);
}
}
db.close().unwrap_or(());
return;
}
}
let snap_dir = dir.join("snapshots");
let restore_ts = {
let mut manifests: Vec<String> = std::fs::read_dir(&snap_dir)
.ok()
.map(|entries| {
entries
.filter_map(|e| e.ok())
.filter_map(|e| {
let name = e.file_name().to_string_lossy().to_string();
name.strip_prefix("manifest-")
.and_then(|s| s.strip_suffix(".json"))
.map(|ts| ts.to_string())
})
.collect()
})
.unwrap_or_default();
manifests.sort();
manifests.reverse();
let mut chosen: Option<String> = None;
for ts in &manifests {
let manifest_path = snap_dir.join(format!("manifest-{}.json", ts));
let Ok(content) = std::fs::read_to_string(&manifest_path) else {
continue;
};
let table_names: Vec<String> = if let Some(start) = content.find("\"tables\":[") {
let after = &content[start + 10..];
let mut names = Vec::new();
let mut i = 0;
let bytes = after.as_bytes();
while i < bytes.len() {
if bytes[i] == b']' {
break;
}
if bytes[i] == b'"' {
i += 1;
let mut name = String::new();
while i < bytes.len() && bytes[i] != b'"' {
if bytes[i] == b'\\' && i + 1 < bytes.len() {
i += 1; }
name.push(bytes[i] as char);
i += 1;
}
if !name.is_empty() {
names.push(name);
}
i += 1; } else {
i += 1;
}
}
names
} else {
Vec::new()
};
if table_names.is_empty() {
continue;
}
let ddl_file = snap_dir.join(format!("ddl-{}.bin", ts));
let all_exist = ddl_file.exists()
&& table_names.iter().all(|table| {
let snapshot_file =
snap_dir.join(table).join(format!("snapshot-{}.bin", ts));
snapshot_file.exists()
});
if all_exist {
if !args.quiet {
eprintln!(
"[restore] Found complete manifest for timestamp {} ({} tables)",
ts,
table_names.len()
);
}
chosen = Some(ts.clone());
break;
} else if !args.quiet {
eprintln!(
"[restore] Manifest {} has missing table snapshots, trying older...",
ts
);
}
}
chosen
};
if let Some(ts) = restore_ts {
if !args.quiet {
eprintln!("[restore] Restoring from timestamp {}...", ts);
}
let db = match Database::open(&db_path) {
Ok(db) => db,
Err(e) => {
eprintln!("Error opening database: {}", e);
std::process::exit(1);
}
};
match db.restore_snapshot(Some(&ts)) {
Ok(msg) => println!("{}", msg),
Err(e) => {
eprintln!("Error restoring snapshot: {}", e);
db.close().unwrap_or(());
std::process::exit(1);
}
}
db.close().unwrap_or(());
} else {
if !args.quiet {
eprintln!("[restore] No snapshot manifest found, using legacy restore path.");
eprintln!("[restore] Warning: DDL (indexes, views) may not match restored data.");
}
let vol_dir = dir.join("volumes");
if vol_dir.exists() {
if let Err(e) = std::fs::remove_dir_all(&vol_dir) {
eprintln!("Error removing volumes/: {}", e);
std::process::exit(1);
}
}
let wal_dir = dir.join("wal");
if wal_dir.exists() {
if let Err(e) = std::fs::remove_dir_all(&wal_dir) {
eprintln!("Error removing wal/: {}", e);
std::process::exit(1);
}
}
let db = match Database::open(&db_path) {
Ok(db) => db,
Err(e) => {
eprintln!("Error opening database after restore: {}", e);
std::process::exit(1);
}
};
match db.query("SHOW TABLES", ()) {
Ok(rows) => {
let tables: Vec<String> = rows
.filter_map(|r| r.ok())
.filter_map(|r| r.get::<String>(0).ok())
.collect();
println!(
"Restore complete (legacy). {} tables recovered: {}",
tables.len(),
tables.join(", ")
);
}
Err(_) => {
println!(
"Restore complete (legacy). {} tables recovered.",
table_count
);
}
}
db.close().unwrap_or(());
}
return;
}
let db = match Database::open(&db_path) {
Ok(db) => db,
Err(e) => {
eprintln!("Error opening database: {}", e);
std::process::exit(1);
}
};
if !args.quiet {
println!("Connected to database: {}", db_path);
if db_path.starts_with("file://") {
print_persistence_info(&args);
}
}
if args.snapshot {
match db.query("PRAGMA SNAPSHOT", ()) {
Ok(mut rows) => {
if let Some(Ok(row)) = rows.next() {
if let Ok(msg) = row.get::<String>(0) {
println!("{}", msg);
}
}
}
Err(e) => {
eprintln!("Error creating snapshot: {}", e);
std::process::exit(1);
}
}
db.close().unwrap_or(());
return;
}
if let Some(ref sql) = args.execute {
if let Err(e) = execute_query_with_options(
&db,
sql,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
) {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
if let Some(ref filename) = args.file {
let result = execute_from_file(
&db,
filename,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
);
drop(db); if let Err(e) = result {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
let is_pipe = !std::io::stdin().is_terminal();
if is_pipe {
let result = execute_piped_input(
&db,
args.json_output,
args.quiet,
args.limit,
args.timeout_ms,
);
drop(db); if let Err(e) = result {
eprintln!("Error: {}", e);
std::process::exit(1);
}
return;
}
let mut cli = match Cli::new(
db,
args.json_output,
args.limit,
args.quiet,
args.timeout_ms,
) {
Ok(cli) => cli,
Err(e) => {
eprintln!("Error initializing CLI: {}", e);
std::process::exit(1);
}
};
if let Err(e) = cli.run() {
drop(cli);
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
fn execute_from_file(
db: &Database,
filename: &str,
json_output: bool,
quiet: bool,
row_limit: usize,
timeout_ms: u64,
) -> Result<(), String> {
let file =
File::open(filename).map_err(|e| format!("Error opening file {}: {}", filename, e))?;
let reader = BufReader::new(file);
let mut current_statement = String::new();
for line_result in reader.lines() {
let line = line_result.map_err(|e| format!("Error reading file: {}", e))?;
let trimmed = line.trim();
if trimmed.starts_with('#') {
continue;
}
if trimmed.starts_with("--") || (trimmed.starts_with("/*") && trimmed.ends_with("*/")) {
continue;
}
if trimmed.is_empty() && !current_statement.is_empty() {
let q = current_statement.trim().to_string();
current_statement.clear();
if !q.is_empty() {
let statements = split_sql_statements(&q);
for stmt in statements {
let stmt = stmt.trim();
if stmt.is_empty() {
continue;
}
if let Err(e) = execute_query_with_options(
db,
stmt,
json_output,
quiet,
row_limit,
timeout_ms,
) {
eprintln!("Error: {}", e);
}
}
}
} else {
current_statement.push_str(&line);
current_statement.push('\n');
}
}
if !current_statement.is_empty() {
let q = current_statement.trim().to_string();
if !q.is_empty() {
let statements = split_sql_statements(&q);
for stmt in statements {
let stmt = stmt.trim();
if stmt.is_empty() {
continue;
}
if let Err(e) =
execute_query_with_options(db, stmt, json_output, quiet, row_limit, timeout_ms)
{
eprintln!("Error: {}", e);
}
}
}
}
Ok(())
}
fn execute_piped_input(
db: &Database,
json_output: bool,
quiet: bool,
row_limit: usize,
timeout_ms: u64,
) -> Result<(), String> {
let stdin = io::stdin();
let reader = stdin.lock();
let mut current_statement = String::new();
for line_result in reader.lines() {
let line = line_result.map_err(|e| format!("Error reading input: {}", e))?;
let trimmed = line.trim();
if trimmed.starts_with('#') {
continue;
}
if trimmed.starts_with("--") || (trimmed.starts_with("/*") && trimmed.ends_with("*/")) {
continue;
}
if trimmed.is_empty() && !current_statement.is_empty() {
let q = current_statement.trim().to_string();
current_statement.clear();
if !q.is_empty() {
let statements = split_sql_statements(&q);
for stmt in statements {
let stmt = stmt.trim();
if stmt.is_empty() {
continue;
}
let start = Instant::now();
if let Err(e) = execute_query_with_options(
db,
stmt,
json_output,
quiet,
row_limit,
timeout_ms,
) {
eprintln!("Error: {}", e);
} else if !json_output && !quiet {
println!("Query executed in {:?}", start.elapsed());
}
}
}
} else {
current_statement.push_str(&line);
current_statement.push('\n');
}
}
if !current_statement.is_empty() {
let q = current_statement.trim().to_string();
if !q.is_empty() {
let statements = split_sql_statements(&q);
for stmt in statements {
let stmt = stmt.trim();
if stmt.is_empty() {
continue;
}
let start = Instant::now();
if let Err(e) =
execute_query_with_options(db, stmt, json_output, quiet, row_limit, timeout_ms)
{
eprintln!("Error: {}", e);
} else if !json_output && !quiet {
println!("Query executed in {:?}", start.elapsed());
}
}
}
}
Ok(())
}
fn execute_query_with_options(
db: &Database,
query: &str,
json_output: bool,
quiet: bool,
row_limit: usize,
timeout_ms: u64,
) -> Result<(), String> {
let upper_query = query.to_uppercase();
let upper_query = upper_query.trim();
match upper_query {
"HELP" | "\\H" | "\\?" => {
print_help_main();
return Ok(());
}
"EXIT" | "QUIT" | "\\Q" => {
return Err("exit requested".to_string());
}
_ => {}
}
let (sql, params) = parse_params(query);
if upper_query.starts_with("SELECT")
|| upper_query.starts_with("WITH")
|| upper_query.starts_with("SHOW")
|| upper_query.starts_with("DESCRIBE")
|| upper_query.starts_with("DESC ")
|| upper_query.starts_with("EXPLAIN")
|| upper_query.starts_with("VACUUM")
|| (upper_query.starts_with("PRAGMA") && !upper_query.contains('='))
|| upper_query.contains(" RETURNING ")
|| upper_query.ends_with(" RETURNING")
{
let rows_result = if timeout_ms > 0 {
db.query_with_timeout(&sql, params, timeout_ms)
.map_err(|e| e.to_string())?
} else {
db.query(&sql, params).map_err(|e| e.to_string())?
};
let columns: Vec<String> = rows_result.columns().to_vec();
let mut all_rows: Vec<Vec<Value>> = Vec::new();
for row_result in rows_result {
let row = row_result.map_err(|e| e.to_string())?;
let mut values = Vec::new();
for i in 0..row.len() {
values.push(row.get_value(i).cloned().unwrap_or(Value::null_unknown()));
}
all_rows.push(values);
}
let row_count = all_rows.len();
if json_output {
output_json(&columns, &all_rows, row_count)?;
} else {
output_table(&columns, &all_rows, row_count, row_limit, quiet)?;
}
} else {
let rows_affected = if timeout_ms > 0 {
db.execute_with_timeout(&sql, params, timeout_ms)
.map_err(|e| e.to_string())?
} else {
db.execute(&sql, params).map_err(|e| e.to_string())?
};
if json_output {
println!(r#"{{"rows_affected":{}}}"#, rows_affected);
} else if !quiet {
println!("{} rows affected", rows_affected);
}
}
Ok(())
}
fn parse_params(query: &str) -> (String, Vec<Value>) {
let parts: Vec<&str> = query.split(" -- PARAMS: ").collect();
if parts.len() <= 1 {
let sql = parts[0].trim().trim_end_matches(';').to_string();
return (sql, Vec::new());
}
let sql = parts[0].trim().trim_end_matches(';').to_string();
let param_string = parts[1].trim();
let mut params = Vec::new();
for val in param_string.split(',') {
params.push(convert_param_value(val.trim()));
}
(sql, params)
}
fn convert_param_value(value: &str) -> Value {
if let Ok(i) = value.parse::<i64>() {
return Value::Integer(i);
}
if value.contains('.') {
if let Ok(f) = value.parse::<f64>() {
return Value::Float(f);
}
}
if value == "true" {
return Value::Boolean(true);
}
if value == "false" {
return Value::Boolean(false);
}
if value.eq_ignore_ascii_case("null") {
return Value::null_unknown();
}
Value::text(value)
}
fn output_json(columns: &[String], rows: &[Vec<Value>], row_count: usize) -> Result<(), String> {
let json_rows: Vec<Vec<serde_json::Value>> = rows
.iter()
.map(|row| row.iter().map(value_to_json).collect())
.collect();
let result = serde_json::json!({
"columns": columns,
"rows": json_rows,
"count": row_count
});
println!(
"{}",
serde_json::to_string(&result).map_err(|e| e.to_string())?
);
Ok(())
}
fn output_table(
columns: &[String],
rows: &[Vec<Value>],
row_count: usize,
row_limit: usize,
quiet: bool,
) -> Result<(), String> {
for (i, column) in columns.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", column);
}
println!();
for (i, _) in columns.iter().enumerate() {
if i > 0 {
print!("-+-");
}
print!("----");
}
println!();
if row_limit == 0 || row_count <= row_limit {
for row in rows {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
if !quiet {
println!("{} rows in set", row_count);
}
} else {
let top_rows = row_limit / 2;
let bottom_rows = row_limit - top_rows;
for row in rows.iter().take(top_rows) {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
let hidden_rows = row_count - row_limit;
println!();
println!(" \x1b[2m... ({} more rows) ...\x1b[0m", hidden_rows);
println!();
let start_idx = row_count.saturating_sub(bottom_rows).max(top_rows);
for row in rows.iter().skip(start_idx) {
for (i, value) in row.iter().enumerate() {
if i > 0 {
print!(" | ");
}
print!("{}", format_value(value));
}
println!();
}
if !quiet {
println!("{} rows in set (showing {})", row_count, row_limit);
}
}
Ok(())
}
const C_HEADER: &str = "\x1b[1;4m"; const C_DIM: &str = "\x1b[38;5;245m"; const C_RESET: &str = "\x1b[0m";
const C_INDEX: &str = "\x1b[1;38;5;46m"; const C_SEQ: &str = "\x1b[1;38;5;208m"; const C_JOIN: &str = "\x1b[1;38;5;75m"; const C_STATS: &str = "\x1b[1;38;5;196m"; const C_LABEL: &str = "\x1b[38;5;117m"; const C_SUBIDX: &str = "\x1b[38;5;114m"; const C_CTE: &str = "\x1b[38;5;183m";
fn is_explain_output(rows: &[Vec<Value>]) -> bool {
if let Some(first_row) = rows.first() {
if let Some(Value::Text(line)) = first_row.first() {
let trimmed = line.trim();
return trimmed.starts_with("SELECT")
|| trimmed.starts_with("INSERT")
|| trimmed.starts_with("UPDATE")
|| trimmed.starts_with("DELETE")
|| trimmed.starts_with("WITH ");
}
}
false
}
fn colorize_plan_line(line: &str) -> String {
let trimmed = line.trim_start();
let indent = &line[..line.len() - trimmed.len()];
let mut result = String::with_capacity(line.len() + 80);
result.push_str(indent);
if trimmed.starts_with("-> ") {
colorize_node(trimmed, &mut result);
} else if trimmed.starts_with("SELECT")
|| trimmed.starts_with("INSERT")
|| trimmed.starts_with("UPDATE")
|| trimmed.starts_with("DELETE")
{
colorize_header(trimmed, &mut result);
} else if trimmed.starts_with("WITH ") || trimmed.starts_with("Statement:") {
result.push_str(C_HEADER);
result.push_str(trimmed);
result.push_str(C_RESET);
} else {
colorize_detail(trimmed, &mut result);
}
result
}
fn split_stats(line: &str) -> (&str, Option<&str>) {
for marker in &["(actual ", "(cost="] {
if let Some(pos) = line.rfind(marker) {
return (line[..pos].trim_end(), Some(&line[pos..]));
}
}
(line, None)
}
fn colorize_node(line: &str, result: &mut String) {
let after_arrow = &line[3..];
result.push_str(C_DIM);
result.push_str("->");
result.push_str(C_RESET);
result.push(' ');
let indexed = [
"Index Scan",
"PK Lookup",
"Multi-Index Scan",
"Composite Index Scan",
"Index Nested Loop",
];
let sequential = ["Seq Scan", "Parallel Seq Scan"];
let joins = ["Hash Join", "Merge Join", "Nested Loop ("];
let color = if indexed.iter().any(|p| after_arrow.starts_with(p)) {
C_INDEX
} else if sequential.iter().any(|p| after_arrow.starts_with(p)) {
C_SEQ
} else if joins.iter().any(|p| after_arrow.starts_with(p)) {
C_JOIN
} else if after_arrow.starts_with("Subquery Scan") || after_arrow.starts_with("CTE Scan") {
C_CTE
} else {
result.push_str(C_SUBIDX);
result.push_str(after_arrow);
result.push_str(C_RESET);
return;
};
let (scan_part, stats_part) = split_stats(after_arrow);
result.push_str(color);
result.push_str(scan_part);
result.push_str(C_RESET);
if let Some(stats) = stats_part {
result.push(' ');
result.push_str(C_STATS);
result.push_str(stats);
result.push_str(C_RESET);
}
}
fn colorize_header(line: &str, result: &mut String) {
let (scan_part, stats_part) = split_stats(line);
result.push_str(C_HEADER);
result.push_str(scan_part);
result.push_str(C_RESET);
if let Some(stats) = stats_part {
result.push(' ');
result.push_str(C_STATS);
result.push_str(stats);
result.push_str(C_RESET);
}
}
fn colorize_detail(line: &str, result: &mut String) {
let labels = [
"Columns:",
"Filter:",
"Index Cond:",
"Join Cond:",
"Group By:",
"Order By:",
"Having:",
"Limit:",
"Offset:",
"Using:",
"Alias:",
"Values:",
"Set:",
"Source:",
];
for label in &labels {
if let Some(rest) = line.strip_prefix(label) {
result.push_str(C_LABEL);
result.push_str(label);
result.push_str(C_RESET);
result.push_str(rest);
return;
}
}
result.push_str(line);
}
fn format_value(value: &Value) -> String {
match value {
Value::Null(_) => "NULL".to_string(),
Value::Integer(i) => i.to_string(),
Value::Float(f) => format_float(*f),
Value::Text(s) => s.to_string(),
Value::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
Value::Timestamp(ts) => format_timestamp(ts),
Value::Extension(data) if data.first() == Some(&(stoolap::DataType::Json as u8)) => {
std::str::from_utf8(&data[1..]).unwrap_or("").to_string()
}
Value::Extension(data) if data.first() == Some(&(stoolap::DataType::Vector as u8)) => {
stoolap::core::value::format_vector_bytes(&data[1..])
}
Value::Extension(_) => "<extension>".to_string(),
}
}
fn format_timestamp(ts: &chrono::DateTime<chrono::Utc>) -> String {
let nanos = ts.timestamp_subsec_nanos();
if nanos == 0 {
return ts.format("%Y-%m-%dT%H:%M:%SZ").to_string();
}
let s = ts.format("%Y-%m-%dT%H:%M:%S.%9fZ").to_string();
if let Some(z_pos) = s.rfind('Z') {
let trimmed = s[..z_pos].trim_end_matches('0');
format!("{}Z", trimmed)
} else {
s
}
}
fn format_float(v: f64) -> String {
if v.is_nan() {
return "NaN".to_string();
}
if v.is_infinite() {
return if v.is_sign_positive() {
"Infinity"
} else {
"-Infinity"
}
.to_string();
}
let abs_v = v.abs();
if abs_v != 0.0 && !(1e-4..1e15).contains(&abs_v) {
let s = format!("{:e}", v);
if let Some(e_pos) = s.find('e') {
let (mantissa, exp) = s.split_at(e_pos);
let clean_mantissa = if mantissa.contains('.') {
mantissa
.trim_end_matches('0')
.trim_end_matches('.')
.to_string()
} else {
mantissa.to_string()
};
return format!("{}{}", clean_mantissa, exp);
}
return s;
}
if v == v.trunc() {
format!("{:.1}", v)
} else {
format!("{:.6}", v)
.trim_end_matches('0')
.trim_end_matches('.')
.to_string()
}
}
fn value_to_json(value: &Value) -> serde_json::Value {
match value {
Value::Null(_) => serde_json::Value::Null,
Value::Integer(i) => serde_json::json!(i),
Value::Float(f) => serde_json::json!(f),
Value::Text(s) => serde_json::json!(s.as_str()),
Value::Boolean(b) => serde_json::json!(b),
Value::Timestamp(ts) => serde_json::json!(format_timestamp(ts)),
Value::Extension(data) if data.first() == Some(&(stoolap::DataType::Json as u8)) => {
serde_json::json!(std::str::from_utf8(&data[1..]).unwrap_or(""))
}
Value::Extension(data) if data.first() == Some(&(stoolap::DataType::Vector as u8)) => {
serde_json::json!(stoolap::core::value::format_vector_bytes(&data[1..]))
}
Value::Extension(_) => serde_json::Value::Null,
}
}
fn split_sql_statements(input: &str) -> Vec<String> {
let mut statements = Vec::new();
let mut current_statement = String::new();
let mut in_single_quotes = false;
let mut in_double_quotes = false;
let mut in_line_comment = false;
let mut in_block_comment = false;
let chars: Vec<char> = input.chars().collect();
let mut i = 0;
while i < chars.len() {
let char = chars[i];
if in_line_comment {
if char == '\n' {
in_line_comment = false;
current_statement.push(char);
}
i += 1;
continue;
}
if !in_single_quotes
&& !in_double_quotes
&& !in_block_comment
&& char == '-'
&& i + 1 < chars.len()
&& chars[i + 1] == '-'
{
let after_second_dash = if i + 2 < chars.len() {
chars[i + 2]
} else {
'\0' };
if after_second_dash == '\0'
|| after_second_dash == ' '
|| after_second_dash == '\t'
|| after_second_dash == '\n'
|| after_second_dash == '\r'
{
in_line_comment = true;
i += 2;
continue;
}
}
if in_block_comment {
if char == '*' && i + 1 < chars.len() && chars[i + 1] == '/' {
in_block_comment = false;
i += 2;
continue;
}
i += 1;
continue;
}
if !in_single_quotes
&& !in_double_quotes
&& char == '/'
&& i + 1 < chars.len()
&& chars[i + 1] == '*'
{
in_block_comment = true;
i += 2;
continue;
}
if !in_block_comment && !in_line_comment {
if char == '\'' && (i == 0 || chars[i - 1] != '\\') {
in_single_quotes = !in_single_quotes;
} else if char == '"' && (i == 0 || chars[i - 1] != '\\') {
in_double_quotes = !in_double_quotes;
}
}
if char == ';'
&& !in_single_quotes
&& !in_double_quotes
&& !in_block_comment
&& !in_line_comment
{
statements.push(current_statement.clone());
current_statement.clear();
} else {
current_statement.push(char);
}
i += 1;
}
if !current_statement.is_empty() {
statements.push(current_statement);
}
statements
}
fn print_help_main() {
println!("Stoolap SQL CLI");
println!();
println!(" SQL Commands:");
println!(" SELECT ... Execute a SELECT query");
println!(" INSERT ... Insert data into a table");
println!(" UPDATE ... Update data in a table");
println!(" DELETE ... Delete data from a table");
println!(" CREATE TABLE ... Create a new table");
println!(" CREATE INDEX ... Create an index on a column");
println!(" SHOW TABLES List all tables");
println!(" SHOW CREATE TABLE ... Show CREATE TABLE statement for a table");
println!(" SHOW INDEXES FROM ... Show indexes for a table");
println!();
println!(" Transaction Commands:");
println!(" BEGIN Start a new transaction");
println!(" COMMIT Commit the current transaction");
println!(" ROLLBACK Rollback the current transaction");
println!();
println!(" Special Commands:");
println!(" help, \\h, \\? Show this help message");
println!();
}