Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One resolved input file: its original URI plus the local path it is
/// actually read from.
#[derive(Debug, Clone)]
pub struct InputFile {
    // Original source location (cloud or local URI).
    pub source_uri: String,
    // Local filesystem path the bytes are read from.
    pub source_local_path: PathBuf,
    // File name including its extension.
    pub source_name: String,
    // File name without its extension.
    pub source_stem: String,
}
17
/// A per-file read failure, attributed to the validation rule that caught it.
#[derive(Debug, Clone)]
pub struct FileReadError {
    // Identifier of the rule/check that produced this error.
    pub rule: String,
    // Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading a single input file: either dataframes, or a
/// file-level error.
pub enum ReadInput {
    /// Successfully read file.
    Data {
        input_file: InputFile,
        // All-string copy of the data; present only when raw collection
        // was requested (see `read_input_from_df`'s `collect_raw`).
        raw_df: Option<DataFrame>,
        // Data cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File that could not be read; carries the originating file and error.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Optional size metrics reported by an accepted-sink write.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    pub total_bytes_written: Option<u64>,
    pub avg_file_size_mb: Option<f64>,
    // Count of files considered "small" — threshold defined by the sink
    // adapter that fills this in; not visible here.
    pub small_files_count: Option<u64>,
}
42
/// Metrics describing a merge (upsert) into the accepted target table.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    // Columns used as the merge key.
    pub merge_key: Vec<String>,
    pub inserted_count: u64,
    pub updated_count: u64,
    pub target_rows_before: u64,
    pub target_rows_after: u64,
    pub merge_elapsed_ms: u64,
}
52
/// Result of writing accepted rows via an `AcceptedSinkAdapter`.
///
/// Table-format fields (`table_version`, `snapshot_id`, iceberg_*) are
/// `None` for sinks where they do not apply.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    pub files_written: u64,
    pub parts_written: u64,
    // Paths/names of the part files produced by this write.
    pub part_files: Vec<String>,
    // Version reported by the table format after the commit, if any.
    pub table_version: Option<i64>,
    // Snapshot id after the commit, if the format exposes one.
    pub snapshot_id: Option<i64>,
    pub table_root_uri: Option<String>,
    // Iceberg-specific addressing; populated only by the iceberg adapter
    // — presumably; confirm against io::write::iceberg.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    pub metrics: AcceptedWriteMetrics,
    // Present only when the write performed a merge.
    pub merge: Option<AcceptedMergeMetrics>,
}
68
/// Reads source files of one specific format into dataframes.
///
/// Implementations are looked up by format string via `input_adapter`.
pub trait InputAdapter: Send + Sync {
    /// Format identifier this adapter handles (e.g. "csv", "json").
    fn format(&self) -> &'static str;

