tpchgen_cli/lib.rs
1//! TPC-H Data Generator Library
2//!
3//! This crate provides both a command-line tool and a library for generating
4//! TPC-H benchmark data in various formats (TBL, CSV, Parquet).
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use tpchgen_cli::{TpchGenerator, Table, OutputFormat};
10//! use std::path::PathBuf;
11//!
12//! # async fn example() -> std::io::Result<()> {
13//! let generator = TpchGenerator::builder()
14//! .with_scale_factor(10.0)
15//! .with_output_dir(PathBuf::from("./data"))
16//! .with_tables(vec![Table::Customer, Table::Orders])
17//! .with_format(OutputFormat::Parquet)
18//! .with_num_threads(8)
19//! .build();
20//!
21//! generator.generate().await?;
22//! # Ok(())
23//! # }
24//! ```
25
26pub use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
27pub use ::parquet::basic::Compression;
28
29pub mod csv;
30pub mod generate;
31pub mod output_plan;
32pub mod parquet;
33pub mod plan;
34pub mod runner;
35pub mod statistics;
36pub mod tbl;
37
38use crate::generate::Sink;
39use crate::parquet::IntoSize;
40use crate::statistics::WriteStatistics;
41use std::fmt::Display;
42use std::fs::File;
43use std::io::{self, BufWriter, Stdout, Write};
44use std::str::FromStr;
45
46/// Wrapper around a buffer writer that counts the number of buffers and bytes written
47pub struct WriterSink<W: Write> {
48 statistics: WriteStatistics,
49 inner: W,
50}
51
52impl<W: Write> WriterSink<W> {
53 pub fn new(inner: W) -> Self {
54 Self {
55 inner,
56 statistics: WriteStatistics::new("buffers"),
57 }
58 }
59}
60
61impl<W: Write + Send> Sink for WriterSink<W> {
62 fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
63 self.statistics.increment_chunks(1);
64 self.statistics.increment_bytes(buffer.len());
65 self.inner.write_all(buffer)
66 }
67
68 fn flush(mut self) -> Result<(), io::Error> {
69 self.inner.flush()
70 }
71}
72
73impl IntoSize for BufWriter<Stdout> {
74 fn into_size(self) -> Result<usize, io::Error> {
75 // we can't get the size of stdout, so just return 0
76 Ok(0)
77 }
78}
79
80impl IntoSize for BufWriter<File> {
81 fn into_size(self) -> Result<usize, io::Error> {
82 let file = self.into_inner()?;
83 let metadata = file.metadata()?;
84 Ok(metadata.len() as usize)
85 }
86}
87
/// TPC-H table types
///
/// Represents the 8 tables in the TPC-H benchmark schema.
///
/// The derived `PartialOrd`/`Ord` follow the declaration order below. That
/// order is *not* strictly by row count (for example `Nation` is declared
/// before the smaller `Region`, and `Part` before the smaller `Supplier`), so
/// do not rely on it to sort tables by size.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Table {
    /// Nation table (25 rows)
    Nation,
    /// Region table (5 rows)
    Region,
    /// Part table (200,000 rows at SF=1)
    Part,
    /// Supplier table (10,000 rows at SF=1)
    Supplier,
    /// Part-Supplier relationship table (800,000 rows at SF=1)
    Partsupp,
    /// Customer table (150,000 rows at SF=1)
    Customer,
    /// Orders table (1,500,000 rows at SF=1)
    Orders,
    /// Line item table (6,000,000 rows at SF=1)
    Lineitem,
}

impl Display for Table {
    /// Writes the lowercase full table name (e.g. `lineitem`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}

impl FromStr for Table {
    type Err = &'static str;

    /// Returns the table enum value from the given string full name or abbreviation
    ///
    /// The original dbgen tool allows some abbreviations to mean two different tables
    /// like 'p' which aliases to both 'part' and 'partsupp'. This implementation does
    /// not support this since it just adds unnecessary complexity and confusion so we
    /// only support the exclusive abbreviations.
    ///
    /// Matching is case-sensitive: the abbreviations rely on case to stay
    /// unambiguous (`"s"` is `supplier` while `"S"` is `partsupp`).
    ///
    /// # Errors
    ///
    /// Returns a static message listing the accepted names when `s` matches
    /// none of them. The error type is `&'static str`, so the rejected input
    /// cannot be embedded in the message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "n" | "nation" => Ok(Table::Nation),
            "r" | "region" => Ok(Table::Region),
            "s" | "supplier" => Ok(Table::Supplier),
            "P" | "part" => Ok(Table::Part),
            "S" | "partsupp" => Ok(Table::Partsupp),
            "c" | "customer" => Ok(Table::Customer),
            "O" | "orders" => Ok(Table::Orders),
            "L" | "lineitem" => Ok(Table::Lineitem),
            // The previous message contained a literal, never-interpolated
            // "{s}" placeholder; list the valid spellings instead.
            _ => Err(
                "Invalid table name. Valid names are: nation (n), region (r), supplier (s), \
                 part (P), partsupp (S), customer (c), orders (O), lineitem (L)",
            ),
        }
    }
}

