Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Descriptor for one source file discovered for an entity.
#[derive(Clone, Debug)]
pub struct InputFile {
    /// URI the file was discovered at.
    pub source_uri: String,
    /// File name of the source.
    pub source_name: String,
    /// File name without its extension (presumably; confirm against resolvers).
    pub source_stem: String,
    /// Size of the file when known (presumably bytes; confirm).
    pub source_size: Option<u64>,
    /// Modification time rendered as a string, when known.
    pub source_mtime: Option<String>,
}

/// An [`InputFile`] together with a path on the local filesystem that is
/// guaranteed to exist while the file is being read.
///
/// Local sources point straight at the normalized source path, so nothing is
/// copied. Cloud sources are downloaded just-in-time into a temporary file; in
/// that case `is_ephemeral` is `true` and the file should be removed after use.
#[derive(Clone, Debug)]
pub struct LocalInputFile {
    pub file: InputFile,
    pub local_path: PathBuf,
    pub is_ephemeral: bool,
}

/// A per-file read failure: the rule that was violated plus a human-readable message.
#[derive(Clone, Debug)]
pub struct FileReadError {
    pub rule: String,
    pub message: String,
}
36
37pub enum ReadInput {
38    Data {
39        input_file: InputFile,
40        raw_df: Option<DataFrame>,
41        typed_df: DataFrame,
42    },
43    FileError {
44        input_file: InputFile,
45        error: FileReadError,
46    },
47}
48
/// Size-related metrics reported after writing accepted rows.
#[derive(Clone, Debug)]
pub struct AcceptedWriteMetrics {
    pub total_bytes_written: Option<u64>,
    pub avg_file_size_mb: Option<f64>,
    pub small_files_count: Option<u64>,
}

/// Schema-evolution settings and what happened during an accepted write.
#[derive(Clone, Debug, Default)]
pub struct AcceptedSchemaEvolution {
    pub enabled: bool,
    pub mode: String,
    pub applied: bool,
    pub added_columns: Vec<String>,
    pub incompatible_changes_detected: bool,
}

/// Optional per-phase timings (milliseconds) for an accepted write.
#[derive(Clone, Debug, Default)]
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}

/// Row counts and timing reported by a merge-style accepted write.
#[derive(Clone, Debug)]
pub struct AcceptedMergeMetrics {
    pub merge_key: Vec<String>,
    pub inserted_count: u64,
    pub updated_count: u64,
    pub closed_count: Option<u64>,
    pub unchanged_count: Option<u64>,
    pub target_rows_before: u64,
    pub target_rows_after: u64,
    pub merge_elapsed_ms: u64,
}

