1use crate::tpch_cli::plan::GenerationPlan;
6use crate::tpch_cli::{OutputFormat, Table};
7use log::debug;
8use parquet::basic::Compression;
9use std::collections::HashSet;
10use std::fmt::{Display, Formatter};
11use std::io;
12use std::path::PathBuf;
13
/// Destination that a plan's output is written to.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq` (the only payload is a
/// `PathBuf`, which supports both) so locations can be de-duplicated or used
/// as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OutputLocation {
    /// A file at the contained path.
    File(PathBuf),
    /// Standard output.
    Stdout,
}
22
23impl Display for OutputLocation {
24 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
25 match self {
26 OutputLocation::File(path) => {
27 let Some(file) = path.file_name() else {
28 return write!(f, "{}", path.display());
29 };
30 write!(f, "{}", file.to_string_lossy())
32 }
33 OutputLocation::Stdout => write!(f, "Stdout"),
34 }
35 }
36}
37
/// Description of one unit of output: which table to produce, in what format,
/// and where to write it.
///
/// Values are assembled by `OutputPlan::new` and read back through the
/// accessor methods; the fields themselves are private.
#[derive(Debug, Clone, PartialEq)]
pub struct OutputPlan {
    /// Table this plan refers to.
    table: Table,
    /// Scale factor (shown as `SF=` by the `Display` impl).
    scale_factor: f64,
    /// Format of the output.
    output_format: OutputFormat,
    /// Parquet compression setting carried with the plan.
    parquet_compression: Compression,
    /// Destination of the output (a file or stdout).
    output_location: OutputLocation,
    /// Generation plan for this output; also the source of `chunk_count`.
    generation_plan: GenerationPlan,
    /// Delimiter character carried with the plan for CSV output.
    csv_delimiter: char,
}
56
57impl OutputPlan {
58 pub fn new(
59 table: Table,
60 scale_factor: f64,
61 output_format: OutputFormat,
62 parquet_compression: Compression,
63 output_location: OutputLocation,
64 generation_plan: GenerationPlan,
65 csv_delimiter: char,
66 ) -> Self {
67 Self {
68 table,
69 scale_factor,
70 output_format,
71 parquet_compression,
72 output_location,
73 generation_plan,
74 csv_delimiter,
75 }
76 }
77
78 pub fn table(&self) -> Table {
80 self.table
81 }
82
83 pub fn scale_factor(&self) -> f64 {
85 self.scale_factor
86 }
87
88 pub fn output_format(&self) -> OutputFormat {
90 self.output_format
91 }
92
93 pub fn output_location(&self) -> &OutputLocation {
95 &self.output_location
96 }
97
98 pub fn parquet_compression(&self) -> Compression {
100 self.parquet_compression
101 }
102
103 pub fn chunk_count(&self) -> usize {
106 self.generation_plan.chunk_count()
107 }
108
109 pub fn generation_plan(&self) -> &GenerationPlan {
111 &self.generation_plan
112 }
113
114 pub fn csv_delimiter(&self) -> char {
116 self.csv_delimiter
117 }
118}
119
120impl Display for OutputPlan {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 write!(
123 f,
124 "table {} (SF={}, {} chunks) to {}",
125 self.table,
126 self.scale_factor,
127 self.chunk_count(),
128 self.output_location
129 )
130 }
131}
132
133pub struct OutputPlanGenerator {
135 format: OutputFormat,
136 scale_factor: f64,
137 parquet_compression: Compression,
138 parquet_row_group_bytes: i64,
139 stdout: bool,
140 output_dir: PathBuf,
141 csv_delimiter: char,
142 output_plans: Vec<OutputPlan>,
144 created_directories: HashSet<PathBuf>,
147}
148
149impl OutputPlanGenerator {
150 pub fn new(
151 format: OutputFormat,
152 scale_factor: f64,
153 parquet_compression: Compression,
154 parquet_row_group_bytes: i64,
155 stdout: bool,
156 output_dir: PathBuf,
157 csv_delimiter: char,
158 ) -> Self {
159 Self {
160 format,
161 scale_factor,
162 parquet_compression,
163 parquet_row_group_bytes,
164 stdout,
165 output_dir,
166 csv_delimiter,
167 output_plans: Vec::new(),
168 created_directories: HashSet::new(),
169 }
170 }
171
172 pub fn generate_plans(
174 &mut self,
175 table: Table,
176 cli_part: Option<i32>,
177 cli_part_count: Option<i32>,
178 ) -> io::Result<()> {
179 if let (None, Some(part_count)) = (cli_part, cli_part_count) {
182 if GenerationPlan::partitioned_table(table) {
183 debug!("Generating all partitions for table {table} with part count {part_count}");
184 for part in 1..=part_count {
185 self.generate_plan_inner(table, Some(part), Some(part_count))?;
186 }
187 } else {
188 debug!("Generating single partition for table {table}");
190 self.generate_plan_inner(table, Some(1), Some(1))?;
191 }
192 } else {
193 self.generate_plan_inner(table, cli_part, cli_part_count)?;
194 }
195 Ok(())
196 }
197
198 fn generate_plan_inner(
199 &mut self,
200 table: Table,
201 cli_part: Option<i32>,
202 cli_part_count: Option<i32>,
203 ) -> io::Result<()> {
204 let generation_plan = GenerationPlan::try_new(
205 table,
206 self.format,
207 self.scale_factor,
208 cli_part,
209 cli_part_count,
210 self.parquet_row_group_bytes,
211 )
212 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
213
214 let output_location = self.output_location(table, cli_part)?;
215
216 let plan = OutputPlan::new(
217 table,
218 self.scale_factor,
219 self.format,
220 self.parquet_compression,
221 output_location,
222 generation_plan,
223 self.csv_delimiter,
224 );
225
226 self.output_plans.push(plan);
227 Ok(())
228 }
229
230 fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
238 if self.stdout {
239 Ok(OutputLocation::Stdout)
240 } else {
241 let extension = match self.format {
242 OutputFormat::Tbl => "tbl",
243 OutputFormat::Csv => "csv",
244 OutputFormat::Parquet => "parquet",
245 };
246
247 let mut output_path = self.output_dir.clone();
248 if let Some(part) = part {
249 output_path.push(table.to_string());
251 self.ensure_directory_exists(&output_path)?;
252 output_path.push(format!("{table}.{part}.{extension}"));
253 } else {
254 output_path.push(format!("{table}.{extension}"));
256 }
257 Ok(OutputLocation::File(output_path))
258 }
259 }
260
261 fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
263 if self.created_directories.contains(dir) {
264 return Ok(());
265 }
266 std::fs::create_dir_all(dir).map_err(|e| {
267 io::Error::new(
268 io::ErrorKind::InvalidInput,
269 format!("Error creating directory {}: {}", dir.display(), e),
270 )
271 })?;
272 self.created_directories.insert(dir.clone());
273 Ok(())
274 }
275
276 pub fn build(self) -> Vec<OutputPlan> {
278 self.output_plans
279 }
280}