#[cfg(feature = "indicatif-progress")]
use super::progress::IndicatifProgress;
use super::{
Compression, OutputFormat, Table, TpchGenerator, TpchGeneratorBuilder,
DEFAULT_PARQUET_ROW_GROUP_BYTES,
};
use clap::builder::TypedValueParser;
use clap::{ArgAction, Parser};
use log::{info, LevelFilter};
use std::io;
#[cfg(feature = "indicatif-progress")]
use std::io::IsTerminal;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(feature = "indicatif-progress")]
use std::sync::Arc;
/// Command-line interface of the TPC-H data generator.
///
/// A format subcommand (`tbl`, `csv`, `parquet`) is optional; when none is
/// given, the flattened top-level arguments are used instead.
/// `args_conflicts_with_subcommands` means top-level arguments and a
/// subcommand cannot be combined on the same command line.
///
/// The help text is supplied explicitly through `about` / `long_about` below.
#[derive(Parser)]
#[command(name = "tpchgen")]
#[command(version)]
#[command(
    // -h output
    about = "TPC-H Data Generator",
    // --help output
    long_about = r#"
TPCH Data Generator (https://github.com/clflushopt/tpchgen-rs)
By default each table is written to a single file named <output_dir>/<table>.<format>
If `--part` option is specified, each table is written to a subdirectory in
multiple files named <output_dir>/<table>/<table>.<part>.<format>
Examples
# Generate all tables at scale factor 1 (1GB) in TBL format (default) to /tmp/tpch directory:
tpchgen-cli -s 1 --output-dir=/tmp/tpch
# Generate all tables in CSV format:
tpchgen-cli csv -s 1 --output-dir=/tmp/tpch
# Generate scale factor one in CSV format with tab delimiter:
tpchgen-cli csv -s 1 --delimiter='\t' --output-dir=/tmp/tpch
# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
# /tmp/tpch/lineitem:
tpchgen-cli parquet -s 100 --tables=lineitem --parts=10 --output-dir=/tmp/tpch
# Generate scale factor one in current directory, seeing debug output
RUST_LOG=debug tpchgen-cli -s 1
"#,
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Selected output-format subcommand, if any.
    #[command(subcommand)]
    command: Option<Commands>,
    // Arguments used when no subcommand is given (see `Cli::run_default`).
    #[command(flatten)]
    args: TopLevelArgs,
}
// Output-format subcommands. Each variant carries the argument struct for that
// format; `Cli::run` dispatches to the matching `run` method.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments into help text, which would change the `--help` output.
#[derive(clap::Subcommand)]
enum Commands {
// `tbl` subcommand: TBL output (arguments in `TblArgs`).
Tbl(TblArgs),
// `csv` subcommand: CSV output (arguments in `CsvArgs`).
Csv(CsvArgs),
// `parquet` subcommand: Parquet output (arguments in `ParquetArgs`).
Parquet(ParquetArgs),
}
// Arguments shared by the top-level command and every format subcommand.
// `CommonArgs::builder` turns them into a configured `TpchGeneratorBuilder`.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments on fields into help text, which would change the `--help` output.
#[derive(clap::Args)]
struct CommonArgs {
// `-s` / `--scale-factor`: scale factor passed to the generator (default 1).
#[arg(short, long, default_value_t = 1.)]
scale_factor: f64,
// `-o` / `--output-dir`: output directory (default: current directory, ".").
#[arg(short, long, default_value = ".")]
output_dir: PathBuf,
// `-T` / `--tables`: comma-separated table names, each parsed by
// `TableValueParser`. When absent, the builder's own table selection is kept.
#[arg(short = 'T', long = "tables", value_delimiter = ',', value_parser = TableValueParser)]
tables: Option<Vec<Table>>,
// `-p` / `--parts`: forwarded to `with_parts` when given; the help examples use
// it as the number of files a table is split into.
#[arg(short, long)]
parts: Option<i32>,
// `--part` (long form only): forwarded to `with_part` when given.
// NOTE(review): presumably selects a single part out of `--parts` — confirm
// against the builder.
#[arg(long)]
part: Option<i32>,
// `-n` / `--num-threads`: thread count (default: `num_cpus::get()`).
#[arg(short, long, default_value_t = num_cpus::get())]
num_threads: usize,
// `-v` / `--verbose`: selects the Info log level; mutually exclusive with `--quiet`.
#[arg(short, long, default_value_t = false, conflicts_with = "quiet")]
verbose: bool,
// `-q` / `--quiet`: selects the Error log level and suppresses progress bars;
// mutually exclusive with `--verbose`.
#[arg(short, long, default_value_t = false, conflicts_with = "verbose")]
quiet: bool,
// `--stdout`: forwarded to `with_stdout`; also suppresses progress bars.
#[arg(long, default_value_t = false)]
stdout: bool,
// `--no-progress`: the flag sets this field to `false` (`SetFalse`); it is
// `true` when the flag is absent.
#[arg(long = "no-progress", action = ArgAction::SetFalse, default_value_t = true)]
progress_bars_enabled: bool,
}
impl CommonArgs {
    /// Consumes the shared arguments and returns a [`TpchGeneratorBuilder`]
    /// configured for `format`.
    ///
    /// Besides building, this initializes logging through `configure_logging`.
    /// With the `indicatif-progress` feature enabled and progress bars wanted,
    /// log output is routed through the progress tracker's writer and the
    /// tracker is attached to the builder.
    fn builder(self, format: OutputFormat) -> TpchGeneratorBuilder {
        // Decide on progress bars first: the check borrows `self`, which is
        // partially moved by the builder chain below.
        #[cfg(feature = "indicatif-progress")]
        let tracker = if self.should_show_progress_bars() {
            Some(Arc::new(IndicatifProgress::new()))
        } else {
            None
        };

        let mut builder = TpchGenerator::builder()
            .with_scale_factor(self.scale_factor)
            .with_output_dir(self.output_dir)
            .with_format(format)
            .with_num_threads(self.num_threads)
            .with_stdout(self.stdout);

        // Optional settings are applied only when given on the command line.
        builder = match self.tables {
            Some(tables) => builder.with_tables(tables),
            None => builder,
        };
        builder = match self.parts {
            Some(parts) => builder.with_parts(parts),
            None => builder,
        };
        builder = match self.part {
            Some(part) => builder.with_part(part),
            None => builder,
        };

        #[cfg(feature = "indicatif-progress")]
        configure_logging(
            self.verbose,
            self.quiet,
            tracker.as_ref().map(|tracker| tracker.log_writer()),
        );
        #[cfg(not(feature = "indicatif-progress"))]
        configure_logging(self.verbose, self.quiet, None);

        #[cfg(feature = "indicatif-progress")]
        if let Some(tracker) = tracker {
            builder = builder.with_progress_tracker(tracker);
        }

        builder
    }