/// Everything an [`AcceptedSinkAdapter`] reports back after writing accepted rows.
///
/// Table-format specific fields (`table_version`, `snapshot_id`, `iceberg_*`)
/// are `None` when they do not apply to the sink.
#[derive(Clone, Debug)]
pub struct AcceptedWriteOutput {
    pub files_written: Option<u64>,
    pub parts_written: u64,
    pub part_files: Vec<String>,
    pub table_version: Option<i64>,
    pub snapshot_id: Option<i64>,
    pub table_root_uri: Option<String>,
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    pub metrics: AcceptedWriteMetrics,
    pub merge: Option<AcceptedMergeMetrics>,
    pub schema_evolution: AcceptedSchemaEvolution,
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
104
105pub trait InputAdapter: Send + Sync {
106    fn format(&self) -> &'static str;
107
108    fn default_globs(&self) -> FloeResult<Vec<String>> {
109        io::storage::extensions::glob_patterns_for_format(self.format())
110    }
111
112    fn suffixes(&self) -> FloeResult<Vec<String>> {
113        io::storage::extensions::suffixes_for_format(self.format())
114    }
115
116    fn resolve_local_inputs(
117        &self,
118        config_dir: &Path,
119        entity_name: &str,
120        source: &config::SourceConfig,
121        storage: &str,
122    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
123        let default_globs = self.default_globs()?;
124        io::storage::local::resolve_local_inputs(
125            config_dir,
126            entity_name,
127            source,
128            storage,
129            &default_globs,
130        )
131    }
132
133    fn read_input_columns(
134        &self,
135        entity: &config::EntityConfig,
136        input_file: &LocalInputFile,
137        columns: &[config::ColumnConfig],
138    ) -> Result<Vec<String>, FileReadError>;
139
140    fn read_inputs(
141        &self,
142        entity: &config::EntityConfig,
143        files: &[LocalInputFile],
144        columns: &[config::ColumnConfig],
145        normalize_strategy: Option<&str>,
146        collect_raw: bool,
147    ) -> FloeResult<Vec<ReadInput>>;
148
149    fn read_inputs_with_prechecked_columns(
150        &self,
151        entity: &config::EntityConfig,
152        files: &[LocalInputFile],
153        columns: &[config::ColumnConfig],
154        normalize_strategy: Option<&str>,
155        collect_raw: bool,
156        prechecked_input_columns: Option<&[String]>,
157    ) -> FloeResult<Vec<ReadInput>> {
158        let _ = prechecked_input_columns;
159        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
160    }
161}
162
163pub trait AcceptedSinkAdapter: Send + Sync {
164    #[allow(clippy::too_many_arguments)]
165    fn write_accepted(
166        &self,
167        target: &Target,
168        df: &mut DataFrame,
169        mode: config::WriteMode,
170        output_stem: &str,
171        temp_dir: Option<&Path>,
172        cloud: &mut io::storage::CloudClient,
173        resolver: &config::StorageResolver,
174        catalogs: &config::CatalogResolver,
175        entity: &config::EntityConfig,
176    ) -> FloeResult<AcceptedWriteOutput>;
177}
178
179pub struct RejectedWriteRequest<'a> {
180    pub target: &'a Target,
181    pub df: &'a mut DataFrame,
182    pub source_stem: &'a str,
183    pub temp_dir: Option<&'a Path>,
184    pub cloud: &'a mut io::storage::CloudClient,
185    pub resolver: &'a config::StorageResolver,
186    pub entity: &'a config::EntityConfig,
187    pub mode: config::WriteMode,
188}
189
190pub trait RejectedSinkAdapter: Send + Sync {
191    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
192}
193
/// Which configuration slot a format name was read from; used to word errors.
#[derive(Clone, Copy, Debug)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}

