// floe_core/io/format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One resolved input file: its original location plus the local path and
/// name components used while reading it.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Original source location of the file (URI form).
    pub source_uri: String,
    /// Local filesystem path the file is read from.
    pub source_local_path: PathBuf,
    /// File name component of the source.
    pub source_name: String,
    /// File name without its extension — presumably used to derive output
    /// names (sinks take an `output_stem`/`source_stem`); confirm at call sites.
    pub source_stem: String,
}
17
/// A per-file read failure: which rule flagged the file and a human-readable
/// message. Carried inside [`ReadInput::FileError`] instead of aborting the run.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that rejected the file.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading a single input file: either parsed data or a
/// file-level error.
pub enum ReadInput {
    /// File was read successfully.
    Data {
        input_file: InputFile,
        /// All-string view of the input, collected only when requested
        /// (`collect_raw`); used by cast-mismatch error reporting
        /// (see `collect_row_errors`).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File could not be read; the error explains which rule failed and why.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Result of writing accepted rows through an [`AcceptedSinkAdapter`].
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of part files written.
    pub parts_written: u64,
    /// Identifiers/paths of the written part files.
    pub part_files: Vec<String>,
    /// New table version, when the sink format tracks one; `None` otherwise.
    pub table_version: Option<i64>,
    /// New snapshot id, when the sink format tracks one; `None` otherwise.
    pub snapshot_id: Option<i64>,
}
43
/// Reader for one source format (csv, json, parquet, ...). Concrete
/// implementations are looked up by [`input_adapter`].
pub trait InputAdapter: Send + Sync {
    /// Canonical format name this adapter handles (the `source.format` value).
    fn format(&self) -> &'static str;

