tpchgen_cli/
lib.rs

1//! TPC-H Data Generator Library
2//!
3//! This crate provides both a command-line tool and a library for generating
4//! TPC-H benchmark data in various formats (TBL, CSV, Parquet).
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use tpchgen_cli::{TpchGenerator, Table, OutputFormat};
10//! use std::path::PathBuf;
11//!
12//! # async fn example() -> std::io::Result<()> {
13//! let generator = TpchGenerator::builder()
14//!     .with_scale_factor(10.0)
15//!     .with_output_dir(PathBuf::from("./data"))
16//!     .with_tables(vec![Table::Customer, Table::Orders])
17//!     .with_format(OutputFormat::Parquet)
18//!     .with_num_threads(8)
19//!     .build();
20//!
21//! generator.generate().await?;
22//! # Ok(())
23//! # }
24//! ```
25
26pub use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
27pub use ::parquet::basic::Compression;
28
29pub mod csv;
30pub mod generate;
31pub mod output_plan;
32pub mod parquet;
33pub mod plan;
34pub mod runner;
35pub mod statistics;
36pub mod tbl;
37
38use crate::generate::Sink;
39use crate::parquet::IntoSize;
40use crate::statistics::WriteStatistics;
41use std::fmt::Display;
42use std::fs::File;
43use std::io::{self, BufWriter, Stdout, Write};
44use std::str::FromStr;
45
46/// Wrapper around a buffer writer that counts the number of buffers and bytes written
47pub struct WriterSink<W: Write> {
48    statistics: WriteStatistics,
49    inner: W,
50}
51
52impl<W: Write> WriterSink<W> {
53    pub fn new(inner: W) -> Self {
54        Self {
55            inner,
56            statistics: WriteStatistics::new("buffers"),
57        }
58    }
59}
60
61impl<W: Write + Send> Sink for WriterSink<W> {
62    fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
63        self.statistics.increment_chunks(1);
64        self.statistics.increment_bytes(buffer.len());
65        self.inner.write_all(buffer)
66    }
67
68    fn flush(mut self) -> Result<(), io::Error> {
69        self.inner.flush()
70    }
71}
72
73impl IntoSize for BufWriter<Stdout> {
74    fn into_size(self) -> Result<usize, io::Error> {
75        // we can't get the size of stdout, so just return 0
76        Ok(0)
77    }
78}
79
80impl IntoSize for BufWriter<File> {
81    fn into_size(self) -> Result<usize, io::Error> {
82        let file = self.into_inner()?;
83        let metadata = file.metadata()?;
84        Ok(metadata.len() as usize)
85    }
86}
87
/// TPC-H table types
///
/// Represents the 8 tables in the TPC-H benchmark schema.
///
/// The variant declaration order (nation, region, part, supplier, partsupp,
/// customer, orders, lineitem) is what the derived `PartialOrd`/`Ord` compare
/// by. It is *not* an ordering by table size: e.g. `Region` (5 rows) is smaller
/// than `Nation` (25 rows), and `Supplier` is smaller than `Part`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Table {
    /// Nation table (25 rows)
    Nation,
    /// Region table (5 rows)
    Region,
    /// Part table (200,000 rows at SF=1)
    Part,
    /// Supplier table (10,000 rows at SF=1)
    Supplier,
    /// Part-Supplier relationship table (800,000 rows at SF=1)
    Partsupp,
    /// Customer table (150,000 rows at SF=1)
    Customer,
    /// Orders table (1,500,000 rows at SF=1)
    Orders,
    /// Line item table (approximately 6,000,000 rows at SF=1)
    Lineitem,
}

impl Display for Table {
    /// Writes the lowercase full table name (e.g. `lineitem`), which
    /// [`FromStr`] accepts back.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name())
    }
}

impl FromStr for Table {
    type Err = &'static str;

