1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One input file discovered for an entity, carrying both its configured
/// source location and the local path it is actually read from.
#[derive(Debug, Clone)]
pub struct InputFile {
    // Location of the file as configured for the source.
    pub source_uri: String,
    // Path on local disk the adapters read from.
    pub source_local_path: PathBuf,
    // File name component, including extension.
    pub source_name: String,
    // File name without extension; used e.g. as the stem for rejected output
    // (see RejectedSinkAdapter::write_rejected).
    pub source_stem: String,
}
17
/// A file-level read failure (the whole file could not be ingested),
/// as opposed to per-row errors collected later by `collect_row_errors`.
#[derive(Debug, Clone)]
pub struct FileReadError {
    // Identifier of the rule/check that produced this error.
    pub rule: String,
    // Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading a single input file: either parsed frames or a
/// file-level error.
pub enum ReadInput {
    /// The file was read and cast successfully.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame, only present when the caller asked
        /// for raw values (`collect_raw`); used for cast-mismatch reporting.
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read at all.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Summary returned by an accepted-data sink after writing a frame.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    // Number of part files produced by the write.
    pub parts_written: u64,
    // Names/paths of the written part files.
    pub part_files: Vec<String>,
    // Table version after the write for versioned table formats
    // (delta/iceberg sinks exist — see accepted_sink_adapter), when reported.
    pub table_version: Option<i64>,
}
42
/// Reads input files of one specific format (csv/parquet/json — see
/// `input_adapter`) into dataframes.
pub trait InputAdapter: Send + Sync {
    /// Format identifier this adapter handles (e.g. "csv").
    fn format(&self) -> &'static str;

