use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use std::io::Write;
use std::sync::Arc;
use tpchgen::csv::{
CustomerCsv, LineItemCsv, NationCsv, OrderCsv, PartCsv, PartSuppCsv, RegionCsv, SupplierCsv,
};
use tpchgen::generators::{
Customer, CustomerGenerator, LineItem, LineItemGenerator, Nation, NationGenerator, Order,
OrderGenerator, Part, PartGenerator, PartSupp, PartSuppGenerator, Region, RegionGenerator,
Supplier, SupplierGenerator,
};
use tpchgen_arrow::{
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow,
RecordBatchIterator, RegionArrow, SupplierArrow,
};
/// Expands to a `#[test]` named `$FUNCNAME`.
///
/// The test builds a `$GENERATOR` at scale factor 0.1 (part 1 of 1). It then
/// passes that generator's rows, together with `$ARROWITER` batches of 1000 rows
/// built from an identical copy of the generator, to `$FORMATTYPE.test(...)`.
macro_rules! test_row_type {
    ($FUNCNAME:ident, $GENERATOR:ty, $ARROWITER:ty, $FORMATTYPE:expr) => {
        #[test]
        fn $FUNCNAME() {
            // Scale factor 0.1, generated as a single part (part 1 of 1).
            let (scale_factor, part, part_count) = (0.1, 1, 1);
            let row_generator = <$GENERATOR>::new(scale_factor, part, part_count);
            // The Arrow side gets its own copy so both sides start from the same state.
            let arrow_batches = <$ARROWITER>::new(row_generator.clone()).with_batch_size(1000);
            $FORMATTYPE.test(row_generator.iter(), arrow_batches);
        }
    };
}
// Pipe-delimited `.tbl` round trips, one test per table.
test_row_type!(customer_tbl, CustomerGenerator, CustomerArrow, Test::tbl());
test_row_type!(lineitem_tbl, LineItemGenerator, LineItemArrow, Test::tbl());
test_row_type!(nation_tbl, NationGenerator, NationArrow, Test::tbl());
test_row_type!(order_tbl, OrderGenerator, OrderArrow, Test::tbl());
test_row_type!(part_tbl, PartGenerator, PartArrow, Test::tbl());
test_row_type!(partsupp_tbl, PartSuppGenerator, PartSuppArrow, Test::tbl());
test_row_type!(region_tbl, RegionGenerator, RegionArrow, Test::tbl());
test_row_type!(supplier_tbl, SupplierGenerator, SupplierArrow, Test::tbl());
// CSV (header + rows) round trips, one test per table.
test_row_type!(customer_csv, CustomerGenerator, CustomerArrow, Test::csv());
test_row_type!(lineitem_csv, LineItemGenerator, LineItemArrow, Test::csv());
test_row_type!(nation_csv, NationGenerator, NationArrow, Test::csv());
test_row_type!(order_csv, OrderGenerator, OrderArrow, Test::csv());
test_row_type!(part_csv, PartGenerator, PartArrow, Test::csv());
test_row_type!(partsupp_csv, PartSuppGenerator, PartSuppArrow, Test::csv());
test_row_type!(region_csv, RegionGenerator, RegionArrow, Test::csv());
test_row_type!(supplier_csv, SupplierGenerator, SupplierArrow, Test::csv());
/// A generated row that can render itself into the text formats under test.
///
/// Every method appends to `text_data`; none of them clears it.
trait RowType {
    /// Appends the CSV header line for this row type.
    fn write_csv_header(text_data: &mut Vec<u8>);

    /// Appends this row as a CSV line.
    fn write_csv_row(self, text_data: &mut Vec<u8>);