impl Table {
    /// Returns the lowercase full name of the table, matching the full names
    /// accepted by [`FromStr`].
    fn name(&self) -> &'static str {
        match self {
            Table::Nation => "nation",
            Table::Region => "region",
            Table::Part => "part",
            Table::Supplier => "supplier",
            Table::Partsupp => "partsupp",
            Table::Customer => "customer",
            Table::Orders => "orders",
            Table::Lineitem => "lineitem",
        }
    }
}
156
/// The file format used for generated data.
///
/// # Format Details
///
/// - **TBL**: Pipe-delimited format compatible with original dbgen tool
/// - **CSV**: Comma-separated values
/// - **Parquet**: Columnar Apache Parquet format with configurable compression
///
/// Parsing via [`FromStr`] ignores case; [`Display`] always prints the
/// lowercase name.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum OutputFormat {
    /// TBL format (pipe-delimited, dbgen-compatible)
    Tbl,
    /// CSV format (comma-separated values)
    Csv,
    /// Apache Parquet format (columnar, compressed)
    Parquet,
}

impl FromStr for OutputFormat {
    type Err = String;

    /// Parses `tbl`, `csv` or `parquet` in any letter case.
    ///
    /// # Errors
    ///
    /// Returns a message that echoes the rejected input and lists the valid
    /// format names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        let parsed = match normalized.as_str() {
            "tbl" => OutputFormat::Tbl,
            "csv" => OutputFormat::Csv,
            "parquet" => OutputFormat::Parquet,
            _ => {
                // Report the input exactly as the caller wrote it, not the
                // lowercased copy used for matching.
                return Err(format!(
                    "Invalid output format: {s}. Valid formats are: tbl, csv, parquet"
                ));
            }
        };
        Ok(parsed)
    }
}

