tpchgen_cli/
plan.rs

1//! * [`GenerationPlan`]: how to generate a specific TPC-H dataset.
2
3use crate::{OutputFormat, Table};
4use log::debug;
5use std::fmt::Display;
6use std::ops::RangeInclusive;
7use tpchgen::generators::{
8    CustomerGenerator, OrderGenerator, PartGenerator, PartSuppGenerator, SupplierGenerator,
9};
10
/// A list of generator "parts" (data generator chunks, not TPC-H parts) for a
/// single output file.
///
/// Controls the parallelization and layout of Parquet files in `tpchgen-cli`.
///
/// # Background
///
/// A "part" is a logical partition of a particular output table. Each data
/// generator can create parts individually.
///
/// For example, the parameters to [`OrderGenerator::new`] (a scale factor, a
/// 1-based part number, and a `part_count`) together define a partition of
/// the `Order` table.
///
/// The entire output table results from generating each of the `part_count`
/// parts. For example, if `part_count` is 10, appending parts 1 to 10 results
/// in a complete `Order` table.
///
/// Interesting properties of parts:
/// 1. They are independent of each other, so they can be generated in parallel.
/// 2. They scale. For example, parts `1..=10` with a `part_count` of 50
///    generate the same data as part `1` with a `part_count` of 5.
///
/// # Implication for tpchgen-cli
///
/// For `tbl` and `csv` files, tpchgen-cli generates `num-threads` parts in
/// parallel.
///
/// For Parquet files, the output file has one row group for each "part".
///
/// # Example
/// ```
/// use tpchgen_cli::{GenerationPlan, OutputFormat, Table};
///
/// let plan = GenerationPlan::try_new(
///     Table::Orders,
///     OutputFormat::Parquet,
///     1.0,             // scale factor
///     None,            // --part
///     None,            // --parts
///     7 * 1024 * 1024, // target parquet row group size in bytes
/// )
/// .unwrap();
/// // Each item is (part_number, part_count).
/// let results = plan.into_iter().collect::<Vec<_>>();
/// assert_eq!(results.first(), Some(&(1, 16)));
/// assert_eq!(results.len(), 16);
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct GenerationPlan {
    /// Total number of parts the table is divided into
    part_count: i32,
    /// The 1-based parts this plan generates; a sub-range of `1..=part_count`
    /// (the whole range when no `--part`/`--parts` were given)
    part_list: RangeInclusive<i32>,
}
63
/// Default target size of one Parquet row group, in bytes: 7 MiB (7 × 2^20).
pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 7 << 20;
65
66impl GenerationPlan {
67    /// Returns a GenerationPlan number of parts to generate
68    ///
69    /// # Arguments
70    /// * `cli_part`: optional part number to generate (1-based), `--part` CLI argument
71    /// * `cli_part_count`: optional total number of parts, `--parts` CLI argument
72    /// * `parquet_row_group_size`: optional parquet row group size, `--parquet-row-group-size` CLI argument
73    pub fn try_new(
74        table: Table,
75        format: OutputFormat,
76        scale_factor: f64,
77        cli_part: Option<i32>,
78        cli_part_count: Option<i32>,
79        parquet_row_group_bytes: i64,
80    ) -> Result<Self, String> {
81        // If a single part is specified, split it into chunks to enable parallel generation.
82        match (cli_part, cli_part_count) {
83            (Some(_part), None) => Err(String::from(
84                "The --part option requires the --parts option to be set",
85            )),
86            (None, Some(_part_count)) => {
87                // TODO automatically create multiple files if part_count > 1
88                // and part is not specified
89                Err(String::from(
90                    "The --part_count option requires the --part option to be set",
91                ))
92            }
93            (Some(part), Some(part_count)) => Self::try_new_with_parts(
94                table,
95                format,
96                scale_factor,
97                part,
98                part_count,
99                parquet_row_group_bytes,
100            ),
101            (None, None) => {
102                Self::try_new_without_parts(table, format, scale_factor, parquet_row_group_bytes)
103            }
104        }
105    }
106
107    /// Return true if the tables is unpartitionable (not parameterized by part
108    /// count)
109    pub fn partitioned_table(table: Table) -> bool {
110        table != Table::Nation && table != Table::Region
111    }
112
113    /// Returns a new `GenerationPlan` when partitioning
114    ///
115    /// See [`GenerationPlan::try_new`] for argument documentation.
116    fn try_new_with_parts(
117        table: Table,
118        format: OutputFormat,
119        scale_factor: f64,
120        cli_part: i32,
121        cli_part_count: i32,
122        parquet_row_group_bytes: i64,
123    ) -> Result<Self, String> {
124        if cli_part < 1 {
125            return Err(format!(
126                "Invalid --part. Expected a number greater than zero, got {cli_part}"
127            ));
128        }
129        if cli_part_count < 1 {
130            return Err(format!(
131                "Invalid --part_count. Expected a number greater than zero, got {cli_part_count}"
132            ));
133        }
134        if cli_part > cli_part_count {
135            return Err(format!(
136                    "Invalid --part. Expected at most the value of --parts ({cli_part_count}), got {cli_part}"));
137        }
138
139        // These tables are so small they are not parameterized by part count,
140        // so only a single part.
141        if !Self::partitioned_table(table) {
142            return Ok(Self {
143                part_count: 1,
144                part_list: 1..=1,
145            });
146        }
147
148        // scale down the row count by the number of partitions being generated
149        // so that the output is consistent with the original part count
150        let num_chunks = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes)
151            .with_scaled_row_count(cli_part_count)
152            .part_count();
153
154        // The new total number of partitions is the original number of
155        // partitions multiplied by the number of chunks.
156        let new_total_parts = cli_part_count * num_chunks;
157
158        // The new partitions to generate correspond to the chunks that make up
159        // the original part.
160        //
161        // So for example, if the original partition count was 10 and the part was 2
162        // and the number of chunks is 5, then:
163        //
164        // * new_total_parts = 10 * 5 = 50
165        // * new_parts_to_generate = (2-1)*5+1 ..= 2*5 = 6..=10
166        let start_part = (cli_part - 1) * num_chunks + 1;
167        let end_part = cli_part * num_chunks;
168        let new_parts_to_generate = start_part..=end_part;
169        debug!(
170            "User specified cli_parts={cli_part_count}, cli_part={cli_part}. \
171            Generating {new_total_parts} partitions for table {table:?} \
172            with scale factor {scale_factor}: {new_parts_to_generate:?}"
173        );
174        Ok(Self {
175            part_count: new_total_parts,
176            part_list: new_parts_to_generate,
177        })
178    }
179
180    /// Returns a new `GenerationPlan` when no partitioning is specified on the command line
181    fn try_new_without_parts(
182        table: Table,
183        format: OutputFormat,
184        scale_factor: f64,
185        parquet_row_group_bytes: i64,
186    ) -> Result<Self, String> {
187        let output_size = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes);
188        let num_parts = output_size.part_count();
189
190        Ok(Self {
191            part_count: num_parts,
192            part_list: 1..=num_parts,
193        })
194    }
195
196    /// Return the number of part(ititions) this plan will generate
197    pub fn chunk_count(&self) -> usize {
198        self.part_list.clone().count()
199    }
200}
201
202/// Converts the `GenerationPlan` into an iterator of (part_number, num_parts)
203impl IntoIterator for GenerationPlan {
204    type Item = (i32, i32);
205    type IntoIter = std::vec::IntoIter<Self::Item>;
206
207    fn into_iter(self) -> Self::IntoIter {
208        self.part_list
209            .map(|part_number| (part_number, self.part_count))
210            .collect::<Vec<_>>()
211            .into_iter()
212    }
213}
214
215impl Display for GenerationPlan {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        write!(f, "GenerationPlan for {} parts", self.part_count)
218    }
219}
220
/// Estimated output size of one table, used to decide how many parts
/// (chunks, or Parquet row groups) the table is split into.
#[derive(Debug)]
struct OutputSize {
    /// Estimated average size of a single output row, in bytes
    avg_row_size_bytes: i64,
    /// Estimated number of rows to generate
    row_count: i64,
    /// Desired size of each output chunk, in bytes
    target_chunk_size_bytes: i64,
    /// Upper bound on the number of parts, when the output format imposes one
    max_part_count: Option<i64>,
}
233
234impl OutputSize {
235    pub fn new(
236        table: Table,
237        scale_factor: f64,
238        format: OutputFormat,
239        parquet_row_group_bytes: i64,
240    ) -> Self {
241        let row_count = Self::row_count_for_table(table, scale_factor);
242
243        // The average row size in bytes for each table in the TPC-H schema
244        // this was determined by sampling the data
245        let avg_row_size_bytes = match format {
246            OutputFormat::Tbl | OutputFormat::Csv => match table {
247                Table::Nation => 88,
248                Table::Region => 77,
249                Table::Part => 115,
250                Table::Supplier => 140,
251                Table::Partsupp => 148,
252                Table::Customer => 160,
253                Table::Orders => 114,
254                Table::Lineitem => 128,
255            },
256            // Average row size in bytes for each table at scale factor 1.0
257            // computed using datafusion-cli:
258            // ```shell
259            // datafusion-cli -c "datafusion-cli -c "select row_group_id, count(*), min(row_group_bytes)::float/min(row_group_num_rows)::float as bytes_per_row from parquet_metadata('lineitem.parquet') GROUP BY 1 ORDER BY 1""
260            // ```
261            OutputFormat::Parquet => match table {
262                Table::Nation => 117,
263                Table::Region => 151,
264                Table::Part => 70,
265                Table::Supplier => 164,
266                Table::Partsupp => 141 * 4, // needed to match observed size
267                Table::Customer => 168,
268                Table::Orders => 75,
269                Table::Lineitem => 64,
270            },
271        };
272
273        let target_chunk_size_bytes = match format {
274            // for tbl/csv target chunks, this value does not affect the output
275            // file. Use 15MB, slightly smaller than the 16MB buffer size,  to
276            // ensure small overages don't exceed the buffer size and require a
277            // reallocation
278            OutputFormat::Tbl | OutputFormat::Csv => 15 * 1024 * 1024,
279            OutputFormat::Parquet => parquet_row_group_bytes,
280        };
281
282        // parquet files can have at most 32767 row groups so cap the number of parts at that number
283        let max_part_count = match format {
284            OutputFormat::Tbl | OutputFormat::Csv => None,
285            OutputFormat::Parquet => Some(32767),
286        };
287
288        debug!(
289            "Output size for table {table:?} with scale factor {scale_factor}: \
290                avg_row_size_bytes={avg_row_size_bytes}, row_count={row_count} \
291                target_chunk_size_bytes={target_chunk_size_bytes}, max_part_count={max_part_count:?}",
292        );
293
294        OutputSize {
295            avg_row_size_bytes,
296            row_count,
297            target_chunk_size_bytes,
298            max_part_count,
299        }
300    }
301
302    /// Return the number of parts to generate
303    pub fn part_count(&self) -> i32 {
304        let mut num_parts =
305            ((self.row_count * self.avg_row_size_bytes) / self.target_chunk_size_bytes) + 1; // +1 to ensure we have at least one part
306
307        if let Some(max_part_count) = self.max_part_count {
308            // if the max part count is set, cap the number of parts at that number
309            num_parts = num_parts.min(max_part_count)
310        }
311
312        // convert to i32
313        num_parts.try_into().unwrap()
314    }
315
316    /// Scale the row count for the output by the number of partitions
317    ///
318    /// So for example if the row count is 1000 and the number of partitions is 10,
319    /// the scaled row count will be 100.
320    pub fn with_scaled_row_count(&self, cli_part_count: i32) -> OutputSize {
321        // scale the row count by the number of partitions being generated
322        let scaled_row_count = self.row_count / cli_part_count as i64;
323        debug!(
324            "Scaling row count from {} to {scaled_row_count}",
325            self.row_count,
326        );
327        OutputSize {
328            avg_row_size_bytes: self.avg_row_size_bytes,
329            row_count: scaled_row_count,
330            target_chunk_size_bytes: self.target_chunk_size_bytes,
331            max_part_count: self.max_part_count,
332        }
333    }
334
335    fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
336        //let (avg_row_size_bytes, row_count) = match table {
337        match table {
338            Table::Nation => 1,
339            Table::Region => 1,
340            Table::Part => PartGenerator::calculate_row_count(scale_factor, 1, 1),
341            Table::Supplier => SupplierGenerator::calculate_row_count(scale_factor, 1, 1),
342            Table::Partsupp => PartSuppGenerator::calculate_row_count(scale_factor, 1, 1),
343            Table::Customer => CustomerGenerator::calculate_row_count(scale_factor, 1, 1),
344            Table::Orders => OrderGenerator::calculate_row_count(scale_factor, 1, 1),
345            Table::Lineitem => {
346                // there are on average 4 line items per order.
347                // For example, in SF=10,
348                // * orders has 15,000,000 rows
349                // * lineitem has around 60,000,000 rows
350                4 * OrderGenerator::calculate_row_count(scale_factor, 1, 1)
351            }
352        }
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    // Default layouts for generating each TPC-H table when neither --part nor
    // --parts is given. These tests document the default number of parts
    // (and therefore Parquet row groups) for each table and output format.
    mod default_layouts {
        use super::*;
        #[test]
        fn tbl_sf1_default_nation() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_region() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_part() {
            Test::new()
                .with_table(Table::Part)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_supplier() {
            Test::new()
                .with_table(Table::Supplier)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_partsupp() {
            Test::new()
                .with_table(Table::Partsupp)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_customer() {
            Test::new()
                .with_table(Table::Customer)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_orders() {
            Test::new()
                .with_table(Table::Orders)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(11, 1..=11)
        }

        #[test]
        fn tbl_sf1_default_lineitem() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(49, 1..=49)
        }

        #[test]
        fn parquet_sf1_default_nation() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_region() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_part() {
            Test::new()
                .with_table(Table::Part)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn parquet_sf1_default_supplier() {
            Test::new()
                .with_table(Table::Supplier)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_partsupp() {
            Test::new()
                .with_table(Table::Partsupp)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(16, 1..=16)
        }

        #[test]
        fn parquet_sf1_default_customer() {
            Test::new()
                .with_table(Table::Customer)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(4, 1..=4)
        }

        #[test]
        fn parquet_sf1_default_orders() {
            Test::new()
                .with_table(Table::Orders)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(16, 1..=16)
        }

        #[test]
        fn parquet_sf1_default_lineitem() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(53, 1..=53)
        }
    }

    // Test plans with CLI parts and partition counts
    mod partitions {
        use super::*;

        #[test]
        fn tbl_sf1_nation_cli_parts() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                // nation table is small, so it can not be made in parts
                .with_cli_part(1)
                .with_cli_part_count(10)
                // we expect there is still only one part
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_region_cli_parts() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                // region table is small, so it can not be made in parts
                .with_cli_part(1)
                .with_cli_part_count(10)
                // we expect there is still only one part
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_1() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                // Generate only part 1 of 10. Each CLI part is split into 5
                // chunks, so the table has 50 partitions and this generates
                // the first 5.
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(50, 1..=5)
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_4() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(4) // part 4 of 10
                .with_cli_part_count(10)
                .assert(50, 16..=20)
        }

        #[test]
        fn parquet_sf1_region_cli_parts() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                // region table is small, so it can not be made in parts
                .with_cli_part(1)
                .with_cli_part_count(10)
                // we expect there is still only one part
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_1() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                // Generate only part 1 of the lineitem table
                .with_cli_part(1)
                .with_cli_part_count(10)
                // we expect to generate the first 6 / 60 row groups (1/10)
                .assert(60, 1..=6)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_4() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(4) // part 4 of 10
                .with_cli_part_count(10)
                // we expect to generate the 4th set of row groups
                .assert(60, 19..=24)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_10() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(10) // part 10 of 10
                .with_cli_part_count(10)
                // expect the last 6 row groups
                .assert(60, 55..=60)
        }
    }

    //  Error cases for invalid CLI parts and partition counts
    mod errors {
        use super::*;

        #[test]
        fn sf1_lineitem_cli_invalid_part() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(0) // part 0 of 10 (invalid)
                .with_cli_part_count(10)
                .assert_err("Invalid --part. Expected a number greater than zero, got 0")
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_invalid_big() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(11) // part 11 of 10 (invalid)
                .with_cli_part_count(10)
                .assert_err("Invalid --part. Expected at most the value of --parts (10), got 11");
        }

        #[test]
        fn tbl_sf1_lineitem_cli_invalid_part_count() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(1) // part 1 of 0 (invalid part count)
                .with_cli_part_count(0)
                .assert_err("Invalid --part_count. Expected a number greater than zero, got 0");
        }
    }

    // test the row group limits for parquet
    mod limits {
        use super::*;
        #[test]
        fn parquet_sf10_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(10.0)
                .assert(524, 1..=524);
        }

        #[test]
        fn tbl_sf10_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(10.0)
                .assert(489, 1..=489);
        }

        #[test]
        fn tbl_sf1000_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1000.0)
                .assert(48829, 1..=48829);
        }

        #[test]
        fn parquet_sf1000_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1000.0)
                .assert(32767, 1..=32767);
        }

        // If we make a really large lineitem table, we can generate it in parts that will also go
        // in a large number of row groups, but still limited to 32k row groups in total.
        #[test]
        fn parquet_sf1000_lineitem_cli_parts_limit() {
            let expected_parts = 15697..=20928;
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1000.0)
                .with_cli_part(4) // part 4 of 10
                .with_cli_part_count(10)
                .assert(52320, expected_parts.clone());

            // can not have more than 32k actual row groups in a parquet file
            // (the range is inclusive, so count its elements)
            assert!(
                expected_parts.clone().count() <= 32767,
                "Expected parts {expected_parts:?} should not exceed 32k row groups",
            );
        }

        #[test]
        fn parquet_sf100000_lineitem_cli_parts_limit() {
            let expected_parts = 98302..=131068;
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(100000.0)
                .with_cli_part(4) // part 4 of 10
                .with_cli_part_count(10)
                .assert(327670, expected_parts.clone());

            // can not have more than 32k actual row groups in a parquet file
            // (the range is inclusive, so count its elements)
            assert!(
                expected_parts.clone().count() <= 32767,
                "Expected parts {expected_parts:?} should not exceed 32k row groups",
            );
        }

        mod parquet_row_group_size {
            use super::*;
            #[test]
            fn parquet_sf10_lineitem_default_row_group() {
                Test::new()
                    .with_table(Table::Lineitem)
                    .with_format(OutputFormat::Parquet)
                    .with_scale_factor(10.0)
                    .assert(524, 1..=524);
            }

            #[test]
            fn parquet_sf10_lineitem_small_row_group() {
                Test::new()
                    .with_table(Table::Lineitem)
                    .with_format(OutputFormat::Parquet)
                    .with_scale_factor(10.0)
                    .with_parquet_row_group_bytes(1024 * 1024) // 1MB row groups
                    .assert(3663, 1..=3663);
            }

            #[test]
            fn parquet_sf10_lineitem_large_row_group() {
                Test::new()
                    .with_table(Table::Lineitem)
                    .with_format(OutputFormat::Parquet)
                    .with_scale_factor(10.0)
                    .with_parquet_row_group_bytes(20 * 1024 * 1024) // 20MB row groups
                    .assert(184, 1..=184);
            }

            #[test]
            fn parquet_sf100000_lineitem_small_row_group_max_groups() {
                Test::new()
                    .with_table(Table::Lineitem)
                    .with_format(OutputFormat::Parquet)
                    .with_scale_factor(100000.0)
                    .with_parquet_row_group_bytes(1024 * 1024) // 1MB row groups
                    // parquet is limited to no more than 32k actual row groups in a parquet file
                    .assert(32767, 1..=32767);
            }
        }
    }

    /// Test fixture for [`GenerationPlan`].
    #[derive(Debug)]
    struct Test {
        table: Table,
        format: OutputFormat,
        scale_factor: f64,
        cli_part: Option<i32>,
        cli_part_count: Option<i32>,
        parquet_row_group_bytes: i64,
    }

    impl Test {
        fn new() -> Self {
            Default::default()
        }

        /// Create a [`GenerationPlan`] and assert it has the
        /// expected number of parts and part numbers.
        fn assert(self, expected_part_count: i32, expected_part_numbers: RangeInclusive<i32>) {
            let plan = GenerationPlan::try_new(
                self.table,
                self.format,
                self.scale_factor,
                self.cli_part,
                self.cli_part_count,
                self.parquet_row_group_bytes,
            )
            .unwrap();
            assert_eq!(plan.part_count, expected_part_count);
            assert_eq!(plan.part_list, expected_part_numbers);
        }

        /// Assert that creating a [`GenerationPlan`] returns the specified error
        fn assert_err(self, expected_error: &str) {
            let actual_error = GenerationPlan::try_new(
                self.table,
                self.format,
                self.scale_factor,
                self.cli_part,
                self.cli_part_count,
                self.parquet_row_group_bytes,
            )
            .unwrap_err();
            assert_eq!(actual_error, expected_error);
        }

        /// Set table
        fn with_table(mut self, table: Table) -> Self {
            self.table = table;
            self
        }

        /// Set output format
        fn with_format(mut self, format: OutputFormat) -> Self {
            self.format = format;
            self
        }

        /// Set scale factor
        fn with_scale_factor(mut self, scale_factor: f64) -> Self {
            self.scale_factor = scale_factor;
            self
        }

        /// Set CLI part
        fn with_cli_part(mut self, cli_part: i32) -> Self {
            self.cli_part = Some(cli_part);
            self
        }

        /// Set CLI partition count
        fn with_cli_part_count(mut self, cli_part_count: i32) -> Self {
            self.cli_part_count = Some(cli_part_count);
            self
        }

        /// Set parquet row group size
        fn with_parquet_row_group_bytes(mut self, parquet_row_group_bytes: i64) -> Self {
            self.parquet_row_group_bytes = parquet_row_group_bytes;
            self
        }
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                table: Table::Orders,
                format: OutputFormat::Tbl,
                scale_factor: 1.0,
                cli_part: None,
                cli_part_count: None,
                parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
            }
        }
    }
}