// bendsql 0.34.0
//
// Databend Native Command Line Tool
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{ast::GenType, session::Session};
use anyhow::anyhow;
use anyhow::Result;
use databend_driver::DataType;
use databend_driver::Field;
use databend_driver::LoadMethod;
use databend_driver::NumberDataType;
use databend_driver::RowStatsIterator;
use databend_driver::Schema;
use databend_driver::{NumberValue, Row, RowWithStats, Value};
use serde_json::Value as JsonValue;
use std::process::Command;
use std::sync::Arc;
use tempfile::tempdir;

impl Session {
    /// Generates a TPC-H or TPC-DS dataset with the local `duckdb` binary and loads it
    /// into the current Databend connection.
    ///
    /// DuckDB generates the data at the requested `scale` factor and exports it as
    /// Parquet into a temporary directory. Each exported `.parquet` file (visited in
    /// path order) is loaded into a table named after its file stem:
    /// 1. the table is created from the column types DuckDB reports for the file;
    /// 2. the file is streamed in via `load_file` with `LoadMethod::Streaming`.
    ///
    /// * `t` - which benchmark generator to run.
    /// * `scale` - scale factor passed to DuckDB's `DBGEN`/`DSDGEN`.
    /// * `drop_override` - use `CREATE OR REPLACE TABLE` instead of `CREATE TABLE`.
    ///
    /// Returns one row per loaded table: its name, the status `"OK"`, and the size
    /// in bytes of the Parquet file that was loaded.
    ///
    /// # Errors
    /// Fails if DuckDB cannot be launched, any DuckDB step fails, the export
    /// directory cannot be read, a file name is not valid UTF-8, or any Databend
    /// statement or load fails.
    pub(crate) async fn gendata(
        &self,
        t: GenType,
        scale: f32,
        drop_override: bool,
    ) -> Result<RowStatsIterator> {
        // NOTE(review): DuckDB is driven through blocking `std::process::Command`
        // calls inside this async fn, which block the executor thread while data is
        // generated; consider moving them onto a blocking task.

        // Check if duckdb is available
        if Command::new("duckdb").arg("--version").output().is_err() {
            return Err(anyhow!(
                "DuckDB is not installed. Please install it first by running: !install duckdb"
            ));
        }

        // `temp_dir` must stay alive until every file has been loaded; it is removed
        // when dropped at the end of this function.
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("gendata.db");
        let export_path = temp_dir.path().join("export");
        std::fs::create_dir_all(&export_path)?;

        run_duckdb_generator(&db_path, &export_path, t, scale)?;

        let schema = Arc::new(gendata_schema());
        let create = if drop_override {
            "CREATE OR REPLACE"
        } else {
            "CREATE"
        };
        let mut results = vec![];

        for path in exported_parquet_files(&export_path)? {
            let table_name = path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .ok_or_else(|| anyhow!("Invalid parquet file name: {}", path.display()))?
                .to_string();

            // Create the target table from the schema DuckDB reports for this file.
            let table_schema = get_table_schema_from_duckdb(&path)?;
            let create_table_sql = format!("{create} TABLE {table_name} ({table_schema})");
            let _ = self.conn.exec(&create_table_sql).await?;

            // Use stream_load with LoadMethod::Streaming
            let sql = format!(
                "INSERT INTO {table_name} from @_databend_load file_format = (type='parquet')"
            );
            let _ = self
                .conn
                .load_file(&sql, &path, LoadMethod::Streaming)
                .await?;

            let size = std::fs::metadata(&path)?.len();

            results.push(Ok(RowWithStats::Row(Row::from_vec(
                schema.clone(),
                vec![
                    Value::String(table_name),
                    Value::String("OK".to_string()),
                    Value::Number(NumberValue::UInt64(size)),
                ],
            ))));
        }

        Ok(RowStatsIterator::new(
            schema,
            Box::pin(tokio_stream::iter(results)),
        ))
    }
}