    /// Returns the table enum value from the given string full name or abbreviation
    ///
    /// Matching is case-sensitive: `s` is `supplier` while `S` is `partsupp`.
    ///
    /// The original dbgen tool allows some abbreviations to mean two different tables
    /// like 'p' which aliases to both 'part' and 'partsupp'. This implementation does
    /// not support this since it just adds unnecessary complexity and confusion so we
    /// only support the exclusive abbreviations.
    ///
    /// # Errors
    ///
    /// Returns a static message listing every accepted name when `s` matches none.
    /// (The error type is `&'static str`, so the rejected input itself cannot be
    /// embedded in the message.)
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "n" | "nation" => Ok(Table::Nation),
            "r" | "region" => Ok(Table::Region),
            "s" | "supplier" => Ok(Table::Supplier),
            "P" | "part" => Ok(Table::Part),
            "S" | "partsupp" => Ok(Table::Partsupp),
            "c" | "customer" => Ok(Table::Customer),
            "O" | "orders" => Ok(Table::Orders),
            "L" | "lineitem" => Ok(Table::Lineitem),
            _ => Err("Invalid table name. Valid names are: nation (n), region (r), \
                      part (P), supplier (s), partsupp (S), customer (c), orders (O), \
                      lineitem (L)"),
        }
    }
}

impl Table {
    /// Lowercase full name of the table, as used by [`Display`].
    fn name(&self) -> &'static str {
        match self {
            Table::Nation => "nation",
            Table::Region => "region",
            Table::Part => "part",
            Table::Supplier => "supplier",
            Table::Partsupp => "partsupp",
            Table::Customer => "customer",
            Table::Orders => "orders",
            Table::Lineitem => "lineitem",
        }
    }
}
156
/// File format the generator writes.
///
/// # Format Details
///
/// - **TBL**: Pipe-delimited format compatible with original dbgen tool
/// - **CSV**: Comma-separated values with proper escaping
/// - **Parquet**: Columnar Apache Parquet format with configurable compression
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum OutputFormat {
    /// TBL format (pipe-delimited, dbgen-compatible)
    Tbl,
    /// CSV format (comma-separated values)
    Csv,
    /// Apache Parquet format (columnar, compressed)
    Parquet,
}

impl FromStr for OutputFormat {
    type Err = String;

    /// Parse a format name case-insensitively (`tbl`, `csv` or `parquet`).
    ///
    /// The error message echoes the input exactly as given.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        let recognized = match lowered.as_str() {
            "tbl" => Some(Self::Tbl),
            "csv" => Some(Self::Csv),
            "parquet" => Some(Self::Parquet),
            _ => None,
        };
        recognized.ok_or_else(|| {
            format!("Invalid output format: {s}. Valid formats are: tbl, csv, parquet")
        })
    }
}