    /// Appends this row in `.tbl` form.
    fn write_tbl_row(self, text_data: &mut Vec<u8>);
}
/// Implements [`RowType`] for the row type `$type`.
///
/// `.tbl` output comes from the row's own `Display` impl. CSV output comes from
/// the `$csv_type` wrapper (its `header()` and `new(row)` displays).
macro_rules! impl_row_type {
    ($type:ty, $csv_type:ty) => {
        impl RowType for $type {
            fn write_tbl_row(self, text_data: &mut Vec<u8>) {
                // No newline is appended here; the caller decides how rows are terminated.
                text_data
                    .write_fmt(format_args!("{}", self))
                    .expect("writing into a Vec<u8> cannot fail");
            }

            fn write_csv_header(text_data: &mut Vec<u8>) {
                let header = <$csv_type>::header();
                text_data
                    .write_fmt(format_args!("{}\n", header))
                    .expect("writing into a Vec<u8> cannot fail");
            }

            fn write_csv_row(self, text_data: &mut Vec<u8>) {
                let csv_row = <$csv_type>::new(self);
                text_data
                    .write_fmt(format_args!("{}\n", csv_row))
                    .expect("writing into a Vec<u8> cannot fail");
            }
        }
    };
}
// `Supplier` is the only row type here without a lifetime parameter.
impl_row_type!(Supplier, SupplierCsv);
// Row types that carry a lifetime parameter.
impl_row_type!(Customer<'_>, CustomerCsv);
impl_row_type!(LineItem<'_>, LineItemCsv);
impl_row_type!(Nation<'_>, NationCsv);
impl_row_type!(Order<'_>, OrderCsv);
impl_row_type!(Part<'_>, PartCsv);
impl_row_type!(PartSupp<'_>, PartSuppCsv);
impl_row_type!(Region<'_>, RegionCsv);
/// The text format a round-trip test renders rows into before reparsing them.
// Variant names mirror the file-format names, hence the acronym lint opt-out.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Copy, Debug)]
enum Test {
    /// `|`-delimited rows without a header line.
    TBL,
    /// Comma-separated rows preceded by a header line.
    CSV,
}
impl Test {
    /// Returns the pipe-delimited `.tbl` variant.
    fn tbl() -> Self {
        Self::TBL
    }

    /// Returns the CSV variant.
    fn csv() -> Self {
        Self::CSV
    }

    /// Round-trips rows through this text format and compares them with Arrow.
    ///
    /// For every batch yielded by `arrow_iter`, this:
    /// - takes the same number of rows from `row_iter`;
    /// - renders them as text in this format;
    /// - parses that text back with `arrow_csv` using `arrow_iter`'s schema;
    /// - asserts that the result equals the Arrow batch.
    ///
    /// The two iterators are expected to yield the same rows in the same order.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - a reparsed batch differs from its Arrow batch;
    /// - `row_iter` runs out of rows before the Arrow batches do;
    /// - `row_iter` still has rows left after the last Arrow batch.
    fn test<R, RI, RBI>(&self, mut row_iter: RI, mut arrow_iter: RBI)
    where
        R: RowType,
        RI: Iterator<Item = R>,
        RBI: RecordBatchIterator,
    {
        // Reused across batches; cleared at the start of each one.
        let mut text_data = Vec::new();
        let mut batch_index = 0usize;
        // `while let` rather than `for`: the body also borrows `arrow_iter` for its schema,
        // which a `for` loop's mutable borrow would forbid.
        while let Some(arrow_batch) = arrow_iter.next() {
            let batch_size = arrow_batch.num_rows();
            text_data.clear();
            self.write_header::<R>(&mut text_data);
            let mut rows_written = 0usize;
            for item in row_iter.by_ref().take(batch_size) {
                self.write_row(item, &mut text_data);
                rows_written += 1;
            }
            assert_eq!(
                rows_written, batch_size,
                "{:?}: row iterator ran out of rows in batch {}",
                self, batch_index
            );
            let reparsed_batch = self.parse(&text_data, arrow_iter.schema(), batch_size);
            assert_eq!(
                reparsed_batch, arrow_batch,
                "{:?}: reparsed batch {} differs from the Arrow batch",
                self, batch_index
            );
            batch_index += 1;
        }
        // Without this check, text rows beyond the last Arrow batch would go unnoticed.
        assert!(
            row_iter.next().is_none(),
            "{:?}: row iterator has rows left after {} Arrow batches",
            self,
            batch_index
        );
    }

    /// Appends the format's header to `text_data`.
    ///
    /// This is the CSV header line for CSV, and nothing for `.tbl`.
    fn write_header<R: RowType>(&self, text_data: &mut Vec<u8>) {
        match self {
            Test::TBL => {}
            Test::CSV => R::write_csv_header(text_data),
        }
    }

    /// Appends `row` to `text_data` in this format as one `\n`-terminated line.
    ///
    /// # Panics
    ///
    /// In `.tbl` mode, panics if the row's text does not end with the `|` delimiter.
    fn write_row<R: RowType>(&self, row: R, text_data: &mut Vec<u8>) {
        match self {
            Test::TBL => {
                row.write_tbl_row(text_data);
                // `.tbl` rows end with a trailing `|`, which the reader would otherwise
                // see as an extra field; turn it into the line terminator instead.
                let last = text_data
                    .last_mut()
                    .expect("a .tbl row should not be empty");
                assert_eq!(*last, b'|', "a .tbl row should end with the '|' delimiter");
                *last = b'\n';
            }
            Test::CSV => row.write_csv_row(text_data),
        }
    }

    /// Parses `data`, which holds every row of one batch in this format, into a
    /// single [`RecordBatch`] with `schema`.
    ///
    /// # Panics
    ///
    /// Panics if the reader cannot be built or the text fails to parse. Also
    /// panics if the text does not yield exactly one batch.
    fn parse(&self, data: &[u8], schema: &SchemaRef, batch_size: usize) -> RecordBatch {
        let builder =
            arrow_csv::reader::ReaderBuilder::new(Arc::clone(schema)).with_batch_size(batch_size);
        let builder = match self {
            Test::TBL => builder.with_header(false).with_delimiter(b'|'),
            Test::CSV => builder.with_header(true),
        };
        let mut parser = builder
            .build(data)
            .expect("should be able to build the CSV reader");
        let batch = parser
            .next()
            .expect("should have a batch")
            .expect("should have no errors parsing");
        assert!(parser.next().is_none(), "should have only one batch");
        batch
    }
}