//! floe_core/io/format.rs
//!
//! Format adapter traits and registries: reading source inputs into
//! `DataFrame`s and writing accepted/rejected sink outputs.
1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// A single resolved input file: the original source URI plus the local
/// path it is read from, and its name/stem components.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Original source location (may be a remote/cloud URI).
    pub source_uri: String,
    /// Local filesystem path the file is actually read from.
    pub source_local_path: PathBuf,
    /// File name including extension.
    pub source_name: String,
    /// File name without extension; presumably used to derive output
    /// names (see `RejectedWriteRequest::source_stem`) — confirm at callers.
    pub source_stem: String,
}
17
/// A per-file, recoverable read failure (the batch continues; the file is
/// reported via `ReadInput::FileError`).
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that rejected the file.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading one input file: either parsed data or a file-level
/// error that should not abort the rest of the batch.
pub enum ReadInput {
    /// Successfully read file.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame, present only when the caller
        /// requested raw collection (see `read_input_from_df`).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File that could not be read; carries the reason.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Optional size metrics reported by accepted-sink writers. All fields are
/// `None` when the writer does not track them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written across all produced files.
    pub total_bytes_written: Option<u64>,
    /// Average produced file size, in megabytes.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files considered "small" — threshold defined by the
    /// writer, not visible here; confirm at the implementations.
    pub small_files_count: Option<u64>,
}
42
/// Result of an accepted-sink write. Table/snapshot fields are populated
/// only by table-format writers (e.g. Delta/Iceberg); plain-file writers
/// leave them `None`.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of data files written.
    pub files_written: u64,
    /// Number of parts written (may differ from files for partitioned output).
    pub parts_written: u64,
    /// Paths/URIs of the written part files.
    pub part_files: Vec<String>,
    /// Table version after the write, when the sink is a versioned table.
    pub table_version: Option<i64>,
    /// Snapshot id after the write (Iceberg-style sinks).
    pub snapshot_id: Option<i64>,
    /// Root URI of the target table, when applicable.
    pub table_root_uri: Option<String>,
    // Iceberg catalog coordinates; set only by the Iceberg adapter.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    /// Optional size metrics (see `AcceptedWriteMetrics`).
    pub metrics: AcceptedWriteMetrics,
}
57
/// Adapter for reading one source file format into `DataFrame`s.
pub trait InputAdapter: Send + Sync {
    /// Canonical format identifier handled by this adapter (e.g. "csv").
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the concrete local input files for `entity_name`, using this
    /// adapter's default globs when the source config does not override them.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Read just the column names present in `input_file`.
    ///
    /// Errors are returned as a per-file `FileReadError` rather than
    /// aborting the whole batch.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, producing one `ReadInput` per file. When
    /// `collect_raw` is true, implementations also keep a string-typed copy
    /// of each frame (see `read_input_from_df`).
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like `read_inputs`, but callers may pass column names they already
    /// validated. The default implementation ignores the hint and delegates
    /// to `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
115
/// Adapter that writes accepted rows to one sink format.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` under `mode`, returning file/table metadata.
    ///
    /// `output_stem` names the output; `temp_dir` is an optional staging
    /// directory; `cloud`, `resolver` and `catalogs` supply storage and
    /// catalog access for remote/table sinks.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
131
/// Bundled arguments for a rejected-rows write (kept as a struct to avoid a
/// long parameter list on `RejectedSinkAdapter::write_rejected`).
pub struct RejectedWriteRequest<'a> {
    /// Destination for the rejected rows.
    pub target: &'a Target,
    /// Rows to write.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used in output naming.
    pub source_stem: &'a str,
    /// Optional staging directory.
    pub temp_dir: Option<&'a Path>,
    /// Cloud storage client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Storage configuration resolver.
    pub resolver: &'a config::StorageResolver,
    /// Entity being processed.
    pub entity: &'a config::EntityConfig,
    /// Write mode (append/overwrite semantics defined by `config::WriteMode`).
    pub mode: config::WriteMode,
}
142
/// Adapter that writes rejected rows to one sink format.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`; returns the written
    /// output location/name as a string.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
146
/// Which configuration slot a format string came from; used to point error
/// messages at the right config field.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}

