Skip to main content

tpcgen_cli/tpch_cli/
cli.rs

1#[cfg(feature = "indicatif-progress")]
2use super::progress::IndicatifProgress;
3use super::{
4    Compression, OutputFormat, Table, TpchGenerator, TpchGeneratorBuilder,
5    DEFAULT_PARQUET_ROW_GROUP_BYTES,
6};
7use clap::builder::TypedValueParser;
8use clap::{ArgAction, Parser};
9use log::{info, LevelFilter};
10use std::io;
11#[cfg(feature = "indicatif-progress")]
12use std::io::IsTerminal;
13use std::path::PathBuf;
14use std::str::FromStr;
15#[cfg(feature = "indicatif-progress")]
16use std::sync::Arc;
17
18#[derive(Parser)]
19#[command(name = "tpchgen")]
20#[command(version)]
21#[command(
22    // -h output
23    about = "TPC-H Data Generator",
24    // --help output
25    long_about = r#"
26TPCH Data Generator (https://github.com/clflushopt/tpchgen-rs)
27
28By default each table is written to a single file named <output_dir>/<table>.<format>
29
30If `--part` option is specified, each table is written to a subdirectory in
31multiple files named <output_dir>/<table>/<table>.<part>.<format>
32
33Examples
34
35# Generate all tables at scale factor 1 (1GB) in TBL format (default) to /tmp/tpch directory:
36
37tpchgen-cli -s 1 --output-dir=/tmp/tpch
38
39# Generate all tables in CSV format:
40
41tpchgen-cli csv -s 1 --output-dir=/tmp/tpch
42
43# Generate scale factor one in CSV format with tab delimiter:
44
45tpchgen-cli csv -s 1 --delimiter='\t' --output-dir=/tmp/tpch
46
47# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
48# /tmp/tpch/lineitem:
49
50tpchgen-cli parquet -s 100 --tables=lineitem --parts=10 --output-dir=/tmp/tpch
51
52# Generate scale factor one in current directory, seeing debug output
53
54RUST_LOG=debug tpchgen-cli -s 1 --output-dir=/tmp/tpch
55"#,
56    args_conflicts_with_subcommands = true
57)]
58pub struct Cli {
59    #[command(subcommand)]
60    command: Option<Commands>,
61
62    // Top-level args are only used when no subcommand is given (legacy path).
63    // args_conflicts_with_subcommands prevents these from being silently ignored
64    // when a subcommand is present (e.g. `tpchgen-cli -s 10 parquet` is an error).
65    #[command(flatten)]
66    args: TopLevelArgs,
67}
68
// Output-format subcommands (`tbl`, `csv`, `parquet`). Each variant carries
// that format's argument struct, and `Cli::run` dispatches to its `run`.
// NOTE: the `///` comments on the variants are user-visible help text.
#[derive(clap::Subcommand)]
enum Commands {
    /// Generate TBL (pipe-delimited) output
    Tbl(TblArgs),
    /// Generate CSV output with CSV-specific options
    Csv(CsvArgs),
    /// Generate Apache Parquet output with Parquet-specific options
    Parquet(ParquetArgs),
}
78
// Options shared by every output format.
//
// Flattened into `TopLevelArgs` (the legacy path used when no subcommand is
// given) and into `TblArgs`, `CsvArgs`, and `ParquetArgs`, so each flag below
// is accepted in both forms. `CommonArgs::builder` turns these values into a
// `TpchGeneratorBuilder` and installs logging.
//
// NOTE: the `///` comments on the fields are rendered as `--help` text; edit
// them as user-facing strings.
#[derive(clap::Args)]
struct CommonArgs {
    /// Scale factor to create
    #[arg(short, long, default_value_t = 1.)]
    scale_factor: f64,

    /// Output directory for generated files (default: current directory)
    #[arg(short, long, default_value = ".")]
    output_dir: PathBuf,

    // `None` means "all tables"; only `Some` is forwarded to the builder.
    // Values are comma-separated and each is parsed by `TableValueParser`.
    /// Which tables to generate (default: all)
    #[arg(short = 'T', long = "tables", value_delimiter = ',', value_parser = TableValueParser)]
    tables: Option<Vec<Table>>,

    // Neither `parts` nor `part` is range-checked here (0, negatives, or
    // `part > parts` are accepted by the parser); presumably the generator
    // validates them — TODO confirm.
    /// Number of part(itions) to generate. If not specified creates a single file per table
    #[arg(short, long)]
    parts: Option<i32>,

    /// Which part(ition) to generate (1-based). If not specified, generates all parts
    #[arg(long)]
    part: Option<i32>,

