1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Result;
use std::time::Instant;

use arrow::datatypes::Schema;
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

pub mod tpcds;
pub mod tpch;

#[async_trait]
/// Common interface for a TPC-style benchmark data set (see the `tpch` and
/// `tpcds` modules for implementations).
pub trait Tpc {
    /// Generate the raw benchmark data at the given `scale` factor, split into
    /// `partitions` partitions, reading from `input_path` and writing to
    /// `output_path`.
    fn generate(
        &self,
        scale: usize,
        partitions: usize,
        input_path: &str,
        output_path: &str,
    ) -> Result<()>;
    /// Names of all tables in this benchmark's schema.
    fn get_table_names(&self) -> Vec<&str>;
    /// Arrow schema for the named table.
    ///
    /// NOTE(review): behavior for an unknown table name is implementation
    /// defined — the signature cannot return an error, so implementors
    /// presumably panic; confirm against the `tpch`/`tpcds` modules.
    fn get_schema(&self, table: &str) -> Schema;
    /// Convert the raw generated files under `input_path` into Parquet files
    /// under `output_path`.
    async fn convert_to_parquet(
        &self,
        input_path: &str,
        output_path: &str,
    ) -> datafusion::error::Result<()>;
}

/// Convert a TBL/CSV data set into CSV or Parquet files.
///
/// Reads `input_path` with the supplied CSV `options`, optionally repartitions
/// the data into `partitions` round-robin partitions, then writes the result
/// to `output_path` in `file_format` ("csv" or "parquet"). For Parquet output,
/// `compression` selects the codec: "none", "snappy", "brotli", "gzip",
/// "lz4", "lzo" or "zstd". `batch_size` sets the execution batch size.
///
/// # Errors
/// Returns `DataFusionError::NotImplemented` for an unrecognized file format
/// or compression codec, and propagates any error raised while reading,
/// planning, or writing the data.
pub async fn convert_tbl(
    input_path: &str,
    output_path: &str,
    options: CsvReadOptions<'_>,
    partitions: usize,
    file_format: &str,
    compression: &str,
    batch_size: usize,
) -> datafusion::error::Result<()> {
    println!(
        "Converting '{}' to {} files in directory '{}'",
        input_path, file_format, output_path
    );

    let start = Instant::now();

    let config = ExecutionConfig::new().with_batch_size(batch_size);
    let mut ctx = ExecutionContext::with_config(config);

    // build plan to read the TBL file
    let mut csv = ctx.read_csv(input_path, options)?;

    // optionally repartition the data so the output is written as multiple files
    if partitions > 1 {
        csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
    }

    // create the physical plan
    let csv = csv.to_logical_plan();
    let csv = ctx.optimize(&csv)?;
    let csv = ctx.create_physical_plan(&csv)?;

    match file_format {
        "csv" => ctx.write_csv(csv, output_path.to_string()).await?,
        "parquet" => {
            let compression = match compression {
                "none" => Compression::UNCOMPRESSED,
                "snappy" => Compression::SNAPPY,
                "brotli" => Compression::BROTLI,
                "gzip" => Compression::GZIP,
                "lz4" => Compression::LZ4,
                // Fixed: this arm previously matched the typo "lz0" (digit
                // zero), making the LZO codec unreachable via its real name.
                "lzo" => Compression::LZO,
                "zstd" => Compression::ZSTD,
                other => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Invalid compression format: {}",
                        other
                    )))
                }
            };
            let props = WriterProperties::builder()
                .set_compression(compression)
                .build();

            ctx.write_parquet(csv, output_path.to_string(), Some(props))
                .await?
        }
        other => {
            return Err(DataFusionError::NotImplemented(format!(
                "Invalid output format: {}",
                other
            )))
        }
    }
    println!("Conversion completed in {} ms", start.elapsed().as_millis());

    Ok(())
}