impl Display for OutputFormat {
    /// Writes the lowercase format name, the same spelling `from_str` accepts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Tbl => "tbl",
            Self::Csv => "csv",
            Self::Parquet => "parquet",
        };
        f.write_str(label)
    }
}
198
199/// Configuration for TPC-H data generation
200///
201/// This struct holds all the parameters needed to generate TPC-H benchmark data.
202/// It's typically not constructed directly - use [`TpchGeneratorBuilder`] instead.
203///
204/// # Examples
205///
206/// ```no_run
207/// use tpchgen_cli::{GeneratorConfig, OutputFormat};
208///
209/// // Usually you would use TpchGenerator::builder() instead
210/// let config = GeneratorConfig {
211///     scale_factor: 10.0,
212///     ..Default::default()
213/// };
214/// ```
215#[derive(Debug, Clone)]
216pub struct GeneratorConfig {
217    /// Scale factor (e.g., 1.0 for 1GB, 10.0 for 10GB)
218    pub scale_factor: f64,
219    /// Output directory for generated files
220    pub output_dir: std::path::PathBuf,
221    /// Tables to generate (if None, generates all tables)
222    pub tables: Option<Vec<Table>>,
223    /// Output format (TBL, CSV, or Parquet)
224    pub format: OutputFormat,
225    /// Number of threads for parallel generation
226    pub num_threads: usize,
227    /// Parquet compression format
228    pub parquet_compression: Compression,
229    /// Target row group size in bytes for Parquet files
230    pub parquet_row_group_bytes: i64,
231    /// Number of partitions to generate (if None, generates a single file per table)
232    pub parts: Option<i32>,
233    /// Specific partition to generate (1-based, requires parts to be set)
234    pub part: Option<i32>,
235    /// Write output to stdout instead of files
236    pub stdout: bool,
237}
238
239impl Default for GeneratorConfig {
240    fn default() -> Self {
241        Self {
242            scale_factor: 1.0,
243            output_dir: std::path::PathBuf::from("."),
244            tables: None,
245            format: OutputFormat::Tbl,
246            num_threads: num_cpus::get(),
247            parquet_compression: Compression::SNAPPY,
248            parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
249            parts: None,
250            part: None,
251            stdout: false,
252        }
253    }
254}
255
256/// TPC-H data generator
257///
258/// The main entry point for generating TPC-H benchmark data.
259/// Use the builder pattern via [`TpchGenerator::builder()`] to configure and create instances.
260///
261/// # Examples
262///
263/// ```no_run
264/// use tpchgen_cli::{TpchGenerator, Table, OutputFormat};
265/// use std::path::PathBuf;
266/// use ::parquet::basic::ZstdLevel;
267/// # async fn example() -> std::io::Result<()> {
268/// // Generate all tables at scale factor 1 in TBL format
269/// TpchGenerator::builder()
270///     .with_scale_factor(1.0)
271///     .with_output_dir(PathBuf::from("./data"))
272///     .build()
273///     .generate()
274///     .await?;
275///
276/// // Generate specific tables in Parquet format with compression
277/// TpchGenerator::builder()
278///     .with_scale_factor(10.0)
279///     .with_output_dir(PathBuf::from("./benchmark_data"))
280///     .with_tables(vec![Table::Orders, Table::Lineitem])
281///     .with_format(OutputFormat::Parquet)
282///     .with_parquet_compression(tpchgen_cli::Compression::ZSTD(ZstdLevel::try_new(1).unwrap()))
283///     .with_num_threads(16)
284///     .build()
285///     .generate()
286///     .await?;
287/// # Ok(())
288/// # }
289/// ```
290pub struct TpchGenerator {
291    config: GeneratorConfig,
292}
293
294impl TpchGenerator {
295    /// Create a new builder for configuring the generator
296    ///
297    /// This is the recommended way to construct a [`TpchGenerator`].
298    ///
299    /// # Examples
300    ///
301    /// ```no_run
302    /// use tpchgen_cli::TpchGenerator;
303    ///
304    /// let generator = TpchGenerator::builder()
305    ///     .with_scale_factor(1.0)
306    ///     .build();
307    /// ```
308    pub fn builder() -> TpchGeneratorBuilder {
309        TpchGeneratorBuilder::new()
310    }
311
312    /// Generate TPC-H data with the configured settings
313    ///
314    /// This async method performs the actual data generation, creating files
315    /// in the configured output directory (or writing to stdout if configured).
316    ///
317    /// # Returns
318    ///
319    /// - `Ok(())` on successful generation
320    /// - `Err(io::Error)` if file I/O or generation fails
321    ///
322    /// # Examples
323    ///
324    /// ```no_run
325    /// use tpchgen_cli::TpchGenerator;
326    ///
327    /// # async fn example() -> std::io::Result<()> {
328    /// TpchGenerator::builder()
329    ///     .with_scale_factor(1.0)
330    ///     .build()
331    ///     .generate()
332    ///     .await?;
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub async fn generate(self) -> io::Result<()> {
337        use crate::output_plan::OutputPlanGenerator;
338        use crate::runner::PlanRunner;
339        use log::info;
340        use std::time::Instant;
341        use tpchgen::distribution::Distributions;
342        use tpchgen::text::TextPool;
343
344        let config = self.config;
345
346        // Create output directory if it doesn't exist and we are not writing to stdout
347        if !config.stdout {
348            std::fs::create_dir_all(&config.output_dir)?;
349        }
350
351        // Determine which tables to generate
352        let tables: Vec<Table> = if let Some(tables) = config.tables {
353            tables
354        } else {
355            vec![
356                Table::Nation,
357                Table::Region,
358                Table::Part,
359                Table::Supplier,
360                Table::Partsupp,
361                Table::Customer,
362                Table::Orders,
363                Table::Lineitem,
364            ]
365        };
366
367        // Determine what files to generate
368        let mut output_plan_generator = OutputPlanGenerator::new(
369            config.format,
370            config.scale_factor,
371            config.parquet_compression,
372            config.parquet_row_group_bytes,
373            config.stdout,
374            config.output_dir,
375        );
376
377        for table in tables {
378            output_plan_generator.generate_plans(table, config.part, config.parts)?;
379        }
380        let output_plans = output_plan_generator.build();
381
382        // Force the creation of the distributions and text pool so it doesn't
383        // get charged to the first table
384        let start = Instant::now();
385        Distributions::static_default();
386        TextPool::get_or_init_default();
387        let elapsed = start.elapsed();
388        info!("Created static distributions and text pools in {elapsed:?}");
389
390        // Run
391        let runner = PlanRunner::new(output_plans, config.num_threads);
392        runner.run().await?;
393        info!("Generation complete!");
394        Ok(())
395    }
396}
397
398/// Builder for constructing a [`TpchGenerator`]
399///
400/// Provides a fluent interface for configuring TPC-H data generation parameters.
401/// All builder methods can be chained, and calling [`build()`](TpchGeneratorBuilder::build)
402/// produces a [`TpchGenerator`] ready to generate data.
403///
404/// # Defaults
405///
406/// - Scale factor: 1.0
407/// - Output directory: current directory (".")
408/// - Tables: all 8 tables
409/// - Format: TBL
410/// - Threads: number of CPUs
411/// - Parquet compression: SNAPPY
412/// - Row group size: 7MB
413///
414/// # Examples
415///
416/// ```no_run
417/// use tpchgen_cli::{TpchGenerator, Table, OutputFormat, Compression};
418/// use std::path::PathBuf;
419/// use ::parquet::basic::ZstdLevel;
420///
421/// # async fn example() -> std::io::Result<()> {
422/// let generator = TpchGenerator::builder()
423///     .with_scale_factor(100.0)
424///     .with_output_dir(PathBuf::from("/data/tpch"))
425///     .with_tables(vec![Table::Lineitem, Table::Orders])
426///     .with_format(OutputFormat::Parquet)
427///     .with_parquet_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
428///     .with_num_threads(32)
429///     .build();
430///
431/// generator.generate().await?;
432/// # Ok(())
433/// # }
434/// ```
435#[derive(Debug, Clone)]
436pub struct TpchGeneratorBuilder {
437    config: GeneratorConfig,
438}
439
440impl TpchGeneratorBuilder {
441    /// Create a new builder with default configuration
442    ///
443    /// # Examples
444    ///
445    /// ```
446    /// use tpchgen_cli::TpchGeneratorBuilder;
447    ///
448    /// let builder = TpchGeneratorBuilder::new();
449    /// ```
450    pub fn new() -> Self {
451        Self {
452            config: GeneratorConfig::default(),
453        }
454    }
455
456    /// Returns the scale factor.
457    pub fn scale_factor(&self) -> f64 {
458        self.config.scale_factor
459    }
460
461    /// Set the scale factor (e.g., 1.0 for 1GB, 10.0 for 10GB)
462    pub fn with_scale_factor(mut self, scale_factor: f64) -> Self {
463        self.config.scale_factor = scale_factor;
464        self
465    }
466
467    /// Set the output directory
468    pub fn with_output_dir(mut self, output_dir: impl Into<std::path::PathBuf>) -> Self {
469        self.config.output_dir = output_dir.into();
470        self
471    }
472
473    /// Set which tables to generate (default: all tables)
474    pub fn with_tables(mut self, tables: Vec<Table>) -> Self {
475        self.config.tables = Some(tables);
476        self
477    }
478
479    /// Set the output format (default: TBL)
480    pub fn with_format(mut self, format: OutputFormat) -> Self {
481        self.config.format = format;
482        self
483    }
484
485    /// Set the number of threads for parallel generation (default: number of CPUs)
486    pub fn with_num_threads(mut self, num_threads: usize) -> Self {
487        self.config.num_threads = num_threads;
488        self
489    }
490
491    /// Set Parquet compression format (default: SNAPPY)
492    pub fn with_parquet_compression(mut self, compression: Compression) -> Self {
493        self.config.parquet_compression = compression;
494        self
495    }
496
497    /// Set target row group size in bytes for Parquet files (default: 7MB)
498    pub fn with_parquet_row_group_bytes(mut self, bytes: i64) -> Self {
499        self.config.parquet_row_group_bytes = bytes;
500        self
501    }
502
503    /// Set the number of partitions to generate
504    pub fn with_parts(mut self, parts: i32) -> Self {
505        self.config.parts = Some(parts);
506        self
507    }
508
509    /// Set the specific partition to generate (1-based, requires parts to be set)
510    pub fn with_part(mut self, part: i32) -> Self {
511        self.config.part = Some(part);
512        self
513    }
514
515    /// Write output to stdout instead of files
516    pub fn with_stdout(mut self, stdout: bool) -> Self {
517        self.config.stdout = stdout;
518        self
519    }
520
521    /// Build the [`TpchGenerator`] with the configured settings
522    pub fn build(self) -> TpchGenerator {
523        TpchGenerator {
524            config: self.config,
525        }
526    }
527}
528
529impl Default for TpchGeneratorBuilder {
530    fn default() -> Self {
531        Self::new()
532    }
533}