1#[cfg(feature = "indicatif-progress")]
2use super::progress::IndicatifProgress;
3use super::{
4 Compression, OutputFormat, Table, TpchGenerator, TpchGeneratorBuilder,
5 DEFAULT_PARQUET_ROW_GROUP_BYTES,
6};
7use clap::builder::TypedValueParser;
8use clap::{ArgAction, Parser};
9use log::{info, LevelFilter};
10use std::io;
11#[cfg(feature = "indicatif-progress")]
12use std::io::IsTerminal;
13use std::path::PathBuf;
14use std::str::FromStr;
15#[cfg(feature = "indicatif-progress")]
16use std::sync::Arc;
17
18#[derive(Parser)]
19#[command(name = "tpchgen")]
20#[command(version)]
21#[command(
22 about = "TPC-H Data Generator",
24 long_about = r#"
26TPCH Data Generator (https://github.com/clflushopt/tpchgen-rs)
27
28By default each table is written to a single file named <output_dir>/<table>.<format>
29
30If `--part` option is specified, each table is written to a subdirectory in
31multiple files named <output_dir>/<table>/<table>.<part>.<format>
32
33Examples
34
35# Generate all tables at scale factor 1 (1GB) in TBL format (default) to /tmp/tpch directory:
36
37tpchgen-cli -s 1 --output-dir=/tmp/tpch
38
39# Generate all tables in CSV format:
40
41tpchgen-cli csv -s 1 --output-dir=/tmp/tpch
42
43# Generate scale factor one in CSV format with tab delimiter:
44
45tpchgen-cli csv -s 1 --delimiter='\t' --output-dir=/tmp/tpch
46
47# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
48# /tmp/tpch/lineitem:
49
50tpchgen-cli parquet -s 100 --tables=lineitem --parts=10 --output-dir=/tmp/tpch
51
52# Generate scale factor one in current directory, seeing debug output
53
54RUST_LOG=debug tpchgen-cli -s 1 --output-dir=/tmp/tpch
55"#,
56 args_conflicts_with_subcommands = true
57)]
58pub struct Cli {
59 #[command(subcommand)]
60 command: Option<Commands>,
61
62 #[command(flatten)]
66 args: TopLevelArgs,
67}
68
69#[derive(clap::Subcommand)]
70enum Commands {
71 Tbl(TblArgs),
73 Csv(CsvArgs),
75 Parquet(ParquetArgs),
77}
78
79#[derive(clap::Args)]
80struct CommonArgs {
81 #[arg(short, long, default_value_t = 1.)]
83 scale_factor: f64,
84
85 #[arg(short, long, default_value = ".")]
87 output_dir: PathBuf,
88
89 #[arg(short = 'T', long = "tables", value_delimiter = ',', value_parser = TableValueParser)]
91 tables: Option<Vec<Table>>,
92
93 #[arg(short, long)]
95 parts: Option<i32>,
96
97 #[arg(long)]
99 part: Option<i32>,
100
101 #[arg(short, long, default_value_t = num_cpus::get())]
103 num_threads: usize,
104
105 #[arg(short, long, default_value_t = false, conflicts_with = "quiet")]
110 verbose: bool,
111
112 #[arg(short, long, default_value_t = false, conflicts_with = "verbose")]
114 quiet: bool,
115
116 #[arg(long, default_value_t = false)]
118 stdout: bool,
119
120 #[arg(long = "no-progress", action = ArgAction::SetFalse, default_value_t = true)]
125 progress_bars_enabled: bool,
126}
127
128impl CommonArgs {
129 fn builder(self, format: OutputFormat) -> TpchGeneratorBuilder {
132 #[cfg(feature = "indicatif-progress")]
133 let progress = self
134 .should_show_progress_bars()
135 .then(|| Arc::new(IndicatifProgress::new()));
136
137 let mut builder = TpchGenerator::builder()
138 .with_scale_factor(self.scale_factor)
139 .with_output_dir(self.output_dir)
140 .with_format(format)
141 .with_num_threads(self.num_threads)
142 .with_stdout(self.stdout);
143
144 if let Some(tables) = self.tables {
145 builder = builder.with_tables(tables);
146 }
147 if let Some(parts) = self.parts {
148 builder = builder.with_parts(parts);
149 }
150 if let Some(part) = self.part {
151 builder = builder.with_part(part);
152 }
153
154 #[cfg(feature = "indicatif-progress")]
155 configure_logging(
156 self.verbose,
157 self.quiet,
158 progress.as_ref().map(|progress| progress.log_writer()),
159 );
160 #[cfg(not(feature = "indicatif-progress"))]
161 configure_logging(self.verbose, self.quiet, None);
162
163 #[cfg(feature = "indicatif-progress")]
164 if let Some(progress) = progress {
165 builder = builder.with_progress_tracker(progress);
166 }
167
168 builder
169 }
170
171 #[cfg(feature = "indicatif-progress")]
172 fn should_show_progress_bars(&self) -> bool {
173 self.progress_bars_enabled && !self.quiet && !self.stdout && io::stderr().is_terminal()
177 }
178}
179
180#[derive(clap::Args)]
181struct TopLevelArgs {
182 #[command(flatten)]
183 common: CommonArgs,
184
185 #[arg(short, long, hide = true)]
189 format: Option<OutputFormat>,
190
191 #[arg(short = 'c', long, hide = true)]
193 parquet_compression: Option<Compression>,
194
195 #[arg(long, hide = true)]
197 parquet_row_group_bytes: Option<i64>,
198}
199
200#[derive(clap::Args)]
201struct TblArgs {
202 #[command(flatten)]
203 common: CommonArgs,
204}
205
206#[derive(clap::Args)]
207struct CsvArgs {
208 #[command(flatten)]
209 common: CommonArgs,
210
211 #[arg(long, default_value = ",", value_parser = parse_delimiter)]
218 delimiter: char,
219}
220
221#[derive(clap::Args)]
222struct ParquetArgs {
223 #[command(flatten)]
224 common: CommonArgs,
225
226 #[arg(short = 'c', long, default_value = "SNAPPY")]
240 compression: Compression,
241
242 #[arg(long, default_value_t = DEFAULT_PARQUET_ROW_GROUP_BYTES)]
255 row_group_bytes: i64,
256}
257
/// Parses the value of a delimiter option into a single ASCII character.
///
/// Accepted inputs are the two-character escape sequences `\t`, `\n`, `\r`
/// and `\\` (backslash followed by a letter or another backslash), or any
/// string consisting of exactly one character.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty, contains more
/// than one character without being a recognised escape sequence, or
/// resolves to a non-ASCII character.
fn parse_delimiter(s: &str) -> Result<char, String> {
    let parsed = match s {
        "\\t" => '\t',
        "\\n" => '\n',
        "\\r" => '\r',
        "\\\\" => '\\',
        _ => {
            // Pull at most two characters: exactly one must be present.
            // This avoids allocating a Vec just to count and index it.
            let mut chars = s.chars();
            match (chars.next(), chars.next()) {
                (Some(only), None) => only,
                _ => {
                    return Err(format!(
                        "Delimiter must be a single character or escape sequence (\\t, \\n, \\r, \\\\), got: '{}'",
                        s
                    ));
                }
            }
        }
    };
    if !parsed.is_ascii() {
        return Err(format!(
            "Delimiter must be an ASCII character, got: '{}'",
            parsed
        ));
    }
    Ok(parsed)
}
289
290#[derive(Debug, Clone)]
292struct TableValueParser;
293
294impl TypedValueParser for TableValueParser {
295 type Value = Table;
296
297 fn parse_ref(
299 &self,
300 cmd: &clap::Command,
301 _: Option<&clap::Arg>,
302 value: &std::ffi::OsStr,
303 ) -> Result<Self::Value, clap::Error> {
304 let value = value
305 .to_str()
306 .ok_or_else(|| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))?;
307 Table::from_str(value)
308 .map_err(|_| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))
309 }
310
311 fn possible_values(
312 &self,
313 ) -> Option<Box<dyn Iterator<Item = clap::builder::PossibleValue> + '_>> {
314 Some(Box::new(
315 [
316 clap::builder::PossibleValue::new("region").help("Region table (alias: r)"),
317 clap::builder::PossibleValue::new("nation").help("Nation table (alias: n)"),
318 clap::builder::PossibleValue::new("supplier").help("Supplier table (alias: s)"),
319 clap::builder::PossibleValue::new("customer").help("Customer table (alias: c)"),
320 clap::builder::PossibleValue::new("part").help("Part table (alias: P)"),
321 clap::builder::PossibleValue::new("partsupp").help("PartSupp table (alias: S)"),
322 clap::builder::PossibleValue::new("orders").help("Orders table (alias: O)"),
323 clap::builder::PossibleValue::new("lineitem").help("LineItem table (alias: L)"),
324 ]
325 .into_iter(),
326 ))
327 }
328}
329
330impl Cli {
331 pub async fn run(self) -> io::Result<()> {
333 match self.command {
334 Some(Commands::Tbl(args)) => args.run().await,
335 Some(Commands::Csv(args)) => args.run().await,
336 Some(Commands::Parquet(args)) => args.run().await,
337 None => self.run_default().await,
338 }
339 }
340
341 async fn run_default(self) -> io::Result<()> {
342 let (format, subcommand) = if let Some(format) = self.args.format {
344 let subcommand = match format {
345 OutputFormat::Parquet => "parquet",
346 OutputFormat::Csv => "csv",
347 OutputFormat::Tbl => "tbl",
348 };
349 (format, Some(subcommand))
350 } else {
351 (OutputFormat::Tbl, None)
352 };
353
354 let mut builder = self.args.common.builder(format);
355 if let Some(subcommand) = subcommand {
356 log::warn!(
357 "The --format flag will be removed in v4.0.0. Use `tpchgen-cli {subcommand}` instead."
358 );
359 }
360
361 if let Some(parquet_compression) = self.args.parquet_compression {
362 if format == OutputFormat::Parquet {
363 log::warn!("The --parquet-compression flag is deprecated. Use 'tpchgen-cli parquet --compression=...' instead");
364 builder = builder.with_parquet_compression(parquet_compression);
365 } else {
366 log::warn!("--parquet-compression ignored: output format is not parquet");
367 }
368 }
369
370 if let Some(parquet_row_group_bytes) = self.args.parquet_row_group_bytes {
371 if format == OutputFormat::Parquet {
372 log::warn!("The --parquet-row-group-bytes flag is deprecated. Use 'tpchgen-cli parquet --row-group-bytes=...' instead");
373 builder = builder.with_parquet_row_group_bytes(parquet_row_group_bytes);
374 } else {
375 log::warn!("--parquet-row-group-bytes ignored: output format is not parquet");
376 }
377 }
378
379 builder.build().generate().await
380 }
381}
382
383impl TblArgs {
384 async fn run(self) -> io::Result<()> {
385 self.common
386 .builder(OutputFormat::Tbl)
387 .build()
388 .generate()
389 .await
390 }
391}
392
393impl CsvArgs {
394 async fn run(self) -> io::Result<()> {
395 self.common
396 .builder(OutputFormat::Csv)
397 .with_csv_delimiter(self.delimiter)
398 .build()
399 .generate()
400 .await
401 }
402}
403
404impl ParquetArgs {
405 async fn run(self) -> io::Result<()> {
406 self.common
407 .builder(OutputFormat::Parquet)
408 .with_parquet_compression(self.compression)
409 .with_parquet_row_group_bytes(self.row_group_bytes)
410 .build()
411 .generate()
412 .await
413 }
414}
415
416fn configure_logging(
417 verbose: bool,
418 quiet: bool,
419 log_writer: Option<Box<dyn io::Write + Send + 'static>>,
420) {
421 let mut builder = env_logger::builder();
422 if quiet {
423 builder.filter_level(LevelFilter::Error);
425 } else if verbose {
426 builder.filter_level(LevelFilter::Info);
427 } else {
428 builder.filter_level(LevelFilter::Warn).parse_default_env();
430 }
431 if let Some(log_writer) = log_writer {
432 builder.target(env_logger::Target::Pipe(log_writer));
433 }
434
435 builder.init();
436
437 if verbose {
438 info!("Verbose output enabled (ignoring RUST_LOG environment variable)");
439 }
440}