/// Runs DuckDB's benchmark generator for `t` at `scale` against the database file
/// `db_path`, then exports every generated table as Parquet into `export_path`.
fn run_duckdb_generator(
    db_path: &std::path::Path,
    export_path: &std::path::Path,
    t: GenType,
    scale: f32,
) -> Result<()> {
    let (extension, generator) = match t {
        GenType::TPCH => ("tpch", "DBGEN"),
        GenType::TPCDS => ("tpcds", "DSDGEN"),
    };

    // Single quotes are doubled so the path stays inside the SQL string literal.
    let export_literal = export_path.display().to_string().replace('\'', "''");

    // All statements run in ONE DuckDB process: `LOAD` only affects the process
    // that executes it, so running it in a separate invocation would not make the
    // generator available to the statements that follow.
    let script = [
        format!("install {extension};"),
        format!("load {extension};"),
        format!("CALL {generator}(sf = {scale});"),
        format!("EXPORT DATABASE '{export_literal}' (FORMAT PARQUET);"),
    ]
    .join(" ");

    // `Command::arg` accepts any `AsRef<OsStr>`, so non-UTF-8 paths are passed as-is.
    let output = Command::new("duckdb")
        .arg(db_path)
        .arg("-c")
        .arg(&script)
        .output()
        .map_err(|e| anyhow!("Failed to execute DuckDB command '{}': {}", script, e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow!("DuckDB command '{}' failed: {}", script, stderr));
    }
    Ok(())
}

/// Lists the `*.parquet` files directly inside `dir`, sorted by path so tables are
/// loaded in a deterministic order. Sub-directories and other files are skipped.
fn exported_parquet_files(dir: &std::path::Path) -> Result<Vec<std::path::PathBuf>> {
    let mut files = Vec::new();
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        // Skip if the path is a directory or if it does not end with .parquet
        if path.is_dir() || path.extension().is_none_or(|ext| ext != "parquet") {
            continue;
        }
        files.push(path);
    }
    files.sort();
    Ok(files)
}

/// Asks DuckDB for the column names and types of the Parquet file at `path` and
/// renders them as a Databend column list such as `a INT, b STRING`.
///
/// Each DuckDB type is translated with `convert_duckdb_type_to_databend`. Rows of
/// DuckDB's output that lack a `column_name` or `column_type` string are ignored.
///
/// # Errors
/// Fails if DuckDB cannot be run or exits unsuccessfully, if its output is not a
/// JSON array, if a type conversion fails, or if no columns were found.
fn get_table_schema_from_duckdb(path: &std::path::Path) -> Result<String> {
    // `DESCRIBE <query>` reports the result columns of the query without first
    // copying the file's data into a DuckDB table. Single quotes in the path are
    // doubled so the path stays inside the SQL string literal.
    let describe_command = format!(
        "DESCRIBE SELECT * FROM read_parquet('{}');",
        path.display().to_string().replace('\'', "''")
    );

    let output = Command::new("duckdb")
        .arg("-json")
        .arg("-c")
        .arg(&describe_command)
        .output()
        .map_err(|e| anyhow!("Failed to describe table '{}': {}", path.display(), e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow!(
            "Failed to describe table '{}': {}",
            path.display(),
            stderr
        ));
    }

    // The JSON document may be spread over several lines; concatenate them first.
    let stdout = String::from_utf8_lossy(&output.stdout);
    let json: String = stdout.trim().lines().collect();
    let result = serde_json::from_str::<JsonValue>(&json)?;
    let rows = result.as_array().ok_or_else(|| {
        anyhow!(
            "Unexpected DuckDB output while describing '{}': {}",
            path.display(),
            json
        )
    })?;

    let mut columns = Vec::with_capacity(rows.len());
    for row in rows {
        if let (Some(column_name), Some(column_type)) = (
            row.get("column_name").and_then(|v| v.as_str()),
            row.get("column_type").and_then(|v| v.as_str()),
        ) {
            let databend_type = convert_duckdb_type_to_databend(column_type)?;
            columns.push(format!("{column_name} {databend_type}"));
        }
    }

    // An empty list would otherwise produce an invalid `CREATE TABLE x ()`.
    if columns.is_empty() {
        return Err(anyhow!(
            "No columns found when describing '{}'",
            path.display()
        ));
    }
    Ok(columns.join(", "))
}