impl FormatKind {
    /// Dotted config-file path of the corresponding `format` field.
    fn field_path(self) -> &'static str {
        match self {
            Self::Source => "source.format",
            Self::SinkAccepted => "sink.accepted.format",
            Self::SinkRejected => "sink.rejected.format",
        }
    }

    /// Human-readable label for error messages.
    fn description(self) -> &'static str {
        match self {
            Self::Source => "source format",
            Self::SinkAccepted => "accepted sink format",
            Self::SinkRejected => "rejected sink format",
        }
    }
}
171
172fn unsupported_format_error(
173    kind: FormatKind,
174    format: &str,
175    entity_name: Option<&str>,
176) -> ConfigError {
177    if let Some(entity_name) = entity_name {
178        return ConfigError(format!(
179            "entity.name={} {}={} is unsupported",
180            entity_name,
181            kind.field_path(),
182            format
183        ));
184    }
185    ConfigError(format!("unsupported {}: {format}", kind.description()))
186}
187
188pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
189    if input_adapter(format).is_err() {
190        return Err(Box::new(unsupported_format_error(
191            FormatKind::Source,
192            format,
193            Some(entity_name),
194        )));
195    }
196    Ok(())
197}
198
199pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
200    if accepted_sink_adapter(format).is_err() {
201        return Err(Box::new(unsupported_format_error(
202            FormatKind::SinkAccepted,
203            format,
204            Some(entity_name),
205        )));
206    }
207    Ok(())
208}
209
210pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
211    if rejected_sink_adapter(format).is_err() {
212        return Err(Box::new(unsupported_format_error(
213            FormatKind::SinkRejected,
214            format,
215            Some(entity_name),
216        )));
217    }
218    Ok(())
219}
220
221pub fn resolve_read_columns(
222    entity: &config::EntityConfig,
223    normalized_columns: &[config::ColumnConfig],
224    normalize_strategy: Option<&str>,
225) -> FloeResult<Vec<config::ColumnConfig>> {
226    if entity.source.format == "json" || entity.source.format == "xml" {
227        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
228    } else {
229        Ok(normalized_columns.to_vec())
230    }
231}
232
233pub fn sink_options_warning(
234    entity_name: &str,
235    format: &str,
236    options: Option<&config::SinkOptions>,
237) -> Option<String> {
238    let options = options?;
239    if format == "parquet" {
240        return None;
241    }
242    let mut keys = Vec::new();
243    if options.compression.is_some() {
244        keys.push("compression");
245    }
246    if options.row_group_size.is_some() {
247        keys.push("row_group_size");
248    }
249    if options.max_size_per_file.is_some() {
250        keys.push("max_size_per_file");
251    }
252    let detail = if keys.is_empty() {
253        "options".to_string()
254    } else {
255        keys.join(", ")
256    };
257    Some(format!(
258        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
259        entity_name, format
260    ))
261}
262
263pub fn validate_sink_options(
264    entity_name: &str,
265    format: &str,
266    options: Option<&config::SinkOptions>,
267) -> FloeResult<()> {
268    let options = match options {
269        Some(options) => options,
270        None => return Ok(()),
271    };
272    if format != "parquet" {
273        return Ok(());
274    }
275    if let Some(compression) = &options.compression {
276        match compression.as_str() {
277            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
278            _ => {
279                return Err(Box::new(ConfigError(format!(
280                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
281                    entity_name, compression
282                ))))
283            }
284        }
285    }
286    if let Some(row_group_size) = options.row_group_size {
287        if row_group_size == 0 {
288            return Err(Box::new(ConfigError(format!(
289                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
290                entity_name
291            ))));
292        }
293    }
294    if let Some(max_size_per_file) = options.max_size_per_file {
295        if max_size_per_file == 0 {
296            return Err(Box::new(ConfigError(format!(
297                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
298                entity_name
299            ))));
300        }
301    }
302    Ok(())
303}
304
/// Look up the `InputAdapter` registered for `format`.
///
/// Supported formats: csv, tsv, fixed, orc, parquet, json, xlsx, avro, xml.
///
/// # Errors
/// Returns an unsupported-format `ConfigError` for any other value.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
323
/// Look up the `AcceptedSinkAdapter` registered for `format`.
///
/// Supported formats: parquet, delta, iceberg.
///
/// # Errors
/// Returns an unsupported-format `ConfigError` for any other value.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
336
/// Look up the `RejectedSinkAdapter` registered for `format`.
///
/// Only "csv" is currently supported.
///
/// # Errors
/// Returns an unsupported-format `ConfigError` for any other value.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
347
348pub(crate) fn read_input_from_df(
349    input_file: &InputFile,
350    df: &DataFrame,
351    columns: &[config::ColumnConfig],
352    normalize_strategy: Option<&str>,
353    collect_raw: bool,
354) -> FloeResult<ReadInput> {
355    let input_columns = df
356        .get_column_names()
357        .iter()
358        .map(|name| name.to_string())
359        .collect::<Vec<_>>();
360    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
361    let raw_df = if collect_raw {
362        Some(cast_df_to_string(df)?)
363    } else {
364        None
365    };
366    let typed_df = cast_df_to_schema(df, &typed_schema)?;
367    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
368}
369
370pub(crate) fn finalize_read_input(
371    input_file: &InputFile,
372    mut raw_df: Option<DataFrame>,
373    mut typed_df: DataFrame,
374    normalize_strategy: Option<&str>,
375) -> FloeResult<ReadInput> {
376    if let Some(strategy) = normalize_strategy {
377        if let Some(raw_df) = raw_df.as_mut() {
378            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
379        }
380        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
381    }
382    Ok(ReadInput::Data {
383        input_file: input_file.clone(),
384        raw_df,
385        typed_df,
386    })
387}
388
389pub(crate) fn build_typed_schema(
390    input_columns: &[String],
391    declared_columns: &[config::ColumnConfig],
392    normalize_strategy: Option<&str>,
393) -> FloeResult<Schema> {
394    let mut declared_types = HashMap::new();
395    for column in declared_columns {
396        declared_types.insert(
397            column.name.as_str(),
398            config::parse_data_type(&column.column_type)?,
399        );
400    }
401
402    let mut schema = Schema::with_capacity(input_columns.len());
403    for name in input_columns {
404        let normalized = if let Some(strategy) = normalize_strategy {
405            crate::checks::normalize::normalize_name(name, strategy)
406        } else {
407            name.to_string()
408        };
409        let dtype = declared_types
410            .get(normalized.as_str())
411            .cloned()
412            .unwrap_or(DataType::String);
413        schema.insert(name.as_str().into(), dtype);
414    }
415    Ok(schema)
416}
417
/// Cast every column of `df` to `String` (produces the "raw" view kept
/// alongside the typed frame).
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
421
/// Build a new `DataFrame` with columns cast to `schema`'s dtypes.
///
/// Columns are emitted in schema order (not the input frame's order).
/// String-to-Boolean gets custom parsing via `cast_string_to_bool`; all
/// other casts are non-strict, so unparseable values become nulls rather
/// than errors.
///
/// # Errors
/// Fails when a schema column is missing from `df` or a cast fails
/// outright.
pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
    let mut columns = Vec::with_capacity(schema.len());
    for (name, dtype) in schema.iter() {
        let series = df.column(name.as_str()).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name.as_str()
            )))
        })?;
        // Polars' plain cast does not understand "true"/"1" strings, so
        // bool-from-string takes the custom path.
        let casted =
            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
                cast_string_to_bool(name.as_str(), series)?
            } else {
                series
                    .cast_with_options(dtype, CastOptions::NonStrict)
                    .map_err(|err| {
                        Box::new(ConfigError(format!(
                            "failed to cast input column {}: {err}",
                            name.as_str()
                        )))
                    })?
            };
        columns.push(casted);
    }
    DataFrame::new(columns).map_err(|err| {
        Box::new(ConfigError(format!(
            "failed to build typed dataframe: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })
}
452
453fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
454    let string_values = series.as_materialized_series().str().map_err(|err| {
455        Box::new(ConfigError(format!(
456            "failed to read boolean column {} as string: {err}",
457            name
458        )))
459    })?;
460    let mut values = Vec::with_capacity(series.len());
461    for value in string_values {
462        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
463            "true" | "1" => Some(true),
464            "false" | "0" => Some(false),
465            _ => None,
466        });
467        values.push(parsed);
468    }
469    Ok(Series::new(name.into(), values).into())
470}
471
472fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
473    let mut out = df.clone();
474    let names = out
475        .get_column_names()
476        .iter()
477        .map(|name| name.to_string())
478        .collect::<Vec<_>>();
479    for name in names {
480        let series = out.column(&name).map_err(|err| {
481            Box::new(ConfigError(format!(
482                "input column {} not found: {err}",
483                name
484            )))
485        })?;
486        let casted = series
487            .cast_with_options(dtype, CastOptions::NonStrict)
488            .map_err(|err| {
489                Box::new(ConfigError(format!(
490                    "failed to cast input column {}: {err}",
491                    name
492                )))
493            })?;
494        let idx = out.get_column_index(&name).ok_or_else(|| {
495            Box::new(ConfigError(format!(
496                "input column {} not found for update",
497                name
498            )))
499        })?;
500        out.replace_column(idx, casted).map_err(|err| {
501            Box::new(ConfigError(format!(
502                "failed to update input column {}: {err}",
503                name
504            )))
505        })?;
506    }
507    Ok(out)
508}
509pub fn collect_row_errors(
510    raw_df: &DataFrame,
511    typed_df: &DataFrame,
512    required_cols: &[String],
513    columns: &[config::ColumnConfig],
514    track_cast_errors: bool,
515    raw_indices: &check::ColumnIndex,
516    typed_indices: &check::ColumnIndex,
517) -> FloeResult<Vec<Vec<check::RowError>>> {
518    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
519    if track_cast_errors {
520        let cast_errors =
521            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
522        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
523            errors.extend(cast);
524        }
525    }
526    Ok(error_lists)
527}