Skip to main content

tpcgen_cli/
tpcds_cli.rs

1//! TPC-DS data generation CLI with a dbgen compatible API.
2use crate::tpch_cli::{Compression, DEFAULT_PARQUET_ROW_GROUP_BYTES};
3use clap::{ArgAction, Args, Subcommand};
4use std::fmt;
5use std::path::PathBuf;
6use tpcdsgen::config::{CompatMode, Session, SessionBuilder, Table};
7use tpcdsgen::error::InvalidOptionError;
8
9pub mod dat;
10
/// Module-local result alias: every fallible function here reports its
/// failure as a boxed `std::error::Error` trait object.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
/// Text rendered by the `NotImplemented` error, which the CSV and Parquet
/// subcommands return.
const NOT_IMPLEMENTED: &str = "TPC-DS data generation is not yet implemented";
13
// Top-level TPC-DS argument group.
//
// The subcommand is optional: when none is given, `Cli::run` falls back to
// the flattened `DatArgs`, i.e. DAT generation.
// `args_conflicts_with_subcommands` tells clap that those flattened arguments
// and a subcommand are mutually exclusive.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments on these items into user-visible help text.
#[derive(Args)]
#[command(version)]
#[command(args_conflicts_with_subcommands = true)]
pub struct Cli {
    // Explicitly selected output format, if any.
    #[command(subcommand)]
    command: Option<Commands>,

    // Arguments used when no subcommand is given (treated as DAT output).
    #[command(flatten)]
    args: DatArgs,
}
24
// Output-format subcommands. Each variant carries the argument struct for its
// format; `Cli::run` dispatches to that struct's `run` method.
// In this file only `Dat` generates data; `Csv` and `Parquet` return the
// `NotImplemented` error.
#[derive(Subcommand)]
enum Commands {
    /// Generate DAT (pipe-delimited) output
    Dat(DatArgs),
    /// Generate CSV output with CSV-specific options
    Csv(CsvArgs),
    /// Generate Apache Parquet output with Parquet-specific options
    Parquet(ParquetArgs),
}
34
// Arguments for DAT output. There are no DAT-specific options, so this is
// just the shared `CommonArgs` flattened in.
#[derive(Args)]
struct DatArgs {
    #[command(flatten)]
    common: CommonArgs,
}
40
// Arguments for CSV output: the shared `CommonArgs` plus the field delimiter.
// `CsvArgs::run` currently discards `delimiter` and reports "not implemented".
#[derive(Args)]
struct CsvArgs {
    #[command(flatten)]
    common: CommonArgs,

    // Parsed and validated by `parse_delimiter`, which accepts either a single
    // ASCII character or one of the two-character escape sequences listed in
    // the help text below.
    /// CSV delimiter character (default: ',')
    ///
    /// Specifies the delimiter character to use when generating CSV files.
    ///
    /// Supports escape sequences: \t (tab), \n (newline), \r (carriage return), \\ (backslash)
    /// Common delimiters: ',' (comma), '|' (pipe), '\t' (tab), ';' (semicolon)
    #[arg(long, default_value = ",", value_parser = parse_delimiter)]
    delimiter: char,
}
55
// Arguments for Parquet output: the shared `CommonArgs` plus compression and
// row-group sizing. `Compression` and the row-group default are reused from
// `crate::tpch_cli`. `ParquetArgs::run` currently discards both options and
// reports "not implemented".
#[derive(Args)]
struct ParquetArgs {
    #[command(flatten)]
    common: CommonArgs,

    // NOTE(review): the size/throughput figures in the help text below refer to
    // the `lineitem` table, which looks carried over from the TPC-H CLI —
    // confirm they are meaningful for TPC-DS before shipping this help text.
    /// Parquet block compression format.
    ///
    /// Supported values: UNCOMPRESSED, ZSTD(N), SNAPPY, GZIP, LZO, BROTLI, LZ4
    ///
    /// Note to use zstd you must supply the "compression" level (1-22)
    /// as a number in parentheses, e.g. `ZSTD(1)` for level 1 compression.
    ///
    /// Using `ZSTD` results in the best compression, but is about 2x slower than
    /// UNCOMPRESSED. For example, for the lineitem table at SF=10
    ///
    ///   ZSTD(1):      1.9G  (0.52 GB/sec)
    ///   SNAPPY:       2.4G  (0.75 GB/sec)
    ///   UNCOMPRESSED: 3.8G  (1.41 GB/sec)
    #[arg(short = 'c', long, default_value = "SNAPPY")]
    compression: Compression,

    // Defaults to `DEFAULT_PARQUET_ROW_GROUP_BYTES` from `crate::tpch_cli`.
    /// Target size in row group bytes in Parquet files
    ///
    /// Row groups are the typical unit of parallel processing and compression
    /// with many query engines. Therefore, smaller row groups enable better
    /// parallelism and lower peak memory use but may reduce compression
    /// efficiency.
    ///
    /// Note: Parquet files are limited to 32k row groups, so at high scale
    /// factors, the row group size may be increased to keep the number of row
    /// groups under this limit.
    ///
    /// Typical values range from 10MB to 100MB.
    #[arg(long, default_value_t = DEFAULT_PARQUET_ROW_GROUP_BYTES)]
    row_group_bytes: i64,
}
92
// Options shared by every output format. Each per-format argument struct
// flattens this in, so these flags are accepted by all subcommands as well as
// by the subcommand-less invocation.
#[derive(Args)]
pub struct CommonArgs {
    // Passed to `SessionBuilder::with_scale_factor` when a session is built.
    /// Scale factor to create
    #[arg(short, long, default_value_t = 1.)]
    scale_factor: f64,

