tpchgen_arrow/order.rs
1use crate::conversions::{
2 decimal128_array_from_iter, string_view_array_from_display_iter, to_arrow_date32,
3};
4use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
5use arrow::array::{Date32Array, Int32Array, Int64Array, RecordBatch, StringViewArray};
6use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
7use std::sync::{Arc, LazyLock};
8use tpchgen::generators::{OrderGenerator, OrderGeneratorIterator};
9
10/// Generate [`Order`]s in [`RecordBatch`] format
11///
12/// [`Order`]: tpchgen::generators::Order
13///
14/// # Example
15/// ```
16/// # use tpchgen::generators::{OrderGenerator};
17/// # use tpchgen_arrow::OrderArrow;
18///
19/// // Create a SF=1.0 generator and wrap it in an Arrow generator
20/// let generator = OrderGenerator::new(1.0, 1, 1);
21/// let mut arrow_generator = OrderArrow::new(generator)
22/// .with_batch_size(10);
/// // Read the first batch (10 rows)
24/// let batch = arrow_generator.next().unwrap();
25/// // compare the output by pretty printing it
26/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
27/// .unwrap()
28/// .to_string();
29/// let lines = formatted_batches.lines().collect::<Vec<_>>();
30/// assert_eq!(lines, vec![
31/// "+------------+-----------+---------------+--------------+-------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------+",
32/// "| o_orderkey | o_custkey | o_orderstatus | o_totalprice | o_orderdate | o_orderpriority | o_clerk | o_shippriority | o_comment |",
33/// "+------------+-----------+---------------+--------------+-------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------+",
34/// "| 1 | 36901 | O | 173665.47 | 1996-01-02 | 5-LOW | Clerk#000000951 | 0 | nstructions sleep furiously among |",
35/// "| 2 | 78002 | O | 46929.18 | 1996-12-01 | 1-URGENT | Clerk#000000880 | 0 | foxes. pending accounts at the pending, silent asymptot |",
36/// "| 3 | 123314 | F | 193846.25 | 1993-10-14 | 5-LOW | Clerk#000000955 | 0 | sly final accounts boost. carefully regular ideas cajole carefully. depos |",
37/// "| 4 | 136777 | O | 32151.78 | 1995-10-11 | 5-LOW | Clerk#000000124 | 0 | sits. slyly regular warthogs cajole. regular, regular theodolites acro |",
38/// "| 5 | 44485 | F | 144659.20 | 1994-07-30 | 5-LOW | Clerk#000000925 | 0 | quickly. bold deposits sleep slyly. packages use slyly |",
39/// "| 6 | 55624 | F | 58749.59 | 1992-02-21 | 4-NOT SPECIFIED | Clerk#000000058 | 0 | ggle. special, final requests are against the furiously specia |",
40/// "| 7 | 39136 | O | 252004.18 | 1996-01-10 | 2-HIGH | Clerk#000000470 | 0 | ly special requests |",
41/// "| 32 | 130057 | O | 208660.75 | 1995-07-16 | 2-HIGH | Clerk#000000616 | 0 | ise blithely bold, regular requests. quickly unusual dep |",
42/// "| 33 | 66958 | F | 163243.98 | 1993-10-27 | 3-MEDIUM | Clerk#000000409 | 0 | uriously. furiously final request |",
43/// "| 34 | 61001 | O | 58949.67 | 1998-07-21 | 3-MEDIUM | Clerk#000000223 | 0 | ly final packages. fluffily final deposits wake blithely ideas. spe |",
44/// "+------------+-----------+---------------+--------------+-------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------+"
45/// ]);
46/// ```
47pub struct OrderArrow {
48 inner: OrderGeneratorIterator<'static>,
49 batch_size: usize,
50}
51
52impl OrderArrow {
53 pub fn new(generator: OrderGenerator<'static>) -> Self {
54 Self {
55 inner: generator.iter(),
56 batch_size: DEFAULT_BATCH_SIZE,
57 }
58 }
59
60 /// Set the batch size
61 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
62 self.batch_size = batch_size;
63 self
64 }
65}
66
67impl RecordBatchIterator for OrderArrow {
68 fn schema(&self) -> &SchemaRef {
69 &ORDER_SCHEMA
70 }
71}
72
73impl Iterator for OrderArrow {
74 type Item = RecordBatch;
75
76 fn next(&mut self) -> Option<Self::Item> {
77 // Get next rows to convert
78 let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
79 if rows.is_empty() {
80 return None;
81 }
82
83 let o_orderkey = Int64Array::from_iter_values(rows.iter().map(|r| r.o_orderkey));
84 let o_custkey = Int64Array::from_iter_values(rows.iter().map(|r| r.o_custkey));
85 let o_orderstatus =
86 string_view_array_from_display_iter(rows.iter().map(|r| r.o_orderstatus));
87 let o_totalprice = decimal128_array_from_iter(rows.iter().map(|r| r.o_totalprice));
88 let o_orderdate =
89 Date32Array::from_iter_values(rows.iter().map(|r| r.o_orderdate).map(to_arrow_date32));
90 let o_orderpriority =
91 StringViewArray::from_iter_values(rows.iter().map(|r| r.o_orderpriority));
92 let o_clerk = string_view_array_from_display_iter(rows.iter().map(|r| r.o_clerk));
93 let o_shippriority = Int32Array::from_iter_values(rows.iter().map(|r| r.o_shippriority));
94 let o_comment = StringViewArray::from_iter_values(rows.iter().map(|r| r.o_comment));
95
96 let batch = RecordBatch::try_new(
97 Arc::clone(self.schema()),
98 vec![
99 Arc::new(o_orderkey),
100 Arc::new(o_custkey),
101 Arc::new(o_orderstatus),
102 Arc::new(o_totalprice),
103 Arc::new(o_orderdate),
104 Arc::new(o_orderpriority),
105 Arc::new(o_clerk),
106 Arc::new(o_shippriority),
107 Arc::new(o_comment),
108 ],
109 )
110 .unwrap();
111 Some(batch)
112 }
113}
114
115/// Schema for the Order
116static ORDER_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_order_schema);
117fn make_order_schema() -> SchemaRef {
118 Arc::new(Schema::new(vec![
119 Field::new("o_orderkey", DataType::Int64, false),
120 Field::new("o_custkey", DataType::Int64, false),
121 Field::new("o_orderstatus", DataType::Utf8View, false),
122 Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
123 Field::new("o_orderdate", DataType::Date32, false),
124 Field::new("o_orderpriority", DataType::Utf8View, false),
125 Field::new("o_clerk", DataType::Utf8View, false),
126 Field::new("o_shippriority", DataType::Int32, false),
127 Field::new("o_comment", DataType::Utf8View, false),
128 ]))
129}