Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Identity and basic metadata for one input file handed to a reader.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// URI the file was referenced by.
    pub source_uri: String,
    /// Local filesystem path used to actually read the file.
    pub source_local_path: PathBuf,
    /// File name of the source.
    pub source_name: String,
    /// File stem (name without extension) — used e.g. as the output stem.
    pub source_stem: String,
    /// Size in bytes, when known.
    pub source_size: Option<u64>,
    /// Last-modified timestamp, when known.
    pub source_mtime: Option<String>,
}
19
/// A file-level read failure, attributed to a named rule with a message.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that produced this error.
    pub rule: String,
    /// Human-readable error detail.
    pub message: String,
}
25
/// Result of reading one input file: either parsed data or a file-level error.
pub enum ReadInput {
    /// Successfully read file.
    Data {
        input_file: InputFile,
        /// All-string view of the data; only present when the caller
        /// requested raw collection (see `read_inputs`' `collect_raw`).
        raw_df: Option<DataFrame>,
        /// Data cast to the declared/typed schema.
        typed_df: DataFrame,
    },
    /// The file could not be read; carries the failing file and the error.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
37
/// Optional size metrics reported after writing the accepted sink.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written, when the writer can report it.
    pub total_bytes_written: Option<u64>,
    /// Average output file size in megabytes, when available.
    pub avg_file_size_mb: Option<f64>,
    /// Count of files considered "small", when available.
    pub small_files_count: Option<u64>,
}
44
/// Outcome of schema-evolution handling during an accepted-sink write.
#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for this write.
    pub enabled: bool,
    /// Configured evolution mode (free-form string from config).
    pub mode: String,
    /// Whether an evolution was actually applied.
    pub applied: bool,
    /// Names of columns added by evolution.
    pub added_columns: Vec<String>,
    /// True if changes were detected that could not be applied compatibly.
    pub incompatible_changes_detected: bool,
}
53
/// Per-phase timings (milliseconds) for an accepted-sink write; each phase
/// is optional since not every sink goes through every phase.
#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}
63
/// Row-count metrics produced by a merge (upsert-style) write.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    /// Columns the merge matched on.
    pub merge_key: Vec<String>,
    /// Rows inserted into the target.
    pub inserted_count: u64,
    /// Rows updated in the target.
    pub updated_count: u64,
    /// Rows closed out, when the merge strategy tracks closure.
    pub closed_count: Option<u64>,
    /// Rows left unchanged, when tracked.
    pub unchanged_count: Option<u64>,
    /// Target row count before the merge.
    pub target_rows_before: u64,
    /// Target row count after the merge.
    pub target_rows_after: u64,
    /// Wall-clock duration of the merge in milliseconds.
    pub merge_elapsed_ms: u64,
}
75
/// Everything an accepted-sink adapter reports back after a write:
/// file/part counts, table versioning info, and optional metrics.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Data files written, when the sink can count them.
    pub files_written: Option<u64>,
    /// Number of output parts produced.
    pub parts_written: u64,
    /// Paths/URIs of the written part files.
    pub part_files: Vec<String>,
    /// Table version after commit (table formats like Delta), when applicable.
    pub table_version: Option<i64>,
    /// Snapshot id after commit (table formats like Iceberg), when applicable.
    pub snapshot_id: Option<i64>,
    /// Root URI of the target table, when applicable.
    pub table_root_uri: Option<String>,
    // Iceberg catalog coordinates — only set by the Iceberg sink.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    /// Size metrics for the write.
    pub metrics: AcceptedWriteMetrics,
    /// Merge metrics, present only for merge-mode writes.
    pub merge: Option<AcceptedMergeMetrics>,
    /// Schema-evolution outcome for this write.
    pub schema_evolution: AcceptedSchemaEvolution,
    /// Optional per-phase timing breakdown.
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
93
/// Format-specific input reader: resolves which files to read and loads
/// them into typed (and optionally raw all-string) `DataFrame`s.
pub trait InputAdapter: Send + Sync {
    /// Canonical format name for this adapter (e.g. the `source.format` key).
    fn format(&self) -> &'static str;

