1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// A single resolved input file: its original URI, the local path it is read
/// from, and name parts used when deriving output names.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Original source URI as configured/discovered.
    pub source_uri: String,
    /// Local filesystem path the adapters actually read.
    pub source_local_path: PathBuf,
    /// File name component (with extension).
    pub source_name: String,
    /// File name without extension (see `RejectedWriteRequest::source_stem`).
    pub source_stem: String,
}
17
/// A file-level read failure, reported per file (via `ReadInput::FileError`)
/// instead of aborting the whole run.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that produced this error.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading one input file: either parsed frames or a file-level error.
pub enum ReadInput {
    /// The file was read and cast successfully.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame; present only when the caller requested
        /// it (`collect_raw`). Used for cast-mismatch reporting
        /// (see `collect_row_errors`).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types (non-strict casts).
        typed_df: DataFrame,
    },
    /// The file could not be read; carries the failure details.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Size statistics for an accepted write; fields are `None` when the sink
/// does not report them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written across all files.
    pub total_bytes_written: Option<u64>,
    /// Average written-file size in megabytes.
    pub avg_file_size_mb: Option<f64>,
    /// Count of files flagged as "small"; the threshold is determined by the
    /// sink implementation (not visible here).
    pub small_files_count: Option<u64>,
}
42
/// What an accepted-sink write produced: file counts, optional table/catalog
/// metadata (for table formats such as Delta/Iceberg), and size metrics.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of data files written.
    pub files_written: u64,
    /// Number of parts written.
    pub parts_written: u64,
    /// Identifiers of the written part files.
    pub part_files: Vec<String>,
    /// Table version after the write, when the sink is versioned.
    pub table_version: Option<i64>,
    /// Snapshot id after the write, when the sink supports snapshots.
    pub snapshot_id: Option<i64>,
    /// Root URI of the written table, when applicable.
    pub table_root_uri: Option<String>,
    /// Iceberg identifiers; populated only by the Iceberg sink.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    /// Byte/size statistics for the write.
    pub metrics: AcceptedWriteMetrics,
}
57
/// Format-specific reader for source files.
///
/// One adapter exists per supported format string; see [`input_adapter`] for
/// the registry.
pub trait InputAdapter: Send + Sync {
    /// Canonical format name this adapter handles (e.g. `"csv"`).
    fn format(&self) -> &'static str;

