use anyhow::Result;
use clap::Args;
use colored::*;
use dataprof::output::output_with_adaptive_formatter;
use dataprof::{ColumnProfile, DatabaseConfig, OutputFormat, analyze_database};
/// Command-line arguments for database analysis.
///
/// Field doc comments double as the `--help` text generated by clap, so they
/// are kept to a single short sentence each.
#[derive(Debug, Args)]
pub struct DatabaseArgs {
    /// Database connection string
    pub connection_string: String,

    /// SQL query to profile (takes precedence over --table)
    #[arg(long)]
    pub query: Option<String>,

    /// Table to profile with `SELECT * FROM <table>` when --query is not given
    #[arg(long)]
    pub table: Option<String>,

    /// Batch size forwarded to the database configuration
    #[arg(long, default_value = "10000")]
    pub batch_size: usize,

    /// Also print the full report in text format before the per-column summaries
    #[arg(long)]
    pub quality: bool,
}
/// Entry point for the database command.
///
/// Delegates to `run_database_analysis`, handing it the parsed arguments
/// together with the connection string they carry.
///
/// # Errors
///
/// Propagates any error returned by `run_database_analysis`.
pub fn execute(args: &DatabaseArgs) -> Result<()> {
    let connection: &str = args.connection_string.as_str();
    run_database_analysis(args, connection)
}
/// Prints a short, colorized summary of one column profile to stdout.
///
/// The output consists of:
/// - the column name and the `Debug` rendering of its data type,
/// - the total and null counts with the null percentage,
/// - the distinct count and unique ratio, when `unique_count` is present.
///
/// Percentages are relative to `total_count`. For a column with a total of
/// zero they are reported as `0.0%` rather than the `NaN`/`inf` a raw
/// floating-point division would produce.
fn display_column_profile(profile: &ColumnProfile) {
    let total = profile.total_count as f64;
    // Guard the division: 0/0 is NaN and n/0 is inf in f64 arithmetic.
    let percent_of_total = |count: f64| -> f64 {
        if total > 0.0 {
            (count / total) * 100.0
        } else {
            0.0
        }
    };

    println!(
        "{} ({})",
        profile.name.bright_cyan().bold(),
        format!("{:?}", profile.data_type).yellow()
    );

    println!(
        "  Total: {} | Nulls: {} ({:.1}%)",
        profile.total_count,
        profile.null_count,
        percent_of_total(profile.null_count as f64)
    );

    if let Some(unique_count) = profile.unique_count {
        println!(
            "  Distinct: {} | Unique ratio: {:.1}%",
            unique_count,
            percent_of_total(unique_count as f64)
        );
    }
}
#[allow(deprecated)]
fn run_database_analysis(args: &DatabaseArgs, connection_string: &str) -> Result<()> {
use tokio;
println!(
"{}",
format!(
"DataProfiler v{} - Database Analysis",
env!("CARGO_PKG_VERSION")
)
.bright_blue()
.bold()
);
println!();
let query = if let Some(sql_query) = &args.query {
sql_query.to_string()
} else if let Some(table_name) = &args.table {
format!("SELECT * FROM {}", table_name)
} else {
return Err(anyhow::anyhow!(
"Please specify either --query 'SELECT * FROM table' or provide table name as argument"
));
};
let config = DatabaseConfig {
connection_string: connection_string.to_string(),
batch_size: args.batch_size,
max_connections: Some(10),
connection_timeout: Some(std::time::Duration::from_secs(30)),
retry_config: Some(dataprof::database::RetryConfig::default()),
sampling_config: None,
ssl_config: Some(dataprof::database::SslConfig::default()),
load_credentials_from_env: true,
};
let rt = tokio::runtime::Runtime::new()
.map_err(|e| anyhow::anyhow!("Failed to create async runtime: {}", e))?;
let report = rt.block_on(async { analyze_database(config, &query, true, None).await })?;
println!(
"{} | {} columns | {} rows",
connection_string
.split('@')
.next_back()
.unwrap_or(connection_string),
report.execution.columns_detected,
report.execution.rows_processed
);
if report.execution.rows_processed > 0 {
let scan_time_sec = report.execution.scan_time_ms as f64 / 1000.0;
let rows_per_sec = report.execution.rows_processed as f64 / scan_time_sec;
println!(
"Processed {} rows in {:.1}s ({:.0} rows/sec)",
report.execution.rows_processed, scan_time_sec, rows_per_sec
);
}
println!();
if args.quality {
output_with_adaptive_formatter(&report, Some(OutputFormat::Text))?;
}
for profile in &report.column_profiles {
display_column_profile(profile);
println!();
}
Ok(())
}