    /// Returns `true` when progress bars should be drawn: they were not
    /// disabled with `--no-progress`, neither `--quiet` nor `--stdout` is set,
    /// and stderr is a terminal.
    #[cfg(feature = "indicatif-progress")]
    fn should_show_progress_bars(&self) -> bool {
        if !self.progress_bars_enabled || self.quiet || self.stdout {
            return false;
        }
        io::stderr().is_terminal()
    }
}
// Arguments accepted when no subcommand is given.
//
// The three options besides `common` are hidden from help (`hide = true`);
// `Cli::run_default` logs a deprecation warning when any of them is used.
// Plain `//` comments are used so clap does not pick them up as help text.
#[derive(clap::Args)]
struct TopLevelArgs {
// Options shared with the subcommands.
#[command(flatten)]
common: CommonArgs,
// `-f` / `--format`: output format; `run_default` falls back to TBL when absent.
#[arg(short, long, hide = true)]
format: Option<OutputFormat>,
// `-c` / `--parquet-compression`: only applied when the format is Parquet,
// otherwise ignored with a warning.
#[arg(short = 'c', long, hide = true)]
parquet_compression: Option<Compression>,
// `--parquet-row-group-bytes`: only applied when the format is Parquet,
// otherwise ignored with a warning.
#[arg(long, hide = true)]
parquet_row_group_bytes: Option<i64>,
}
// Arguments of the `tbl` subcommand: only the shared options.
// (Plain `//` comments so clap does not pick them up as help text.)
#[derive(clap::Args)]
struct TblArgs {
// Options shared with the other commands.
#[command(flatten)]
common: CommonArgs,
}
// Arguments of the `csv` subcommand: the shared options plus the delimiter.
// (Plain `//` comments so clap does not pick them up as help text.)
#[derive(clap::Args)]
struct CsvArgs {
// Options shared with the other commands.
#[command(flatten)]
common: CommonArgs,
// `--delimiter`: field delimiter (default ","), parsed by `parse_delimiter`,
// which accepts one ASCII character or one of the escapes \t, \n, \r, \\.
#[arg(long, default_value = ",", value_parser = parse_delimiter)]
delimiter: char,
}
// Arguments of the `parquet` subcommand: the shared options plus Parquet tuning.
// (Plain `//` comments so clap does not pick them up as help text.)
#[derive(clap::Args)]
struct ParquetArgs {
// Options shared with the other commands.
#[command(flatten)]
common: CommonArgs,
// `-c` / `--compression`: Parquet compression (default "SNAPPY").
#[arg(short = 'c', long, default_value = "SNAPPY")]
compression: Compression,
// `--row-group-bytes`: forwarded to `with_parquet_row_group_bytes`
// (default: `DEFAULT_PARQUET_ROW_GROUP_BYTES`).
#[arg(long, default_value_t = DEFAULT_PARQUET_ROW_GROUP_BYTES)]
row_group_bytes: i64,
}
/// Parses the `--delimiter` argument into a single ASCII character.
///
/// Accepts the two-character escape sequences `\t`, `\n`, `\r` and `\\`
/// (as typed on the command line), or exactly one literal character.
///
/// # Errors
///
/// Returns a message when `s` is empty, holds more than one character and is
/// not a recognized escape, or resolves to a non-ASCII character.
fn parse_delimiter(s: &str) -> Result<char, String> {
    let delimiter = match s {
        "\\t" => '\t',
        "\\n" => '\n',
        "\\r" => '\r',
        "\\\\" => '\\',
        literal => {
            // Exactly one character: the first `next()` yields it, the second yields nothing.
            let mut remaining = literal.chars();
            match (remaining.next(), remaining.next()) {
                (Some(only), None) => only,
                _ => {
                    return Err(format!(
                        "Delimiter must be a single character or escape sequence (\\t, \\n, \\r, \\\\), got: '{}'",
                        s
                    ));
                }
            }
        }
    };

    if delimiter.is_ascii() {
        Ok(delimiter)
    } else {
        Err(format!(
            "Delimiter must be an ASCII character, got: '{}'",
            delimiter
        ))
    }
}
/// clap value parser for the `--tables` option: converts one table name into a
/// [`Table`] via `Table::from_str` and advertises the accepted names.
#[derive(Debug, Clone)]
struct TableValueParser;
impl TypedValueParser for TableValueParser {
type Value = Table;
fn parse_ref(
&self,
cmd: &clap::Command,
_: Option<&clap::Arg>,
value: &std::ffi::OsStr,
) -> Result<Self::Value, clap::Error> {
let value = value
.to_str()
.ok_or_else(|| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))?;
Table::from_str(value)
.map_err(|_| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))
}
fn possible_values(
&self,
) -> Option<Box<dyn Iterator<Item = clap::builder::PossibleValue> + '_>> {
Some(Box::new(
[
clap::builder::PossibleValue::new("region").help("Region table (alias: r)"),
clap::builder::PossibleValue::new("nation").help("Nation table (alias: n)"),
clap::builder::PossibleValue::new("supplier").help("Supplier table (alias: s)"),
clap::builder::PossibleValue::new("customer").help("Customer table (alias: c)"),
clap::builder::PossibleValue::new("part").help("Part table (alias: P)"),
clap::builder::PossibleValue::new("partsupp").help("PartSupp table (alias: S)"),
clap::builder::PossibleValue::new("orders").help("Orders table (alias: O)"),
clap::builder::PossibleValue::new("lineitem").help("LineItem table (alias: L)"),
]
.into_iter(),
))
}
}
impl Cli {
pub async fn run(self) -> io::Result<()> {
match self.command {
Some(Commands::Tbl(args)) => args.run().await,
Some(Commands::Csv(args)) => args.run().await,
Some(Commands::Parquet(args)) => args.run().await,
None => self.run_default().await,
}
}
async fn run_default(self) -> io::Result<()> {
let (format, subcommand) = if let Some(format) = self.args.format {
let subcommand = match format {
OutputFormat::Parquet => "parquet",
OutputFormat::Csv => "csv",
OutputFormat::Tbl => "tbl",
};
(format, Some(subcommand))
} else {
(OutputFormat::Tbl, None)
};
let mut builder = self.args.common.builder(format);
if let Some(subcommand) = subcommand {
log::warn!(
"The --format flag will be removed in v4.0.0. Use `tpchgen-cli {subcommand}` instead."
);
}
if let Some(parquet_compression) = self.args.parquet_compression {
if format == OutputFormat::Parquet {
log::warn!("The --parquet-compression flag is deprecated. Use 'tpchgen-cli parquet --compression=...' instead");
builder = builder.with_parquet_compression(parquet_compression);
} else {
log::warn!("--parquet-compression ignored: output format is not parquet");
}
}
if let Some(parquet_row_group_bytes) = self.args.parquet_row_group_bytes {
if format == OutputFormat::Parquet {
log::warn!("The --parquet-row-group-bytes flag is deprecated. Use 'tpchgen-cli parquet --row-group-bytes=...' instead");
builder = builder.with_parquet_row_group_bytes(parquet_row_group_bytes);
} else {
log::warn!("--parquet-row-group-bytes ignored: output format is not parquet");
}
}
builder.build().generate().await
}
}
impl TblArgs {
async fn run(self) -> io::Result<()> {
self.common
.builder(OutputFormat::Tbl)
.build()
.generate()
.await
}
}
impl CsvArgs {
async fn run(self) -> io::Result<()> {
self.common
.builder(OutputFormat::Csv)
.with_csv_delimiter(self.delimiter)
.build()
.generate()
.await
}
}
impl ParquetArgs {
async fn run(self) -> io::Result<()> {
self.common
.builder(OutputFormat::Parquet)
.with_parquet_compression(self.compression)
.with_parquet_row_group_bytes(self.row_group_bytes)
.build()
.generate()
.await
}
}
fn configure_logging(
verbose: bool,
quiet: bool,
log_writer: Option<Box<dyn io::Write + Send + 'static>>,
) {
let mut builder = env_logger::builder();
if quiet {
builder.filter_level(LevelFilter::Error);
} else if verbose {
builder.filter_level(LevelFilter::Info);
} else {
builder.filter_level(LevelFilter::Warn).parse_default_env();
}
if let Some(log_writer) = log_writer {
builder.target(env_logger::Target::Pipe(log_writer));
}
builder.init();
if verbose {
info!("Verbose output enabled (ignoring RUST_LOG environment variable)");
}
}