    /// Default glob patterns for this format, derived from its registered
    /// extensions.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes (extensions) registered for this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves the concrete local input files for an entity, passing this
    /// format's default globs to the storage resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Reads only the column names present in `input_file`. Failure is a
    /// per-file [`FileReadError`], not a pipeline-level error.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads all `files` into [`ReadInput`]s. `collect_raw` asks for an
    /// additional all-string copy of each frame; `normalize_strategy`
    /// optionally renames columns on read.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
88
/// Writer for accepted rows in one sink format (parquet, delta, iceberg).
/// Concrete implementations are looked up by [`accepted_sink_adapter`].
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` under the given write `mode` and reports what
    /// was written. `output_stem` names the output; `temp_dir` is an optional
    /// staging directory; `cloud` and `resolver` provide storage access.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
103
/// Everything a [`RejectedSinkAdapter`] needs for one write, bundled into a
/// single struct to keep the trait method signature small (cf. the
/// `clippy::too_many_arguments` allowance on `write_accepted`).
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    /// Rejected rows to write; taken mutably, mirroring `write_accepted`.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    /// Optional staging directory.
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
114
/// Writer for rejected rows. Concrete implementations are looked up by
/// [`rejected_sink_adapter`]; returns an identifier/path for the written output.
pub trait RejectedSinkAdapter: Send + Sync {
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
118
/// Which configured format slot an error message refers to; selects the
/// config field path and description used by `unsupported_format_error`.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}
125
126impl FormatKind {
127    fn field_path(self) -> &'static str {
128        match self {
129            FormatKind::Source => "source.format",
130            FormatKind::SinkAccepted => "sink.accepted.format",
131            FormatKind::SinkRejected => "sink.rejected.format",
132        }
133    }
134
135    fn description(self) -> &'static str {
136        match self {
137            FormatKind::Source => "source format",
138            FormatKind::SinkAccepted => "accepted sink format",
139            FormatKind::SinkRejected => "rejected sink format",
140        }
141    }
142}
143
144fn unsupported_format_error(
145    kind: FormatKind,
146    format: &str,
147    entity_name: Option<&str>,
148) -> ConfigError {
149    if let Some(entity_name) = entity_name {
150        return ConfigError(format!(
151            "entity.name={} {}={} is unsupported",
152            entity_name,
153            kind.field_path(),
154            format
155        ));
156    }
157    ConfigError(format!("unsupported {}: {format}", kind.description()))
158}
159
160pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
161    if input_adapter(format).is_err() {
162        return Err(Box::new(unsupported_format_error(
163            FormatKind::Source,
164            format,
165            Some(entity_name),
166        )));
167    }
168    Ok(())
169}
170
171pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
172    if accepted_sink_adapter(format).is_err() {
173        return Err(Box::new(unsupported_format_error(
174            FormatKind::SinkAccepted,
175            format,
176            Some(entity_name),
177        )));
178    }
179    Ok(())
180}
181
182pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
183    if rejected_sink_adapter(format).is_err() {
184        return Err(Box::new(unsupported_format_error(
185            FormatKind::SinkRejected,
186            format,
187            Some(entity_name),
188        )));
189    }
190    Ok(())
191}
192
193pub fn resolve_read_columns(
194    entity: &config::EntityConfig,
195    normalized_columns: &[config::ColumnConfig],
196    normalize_strategy: Option<&str>,
197) -> FloeResult<Vec<config::ColumnConfig>> {
198    if entity.source.format == "json" || entity.source.format == "xml" {
199        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
200    } else {
201        Ok(normalized_columns.to_vec())
202    }
203}
204
205pub fn sink_options_warning(
206    entity_name: &str,
207    format: &str,
208    options: Option<&config::SinkOptions>,
209) -> Option<String> {
210    let options = options?;
211    if format == "parquet" {
212        return None;
213    }
214    let mut keys = Vec::new();
215    if options.compression.is_some() {
216        keys.push("compression");
217    }
218    if options.row_group_size.is_some() {
219        keys.push("row_group_size");
220    }
221    if options.max_size_per_file.is_some() {
222        keys.push("max_size_per_file");
223    }
224    let detail = if keys.is_empty() {
225        "options".to_string()
226    } else {
227        keys.join(", ")
228    };
229    Some(format!(
230        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
231        entity_name, format
232    ))
233}
234
235pub fn validate_sink_options(
236    entity_name: &str,
237    format: &str,
238    options: Option<&config::SinkOptions>,
239) -> FloeResult<()> {
240    let options = match options {
241        Some(options) => options,
242        None => return Ok(()),
243    };
244    if format != "parquet" {
245        return Ok(());
246    }
247    if let Some(compression) = &options.compression {
248        match compression.as_str() {
249            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
250            _ => {
251                return Err(Box::new(ConfigError(format!(
252                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
253                    entity_name, compression
254                ))))
255            }
256        }
257    }
258    if let Some(row_group_size) = options.row_group_size {
259        if row_group_size == 0 {
260            return Err(Box::new(ConfigError(format!(
261                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
262                entity_name
263            ))));
264        }
265    }
266    if let Some(max_size_per_file) = options.max_size_per_file {
267        if max_size_per_file == 0 {
268            return Err(Box::new(ConfigError(format!(
269                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
270                entity_name
271            ))));
272        }
273    }
274    Ok(())
275}
276
/// Looks up the [`InputAdapter`] registered for `format`, or returns an
/// entity-less "unsupported source format" error.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
295
/// Looks up the [`AcceptedSinkAdapter`] registered for `format`, or returns
/// an entity-less "unsupported accepted sink format" error.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
308
/// Looks up the [`RejectedSinkAdapter`] registered for `format` (currently
/// only csv), or returns an entity-less "unsupported rejected sink format" error.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
319
320pub(crate) fn read_input_from_df(
321    input_file: &InputFile,
322    df: &DataFrame,
323    columns: &[config::ColumnConfig],
324    normalize_strategy: Option<&str>,
325    collect_raw: bool,
326) -> FloeResult<ReadInput> {
327    let input_columns = df
328        .get_column_names()
329        .iter()
330        .map(|name| name.to_string())
331        .collect::<Vec<_>>();
332    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
333    let raw_df = if collect_raw {
334        Some(cast_df_to_string(df)?)
335    } else {
336        None
337    };
338    let typed_df = cast_df_to_schema(df, &typed_schema)?;
339    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
340}
341
342pub(crate) fn finalize_read_input(
343    input_file: &InputFile,
344    mut raw_df: Option<DataFrame>,
345    mut typed_df: DataFrame,
346    normalize_strategy: Option<&str>,
347) -> FloeResult<ReadInput> {
348    if let Some(strategy) = normalize_strategy {
349        if let Some(raw_df) = raw_df.as_mut() {
350            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
351        }
352        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
353    }
354    Ok(ReadInput::Data {
355        input_file: input_file.clone(),
356        raw_df,
357        typed_df,
358    })
359}
360
361pub(crate) fn build_typed_schema(
362    input_columns: &[String],
363    declared_columns: &[config::ColumnConfig],
364    normalize_strategy: Option<&str>,
365) -> FloeResult<Schema> {
366    let mut declared_types = HashMap::new();
367    for column in declared_columns {
368        declared_types.insert(
369            column.name.as_str(),
370            config::parse_data_type(&column.column_type)?,
371        );
372    }
373
374    let mut schema = Schema::with_capacity(input_columns.len());
375    for name in input_columns {
376        let normalized = if let Some(strategy) = normalize_strategy {
377            crate::checks::normalize::normalize_name(name, strategy)
378        } else {
379            name.to_string()
380        };
381        let dtype = declared_types
382            .get(normalized.as_str())
383            .cloned()
384            .unwrap_or(DataType::String);
385        schema.insert(name.as_str().into(), dtype);
386    }
387    Ok(schema)
388}
389
/// Returns a copy of `df` with every column non-strictly cast to `String`
/// (the "raw" view kept for error reporting).
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
393
/// Builds a new frame containing, for each `schema` entry, the matching
/// column of `df` non-strictly cast to the entry's dtype. String columns
/// headed for `Boolean` go through the lenient `cast_string_to_bool` parser
/// instead of the plain cast. Output column order follows the schema.
pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
    let mut columns = Vec::with_capacity(schema.len());
    for (name, dtype) in schema.iter() {
        let series = df.column(name.as_str()).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name.as_str()
            )))
        })?;
        // Special-case string -> bool so "true"/"1"/"false"/"0" parse instead
        // of becoming nulls under the generic non-strict cast.
        let casted =
            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
                cast_string_to_bool(name.as_str(), series)?
            } else {
                series
                    .cast_with_options(dtype, CastOptions::NonStrict)
                    .map_err(|err| {
                        Box::new(ConfigError(format!(
                            "failed to cast input column {}: {err}",
                            name.as_str()
                        )))
                    })?
            };
        columns.push(casted);
    }
    DataFrame::new(columns).map_err(|err| {
        // Explicit coercion: map_err's closure needs the boxed-trait error
        // type of FloeResult here, not Box<ConfigError>.
        Box::new(ConfigError(format!(
            "failed to build typed dataframe: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })
}
424
425fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
426    let string_values = series.as_materialized_series().str().map_err(|err| {
427        Box::new(ConfigError(format!(
428            "failed to read boolean column {} as string: {err}",
429            name
430        )))
431    })?;
432    let mut values = Vec::with_capacity(series.len());
433    for value in string_values {
434        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
435            "true" | "1" => Some(true),
436            "false" | "0" => Some(false),
437            _ => None,
438        });
439        values.push(parsed);
440    }
441    Ok(Series::new(name.into(), values).into())
442}
443
444fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
445    let mut out = df.clone();
446    let names = out
447        .get_column_names()
448        .iter()
449        .map(|name| name.to_string())
450        .collect::<Vec<_>>();
451    for name in names {
452        let series = out.column(&name).map_err(|err| {
453            Box::new(ConfigError(format!(
454                "input column {} not found: {err}",
455                name
456            )))
457        })?;
458        let casted = series
459            .cast_with_options(dtype, CastOptions::NonStrict)
460            .map_err(|err| {
461                Box::new(ConfigError(format!(
462                    "failed to cast input column {}: {err}",
463                    name
464                )))
465            })?;
466        let idx = out.get_column_index(&name).ok_or_else(|| {
467            Box::new(ConfigError(format!(
468                "input column {} not found for update",
469                name
470            )))
471        })?;
472        out.replace_column(idx, casted).map_err(|err| {
473            Box::new(ConfigError(format!(
474                "failed to update input column {}: {err}",
475                name
476            )))
477        })?;
478    }
479    Ok(out)
480}
481pub fn collect_row_errors(
482    raw_df: &DataFrame,
483    typed_df: &DataFrame,
484    required_cols: &[String],
485    columns: &[config::ColumnConfig],
486    track_cast_errors: bool,
487    raw_indices: &check::ColumnIndex,
488    typed_indices: &check::ColumnIndex,
489) -> FloeResult<Vec<Vec<check::RowError>>> {
490    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
491    if track_cast_errors {
492        let cast_errors =
493            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
494        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
495            errors.extend(cast);
496        }
497    }
498    Ok(error_lists)
499}