    /// Glob patterns used to discover input files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the local input files for an entity, passing this adapter's
    /// default globs to the shared local resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Return the input column names found in `input_file`, or a
    /// `FileReadError` when the file cannot be read.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read `files` into `ReadInput`s, casting to `columns` types,
    /// optionally normalizing column names and retaining a raw (string)
    /// copy of each frame when `collect_raw` is set.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like `read_inputs`, but a caller may pass column names it already
    /// validated. The default implementation ignores the hint and simply
    /// delegates to `read_inputs`; adapters can override to skip rework.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        // Hint intentionally unused in the default path.
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
126
/// Writes accepted rows to a target in one specific sink format.
///
/// Implementations are looked up by format string via
/// `accepted_sink_adapter`.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` using `mode`, returning write/commit metadata.
    ///
    /// `output_stem` names the output; `temp_dir` optionally provides a
    /// staging directory; `cloud`, `resolver` and `catalogs` supply the
    /// storage and catalog plumbing the adapter may need.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
142
/// Borrowed bundle of everything a rejected-sink write needs; passed as a
/// single struct to keep `write_rejected`'s signature small.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    // Rejected rows to write.
    pub df: &'a mut DataFrame,
    // Stem of the originating source file, used in the output name.
    pub source_stem: &'a str,
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
153
/// Writes rejected rows; looked up by format via `rejected_sink_adapter`.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`; returns a String —
    /// presumably the written file's path/URI, confirm with the csv adapter.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
157
/// Which configuration slot a format string came from; used to phrase
/// "unsupported format" errors.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// `source.format`
    Source,
    /// `sink.accepted.format`
    SinkAccepted,
    /// `sink.rejected.format`
    SinkRejected,
}
164
165impl FormatKind {
166    fn field_path(self) -> &'static str {
167        match self {
168            FormatKind::Source => "source.format",
169            FormatKind::SinkAccepted => "sink.accepted.format",
170            FormatKind::SinkRejected => "sink.rejected.format",
171        }
172    }
173
174    fn description(self) -> &'static str {
175        match self {
176            FormatKind::Source => "source format",
177            FormatKind::SinkAccepted => "accepted sink format",
178            FormatKind::SinkRejected => "rejected sink format",
179        }
180    }
181}
182
183fn unsupported_format_error(
184    kind: FormatKind,
185    format: &str,
186    entity_name: Option<&str>,
187) -> ConfigError {
188    if let Some(entity_name) = entity_name {
189        return ConfigError(format!(
190            "entity.name={} {}={} is unsupported",
191            entity_name,
192            kind.field_path(),
193            format
194        ));
195    }
196    ConfigError(format!("unsupported {}: {format}", kind.description()))
197}
198
199pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
200    if input_adapter(format).is_err() {
201        return Err(Box::new(unsupported_format_error(
202            FormatKind::Source,
203            format,
204            Some(entity_name),
205        )));
206    }
207    Ok(())
208}
209
210pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
211    if accepted_sink_adapter(format).is_err() {
212        return Err(Box::new(unsupported_format_error(
213            FormatKind::SinkAccepted,
214            format,
215            Some(entity_name),
216        )));
217    }
218    Ok(())
219}
220
221pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
222    if rejected_sink_adapter(format).is_err() {
223        return Err(Box::new(unsupported_format_error(
224            FormatKind::SinkRejected,
225            format,
226            Some(entity_name),
227        )));
228    }
229    Ok(())
230}
231
232pub fn resolve_read_columns(
233    entity: &config::EntityConfig,
234    normalized_columns: &[config::ColumnConfig],
235    normalize_strategy: Option<&str>,
236) -> FloeResult<Vec<config::ColumnConfig>> {
237    if entity.source.format == "json" || entity.source.format == "xml" {
238        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
239    } else {
240        Ok(normalized_columns.to_vec())
241    }
242}
243
244pub fn sink_options_warning(
245    entity_name: &str,
246    format: &str,
247    options: Option<&config::SinkOptions>,
248) -> Option<String> {
249    let options = options?;
250    if format == "parquet" {
251        return None;
252    }
253    let mut keys = Vec::new();
254    if options.compression.is_some() {
255        keys.push("compression");
256    }
257    if options.row_group_size.is_some() {
258        keys.push("row_group_size");
259    }
260    if options.max_size_per_file.is_some() {
261        keys.push("max_size_per_file");
262    }
263    let detail = if keys.is_empty() {
264        "options".to_string()
265    } else {
266        keys.join(", ")
267    };
268    Some(format!(
269        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
270        entity_name, format
271    ))
272}
273
274pub fn validate_sink_options(
275    entity_name: &str,
276    format: &str,
277    options: Option<&config::SinkOptions>,
278) -> FloeResult<()> {
279    let options = match options {
280        Some(options) => options,
281        None => return Ok(()),
282    };
283    if format != "parquet" {
284        return Ok(());
285    }
286    if let Some(compression) = &options.compression {
287        match compression.as_str() {
288            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
289            _ => {
290                return Err(Box::new(ConfigError(format!(
291                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
292                    entity_name, compression
293                ))))
294            }
295        }
296    }
297    if let Some(row_group_size) = options.row_group_size {
298        if row_group_size == 0 {
299            return Err(Box::new(ConfigError(format!(
300                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
301                entity_name
302            ))));
303        }
304    }
305    if let Some(max_size_per_file) = options.max_size_per_file {
306        if max_size_per_file == 0 {
307            return Err(Box::new(ConfigError(format!(
308                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
309                entity_name
310            ))));
311        }
312    }
313    Ok(())
314}
315
316pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
317    match format {
318        "csv" => Ok(io::read::csv::csv_input_adapter()),
319        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
320        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
321        "orc" => Ok(io::read::orc::orc_input_adapter()),
322        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
323        "json" => Ok(io::read::json::json_input_adapter()),
324        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
325        "avro" => Ok(io::read::avro::avro_input_adapter()),
326        "xml" => Ok(io::read::xml::xml_input_adapter()),
327        _ => Err(Box::new(unsupported_format_error(
328            FormatKind::Source,
329            format,
330            None,
331        ))),
332    }
333}
334
335pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
336    match format {
337        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
338        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
339        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
340        _ => Err(Box::new(unsupported_format_error(
341            FormatKind::SinkAccepted,
342            format,
343            None,
344        ))),
345    }
346}
347
348pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
349    match format {
350        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
351        _ => Err(Box::new(unsupported_format_error(
352            FormatKind::SinkRejected,
353            format,
354            None,
355        ))),
356    }
357}
358
359pub(crate) fn read_input_from_df(
360    input_file: &InputFile,
361    df: &DataFrame,
362    columns: &[config::ColumnConfig],
363    normalize_strategy: Option<&str>,
364    collect_raw: bool,
365) -> FloeResult<ReadInput> {
366    let input_columns = df
367        .get_column_names()
368        .iter()
369        .map(|name| name.to_string())
370        .collect::<Vec<_>>();
371    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
372    let raw_df = if collect_raw {
373        Some(cast_df_to_string(df)?)
374    } else {
375        None
376    };
377    let typed_df = cast_df_to_schema(df, &typed_schema)?;
378    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
379}
380
381pub(crate) fn finalize_read_input(
382    input_file: &InputFile,
383    mut raw_df: Option<DataFrame>,
384    mut typed_df: DataFrame,
385    normalize_strategy: Option<&str>,
386) -> FloeResult<ReadInput> {
387    if let Some(strategy) = normalize_strategy {
388        if let Some(raw_df) = raw_df.as_mut() {
389            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
390        }
391        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
392    }
393    Ok(ReadInput::Data {
394        input_file: input_file.clone(),
395        raw_df,
396        typed_df,
397    })
398}
399
400pub(crate) fn build_typed_schema(
401    input_columns: &[String],
402    declared_columns: &[config::ColumnConfig],
403    normalize_strategy: Option<&str>,
404) -> FloeResult<Schema> {
405    let mut declared_types = HashMap::new();
406    for column in declared_columns {
407        declared_types.insert(
408            column.name.as_str(),
409            config::parse_data_type(&column.column_type)?,
410        );
411    }
412
413    let mut schema = Schema::with_capacity(input_columns.len());
414    for name in input_columns {
415        let normalized = if let Some(strategy) = normalize_strategy {
416            crate::checks::normalize::normalize_name(name, strategy)
417        } else {
418            name.to_string()
419        };
420        let dtype = declared_types
421            .get(normalized.as_str())
422            .cloned()
423            .unwrap_or(DataType::String);
424        schema.insert(name.as_str().into(), dtype);
425    }
426    Ok(schema)
427}
428
/// Return a copy of `df` with every column cast (non-strictly) to String.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
432
/// Build a new dataframe whose columns follow `schema`'s order and dtypes.
///
/// Every schema entry must name an existing column of `df`, else this
/// errors. String columns targeted at Boolean go through the custom
/// `cast_string_to_bool` parser; everything else uses a non-strict Polars
/// cast (`CastOptions::NonStrict`).
pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
    let mut columns = Vec::with_capacity(schema.len());
    for (name, dtype) in schema.iter() {
        let series = df.column(name.as_str()).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name.as_str()
            )))
        })?;
        // Booleans get a hand-rolled parse so "true"/"1"/"false"/"0"
        // convert; the generic cast would not accept all of these.
        let casted =
            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
                cast_string_to_bool(name.as_str(), series)?
            } else {
                series
                    .cast_with_options(dtype, CastOptions::NonStrict)
                    .map_err(|err| {
                        Box::new(ConfigError(format!(
                            "failed to cast input column {}: {err}",
                            name.as_str()
                        )))
                    })?
            };
        columns.push(casted);
    }
    DataFrame::new(columns).map_err(|err| {
        // Explicit cast needed so the closure's return type unifies with
        // the boxed trait-object error the result type expects.
        Box::new(ConfigError(format!(
            "failed to build typed dataframe: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })
}
463
464fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
465    let string_values = series.as_materialized_series().str().map_err(|err| {
466        Box::new(ConfigError(format!(
467            "failed to read boolean column {} as string: {err}",
468            name
469        )))
470    })?;
471    let mut values = Vec::with_capacity(series.len());
472    for value in string_values {
473        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
474            "true" | "1" => Some(true),
475            "false" | "0" => Some(false),
476            _ => None,
477        });
478        values.push(parsed);
479    }
480    Ok(Series::new(name.into(), values).into())
481}
482
/// Return a copy of `df` with every column non-strictly cast to `dtype`,
/// replacing each column in place on the cloned frame.
fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
    let mut out = df.clone();
    // Snapshot the names first: we cannot borrow the column names while
    // mutating `out` inside the loop.
    let names = out
        .get_column_names()
        .iter()
        .map(|name| name.to_string())
        .collect::<Vec<_>>();
    for name in names {
        let series = out.column(&name).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name
            )))
        })?;
        // Non-strict cast: values that do not convert become null instead
        // of failing the whole column.
        let casted = series
            .cast_with_options(dtype, CastOptions::NonStrict)
            .map_err(|err| {
                Box::new(ConfigError(format!(
                    "failed to cast input column {}: {err}",
                    name
                )))
            })?;
        let idx = out.get_column_index(&name).ok_or_else(|| {
            Box::new(ConfigError(format!(
                "input column {} not found for update",
                name
            )))
        })?;
        out.replace_column(idx, casted).map_err(|err| {
            Box::new(ConfigError(format!(
                "failed to update input column {}: {err}",
                name
            )))
        })?;
    }
    Ok(out)
}
520pub fn collect_row_errors(
521    raw_df: &DataFrame,
522    typed_df: &DataFrame,
523    required_cols: &[String],
524    columns: &[config::ColumnConfig],
525    track_cast_errors: bool,
526    raw_indices: &check::ColumnIndex,
527    typed_indices: &check::ColumnIndex,
528) -> FloeResult<Vec<Vec<check::RowError>>> {
529    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
530    if track_cast_errors {
531        let cast_errors =
532            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
533        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
534            errors.extend(cast);
535        }
536    }
537    Ok(error_lists)
538}