Skip to main content

tpcgen_cli/tpch_cli/
output_plan.rs

1//! * [`OutputLocation`]: where to output the generated data
2//! * [`OutputPlan`]: an output file that will be generated
3//! * [`OutputPlanGenerator`]: plans the output files to be generated
4
5use crate::tpch_cli::plan::GenerationPlan;
6use crate::tpch_cli::{OutputFormat, Table};
7use log::debug;
8use parquet::basic::Compression;
9use std::collections::HashSet;
10use std::fmt::{Display, Formatter};
11use std::io;
12use std::path::PathBuf;
13
/// Destination for the data of a single output partition.
#[derive(Debug, Clone, PartialEq)]
pub enum OutputLocation {
    /// Write the partition to the file at this path.
    File(PathBuf),
    /// Write the partition to standard output.
    Stdout,
}

/// Short, human-readable label for the location.
///
/// Stdout renders as `Stdout`. A file renders as only its final path component.
/// If the path has no final component (e.g. `/` or `..`), the whole path is
/// rendered instead.
impl Display for OutputLocation {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let path = match self {
            OutputLocation::Stdout => return write!(f, "Stdout"),
            OutputLocation::File(path) => path,
        };
        match path.file_name() {
            Some(name) => write!(f, "{}", name.to_string_lossy()),
            None => write!(f, "{}", path.display()),
        }
    }
}
37
38/// Describes an output partition (file) that will be generated
39#[derive(Debug, Clone, PartialEq)]
40pub struct OutputPlan {
41    /// The table
42    table: Table,
43    /// The scale factor
44    scale_factor: f64,
45    /// The output format (TODO don't depend back on something in main)
46    output_format: OutputFormat,
47    /// If the output is parquet, what compression level to use
48    parquet_compression: Compression,
49    /// Where to output
50    output_location: OutputLocation,
51    /// Plan for generating the table
52    generation_plan: GenerationPlan,
53    /// CSV delimiter character
54    csv_delimiter: char,
55}
56
57impl OutputPlan {
58    pub fn new(
59        table: Table,
60        scale_factor: f64,
61        output_format: OutputFormat,
62        parquet_compression: Compression,
63        output_location: OutputLocation,
64        generation_plan: GenerationPlan,
65        csv_delimiter: char,
66    ) -> Self {
67        Self {
68            table,
69            scale_factor,
70            output_format,
71            parquet_compression,
72            output_location,
73            generation_plan,
74            csv_delimiter,
75        }
76    }
77
78    /// Return the table this partition is for
79    pub fn table(&self) -> Table {
80        self.table
81    }
82
83    /// Return the scale factor for this partition
84    pub fn scale_factor(&self) -> f64 {
85        self.scale_factor
86    }
87
88    /// Return the output format for this partition
89    pub fn output_format(&self) -> OutputFormat {
90        self.output_format
91    }
92
93    /// return the output location
94    pub fn output_location(&self) -> &OutputLocation {
95        &self.output_location
96    }
97
98    /// Return the parquet compression level for this partition
99    pub fn parquet_compression(&self) -> Compression {
100        self.parquet_compression
101    }
102
103    /// Return the number of chunks part(ition) count (the number of data chunks
104    /// in the underlying generation plan)
105    pub fn chunk_count(&self) -> usize {
106        self.generation_plan.chunk_count()
107    }
108
109    /// return the generation plan for this partition
110    pub fn generation_plan(&self) -> &GenerationPlan {
111        &self.generation_plan
112    }
113
114    /// Return the CSV delimiter character for this partition
115    pub fn csv_delimiter(&self) -> char {
116        self.csv_delimiter
117    }
118}
119
120impl Display for OutputPlan {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        write!(
123            f,
124            "table {} (SF={}, {} chunks) to {}",
125            self.table,
126            self.scale_factor,
127            self.chunk_count(),
128            self.output_location
129        )
130    }
131}
132
133/// Plans the creation of output files
134pub struct OutputPlanGenerator {
135    format: OutputFormat,
136    scale_factor: f64,
137    parquet_compression: Compression,
138    parquet_row_group_bytes: i64,
139    stdout: bool,
140    output_dir: PathBuf,
141    csv_delimiter: char,
142    /// The generated output plans
143    output_plans: Vec<OutputPlan>,
144    /// Output directories that have been created so far
145    /// (used to avoid creating the same directory multiple times)
146    created_directories: HashSet<PathBuf>,
147}
148
149impl OutputPlanGenerator {
150    pub fn new(
151        format: OutputFormat,
152        scale_factor: f64,
153        parquet_compression: Compression,
154        parquet_row_group_bytes: i64,
155        stdout: bool,
156        output_dir: PathBuf,
157        csv_delimiter: char,
158    ) -> Self {
159        Self {
160            format,
161            scale_factor,
162            parquet_compression,
163            parquet_row_group_bytes,
164            stdout,
165            output_dir,
166            csv_delimiter,
167            output_plans: Vec::new(),
168            created_directories: HashSet::new(),
169        }
170    }
171
172    /// Generate the output plans for the given table and partition options
173    pub fn generate_plans(
174        &mut self,
175        table: Table,
176        cli_part: Option<i32>,
177        cli_part_count: Option<i32>,
178    ) -> io::Result<()> {
179        // If the user specified only a part count, automatically create all
180        // partitions for the table
181        if let (None, Some(part_count)) = (cli_part, cli_part_count) {
182            if GenerationPlan::partitioned_table(table) {
183                debug!("Generating all partitions for table {table} with part count {part_count}");
184                for part in 1..=part_count {
185                    self.generate_plan_inner(table, Some(part), Some(part_count))?;
186                }
187            } else {
188                // there is only one partition for this table (e.g nation or region)
189                debug!("Generating single partition for table {table}");
190                self.generate_plan_inner(table, Some(1), Some(1))?;
191            }
192        } else {
193            self.generate_plan_inner(table, cli_part, cli_part_count)?;
194        }
195        Ok(())
196    }
197
198    fn generate_plan_inner(
199        &mut self,
200        table: Table,
201        cli_part: Option<i32>,
202        cli_part_count: Option<i32>,
203    ) -> io::Result<()> {
204        let generation_plan = GenerationPlan::try_new(
205            table,
206            self.format,
207            self.scale_factor,
208            cli_part,
209            cli_part_count,
210            self.parquet_row_group_bytes,
211        )
212        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
213
214        let output_location = self.output_location(table, cli_part)?;
215
216        let plan = OutputPlan::new(
217            table,
218            self.scale_factor,
219            self.format,
220            self.parquet_compression,
221            output_location,
222            generation_plan,
223            self.csv_delimiter,
224        );
225
226        self.output_plans.push(plan);
227        Ok(())
228    }
229
230    /// Return the output location for the given table
231    ///
232    /// * if part of is None, the output location is `{output_dir}/{table}.{extension}`
233    ///
234    /// * if part is Some(part), then the output location
235    ///   will be `{output_dir}/{table}/{table}table.{part}.{extension}`
236    ///   (e.g. orders/orders.1.tbl, orders/orders.2.tbl, etc.)
237    fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
238        if self.stdout {
239            Ok(OutputLocation::Stdout)
240        } else {
241            let extension = match self.format {
242                OutputFormat::Tbl => "tbl",
243                OutputFormat::Csv => "csv",
244                OutputFormat::Parquet => "parquet",
245            };
246
247            let mut output_path = self.output_dir.clone();
248            if let Some(part) = part {
249                // If a partition is specified, create a subdirectory for it
250                output_path.push(table.to_string());
251                self.ensure_directory_exists(&output_path)?;
252                output_path.push(format!("{table}.{part}.{extension}"));
253            } else {
254                // No partition specified, output to a single file
255                output_path.push(format!("{table}.{extension}"));
256            }
257            Ok(OutputLocation::File(output_path))
258        }
259    }
260
261    /// Ensure the output directory exists, creating it if necessary
262    fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
263        if self.created_directories.contains(dir) {
264            return Ok(());
265        }
266        std::fs::create_dir_all(dir).map_err(|e| {
267            io::Error::new(
268                io::ErrorKind::InvalidInput,
269                format!("Error creating directory {}: {}", dir.display(), e),
270            )
271        })?;
272        self.created_directories.insert(dir.clone());
273        Ok(())
274    }
275
276    /// Return the output plans generated so far
277    pub fn build(self) -> Vec<OutputPlan> {
278        self.output_plans
279    }
280}