tpchgen_arrow/
partsupp.rs

1use crate::conversions::{decimal128_array_from_iter, string_view_array_from_display_iter};
2use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
3use arrow::array::{Int32Array, Int64Array, RecordBatch};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5use std::sync::{Arc, LazyLock};
6use tpchgen::generators::{PartSuppGenerator, PartSuppGeneratorIterator};
7
8/// Generate [`PartSupp`]s in [`RecordBatch`] format
9///
10/// [`PartSupp`]: tpchgen::generators::PartSupp
11///
12/// # Example
13/// ```
14/// # use tpchgen::generators::{PartSuppGenerator};
15/// # use tpchgen_arrow::PartSuppArrow;
16///
17/// // Create a SF=1.0 generator and wrap it in an Arrow generator
18/// let generator = PartSuppGenerator::new(1.0, 1, 1);
19/// let mut arrow_generator = PartSuppArrow::new(generator)
20///   .with_batch_size(10);
/// // Read the first batch (10 rows)
22/// let batch = arrow_generator.next().unwrap();
23/// // compare the output by pretty printing it
24/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
25///   .unwrap()
26///   .to_string();
27/// let lines = formatted_batches.lines().collect::<Vec<_>>();
28/// assert_eq!(lines, vec![
29///   "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
30///   "| ps_partkey | ps_suppkey | ps_availqty | ps_supplycost | ps_comment                                                                                                                                                                                         |",
31///   "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
32///   "| 1          | 2          | 3325        | 771.64        | , even theodolites. regular, final theodolites eat after the carefully pending foxes. furiously regular deposits sleep slyly. carefully bold realms above the ironic dependencies haggle careful   |",
33///   "| 1          | 2502       | 8076        | 993.49        | ven ideas. quickly even packages print. pending multipliers must have to are fluff                                                                                                                 |",
34///   "| 1          | 5002       | 3956        | 337.09        | after the fluffily ironic deposits? blithely special dependencies integrate furiously even excuses. blithely silent theodolites could have to haggle pending, express requests; fu                 |",
35///   "| 1          | 7502       | 4069        | 357.84        | al, regular dependencies serve carefully after the quickly final pinto beans. furiously even deposits sleep quickly final, silent pinto beans. fluffily reg                                        |",
36///   "| 2          | 3          | 8895        | 378.49        | nic accounts. final accounts sleep furiously about the ironic, bold packages. regular, regular accounts                                                                                            |",
37///   "| 2          | 2503       | 4969        | 915.27        | ptotes. quickly pending dependencies integrate furiously. fluffily ironic ideas impress blithely above the express accounts. furiously even epitaphs need to wak                                   |",
38///   "| 2          | 5003       | 8539        | 438.37        | blithely bold ideas. furiously stealthy packages sleep fluffily. slyly special deposits snooze furiously carefully regular accounts. regular deposits according to the accounts nag carefully slyl |",
39///   "| 2          | 7503       | 3025        | 306.39        | olites. deposits wake carefully. even, express requests cajole. carefully regular ex                                                                                                               |",
40///   "| 3          | 4          | 4651        | 920.92        | ilent foxes affix furiously quickly unusual requests. even packages across the carefully even theodolites nag above the sp                                                                         |",
41///   "| 3          | 2504       | 4093        | 498.13        | ending dependencies haggle fluffily. regular deposits boost quickly carefully regular requests. deposits affix furiously around the pinto beans. ironic, unusual platelets across the p            |",
42///   "+------------+------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+"
43/// ]);
44/// ```
45pub struct PartSuppArrow {
46    inner: PartSuppGeneratorIterator<'static>,
47    batch_size: usize,
48}
49
50impl PartSuppArrow {
51    pub fn new(generator: PartSuppGenerator<'static>) -> Self {
52        Self {
53            inner: generator.iter(),
54            batch_size: DEFAULT_BATCH_SIZE,
55        }
56    }
57
58    /// Set the batch size
59    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
60        self.batch_size = batch_size;
61        self
62    }
63}
64
65impl RecordBatchIterator for PartSuppArrow {
66    fn schema(&self) -> &SchemaRef {
67        &PARTSUPP_SCHEMA
68    }
69}
70
71impl Iterator for PartSuppArrow {
72    type Item = RecordBatch;
73
74    fn next(&mut self) -> Option<Self::Item> {
75        // Get next rows to convert
76        let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
77        if rows.is_empty() {
78            return None;
79        }
80
81        let ps_partkey = Int64Array::from_iter_values(rows.iter().map(|r| r.ps_partkey));
82        let ps_suppkey = Int64Array::from_iter_values(rows.iter().map(|r| r.ps_suppkey));
83        let ps_availqty = Int32Array::from_iter_values(rows.iter().map(|r| r.ps_availqty));
84        let ps_supplycost = decimal128_array_from_iter(rows.iter().map(|r| r.ps_supplycost));
85        let ps_comment = string_view_array_from_display_iter(rows.iter().map(|r| r.ps_comment));
86
87        let batch = RecordBatch::try_new(
88            Arc::clone(self.schema()),
89            vec![
90                Arc::new(ps_partkey),
91                Arc::new(ps_suppkey),
92                Arc::new(ps_availqty),
93                Arc::new(ps_supplycost),
94                Arc::new(ps_comment),
95            ],
96        )
97        .unwrap();
98        Some(batch)
99    }
100}
101
102/// Schema for the PartSupp
103static PARTSUPP_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_partsupp_schema);
104fn make_partsupp_schema() -> SchemaRef {
105    Arc::new(Schema::new(vec![
106        Field::new("ps_partkey", DataType::Int64, false),
107        Field::new("ps_suppkey", DataType::Int64, false),
108        Field::new("ps_availqty", DataType::Int32, false),
109        Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
110        Field::new("ps_comment", DataType::Utf8View, false),
111    ]))
112}