Skip to main content

tpcgen_cli/tpcds_cli/
dat.rs

1/*
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 *     http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14
15//! TPC-DS Data Generator - Rust Implementation
16//!
17//! Generates TPC-DS benchmark data with byte-for-byte compatibility with the Java reference.
18
19use std::fs::File;
20use std::io::{BufWriter, Write};
21use std::path::Path;
22use std::time::Instant;
23
24use tpcdsgen::config::{Session, Table};
25use tpcdsgen::output::CompatWriter;
26use tpcdsgen::row::*;
27use tpcdsgen::types::Date;
28
/// Module-local result alias.
///
/// `Result<T>` keeps meaning "`T` or a boxed dynamic error", exactly as before.
/// The error parameter is defaulted rather than fixed so a call site can still
/// spell out a concrete error type (`Result<T, io::Error>`) without having to
/// reach for `std::result::Result`.
type Result<T, E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
30
31/// Generate TPC-DS data in DAT format.
32pub fn generate(session: &Session) -> Result<()> {
33    println!("TPC-DS Data Generator (Rust)");
34    println!("Scale factor: {}", session.get_scaling().get_scale());
35    println!("Output directory: {}", session.get_target_directory());
36
37    let start = Instant::now();
38
39    if session.generate_only_one_table() {
40        let table = session.get_only_table_to_generate();
41        generate_table(table, session)?;
42    } else {
43        for table in Table::main_tables() {
44            generate_table(table, session)?;
45        }
46    }
47
48    let elapsed = start.elapsed();
49    println!("\nCompleted in {:.2}s", elapsed.as_secs_f64());
50
51    Ok(())
52}
53
54fn generate_table(table: Table, session: &Session) -> Result<()> {
55    match table {
56        // Simple dimension tables
57        Table::CallCenter => generate_simple::<CallCenterRowGenerator>(table, session),
58        Table::CatalogPage => generate_simple::<CatalogPageRowGenerator>(table, session),
59        Table::Customer => generate_simple::<CustomerRowGenerator>(table, session),
60        Table::CustomerAddress => generate_simple::<CustomerAddressRowGenerator>(table, session),
61        Table::CustomerDemographics => {
62            generate_simple::<CustomerDemographicsRowGenerator>(table, session)
63        }
64        Table::DateDim => generate_simple::<DateDimRowGenerator>(table, session),
65        Table::DbgenVersion => generate_simple::<DbgenVersionRowGenerator>(table, session),
66        Table::HouseholdDemographics => {
67            generate_simple::<HouseholdDemographicsRowGenerator>(table, session)
68        }
69        Table::IncomeBand => generate_simple::<IncomeBandRowGenerator>(table, session),
70        Table::Item => generate_simple::<ItemRowGenerator>(table, session),
71        Table::Promotion => generate_simple::<PromotionRowGenerator>(table, session),
72        Table::Reason => generate_simple::<ReasonRowGenerator>(table, session),
73        Table::ShipMode => generate_simple::<ShipModeRowGenerator>(table, session),
74        Table::Store => generate_simple::<StoreRowGenerator>(table, session),
75        Table::TimeDim => generate_simple::<TimeDimRowGenerator>(table, session),
76        Table::Warehouse => generate_simple::<WarehouseRowGenerator>(table, session),
77        Table::WebPage => generate_simple::<WebPageRowGenerator>(table, session),
78        Table::WebSite => generate_simple::<WebSiteRowGenerator>(table, session),
79
80        // Sales + Returns pairs
81        Table::StoreSales => generate_store_sales(session),
82        Table::StoreReturns => Ok(()), // Generated with StoreSales
83        Table::CatalogSales => generate_catalog_sales(session),
84        Table::CatalogReturns => Ok(()), // Generated with CatalogSales
85        Table::WebSales => generate_web_sales(session),
86        Table::WebReturns => Ok(()), // Generated with WebSales
87
88        // Special tables
89        Table::Inventory => generate_inventory(session),
90
91        // Source tables - skip
92        _ => Ok(()),
93    }
94}
95
96/// Trait for creating row generators
97trait RowGeneratorFactory: RowGenerator + Sized {
98    fn create() -> Self;
99}
100
101// Implement factory for all simple generators
102macro_rules! impl_factory {
103    ($($gen:ty),*) => {
104        $(
105            impl RowGeneratorFactory for $gen {
106                fn create() -> Self { Self::new() }
107            }
108        )*
109    };
110}
111
112impl_factory!(
113    CallCenterRowGenerator,
114    CatalogPageRowGenerator,
115    CustomerRowGenerator,
116    CustomerAddressRowGenerator,
117    CustomerDemographicsRowGenerator,
118    DateDimRowGenerator,
119    DbgenVersionRowGenerator,
120    HouseholdDemographicsRowGenerator,
121    IncomeBandRowGenerator,
122    ItemRowGenerator,
123    PromotionRowGenerator,
124    ReasonRowGenerator,
125    ShipModeRowGenerator,
126    StoreRowGenerator,
127    TimeDimRowGenerator,
128    WarehouseRowGenerator,
129    WebPageRowGenerator,
130    WebSiteRowGenerator
131);
132
133/// Generate a simple table (one row per row_number, no child tables)
134fn generate_simple<G: RowGeneratorFactory>(table: Table, session: &Session) -> Result<()> {
135    let mut generator = G::create();
136    let row_count = session.get_scaling().get_row_count(table);
137
138    let path = get_output_path(table, session);
139    let file = File::create(&path)?;
140    let mut writer = CompatWriter::new(BufWriter::new(file), session.get_compat_mode());
141
142    print!("Generating {}... ", table.get_name());
143    std::io::stdout().flush()?;
144
145    for row_number in 1..=row_count {
146        let result = generator.generate_row_and_child_rows(row_number, session, None, None)?;
147
148        for row in result.get_rows() {
149            row.write_to(&mut writer, session.get_separator())?;
150        }
151
152        generator.consume_remaining_seeds_for_row();
153    }
154
155    writer.flush()?;
156    println!("{} rows -> {}", row_count, path.display());
157
158    Ok(())
159}
160
161/// Generate store_sales and store_returns together
162fn generate_store_sales(session: &Session) -> Result<()> {
163    let mut generator = StoreSalesRowGenerator::new();
164    let num_orders = session.get_scaling().get_row_count(Table::StoreSales);
165
166    let sales_path = get_output_path(Table::StoreSales, session);
167    let returns_path = get_output_path(Table::StoreReturns, session);
168
169    let compat_mode = session.get_compat_mode();
170    let mut sales_writer =
171        CompatWriter::new(BufWriter::new(File::create(&sales_path)?), compat_mode);
172    let mut returns_writer =
173        CompatWriter::new(BufWriter::new(File::create(&returns_path)?), compat_mode);
174
175    print!("Generating store_sales + store_returns... ");
176    std::io::stdout().flush()?;
177
178    let mut sales_count = 0i64;
179    let mut returns_count = 0i64;
180    let mut row_number = 1i64;
181
182    while row_number <= num_orders {
183        let result = generator.generate_row_and_child_rows(row_number, session, None, None)?;
184        let rows = result.get_rows();
185
186        if !rows.is_empty() {
187            rows[0].write_to(&mut sales_writer, session.get_separator())?;
188            sales_count += 1;
189        }
190
191        if rows.len() > 1 {
192            rows[1].write_to(&mut returns_writer, session.get_separator())?;
193            returns_count += 1;
194        }
195
196        if result.should_end_row() {
197            generator.consume_remaining_seeds_for_row();
198            row_number += 1;
199        }
200    }
201
202    sales_writer.flush()?;
203    returns_writer.flush()?;
204
205    println!(
206        "{} sales, {} returns -> {}, {}",
207        sales_count,
208        returns_count,
209        sales_path.display(),
210        returns_path.display()
211    );
212
213    Ok(())
214}
215
216/// Generate catalog_sales and catalog_returns together
217fn generate_catalog_sales(session: &Session) -> Result<()> {
218    let mut generator = CatalogSalesRowGenerator::new();
219    let num_orders = session.get_scaling().get_row_count(Table::CatalogSales);
220
221    let sales_path = get_output_path(Table::CatalogSales, session);
222    let returns_path = get_output_path(Table::CatalogReturns, session);
223
224    let compat_mode = session.get_compat_mode();
225    let mut sales_writer =
226        CompatWriter::new(BufWriter::new(File::create(&sales_path)?), compat_mode);
227    let mut returns_writer =
228        CompatWriter::new(BufWriter::new(File::create(&returns_path)?), compat_mode);
229
230    print!("Generating catalog_sales + catalog_returns... ");
231    std::io::stdout().flush()?;
232
233    let mut sales_count = 0i64;
234    let mut returns_count = 0i64;
235    let mut row_number = 1i64;
236
237    while row_number <= num_orders {
238        let result = generator.generate_row_and_child_rows(row_number, session, None, None)?;
239        let rows = result.get_rows();
240
241        if !rows.is_empty() {
242            rows[0].write_to(&mut sales_writer, session.get_separator())?;
243            sales_count += 1;
244        }
245
246        if rows.len() > 1 {
247            rows[1].write_to(&mut returns_writer, session.get_separator())?;
248            returns_count += 1;
249        }
250
251        if result.should_end_row() {
252            generator.consume_remaining_seeds_for_row();
253            row_number += 1;
254        }
255    }
256
257    sales_writer.flush()?;
258    returns_writer.flush()?;
259
260    println!(
261        "{} sales, {} returns -> {}, {}",
262        sales_count,
263        returns_count,
264        sales_path.display(),
265        returns_path.display()
266    );
267
268    Ok(())
269}
270
271/// Generate web_sales and web_returns together
272fn generate_web_sales(session: &Session) -> Result<()> {
273    let mut generator = WebSalesRowGenerator::new();
274    let num_orders = session.get_scaling().get_row_count(Table::WebSales);
275
276    let sales_path = get_output_path(Table::WebSales, session);
277    let returns_path = get_output_path(Table::WebReturns, session);
278
279    let compat_mode = session.get_compat_mode();
280    let mut sales_writer =
281        CompatWriter::new(BufWriter::new(File::create(&sales_path)?), compat_mode);
282    let mut returns_writer =
283        CompatWriter::new(BufWriter::new(File::create(&returns_path)?), compat_mode);
284
285    print!("Generating web_sales + web_returns... ");
286    std::io::stdout().flush()?;
287
288    let mut sales_count = 0i64;
289    let mut returns_count = 0i64;
290    let mut row_number = 1i64;
291
292    while row_number <= num_orders {
293        let result = generator.generate_row_and_child_rows(row_number, session, None, None)?;
294        let rows = result.get_rows();
295
296        if !rows.is_empty() {
297            rows[0].write_to(&mut sales_writer, session.get_separator())?;
298            sales_count += 1;
299        }
300
301        if rows.len() > 1 {
302            rows[1].write_to(&mut returns_writer, session.get_separator())?;
303            returns_count += 1;
304        }
305
306        if result.should_end_row() {
307            generator.consume_remaining_seeds_for_row();
308            row_number += 1;
309        }
310    }
311
312    sales_writer.flush()?;
313    returns_writer.flush()?;
314
315    println!(
316        "{} sales, {} returns -> {}, {}",
317        sales_count,
318        returns_count,
319        sales_path.display(),
320        returns_path.display()
321    );
322
323    Ok(())
324}
325
326/// Generate inventory table (special row count calculation)
327fn generate_inventory(session: &Session) -> Result<()> {
328    let mut generator = InventoryRowGenerator::new();
329    let scaling = session.get_scaling();
330
331    let item_count = scaling.get_id_count(Table::Item);
332    let warehouse_count = scaling.get_row_count(Table::Warehouse);
333    let n_days = Date::JULIAN_DATE_MAXIMUM - Date::JULIAN_DATE_MINIMUM;
334    let n_weeks = (n_days + 7) / 7;
335    let num_rows = item_count * warehouse_count * n_weeks as i64;
336
337    let path = get_output_path(Table::Inventory, session);
338    let mut writer = CompatWriter::new(
339        BufWriter::new(File::create(&path)?),
340        session.get_compat_mode(),
341    );
342
343    print!("Generating inventory... ");
344    std::io::stdout().flush()?;
345
346    for row_number in 1..=num_rows {
347        let result = generator.generate_row_and_child_rows(row_number, session, None, None)?;
348
349        for row in result.get_rows() {
350            row.write_to(&mut writer, session.get_separator())?;
351        }
352
353        generator.consume_remaining_seeds_for_row();
354    }
355
356    writer.flush()?;
357    println!("{} rows -> {}", num_rows, path.display());
358
359    Ok(())
360}
361
362/// Get output file path for a table
363fn get_output_path(table: Table, session: &Session) -> std::path::PathBuf {
364    Path::new(session.get_target_directory()).join(format!(
365        "{}{}",
366        table.get_name(),
367        session.get_suffix()
368    ))
369}