    /// Default glob patterns for this format, derived from its registered
    /// extensions.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes registered for this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves local input files for the entity, passing this format's
    /// default globs to the shared local resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Reads the column names present in `input_file`; a failure is reported
    /// as a `FileReadError` attributed to that file.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads all `files`, producing one `ReadInput` per file. When
    /// `collect_raw` is set, implementations also keep an all-string view
    /// of each frame (see `ReadInput::Data::raw_df`).
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like `read_inputs`, but callers may pass column names they already
    /// validated. The default implementation ignores them and delegates to
    /// `read_inputs`; adapters can override to take advantage of the hint.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
151
/// Writer for the accepted-rows sink of one output format.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` using the given write `mode`, returning
    /// metrics and versioning information about the write.
    /// `temp_dir` is an optional staging directory; `cloud`, `resolver` and
    /// `catalogs` provide storage/catalog access for remote targets.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
167
/// Bundled arguments for a rejected-rows write (kept as one struct so the
/// trait method stays under clippy's argument limit).
pub struct RejectedWriteRequest<'a> {
    /// Destination of the rejected output.
    pub target: &'a Target,
    /// Rejected rows to write.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    /// Optional staging directory.
    pub temp_dir: Option<&'a Path>,
    /// Cloud client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver from config.
    pub resolver: &'a config::StorageResolver,
    /// Entity whose rejects are being written.
    pub entity: &'a config::EntityConfig,
    /// Write mode for the rejected sink.
    pub mode: config::WriteMode,
}
178
/// Writer for the rejected-rows sink of one output format.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes the rejected rows described by `request` and returns a String
    /// on success (presumably the written path/URI — confirm at call sites).
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
182
/// Which configuration slot a format string came from; selects the wording
/// used in unsupported-format error messages.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}