/// Maps a DuckDB column type name (as printed by `DESCRIBE`) to the Databend type
/// name used in `CREATE TABLE`.
///
/// Matching ignores case and surrounding whitespace. `DECIMAL(p,s)` and
/// `NUMERIC(p,s)` keep their precision and scale. Bare `DECIMAL`/`NUMERIC` use
/// DuckDB's default of `(18,3)`. Unknown types fall back to `STRING` after a
/// warning is printed to stderr.
///
/// # Errors
/// Currently never fails; the `Result` leaves room for rejecting unsupported types.
fn convert_duckdb_type_to_databend(duckdb_type: &str) -> Result<String> {
    let duckdb_type = duckdb_type.trim().to_uppercase();

    let databend_type = match duckdb_type.as_str() {
        // Boolean and signed integer types
        "BOOLEAN" | "BOOL" => "BOOLEAN".to_string(),
        "TINYINT" | "INT1" => "TINYINT".to_string(),
        "SMALLINT" | "INT2" | "SHORT" => "SMALLINT".to_string(),
        "INTEGER" | "INT4" | "INT" | "SIGNED" => "INT".to_string(),
        "BIGINT" | "INT8" | "LONG" => "BIGINT".to_string(),

        // Unsigned integer types
        "UTINYINT" => "TINYINT UNSIGNED".to_string(),
        "USMALLINT" => "SMALLINT UNSIGNED".to_string(),
        "UINTEGER" => "INT UNSIGNED".to_string(),
        "UBIGINT" => "BIGINT UNSIGNED".to_string(),

        // Floating point types
        "REAL" | "FLOAT4" | "FLOAT" => "FLOAT".to_string(),
        "DOUBLE" | "FLOAT8" => "DOUBLE".to_string(),

        // Exact numerics without explicit precision/scale (DuckDB default)
        "DECIMAL" | "NUMERIC" => "DECIMAL(18,3)".to_string(),

        // String types
        "VARCHAR" | "CHAR" | "BPCHAR" | "TEXT" | "STRING" => "STRING".to_string(),

        // Date/time types
        "DATE" => "DATE".to_string(),
        "TIME" => "TIME".to_string(),
        "TIMESTAMP" | "DATETIME" => "TIMESTAMP".to_string(),

        // Parameterized exact numerics: keep precision and scale as reported.
        t if t.starts_with("DECIMAL(") || t.starts_with("NUMERIC(") => {
            match t
                .split_once('(')
                .and_then(|(_, rest)| rest.strip_suffix(')'))
            {
                Some(params) => format!("DECIMAL({params})"),
                None => "DECIMAL(38, 10)".to_string(), // Default precision and scale
            }
        }
        t if t.starts_with("VARCHAR(") => "STRING".to_string(),
        t if t.starts_with("CHAR(") => "STRING".to_string(),

        // Default fallback
        _ => {
            eprintln!(
                "Warning: Unknown DuckDB type '{}', using STRING as fallback",
                duckdb_type
            );
            "STRING".to_string()
        }
    };

    Ok(databend_type)
}

/// Schema of the rows produced by `gendata`: `table` and `status` strings followed
/// by an unsigned 64-bit `size` column.
fn gendata_schema() -> Schema {
    let columns = [
        ("table", DataType::String),
        ("status", DataType::String),
        ("size", DataType::Number(NumberDataType::UInt64)),
    ];

    let fields = columns
        .into_iter()
        .map(|(column_name, column_type)| Field {
            name: column_name.to_string(),
            data_type: column_type,
        })
        .collect::<Vec<_>>();

    Schema::from_vec(fields)
}