// floe_core/io/format.rs
1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One resolved input file, carrying both its original URI and the local
/// path it is read from.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Source URI as configured (may be remote — TODO confirm against resolver).
    pub source_uri: String,
    /// Local filesystem path the file is actually read from.
    pub source_local_path: PathBuf,
    /// File name including its extension.
    pub source_name: String,
    /// File name without its extension; used to derive output stems.
    pub source_stem: String,
}
17
/// Error reading a single input file, attributed to a named rule so it can
/// be reported without aborting the rest of the batch.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the check/rule that failed.
    pub rule: String,
    /// Human-readable failure description.
    pub message: String,
}
23
/// Outcome of reading one input file: either parsed data or a per-file
/// error, so one bad file does not fail the whole read.
pub enum ReadInput {
    /// Successfully read file.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame, present only when raw collection
        /// was requested (used for cast-mismatch reporting).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File could not be read; carries the failing rule and message.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Result of a successful accepted-sink write.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of part files produced by this write.
    pub parts_written: u64,
    /// Names/paths of the written part files.
    pub part_files: Vec<String>,
    /// Table version after the write; presumably set only by versioned
    /// table formats (delta/iceberg adapters exist) — TODO confirm.
    pub table_version: Option<i64>,
}
42
/// Adapter for reading one input format into DataFrames. Implementations
/// are looked up by format name via `input_adapter`.
pub trait InputAdapter: Send + Sync {
    /// Canonical format name (e.g. "csv"); drives glob/suffix derivation.
    fn format(&self) -> &'static str;

