tpchgen_arrow/
lineitem.rs

1use crate::conversions::{decimal128_array_from_iter, to_arrow_date32};
2use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
3use arrow::array::{
4    Date32Array, Decimal128Array, Int32Array, Int64Array, RecordBatch, StringViewArray,
5};
6use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::{Arc, LazyLock};
8use tpchgen::generators::{LineItemGenerator, LineItemGeneratorIterator};
9
/// Generate [`LineItem`]s in [`RecordBatch`] format
11///
12/// [`LineItem`]: tpchgen::generators::LineItem
13///
14/// # Example
15/// ```
16/// # use tpchgen::generators::LineItemGenerator;
17/// # use tpchgen_arrow::LineItemArrow;
18///
19/// // Create a SF=1.0 generator and wrap it in an Arrow generator
20/// let generator = LineItemGenerator::new(1.0, 1, 1);
21/// let mut arrow_generator = LineItemArrow::new(generator)
22///   .with_batch_size(10);
/// // Read the first batch (10 rows)
24/// let batch = arrow_generator.next().unwrap();
25/// // compare the output by pretty printing it
26/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
27///   .unwrap()
28///   .to_string();
29/// let lines = formatted_batches.lines().collect::<Vec<_>>();
30/// assert_eq!(lines, vec![
31///   "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+",
32///   "| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct    | l_shipmode | l_comment                           |",
33///   "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+",
34///   "| 1          | 155190    | 7706      | 1            | 17.00      | 21168.23        | 0.04       | 0.02  | N            | O            | 1996-03-13 | 1996-02-12   | 1996-03-22    | DELIVER IN PERSON | TRUCK      | egular courts above the             |",
35///   "| 1          | 67310     | 7311      | 2            | 36.00      | 45983.16        | 0.09       | 0.06  | N            | O            | 1996-04-12 | 1996-02-28   | 1996-04-20    | TAKE BACK RETURN  | MAIL       | ly final dependencies: slyly bold   |",
36///   "| 1          | 63700     | 3701      | 3            | 8.00       | 13309.60        | 0.10       | 0.02  | N            | O            | 1996-01-29 | 1996-03-05   | 1996-01-31    | TAKE BACK RETURN  | REG AIR    | riously. regular, express dep       |",
37///   "| 1          | 2132      | 4633      | 4            | 28.00      | 28955.64        | 0.09       | 0.06  | N            | O            | 1996-04-21 | 1996-03-30   | 1996-05-16    | NONE              | AIR        | lites. fluffily even de             |",
38///   "| 1          | 24027     | 1534      | 5            | 24.00      | 22824.48        | 0.10       | 0.04  | N            | O            | 1996-03-30 | 1996-03-14   | 1996-04-01    | NONE              | FOB        |  pending foxes. slyly re            |",
39///   "| 1          | 15635     | 638       | 6            | 32.00      | 49620.16        | 0.07       | 0.02  | N            | O            | 1996-01-30 | 1996-02-07   | 1996-02-03    | DELIVER IN PERSON | MAIL       | arefully slyly ex                   |",
40///   "| 2          | 106170    | 1191      | 1            | 38.00      | 44694.46        | 0.00       | 0.05  | N            | O            | 1997-01-28 | 1997-01-14   | 1997-02-02    | TAKE BACK RETURN  | RAIL       | ven requests. deposits breach a     |",
41///   "| 3          | 4297      | 1798      | 1            | 45.00      | 54058.05        | 0.06       | 0.00  | R            | F            | 1994-02-02 | 1994-01-04   | 1994-02-23    | NONE              | AIR        | ongside of the furiously brave acco |",
42///   "| 3          | 19036     | 6540      | 2            | 49.00      | 46796.47        | 0.10       | 0.00  | R            | F            | 1993-11-09 | 1993-12-20   | 1993-11-24    | TAKE BACK RETURN  | RAIL       |  unusual accounts. eve              |",
43///   "| 3          | 128449    | 3474      | 3            | 27.00      | 39890.88        | 0.06       | 0.07  | A            | F            | 1994-01-16 | 1993-11-22   | 1994-01-23    | DELIVER IN PERSON | SHIP       | nal foxes wake.                     |",
44///   "+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+-------------------+------------+-------------------------------------+"
45/// ]);
46/// ```
47// # TODOs:
48// 1. create individual column iterators to avoid a copy into rows
49// 2. Maybe Recycle buffers (don't reallocate new ones all the time) :thinking:
50// Based off code / types from DataFusion
51// https://github.com/apache/datafusion/blob/a1ae15826245097e7c12d4f0ed3425b25af6c431/benchmarks/src/tpch/mod.rs#L104-L103
pub struct LineItemArrow {
    /// Row iterator obtained from the wrapped generator; drained one batch at a time
    inner: LineItemGeneratorIterator<'static>,
    /// Maximum number of rows taken from `inner` for each emitted [`RecordBatch`]
    batch_size: usize,
}
56
57impl LineItemArrow {
58    pub fn new(generator: LineItemGenerator<'static>) -> Self {
59        Self {
60            inner: generator.iter(),
61            batch_size: DEFAULT_BATCH_SIZE,
62        }
63    }
64
65    /// Set the batch size
66    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
67        self.batch_size = batch_size;
68        self
69    }
70}
71
72impl RecordBatchIterator for LineItemArrow {
73    fn schema(&self) -> &SchemaRef {
74        &LINEITEM_SCHEMA
75    }
76}
77
78impl Iterator for LineItemArrow {
79    type Item = RecordBatch;
80
81    /// Generate the next batch of data, if there is one
82    fn next(&mut self) -> Option<Self::Item> {
83        // Get next rows to convert
84        let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
85        if rows.is_empty() {
86            return None;
87        }
88
89        // Convert column by column
90        let l_orderkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_orderkey));
91        let l_partkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_partkey));
92        let l_suppkey = Int64Array::from_iter_values(rows.iter().map(|row| row.l_suppkey));
93        let l_linenumber = Int32Array::from_iter_values(rows.iter().map(|row| row.l_linenumber));
94        let l_quantity = Decimal128Array::from_iter_values(rows.iter().map(|row| {
95            // Convert the i64 to Arrow Decimal(15,2)
96            // TODO it is supposed to be decimal in the spec
97            (row.l_quantity as i128) * 100
98        }))
99        .with_precision_and_scale(15, 2)
100        .unwrap();
101        let l_extended_price =
102            decimal128_array_from_iter(rows.iter().map(|row| row.l_extendedprice));
103        let l_discount = decimal128_array_from_iter(rows.iter().map(|row| row.l_discount));
104        let l_tax = decimal128_array_from_iter(rows.iter().map(|row| row.l_tax));
105        let l_returnflag =
106            StringViewArray::from_iter_values(rows.iter().map(|row| row.l_returnflag));
107        let l_linestatus =
108            StringViewArray::from_iter_values(rows.iter().map(|row| row.l_linestatus));
109        let l_shipdate = Date32Array::from_iter_values(
110            rows.iter().map(|row| row.l_shipdate).map(to_arrow_date32),
111        );
112        let l_commitdate = Date32Array::from_iter_values(
113            rows.iter().map(|row| row.l_commitdate).map(to_arrow_date32),
114        );
115        let l_receiptdate = Date32Array::from_iter_values(
116            rows.iter()
117                .map(|row| row.l_receiptdate)
118                .map(to_arrow_date32),
119        );
120        let l_shipinstruct =
121            StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipinstruct));
122        let l_shipmode = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_shipmode));
123        let l_comment = StringViewArray::from_iter_values(rows.iter().map(|row| row.l_comment));
124
125        let batch = RecordBatch::try_new(
126            Arc::clone(self.schema()),
127            vec![
128                Arc::new(l_orderkey),
129                Arc::new(l_partkey),
130                Arc::new(l_suppkey),
131                Arc::new(l_linenumber),
132                Arc::new(l_quantity),
133                Arc::new(l_extended_price),
134                Arc::new(l_discount),
135                Arc::new(l_tax),
136                Arc::new(l_returnflag),
137                Arc::new(l_linestatus),
138                Arc::new(l_shipdate),
139                Arc::new(l_commitdate),
140                Arc::new(l_receiptdate),
141                Arc::new(l_shipinstruct),
142                Arc::new(l_shipmode),
143                Arc::new(l_comment),
144            ],
145        )
146        .unwrap();
147
148        Some(batch)
149    }
150}
151
/// Schema for the LineItem table
///
/// Built by `make_lineitem_schema` the first time it is accessed and shared
/// from then on.
static LINEITEM_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_lineitem_schema);
154
155fn make_lineitem_schema() -> SchemaRef {
156    Arc::new(Schema::new(vec![
157        Field::new("l_orderkey", DataType::Int64, false),
158        Field::new("l_partkey", DataType::Int64, false),
159        Field::new("l_suppkey", DataType::Int64, false),
160        Field::new("l_linenumber", DataType::Int32, false),
161        Field::new("l_quantity", DataType::Decimal128(15, 2), false),
162        Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
163        Field::new("l_discount", DataType::Decimal128(15, 2), false),
164        Field::new("l_tax", DataType::Decimal128(15, 2), false),
165        Field::new("l_returnflag", DataType::Utf8View, false),
166        Field::new("l_linestatus", DataType::Utf8View, false),
167        Field::new("l_shipdate", DataType::Date32, false),
168        Field::new("l_commitdate", DataType::Date32, false),
169        Field::new("l_receiptdate", DataType::Date32, false),
170        Field::new("l_shipinstruct", DataType::Utf8View, false),
171        Field::new("l_shipmode", DataType::Utf8View, false),
172        Field::new("l_comment", DataType::Utf8View, false),
173    ]))
174}