Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Identifies one source file handed to an input adapter.
///
/// Passed by reference to `InputAdapter::read_input_columns` / `read_inputs`
/// and cloned into every `ReadInput` produced for the file.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// URI of the source file.
    pub source_uri: String,
    /// Local filesystem path for the file.
    /// NOTE(review): presumably where adapters actually read the bytes from
    /// (e.g. a downloaded copy for remote sources) — confirm against the adapters.
    pub source_local_path: PathBuf,
    /// Name of the source file.
    /// NOTE(review): presumably the file name including extension — confirm
    /// where `InputFile` values are built.
    pub source_name: String,
    /// Stem of the source file.
    /// NOTE(review): presumably `source_name` without its extension — confirm
    /// where `InputFile` values are built.
    pub source_stem: String,
}
17
/// File-level read failure.
///
/// Returned by `InputAdapter::read_input_columns` and carried by
/// `ReadInput::FileError`.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Name of the rule associated with the failure.
    /// NOTE(review): the set of rule names is not defined in this file.
    pub rule: String,
    /// Description of the failure.
    pub message: String,
}
23
/// Outcome of reading one input file.
pub enum ReadInput {
    /// The file was read into dataframes.
    Data {
        /// File the dataframes were read from.
        input_file: InputFile,
        /// Optional raw view of the data. `read_input_from_df` fills it with
        /// an all-string cast of the input only when `collect_raw` is true.
        raw_df: Option<DataFrame>,
        /// Data cast to the declared column types (see `build_typed_schema`
        /// and `cast_df_to_schema`).
        typed_df: DataFrame,
    },
    /// The file could not be read.
    FileError {
        /// File that failed.
        input_file: InputFile,
        /// Details of the failure.
        error: FileReadError,
    },
}
35
/// Optional size statistics reported by an accepted-sink write.
///
/// Every field is optional; this file does not show which sinks populate them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total number of bytes written, when known.
    pub total_bytes_written: Option<u64>,
    /// Average written file size in megabytes, when known.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files counted as "small", when known.
    /// NOTE(review): the size threshold is not defined in this file.
    pub small_files_count: Option<u64>,
}
42
/// Result returned by `AcceptedSinkAdapter::write_accepted`.
///
/// NOTE(review): the optional fields look format-specific (table/snapshot
/// metadata for table formats such as delta or iceberg); which adapter sets
/// which field is not visible in this file — confirm against the writers.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of files written.
    pub files_written: u64,
    /// Number of parts written.
    pub parts_written: u64,
    /// Names of the part files.
    /// NOTE(review): whether these are bare names, paths or URIs is not shown here.
    pub part_files: Vec<String>,
    /// Table version, when the sink reports one.
    pub table_version: Option<i64>,
    /// Snapshot id, when the sink reports one.
    pub snapshot_id: Option<i64>,
    /// Root URI of the written table, when the sink reports one.
    pub table_root_uri: Option<String>,
    /// Iceberg catalog name, when applicable.
    pub iceberg_catalog_name: Option<String>,
    /// Iceberg database, when applicable.
    pub iceberg_database: Option<String>,
    /// Iceberg namespace, when applicable.
    pub iceberg_namespace: Option<String>,
    /// Iceberg table name, when applicable.
    pub iceberg_table: Option<String>,
    /// Size statistics for the write.
    pub metrics: AcceptedWriteMetrics,
}
57
58pub trait InputAdapter: Send + Sync {
59    fn format(&self) -> &'static str;
60
61    fn default_globs(&self) -> FloeResult<Vec<String>> {
62        io::storage::extensions::glob_patterns_for_format(self.format())
63    }
64
65    fn suffixes(&self) -> FloeResult<Vec<String>> {
66        io::storage::extensions::suffixes_for_format(self.format())
67    }
68
69    fn resolve_local_inputs(
70        &self,
71        config_dir: &Path,
72        entity_name: &str,
73        source: &config::SourceConfig,
74        storage: &str,
75    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
76        let default_globs = self.default_globs()?;
77        io::storage::local::resolve_local_inputs(
78            config_dir,
79            entity_name,
80            source,
81            storage,
82            &default_globs,
83        )
84    }
85
86    fn read_input_columns(
87        &self,
88        entity: &config::EntityConfig,
89        input_file: &InputFile,
90        columns: &[config::ColumnConfig],
91    ) -> Result<Vec<String>, FileReadError>;
92
93    fn read_inputs(
94        &self,
95        entity: &config::EntityConfig,
96        files: &[InputFile],
97        columns: &[config::ColumnConfig],
98        normalize_strategy: Option<&str>,
99        collect_raw: bool,
100    ) -> FloeResult<Vec<ReadInput>>;
101}
102
/// Writer interface for accepted rows; instances are looked up by format
/// name through `accepted_sink_adapter`.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes the accepted dataframe `df` to `target` and reports what was
    /// written as an `AcceptedWriteOutput`.
    ///
    /// NOTE(review): how `mode`, `output_stem`, `temp_dir`, `cloud`,
    /// `resolver` and `catalogs` are used is up to each implementation and is
    /// not visible in this file.
    // The argument list is wide by design; the rejected sink bundles its
    // equivalent arguments into `RejectedWriteRequest` instead.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