impl Display for OutputFormat {
    /// Writes the lowercase format name (`tbl`, `csv` or `parquet`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            OutputFormat::Tbl => "tbl",
            OutputFormat::Csv => "csv",
            OutputFormat::Parquet => "parquet",
        };
        f.write_str(label)
    }
}
198
199/// Configuration for TPC-H data generation
200///
201/// This struct holds all the parameters needed to generate TPC-H benchmark data.
202/// It's typically not constructed directly - use [`TpchGeneratorBuilder`] instead.
203///
204/// # Examples
205///
206/// ```no_run
207/// use tpchgen_cli::{GeneratorConfig, OutputFormat};
208///
209/// // Usually you would use TpchGenerator::builder() instead
210/// let config = GeneratorConfig {
211/// scale_factor: 10.0,
212/// ..Default::default()
213/// };
214/// ```
215#[derive(Debug, Clone)]
216pub struct GeneratorConfig {
217 /// Scale factor (e.g., 1.0 for 1GB, 10.0 for 10GB)
218 pub scale_factor: f64,
219 /// Output directory for generated files
220 pub output_dir: std::path::PathBuf,
221 /// Tables to generate (if None, generates all tables)
222 pub tables: Option<Vec<Table>>,
223 /// Output format (TBL, CSV, or Parquet)
224 pub format: OutputFormat,
225 /// Number of threads for parallel generation
226 pub num_threads: usize,
227 /// Parquet compression format
228 pub parquet_compression: Compression,
229 /// Target row group size in bytes for Parquet files
230 pub parquet_row_group_bytes: i64,
231 /// Number of partitions to generate (if None, generates a single file per table)
232 pub parts: Option<i32>,
233 /// Specific partition to generate (1-based, requires parts to be set)
234 pub part: Option<i32>,
235 /// Write output to stdout instead of files
236 pub stdout: bool,
237}
238
239impl Default for GeneratorConfig {
240 fn default() -> Self {
241 Self {
242 scale_factor: 1.0,
243 output_dir: std::path::PathBuf::from("."),
244 tables: None,
245 format: OutputFormat::Tbl,
246 num_threads: num_cpus::get(),
247 parquet_compression: Compression::SNAPPY,
248 parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
249 parts: None,
250 part: None,
251 stdout: false,
252 }
253 }
254}
255
256/// TPC-H data generator
257///
258/// The main entry point for generating TPC-H benchmark data.
259/// Use the builder pattern via [`TpchGenerator::builder()`] to configure and create instances.
260///
261/// # Examples
262///
263/// ```no_run
264/// use tpchgen_cli::{TpchGenerator, Table, OutputFormat};
265/// use std::path::PathBuf;
266/// use ::parquet::basic::ZstdLevel;
267/// # async fn example() -> std::io::Result<()> {
268/// // Generate all tables at scale factor 1 in TBL format
269/// TpchGenerator::builder()
270/// .with_scale_factor(1.0)
271/// .with_output_dir(PathBuf::from("./data"))
272/// .build()
273/// .generate()
274/// .await?;
275///
276/// // Generate specific tables in Parquet format with compression
277/// TpchGenerator::builder()
278/// .with_scale_factor(10.0)
279/// .with_output_dir(PathBuf::from("./benchmark_data"))
280/// .with_tables(vec![Table::Orders, Table::Lineitem])
281/// .with_format(OutputFormat::Parquet)
282/// .with_parquet_compression(tpchgen_cli::Compression::ZSTD(ZstdLevel::try_new(1).unwrap()))
283/// .with_num_threads(16)
284/// .build()
285/// .generate()
286/// .await?;
287/// # Ok(())
288/// # }
289/// ```
290pub struct TpchGenerator {
291 config: GeneratorConfig,
292}
293
294impl TpchGenerator {
295 /// Create a new builder for configuring the generator
296 ///
297 /// This is the recommended way to construct a [`TpchGenerator`].
298 ///
299 /// # Examples
300 ///
301 /// ```no_run
302 /// use tpchgen_cli::TpchGenerator;
303 ///
304 /// let generator = TpchGenerator::builder()
305 /// .with_scale_factor(1.0)
306 /// .build();
307 /// ```
308 pub fn builder() -> TpchGeneratorBuilder {
309 TpchGeneratorBuilder::new()
310 }
311
312 /// Generate TPC-H data with the configured settings
313 ///
314 /// This async method performs the actual data generation, creating files
315 /// in the configured output directory (or writing to stdout if configured).
316 ///
317 /// # Returns
318 ///
319 /// - `Ok(())` on successful generation
320 /// - `Err(io::Error)` if file I/O or generation fails
321 ///
322 /// # Examples
323 ///
324 /// ```no_run
325 /// use tpchgen_cli::TpchGenerator;
326 ///
327 /// # async fn example() -> std::io::Result<()> {
328 /// TpchGenerator::builder()
329 /// .with_scale_factor(1.0)
330 /// .build()
331 /// .generate()
332 /// .await?;
333 /// # Ok(())
334 /// # }
335 /// ```
336 pub async fn generate(self) -> io::Result<()> {
337 use crate::output_plan::OutputPlanGenerator;
338 use crate::runner::PlanRunner;
339 use log::info;
340 use std::time::Instant;
341 use tpchgen::distribution::Distributions;
342 use tpchgen::text::TextPool;
343
344 let config = self.config;
345
346 // Create output directory if it doesn't exist and we are not writing to stdout
347 if !config.stdout {
348 std::fs::create_dir_all(&config.output_dir)?;
349 }
350
351 // Determine which tables to generate
352 let tables: Vec<Table> = if let Some(tables) = config.tables {
353 tables
354 } else {
355 vec![
356 Table::Nation,
357 Table::Region,
358 Table::Part,
359 Table::Supplier,
360 Table::Partsupp,
361 Table::Customer,
362 Table::Orders,
363 Table::Lineitem,
364 ]
365 };
366
367 // Determine what files to generate
368 let mut output_plan_generator = OutputPlanGenerator::new(
369 config.format,
370 config.scale_factor,
371 config.parquet_compression,
372 config.parquet_row_group_bytes,
373 config.stdout,
374 config.output_dir,
375 );
376
377 for table in tables {
378 output_plan_generator.generate_plans(table, config.part, config.parts)?;
379 }
380 let output_plans = output_plan_generator.build();
381
382 // Force the creation of the distributions and text pool so it doesn't
383 // get charged to the first table
384 let start = Instant::now();
385 Distributions::static_default();
386 TextPool::get_or_init_default();
387 let elapsed = start.elapsed();
388 info!("Created static distributions and text pools in {elapsed:?}");
389
390 // Run
391 let runner = PlanRunner::new(output_plans, config.num_threads);
392 runner.run().await?;
393 info!("Generation complete!");
394 Ok(())
395 }
396}
397
398/// Builder for constructing a [`TpchGenerator`]
399///
400/// Provides a fluent interface for configuring TPC-H data generation parameters.
401/// All builder methods can be chained, and calling [`build()`](TpchGeneratorBuilder::build)
402/// produces a [`TpchGenerator`] ready to generate data.
403///
404/// # Defaults
405///
406/// - Scale factor: 1.0
407/// - Output directory: current directory (".")
408/// - Tables: all 8 tables
409/// - Format: TBL
410/// - Threads: number of CPUs
411/// - Parquet compression: SNAPPY
412/// - Row group size: 7MB
413///
414/// # Examples
415///
416/// ```no_run
417/// use tpchgen_cli::{TpchGenerator, Table, OutputFormat, Compression};
418/// use std::path::PathBuf;
419/// use ::parquet::basic::ZstdLevel;
420///
421/// # async fn example() -> std::io::Result<()> {
422/// let generator = TpchGenerator::builder()
423/// .with_scale_factor(100.0)
424/// .with_output_dir(PathBuf::from("/data/tpch"))
425/// .with_tables(vec![Table::Lineitem, Table::Orders])
426/// .with_format(OutputFormat::Parquet)
427/// .with_parquet_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
428/// .with_num_threads(32)
429/// .build();
430///
431/// generator.generate().await?;
432/// # Ok(())
433/// # }
434/// ```
435#[derive(Debug, Clone)]
436pub struct TpchGeneratorBuilder {
437 config: GeneratorConfig,
438}
439
440impl TpchGeneratorBuilder {
441 /// Create a new builder with default configuration
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use tpchgen_cli::TpchGeneratorBuilder;
447 ///
448 /// let builder = TpchGeneratorBuilder::new();
449 /// ```
450 pub fn new() -> Self {
451 Self {
452 config: GeneratorConfig::default(),
453 }
454 }
455
456 /// Returns the scale factor.
457 pub fn scale_factor(&self) -> f64 {
458 self.config.scale_factor
459 }
460
461 /// Set the scale factor (e.g., 1.0 for 1GB, 10.0 for 10GB)
462 pub fn with_scale_factor(mut self, scale_factor: f64) -> Self {
463 self.config.scale_factor = scale_factor;
464 self
465 }
466
467 /// Set the output directory
468 pub fn with_output_dir(mut self, output_dir: impl Into<std::path::PathBuf>) -> Self {
469 self.config.output_dir = output_dir.into();
470 self
471 }
472
473 /// Set which tables to generate (default: all tables)
474 pub fn with_tables(mut self, tables: Vec<Table>) -> Self {
475 self.config.tables = Some(tables);
476 self
477 }
478
479 /// Set the output format (default: TBL)
480 pub fn with_format(mut self, format: OutputFormat) -> Self {
481 self.config.format = format;
482 self
483 }
484
485 /// Set the number of threads for parallel generation (default: number of CPUs)
486 pub fn with_num_threads(mut self, num_threads: usize) -> Self {
487 self.config.num_threads = num_threads;
488 self
489 }
490
491 /// Set Parquet compression format (default: SNAPPY)
492 pub fn with_parquet_compression(mut self, compression: Compression) -> Self {
493 self.config.parquet_compression = compression;
494 self
495 }
496
497 /// Set target row group size in bytes for Parquet files (default: 7MB)
498 pub fn with_parquet_row_group_bytes(mut self, bytes: i64) -> Self {
499 self.config.parquet_row_group_bytes = bytes;
500 self
501 }
502
503 /// Set the number of partitions to generate
504 pub fn with_parts(mut self, parts: i32) -> Self {
505 self.config.parts = Some(parts);
506 self
507 }
508
509 /// Set the specific partition to generate (1-based, requires parts to be set)
510 pub fn with_part(mut self, part: i32) -> Self {
511 self.config.part = Some(part);
512 self
513 }
514
515 /// Write output to stdout instead of files
516 pub fn with_stdout(mut self, stdout: bool) -> Self {
517 self.config.stdout = stdout;
518 self
519 }
520
521 /// Build the [`TpchGenerator`] with the configured settings
522 pub fn build(self) -> TpchGenerator {
523 TpchGenerator {
524 config: self.config,
525 }
526 }
527}
528
529impl Default for TpchGeneratorBuilder {
530 fn default() -> Self {
531 Self::new()
532 }
533}