1use crate::{OutputFormat, Table};
4use log::debug;
5use std::fmt::Display;
6use std::ops::RangeInclusive;
7use tpchgen::generators::{
8 CustomerGenerator, OrderGenerator, PartGenerator, PartSuppGenerator, SupplierGenerator,
9};
10
/// The set of partitions to generate for a single table.
///
/// `part_count` is the total number of partitions the table is divided into, and `part_list`
/// is the inclusive range of (1-based) partition numbers this invocation should produce.
#[derive(Clone, Debug, PartialEq)]
pub struct GenerationPlan {
    /// Total number of partitions the table is split into.
    part_count: i32,
    /// Inclusive range of 1-based partition numbers to generate.
    part_list: RangeInclusive<i32>,
}
63
/// Default target size of a Parquet row group, in bytes (7 MiB).
pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 7 << 20;
65
66impl GenerationPlan {
67 pub fn try_new(
74 table: Table,
75 format: OutputFormat,
76 scale_factor: f64,
77 cli_part: Option<i32>,
78 cli_part_count: Option<i32>,
79 parquet_row_group_bytes: i64,
80 ) -> Result<Self, String> {
81 match (cli_part, cli_part_count) {
83 (Some(_part), None) => Err(String::from(
84 "The --part option requires the --parts option to be set",
85 )),
86 (None, Some(_part_count)) => {
87 Err(String::from(
90 "The --part_count option requires the --part option to be set",
91 ))
92 }
93 (Some(part), Some(part_count)) => Self::try_new_with_parts(
94 table,
95 format,
96 scale_factor,
97 part,
98 part_count,
99 parquet_row_group_bytes,
100 ),
101 (None, None) => {
102 Self::try_new_without_parts(table, format, scale_factor, parquet_row_group_bytes)
103 }
104 }
105 }
106
107 pub fn partitioned_table(table: Table) -> bool {
110 table != Table::Nation && table != Table::Region
111 }
112
113 fn try_new_with_parts(
117 table: Table,
118 format: OutputFormat,
119 scale_factor: f64,
120 cli_part: i32,
121 cli_part_count: i32,
122 parquet_row_group_bytes: i64,
123 ) -> Result<Self, String> {
124 if cli_part < 1 {
125 return Err(format!(
126 "Invalid --part. Expected a number greater than zero, got {cli_part}"
127 ));
128 }
129 if cli_part_count < 1 {
130 return Err(format!(
131 "Invalid --part_count. Expected a number greater than zero, got {cli_part_count}"
132 ));
133 }
134 if cli_part > cli_part_count {
135 return Err(format!(
136 "Invalid --part. Expected at most the value of --parts ({cli_part_count}), got {cli_part}"));
137 }
138
139 if !Self::partitioned_table(table) {
142 return Ok(Self {
143 part_count: 1,
144 part_list: 1..=1,
145 });
146 }
147
148 let num_chunks = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes)
151 .with_scaled_row_count(cli_part_count)
152 .part_count();
153
154 let new_total_parts = cli_part_count * num_chunks;
157
158 let start_part = (cli_part - 1) * num_chunks + 1;
167 let end_part = cli_part * num_chunks;
168 let new_parts_to_generate = start_part..=end_part;
169 debug!(
170 "User specified cli_parts={cli_part_count}, cli_part={cli_part}. \
171 Generating {new_total_parts} partitions for table {table:?} \
172 with scale factor {scale_factor}: {new_parts_to_generate:?}"
173 );
174 Ok(Self {
175 part_count: new_total_parts,
176 part_list: new_parts_to_generate,
177 })
178 }
179
180 fn try_new_without_parts(
182 table: Table,
183 format: OutputFormat,
184 scale_factor: f64,
185 parquet_row_group_bytes: i64,
186 ) -> Result<Self, String> {
187 let output_size = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes);
188 let num_parts = output_size.part_count();
189
190 Ok(Self {
191 part_count: num_parts,
192 part_list: 1..=num_parts,
193 })
194 }
195
196 pub fn chunk_count(&self) -> usize {
198 self.part_list.clone().count()
199 }
200}
201
202impl IntoIterator for GenerationPlan {
204 type Item = (i32, i32);
205 type IntoIter = std::vec::IntoIter<Self::Item>;
206
207 fn into_iter(self) -> Self::IntoIter {
208 self.part_list
209 .map(|part_number| (part_number, self.part_count))
210 .collect::<Vec<_>>()
211 .into_iter()
212 }
213}
214
215impl Display for GenerationPlan {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 write!(f, "GenerationPlan for {} parts", self.part_count)
218 }
219}
220
/// Size estimate for one table's output, used to decide how many chunks to split it into.
#[derive(Debug)]
struct OutputSize {
    /// Estimated average number of bytes per output row.
    avg_row_size_bytes: i64,
    /// Number of rows to be written.
    row_count: i64,
    /// Desired size of each chunk, in bytes.
    target_chunk_size_bytes: i64,
    /// Optional upper bound on the number of chunks.
    max_part_count: Option<i64>,
}
233
234impl OutputSize {
235 pub fn new(
236 table: Table,
237 scale_factor: f64,
238 format: OutputFormat,
239 parquet_row_group_bytes: i64,
240 ) -> Self {
241 let row_count = Self::row_count_for_table(table, scale_factor);
242
243 let avg_row_size_bytes = match format {
246 OutputFormat::Tbl | OutputFormat::Csv => match table {
247 Table::Nation => 88,
248 Table::Region => 77,
249 Table::Part => 115,
250 Table::Supplier => 140,
251 Table::Partsupp => 148,
252 Table::Customer => 160,
253 Table::Orders => 114,
254 Table::Lineitem => 128,
255 },
256 OutputFormat::Parquet => match table {
262 Table::Nation => 117,
263 Table::Region => 151,
264 Table::Part => 70,
265 Table::Supplier => 164,
266 Table::Partsupp => 141 * 4, Table::Customer => 168,
268 Table::Orders => 75,
269 Table::Lineitem => 64,
270 },
271 };
272
273 let target_chunk_size_bytes = match format {
274 OutputFormat::Tbl | OutputFormat::Csv => 15 * 1024 * 1024,
279 OutputFormat::Parquet => parquet_row_group_bytes,
280 };
281
282 let max_part_count = match format {
284 OutputFormat::Tbl | OutputFormat::Csv => None,
285 OutputFormat::Parquet => Some(32767),
286 };
287
288 debug!(
289 "Output size for table {table:?} with scale factor {scale_factor}: \
290 avg_row_size_bytes={avg_row_size_bytes}, row_count={row_count} \
291 target_chunk_size_bytes={target_chunk_size_bytes}, max_part_count={max_part_count:?}",
292 );
293
294 OutputSize {
295 avg_row_size_bytes,
296 row_count,
297 target_chunk_size_bytes,
298 max_part_count,
299 }
300 }
301
302 pub fn part_count(&self) -> i32 {
304 let mut num_parts =
305 ((self.row_count * self.avg_row_size_bytes) / self.target_chunk_size_bytes) + 1; if let Some(max_part_count) = self.max_part_count {
308 num_parts = num_parts.min(max_part_count)
310 }
311
312 num_parts.try_into().unwrap()
314 }
315
316 pub fn with_scaled_row_count(&self, cli_part_count: i32) -> OutputSize {
321 let scaled_row_count = self.row_count / cli_part_count as i64;
323 debug!(
324 "Scaling row count from {} to {scaled_row_count}",
325 self.row_count,
326 );
327 OutputSize {
328 avg_row_size_bytes: self.avg_row_size_bytes,
329 row_count: scaled_row_count,
330 target_chunk_size_bytes: self.target_chunk_size_bytes,
331 max_part_count: self.max_part_count,
332 }
333 }
334
335 fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
336 match table {
338 Table::Nation => 1,
339 Table::Region => 1,
340 Table::Part => PartGenerator::calculate_row_count(scale_factor, 1, 1),
341 Table::Supplier => SupplierGenerator::calculate_row_count(scale_factor, 1, 1),
342 Table::Partsupp => PartSuppGenerator::calculate_row_count(scale_factor, 1, 1),
343 Table::Customer => CustomerGenerator::calculate_row_count(scale_factor, 1, 1),
344 Table::Orders => OrderGenerator::calculate_row_count(scale_factor, 1, 1),
345 Table::Lineitem => {
346 4 * OrderGenerator::calculate_row_count(scale_factor, 1, 1)
351 }
352 }
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// Plans produced when neither `--part` nor `--parts` is given.
    mod default_layouts {
        use super::*;

        #[test]
        fn tbl_sf1_default_nation() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_region() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_part() {
            Test::new()
                .with_table(Table::Part)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_supplier() {
            Test::new()
                .with_table(Table::Supplier)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_default_partsupp() {
            Test::new()
                .with_table(Table::Partsupp)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_customer() {
            Test::new()
                .with_table(Table::Customer)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn tbl_sf1_default_orders() {
            Test::new()
                .with_table(Table::Orders)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(11, 1..=11)
        }

        #[test]
        fn tbl_sf1_default_lineitem() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .assert(49, 1..=49)
        }

        #[test]
        fn parquet_sf1_default_nation() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_region() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_part() {
            Test::new()
                .with_table(Table::Part)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(2, 1..=2)
        }

        #[test]
        fn parquet_sf1_default_supplier() {
            Test::new()
                .with_table(Table::Supplier)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_default_partsupp() {
            Test::new()
                .with_table(Table::Partsupp)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(16, 1..=16)
        }

        #[test]
        fn parquet_sf1_default_customer() {
            Test::new()
                .with_table(Table::Customer)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(4, 1..=4)
        }

        #[test]
        fn parquet_sf1_default_orders() {
            Test::new()
                .with_table(Table::Orders)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(16, 1..=16)
        }

        #[test]
        fn parquet_sf1_default_lineitem() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .assert(53, 1..=53)
        }
    }

    /// Plans produced when both `--part` and `--parts` are given.
    mod partitions {
        use super::*;

        #[test]
        fn tbl_sf1_nation_cli_parts() {
            Test::new()
                .with_table(Table::Nation)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_region_cli_parts() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(1, 1..=1)
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_1() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(50, 1..=5)
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_4() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(4)
                .with_cli_part_count(10)
                .assert(50, 16..=20)
        }

        #[test]
        fn parquet_sf1_region_cli_parts() {
            Test::new()
                .with_table(Table::Region)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(1, 1..=1)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_1() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(10)
                .assert(60, 1..=6)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_4() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(4)
                .with_cli_part_count(10)
                .assert(60, 19..=24)
        }

        #[test]
        fn parquet_sf1_lineitem_cli_parts_10() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1.0)
                .with_cli_part(10)
                .with_cli_part_count(10)
                .assert(60, 55..=60)
        }
    }

    /// Invalid `--part` / `--parts` combinations.
    mod errors {
        use super::*;

        #[test]
        fn tbl_sf1_lineitem_cli_invalid_part() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(0)
                .with_cli_part_count(10)
                .assert_err("Invalid --part. Expected a number greater than zero, got 0")
        }

        #[test]
        fn tbl_sf1_lineitem_cli_parts_invalid_big() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(11)
                .with_cli_part_count(10)
                .assert_err("Invalid --part. Expected at most the value of --parts (10), got 11");
        }

        #[test]
        fn tbl_sf1_lineitem_cli_invalid_part_count() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1.0)
                .with_cli_part(1)
                .with_cli_part_count(0)
                .assert_err("Invalid --part_count. Expected a number greater than zero, got 0");
        }
    }

    /// Part counts at large scale factors, including the Parquet cap of 32767 chunks.
    mod limits {
        use super::*;

        /// Asserts that a range of part numbers spans at most 32767 parts (inclusive count).
        fn assert_within_row_group_limit(parts: &RangeInclusive<i32>) {
            assert!(
                parts.clone().count() <= 32767,
                "Expected parts {parts:?} should not exceed 32k row groups",
            );
        }

        #[test]
        fn parquet_sf10_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(10.0)
                .assert(524, 1..=524);
        }

        #[test]
        fn tbl_sf10_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(10.0)
                .assert(489, 1..=489);
        }

        #[test]
        fn tbl_sf1000_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Tbl)
                .with_scale_factor(1000.0)
                .assert(48829, 1..=48829);
        }

        #[test]
        fn parquet_sf1000_lineitem_limit() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1000.0)
                .assert(32767, 1..=32767);
        }

        #[test]
        fn parquet_sf1000_lineitem_cli_parts_limit() {
            let expected_parts = 15697..=20928;
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(1000.0)
                .with_cli_part(4)
                .with_cli_part_count(10)
                .assert(52320, expected_parts.clone());

            assert_within_row_group_limit(&expected_parts);
        }

        #[test]
        fn parquet_sf100000_lineitem_cli_parts_limit() {
            let expected_parts = 98302..=131068;
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(100000.0)
                .with_cli_part(4)
                .with_cli_part_count(10)
                .assert(327670, expected_parts.clone());

            assert_within_row_group_limit(&expected_parts);
        }
    }

    /// Effect of a non-default Parquet row-group size on the number of parts.
    mod parquet_row_group_size {
        use super::*;

        #[test]
        fn parquet_sf10_lineitem_default_row_group() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(10.0)
                .assert(524, 1..=524);
        }

        #[test]
        fn parquet_sf10_lineitem_small_row_group() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(10.0)
                .with_parquet_row_group_bytes(1024 * 1024)
                .assert(3663, 1..=3663);
        }

        #[test]
        fn parquet_sf10_lineitem_large_row_group() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(10.0)
                .with_parquet_row_group_bytes(20 * 1024 * 1024)
                .assert(184, 1..=184);
        }

        #[test]
        fn parquet_sf100000_lineitem_small_row_group_max_groups() {
            Test::new()
                .with_table(Table::Lineitem)
                .with_format(OutputFormat::Parquet)
                .with_scale_factor(100000.0)
                .with_parquet_row_group_bytes(1024 * 1024)
                .assert(32767, 1..=32767);
        }
    }

    /// Builder describing one `GenerationPlan::try_new` call and its expected outcome.
    #[derive(Debug)]
    struct Test {
        table: Table,
        format: OutputFormat,
        scale_factor: f64,
        cli_part: Option<i32>,
        cli_part_count: Option<i32>,
        parquet_row_group_bytes: i64,
    }

    impl Test {
        fn new() -> Self {
            Default::default()
        }

        /// Builds the plan and asserts its total part count and the part numbers to generate.
        fn assert(self, expected_part_count: i32, expected_part_numbers: RangeInclusive<i32>) {
            let plan = GenerationPlan::try_new(
                self.table,
                self.format,
                self.scale_factor,
                self.cli_part,
                self.cli_part_count,
                self.parquet_row_group_bytes,
            )
            .unwrap();
            assert_eq!(plan.part_count, expected_part_count);
            assert_eq!(plan.part_list, expected_part_numbers);
        }

        /// Asserts that building the plan fails with exactly `expected_error`.
        fn assert_err(self, expected_error: &str) {
            let actual_error = GenerationPlan::try_new(
                self.table,
                self.format,
                self.scale_factor,
                self.cli_part,
                self.cli_part_count,
                self.parquet_row_group_bytes,
            )
            .unwrap_err();
            assert_eq!(actual_error, expected_error);
        }

        fn with_table(mut self, table: Table) -> Self {
            self.table = table;
            self
        }

        fn with_format(mut self, format: OutputFormat) -> Self {
            self.format = format;
            self
        }

        fn with_scale_factor(mut self, scale_factor: f64) -> Self {
            self.scale_factor = scale_factor;
            self
        }

        fn with_cli_part(mut self, cli_part: i32) -> Self {
            self.cli_part = Some(cli_part);
            self
        }

        fn with_cli_part_count(mut self, cli_part_count: i32) -> Self {
            self.cli_part_count = Some(cli_part_count);
            self
        }

        fn with_parquet_row_group_bytes(mut self, parquet_row_group_bytes: i64) -> Self {
            self.parquet_row_group_bytes = parquet_row_group_bytes;
            self
        }
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                table: Table::Orders,
                format: OutputFormat::Tbl,
                scale_factor: 1.0,
                cli_part: None,
                cli_part_count: None,
                parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
            }
        }
    }
}