impl FormatKind {
    /// Returns `(config field path, human-readable description)` for this kind.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            FormatKind::Source => ("source.format", "source format"),
            FormatKind::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            FormatKind::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }

    /// Dotted config path of the format field, e.g. `source.format`.
    fn field_path(self) -> &'static str {
        self.labels().0
    }

    /// Human-readable name of the format slot, e.g. `source format`.
    fn description(self) -> &'static str {
        self.labels().1
    }
}
218
219fn unsupported_format_error(
220    kind: FormatKind,
221    format: &str,
222    entity_name: Option<&str>,
223) -> ConfigError {
224    if let Some(entity_name) = entity_name {
225        return ConfigError(format!(
226            "entity.name={} {}={} is unsupported",
227            entity_name,
228            kind.field_path(),
229            format
230        ));
231    }
232    ConfigError(format!("unsupported {}: {format}", kind.description()))
233}
234
235pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
236    if input_adapter(format).is_err() {
237        return Err(Box::new(unsupported_format_error(
238            FormatKind::Source,
239            format,
240            Some(entity_name),
241        )));
242    }
243    Ok(())
244}
245
246pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
247    if accepted_sink_adapter(format).is_err() {
248        return Err(Box::new(unsupported_format_error(
249            FormatKind::SinkAccepted,
250            format,
251            Some(entity_name),
252        )));
253    }
254    Ok(())
255}
256
257pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
258    if rejected_sink_adapter(format).is_err() {
259        return Err(Box::new(unsupported_format_error(
260            FormatKind::SinkRejected,
261            format,
262            Some(entity_name),
263        )));
264    }
265    Ok(())
266}
267
268pub fn resolve_read_columns(
269    entity: &config::EntityConfig,
270    normalized_columns: &[config::ColumnConfig],
271    normalize_strategy: Option<&str>,
272) -> FloeResult<Vec<config::ColumnConfig>> {
273    if entity.source.format == "json" || entity.source.format == "xml" {
274        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
275    } else {
276        Ok(normalized_columns.to_vec())
277    }
278}
279
280pub fn sink_options_warning(
281    entity_name: &str,
282    format: &str,
283    options: Option<&config::SinkOptions>,
284) -> Option<String> {
285    let options = options?;
286    if format == "parquet" {
287        return None;
288    }
289    let mut keys = Vec::new();
290    if options.compression.is_some() {
291        keys.push("compression");
292    }
293    if options.row_group_size.is_some() {
294        keys.push("row_group_size");
295    }
296    if options.max_size_per_file.is_some() {
297        keys.push("max_size_per_file");
298    }
299    let detail = if keys.is_empty() {
300        "options".to_string()
301    } else {
302        keys.join(", ")
303    };
304    Some(format!(
305        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
306        entity_name, format
307    ))
308}
309
310pub fn validate_sink_options(
311    entity_name: &str,
312    format: &str,
313    options: Option<&config::SinkOptions>,
314) -> FloeResult<()> {
315    let options = match options {
316        Some(options) => options,
317        None => return Ok(()),
318    };
319    if format != "parquet" {
320        return Ok(());
321    }
322    if let Some(compression) = &options.compression {
323        match compression.as_str() {
324            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
325            _ => {
326                return Err(Box::new(ConfigError(format!(
327                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
328                    entity_name, compression
329                ))))
330            }
331        }
332    }
333    if let Some(row_group_size) = options.row_group_size {
334        if row_group_size == 0 {
335            return Err(Box::new(ConfigError(format!(
336                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
337                entity_name
338            ))));
339        }
340    }
341    if let Some(max_size_per_file) = options.max_size_per_file {
342        if max_size_per_file == 0 {
343            return Err(Box::new(ConfigError(format!(
344                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
345                entity_name
346            ))));
347        }
348    }
349    Ok(())
350}
351
352pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
353    match format {
354        "csv" => Ok(io::read::csv::csv_input_adapter()),
355        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
356        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
357        "orc" => Ok(io::read::orc::orc_input_adapter()),
358        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
359        "json" => Ok(io::read::json::json_input_adapter()),
360        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
361        "avro" => Ok(io::read::avro::avro_input_adapter()),
362        "xml" => Ok(io::read::xml::xml_input_adapter()),
363        _ => Err(Box::new(unsupported_format_error(
364            FormatKind::Source,
365            format,
366            None,
367        ))),
368    }
369}
370
371pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
372    match format {
373        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
374        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
375        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
376        _ => Err(Box::new(unsupported_format_error(
377            FormatKind::SinkAccepted,
378            format,
379            None,
380        ))),
381    }
382}
383
384pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
385    match format {
386        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
387        _ => Err(Box::new(unsupported_format_error(
388            FormatKind::SinkRejected,
389            format,
390            None,
391        ))),
392    }
393}
394
395pub(crate) fn read_input_from_df(
396    input_file: &LocalInputFile,
397    df: &DataFrame,
398    columns: &[config::ColumnConfig],
399    normalize_strategy: Option<&str>,
400    collect_raw: bool,
401) -> FloeResult<ReadInput> {
402    let input_columns = df
403        .get_column_names()
404        .iter()
405        .map(|name| name.to_string())
406        .collect::<Vec<_>>();
407    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
408    let raw_df = if collect_raw {
409        Some(cast_df_to_string(df)?)
410    } else {
411        None
412    };
413    let typed_df = cast_df_to_schema(df, &typed_schema)?;
414    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
415}
416
417pub(crate) fn finalize_read_input(
418    input_file: &LocalInputFile,
419    mut raw_df: Option<DataFrame>,
420    mut typed_df: DataFrame,
421    normalize_strategy: Option<&str>,
422) -> FloeResult<ReadInput> {
423    if let Some(strategy) = normalize_strategy {
424        if let Some(raw_df) = raw_df.as_mut() {
425            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
426        }
427        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
428    }
429    Ok(ReadInput::Data {
430        input_file: input_file.file.clone(),
431        raw_df,
432        typed_df,
433    })
434}
435
436pub(crate) fn build_typed_schema(
437    input_columns: &[String],
438    declared_columns: &[config::ColumnConfig],
439    normalize_strategy: Option<&str>,
440) -> FloeResult<Schema> {
441    let mut declared_types = HashMap::new();
442    for column in declared_columns {
443        declared_types.insert(
444            column.name.as_str(),
445            config::parse_data_type(&column.column_type)?,
446        );
447    }
448
449    let mut schema = Schema::with_capacity(input_columns.len());
450    for name in input_columns {
451        let normalized = if let Some(strategy) = normalize_strategy {
452            crate::checks::normalize::normalize_name(name, strategy)
453        } else {
454            name.to_string()
455        };
456        let dtype = declared_types
457            .get(normalized.as_str())
458            .cloned()
459            .unwrap_or(DataType::String);
460        schema.insert(name.as_str().into(), dtype);
461    }
462    Ok(schema)
463}
464
465pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
466    cast_df_with_type(df, &DataType::String)
467}
468
469pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
470    let mut columns = Vec::with_capacity(schema.len());
471    for (name, dtype) in schema.iter() {
472        let series = df.column(name.as_str()).map_err(|err| {
473            Box::new(ConfigError(format!(
474                "input column {} not found: {err}",
475                name.as_str()
476            )))
477        })?;
478        let casted =
479            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
480                cast_string_to_bool(name.as_str(), series)?
481            } else {
482                series
483                    .cast_with_options(dtype, CastOptions::NonStrict)
484                    .map_err(|err| {
485                        Box::new(ConfigError(format!(
486                            "failed to cast input column {}: {err}",
487                            name.as_str()
488                        )))
489                    })?
490            };
491        columns.push(casted);
492    }
493    DataFrame::new(columns).map_err(|err| {
494        Box::new(ConfigError(format!(
495            "failed to build typed dataframe: {err}"
496        ))) as Box<dyn std::error::Error + Send + Sync>
497    })
498}
499
500fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
501    let string_values = series.as_materialized_series().str().map_err(|err| {
502        Box::new(ConfigError(format!(
503            "failed to read boolean column {} as string: {err}",
504            name
505        )))
506    })?;
507    let mut values = Vec::with_capacity(series.len());
508    for value in string_values {
509        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
510            "true" | "1" => Some(true),
511            "false" | "0" => Some(false),
512            _ => None,
513        });
514        values.push(parsed);
515    }
516    Ok(Series::new(name.into(), values).into())
517}
518
519fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
520    let mut out = df.clone();
521    let names = out
522        .get_column_names()
523        .iter()
524        .map(|name| name.to_string())
525        .collect::<Vec<_>>();
526    for name in names {
527        let series = out.column(&name).map_err(|err| {
528            Box::new(ConfigError(format!(
529                "input column {} not found: {err}",
530                name
531            )))
532        })?;
533        let casted = series
534            .cast_with_options(dtype, CastOptions::NonStrict)
535            .map_err(|err| {
536                Box::new(ConfigError(format!(
537                    "failed to cast input column {}: {err}",
538                    name
539                )))
540            })?;
541        let idx = out.get_column_index(&name).ok_or_else(|| {
542            Box::new(ConfigError(format!(
543                "input column {} not found for update",
544                name
545            )))
546        })?;
547        out.replace_column(idx, casted).map_err(|err| {
548            Box::new(ConfigError(format!(
549                "failed to update input column {}: {err}",
550                name
551            )))
552        })?;
553    }
554    Ok(out)
555}
556pub fn collect_row_errors(
557    raw_df: &DataFrame,
558    typed_df: &DataFrame,
559    required_cols: &[String],
560    columns: &[config::ColumnConfig],
561    track_cast_errors: bool,
562    raw_indices: &check::ColumnIndex,
563    typed_indices: &check::ColumnIndex,
564) -> FloeResult<Vec<Vec<check::RowError>>> {
565    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
566    if track_cast_errors {
567        let cast_errors =
568            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
569        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
570            errors.extend(cast);
571        }
572    }
573    Ok(error_lists)
574}