    // Created (including parents) before DAT generation starts.
    /// Output directory for generated files (default: current directory)
    #[arg(short, long, default_value = ".")]
    output_dir: PathBuf,

    // Comma-separated on the command line; each name is parsed into a `Table`
    // when the session is built. `None` means no table restriction is applied.
    /// Which tables to generate (default: all)
    #[arg(short = 'T', long = "tables", value_delimiter = ',')]
    tables: Option<Vec<String>>,

    /// Reference implementation to match (default: trino)
    #[arg(long, default_value_t = CompatMode::Trino)]
    compat: CompatMode,

    // NOTE(review): `verbose` and `quiet` are not read anywhere in the visible
    // part of this file — presumably the caller configures logging from them;
    // confirm.
    /// Verbose output
    ///
    /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
    /// environment variable. When not specified, uses `RUST_LOG`
    #[arg(short, long, default_value_t = false, conflicts_with = "quiet")]
    verbose: bool,

    /// Quiet mode - only show error-level logs
    #[arg(short, long, default_value_t = false, conflicts_with = "verbose")]
    quiet: bool,

    // Inverted flag: the field defaults to `true` and passing `--no-progress`
    // sets it to `false` (`ArgAction::SetFalse`). `run_dat` currently reads the
    // value only to discard it.
    /// Disable progress bars during data generation.
    ///
    /// Bars are also auto-suppressed by `--quiet` or when stderr is not a terminal.
    #[arg(long = "no-progress", action = ArgAction::SetFalse, default_value_t = true)]
    progress_bars_enabled: bool,
}
128
129impl Cli {
130    pub fn run(self) -> Result<()> {
131        match self.command {
132            Some(Commands::Dat(args)) => args.run(),
133            Some(Commands::Csv(args)) => args.run(),
134            Some(Commands::Parquet(args)) => args.run(),
135            None => self.args.run(),
136        }
137    }
138}
139
140impl DatArgs {
141    fn run(self) -> Result<()> {
142        self.common.run_dat()
143    }
144}
145
146impl CsvArgs {
147    fn run(self) -> Result<()> {
148        let _ = self.delimiter;
149        self.common.run_not_implemented()
150    }
151}
152
153impl ParquetArgs {
154    fn run(self) -> Result<()> {
155        let _ = (self.compression, self.row_group_bytes);
156        self.common.run_not_implemented()
157    }
158}
159
160impl CommonArgs {
161    fn run_dat(self) -> Result<()> {
162        let _ = self.progress_bars_enabled;
163        std::fs::create_dir_all(&self.output_dir)?;
164        if let Some(tables) = &self.tables {
165            for table in tables {
166                self.run_dat_for_table(Some(table.clone()))?;
167            }
168        } else {
169            self.run_dat_for_table(None)?;
170        }
171
172        Ok(())
173    }
174
175    fn run_dat_for_table(&self, table: Option<String>) -> Result<()> {
176        let session = self.to_session(table)?;
177        dat::generate(&session)
178    }
179
180    fn to_session(&self, table: Option<String>) -> Result<Session> {
181        let table = table
182            .as_deref()
183            .map(|table| {
184                table
185                    .parse::<Table>()
186                    .map_err(|_| InvalidOptionError::new("table", table))
187            })
188            .transpose()?;
189
190        // store the command line arguments used to create this
191        let command_line_arguments = std::env::args().collect::<Vec<_>>().join(" ");
192
193        let mut builder = SessionBuilder::new()
194            .with_scale_factor(self.scale_factor)
195            .with_target_directory(self.output_dir.to_string_lossy())
196            .with_compat_mode(self.compat)
197            .with_command_line_arguments(command_line_arguments);
198
199        if let Some(table) = table {
200            builder = builder.with_table(table);
201        }
202
203        Ok(builder.build()?)
204    }
205
206    fn run_not_implemented(self) -> Result<()> {
207        let _ = self;
208        Err(Box::new(NotImplemented))
209    }
210}
211
212struct NotImplemented;
213
214impl fmt::Display for NotImplemented {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        f.write_str(NOT_IMPLEMENTED)
217    }
218}
219
220impl fmt::Debug for NotImplemented {
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        fmt::Display::fmt(self, f)
223    }
224}
225
226impl std::error::Error for NotImplemented {}
227
/// Parses the `--delimiter` argument into a single ASCII character.
///
/// Accepts either one of the two-character escape sequences `\t`, `\n`, `\r`,
/// `\\` (written literally, backslash included) or a string consisting of
/// exactly one character.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty, has more than
/// one character and is not a known escape sequence, or resolves to a
/// non-ASCII character.
fn parse_delimiter(s: &str) -> std::result::Result<char, String> {
    let candidate = match s {
        "\\t" => '\t',
        "\\n" => '\n',
        "\\r" => '\r',
        "\\\\" => '\\',
        other => {
            // Exactly one character: the first `next()` yields it and the
            // second one must already be exhausted.
            let mut symbols = other.chars();
            match (symbols.next(), symbols.next()) {
                (Some(only), None) => only,
                _ => {
                    return Err(format!(
                        "Delimiter must be a single character or escape sequence (\\t, \\n, \\r, \\\\), got: '{}'",
                        s
                    ));
                }
            }
        }
    };

    if candidate.is_ascii() {
        Ok(candidate)
    } else {
        Err(format!(
            "Delimiter must be an ASCII character, got: '{}'",
            candidate
        ))
    }
}