118
/// Arguments for `RejectedSinkAdapter::write_rejected`, bundled into one
/// borrowed request value.
pub struct RejectedWriteRequest<'a> {
    /// Destination to write to.
    pub target: &'a Target,
    /// Dataframe of rejected rows; borrowed mutably for the duration of the write.
    pub df: &'a mut DataFrame,
    /// Stem of the source file.
    /// NOTE(review): presumably used to name the output — confirm in the writers.
    pub source_stem: &'a str,
    /// Optional temporary directory.
    pub temp_dir: Option<&'a Path>,
    /// Cloud storage client; borrowed mutably for the duration of the write.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver from the configuration.
    pub resolver: &'a config::StorageResolver,
    /// Entity the rejected rows belong to.
    pub entity: &'a config::EntityConfig,
    /// Write mode to apply.
    pub mode: config::WriteMode,
}
129
/// Writer interface for rejected rows; instances are looked up by format
/// name through `rejected_sink_adapter`.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes the rejected rows described by `request`.
    ///
    /// NOTE(review): the meaning of the returned `String` is not visible in
    /// this file — presumably the location that was written; confirm against
    /// the implementations.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
133
/// Which configuration slot a format name belongs to. Used to word the
/// "unsupported format" errors built in this module.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// `source.format`
    Source,
    /// `sink.accepted.format`
    SinkAccepted,
    /// `sink.rejected.format`
    SinkRejected,
}

