use crate::tpch_cli::{OutputFormat, Table};
use log::debug;
use std::fmt::Display;
use std::ops::RangeInclusive;
use tpchgen::generators::{
CustomerGenerator, OrderGenerator, PartGenerator, PartSuppGenerator, SupplierGenerator,
};
#[derive(Debug, Clone, PartialEq)]
pub struct GenerationPlan {
part_count: i32,
part_list: RangeInclusive<i32>,
}
pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 7 * 1024 * 1024;
impl GenerationPlan {
pub fn try_new(
table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: Option<i32>,
cli_part_count: Option<i32>,
parquet_row_group_bytes: i64,
) -> Result<Self, String> {
match (cli_part, cli_part_count) {
(Some(_part), None) => Err(String::from(
"The --part option requires the --parts option to be set",
)),
(None, Some(_part_count)) => {
Err(String::from(
"The --part_count option requires the --part option to be set",
))
}
(Some(part), Some(part_count)) => Self::try_new_with_parts(
table,
format,
scale_factor,
part,
part_count,
parquet_row_group_bytes,
),
(None, None) => {
Self::try_new_without_parts(table, format, scale_factor, parquet_row_group_bytes)
}
}
}
pub fn partitioned_table(table: Table) -> bool {
table != Table::Nation && table != Table::Region
}
fn try_new_with_parts(
table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: i32,
cli_part_count: i32,
parquet_row_group_bytes: i64,
) -> Result<Self, String> {
if cli_part < 1 {
return Err(format!(
"Invalid --part. Expected a number greater than zero, got {cli_part}"
));
}
if cli_part_count < 1 {
return Err(format!(
"Invalid --part_count. Expected a number greater than zero, got {cli_part_count}"
));
}
if cli_part > cli_part_count {
return Err(format!(
"Invalid --part. Expected at most the value of --parts ({cli_part_count}), got {cli_part}"));
}
if !Self::partitioned_table(table) {
return Ok(Self {
part_count: 1,
part_list: 1..=1,
});
}
let num_chunks = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes)
.with_scaled_row_count(cli_part_count)
.part_count();
let new_total_parts = cli_part_count * num_chunks;
let start_part = (cli_part - 1) * num_chunks + 1;
let end_part = cli_part * num_chunks;
let new_parts_to_generate = start_part..=end_part;
debug!(
"User specified cli_parts={cli_part_count}, cli_part={cli_part}. \
Generating {new_total_parts} partitions for table {table:?} \
with scale factor {scale_factor}: {new_parts_to_generate:?}"
);
Ok(Self {
part_count: new_total_parts,
part_list: new_parts_to_generate,
})
}
fn try_new_without_parts(
table: Table,
format: OutputFormat,
scale_factor: f64,
parquet_row_group_bytes: i64,
) -> Result<Self, String> {
let output_size = OutputSize::new(table, scale_factor, format, parquet_row_group_bytes);
let num_parts = output_size.part_count();
Ok(Self {
part_count: num_parts,
part_list: 1..=num_parts,
})
}
pub fn chunk_count(&self) -> usize {
self.part_list.clone().count()
}
}
impl IntoIterator for GenerationPlan {
type Item = (i32, i32);
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.part_list
.map(|part_number| (part_number, self.part_count))
.collect::<Vec<_>>()
.into_iter()
}
}
impl Display for GenerationPlan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GenerationPlan for {} parts", self.part_count)
}
}
#[derive(Debug)]
struct OutputSize {
avg_row_size_bytes: i64,
row_count: i64,
target_chunk_size_bytes: i64,
max_part_count: Option<i64>,
}
impl OutputSize {
pub fn new(
table: Table,
scale_factor: f64,
format: OutputFormat,
parquet_row_group_bytes: i64,
) -> Self {
let row_count = Self::row_count_for_table(table, scale_factor);
let avg_row_size_bytes = match format {
OutputFormat::Tbl | OutputFormat::Csv => match table {
Table::Nation => 88,
Table::Region => 77,
Table::Part => 115,
Table::Supplier => 140,
Table::Partsupp => 148,
Table::Customer => 160,
Table::Orders => 114,
Table::Lineitem => 128,
},
OutputFormat::Parquet => match table {
Table::Nation => 117,
Table::Region => 151,
Table::Part => 70,
Table::Supplier => 164,
Table::Partsupp => 141 * 4, Table::Customer => 168,
Table::Orders => 75,
Table::Lineitem => 64,
},
};
let target_chunk_size_bytes = match format {
OutputFormat::Tbl | OutputFormat::Csv => 15 * 1024 * 1024,
OutputFormat::Parquet => parquet_row_group_bytes,
};
let max_part_count = match format {
OutputFormat::Tbl | OutputFormat::Csv => None,
OutputFormat::Parquet => Some(32767),
};
debug!(
"Output size for table {table:?} with scale factor {scale_factor}: \
avg_row_size_bytes={avg_row_size_bytes}, row_count={row_count} \
target_chunk_size_bytes={target_chunk_size_bytes}, max_part_count={max_part_count:?}",
);
OutputSize {
avg_row_size_bytes,
row_count,
target_chunk_size_bytes,
max_part_count,
}
}
pub fn part_count(&self) -> i32 {
let mut num_parts =
((self.row_count * self.avg_row_size_bytes) / self.target_chunk_size_bytes) + 1;
if let Some(max_part_count) = self.max_part_count {
num_parts = num_parts.min(max_part_count)
}
num_parts.try_into().unwrap()
}
pub fn with_scaled_row_count(&self, cli_part_count: i32) -> OutputSize {
let scaled_row_count = self.row_count / cli_part_count as i64;
debug!(
"Scaling row count from {} to {scaled_row_count}",
self.row_count,
);
OutputSize {
avg_row_size_bytes: self.avg_row_size_bytes,
row_count: scaled_row_count,
target_chunk_size_bytes: self.target_chunk_size_bytes,
max_part_count: self.max_part_count,
}
}
fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
match table {
Table::Nation => 1,
Table::Region => 1,
Table::Part => PartGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Supplier => SupplierGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Partsupp => PartSuppGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Customer => CustomerGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Orders => OrderGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Lineitem => {
4 * OrderGenerator::calculate_row_count(scale_factor, 1, 1)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
mod default_layouts {
use super::*;
#[test]
fn tbl_sf1_default_nation() {
Test::new()
.with_table(Table::Nation)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn tbl_sf1_default_region() {
Test::new()
.with_table(Table::Region)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn tbl_sf1_default_part() {
Test::new()
.with_table(Table::Part)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(2, 1..=2)
}
#[test]
fn tbl_sf1_default_supplier() {
Test::new()
.with_table(Table::Supplier)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn tbl_sf1_default_partsupp() {
Test::new()
.with_table(Table::Partsupp)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(2, 1..=2)
}
#[test]
fn tbl_sf1_default_customer() {
Test::new()
.with_table(Table::Customer)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(2, 1..=2)
}
#[test]
fn tbl_sf1_default_orders() {
Test::new()
.with_table(Table::Orders)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(11, 1..=11)
}
#[test]
fn tbl_sf1_default_lineitem() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.assert(49, 1..=49)
}
#[test]
fn parquet_sf1_default_nation() {
Test::new()
.with_table(Table::Nation)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn parquet_sf1_default_region() {
Test::new()
.with_table(Table::Region)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn parquet_sf1_default_part() {
Test::new()
.with_table(Table::Part)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(2, 1..=2)
}
#[test]
fn parquet_sf1_default_supplier() {
Test::new()
.with_table(Table::Supplier)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(1, 1..=1)
}
#[test]
fn parquet_sf1_default_partsupp() {
Test::new()
.with_table(Table::Partsupp)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(16, 1..=16)
}
#[test]
fn parquet_sf1_default_customer() {
Test::new()
.with_table(Table::Customer)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(4, 1..=4)
}
#[test]
fn parquet_sf1_default_orders() {
Test::new()
.with_table(Table::Orders)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(16, 1..=16)
}
#[test]
fn parquet_sf1_default_lineitem() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.assert(53, 1..=53)
}
}
mod partitions {
use super::*;
#[test]
fn tbl_sf1_nation_cli_parts() {
Test::new()
.with_table(Table::Nation)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(1)
.with_cli_part_count(10)
.assert(1, 1..=1)
}
#[test]
fn tbl_sf1_region_cli_parts() {
Test::new()
.with_table(Table::Region)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(1)
.with_cli_part_count(10)
.assert(1, 1..=1)
}
#[test]
fn tbl_sf1_lineitem_cli_parts_1() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(1)
.with_cli_part_count(10)
.assert(50, 1..=5)
}
#[test]
fn tbl_sf1_lineitem_cli_parts_4() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(4) .with_cli_part_count(10)
.assert(50, 16..=20)
}
#[test]
fn parquet_sf1_region_cli_parts() {
Test::new()
.with_table(Table::Region)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.with_cli_part(1)
.with_cli_part_count(10)
.assert(1, 1..=1)
}
#[test]
fn parquet_sf1_lineitem_cli_parts_1() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.with_cli_part(1)
.with_cli_part_count(10)
.assert(60, 1..=6)
}
#[test]
fn parquet_sf1_lineitem_cli_parts_4() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.with_cli_part(4) .with_cli_part_count(10)
.assert(60, 19..=24)
}
#[test]
fn parquet_sf1_lineitem_cli_parts_10() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1.0)
.with_cli_part(10) .with_cli_part_count(10)
.assert(60, 55..=60)
}
#[test]
fn tbl_sf1_lineitem_cli_invalid_part() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(0) .with_cli_part_count(10)
.assert_err("Invalid --part. Expected a number greater than zero, got 0")
}
}
mod errors {
use super::*;
#[test]
fn sf1_lineitem_cli_invalid_part() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(0) .with_cli_part_count(10)
.assert_err("Invalid --part. Expected a number greater than zero, got 0")
}
#[test]
fn tbl_sf1_lineitem_cli_parts_invalid_big() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(11) .with_cli_part_count(10)
.assert_err("Invalid --part. Expected at most the value of --parts (10), got 11");
}
#[test]
fn tbl_sf1_lineitem_cli_invalid_part_count() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1.0)
.with_cli_part(1) .with_cli_part_count(0)
.assert_err("Invalid --part_count. Expected a number greater than zero, got 0");
}
}
mod limits {
use super::*;
#[test]
fn parquet_sf10_lineitem_limit() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(10.0)
.assert(524, 1..=524);
}
#[test]
fn tbl_sf10_lineitem_limit() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(10.0)
.assert(489, 1..=489);
}
#[test]
fn tbl_sf1000_lineitem_limit() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Tbl)
.with_scale_factor(1000.0)
.assert(48829, 1..=48829);
}
#[test]
fn parquet_sf1000_lineitem_limit() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1000.0)
.assert(32767, 1..=32767);
}
#[test]
fn parquet_sf1000_lineitem_cli_parts_limit() {
let expected_parts = 15697..=20928;
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(1000.0)
.with_cli_part(4) .with_cli_part_count(10)
.assert(52320, expected_parts.clone());
assert!(
expected_parts.end() - expected_parts.start() <= 32767,
"Expected parts {expected_parts:?} should not exceed 32k row groups",
);
}
#[test]
fn parquet_sf100000_lineitem_cli_parts_limit() {
let expected_parts = 98302..=131068;
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(100000.0)
.with_cli_part(4) .with_cli_part_count(10)
.assert(327670, expected_parts.clone());
assert!(
expected_parts.end() - expected_parts.start() <= 32767,
"Expected parts {expected_parts:?} should not exceed 32k row groups",
);
}
mod parquet_row_group_size {
use super::*;
#[test]
fn parquet_sf1_lineitem_default_row_group() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(10.0)
.assert(524, 1..=524);
}
#[test]
fn parquet_sf1_lineitem_small_row_group() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(10.0)
.with_parquet_row_group_bytes(1024 * 1024) .assert(3663, 1..=3663);
}
#[test]
fn parquet_sf1_lineitem_large_row_group() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(10.0)
.with_parquet_row_group_bytes(20 * 1024 * 1024) .assert(184, 1..=184);
}
#[test]
fn parquet_sf1_lineitem_small_row_group_max_groups() {
Test::new()
.with_table(Table::Lineitem)
.with_format(OutputFormat::Parquet)
.with_scale_factor(100000.0)
.with_parquet_row_group_bytes(1024 * 1024) .assert(32767, 1..=32767);
}
}
}
#[derive(Debug)]
struct Test {
table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: Option<i32>,
cli_part_count: Option<i32>,
parquet_row_group_bytes: i64,
}
impl Test {
fn new() -> Self {
Default::default()
}
fn assert(self, expected_part_count: i32, expected_part_numbers: RangeInclusive<i32>) {
let plan = GenerationPlan::try_new(
self.table,
self.format,
self.scale_factor,
self.cli_part,
self.cli_part_count,
self.parquet_row_group_bytes,
)
.unwrap();
assert_eq!(plan.part_count, expected_part_count);
assert_eq!(plan.part_list, expected_part_numbers);
}
fn assert_err(self, expected_error: &str) {
let actual_error = GenerationPlan::try_new(
self.table,
self.format,
self.scale_factor,
self.cli_part,
self.cli_part_count,
self.parquet_row_group_bytes,
)
.unwrap_err();
assert_eq!(actual_error, expected_error);
}
fn with_table(mut self, table: Table) -> Self {
self.table = table;
self
}
fn with_format(mut self, format: OutputFormat) -> Self {
self.format = format;
self
}
fn with_scale_factor(mut self, scale_factor: f64) -> Self {
self.scale_factor = scale_factor;
self
}
fn with_cli_part(mut self, cli_part: i32) -> Self {
self.cli_part = Some(cli_part);
self
}
fn with_cli_part_count(mut self, cli_part_count: i32) -> Self {
self.cli_part_count = Some(cli_part_count);
self
}
fn with_parquet_row_group_bytes(mut self, parquet_row_group_bytes: i64) -> Self {
self.parquet_row_group_bytes = parquet_row_group_bytes;
self
}
}
impl Default for Test {
fn default() -> Self {
Self {
table: Table::Orders,
format: OutputFormat::Tbl,
scale_factor: 1.0,
cli_part: None,
cli_part_count: None,
parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
}
}
}
}