impl FormatKind {
    /// Returns `(config field path, human-readable description)` for this kind.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::Source => ("source.format", "source format"),
            Self::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            Self::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }

    /// Dotted config path of the format field (e.g. `source.format`).
    fn field_path(self) -> &'static str {
        self.labels().0
    }

    /// Description used in errors that carry no entity context.
    fn description(self) -> &'static str {
        self.labels().1
    }
}
207
208fn unsupported_format_error(
209    kind: FormatKind,
210    format: &str,
211    entity_name: Option<&str>,
212) -> ConfigError {
213    if let Some(entity_name) = entity_name {
214        return ConfigError(format!(
215            "entity.name={} {}={} is unsupported",
216            entity_name,
217            kind.field_path(),
218            format
219        ));
220    }
221    ConfigError(format!("unsupported {}: {format}", kind.description()))
222}
223
224pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
225    if input_adapter(format).is_err() {
226        return Err(Box::new(unsupported_format_error(
227            FormatKind::Source,
228            format,
229            Some(entity_name),
230        )));
231    }
232    Ok(())
233}
234
235pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
236    if accepted_sink_adapter(format).is_err() {
237        return Err(Box::new(unsupported_format_error(
238            FormatKind::SinkAccepted,
239            format,
240            Some(entity_name),
241        )));
242    }
243    Ok(())
244}
245
246pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
247    if rejected_sink_adapter(format).is_err() {
248        return Err(Box::new(unsupported_format_error(
249            FormatKind::SinkRejected,
250            format,
251            Some(entity_name),
252        )));
253    }
254    Ok(())
255}
256
257pub fn resolve_read_columns(
258    entity: &config::EntityConfig,
259    normalized_columns: &[config::ColumnConfig],
260    normalize_strategy: Option<&str>,
261) -> FloeResult<Vec<config::ColumnConfig>> {
262    if entity.source.format == "json" || entity.source.format == "xml" {
263        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
264    } else {
265        Ok(normalized_columns.to_vec())
266    }
267}
268
269pub fn sink_options_warning(
270    entity_name: &str,
271    format: &str,
272    options: Option<&config::SinkOptions>,
273) -> Option<String> {
274    let options = options?;
275    if format == "parquet" {
276        return None;
277    }
278    let mut keys = Vec::new();
279    if options.compression.is_some() {
280        keys.push("compression");
281    }
282    if options.row_group_size.is_some() {
283        keys.push("row_group_size");
284    }
285    if options.max_size_per_file.is_some() {
286        keys.push("max_size_per_file");
287    }
288    let detail = if keys.is_empty() {
289        "options".to_string()
290    } else {
291        keys.join(", ")
292    };
293    Some(format!(
294        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
295        entity_name, format
296    ))
297}
298
299pub fn validate_sink_options(
300    entity_name: &str,
301    format: &str,
302    options: Option<&config::SinkOptions>,
303) -> FloeResult<()> {
304    let options = match options {
305        Some(options) => options,
306        None => return Ok(()),
307    };
308    if format != "parquet" {
309        return Ok(());
310    }
311    if let Some(compression) = &options.compression {
312        match compression.as_str() {
313            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
314            _ => {
315                return Err(Box::new(ConfigError(format!(
316                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
317                    entity_name, compression
318                ))))
319            }
320        }
321    }
322    if let Some(row_group_size) = options.row_group_size {
323        if row_group_size == 0 {
324            return Err(Box::new(ConfigError(format!(
325                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
326                entity_name
327            ))));
328        }
329    }
330    if let Some(max_size_per_file) = options.max_size_per_file {
331        if max_size_per_file == 0 {
332            return Err(Box::new(ConfigError(format!(
333                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
334                entity_name
335            ))));
336        }
337    }
338    Ok(())
339}
340
/// Looks up the static input adapter registered for `format`, or returns an
/// unsupported-format error (without entity context — callers that know the
/// entity use `ensure_input_format` for the richer message).
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
359
/// Looks up the static accepted-sink adapter registered for `format`, or
/// returns an unsupported-format error (without entity context).
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
372
/// Looks up the static rejected-sink adapter registered for `format`
/// (currently only CSV), or returns an unsupported-format error.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
383
384pub(crate) fn read_input_from_df(
385    input_file: &InputFile,
386    df: &DataFrame,
387    columns: &[config::ColumnConfig],
388    normalize_strategy: Option<&str>,
389    collect_raw: bool,
390) -> FloeResult<ReadInput> {
391    let input_columns = df
392        .get_column_names()
393        .iter()
394        .map(|name| name.to_string())
395        .collect::<Vec<_>>();
396    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
397    let raw_df = if collect_raw {
398        Some(cast_df_to_string(df)?)
399    } else {
400        None
401    };
402    let typed_df = cast_df_to_schema(df, &typed_schema)?;
403    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
404}
405
406pub(crate) fn finalize_read_input(
407    input_file: &InputFile,
408    mut raw_df: Option<DataFrame>,
409    mut typed_df: DataFrame,
410    normalize_strategy: Option<&str>,
411) -> FloeResult<ReadInput> {
412    if let Some(strategy) = normalize_strategy {
413        if let Some(raw_df) = raw_df.as_mut() {
414            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
415        }
416        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
417    }
418    Ok(ReadInput::Data {
419        input_file: input_file.clone(),
420        raw_df,
421        typed_df,
422    })
423}
424
425pub(crate) fn build_typed_schema(
426    input_columns: &[String],
427    declared_columns: &[config::ColumnConfig],
428    normalize_strategy: Option<&str>,
429) -> FloeResult<Schema> {
430    let mut declared_types = HashMap::new();
431    for column in declared_columns {
432        declared_types.insert(
433            column.name.as_str(),
434            config::parse_data_type(&column.column_type)?,
435        );
436    }
437
438    let mut schema = Schema::with_capacity(input_columns.len());
439    for name in input_columns {
440        let normalized = if let Some(strategy) = normalize_strategy {
441            crate::checks::normalize::normalize_name(name, strategy)
442        } else {
443            name.to_string()
444        };
445        let dtype = declared_types
446            .get(normalized.as_str())
447            .cloned()
448            .unwrap_or(DataType::String);
449        schema.insert(name.as_str().into(), dtype);
450    }
451    Ok(schema)
452}
453
/// Returns a copy of `df` with every column cast to `DataType::String`
/// (the "raw" view used for error reporting).
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
457
/// Builds a new `DataFrame` whose columns follow `schema`'s order and dtypes.
///
/// String columns targeted at `Boolean` go through `cast_string_to_bool`
/// (accepting "true"/"1"/"false"/"0"); everything else uses the non-strict
/// Polars cast (per `CastOptions::NonStrict`, invalid values should become
/// nulls rather than errors — confirm against the Polars version in use).
pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
    let mut columns = Vec::with_capacity(schema.len());
    for (name, dtype) in schema.iter() {
        // Every schema entry must exist in the input frame.
        let series = df.column(name.as_str()).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name.as_str()
            )))
        })?;
        let casted =
            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
                cast_string_to_bool(name.as_str(), series)?
            } else {
                series
                    .cast_with_options(dtype, CastOptions::NonStrict)
                    .map_err(|err| {
                        Box::new(ConfigError(format!(
                            "failed to cast input column {}: {err}",
                            name.as_str()
                        )))
                    })?
            };
        columns.push(casted);
    }
    DataFrame::new(columns).map_err(|err| {
        Box::new(ConfigError(format!(
            "failed to build typed dataframe: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })
}
488
489fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
490    let string_values = series.as_materialized_series().str().map_err(|err| {
491        Box::new(ConfigError(format!(
492            "failed to read boolean column {} as string: {err}",
493            name
494        )))
495    })?;
496    let mut values = Vec::with_capacity(series.len());
497    for value in string_values {
498        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
499            "true" | "1" => Some(true),
500            "false" | "0" => Some(false),
501            _ => None,
502        });
503        values.push(parsed);
504    }
505    Ok(Series::new(name.into(), values).into())
506}
507
508fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
509    let mut out = df.clone();
510    let names = out
511        .get_column_names()
512        .iter()
513        .map(|name| name.to_string())
514        .collect::<Vec<_>>();
515    for name in names {
516        let series = out.column(&name).map_err(|err| {
517            Box::new(ConfigError(format!(
518                "input column {} not found: {err}",
519                name
520            )))
521        })?;
522        let casted = series
523            .cast_with_options(dtype, CastOptions::NonStrict)
524            .map_err(|err| {
525                Box::new(ConfigError(format!(
526                    "failed to cast input column {}: {err}",
527                    name
528                )))
529            })?;
530        let idx = out.get_column_index(&name).ok_or_else(|| {
531            Box::new(ConfigError(format!(
532                "input column {} not found for update",
533                name
534            )))
535        })?;
536        out.replace_column(idx, casted).map_err(|err| {
537            Box::new(ConfigError(format!(
538                "failed to update input column {}: {err}",
539                name
540            )))
541        })?;
542    }
543    Ok(out)
544}
545pub fn collect_row_errors(
546    raw_df: &DataFrame,
547    typed_df: &DataFrame,
548    required_cols: &[String],
549    columns: &[config::ColumnConfig],
550    track_cast_errors: bool,
551    raw_indices: &check::ColumnIndex,
552    typed_indices: &check::ColumnIndex,
553) -> FloeResult<Vec<Vec<check::RowError>>> {
554    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
555    if track_cast_errors {
556        let cast_errors =
557            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
558        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
559            errors.extend(cast);
560        }
561    }
562    Ok(error_lists)
563}