use crate::plan::GenerationPlan;
use crate::{OutputFormat, Table};
use log::debug;
use parquet::basic::Compression;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::io;
use std::path::{Path, PathBuf};
13
/// Destination that a generated table (or table part) is written to.
#[derive(Clone, Debug, PartialEq)]
pub enum OutputLocation {
    /// Write to the file at this path.
    File(PathBuf),
    /// Write to the process's standard output.
    Stdout,
}
22
23impl Display for OutputLocation {
24 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
25 match self {
26 OutputLocation::File(path) => {
27 let Some(file) = path.file_name() else {
28 return write!(f, "{}", path.display());
29 };
30 write!(f, "{}", file.to_string_lossy())
32 }
33 OutputLocation::Stdout => write!(f, "Stdout"),
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq)]
40pub struct OutputPlan {
41 table: Table,
43 scale_factor: f64,
45 output_format: OutputFormat,
47 parquet_compression: Compression,
49 output_location: OutputLocation,
51 generation_plan: GenerationPlan,
53}
54
55impl OutputPlan {
56 pub fn new(
57 table: Table,
58 scale_factor: f64,
59 output_format: OutputFormat,
60 parquet_compression: Compression,
61 output_location: OutputLocation,
62 generation_plan: GenerationPlan,
63 ) -> Self {
64 Self {
65 table,
66 scale_factor,
67 output_format,
68 parquet_compression,
69 output_location,
70 generation_plan,
71 }
72 }
73
74 pub fn table(&self) -> Table {
76 self.table
77 }
78
79 pub fn scale_factor(&self) -> f64 {
81 self.scale_factor
82 }
83
84 pub fn output_format(&self) -> OutputFormat {
86 self.output_format
87 }
88
89 pub fn output_location(&self) -> &OutputLocation {
91 &self.output_location
92 }
93
94 pub fn parquet_compression(&self) -> Compression {
96 self.parquet_compression
97 }
98
99 pub fn chunk_count(&self) -> usize {
102 self.generation_plan.chunk_count()
103 }
104
105 pub fn generation_plan(&self) -> &GenerationPlan {
107 &self.generation_plan
108 }
109}
110
111impl Display for OutputPlan {
112 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113 write!(
114 f,
115 "table {} (SF={}, {} chunks) to {}",
116 self.table,
117 self.scale_factor,
118 self.chunk_count(),
119 self.output_location
120 )
121 }
122}
123
124pub struct OutputPlanGenerator {
126 format: OutputFormat,
127 scale_factor: f64,
128 parquet_compression: Compression,
129 parquet_row_group_bytes: i64,
130 stdout: bool,
131 output_dir: PathBuf,
132 output_plans: Vec<OutputPlan>,
134 created_directories: HashSet<PathBuf>,
137}
138
139impl OutputPlanGenerator {
140 pub fn new(
141 format: OutputFormat,
142 scale_factor: f64,
143 parquet_compression: Compression,
144 parquet_row_group_bytes: i64,
145 stdout: bool,
146 output_dir: PathBuf,
147 ) -> Self {
148 Self {
149 format,
150 scale_factor,
151 parquet_compression,
152 parquet_row_group_bytes,
153 stdout,
154 output_dir,
155 output_plans: Vec::new(),
156 created_directories: HashSet::new(),
157 }
158 }
159
160 pub fn generate_plans(
162 &mut self,
163 table: Table,
164 cli_part: Option<i32>,
165 cli_part_count: Option<i32>,
166 ) -> io::Result<()> {
167 if let (None, Some(part_count)) = (cli_part, cli_part_count) {
170 if GenerationPlan::partitioned_table(table) {
171 debug!("Generating all partitions for table {table} with part count {part_count}");
172 for part in 1..=part_count {
173 self.generate_plan_inner(table, Some(part), Some(part_count))?;
174 }
175 } else {
176 debug!("Generating single partition for table {table}");
178 self.generate_plan_inner(table, Some(1), Some(1))?;
179 }
180 } else {
181 self.generate_plan_inner(table, cli_part, cli_part_count)?;
182 }
183 Ok(())
184 }
185
186 fn generate_plan_inner(
187 &mut self,
188 table: Table,
189 cli_part: Option<i32>,
190 cli_part_count: Option<i32>,
191 ) -> io::Result<()> {
192 let generation_plan = GenerationPlan::try_new(
193 table,
194 self.format,
195 self.scale_factor,
196 cli_part,
197 cli_part_count,
198 self.parquet_row_group_bytes,
199 )
200 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
201
202 let output_location = self.output_location(table, cli_part)?;
203
204 let plan = OutputPlan::new(
205 table,
206 self.scale_factor,
207 self.format,
208 self.parquet_compression,
209 output_location,
210 generation_plan,
211 );
212
213 self.output_plans.push(plan);
214 Ok(())
215 }
216
217 fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
225 if self.stdout {
226 Ok(OutputLocation::Stdout)
227 } else {
228 let extension = match self.format {
229 OutputFormat::Tbl => "tbl",
230 OutputFormat::Csv => "csv",
231 OutputFormat::Parquet => "parquet",
232 };
233
234 let mut output_path = self.output_dir.clone();
235 if let Some(part) = part {
236 output_path.push(table.to_string());
238 self.ensure_directory_exists(&output_path)?;
239 output_path.push(format!("{table}.{part}.{extension}"));
240 } else {
241 output_path.push(format!("{table}.{extension}"));
243 }
244 Ok(OutputLocation::File(output_path))
245 }
246 }
247
248 fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
250 if self.created_directories.contains(dir) {
251 return Ok(());
252 }
253 std::fs::create_dir_all(dir).map_err(|e| {
254 io::Error::new(
255 io::ErrorKind::InvalidInput,
256 format!("Error creating directory {}: {}", dir.display(), e),
257 )
258 })?;
259 self.created_directories.insert(dir.clone());
260 Ok(())
261 }
262
263 pub fn build(self) -> Vec<OutputPlan> {
265 self.output_plans
266 }
267}