use anyhow::{Context, Result};
use sql_stream::{CliArgs, QueryEngine};
use tokio::signal;
use tracing::{error, info, warn};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
/// Entry point for the SQL Stream CLI.
///
/// Parses and validates the command-line arguments, initialises tracing, and
/// then runs the query while concurrently listening for Ctrl+C.
///
/// # Errors
///
/// Returns an error when argument validation fails or when the query itself
/// fails. A run interrupted by Ctrl+C returns `Ok(())`, so the process exit
/// status for an interrupt remains `0`.
#[tokio::main]
async fn main() -> Result<()> {
    let args = CliArgs::parse();
    init_tracing(args.verbose);
    info!("SQL Stream CLI starting");

    if let Err(e) = args.validate() {
        error!("Validation error: {}", e);
        anyhow::bail!("{}", e);
    }

    // Completes only once a Ctrl+C has actually been received.
    let shutdown = async {
        match signal::ctrl_c().await {
            Ok(()) => {
                warn!("Received shutdown signal (Ctrl+C), terminating gracefully...");
            }
            Err(err) => {
                error!("Failed to listen for shutdown signal: {}", err);
                // Without a working listener there is nothing to wait for;
                // stay pending forever so the query still runs to completion
                // instead of being cancelled by this branch finishing.
                std::future::pending::<()>().await;
            }
        }
    };

    // Race the query against the shutdown signal. When the signal wins, the
    // query future is dropped, so its destructors run and `main` returns
    // normally rather than the process being torn down via
    // `std::process::exit`.
    let outcome = tokio::select! {
        result = run_query(&args) => Some(result),
        _ = shutdown => None,
    };

    match outcome {
        Some(Ok(())) => {
            info!("Query executed successfully");
            Ok(())
        }
        Some(Err(e)) => {
            error!("Query execution failed: {}", e);
            Err(e)
        }
        // Interrupted by the user: not a failure, and nothing further to report.
        None => Ok(()),
    }
}
/// Runs one query from start to finish: creates the engine, registers the
/// input file under the requested table name, executes the SQL text, and
/// prints whatever the engine returns.
///
/// # Errors
///
/// Each step attaches its own context message, so a failure identifies
/// whether engine initialisation, path conversion, file registration, query
/// execution, or result printing went wrong.
async fn run_query(args: &CliArgs) -> Result<()> {
    let mut engine = QueryEngine::new().context("Failed to initialize query engine")?;

    // `to_str` yields `None` when the path cannot be represented as a `&str`;
    // that case is reported as an error instead of being passed on.
    let file_path = args.file.to_str().context("Invalid file path")?;
    let table = &args.table_name;

    engine
        .register_file(file_path, table)
        .await
        .context("Failed to register file")?;
    info!(
        "Registered file '{}' as table '{}'",
        args.file.display(),
        args.table_name
    );

    let query_output = engine
        .execute_query(&args.query)
        .await
        .context("Failed to execute query")?;

    engine
        .print_results(query_output)
        .await
        .context("Failed to print results")?;

    Ok(())
}
/// Installs the tracing subscriber used by the CLI.
///
/// A filter taken from the environment wins when one can be built. Otherwise
/// the level falls back to `debug` if `verbose` is set and `info` if not.
/// Log lines are formatted without the event target.
fn init_tracing(verbose: bool) {
    // Only the fallback level depends on `verbose`; the environment lookup is
    // the same either way.
    let fallback_level = if verbose { "debug" } else { "info" };
    let filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(fallback_level));

    let fmt_layer = fmt::layer().with_target(false);
    tracing_subscriber::registry()
        .with(fmt_layer)
        .with(filter)
        .init();
}