// floe_core/io/format.rs
1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
#[derive(Debug, Clone)]
/// A single resolved input file, identified both by its original URI and
/// the local path it is read from (these differ for remote sources that
/// are staged locally — TODO confirm against the storage layer).
pub struct InputFile {
    /// Original source location as configured (e.g. a cloud or file URI).
    pub source_uri: String,
    /// Local filesystem path the adapter actually reads.
    pub source_local_path: PathBuf,
    /// File name including extension.
    pub source_name: String,
    /// File name without extension; used as the output stem downstream.
    pub source_stem: String,
}
17
#[derive(Debug, Clone)]
/// A per-file read failure reported by an [`InputAdapter`] instead of
/// aborting the whole run.
pub struct FileReadError {
    /// Identifier of the check/rule that failed.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading one input file: either usable data or a recorded
/// error for that file.
pub enum ReadInput {
    /// Successfully read data for one file.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame, present only when the caller
        /// requested raw collection (used for cast-mismatch reporting).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read; processing continues with other files.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
#[derive(Debug, Clone)]
/// Size metrics reported by an accepted-sink write; all fields are
/// optional because not every sink format can report them.
pub struct AcceptedWriteMetrics {
    pub total_bytes_written: Option<u64>,
    pub avg_file_size_mb: Option<f64>,
    /// Count of files considered "small" by the sink — threshold is
    /// defined by the writing adapter, not here.
    pub small_files_count: Option<u64>,
}
42
#[derive(Debug, Clone, Default)]
/// Schema-evolution summary for an accepted-sink write.
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for this write.
    pub enabled: bool,
    /// Evolution mode name (adapter-defined string).
    pub mode: String,
    /// Whether any evolution actually took place.
    pub applied: bool,
    /// Names of columns added to the target schema.
    pub added_columns: Vec<String>,
    /// True when changes were detected that could not be applied safely.
    pub incompatible_changes_detected: bool,
}
51
#[derive(Debug, Clone, Default)]
/// Optional timing breakdown (milliseconds) for the phases of an
/// accepted-sink write; adapters fill in only the phases they perform.
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}
61
#[derive(Debug, Clone)]
/// Row-level statistics for a merge (upsert) write into the target table.
pub struct AcceptedMergeMetrics {
    /// Columns forming the merge key.
    pub merge_key: Vec<String>,
    pub inserted_count: u64,
    pub updated_count: u64,
    /// Rows closed out (e.g. end-dated); None when the merge mode has no
    /// close semantics — TODO confirm with the merge adapters.
    pub closed_count: Option<u64>,
    pub unchanged_count: Option<u64>,
    pub target_rows_before: u64,
    pub target_rows_after: u64,
    pub merge_elapsed_ms: u64,
}
73
#[derive(Debug, Clone)]
/// Full result of writing accepted rows to a sink. Fields that only apply
/// to specific formats (table versions, Iceberg identifiers) are optional.
pub struct AcceptedWriteOutput {
    /// Number of data files written, when the sink can report it.
    pub files_written: Option<u64>,
    /// Number of logical parts written.
    pub parts_written: u64,
    /// Paths/URIs of the written part files.
    pub part_files: Vec<String>,
    /// Table version after commit (Delta-style sinks).
    pub table_version: Option<i64>,
    /// Snapshot id after commit (Iceberg-style sinks).
    pub snapshot_id: Option<i64>,
    pub table_root_uri: Option<String>,
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    pub metrics: AcceptedWriteMetrics,
    /// Present only when the write performed a merge.
    pub merge: Option<AcceptedMergeMetrics>,
    pub schema_evolution: AcceptedSchemaEvolution,
    /// Optional phase timings; None when the adapter does not measure them.
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
91
/// Format-specific reader for source files (csv, json, parquet, ...).
///
/// Implementations are stateless statics returned by [`input_adapter`].
pub trait InputAdapter: Send + Sync {
    /// Canonical format name (e.g. "csv"); drives glob/suffix lookups.
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover input files for this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes recognized for this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve concrete local input files for an entity, falling back to
    /// this adapter's default globs when the source config supplies none.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Return the column names found in `input_file`, or a per-file
    /// [`FileReadError`] the caller records without aborting the run.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files` into [`ReadInput`] values. `collect_raw` requests
    /// an additional all-string copy of each frame; `normalize_strategy`
    /// optionally renames columns (see `checks::normalize`).
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like [`InputAdapter::read_inputs`], but adapters may reuse column
    /// names discovered by an earlier `read_input_columns` pass. The
    /// default implementation ignores the hint and delegates.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
149
/// Writer for accepted rows; one implementation per sink format
/// (parquet/delta/iceberg — see [`accepted_sink_adapter`]).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` and report what was written.
    ///
    /// `df` is `&mut` because sinks may need to reorder or cast columns in
    /// place before writing — TODO confirm against the adapter impls.
    /// `temp_dir` is an optional staging area for sinks that write locally
    /// before uploading.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
165
/// Borrowed parameter bundle for [`RejectedSinkAdapter::write_rejected`];
/// groups what would otherwise be a long argument list.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    /// Rejected rows to write; mutable for the same in-place adjustments
    /// accepted sinks perform.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
176
/// Writer for rejected rows (currently csv only — see
/// [`rejected_sink_adapter`]). Returns the written output path/URI.
pub trait RejectedSinkAdapter: Send + Sync {
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
180
#[derive(Debug, Clone, Copy)]
/// Which configuration slot a format string came from; used only to build
/// precise error messages.
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}
187
impl FormatKind {
    /// Dotted config-field path for entity-scoped error messages.
    fn field_path(self) -> &'static str {
        match self {
            FormatKind::Source => "source.format",
            FormatKind::SinkAccepted => "sink.accepted.format",
            FormatKind::SinkRejected => "sink.rejected.format",
        }
    }

