use crate::{ast::GenType, session::Session};
use anyhow::anyhow;
use anyhow::Result;
use databend_driver::DataType;
use databend_driver::Field;
use databend_driver::LoadMethod;
use databend_driver::NumberDataType;
use databend_driver::RowStatsIterator;
use databend_driver::Schema;
use databend_driver::{NumberValue, Row, RowWithStats, Value};
use serde_json::Value as JsonValue;
use std::process::Command;
use std::sync::Arc;
use tempfile::tempdir;
impl Session {
    /// Generates a TPC-H or TPC-DS dataset with the local `duckdb` CLI and loads it
    /// through this session's connection.
    ///
    /// Steps performed:
    /// 1. Probe for the `duckdb` executable by running `duckdb --version`.
    /// 2. In a temporary directory, install and load the matching DuckDB extension,
    ///    run its generator with scale factor `scale`, and `EXPORT DATABASE` as Parquet.
    /// 3. For every exported `*.parquet` file (processed in path order), create a table
    ///    named after the file stem and load the file with `LoadMethod::Streaming`.
    ///
    /// When `drop_override` is true the tables are created with `CREATE OR REPLACE TABLE`,
    /// otherwise with plain `CREATE TABLE`.
    ///
    /// Returns an iterator yielding one row per loaded table: the table name, the
    /// literal status `"OK"`, and the Parquet file size in bytes.
    ///
    /// # Errors
    ///
    /// Returns an error when `duckdb` cannot be spawned, when any DuckDB command exits
    /// unsuccessfully, when the export directory cannot be read, when an exported file
    /// name is not valid UTF-8, or when creating/loading a table fails.
    pub(crate) async fn gendata(
        &self,
        t: GenType,
        scale: f32,
        drop_override: bool,
    ) -> Result<RowStatsIterator> {
        // `output()` fails here only when the process cannot be spawned, which is
        // reported as DuckDB not being installed.
        if Command::new("duckdb").arg("--version").output().is_err() {
            return Err(anyhow!(
                "DuckDB is not installed. Please install it first by running: !install duckdb"
            ));
        }

        // Scratch space for the DuckDB database file and the Parquet export.
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("gendata.db");
        let export_path = temp_dir.path().join("export");
        std::fs::create_dir_all(&export_path)?;

        // Both benchmarks follow the same install/load/generate/export sequence and
        // differ only in the extension name and the generator procedure.
        let (extension, generator) = match t {
            GenType::TPCH => ("tpch", "DBGEN"),
            GenType::TPCDS => ("tpcds", "DSDGEN"),
        };
        let commands = [
            format!("install {extension};"),
            format!("load {extension};"),
            format!("CALL {generator}(sf = {scale});"),
            format!(
                "EXPORT DATABASE '{}' (FORMAT PARQUET);",
                export_path.display()
            ),
        ];

        for command in &commands {
            // Pass the path as an OS string so a non-UTF-8 temp path cannot panic.
            let output = Command::new("duckdb")
                .arg(&db_path)
                .arg("-c")
                .arg(command)
                .output()
                .map_err(|e| anyhow!("Failed to execute DuckDB command '{}': {}", command, e))?;
            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(anyhow!("DuckDB command '{}' failed: {}", command, stderr));
            }
        }

        // Surface directory-read errors as `Err` before sorting instead of unwrapping
        // them inside the sort key.
        let mut paths = std::fs::read_dir(&export_path)?
            .map(|entry| entry.map(|e| e.path()))
            .collect::<std::io::Result<Vec<_>>>()?;
        paths.sort();

        let create = if drop_override {
            "CREATE OR REPLACE"
        } else {
            "CREATE"
        };
        let schema = Arc::new(gendata_schema());
        let mut results = vec![];

        for path in paths {
            // Only the exported Parquet data files are loaded; everything else in the
            // export directory is skipped.
            if path.is_dir() || path.extension().is_none_or(|ext| ext != "parquet") {
                continue;
            }
            let table_name = path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .ok_or_else(|| {
                    anyhow!(
                        "Exported file '{}' does not have a valid UTF-8 table name",
                        path.display()
                    )
                })?
                .to_string();

            let table_schema = get_table_schema_from_duckdb(&path)?;
            let create_table_sql = format!("{create} TABLE {table_name} ({table_schema})");
            let _ = self.conn.exec(&create_table_sql).await?;

            let sql = format!(
                "INSERT INTO {table_name} from @_databend_load file_format = (type='parquet')"
            );
            let _stats = self
                .conn
                .load_file(&sql, &path, LoadMethod::Streaming)
                .await?;

            let size = std::fs::metadata(&path)?.len();
            results.push(Ok(RowWithStats::Row(Row::from_vec(
                schema.clone(),
                vec![
                    Value::String(table_name),
                    Value::String("OK".to_string()),
                    Value::Number(NumberValue::UInt64(size)),
                ],
            ))));
        }

