use anyhow::{Context, Result};
use duckdb::Connection;
use std::fs;
use std::path::Path;
use tracing::{info, warn};
/// Manages example datasets: downloading sample data, generating TPC-H
/// data, and converting between CSV/Parquet/DuckDB formats.
///
/// All SQL work goes through a single in-memory DuckDB connection
/// created by [`DatasetManager::new`].
pub struct DatasetManager {
// In-memory DuckDB connection; `new` installs and loads the
// `parquet` and `tpch` extensions on it before use.
conn: Connection,
}
impl DatasetManager {
/// Creates a manager backed by a fresh in-memory DuckDB database with
/// the `parquet` and `tpch` extensions installed and loaded.
///
/// # Errors
///
/// Fails if the in-memory connection cannot be opened or if the
/// extension install/load statements fail.
pub fn new() -> Result<Self> {
    let conn =
        Connection::open_in_memory().context("Failed to create DuckDB connection")?;
    // Both extensions are needed up front: parquet for format
    // conversion, tpch for dataset generation.
    conn.execute_batch("INSTALL parquet; LOAD parquet; INSTALL tpch; LOAD tpch;")?;
    let manager = Self { conn };
    Ok(manager)
}
/// Writes a sample Chinook dataset into `output_dir`, converting the
/// base CSV into `format` when a non-CSV format is requested.
///
/// # Errors
///
/// Fails if the output directory cannot be created, or if writing or
/// converting the sample data fails.
pub fn download_chinook(&self, output_dir: &str, format: &str) -> Result<()> {
    info!(
        "Downloading Chinook dataset in {} format to {}",
        format, output_dir
    );
    fs::create_dir_all(output_dir)?;
    self.create_sample_chinook_data(output_dir)?;
    match format {
        // The sample data is already CSV — nothing further to do.
        "csv" => {}
        other => self.convert_chinook_to_format(output_dir, other)?,
    }
    info!("✅ Chinook dataset downloaded to {}", output_dir);
    Ok(())
}
/// Generates a TPC-H dataset (scale factor 0.01) into `output_dir` in
/// the requested `format` (`"duckdb"`, `"parquet"`, or `"csv"`).
///
/// Any other format logs a warning and falls back to the DuckDB export.
///
/// # Errors
///
/// Fails if the output directory cannot be created, or if the DuckDB
/// `dbgen` call or any export statement fails.
pub fn download_tpch(&self, output_dir: &str, format: &str) -> Result<()> {
    info!(
        "Generating TPC-H dataset in {} format to {}",
        format, output_dir
    );
    fs::create_dir_all(output_dir)?;
    info!("🔄 Generating TPC-H data with scale factor 0.01...");
    self.conn.execute("CALL dbgen(sf = 0.01)", [])?;
    // Shared by the "duckdb" arm and the unsupported-format fallback
    // (previously duplicated verbatim in both branches).
    //
    // NOTE(review): DuckDB's EXPORT DATABASE writes a *directory* of
    // schema + data files, not a single-file database — the `.duckdb`
    // name is kept for backward compatibility but may mislead; confirm
    // with callers whether ATTACH + COPY FROM DATABASE was intended.
    let export_duckdb = || -> Result<()> {
        let db_path = Path::new(output_dir).join("tpch.duckdb");
        self.conn
            .execute(&format!("EXPORT DATABASE '{}'", db_path.display()), [])?;
        info!("✅ TPC-H dataset exported to DuckDB: {}", db_path.display());
        Ok(())
    };
    match format {
        "duckdb" => export_duckdb()?,
        "parquet" => self.export_tpch_tables_to_parquet(output_dir)?,
        "csv" => self.export_tpch_tables_to_csv(output_dir)?,
        other => {
            warn!("⚠️ Unsupported format for TPC-H: {}", other);
            info!(" Available formats: duckdb, parquet, csv");
            info!(" Defaulting to DuckDB format");
            export_duckdb()?;
        }
    }
    info!("✅ TPC-H dataset generated to {}", output_dir);
    Ok(())
}
/// Exports each of the eight TPC-H tables from the in-memory database
/// to `<table>.parquet` under `output_dir`.
///
/// # Errors
///
/// Fails if any COPY statement fails (e.g. the directory is not
/// writable or a table is missing).
fn export_tpch_tables_to_parquet(&self, output_dir: &str) -> Result<()> {
    const TABLES: [&str; 8] = [
        "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
    ];
    let base = Path::new(output_dir);
    for table in TABLES {
        let target = base.join(format!("{}.parquet", table));
        let sql = format!("COPY {} TO '{}' (FORMAT PARQUET)", table, target.display());
        self.conn.execute(&sql, [])?;
    }
    info!("✅ TPC-H tables exported to Parquet format");
    Ok(())
}
/// Exports each of the eight TPC-H tables from the in-memory database
/// to `<table>.csv` (with a header row) under `output_dir`.
///
/// # Errors
///
/// Fails if any COPY statement fails (e.g. the directory is not
/// writable or a table is missing).
fn export_tpch_tables_to_csv(&self, output_dir: &str) -> Result<()> {
    const TABLES: [&str; 8] = [
        "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
    ];
    let base = Path::new(output_dir);
    for table in TABLES {
        let target = base.join(format!("{}.csv", table));
        let sql = format!("COPY {} TO '{}' (FORMAT CSV, HEADER)", table, target.display());
        self.conn.execute(&sql, [])?;
    }
    info!("✅ TPC-H tables exported to CSV format");
    Ok(())
}
/// Writes small, well-formed sample Chinook CSV files into `output_dir`.
///
/// The previous implementation concatenated three tables (artists,
/// albums, tracks) with different headers and column counts into one
/// `chinook.csv`, and the Composer values contained unquoted commas —
/// `read_csv(..., header=true)` could not parse it, which broke
/// `convert_chinook_to_format`. Each table now gets its own valid CSV,
/// and `chinook.csv` (the file the converter reads) holds the
/// single-schema track table with Composer fields properly quoted.
///
/// # Errors
///
/// Fails if any of the CSV files cannot be written.
fn create_sample_chinook_data(&self, output_dir: &str) -> Result<()> {
    let dir = Path::new(output_dir);
    let artists_csv = "ArtistId,Name\n\
        1,AC/DC\n\
        2,Aerosmith\n\
        3,Led Zeppelin\n";
    fs::write(dir.join("artists.csv"), artists_csv)?;
    let albums_csv = "AlbumId,Title,ArtistId\n\
        1,For Those About To Rock We Salute You,1\n\
        2,Let There Be Rock,1\n\
        3,Toys In The Attic,2\n";
    fs::write(dir.join("albums.csv"), albums_csv)?;
    // Composer values contain commas, so they must be double-quoted to
    // keep the row at seven columns.
    let tracks_csv = "TrackId,Name,AlbumId,Composer,Milliseconds,Bytes,UnitPrice\n\
        1,For Those About To Rock (We Salute You),1,\"Angus Young, Malcolm Young, Brian Johnson\",343719,11170334,0.99\n\
        2,Put The Finger On You,1,\"Angus Young, Malcolm Young, Brian Johnson\",205662,6713451,0.99\n\
        3,Walk This Way,3,\"Steven Tyler, Joe Perry\",331180,10871135,0.99\n";
    // `chinook.csv` keeps its name: convert_chinook_to_format reads it.
    let csv_path = dir.join("chinook.csv");
    fs::write(&csv_path, tracks_csv)?;
    info!("✅ Sample Chinook CSV created: {}", csv_path.display());
    Ok(())
}
/// Converts the sample `chinook.csv` in `output_dir` into `format`.
///
/// Only `"parquet"` is implemented; `"arrow"` and anything else just
/// log and return `Ok`.
///
/// # Errors
///
/// Fails if the Parquet COPY statement fails.
fn convert_chinook_to_format(&self, output_dir: &str, format: &str) -> Result<()> {
    let source = Path::new(output_dir).join("chinook.csv");
    match format {
        "parquet" => {
            let target = Path::new(output_dir).join("chinook.parquet");
            let sql = format!(
                "COPY (SELECT * FROM read_csv('{}', header=true)) TO '{}' (FORMAT PARQUET)",
                source.display(),
                target.display()
            );
            self.conn.execute(&sql, [])?;
            info!("✅ Converted to Parquet: {}", target.display());
        }
        "arrow" => {
            info!("ℹ️ Arrow format conversion requires DuckDB Arrow integration");
        }
        unknown => {
            warn!("⚠️ Unsupported format: {}", unknown);
        }
    }
    Ok(())
}
/// Converts a dataset file from `input_format` to `output_format`.
///
/// Supported conversions: `csv` → `parquet` and `parquet` → `csv`.
///
/// # Errors
///
/// Returns an error for any other format pair, or if the COPY
/// statement fails (e.g. unreadable input or unwritable output path).
pub fn convert_dataset(
    &self,
    input: &str,
    output: &str,
    input_format: &str,
    output_format: &str,
) -> Result<()> {
    info!(
        "Converting {} from {} to {}",
        input, input_format, output_format
    );
    let query = match (input_format, output_format) {
        ("csv", "parquet") => format!(
            "COPY (SELECT * FROM read_csv('{}', header=true)) TO '{}' (FORMAT PARQUET)",
            input, output
        ),
        // HEADER added: csv→parquet reads with header=true, and the
        // TPC-H CSV export also emits headers — without it a
        // parquet→csv→parquet round trip would drop the column names.
        ("parquet", "csv") => format!(
            "COPY (SELECT * FROM read_parquet('{}')) TO '{}' (FORMAT CSV, HEADER)",
            input, output
        ),
        _ => {
            return Err(anyhow::anyhow!(
                "Unsupported conversion: {} to {}",
                input_format,
                output_format
            ));
        }
    };
    self.conn.execute(&query, [])?;
    info!("✅ Converted {} to {}", input, output);
    Ok(())
}
/// Logs build information and the list of extensions available to the
/// underlying DuckDB connection.
///
/// # Errors
///
/// Fails if the `duckdb_extensions()` query cannot be prepared or run.
pub fn show_info(&self) -> Result<()> {
    info!("🦆 Frozen DuckDB Information");
    info!(" Version: {}", env!("CARGO_PKG_VERSION"));
    info!(" Build Type: Pre-compiled binary");
    info!(" Architecture: {}", std::env::consts::ARCH);
    info!(" Target: {}", std::env::consts::OS);
    let mut stmt = self
        .conn
        .prepare("SELECT extension_name FROM duckdb_extensions() ORDER BY extension_name")?;
    let mut extensions: Vec<String> = Vec::new();
    for name in stmt.query_map([], |row| row.get(0))? {
        extensions.push(name?);
    }
    info!(" Available Extensions: {}", extensions.join(", "));
    Ok(())
}
}