    /// Glob patterns used to discover inputs when the config provides none.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes associated with this adapter's format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the local input files for `entity_name` under `config_dir`,
    /// falling back to this adapter's default globs when the source config
    /// does not specify patterns.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Read the column names present in `input_file`.
    ///
    /// Failures are returned as [`FileReadError`] so callers can record them
    /// per file rather than aborting the run.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read `files` into [`ReadInput`]s, casting to the declared `columns`.
    /// When `collect_raw` is set, each result also carries an all-string copy
    /// of the frame.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like [`InputAdapter::read_inputs`], but lets the caller pass column
    /// names it has already validated. The default implementation ignores
    /// the hint and delegates to `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
115
/// Writer for accepted (validated) rows; one adapter per sink format
/// (see [`accepted_sink_adapter`]).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` using write `mode`.
    ///
    /// `output_stem` names the output; `temp_dir` is an optional staging
    /// location; `cloud`, `resolver`, and `catalogs` provide storage and
    /// catalog access. Returns counts and any table/snapshot metadata the
    /// format produces.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
131
/// Everything a [`RejectedSinkAdapter`] needs for one write, bundled into a
/// struct to keep the trait method signature small.
pub struct RejectedWriteRequest<'a> {
    /// Destination for the rejected rows.
    pub target: &'a Target,
    /// Rows to write; mutable so the writer may reshape the frame.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used when naming the output.
    pub source_stem: &'a str,
    /// Optional staging directory.
    pub temp_dir: Option<&'a Path>,
    /// Cloud storage client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Resolves storage configuration.
    pub resolver: &'a config::StorageResolver,
    /// Entity whose rejected rows are being written.
    pub entity: &'a config::EntityConfig,
    /// Write mode for the rejected output.
    pub mode: config::WriteMode,
}
142
/// Writer for rejected rows; one adapter per format (see [`rejected_sink_adapter`]).
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`. The returned `String`
    /// identifies the written output (presumably its path/URI — confirm
    /// against the CSV adapter).
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
146
/// Which configuration slot a format string came from; selects the field path
/// and wording used in "unsupported format" errors.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// `source.format`
    Source,
    /// `sink.accepted.format`
    SinkAccepted,
    /// `sink.rejected.format`
    SinkRejected,
}
153
154impl FormatKind {
155 fn field_path(self) -> &'static str {
156 match self {
157 FormatKind::Source => "source.format",
158 FormatKind::SinkAccepted => "sink.accepted.format",
159 FormatKind::SinkRejected => "sink.rejected.format",
160 }
161 }
162
163 fn description(self) -> &'static str {
164 match self {
165 FormatKind::Source => "source format",
166 FormatKind::SinkAccepted => "accepted sink format",
167 FormatKind::SinkRejected => "rejected sink format",
168 }
169 }
170}
171
172fn unsupported_format_error(
173 kind: FormatKind,
174 format: &str,
175 entity_name: Option<&str>,
176) -> ConfigError {
177 if let Some(entity_name) = entity_name {
178 return ConfigError(format!(
179 "entity.name={} {}={} is unsupported",
180 entity_name,
181 kind.field_path(),
182 format
183 ));
184 }
185 ConfigError(format!("unsupported {}: {format}", kind.description()))
186}
187
188pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
189 if input_adapter(format).is_err() {
190 return Err(Box::new(unsupported_format_error(
191 FormatKind::Source,
192 format,
193 Some(entity_name),
194 )));
195 }
196 Ok(())
197}
198
199pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
200 if accepted_sink_adapter(format).is_err() {
201 return Err(Box::new(unsupported_format_error(
202 FormatKind::SinkAccepted,
203 format,
204 Some(entity_name),
205 )));
206 }
207 Ok(())
208}
209
210pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
211 if rejected_sink_adapter(format).is_err() {
212 return Err(Box::new(unsupported_format_error(
213 FormatKind::SinkRejected,
214 format,
215 Some(entity_name),
216 )));
217 }
218 Ok(())
219}
220
221pub fn resolve_read_columns(
222 entity: &config::EntityConfig,
223 normalized_columns: &[config::ColumnConfig],
224 normalize_strategy: Option<&str>,
225) -> FloeResult<Vec<config::ColumnConfig>> {
226 if entity.source.format == "json" || entity.source.format == "xml" {
227 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
228 } else {
229 Ok(normalized_columns.to_vec())
230 }
231}
232
233pub fn sink_options_warning(
234 entity_name: &str,
235 format: &str,
236 options: Option<&config::SinkOptions>,
237) -> Option<String> {
238 let options = options?;
239 if format == "parquet" {
240 return None;
241 }
242 let mut keys = Vec::new();
243 if options.compression.is_some() {
244 keys.push("compression");
245 }
246 if options.row_group_size.is_some() {
247 keys.push("row_group_size");
248 }
249 if options.max_size_per_file.is_some() {
250 keys.push("max_size_per_file");
251 }
252 let detail = if keys.is_empty() {
253 "options".to_string()
254 } else {
255 keys.join(", ")
256 };
257 Some(format!(
258 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
259 entity_name, format
260 ))
261}
262
263pub fn validate_sink_options(
264 entity_name: &str,
265 format: &str,
266 options: Option<&config::SinkOptions>,
267) -> FloeResult<()> {
268 let options = match options {
269 Some(options) => options,
270 None => return Ok(()),
271 };
272 if format != "parquet" {
273 return Ok(());
274 }
275 if let Some(compression) = &options.compression {
276 match compression.as_str() {
277 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
278 _ => {
279 return Err(Box::new(ConfigError(format!(
280 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
281 entity_name, compression
282 ))))
283 }
284 }
285 }
286 if let Some(row_group_size) = options.row_group_size {
287 if row_group_size == 0 {
288 return Err(Box::new(ConfigError(format!(
289 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
290 entity_name
291 ))));
292 }
293 }
294 if let Some(max_size_per_file) = options.max_size_per_file {
295 if max_size_per_file == 0 {
296 return Err(Box::new(ConfigError(format!(
297 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
298 entity_name
299 ))));
300 }
301 }
302 Ok(())
303}
304
305pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
306 match format {
307 "csv" => Ok(io::read::csv::csv_input_adapter()),
308 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
309 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
310 "orc" => Ok(io::read::orc::orc_input_adapter()),
311 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
312 "json" => Ok(io::read::json::json_input_adapter()),
313 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
314 "avro" => Ok(io::read::avro::avro_input_adapter()),
315 "xml" => Ok(io::read::xml::xml_input_adapter()),
316 _ => Err(Box::new(unsupported_format_error(
317 FormatKind::Source,
318 format,
319 None,
320 ))),
321 }
322}
323
324pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
325 match format {
326 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
327 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
328 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
329 _ => Err(Box::new(unsupported_format_error(
330 FormatKind::SinkAccepted,
331 format,
332 None,
333 ))),
334 }
335}
336
337pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
338 match format {
339 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
340 _ => Err(Box::new(unsupported_format_error(
341 FormatKind::SinkRejected,
342 format,
343 None,
344 ))),
345 }
346}
347
348pub(crate) fn read_input_from_df(
349 input_file: &InputFile,
350 df: &DataFrame,
351 columns: &[config::ColumnConfig],
352 normalize_strategy: Option<&str>,
353 collect_raw: bool,
354) -> FloeResult<ReadInput> {
355 let input_columns = df
356 .get_column_names()
357 .iter()
358 .map(|name| name.to_string())
359 .collect::<Vec<_>>();
360 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
361 let raw_df = if collect_raw {
362 Some(cast_df_to_string(df)?)
363 } else {
364 None
365 };
366 let typed_df = cast_df_to_schema(df, &typed_schema)?;
367 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
368}
369
370pub(crate) fn finalize_read_input(
371 input_file: &InputFile,
372 mut raw_df: Option<DataFrame>,
373 mut typed_df: DataFrame,
374 normalize_strategy: Option<&str>,
375) -> FloeResult<ReadInput> {
376 if let Some(strategy) = normalize_strategy {
377 if let Some(raw_df) = raw_df.as_mut() {
378 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
379 }
380 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
381 }
382 Ok(ReadInput::Data {
383 input_file: input_file.clone(),
384 raw_df,
385 typed_df,
386 })
387}
388
389pub(crate) fn build_typed_schema(
390 input_columns: &[String],
391 declared_columns: &[config::ColumnConfig],
392 normalize_strategy: Option<&str>,
393) -> FloeResult<Schema> {
394 let mut declared_types = HashMap::new();
395 for column in declared_columns {
396 declared_types.insert(
397 column.name.as_str(),
398 config::parse_data_type(&column.column_type)?,
399 );
400 }
401
402 let mut schema = Schema::with_capacity(input_columns.len());
403 for name in input_columns {
404 let normalized = if let Some(strategy) = normalize_strategy {
405 crate::checks::normalize::normalize_name(name, strategy)
406 } else {
407 name.to_string()
408 };
409 let dtype = declared_types
410 .get(normalized.as_str())
411 .cloned()
412 .unwrap_or(DataType::String);
413 schema.insert(name.as_str().into(), dtype);
414 }
415 Ok(schema)
416}
417
/// Cast every column of `df` to `String`, producing the "raw" view kept for
/// cast-mismatch reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
421
422pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
423 let mut columns = Vec::with_capacity(schema.len());
424 for (name, dtype) in schema.iter() {
425 let series = df.column(name.as_str()).map_err(|err| {
426 Box::new(ConfigError(format!(
427 "input column {} not found: {err}",
428 name.as_str()
429 )))
430 })?;
431 let casted =
432 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
433 cast_string_to_bool(name.as_str(), series)?
434 } else {
435 series
436 .cast_with_options(dtype, CastOptions::NonStrict)
437 .map_err(|err| {
438 Box::new(ConfigError(format!(
439 "failed to cast input column {}: {err}",
440 name.as_str()
441 )))
442 })?
443 };
444 columns.push(casted);
445 }
446 DataFrame::new(columns).map_err(|err| {
447 Box::new(ConfigError(format!(
448 "failed to build typed dataframe: {err}"
449 ))) as Box<dyn std::error::Error + Send + Sync>
450 })
451}
452
453fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
454 let string_values = series.as_materialized_series().str().map_err(|err| {
455 Box::new(ConfigError(format!(
456 "failed to read boolean column {} as string: {err}",
457 name
458 )))
459 })?;
460 let mut values = Vec::with_capacity(series.len());
461 for value in string_values {
462 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
463 "true" | "1" => Some(true),
464 "false" | "0" => Some(false),
465 _ => None,
466 });
467 values.push(parsed);
468 }
469 Ok(Series::new(name.into(), values).into())
470}
471
472fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
473 let mut out = df.clone();
474 let names = out
475 .get_column_names()
476 .iter()
477 .map(|name| name.to_string())
478 .collect::<Vec<_>>();
479 for name in names {
480 let series = out.column(&name).map_err(|err| {
481 Box::new(ConfigError(format!(
482 "input column {} not found: {err}",
483 name
484 )))
485 })?;
486 let casted = series
487 .cast_with_options(dtype, CastOptions::NonStrict)
488 .map_err(|err| {
489 Box::new(ConfigError(format!(
490 "failed to cast input column {}: {err}",
491 name
492 )))
493 })?;
494 let idx = out.get_column_index(&name).ok_or_else(|| {
495 Box::new(ConfigError(format!(
496 "input column {} not found for update",
497 name
498 )))
499 })?;
500 out.replace_column(idx, casted).map_err(|err| {
501 Box::new(ConfigError(format!(
502 "failed to update input column {}: {err}",
503 name
504 )))
505 })?;
506 }
507 Ok(out)
508}
/// Collect validation errors for a typed frame.
///
/// Always runs not-null checks over `required_cols`; when `track_cast_errors`
/// is set, also compares `raw_df` (all-string view) against `typed_df` via
/// `check::cast_mismatch_errors` and merges those errors into the same lists.
/// The two results are zipped element-wise, so both helpers are assumed to
/// return one error list per row — confirm against the `check` module.
pub fn collect_row_errors(
    raw_df: &DataFrame,
    typed_df: &DataFrame,
    required_cols: &[String],
    columns: &[config::ColumnConfig],
    track_cast_errors: bool,
    raw_indices: &check::ColumnIndex,
    typed_indices: &check::ColumnIndex,
) -> FloeResult<Vec<Vec<check::RowError>>> {
    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
    if track_cast_errors {
        let cast_errors =
            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
        // Merge cast errors into the corresponding per-row lists.
        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
            errors.extend(cast);
        }
    }
    Ok(error_lists)
}