        Ok(RowStatsIterator::new(
            schema,
            Box::pin(tokio_stream::iter(results)),
        ))
    }
}
/// Builds a Databend column-definition list for the Parquet file at `path`.
///
/// Invokes the `duckdb` CLI in JSON mode to create a table `t` from the file and
/// `DESCRIBE` it, then maps every reported `column_type` through
/// `convert_duckdb_type_to_databend`.
///
/// Returns the columns formatted as `"name TYPE, name TYPE, ..."`. Rows in the
/// DESCRIBE output that lack a string `column_name` or `column_type` are skipped.
///
/// # Errors
///
/// Returns an error when `duckdb` cannot be spawned or exits unsuccessfully, when its
/// output is not valid JSON, or when the JSON document is not an array.
fn get_table_schema_from_duckdb(path: &std::path::Path) -> Result<String> {
    // Double any single quote so the path remains a valid SQL string literal.
    let quoted_path = path.display().to_string().replace('\'', "''");
    let describe_command = format!(
        "create table t as select * from read_parquet('{}');DESCRIBE t;",
        quoted_path
    );

    let output = Command::new("duckdb")
        .arg("-json")
        .arg("-c")
        .arg(&describe_command)
        .output()
        .map_err(|e| anyhow!("Failed to describe table '{}': {}", path.display(), e))?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow!(
            "Failed to describe table '{}': {}",
            path.display(),
            stderr
        ));
    }

    // The CLI output may span several lines; glue them together before parsing.
    let stdout = String::from_utf8_lossy(&output.stdout);
    let json: String = stdout.trim().lines().collect();
    let result = serde_json::from_str::<JsonValue>(&json).map_err(|e| {
        anyhow!(
            "Failed to parse DuckDB DESCRIBE output for '{}': {}",
            path.display(),
            e
        )
    })?;
    // Report an unexpected JSON shape as an error rather than panicking.
    let rows = result.as_array().ok_or_else(|| {
        anyhow!(
            "Unexpected DuckDB DESCRIBE output for '{}': expected a JSON array",
            path.display()
        )
    })?;

    let mut columns = Vec::with_capacity(rows.len());
    for row in rows {
        let (Some(column_name), Some(column_type)) = (
            row.get("column_name").and_then(|v| v.as_str()),
            row.get("column_type").and_then(|v| v.as_str()),
        ) else {
            continue;
        };
        let databend_type = convert_duckdb_type_to_databend(column_type)?;
        columns.push(format!("{} {}", column_name, databend_type));
    }
    Ok(columns.join(", "))
}
/// Translates a DuckDB column type name into the Databend type used in `CREATE TABLE`.
///
/// Matching is case-insensitive (the input is upper-cased first):
/// - `DECIMAL(<params>)` keeps its parameters; a `DECIMAL(` prefix without a closing
///   parenthesis becomes `DECIMAL(38, 10)`.
/// - `VARCHAR(n)` / `CHAR(n)` and the plain text type names become `STRING`.
/// - The remaining known names are mapped through a fixed alias table.
/// - Anything else falls back to `STRING` after printing a warning to stderr.
///
/// The current implementation never returns `Err`.
fn convert_duckdb_type_to_databend(duckdb_type: &str) -> Result<String> {
    let upper = duckdb_type.to_uppercase();

    // Parameterised types are recognised by prefix before the exact-name table.
    if let Some(rest) = upper.strip_prefix("DECIMAL(") {
        let mapped = match rest.strip_suffix(')') {
            Some(params) => format!("DECIMAL({})", params),
            None => String::from("DECIMAL(38, 10)"),
        };
        return Ok(mapped);
    }
    if upper.starts_with("VARCHAR(") || upper.starts_with("CHAR(") {
        return Ok(String::from("STRING"));
    }

    let mapped: &str = match upper.as_str() {
        "BOOLEAN" | "BOOL" => "BOOLEAN",
        "TINYINT" | "INT1" => "TINYINT",
        "SMALLINT" | "INT2" | "SHORT" => "SMALLINT",
        "INTEGER" | "INT4" | "INT" | "SIGNED" => "INT",
        "BIGINT" | "INT8" | "LONG" => "BIGINT",
        "REAL" | "FLOAT4" | "FLOAT" => "FLOAT",
        "DOUBLE" | "FLOAT8" | "NUMERIC" => "DOUBLE",
        "VARCHAR" | "CHAR" | "BPCHAR" | "TEXT" | "STRING" => "STRING",
        "DATE" => "DATE",
        "TIME" => "TIME",
        "TIMESTAMP" | "DATETIME" => "TIMESTAMP",
        unknown => {
            eprintln!(
                "Warning: Unknown DuckDB type '{}', using STRING as fallback",
                unknown
            );
            "STRING"
        }
    };
    Ok(mapped.to_owned())
}
fn gendata_schema() -> Schema {
Schema::from_vec(vec![
Field {
name: "table".to_string(),
data_type: DataType::String,
},
Field {
name: "status".to_string(),
data_type: DataType::String,
},
Field {
name: "size".to_string(),
data_type: DataType::Number(NumberDataType::UInt64),
},
])
}