tpchgen_arrow/lineitem.rs
1use crate::conversions::{decimal128_array_from_iter, to_arrow_date32};
2use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
3use arrow::array::{
4 Date32Array, Decimal128Array, Int32Array, Int64Array, RecordBatch, StringViewArray,
5};
6use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::{Arc, LazyLock};
8use tpchgen::generators::{LineItemGenerator, LineItemGeneratorIterator};
9
10/// Generate [`LineItem`]s in [`RecordBatch`] format
11///
12/// [`LineItem`]: tpchgen::generators::LineItem
13///
14/// # Example
15/// ```
16/// # use tpchgen::generators::LineItemGenerator;
17/// # use tpchgen_arrow::LineItemArrow;
18///
19/// // Create a SF=1.0 generator and wrap it in an Arrow generator
20/// let generator = LineItemGenerator::new(1.0, 1, 1);
21/// let mut arrow_generator = LineItemArrow::new(generator)
22/// .with_batch_size(10);
/// // Read the first batch (10 rows)
24/// let batch = arrow_generator.next().unwrap();
25/// // compare the output by pretty printing it
26/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
27/// .unwrap()
28/// .to_string();
29/// let lines = formatted_batches.lines().collect::<Vec<_>>();
30/// assert_eq!(lines, vec![
31/// "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+",
32/// "| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |",
33/// "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+",
34/// "| 1 | 155190 | 7706 | 1 | 17.00 | 21168.23 | 0.04 | 0.02 | N | O | 1996-03-13 | 1996-02-12 | 1996-03-22 | DELIVER IN PERSON | TRUCK | egular courts above the |",
35/// "| 1 | 67310 | 7311 | 2 | 36.00 | 45983.16 | 0.09 | 0.06 | N | O | 1996-04-12 | 1996-02-28 | 1996-04-20 | TAKE BACK RETURN | MAIL | ly final dependencies: slyly bold |",
36/// "| 1 | 63700 | 3701 | 3 | 8.00 | 13309.60 | 0.10 | 0.02 | N | O | 1996-01-29 | 1996-03-05 | 1996-01-31 | TAKE BACK RETURN | REG AIR | riously. regular, express dep |",
37/// "| 1 | 2132 | 4633 | 4 | 28.00 | 28955.64 | 0.09 | 0.06 | N | O | 1996-04-21 | 1996-03-30 | 1996-05-16 | NONE | AIR | lites. fluffily even de |",
38/// "| 1 | 24027 | 1534 | 5 | 24.00 | 22824.48 | 0.10 | 0.04 | N | O | 1996-03-30 | 1996-03-14 | 1996-04-01 | NONE | FOB | pending foxes. slyly re |",
39/// "| 1 | 15635 | 638 | 6 | 32.00 | 49620.16 | 0.07 | 0.02 | N | O | 1996-01-30 | 1996-02-07 | 1996-02-03 | DELIVER IN PERSON | MAIL | arefully slyly ex |",
40/// "| 2 | 106170 | 1191 | 1 | 38.00 | 44694.46 | 0.00 | 0.05 | N | O | 1997-01-28 | 1997-01-14 | 1997-02-02 | TAKE BACK RETURN | RAIL | ven requests. deposits breach a |",
41/// "| 3 | 4297 | 1798 | 1 | 45.00 | 54058.05 | 0.06 | 0.00 | R | F | 1994-02-02 | 1994-01-04 | 1994-02-23 | NONE | AIR | ongside of the furiously brave acco |",
42/// "| 3 | 19036 | 6540 | 2 | 49.00 | 46796.47 | 0.10 | 0.00 | R | F | 1993-11-09 | 1993-12-20 | 1993-11-24 | TAKE BACK RETURN | RAIL | unusual accounts. eve |",
43/// "| 3 | 128449 | 3474 | 3 | 27.00 | 39890.88 | 0.06 | 0.07 | A | F | 1994-01-16 | 1993-11-22 | 1994-01-23 | DELIVER IN PERSON | SHIP | nal foxes wake. |",
44/// "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+"
45/// ]);
46/// ```
47// # TODOs:
48// 1. create individual column iterators to avoid a copy into rows
49// 2. Maybe Recycle buffers (don't reallocate new ones all the time) :thinking:
50// Based off code / types from DataFusion
51// https://github.com/apache/datafusion/blob/a1ae15826245097e7c12d4f0ed3425b25af6c431/benchmarks/src/tpch/mod.rs#L104-L103
52pub struct LineItemArrow {
53 inner: LineItemGeneratorIterator<'static>,
54 batch_size: usize,
55}
56
57impl LineItemArrow {
58 pub fn new(generator: LineItemGenerator<'static>) -> Self {
59 Self {
60 inner: generator.iter(),
61 batch_size: DEFAULT_BATCH_SIZE,
62 }
63 }
64
65 /// Set the batch size
66 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
67 self.batch_size = batch_size;
68 self
69 }
70}
71
72impl RecordBatchIterator for LineItemArrow {
73 fn schema(&self) -> &SchemaRef {
74 &LINEITEM_SCHEMA
75 }
76}
77
78impl Iterator for LineItemArrow {
79 type Item = RecordBatch;
80
81 /// Generate the next batch of data, if there is one
82 fn next(&mut self) -> Option<Self::Item> {
83 // Get next rows to convert
84 let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
85 if rows.is_empty() {
86 return None;
87 }
88
89 // Convert column by column
90 let l_orderkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_orderkey));
91 let l_partkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_partkey));
92 let l_suppkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_suppkey));
93 let l_linenumber = Int32Array::from_iter_values(rows.iter().map(|row| row.l_linenumber));
94 let l_quantity = Decimal128Array::from_iter_values(rows.iter().map(|row| {
95 // Convert the i64 to Arrow Decimal(15,2)
96 // TODO it is supposed to be decimal in the spec
97 (row.l_quantity as i128) * 100
98 }))
99 .with_precision_and_scale(15, 2)
100 .unwrap();
101 let l_extended_price =
102 decimal128_array_from_iter(rows.iter().map(|row| row.l_extendedprice));
103 let l_discount = decimal128_array_from_iter(rows.iter().map(|row| row.l_discount));
104 let l_tax = decimal128_array_from_iter(rows.iter().map(|row| row.l_tax));
105 let l_returnflag =
106 StringViewArray::from_iter_values(rows.iter().map(|row| row.l_returnflag));
107 let l_linestatus =
108 StringViewArray::from_iter_values(rows.iter().map(|row| row.l_linestatus));
109 let l_shipdate = Date32Array::from_iter_values(
110 rows.iter().map(|row| row.l_shipdate).map(to_arrow_date32),
111 );
112 let l_commitdate = Date32Array::from_iter_values(
113 rows.iter().map(|row| row.l_commitdate).map(to_arrow_date32),
114 );
115 let l_receiptdate = Date32Array::from_iter_values(
116 rows.iter()
117 .map(|row| row.l_receiptdate)
118 .map(to_arrow_date32),
119 );
120 let l_shipinstruct =
121 StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipinstruct));
122 let l_shipmode = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipmode));
123 let l_comment = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_comment));
124
125 let batch = RecordBatch::try_new(
126 Arc::clone(self.schema()),
127 vec![
128 Arc::new(l_orderkey),
129 Arc::new(l_partkey),
130 Arc::new(l_suppkey),
131 Arc::new(l_linenumber),
132 Arc::new(l_quantity),
133 Arc::new(l_extended_price),
134 Arc::new(l_discount),
135 Arc::new(l_tax),
136 Arc::new(l_returnflag),
137 Arc::new(l_linestatus),
138 Arc::new(l_shipdate),
139 Arc::new(l_commitdate),
140 Arc::new(l_receiptdate),
141 Arc::new(l_shipinstruct),
142 Arc::new(l_shipmode),
143 Arc::new(l_comment),
144 ],
145 )
146 .unwrap();
147
148 Some(batch)
149 }
150}
151
152/// Schema for the LineItem table
153static LINEITEM_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_lineitem_schema);
154
155fn make_lineitem_schema() -> SchemaRef {
156 Arc::new(Schema::new(vec![
157 Field::new("l_orderkey", DataType::Int64, false),
158 Field::new("l_partkey", DataType::Int64, false),
159 Field::new("l_suppkey", DataType::Int64, false),
160 Field::new("l_linenumber", DataType::Int32, false),
161 Field::new("l_quantity", DataType::Decimal128(15, 2), false),
162 Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
163 Field::new("l_discount", DataType::Decimal128(15, 2), false),
164 Field::new("l_tax", DataType::Decimal128(15, 2), false),
165 Field::new("l_returnflag", DataType::Utf8View, false),
166 Field::new("l_linestatus", DataType::Utf8View, false),
167 Field::new("l_shipdate", DataType::Date32, false),
168 Field::new("l_commitdate", DataType::Date32, false),
169 Field::new("l_receiptdate", DataType::Date32, false),
170 Field::new("l_shipinstruct", DataType::Utf8View, false),
171 Field::new("l_shipmode", DataType::Utf8View, false),
172 Field::new("l_comment", DataType::Utf8View, false),
173 ]))
174}