    /// The number of threads for parallel generation, defaults to the number of CPUs
    #[arg(short, long, default_value_t = num_cpus::get())]
    num_threads: usize,

    // `--verbose` and `--quiet` are mutually exclusive; the conflict is
    // declared on both arguments.
    /// Verbose output
    ///
    /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
    /// environment variable. When not specified, uses `RUST_LOG`
    #[arg(short, long, default_value_t = false, conflicts_with = "quiet")]
    verbose: bool,

    /// Quiet mode - only show error-level logs
    #[arg(short, long, default_value_t = false, conflicts_with = "verbose")]
    quiet: bool,

    /// Write the output to stdout instead of a file.
    #[arg(long, default_value_t = false)]
    stdout: bool,

    // Negative flag: the field defaults to `true`, and passing `--no-progress`
    // (ArgAction::SetFalse) stores `false`. It is only read when the
    // `indicatif-progress` feature is enabled (`should_show_progress_bars`).
    /// Disable progress bars during data generation.
    ///
    /// Bars are also auto-suppressed by `--quiet`, `--stdout`, or when
    /// stderr is not a terminal.
    #[arg(long = "no-progress", action = ArgAction::SetFalse, default_value_t = true)]
    progress_bars_enabled: bool,
}
127
128impl CommonArgs {
129    /// Initialize CLI logging/progress output and create a
130    /// [`TpchGeneratorBuilder`] pre-configured with the common options.
131    fn builder(self, format: OutputFormat) -> TpchGeneratorBuilder {
132        #[cfg(feature = "indicatif-progress")]
133        let progress = self
134            .should_show_progress_bars()
135            .then(|| Arc::new(IndicatifProgress::new()));
136
137        let mut builder = TpchGenerator::builder()
138            .with_scale_factor(self.scale_factor)
139            .with_output_dir(self.output_dir)
140            .with_format(format)
141            .with_num_threads(self.num_threads)
142            .with_stdout(self.stdout);
143
144        if let Some(tables) = self.tables {
145            builder = builder.with_tables(tables);
146        }
147        if let Some(parts) = self.parts {
148            builder = builder.with_parts(parts);
149        }
150        if let Some(part) = self.part {
151            builder = builder.with_part(part);
152        }
153
154        #[cfg(feature = "indicatif-progress")]
155        configure_logging(
156            self.verbose,
157            self.quiet,
158            progress.as_ref().map(|progress| progress.log_writer()),
159        );
160        #[cfg(not(feature = "indicatif-progress"))]
161        configure_logging(self.verbose, self.quiet, None);
162
163        #[cfg(feature = "indicatif-progress")]
164        if let Some(progress) = progress {
165            builder = builder.with_progress_tracker(progress);
166        }
167
168        builder
169    }
170
171    #[cfg(feature = "indicatif-progress")]
172    fn should_show_progress_bars(&self) -> bool {
173        // Show progress only on an interactive terminal and when no flag
174        // suppresses it. `--stdout` is included so piped data isn't
175        // interleaved with bar redraws on shared shells.
176        self.progress_bars_enabled && !self.quiet && !self.stdout && io::stderr().is_terminal()
177    }
178}
179
// Arguments used when no subcommand is given (the legacy interface).
//
// Besides the shared options, this carries hidden, deprecated flags that
// `Cli::run_default` still honours. `--format` falls back to TBL when absent.
// The two Parquet-only flags are applied only when the format is Parquet and
// are otherwise ignored with a warning.
#[derive(clap::Args)]
struct TopLevelArgs {
    #[command(flatten)]
    common: CommonArgs,

    /// Output format (deprecated: use subcommands `tbl`, `csv`, or `parquet` instead)
    ///
    /// The --format flag will be removed in v4.0.0.
    #[arg(short, long, hide = true)]
    format: Option<OutputFormat>,

    /// Parquet block compression format (deprecated: use 'parquet' subcommand instead)
    #[arg(short = 'c', long, hide = true)]
    parquet_compression: Option<Compression>,

    /// Target row group size in bytes (deprecated: use 'parquet' subcommand instead)
    #[arg(long, hide = true)]
    parquet_row_group_bytes: Option<i64>,
}
199
// Arguments for the `tbl` subcommand. No TBL-specific options are defined,
// so this is just the shared `CommonArgs`.
#[derive(clap::Args)]
struct TblArgs {
    #[command(flatten)]
    common: CommonArgs,
}
205
// Arguments for the `csv` subcommand: the shared options plus the field
// delimiter, which `CsvArgs::run` forwards to the builder.
#[derive(clap::Args)]
struct CsvArgs {
    #[command(flatten)]
    common: CommonArgs,