    /// Glob patterns for files of this format, derived from the format name.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves the configured source to concrete local input files,
    /// supplying this format's default glob patterns to the resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Returns the column names found in `input_file`, or a file-level error
    /// if the file cannot be inspected.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads all `files`, producing one `ReadInput` per file. When
    /// `collect_raw` is set, an all-string copy of each frame is also kept;
    /// `normalize_strategy`, when set, normalizes column names.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
87
/// Writes validated ("accepted") rows to a target in one output format
/// (parquet/delta/iceberg — see `accepted_sink_adapter`).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target`, deriving output names from `output_stem`.
    /// `temp_dir` optionally provides a staging location; `cloud` and
    /// `resolver` are available for targets that need them. Returns a
    /// summary of what was written.
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
100
/// Writes rejected rows to a target; only csv is currently wired up
/// (see `rejected_sink_adapter`).
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes `df` to `target`, deriving the output name from the source
    /// file's stem. Returns a string identifying the written output
    /// (presumably its path/URI — confirm against implementations).
    fn write_rejected(
        &self,
        target: &Target,
        df: &mut DataFrame,
        source_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<String>;
}
113
/// Which configuration slot a format string came from; used to phrase
/// "unsupported format" errors precisely.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    // `source.format`
    Source,
    // `sink.accepted.format`
    SinkAccepted,
    // `sink.rejected.format`
    SinkRejected,
}
120
121impl FormatKind {
122 fn field_path(self) -> &'static str {
123 match self {
124 FormatKind::Source => "source.format",
125 FormatKind::SinkAccepted => "sink.accepted.format",
126 FormatKind::SinkRejected => "sink.rejected.format",
127 }
128 }
129
130 fn description(self) -> &'static str {
131 match self {
132 FormatKind::Source => "source format",
133 FormatKind::SinkAccepted => "accepted sink format",
134 FormatKind::SinkRejected => "rejected sink format",
135 }
136 }
137}
138
139fn unsupported_format_error(
140 kind: FormatKind,
141 format: &str,
142 entity_name: Option<&str>,
143) -> ConfigError {
144 if let Some(entity_name) = entity_name {
145 return ConfigError(format!(
146 "entity.name={} {}={} is unsupported",
147 entity_name,
148 kind.field_path(),
149 format
150 ));
151 }
152 ConfigError(format!("unsupported {}: {format}", kind.description()))
153}
154
155pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
156 if input_adapter(format).is_err() {
157 return Err(Box::new(unsupported_format_error(
158 FormatKind::Source,
159 format,
160 Some(entity_name),
161 )));
162 }
163 Ok(())
164}
165
166pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
167 if accepted_sink_adapter(format).is_err() {
168 return Err(Box::new(unsupported_format_error(
169 FormatKind::SinkAccepted,
170 format,
171 Some(entity_name),
172 )));
173 }
174 Ok(())
175}
176
177pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
178 if rejected_sink_adapter(format).is_err() {
179 return Err(Box::new(unsupported_format_error(
180 FormatKind::SinkRejected,
181 format,
182 Some(entity_name),
183 )));
184 }
185 Ok(())
186}
187
188pub fn sink_options_warning(
189 entity_name: &str,
190 format: &str,
191 options: Option<&config::SinkOptions>,
192) -> Option<String> {
193 let options = options?;
194 if format == "parquet" {
195 return None;
196 }
197 let mut keys = Vec::new();
198 if options.compression.is_some() {
199 keys.push("compression");
200 }
201 if options.row_group_size.is_some() {
202 keys.push("row_group_size");
203 }
204 if options.max_size_per_file.is_some() {
205 keys.push("max_size_per_file");
206 }
207 let detail = if keys.is_empty() {
208 "options".to_string()
209 } else {
210 keys.join(", ")
211 };
212 Some(format!(
213 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
214 entity_name, format
215 ))
216}
217
218pub fn validate_sink_options(
219 entity_name: &str,
220 format: &str,
221 options: Option<&config::SinkOptions>,
222) -> FloeResult<()> {
223 let options = match options {
224 Some(options) => options,
225 None => return Ok(()),
226 };
227 if format != "parquet" {
228 return Ok(());
229 }
230 if let Some(compression) = &options.compression {
231 match compression.as_str() {
232 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
233 _ => {
234 return Err(Box::new(ConfigError(format!(
235 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
236 entity_name, compression
237 ))))
238 }
239 }
240 }
241 if let Some(row_group_size) = options.row_group_size {
242 if row_group_size == 0 {
243 return Err(Box::new(ConfigError(format!(
244 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
245 entity_name
246 ))));
247 }
248 }
249 if let Some(max_size_per_file) = options.max_size_per_file {
250 if max_size_per_file == 0 {
251 return Err(Box::new(ConfigError(format!(
252 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
253 entity_name
254 ))));
255 }
256 }
257 Ok(())
258}
259
260pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
261 match format {
262 "csv" => Ok(io::read::csv::csv_input_adapter()),
263 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
264 "json" => Ok(io::read::json::json_input_adapter()),
265 _ => Err(Box::new(unsupported_format_error(
266 FormatKind::Source,
267 format,
268 None,
269 ))),
270 }
271}
272
273pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
274 match format {
275 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
276 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
277 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
278 _ => Err(Box::new(unsupported_format_error(
279 FormatKind::SinkAccepted,
280 format,
281 None,
282 ))),
283 }
284}
285
286pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
287 match format {
288 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
289 _ => Err(Box::new(unsupported_format_error(
290 FormatKind::SinkRejected,
291 format,
292 None,
293 ))),
294 }
295}
296
297pub(crate) fn read_input_from_df(
298 input_file: &InputFile,
299 df: &DataFrame,
300 columns: &[config::ColumnConfig],
301 normalize_strategy: Option<&str>,
302 collect_raw: bool,
303) -> FloeResult<ReadInput> {
304 let input_columns = df
305 .get_column_names()
306 .iter()
307 .map(|name| name.to_string())
308 .collect::<Vec<_>>();
309 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
310 let raw_df = if collect_raw {
311 Some(cast_df_to_string(df)?)
312 } else {
313 None
314 };
315 let typed_df = cast_df_to_schema(df, &typed_schema)?;
316 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
317}
318
319pub(crate) fn finalize_read_input(
320 input_file: &InputFile,
321 mut raw_df: Option<DataFrame>,
322 mut typed_df: DataFrame,
323 normalize_strategy: Option<&str>,
324) -> FloeResult<ReadInput> {
325 if let Some(strategy) = normalize_strategy {
326 if let Some(raw_df) = raw_df.as_mut() {
327 crate::run::normalize::normalize_dataframe_columns(raw_df, strategy)?;
328 }
329 crate::run::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
330 }
331 Ok(ReadInput::Data {
332 input_file: input_file.clone(),
333 raw_df,
334 typed_df,
335 })
336}
337
338pub(crate) fn build_typed_schema(
339 input_columns: &[String],
340 declared_columns: &[config::ColumnConfig],
341 normalize_strategy: Option<&str>,
342) -> FloeResult<Schema> {
343 let mut declared_types = HashMap::new();
344 for column in declared_columns {
345 declared_types.insert(
346 column.name.as_str(),
347 config::parse_data_type(&column.column_type)?,
348 );
349 }
350
351 let mut schema = Schema::with_capacity(input_columns.len());
352 for name in input_columns {
353 let normalized = if let Some(strategy) = normalize_strategy {
354 crate::run::normalize::normalize_name(name, strategy)
355 } else {
356 name.to_string()
357 };
358 let dtype = declared_types
359 .get(normalized.as_str())
360 .cloned()
361 .unwrap_or(DataType::String);
362 schema.insert(name.as_str().into(), dtype);
363 }
364 Ok(schema)
365}
366
/// Casts every column of `df` to `String`, producing the "raw" view of the
/// data used for cast-mismatch reporting (see `read_input_from_df`).
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
370
371pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
372 let mut columns = Vec::with_capacity(schema.len());
373 for (name, dtype) in schema.iter() {
374 let series = df.column(name.as_str()).map_err(|err| {
375 Box::new(ConfigError(format!(
376 "input column {} not found: {err}",
377 name.as_str()
378 )))
379 })?;
380 let casted =
381 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
382 cast_string_to_bool(name.as_str(), series)?
383 } else {
384 series
385 .cast_with_options(dtype, CastOptions::NonStrict)
386 .map_err(|err| {
387 Box::new(ConfigError(format!(
388 "failed to cast input column {}: {err}",
389 name.as_str()
390 )))
391 })?
392 };
393 columns.push(casted);
394 }
395 DataFrame::new(columns).map_err(|err| {
396 Box::new(ConfigError(format!(
397 "failed to build typed dataframe: {err}"
398 ))) as Box<dyn std::error::Error + Send + Sync>
399 })
400}
401
402fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
403 let string_values = series.as_materialized_series().str().map_err(|err| {
404 Box::new(ConfigError(format!(
405 "failed to read boolean column {} as string: {err}",
406 name
407 )))
408 })?;
409 let mut values = Vec::with_capacity(series.len());
410 for value in string_values {
411 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
412 "true" | "1" => Some(true),
413 "false" | "0" => Some(false),
414 _ => None,
415 });
416 values.push(parsed);
417 }
418 Ok(Series::new(name.into(), values).into())
419}
420
421fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
422 let mut out = df.clone();
423 let names = out
424 .get_column_names()
425 .iter()
426 .map(|name| name.to_string())
427 .collect::<Vec<_>>();
428 for name in names {
429 let series = out.column(&name).map_err(|err| {
430 Box::new(ConfigError(format!(
431 "input column {} not found: {err}",
432 name
433 )))
434 })?;
435 let casted = series
436 .cast_with_options(dtype, CastOptions::NonStrict)
437 .map_err(|err| {
438 Box::new(ConfigError(format!(
439 "failed to cast input column {}: {err}",
440 name
441 )))
442 })?;
443 let idx = out.get_column_index(&name).ok_or_else(|| {
444 Box::new(ConfigError(format!(
445 "input column {} not found for update",
446 name
447 )))
448 })?;
449 out.replace_column(idx, casted).map_err(|err| {
450 Box::new(ConfigError(format!(
451 "failed to update input column {}: {err}",
452 name
453 )))
454 })?;
455 }
456 Ok(out)
457}
/// Collects per-row validation errors for one input frame.
///
/// Starts from the not-null violations on `required_cols`; when
/// `track_cast_errors` is set, extends each row's list with cast-mismatch
/// errors computed by comparing the raw (all-string) frame against the
/// typed frame. The returned structure is whatever shape
/// `check::not_null_errors` produces (one inner list per row, by the zip
/// below), with cast errors appended element-wise.
pub fn collect_row_errors(
    raw_df: &DataFrame,
    typed_df: &DataFrame,
    required_cols: &[String],
    columns: &[config::ColumnConfig],
    track_cast_errors: bool,
    raw_indices: &check::ColumnIndex,
    typed_indices: &check::ColumnIndex,
) -> FloeResult<Vec<Vec<check::RowError>>> {
    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
    if track_cast_errors {
        let cast_errors =
            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
        // Merge element-wise: each row's cast errors join its not-null errors.
        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
            errors.extend(cast);
        }
    }
    Ok(error_lists)
}