use arrow_array::{
Array, RecordBatch,
builder::{
FixedSizeListBuilder, Float64Builder, Int64Builder, StringBuilder,
TimestampNanosecondBuilder, UInt8Builder,
},
};
use chrono::{DateTime, Utc};
use narrow::{
ArrayType,
array::StructArray,
arrow_schema::{DataType, Field, TimeUnit},
};
use rand::{Rng, SeedableRng, prelude::SmallRng};
use std::sync::Arc;
#[derive(ArrayType, Clone, Debug)]
struct LineItem {
l_orderkey: i64,
l_partkey: i64,
l_suppkey: i64,
l_linenumber: i64,
l_quantity: f64,
l_extendedprice: f64,
l_discount: f64,
l_tax: f64,
l_returnflag: u8,
l_linestatus: u8,
l_shipdate: DateTime<Utc>,
l_commitdate: DateTime<Utc>,
l_receiptdate: DateTime<Utc>,
l_shipinstruct: [u8; 25],
l_shipmode: [u8; 10],
l_comment: String,
}
fn make_recordbatch_narrow(rows: impl Iterator<Item = LineItem>) -> RecordBatch {
rows.into_iter().collect::<StructArray<LineItem>>().into()
}
struct LineItemBuilder {
l_orderkey: Int64Builder,
l_partkey: Int64Builder,
l_suppkey: Int64Builder,
l_linenumber: Int64Builder,
l_quantity: Float64Builder,
l_extendedprice: Float64Builder,
l_discount: Float64Builder,
l_tax: Float64Builder,
l_returnflag: UInt8Builder,
l_linestatus: UInt8Builder,
l_shipdate: TimestampNanosecondBuilder,
l_commitdate: TimestampNanosecondBuilder,
l_receiptdate: TimestampNanosecondBuilder,
l_shipinstruct: FixedSizeListBuilder<UInt8Builder>,
l_shipmode: FixedSizeListBuilder<UInt8Builder>,
l_comment: StringBuilder,
}
impl Default for LineItemBuilder {
fn default() -> Self {
Self {
l_orderkey: Default::default(),
l_partkey: Default::default(),
l_suppkey: Default::default(),
l_linenumber: Default::default(),
l_quantity: Default::default(),
l_extendedprice: Default::default(),
l_discount: Default::default(),
l_tax: Default::default(),
l_returnflag: Default::default(),
l_linestatus: Default::default(),
l_shipdate: Default::default(),
l_commitdate: Default::default(),
l_receiptdate: Default::default(),
l_shipinstruct: FixedSizeListBuilder::new(Default::default(), 25),
l_shipmode: FixedSizeListBuilder::new(Default::default(), 10),
l_comment: Default::default(),
}
}
}
impl LineItemBuilder {
fn append(&mut self, row: LineItem) {
self.l_orderkey.append_value(row.l_orderkey);
self.l_partkey.append_value(row.l_partkey);
self.l_suppkey.append_value(row.l_suppkey);
self.l_linenumber.append_value(row.l_linenumber);
self.l_quantity.append_value(row.l_quantity);
self.l_extendedprice.append_value(row.l_extendedprice);
self.l_discount.append_value(row.l_discount);
self.l_tax.append_value(row.l_tax);
self.l_returnflag.append_value(row.l_returnflag);
self.l_linestatus.append_value(row.l_linestatus);
self.l_shipdate
.append_option(row.l_shipdate.timestamp_nanos_opt());
self.l_commitdate
.append_option(row.l_commitdate.timestamp_nanos_opt());
self.l_receiptdate
.append_option(row.l_receiptdate.timestamp_nanos_opt());
self.l_shipinstruct
.values()
.append_values(&row.l_shipinstruct, &[true; 25]);
self.l_shipinstruct.append(true);
self.l_shipmode
.values()
.append_values(&row.l_shipmode, &[true; 10]);
self.l_shipmode.append(true);
self.l_comment.append_value(row.l_comment);
}
fn finish(mut self) -> RecordBatch {
let utc: Arc<str> = Arc::from("UTC");
let schema = arrow_schema::Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, true),
Field::new("l_partkey", DataType::Int64, true),
Field::new("l_suppkey", DataType::Int64, true),
Field::new("l_linenumber", DataType::Int64, true),
Field::new("l_quantity", DataType::Float64, true),
Field::new("l_extendedprice", DataType::Float64, true),
Field::new("l_discount", DataType::Float64, true),
Field::new("l_tax", DataType::Float64, true),
Field::new("l_returnflag", DataType::UInt8, true),
Field::new("l_linestatus", DataType::UInt8, true),
Field::new(
"l_shipdate",
DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())),
true,
),
Field::new(
"l_commitdate",
DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())),
true,
),
Field::new(
"l_receiptdate",
DataType::Timestamp(TimeUnit::Nanosecond, Some(utc.clone())),
true,
),
Field::new(
"l_shipinstruct",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt8, true)), 25),
true,
),
Field::new(
"l_shipmode",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt8, true)), 10),
true,
),
Field::new("l_comment", DataType::Utf8, true),
]);
let columns: Vec<Arc<dyn Array>> = vec![
Arc::new(self.l_orderkey.finish()),
Arc::new(self.l_partkey.finish()),
Arc::new(self.l_suppkey.finish()),
Arc::new(self.l_linenumber.finish()),
Arc::new(self.l_quantity.finish()),
Arc::new(self.l_extendedprice.finish()),
Arc::new(self.l_discount.finish()),
Arc::new(self.l_tax.finish()),
Arc::new(self.l_returnflag.finish()),
Arc::new(self.l_linestatus.finish()),
Arc::new(self.l_shipdate.with_timezone(utc.clone()).finish()),
Arc::new(self.l_commitdate.with_timezone(utc.clone()).finish()),
Arc::new(self.l_receiptdate.with_timezone(utc.clone()).finish()),
Arc::new(self.l_shipinstruct.finish()),
Arc::new(self.l_shipmode.finish()),
Arc::new(self.l_comment.finish()),
];
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}
}
fn make_recordbatch_arrow(rows: impl Iterator<Item = LineItem>) -> RecordBatch {
let mut builder = LineItemBuilder::default();
rows.for_each(|row| builder.append(row));
builder.finish()
}
fn make_native_row_oriented(size: usize) -> Vec<LineItem> {
let mut rng = SmallRng::seed_from_u64(0);
(0..size)
.map(|_| LineItem {
l_orderkey: rng.random_range(0..i64::MAX),
l_partkey: rng.random_range(0..i64::MAX),
l_suppkey: rng.random_range(0..i64::MAX),
l_linenumber: rng.random_range(0..i64::MAX),
l_quantity: rng.random_range(0f64..42f64),
l_extendedprice: rng.random_range(0f64..1337f64),
l_discount: rng.random_range(0f64..0.1),
l_tax: rng.random_range(0f64..0.3),
l_returnflag: rng.random_range(0..u8::MAX),
l_linestatus: rng.random_range(0..u8::MAX),
l_shipdate: DateTime::from_timestamp_nanos(rng.random_range(0..i64::MAX)),
l_commitdate: DateTime::from_timestamp_nanos(rng.random_range(0..i64::MAX)),
l_receiptdate: DateTime::from_timestamp_nanos(rng.random_range(0..i64::MAX)),
l_shipinstruct: [rng.random_range(0..u8::MAX); 25],
l_shipmode: [rng.random_range(0..u8::MAX); 10],
l_comment: String::from_iter(
(0..rng.random_range(0..44)).map(|_| rng.random_range('a'..='z')),
),
})
.collect()
}
const NUM_ROWS: usize = 1 << 20;
#[rustversion::attr(nightly, allow(non_local_definitions))]
fn main() {
let input = make_native_row_oriented(NUM_ROWS);
let narrow = make_recordbatch_narrow(input.clone().into_iter());
let arrow = make_recordbatch_arrow(input.into_iter());
assert_eq!(narrow.num_rows(), arrow.num_rows());
assert_eq!(narrow.num_columns(), arrow.num_columns());
}