    /// Default glob patterns for this format, derived from its name.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the local input files for an entity's source config, using
    /// this format's default globs when the config does not narrow them.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Column names present in a single input file. A per-file failure is
    /// returned as `FileReadError` rather than a hard error.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, returning one `ReadInput` per file (data or
    /// per-file error). When `collect_raw` is set, an all-string copy of
    /// each frame is also materialized; `normalize_strategy` controls
    /// column-name normalization on read.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
87
/// Adapter for writing accepted rows to a sink format. Implementations
/// are looked up by format name via `accepted_sink_adapter`.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` and report what was written.
    ///
    /// `temp_dir`, `cloud` and `resolver` support staged/remote writes;
    /// `df` is `&mut` — presumably some writers adjust the frame in place,
    /// TODO confirm against implementations.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
102
/// Bundled arguments for `RejectedSinkAdapter::write_rejected`, grouped
/// into one struct to keep the trait method signature small.
pub struct RejectedWriteRequest<'a> {
    /// Destination for the rejected rows.
    pub target: &'a Target,
    /// Frame of rejected rows to write.
    pub df: &'a mut DataFrame,
    /// Stem of the originating input file, for naming the output.
    pub source_stem: &'a str,
    /// Optional staging directory for the write.
    pub temp_dir: Option<&'a Path>,
    /// Client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Resolves storage locations from config.
    pub resolver: &'a config::StorageResolver,
    /// Entity whose rejected rows are being written.
    pub entity: &'a config::EntityConfig,
    /// Write mode (append/overwrite semantics as defined by config).
    pub mode: config::WriteMode,
}
113
/// Adapter for writing rejected rows. Implementations are looked up by
/// format name via `rejected_sink_adapter`.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`; returns a `String`
    /// (presumably the written path — TODO confirm against implementations).
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
117
/// Which format slot of an entity config a format string came from; used
/// to phrase "unsupported format" errors precisely.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// `source.format`
    Source,
    /// `sink.accepted.format`
    SinkAccepted,
    /// `sink.rejected.format`
    SinkRejected,
}
124
125impl FormatKind {
126    fn field_path(self) -> &'static str {
127        match self {
128            FormatKind::Source => "source.format",
129            FormatKind::SinkAccepted => "sink.accepted.format",
130            FormatKind::SinkRejected => "sink.rejected.format",
131        }
132    }
133
134    fn description(self) -> &'static str {
135        match self {
136            FormatKind::Source => "source format",
137            FormatKind::SinkAccepted => "accepted sink format",
138            FormatKind::SinkRejected => "rejected sink format",
139        }
140    }
141}
142
143fn unsupported_format_error(
144    kind: FormatKind,
145    format: &str,
146    entity_name: Option<&str>,
147) -> ConfigError {
148    if let Some(entity_name) = entity_name {
149        return ConfigError(format!(
150            "entity.name={} {}={} is unsupported",
151            entity_name,
152            kind.field_path(),
153            format
154        ));
155    }
156    ConfigError(format!("unsupported {}: {format}", kind.description()))
157}
158
159pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
160    if input_adapter(format).is_err() {
161        return Err(Box::new(unsupported_format_error(
162            FormatKind::Source,
163            format,
164            Some(entity_name),
165        )));
166    }
167    Ok(())
168}
169
170pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
171    if accepted_sink_adapter(format).is_err() {
172        return Err(Box::new(unsupported_format_error(
173            FormatKind::SinkAccepted,
174            format,
175            Some(entity_name),
176        )));
177    }
178    Ok(())
179}
180
181pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
182    if rejected_sink_adapter(format).is_err() {
183        return Err(Box::new(unsupported_format_error(
184            FormatKind::SinkRejected,
185            format,
186            Some(entity_name),
187        )));
188    }
189    Ok(())
190}
191
192pub fn resolve_read_columns(
193    entity: &config::EntityConfig,
194    normalized_columns: &[config::ColumnConfig],
195    normalize_strategy: Option<&str>,
196) -> FloeResult<Vec<config::ColumnConfig>> {
197    if entity.source.format == "json" || entity.source.format == "xml" {
198        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
199    } else {
200        Ok(normalized_columns.to_vec())
201    }
202}
203
204pub fn sink_options_warning(
205    entity_name: &str,
206    format: &str,
207    options: Option<&config::SinkOptions>,
208) -> Option<String> {
209    let options = options?;
210    if format == "parquet" {
211        return None;
212    }
213    let mut keys = Vec::new();
214    if options.compression.is_some() {
215        keys.push("compression");
216    }
217    if options.row_group_size.is_some() {
218        keys.push("row_group_size");
219    }
220    if options.max_size_per_file.is_some() {
221        keys.push("max_size_per_file");
222    }
223    let detail = if keys.is_empty() {
224        "options".to_string()
225    } else {
226        keys.join(", ")
227    };
228    Some(format!(
229        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
230        entity_name, format
231    ))
232}
233
234pub fn validate_sink_options(
235    entity_name: &str,
236    format: &str,
237    options: Option<&config::SinkOptions>,
238) -> FloeResult<()> {
239    let options = match options {
240        Some(options) => options,
241        None => return Ok(()),
242    };
243    if format != "parquet" {
244        return Ok(());
245    }
246    if let Some(compression) = &options.compression {
247        match compression.as_str() {
248            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
249            _ => {
250                return Err(Box::new(ConfigError(format!(
251                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
252                    entity_name, compression
253                ))))
254            }
255        }
256    }
257    if let Some(row_group_size) = options.row_group_size {
258        if row_group_size == 0 {
259            return Err(Box::new(ConfigError(format!(
260                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
261                entity_name
262            ))));
263        }
264    }
265    if let Some(max_size_per_file) = options.max_size_per_file {
266        if max_size_per_file == 0 {
267            return Err(Box::new(ConfigError(format!(
268                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
269                entity_name
270            ))));
271        }
272    }
273    Ok(())
274}
275
/// Look up the `InputAdapter` registered for a source `format` string.
///
/// Unknown formats yield an "unsupported source format" `ConfigError`
/// without an entity name; use `ensure_input_format` for the
/// entity-scoped variant.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
294
/// Look up the `AcceptedSinkAdapter` registered for a sink `format`
/// string; unknown formats yield an "unsupported accepted sink format"
/// error. Use `ensure_accepted_sink_format` for the entity-scoped variant.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
307
/// Look up the `RejectedSinkAdapter` for a `format` string — currently
/// only csv is supported. Use `ensure_rejected_sink_format` for the
/// entity-scoped error variant.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
318
319pub(crate) fn read_input_from_df(
320    input_file: &InputFile,
321    df: &DataFrame,
322    columns: &[config::ColumnConfig],
323    normalize_strategy: Option<&str>,
324    collect_raw: bool,
325) -> FloeResult<ReadInput> {
326    let input_columns = df
327        .get_column_names()
328        .iter()
329        .map(|name| name.to_string())
330        .collect::<Vec<_>>();
331    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
332    let raw_df = if collect_raw {
333        Some(cast_df_to_string(df)?)
334    } else {
335        None
336    };
337    let typed_df = cast_df_to_schema(df, &typed_schema)?;
338    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
339}
340
341pub(crate) fn finalize_read_input(
342    input_file: &InputFile,
343    mut raw_df: Option<DataFrame>,
344    mut typed_df: DataFrame,
345    normalize_strategy: Option<&str>,
346) -> FloeResult<ReadInput> {
347    if let Some(strategy) = normalize_strategy {
348        if let Some(raw_df) = raw_df.as_mut() {
349            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
350        }
351        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
352    }
353    Ok(ReadInput::Data {
354        input_file: input_file.clone(),
355        raw_df,
356        typed_df,
357    })
358}
359
360pub(crate) fn build_typed_schema(
361    input_columns: &[String],
362    declared_columns: &[config::ColumnConfig],
363    normalize_strategy: Option<&str>,
364) -> FloeResult<Schema> {
365    let mut declared_types = HashMap::new();
366    for column in declared_columns {
367        declared_types.insert(
368            column.name.as_str(),
369            config::parse_data_type(&column.column_type)?,
370        );
371    }
372
373    let mut schema = Schema::with_capacity(input_columns.len());
374    for name in input_columns {
375        let normalized = if let Some(strategy) = normalize_strategy {
376            crate::checks::normalize::normalize_name(name, strategy)
377        } else {
378            name.to_string()
379        };
380        let dtype = declared_types
381            .get(normalized.as_str())
382            .cloned()
383            .unwrap_or(DataType::String);
384        schema.insert(name.as_str().into(), dtype);
385    }
386    Ok(schema)
387}
388
/// Cast every column of `df` to `String` (non-strict), producing the
/// "raw" view used for cast-mismatch reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
392
393pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
394    let mut columns = Vec::with_capacity(schema.len());
395    for (name, dtype) in schema.iter() {
396        let series = df.column(name.as_str()).map_err(|err| {
397            Box::new(ConfigError(format!(
398                "input column {} not found: {err}",
399                name.as_str()
400            )))
401        })?;
402        let casted =
403            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
404                cast_string_to_bool(name.as_str(), series)?
405            } else {
406                series
407                    .cast_with_options(dtype, CastOptions::NonStrict)
408                    .map_err(|err| {
409                        Box::new(ConfigError(format!(
410                            "failed to cast input column {}: {err}",
411                            name.as_str()
412                        )))
413                    })?
414            };
415        columns.push(casted);
416    }
417    DataFrame::new(columns).map_err(|err| {
418        Box::new(ConfigError(format!(
419            "failed to build typed dataframe: {err}"
420        ))) as Box<dyn std::error::Error + Send + Sync>
421    })
422}
423
424fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
425    let string_values = series.as_materialized_series().str().map_err(|err| {
426        Box::new(ConfigError(format!(
427            "failed to read boolean column {} as string: {err}",
428            name
429        )))
430    })?;
431    let mut values = Vec::with_capacity(series.len());
432    for value in string_values {
433        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
434            "true" | "1" => Some(true),
435            "false" | "0" => Some(false),
436            _ => None,
437        });
438        values.push(parsed);
439    }
440    Ok(Series::new(name.into(), values).into())
441}
442
443fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
444    let mut out = df.clone();
445    let names = out
446        .get_column_names()
447        .iter()
448        .map(|name| name.to_string())
449        .collect::<Vec<_>>();
450    for name in names {
451        let series = out.column(&name).map_err(|err| {
452            Box::new(ConfigError(format!(
453                "input column {} not found: {err}",
454                name
455            )))
456        })?;
457        let casted = series
458            .cast_with_options(dtype, CastOptions::NonStrict)
459            .map_err(|err| {
460                Box::new(ConfigError(format!(
461                    "failed to cast input column {}: {err}",
462                    name
463                )))
464            })?;
465        let idx = out.get_column_index(&name).ok_or_else(|| {
466            Box::new(ConfigError(format!(
467                "input column {} not found for update",
468                name
469            )))
470        })?;
471        out.replace_column(idx, casted).map_err(|err| {
472            Box::new(ConfigError(format!(
473                "failed to update input column {}: {err}",
474                name
475            )))
476        })?;
477    }
478    Ok(out)
479}
480pub fn collect_row_errors(
481    raw_df: &DataFrame,
482    typed_df: &DataFrame,
483    required_cols: &[String],
484    columns: &[config::ColumnConfig],
485    track_cast_errors: bool,
486    raw_indices: &check::ColumnIndex,
487    typed_indices: &check::ColumnIndex,
488) -> FloeResult<Vec<Vec<check::RowError>>> {
489    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
490    if track_cast_errors {
491        let cast_errors =
492            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
493        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
494            errors.extend(cast);
495        }
496    }
497    Ok(error_lists)
498}