    /// Human-readable label for generic (entity-less) error messages.
    fn description(self) -> &'static str {
        match self {
            FormatKind::Source => "source format",
            FormatKind::SinkAccepted => "accepted sink format",
            FormatKind::SinkRejected => "rejected sink format",
        }
    }
}
205
206fn unsupported_format_error(
207    kind: FormatKind,
208    format: &str,
209    entity_name: Option<&str>,
210) -> ConfigError {
211    if let Some(entity_name) = entity_name {
212        return ConfigError(format!(
213            "entity.name={} {}={} is unsupported",
214            entity_name,
215            kind.field_path(),
216            format
217        ));
218    }
219    ConfigError(format!("unsupported {}: {format}", kind.description()))
220}
221
222pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
223    if input_adapter(format).is_err() {
224        return Err(Box::new(unsupported_format_error(
225            FormatKind::Source,
226            format,
227            Some(entity_name),
228        )));
229    }
230    Ok(())
231}
232
233pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
234    if accepted_sink_adapter(format).is_err() {
235        return Err(Box::new(unsupported_format_error(
236            FormatKind::SinkAccepted,
237            format,
238            Some(entity_name),
239        )));
240    }
241    Ok(())
242}
243
244pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
245    if rejected_sink_adapter(format).is_err() {
246        return Err(Box::new(unsupported_format_error(
247            FormatKind::SinkRejected,
248            format,
249            Some(entity_name),
250        )));
251    }
252    Ok(())
253}
254
255pub fn resolve_read_columns(
256    entity: &config::EntityConfig,
257    normalized_columns: &[config::ColumnConfig],
258    normalize_strategy: Option<&str>,
259) -> FloeResult<Vec<config::ColumnConfig>> {
260    if entity.source.format == "json" || entity.source.format == "xml" {
261        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
262    } else {
263        Ok(normalized_columns.to_vec())
264    }
265}
266
267pub fn sink_options_warning(
268    entity_name: &str,
269    format: &str,
270    options: Option<&config::SinkOptions>,
271) -> Option<String> {
272    let options = options?;
273    if format == "parquet" {
274        return None;
275    }
276    let mut keys = Vec::new();
277    if options.compression.is_some() {
278        keys.push("compression");
279    }
280    if options.row_group_size.is_some() {
281        keys.push("row_group_size");
282    }
283    if options.max_size_per_file.is_some() {
284        keys.push("max_size_per_file");
285    }
286    let detail = if keys.is_empty() {
287        "options".to_string()
288    } else {
289        keys.join(", ")
290    };
291    Some(format!(
292        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
293        entity_name, format
294    ))
295}
296
297pub fn validate_sink_options(
298    entity_name: &str,
299    format: &str,
300    options: Option<&config::SinkOptions>,
301) -> FloeResult<()> {
302    let options = match options {
303        Some(options) => options,
304        None => return Ok(()),
305    };
306    if format != "parquet" {
307        return Ok(());
308    }
309    if let Some(compression) = &options.compression {
310        match compression.as_str() {
311            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
312            _ => {
313                return Err(Box::new(ConfigError(format!(
314                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
315                    entity_name, compression
316                ))))
317            }
318        }
319    }
320    if let Some(row_group_size) = options.row_group_size {
321        if row_group_size == 0 {
322            return Err(Box::new(ConfigError(format!(
323                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
324                entity_name
325            ))));
326        }
327    }
328    if let Some(max_size_per_file) = options.max_size_per_file {
329        if max_size_per_file == 0 {
330            return Err(Box::new(ConfigError(format!(
331                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
332                entity_name
333            ))));
334        }
335    }
336    Ok(())
337}
338
/// Look up the static input adapter registered for `format`.
///
/// Returns an unsupported-format [`ConfigError`] without entity context;
/// use [`ensure_input_format`] for the entity-scoped check.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
357
/// Look up the static accepted-sink adapter registered for `format`.
///
/// Returns an unsupported-format [`ConfigError`] without entity context;
/// use [`ensure_accepted_sink_format`] for the entity-scoped check.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
370
/// Look up the static rejected-sink adapter registered for `format`.
///
/// Only csv is supported for rejected output; anything else yields an
/// unsupported-format [`ConfigError`] without entity context.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
381
382pub(crate) fn read_input_from_df(
383    input_file: &InputFile,
384    df: &DataFrame,
385    columns: &[config::ColumnConfig],
386    normalize_strategy: Option<&str>,
387    collect_raw: bool,
388) -> FloeResult<ReadInput> {
389    let input_columns = df
390        .get_column_names()
391        .iter()
392        .map(|name| name.to_string())
393        .collect::<Vec<_>>();
394    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
395    let raw_df = if collect_raw {
396        Some(cast_df_to_string(df)?)
397    } else {
398        None
399    };
400    let typed_df = cast_df_to_schema(df, &typed_schema)?;
401    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
402}
403
404pub(crate) fn finalize_read_input(
405    input_file: &InputFile,
406    mut raw_df: Option<DataFrame>,
407    mut typed_df: DataFrame,
408    normalize_strategy: Option<&str>,
409) -> FloeResult<ReadInput> {
410    if let Some(strategy) = normalize_strategy {
411        if let Some(raw_df) = raw_df.as_mut() {
412            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
413        }
414        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
415    }
416    Ok(ReadInput::Data {
417        input_file: input_file.clone(),
418        raw_df,
419        typed_df,
420    })
421}
422
423pub(crate) fn build_typed_schema(
424    input_columns: &[String],
425    declared_columns: &[config::ColumnConfig],
426    normalize_strategy: Option<&str>,
427) -> FloeResult<Schema> {
428    let mut declared_types = HashMap::new();
429    for column in declared_columns {
430        declared_types.insert(
431            column.name.as_str(),
432            config::parse_data_type(&column.column_type)?,
433        );
434    }
435
436    let mut schema = Schema::with_capacity(input_columns.len());
437    for name in input_columns {
438        let normalized = if let Some(strategy) = normalize_strategy {
439            crate::checks::normalize::normalize_name(name, strategy)
440        } else {
441            name.to_string()
442        };
443        let dtype = declared_types
444            .get(normalized.as_str())
445            .cloned()
446            .unwrap_or(DataType::String);
447        schema.insert(name.as_str().into(), dtype);
448    }
449    Ok(schema)
450}
451
/// Cast every column of `df` to `String`, producing the "raw" view used
/// for cast-mismatch reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
455
456pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
457    let mut columns = Vec::with_capacity(schema.len());
458    for (name, dtype) in schema.iter() {
459        let series = df.column(name.as_str()).map_err(|err| {
460            Box::new(ConfigError(format!(
461                "input column {} not found: {err}",
462                name.as_str()
463            )))
464        })?;
465        let casted =
466            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
467                cast_string_to_bool(name.as_str(), series)?
468            } else {
469                series
470                    .cast_with_options(dtype, CastOptions::NonStrict)
471                    .map_err(|err| {
472                        Box::new(ConfigError(format!(
473                            "failed to cast input column {}: {err}",
474                            name.as_str()
475                        )))
476                    })?
477            };
478        columns.push(casted);
479    }
480    DataFrame::new(columns).map_err(|err| {
481        Box::new(ConfigError(format!(
482            "failed to build typed dataframe: {err}"
483        ))) as Box<dyn std::error::Error + Send + Sync>
484    })
485}
486
487fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
488    let string_values = series.as_materialized_series().str().map_err(|err| {
489        Box::new(ConfigError(format!(
490            "failed to read boolean column {} as string: {err}",
491            name
492        )))
493    })?;
494    let mut values = Vec::with_capacity(series.len());
495    for value in string_values {
496        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
497            "true" | "1" => Some(true),
498            "false" | "0" => Some(false),
499            _ => None,
500        });
501        values.push(parsed);
502    }
503    Ok(Series::new(name.into(), values).into())
504}
505
/// Cast every column of `df` to `dtype`, returning a new frame.
///
/// Casting is non-strict, so unconvertible values become nulls rather
/// than errors (polars `CastOptions::NonStrict` semantics). Columns are
/// replaced in place on a clone so the original frame's column order is
/// preserved.
fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
    let mut out = df.clone();
    // Snapshot the names first so the immutable borrow of `out` ends
    // before the mutation loop below.
    let names = out
        .get_column_names()
        .iter()
        .map(|name| name.to_string())
        .collect::<Vec<_>>();
    for name in names {
        let series = out.column(&name).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name
            )))
        })?;
        let casted = series
            .cast_with_options(dtype, CastOptions::NonStrict)
            .map_err(|err| {
                Box::new(ConfigError(format!(
                    "failed to cast input column {}: {err}",
                    name
                )))
            })?;
        // replace_column is positional, so resolve the index for this name.
        let idx = out.get_column_index(&name).ok_or_else(|| {
            Box::new(ConfigError(format!(
                "input column {} not found for update",
                name
            )))
        })?;
        out.replace_column(idx, casted).map_err(|err| {
            Box::new(ConfigError(format!(
                "failed to update input column {}: {err}",
                name
            )))
        })?;
    }
    Ok(out)
}
543pub fn collect_row_errors(
544    raw_df: &DataFrame,
545    typed_df: &DataFrame,
546    required_cols: &[String],
547    columns: &[config::ColumnConfig],
548    track_cast_errors: bool,
549    raw_indices: &check::ColumnIndex,
550    typed_indices: &check::ColumnIndex,
551) -> FloeResult<Vec<Vec<check::RowError>>> {
552    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
553    if track_cast_errors {
554        let cast_errors =
555            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
556        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
557            errors.extend(cast);
558        }
559    }
560    Ok(error_lists)
561}