use crate::conversions::{decimal128_array_from_iter, to_arrow_date32};
use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
use arrow::array::{
Date32Array, Decimal128Array, Int32Array, Int64Array, RecordBatch, StringViewArray,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use std::sync::{Arc, LazyLock};
use tpchgen::generators::{LineItemGenerator, LineItemGeneratorIterator};
/// Adapter that turns the rows produced by a tpchgen [`LineItemGenerator`]
/// into Arrow [`RecordBatch`]es.
///
/// Each call to `Iterator::next` drains up to `batch_size` rows from the
/// wrapped row iterator and converts them column by column.
pub struct LineItemArrow {
    /// Row-level iterator that supplies the line items to convert.
    inner: LineItemGeneratorIterator<'static>,
    /// Maximum number of rows placed in a single `RecordBatch`.
    batch_size: usize,
}
impl LineItemArrow {
    /// Wraps `generator`, producing batches of [`DEFAULT_BATCH_SIZE`] rows
    /// unless overridden with [`LineItemArrow::with_batch_size`].
    pub fn new(generator: LineItemGenerator<'static>) -> Self {
        let inner = generator.iter();
        Self {
            inner,
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    /// Returns this adapter reconfigured to emit at most `batch_size` rows per
    /// batch.
    ///
    /// A `batch_size` of 0 makes the iterator yield no batches at all: `next`
    /// takes zero rows and treats an empty chunk as end of iteration.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }
}
impl RecordBatchIterator for LineItemArrow {
fn schema(&self) -> &SchemaRef {
&LINEITEM_SCHEMA
}
}
impl Iterator for LineItemArrow {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
if rows.is_empty() {
return None;
}
let l_orderkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_orderkey));
let l_partkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_partkey));
let l_suppkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_suppkey));
let l_linenumber = Int32Array::from_iter_values(rows.iter().map(|row| row.l_linenumber));
let l_quantity = Decimal128Array::from_iter_values(rows.iter().map(|row| {
(row.l_quantity as i128) * 100
}))
.with_precision_and_scale(15, 2)
.unwrap();
let l_extended_price =
decimal128_array_from_iter(rows.iter().map(|row| row.l_extendedprice));
let l_discount = decimal128_array_from_iter(rows.iter().map(|row| row.l_discount));
let l_tax = decimal128_array_from_iter(rows.iter().map(|row| row.l_tax));
let l_returnflag =
StringViewArray::from_iter_values(rows.iter().map(|row| row.l_returnflag));
let l_linestatus =
StringViewArray::from_iter_values(rows.iter().map(|row| row.l_linestatus));
let l_shipdate = Date32Array::from_iter_values(
rows.iter().map(|row| row.l_shipdate).map(to_arrow_date32),
);
let l_commitdate = Date32Array::from_iter_values(
rows.iter().map(|row| row.l_commitdate).map(to_arrow_date32),
);
let l_receiptdate = Date32Array::from_iter_values(
rows.iter()
.map(|row| row.l_receiptdate)
.map(to_arrow_date32),
);
let l_shipinstruct =
StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipinstruct));
let l_shipmode = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipmode));
let l_comment = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_comment));
let batch = RecordBatch::try_new(
Arc::clone(self.schema()),
vec![
Arc::new(l_orderkey),
Arc::new(l_partkey),
Arc::new(l_suppkey),
Arc::new(l_linenumber),
Arc::new(l_quantity),
Arc::new(l_extended_price),
Arc::new(l_discount),
Arc::new(l_tax),
Arc::new(l_returnflag),
Arc::new(l_linestatus),
Arc::new(l_shipdate),
Arc::new(l_commitdate),
Arc::new(l_receiptdate),
Arc::new(l_shipinstruct),
Arc::new(l_shipmode),
Arc::new(l_comment),
],
)
.unwrap();
Some(batch)
}
}
/// Schema returned by `LineItemArrow::schema`; built by
/// [`make_lineitem_schema`] the first time it is accessed.
static LINEITEM_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_lineitem_schema);
fn make_lineitem_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, false),
Field::new("l_partkey", DataType::Int64, false),
Field::new("l_suppkey", DataType::Int64, false),
Field::new("l_linenumber", DataType::Int32, false),
Field::new("l_quantity", DataType::Decimal128(15, 2), false),
Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
Field::new("l_discount", DataType::Decimal128(15, 2), false),
Field::new("l_tax", DataType::Decimal128(15, 2), false),
Field::new("l_returnflag", DataType::Utf8View, false),
Field::new("l_linestatus", DataType::Utf8View, false),
Field::new("l_shipdate", DataType::Date32, false),
Field::new("l_commitdate", DataType::Date32, false),
Field::new("l_receiptdate", DataType::Date32, false),
Field::new("l_shipinstruct", DataType::Utf8View, false),
Field::new("l_shipmode", DataType::Utf8View, false),
Field::new("l_comment", DataType::Utf8View, false),
]))
}