    // Validated by `parse_delimiter`: a single ASCII character or one of the
    // escape sequences `\t`, `\n`, `\r`, `\\`.
    /// CSV delimiter character (default: ',')
    ///
    /// Specifies the delimiter character to use when generating CSV files.
    ///
    /// Supports escape sequences: \t (tab), \n (newline), \r (carriage return), \\ (backslash)
    /// Common delimiters: ',' (comma), '|' (pipe), '\t' (tab), ';' (semicolon)
    #[arg(long, default_value = ",", value_parser = parse_delimiter)]
    delimiter: char,
}
220
// Arguments for the `parquet` subcommand: the shared options plus the
// compression codec and target row-group size, both forwarded to the
// builder by `ParquetArgs::run`.
#[derive(clap::Args)]
struct ParquetArgs {
    #[command(flatten)]
    common: CommonArgs,

    // The accepted spellings come from how `Compression` is parsed (defined
    // outside this file); presumably they match the list below — TODO confirm.
    /// Parquet block compression format.
    ///
    /// Supported values: UNCOMPRESSED, ZSTD(N), SNAPPY, GZIP, LZO, BROTLI, LZ4
    ///
    /// Note to use zstd you must supply the "compression" level (1-22)
    /// as a number in parentheses, e.g. `ZSTD(1)` for level 1 compression.
    ///
    /// Using `ZSTD` results in the best compression, but is about 2x slower than
    /// UNCOMPRESSED. For example, for the lineitem table at SF=10
    ///
    ///   ZSTD(1):      1.9G  (0.52 GB/sec)
    ///   SNAPPY:       2.4G  (0.75 GB/sec)
    ///   UNCOMPRESSED: 3.8G  (1.41 GB/sec)
    #[arg(short = 'c', long, default_value = "SNAPPY")]
    compression: Compression,

