use crate::conversions::{decimal128_array_from_iter, string_view_array_from_display_iter};
use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use std::sync::{Arc, LazyLock};
use tpchgen::generators::{CustomerGenerator, CustomerGeneratorIterator};
/// Adapter that converts rows from a TPC-H `customer` generator into Arrow [`RecordBatch`]es.
///
/// Rows are pulled from the wrapped generator iterator and grouped into batches of at most
/// `batch_size` rows; every batch uses the schema returned by
/// [`RecordBatchIterator::schema`].
pub struct CustomerArrow {
    /// Upper bound on the number of rows placed into a single emitted batch.
    batch_size: usize,
    /// Source of generated `customer` rows, consumed incrementally batch by batch.
    inner: CustomerGeneratorIterator<'static>,
}
impl CustomerArrow {
    /// Creates an adapter over the rows yielded by `generator.iter()`, using
    /// [`DEFAULT_BATCH_SIZE`] rows per batch.
    pub fn new(generator: CustomerGenerator<'static>) -> Self {
        Self {
            inner: generator.iter(),
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    /// Sets the maximum number of rows placed into each emitted [`RecordBatch`].
    ///
    /// The last batch may hold fewer rows once the generator runs out. Note that a
    /// `batch_size` of `0` makes the iterator yield no batches at all, because each
    /// call to `next` then takes zero rows and treats that as exhaustion.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
impl RecordBatchIterator for CustomerArrow {
fn schema(&self) -> &SchemaRef {
&CUSTOMER_SCHEMA
}
}
impl Iterator for CustomerArrow {
    type Item = RecordBatch;

    /// Pulls up to `batch_size` rows from the generator and converts them into a single
    /// columnar [`RecordBatch`]. Returns `None` once the generator yields no more rows.
    fn next(&mut self) -> Option<Self::Item> {
        // `by_ref` keeps `self.inner` usable for the following batches.
        let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
        if rows.is_empty() {
            return None;
        }

        // One array per column, built in the same order as the schema's fields.
        let c_custkey = Int64Array::from_iter_values(rows.iter().map(|r| r.c_custkey));
        let c_name = string_view_array_from_display_iter(rows.iter().map(|r| r.c_name));
        let c_address = string_view_array_from_display_iter(rows.iter().map(|r| &r.c_address));
        let c_nationkey = Int64Array::from_iter_values(rows.iter().map(|r| r.c_nationkey));
        let c_phone = string_view_array_from_display_iter(rows.iter().map(|r| &r.c_phone));
        let c_acctbal = decimal128_array_from_iter(rows.iter().map(|r| r.c_acctbal));
        let c_mktsegment =
            string_view_array_from_display_iter(rows.iter().map(|r| r.c_mktsegment));
        let c_comment = string_view_array_from_display_iter(rows.iter().map(|r| r.c_comment));

        let batch = RecordBatch::try_new(
            Arc::clone(self.schema()),
            vec![
                Arc::new(c_custkey),
                Arc::new(c_name),
                Arc::new(c_address),
                Arc::new(c_nationkey),
                Arc::new(c_phone),
                Arc::new(c_acctbal),
                Arc::new(c_mktsegment),
                Arc::new(c_comment),
            ],
        )
        // A failure here is a programming error: the column list above must agree with the
        // schema in count, order, data type and nullability.
        // NOTE(review): relies on the `conversions` helpers producing Utf8View and
        // Decimal128(15, 2) arrays matching the schema — confirm there.
        .expect("customer columns must match CUSTOMER_SCHEMA");
        Some(batch)
    }
}
static CUSTOMER_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_customer_schema);
fn make_customer_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("c_custkey", DataType::Int64, false),
Field::new("c_name", DataType::Utf8View, false),
Field::new("c_address", DataType::Utf8View, false),
Field::new("c_nationkey", DataType::Int64, false),
Field::new("c_phone", DataType::Utf8View, false),
Field::new("c_acctbal", DataType::Decimal128(15, 2), false),
Field::new("c_mktsegment", DataType::Utf8View, false),
Field::new("c_comment", DataType::Utf8View, false),
]))
}