impl FormatKind {
    /// Returns `(config field path, human-readable description)` for this kind.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::Source => ("source.format", "source format"),
            Self::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            Self::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }

    /// Dotted configuration path of the field holding this kind of format.
    fn field_path(self) -> &'static str {
        let (path, _) = self.labels();
        path
    }

    /// Short description of this kind, used in error messages that have no
    /// entity context.
    fn description(self) -> &'static str {
        let (_, description) = self.labels();
        description
    }
}
158
159fn unsupported_format_error(
160    kind: FormatKind,
161    format: &str,
162    entity_name: Option<&str>,
163) -> ConfigError {
164    if let Some(entity_name) = entity_name {
165        return ConfigError(format!(
166            "entity.name={} {}={} is unsupported",
167            entity_name,
168            kind.field_path(),
169            format
170        ));
171    }
172    ConfigError(format!("unsupported {}: {format}", kind.description()))
173}
174
175pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
176    if input_adapter(format).is_err() {
177        return Err(Box::new(unsupported_format_error(
178            FormatKind::Source,
179            format,
180            Some(entity_name),
181        )));
182    }
183    Ok(())
184}
185
186pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
187    if accepted_sink_adapter(format).is_err() {
188        return Err(Box::new(unsupported_format_error(
189            FormatKind::SinkAccepted,
190            format,
191            Some(entity_name),
192        )));
193    }
194    Ok(())
195}
196
197pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
198    if rejected_sink_adapter(format).is_err() {
199        return Err(Box::new(unsupported_format_error(
200            FormatKind::SinkRejected,
201            format,
202            Some(entity_name),
203        )));
204    }
205    Ok(())
206}
207
208pub fn resolve_read_columns(
209    entity: &config::EntityConfig,
210    normalized_columns: &[config::ColumnConfig],
211    normalize_strategy: Option<&str>,
212) -> FloeResult<Vec<config::ColumnConfig>> {
213    if entity.source.format == "json" || entity.source.format == "xml" {
214        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
215    } else {
216        Ok(normalized_columns.to_vec())
217    }
218}
219
220pub fn sink_options_warning(
221    entity_name: &str,
222    format: &str,
223    options: Option<&config::SinkOptions>,
224) -> Option<String> {
225    let options = options?;
226    if format == "parquet" {
227        return None;
228    }
229    let mut keys = Vec::new();
230    if options.compression.is_some() {
231        keys.push("compression");
232    }
233    if options.row_group_size.is_some() {
234        keys.push("row_group_size");
235    }
236    if options.max_size_per_file.is_some() {
237        keys.push("max_size_per_file");
238    }
239    let detail = if keys.is_empty() {
240        "options".to_string()
241    } else {
242        keys.join(", ")
243    };
244    Some(format!(
245        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
246        entity_name, format
247    ))
248}
249
250pub fn validate_sink_options(
251    entity_name: &str,
252    format: &str,
253    options: Option<&config::SinkOptions>,
254) -> FloeResult<()> {
255    let options = match options {
256        Some(options) => options,
257        None => return Ok(()),
258    };
259    if format != "parquet" {
260        return Ok(());
261    }
262    if let Some(compression) = &options.compression {
263        match compression.as_str() {
264            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
265            _ => {
266                return Err(Box::new(ConfigError(format!(
267                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
268                    entity_name, compression
269                ))))
270            }
271        }
272    }
273    if let Some(row_group_size) = options.row_group_size {
274        if row_group_size == 0 {
275            return Err(Box::new(ConfigError(format!(
276                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
277                entity_name
278            ))));
279        }
280    }
281    if let Some(max_size_per_file) = options.max_size_per_file {
282        if max_size_per_file == 0 {
283            return Err(Box::new(ConfigError(format!(
284                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
285                entity_name
286            ))));
287        }
288    }
289    Ok(())
290}
291
292pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
293    match format {
294        "csv" => Ok(io::read::csv::csv_input_adapter()),
295        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
296        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
297        "orc" => Ok(io::read::orc::orc_input_adapter()),
298        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
299        "json" => Ok(io::read::json::json_input_adapter()),
300        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
301        "avro" => Ok(io::read::avro::avro_input_adapter()),
302        "xml" => Ok(io::read::xml::xml_input_adapter()),
303        _ => Err(Box::new(unsupported_format_error(
304            FormatKind::Source,
305            format,
306            None,
307        ))),
308    }
309}
310
311pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
312    match format {
313        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
314        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
315        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
316        _ => Err(Box::new(unsupported_format_error(
317            FormatKind::SinkAccepted,
318            format,
319            None,
320        ))),
321    }
322}
323
324pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
325    match format {
326        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
327        _ => Err(Box::new(unsupported_format_error(
328            FormatKind::SinkRejected,
329            format,
330            None,
331        ))),
332    }
333}
334
335pub(crate) fn read_input_from_df(
336    input_file: &InputFile,
337    df: &DataFrame,
338    columns: &[config::ColumnConfig],
339    normalize_strategy: Option<&str>,
340    collect_raw: bool,
341) -> FloeResult<ReadInput> {
342    let input_columns = df
343        .get_column_names()
344        .iter()
345        .map(|name| name.to_string())
346        .collect::<Vec<_>>();
347    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
348    let raw_df = if collect_raw {
349        Some(cast_df_to_string(df)?)
350    } else {
351        None
352    };
353    let typed_df = cast_df_to_schema(df, &typed_schema)?;
354    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
355}
356
357pub(crate) fn finalize_read_input(
358    input_file: &InputFile,
359    mut raw_df: Option<DataFrame>,
360    mut typed_df: DataFrame,
361    normalize_strategy: Option<&str>,
362) -> FloeResult<ReadInput> {
363    if let Some(strategy) = normalize_strategy {
364        if let Some(raw_df) = raw_df.as_mut() {
365            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
366        }
367        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
368    }
369    Ok(ReadInput::Data {
370        input_file: input_file.clone(),
371        raw_df,
372        typed_df,
373    })
374}
375
376pub(crate) fn build_typed_schema(
377    input_columns: &[String],
378    declared_columns: &[config::ColumnConfig],
379    normalize_strategy: Option<&str>,
380) -> FloeResult<Schema> {
381    let mut declared_types = HashMap::new();
382    for column in declared_columns {
383        declared_types.insert(
384            column.name.as_str(),
385            config::parse_data_type(&column.column_type)?,
386        );
387    }
388
389    let mut schema = Schema::with_capacity(input_columns.len());
390    for name in input_columns {
391        let normalized = if let Some(strategy) = normalize_strategy {
392            crate::checks::normalize::normalize_name(name, strategy)
393        } else {
394            name.to_string()
395        };
396        let dtype = declared_types
397            .get(normalized.as_str())
398            .cloned()
399            .unwrap_or(DataType::String);
400        schema.insert(name.as_str().into(), dtype);
401    }
402    Ok(schema)
403}
404
405pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
406    cast_df_with_type(df, &DataType::String)
407}
408
409pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
410    let mut columns = Vec::with_capacity(schema.len());
411    for (name, dtype) in schema.iter() {
412        let series = df.column(name.as_str()).map_err(|err| {
413            Box::new(ConfigError(format!(
414                "input column {} not found: {err}",
415                name.as_str()
416            )))
417        })?;
418        let casted =
419            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
420                cast_string_to_bool(name.as_str(), series)?
421            } else {
422                series
423                    .cast_with_options(dtype, CastOptions::NonStrict)
424                    .map_err(|err| {
425                        Box::new(ConfigError(format!(
426                            "failed to cast input column {}: {err}",
427                            name.as_str()
428                        )))
429                    })?
430            };
431        columns.push(casted);
432    }
433    DataFrame::new(columns).map_err(|err| {
434        Box::new(ConfigError(format!(
435            "failed to build typed dataframe: {err}"
436        ))) as Box<dyn std::error::Error + Send + Sync>
437    })
438}
439
440fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
441    let string_values = series.as_materialized_series().str().map_err(|err| {
442        Box::new(ConfigError(format!(
443            "failed to read boolean column {} as string: {err}",
444            name
445        )))
446    })?;
447    let mut values = Vec::with_capacity(series.len());
448    for value in string_values {
449        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
450            "true" | "1" => Some(true),
451            "false" | "0" => Some(false),
452            _ => None,
453        });
454        values.push(parsed);
455    }
456    Ok(Series::new(name.into(), values).into())
457}
458
459fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
460    let mut out = df.clone();
461    let names = out
462        .get_column_names()
463        .iter()
464        .map(|name| name.to_string())
465        .collect::<Vec<_>>();
466    for name in names {
467        let series = out.column(&name).map_err(|err| {
468            Box::new(ConfigError(format!(
469                "input column {} not found: {err}",
470                name
471            )))
472        })?;
473        let casted = series
474            .cast_with_options(dtype, CastOptions::NonStrict)
475            .map_err(|err| {
476                Box::new(ConfigError(format!(
477                    "failed to cast input column {}: {err}",
478                    name
479                )))
480            })?;
481        let idx = out.get_column_index(&name).ok_or_else(|| {
482            Box::new(ConfigError(format!(
483                "input column {} not found for update",
484                name
485            )))
486        })?;
487        out.replace_column(idx, casted).map_err(|err| {
488            Box::new(ConfigError(format!(
489                "failed to update input column {}: {err}",
490                name
491            )))
492        })?;
493    }
494    Ok(out)
495}
496pub fn collect_row_errors(
497    raw_df: &DataFrame,
498    typed_df: &DataFrame,
499    required_cols: &[String],
500    columns: &[config::ColumnConfig],
501    track_cast_errors: bool,
502    raw_indices: &check::ColumnIndex,
503    typed_indices: &check::ColumnIndex,
504) -> FloeResult<Vec<Vec<check::RowError>>> {
505    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
506    if track_cast_errors {
507        let cast_errors =
508            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
509        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
510            errors.extend(cast);
511        }
512    }
513    Ok(error_lists)
514}