tpchgen_cli/
output_plan.rs

1//! * [`OutputLocation`]: where to output the generated data
2//! * [`OutputPlan`]: an output file that will be generated
3//! * [`OutputPlanGenerator`]: plans the output files to be generated
4
5use crate::plan::GenerationPlan;
6use crate::{OutputFormat, Table};
7use log::debug;
8use parquet::basic::Compression;
9use std::collections::HashSet;
10use std::fmt::{Display, Formatter};
11use std::io;
12use std::path::PathBuf;
13
/// Destination to which a generated partition is written
#[derive(Clone, Debug, PartialEq)]
pub enum OutputLocation {
    /// Write to the file at this path
    File(PathBuf),
    /// Write to standard output
    Stdout,
}

impl Display for OutputLocation {
    /// Formats the location compactly: `Stdout` for standard output, or just
    /// the final path component for a file.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Stdout => write!(f, "Stdout"),
            Self::File(path) => match path.file_name() {
                // Show the file name only, not the directories leading to it
                Some(name) => write!(f, "{}", name.to_string_lossy()),
                // Paths without a final component (e.g. `..`) are shown whole
                None => write!(f, "{}", path.display()),
            },
        }
    }
}
37
38/// Describes an output partition (file) that will be generated
39#[derive(Debug, Clone, PartialEq)]
40pub struct OutputPlan {
41    /// The table
42    table: Table,
43    /// The scale factor
44    scale_factor: f64,
45    /// The output format (TODO don't depend back on something in main)
46    output_format: OutputFormat,
47    /// If the output is parquet, what compression level to use
48    parquet_compression: Compression,
49    /// Where to output
50    output_location: OutputLocation,
51    /// Plan for generating the table
52    generation_plan: GenerationPlan,
53}
54
55impl OutputPlan {
56    pub fn new(
57        table: Table,
58        scale_factor: f64,
59        output_format: OutputFormat,
60        parquet_compression: Compression,
61        output_location: OutputLocation,
62        generation_plan: GenerationPlan,
63    ) -> Self {
64        Self {
65            table,
66            scale_factor,
67            output_format,
68            parquet_compression,
69            output_location,
70            generation_plan,
71        }
72    }
73
74    /// Return the table this partition is for
75    pub fn table(&self) -> Table {
76        self.table
77    }
78
79    /// Return the scale factor for this partition
80    pub fn scale_factor(&self) -> f64 {
81        self.scale_factor
82    }
83
84    /// Return the output format for this partition
85    pub fn output_format(&self) -> OutputFormat {
86        self.output_format
87    }
88
89    /// return the output location
90    pub fn output_location(&self) -> &OutputLocation {
91        &self.output_location
92    }
93
94    /// Return the parquet compression level for this partition
95    pub fn parquet_compression(&self) -> Compression {
96        self.parquet_compression
97    }
98
99    /// Return the number of chunks part(ition) count (the number of data chunks
100    /// in the underlying generation plan)
101    pub fn chunk_count(&self) -> usize {
102        self.generation_plan.chunk_count()
103    }
104
105    /// return the generation plan for this partition
106    pub fn generation_plan(&self) -> &GenerationPlan {
107        &self.generation_plan
108    }
109}
110
111impl Display for OutputPlan {
112    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113        write!(
114            f,
115            "table {} (SF={}, {} chunks) to {}",
116            self.table,
117            self.scale_factor,
118            self.chunk_count(),
119            self.output_location
120        )
121    }
122}
123
124/// Plans the creation of output files
125pub struct OutputPlanGenerator {
126    format: OutputFormat,
127    scale_factor: f64,
128    parquet_compression: Compression,
129    parquet_row_group_bytes: i64,
130    stdout: bool,
131    output_dir: PathBuf,
132    /// The generated output plans
133    output_plans: Vec<OutputPlan>,
134    /// Output directores that have been created so far
135    /// (used to avoid creating the same directory multiple times)
136    created_directories: HashSet<PathBuf>,
137}
138
139impl OutputPlanGenerator {
140    pub fn new(
141        format: OutputFormat,
142        scale_factor: f64,
143        parquet_compression: Compression,
144        parquet_row_group_bytes: i64,
145        stdout: bool,
146        output_dir: PathBuf,
147    ) -> Self {
148        Self {
149            format,
150            scale_factor,
151            parquet_compression,
152            parquet_row_group_bytes,
153            stdout,
154            output_dir,
155            output_plans: Vec::new(),
156            created_directories: HashSet::new(),
157        }
158    }
159
160    /// Generate the output plans for the given table and partition options
161    pub fn generate_plans(
162        &mut self,
163        table: Table,
164        cli_part: Option<i32>,
165        cli_part_count: Option<i32>,
166    ) -> io::Result<()> {
167        // If the user specified only a part count, automatically create all
168        // partitions for the table
169        if let (None, Some(part_count)) = (cli_part, cli_part_count) {
170            if GenerationPlan::partitioned_table(table) {
171                debug!("Generating all partitions for table {table} with part count {part_count}");
172                for part in 1..=part_count {
173                    self.generate_plan_inner(table, Some(part), Some(part_count))?;
174                }
175            } else {
176                // there is only one partition for this table (e.g nation or region)
177                debug!("Generating single partition for table {table}");
178                self.generate_plan_inner(table, Some(1), Some(1))?;
179            }
180        } else {
181            self.generate_plan_inner(table, cli_part, cli_part_count)?;
182        }
183        Ok(())
184    }
185
186    fn generate_plan_inner(
187        &mut self,
188        table: Table,
189        cli_part: Option<i32>,
190        cli_part_count: Option<i32>,
191    ) -> io::Result<()> {
192        let generation_plan = GenerationPlan::try_new(
193            table,
194            self.format,
195            self.scale_factor,
196            cli_part,
197            cli_part_count,
198            self.parquet_row_group_bytes,
199        )
200        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
201
202        let output_location = self.output_location(table, cli_part)?;
203
204        let plan = OutputPlan::new(
205            table,
206            self.scale_factor,
207            self.format,
208            self.parquet_compression,
209            output_location,
210            generation_plan,
211        );
212
213        self.output_plans.push(plan);
214        Ok(())
215    }
216
217    /// Return the output location for the given table
218    ///
219    /// * if part of is None, the output location is `{output_dir}/{table}.{extension}`
220    ///
221    /// * if part is Some(part), then the output location
222    ///   will be `{output_dir}/{table}/{table}table.{part}.{extension}`
223    ///   (e.g. orders/orders.1.tbl, orders/orders.2.tbl, etc.)
224    fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
225        if self.stdout {
226            Ok(OutputLocation::Stdout)
227        } else {
228            let extension = match self.format {
229                OutputFormat::Tbl => "tbl",
230                OutputFormat::Csv => "csv",
231                OutputFormat::Parquet => "parquet",
232            };
233
234            let mut output_path = self.output_dir.clone();
235            if let Some(part) = part {
236                // If a partition is specified, create a subdirectory for it
237                output_path.push(table.to_string());
238                self.ensure_directory_exists(&output_path)?;
239                output_path.push(format!("{table}.{part}.{extension}"));
240            } else {
241                // No partition specified, output to a single file
242                output_path.push(format!("{table}.{extension}"));
243            }
244            Ok(OutputLocation::File(output_path))
245        }
246    }
247
248    /// Ensure the output directory exists, creating it if necessary
249    fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
250        if self.created_directories.contains(dir) {
251            return Ok(());
252        }
253        std::fs::create_dir_all(dir).map_err(|e| {
254            io::Error::new(
255                io::ErrorKind::InvalidInput,
256                format!("Error creating directory {}: {}", dir.display(), e),
257            )
258        })?;
259        self.created_directories.insert(dir.clone());
260        Ok(())
261    }
262
263    /// Return the output plans generated so far
264    pub fn build(self) -> Vec<OutputPlan> {
265        self.output_plans
266    }
267}