tpchgen_arrow/partsupp.rs
1use crate::conversions::{decimal128_array_from_iter, string_view_array_from_display_iter};
2use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
3use arrow::array::{Int32Array, Int64Array, RecordBatch};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5use std::sync::{Arc, LazyLock};
6use tpchgen::generators::{PartSuppGenerator, PartSuppGeneratorIterator};
7
8/// Generate [`PartSupp`]s in [`RecordBatch`] format
9///
10/// [`PartSupp`]: tpchgen::generators::PartSupp
11///
12/// # Example
13/// ```
14/// # use tpchgen::generators::{PartSuppGenerator};
15/// # use tpchgen_arrow::PartSuppArrow;
16///
17/// // Create a SF=1.0 generator and wrap it in an Arrow generator
18/// let generator = PartSuppGenerator::new(1.0, 1, 1);
19/// let mut arrow_generator = PartSuppArrow::new(generator)
20/// .with_batch_size(10);
/// // Read the first batch (10 rows)
22/// let batch = arrow_generator.next().unwrap();
23/// // compare the output by pretty printing it
24/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
25/// .unwrap()
26/// .to_string();
27/// let lines = formatted_batches.lines().collect::<Vec<_>>();
28/// assert_eq!(lines, vec![
29/// "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
30/// "| ps_partkey | ps_suppkey | ps_availqty | ps_supplycost | ps_comment |",
31/// "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
32/// "| 1 | 2 | 3325 | 771.64 | , even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful |",
33/// "| 1 | 2502 | 8076 | 993.49 | ven ideas. quickly even packages print. pending multipliers must have to are fluff |",
34/// "| 1 | 5002 | 3956 | 337.09 | after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu |",
35/// "| 1 | 7502 | 4069 | 357.84 | al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg |",
36/// "| 2 | 3 | 8895 | 378.49 | nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts |",
37/// "| 2 | 2503 | 4969 | 915.27 | ptotes. quickly pending dependencies integrate furiously. fluffily ironic ideas impress blithely above the express accounts. furiously even epitaphs need to wak |",
38/// "| 2 | 5003 | 8539 | 438.37 | blithely bold ideas. furiously stealthy packages sleep fluffily. slyly special deposits snooze furiously carefully regular accounts. regular deposits according to the accounts nag carefully slyl |",
39/// "| 2 | 7503 | 3025 | 306.39 | olites. deposits wake carefully. even, express requests cajole. carefully regular ex |",
40/// "| 3 | 4 | 4651 | 920.92 | ilent foxes affix furiously quickly unusual requests. even packages across the carefully even theodolites nag above the sp |",
41/// "| 3 | 2504 | 4093 | 498.13 | ending dependencies haggle fluffily. regular deposits boost quickly carefully regular requests. deposits affix furiously around the pinto beans. ironic, unusual platelets across the p |",
42/// "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+"
43/// ]);
44/// ```
pub struct PartSuppArrow {
    // Row iterator obtained from the wrapped generator (`generator.iter()` in `new`)
    inner: PartSuppGeneratorIterator<'static>,
    // Upper bound on the number of rows pulled from `inner` for each emitted batch
    batch_size: usize,
}
49
50impl PartSuppArrow {
51 pub fn new(generator: PartSuppGenerator<'static>) -> Self {
52 Self {
53 inner: generator.iter(),
54 batch_size: DEFAULT_BATCH_SIZE,
55 }
56 }
57
58 /// Set the batch size
59 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
60 self.batch_size = batch_size;
61 self
62 }
63}
64
65impl RecordBatchIterator for PartSuppArrow {
66 fn schema(&self) -> &SchemaRef {
67 &PARTSUPP_SCHEMA
68 }
69}
70
71impl Iterator for PartSuppArrow {
72 type Item = RecordBatch;
73
74 fn next(&mut self) -> Option<Self::Item> {
75 // Get next rows to convert
76 let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
77 if rows.is_empty() {
78 return None;
79 }
80
81 let ps_partkey = Int64Array::from_iter_values(rows.iter().map(|r| r.ps_partkey));
82 let ps_suppkey = Int64Array::from_iter_values(rows.iter().map(|r| r.ps_suppkey));
83 let ps_availqty = Int32Array::from_iter_values(rows.iter().map(|r| r.ps_availqty));
84 let ps_supplycost = decimal128_array_from_iter(rows.iter().map(|r| r.ps_supplycost));
85 let ps_comment = string_view_array_from_display_iter(rows.iter().map(|r| r.ps_comment));
86
87 let batch = RecordBatch::try_new(
88 Arc::clone(self.schema()),
89 vec![
90 Arc::new(ps_partkey),
91 Arc::new(ps_suppkey),
92 Arc::new(ps_availqty),
93 Arc::new(ps_supplycost),
94 Arc::new(ps_comment),
95 ],
96 )
97 .unwrap();
98 Some(batch)
99 }
100}
101
102/// Schema for the PartSupp
103static PARTSUPP_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_partsupp_schema);
104fn make_partsupp_schema() -> SchemaRef {
105 Arc::new(Schema::new(vec![
106 Field::new("ps_partkey", DataType::Int64, false),
107 Field::new("ps_suppkey", DataType::Int64, false),
108 Field::new("ps_availqty", DataType::Int32, false),
109 Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
110 Field::new("ps_comment", DataType::Utf8View, false),
111 ]))
112}