    // The 32k row-group cap described below is not enforced in this file;
    // presumably the writer adjusts the size — TODO confirm where.
    /// Target size in row group bytes in Parquet files
    ///
    /// Row groups are the typical unit of parallel processing and compression
    /// with many query engines. Therefore, smaller row groups enable better
    /// parallelism and lower peak memory use but may reduce compression
    /// efficiency.
    ///
    /// Note: Parquet files are limited to 32k row groups, so at high scale
    /// factors, the row group size may be increased to keep the number of row
    /// groups under this limit.
    ///
    /// Typical values range from 10MB to 100MB.
    #[arg(long, default_value_t = DEFAULT_PARQUET_ROW_GROUP_BYTES)]
    row_group_bytes: i64,
}
257
/// Turn the `--delimiter` argument into the character that separates CSV fields.
///
/// The escape sequences `\t`, `\n`, `\r` and `\\` are recognised verbatim.
/// Any other input must be exactly one character.
///
/// The result must be ASCII because the underlying arrow-csv writer takes the
/// delimiter as a single byte. Rejecting other characters here fails fast at
/// argument parsing instead of partway through generation.
fn parse_delimiter(s: &str) -> Result<char, String> {
    const ESCAPES: [(&str, char); 4] = [
        ("\\t", '\t'),
        ("\\n", '\n'),
        ("\\r", '\r'),
        ("\\\\", '\\'),
    ];

    let escaped = ESCAPES
        .iter()
        .find_map(|&(sequence, ch)| (sequence == s).then_some(ch));

    let delimiter = if let Some(ch) = escaped {
        ch
    } else {
        // Not an escape sequence: accept exactly one character.
        let mut chars = s.chars();
        match (chars.next(), chars.next()) {
            (Some(only), None) => only,
            _ => {
                return Err(format!(
                    "Delimiter must be a single character or escape sequence (\\t, \\n, \\r, \\\\), got: '{}'",
                    s
                ));
            }
        }
    };

    if !delimiter.is_ascii() {
        return Err(format!(
            "Delimiter must be an ASCII character, got: '{}'",
            delimiter
        ));
    }
    Ok(delimiter)
}
289
290// TableValueParser is CLI-specific and uses the Table type from the library
291#[derive(Debug, Clone)]
292struct TableValueParser;
293
294impl TypedValueParser for TableValueParser {
295    type Value = Table;
296
297    /// Parse the value into a Table enum.
298    fn parse_ref(
299        &self,
300        cmd: &clap::Command,
301        _: Option<&clap::Arg>,
302        value: &std::ffi::OsStr,
303    ) -> Result<Self::Value, clap::Error> {
304        let value = value
305            .to_str()
306            .ok_or_else(|| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))?;
307        Table::from_str(value)
308            .map_err(|_| clap::Error::new(clap::error::ErrorKind::InvalidValue).with_cmd(cmd))
309    }
310
311    fn possible_values(
312        &self,
313    ) -> Option<Box<dyn Iterator<Item = clap::builder::PossibleValue> + '_>> {
314        Some(Box::new(
315            [
316                clap::builder::PossibleValue::new("region").help("Region table (alias: r)"),
317                clap::builder::PossibleValue::new("nation").help("Nation table (alias: n)"),
318                clap::builder::PossibleValue::new("supplier").help("Supplier table (alias: s)"),
319                clap::builder::PossibleValue::new("customer").help("Customer table (alias: c)"),
320                clap::builder::PossibleValue::new("part").help("Part table (alias: P)"),
321                clap::builder::PossibleValue::new("partsupp").help("PartSupp table (alias: S)"),
322                clap::builder::PossibleValue::new("orders").help("Orders table (alias: O)"),
323                clap::builder::PossibleValue::new("lineitem").help("LineItem table (alias: L)"),
324            ]
325            .into_iter(),
326        ))
327    }
328}
329
330impl Cli {
331    /// Run data generation for the selected command.
332    pub async fn run(self) -> io::Result<()> {
333        match self.command {
334            Some(Commands::Tbl(args)) => args.run().await,
335            Some(Commands::Csv(args)) => args.run().await,
336            Some(Commands::Parquet(args)) => args.run().await,
337            None => self.run_default().await,
338        }
339    }
340
341    async fn run_default(self) -> io::Result<()> {
342        // Warn about --format migration to subcommands (only when explicitly provided)
343        let (format, subcommand) = if let Some(format) = self.args.format {
344            let subcommand = match format {
345                OutputFormat::Parquet => "parquet",
346                OutputFormat::Csv => "csv",
347                OutputFormat::Tbl => "tbl",
348            };
349            (format, Some(subcommand))
350        } else {
351            (OutputFormat::Tbl, None)
352        };
353
354        let mut builder = self.args.common.builder(format);
355        if let Some(subcommand) = subcommand {
356            log::warn!(
357                "The --format flag will be removed in v4.0.0. Use `tpchgen-cli {subcommand}` instead."
358            );
359        }
360
361        if let Some(parquet_compression) = self.args.parquet_compression {
362            if format == OutputFormat::Parquet {
363                log::warn!("The --parquet-compression flag is deprecated. Use 'tpchgen-cli parquet --compression=...' instead");
364                builder = builder.with_parquet_compression(parquet_compression);
365            } else {
366                log::warn!("--parquet-compression ignored: output format is not parquet");
367            }
368        }
369
370        if let Some(parquet_row_group_bytes) = self.args.parquet_row_group_bytes {
371            if format == OutputFormat::Parquet {
372                log::warn!("The --parquet-row-group-bytes flag is deprecated. Use 'tpchgen-cli parquet --row-group-bytes=...' instead");
373                builder = builder.with_parquet_row_group_bytes(parquet_row_group_bytes);
374            } else {
375                log::warn!("--parquet-row-group-bytes ignored: output format is not parquet");
376            }
377        }
378
379        builder.build().generate().await
380    }
381}
382
383impl TblArgs {
384    async fn run(self) -> io::Result<()> {
385        self.common
386            .builder(OutputFormat::Tbl)
387            .build()
388            .generate()
389            .await
390    }
391}
392
393impl CsvArgs {
394    async fn run(self) -> io::Result<()> {
395        self.common
396            .builder(OutputFormat::Csv)
397            .with_csv_delimiter(self.delimiter)
398            .build()
399            .generate()
400            .await
401    }
402}
403
404impl ParquetArgs {
405    async fn run(self) -> io::Result<()> {
406        self.common
407            .builder(OutputFormat::Parquet)
408            .with_parquet_compression(self.compression)
409            .with_parquet_row_group_bytes(self.row_group_bytes)
410            .build()
411            .generate()
412            .await
413    }
414}
415
416fn configure_logging(
417    verbose: bool,
418    quiet: bool,
419    log_writer: Option<Box<dyn io::Write + Send + 'static>>,
420) {
421    let mut builder = env_logger::builder();
422    if quiet {
423        // Quiet mode: only show error-level logs
424        builder.filter_level(LevelFilter::Error);
425    } else if verbose {
426        builder.filter_level(LevelFilter::Info);
427    } else {
428        // Default: show warnings and errors, but respect RUST_LOG if set
429        builder.filter_level(LevelFilter::Warn).parse_default_env();
430    }
431    if let Some(log_writer) = log_writer {
432        builder.target(env_logger::Target::Pipe(log_writer));
433    }
434
435    builder.init();
436
437    if verbose {
438        info!("Verbose output enabled (ignoring RUST_LOG environment variable)");
439    }
440}