Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Identity and basic metadata of one source file selected for reading.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// URI of the source file.
    pub source_uri: String,
    /// File name of the source (presumably including its extension — confirm).
    pub source_name: String,
    /// File name without its extension (presumably — confirm against the resolver).
    pub source_stem: String,
    /// Size of the source when known (unit presumably bytes — confirm).
    pub source_size: Option<u64>,
    /// Last-modified time of the source when known, as text (format not fixed here).
    pub source_mtime: Option<String>,
}
18
/// An `InputFile` paired with a guaranteed local filesystem path.
///
/// For local sources the path is the normalized source path (no copy).
/// For cloud sources the path is a temp file downloaded just-in-time;
/// `is_ephemeral` is true and the file should be deleted after use.
#[derive(Debug, Clone)]
pub struct LocalInputFile {
    /// Metadata of the original source file.
    pub file: InputFile,
    /// Local path to read from: the source itself, or a downloaded temp copy.
    pub local_path: PathBuf,
    /// True when `local_path` is a temporary download that should be deleted after use.
    pub is_ephemeral: bool,
}
30
/// Failure to read one input file, expressed as a rule identifier plus a message.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that failed (vocabulary defined by the readers).
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
36
/// Outcome of reading one input file: either its data frames or a per-file error.
pub enum ReadInput {
    /// The file was read into data frames.
    Data {
        /// The file the frames were read from.
        input_file: InputFile,
        /// All-string copy of the input; `read_input_from_df` only fills this
        /// when `collect_raw` is true.
        raw_df: Option<DataFrame>,
        /// Input cast to the declared column types; `build_typed_schema` types
        /// undeclared columns as `String`.
        typed_df: DataFrame,
    },
    /// The file could not be read.
    FileError {
        /// The file that failed.
        input_file: InputFile,
        /// What went wrong.
        error: FileReadError,
    },
}
48
/// Size statistics for an accepted write. Every field is optional
/// (presumably `None` when the sink cannot compute it — confirm per sink).
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written, when reported.
    pub total_bytes_written: Option<u64>,
    /// Average written file size in megabytes, when reported.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files considered "small", when reported (threshold defined by the sink).
    pub small_files_count: Option<u64>,
}
55
/// Schema-evolution outcome of an accepted write; `Default` means "disabled, nothing applied".
#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for this write.
    pub enabled: bool,
    /// Configured evolution mode as free-form text (allowed values defined elsewhere).
    pub mode: String,
    /// Whether an evolution was actually applied to the target.
    pub applied: bool,
    /// Names of columns added to the target schema.
    pub added_columns: Vec<String>,
    /// Whether incompatible schema changes were detected.
    pub incompatible_changes_detected: bool,
}
64
/// Optional per-phase timings of an accepted write, in milliseconds (per the `_ms` suffix).
/// A phase a sink does not have is presumably left `None`.
#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
    /// Time spent converting the frame for the writer.
    pub conversion_ms: Option<u64>,
    /// Time spent building the source frame (e.g. for a merge).
    pub source_df_build_ms: Option<u64>,
    /// Time spent executing a merge.
    pub merge_exec_ms: Option<u64>,
    /// Time spent writing data files.
    pub data_write_ms: Option<u64>,
    /// Time spent committing the write.
    pub commit_ms: Option<u64>,
    /// Time spent reading back post-write metrics.
    pub metrics_read_ms: Option<u64>,
}
74
/// Row-level statistics for an accepted write performed as a merge.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    /// Column names used as the merge key.
    pub merge_key: Vec<String>,
    /// Rows inserted by the merge.
    pub inserted_count: u64,
    /// Rows updated by the merge.
    pub updated_count: u64,
    /// Rows closed by the merge, when the merge mode reports it (presumably history-keeping modes).
    pub closed_count: Option<u64>,
    /// Rows left unchanged, when reported.
    pub unchanged_count: Option<u64>,
    /// Target row count before the merge.
    pub target_rows_before: u64,
    /// Target row count after the merge.
    pub target_rows_after: u64,
    /// Wall time of the merge in milliseconds.
    pub merge_elapsed_ms: u64,
}
86
/// Everything an `AcceptedSinkAdapter::write_accepted` call reports about its write.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of data files written, when known.
    pub files_written: Option<u64>,
    /// Number of parts written.
    pub parts_written: u64,
    /// Identifiers/paths of written part files.
    pub part_files: Vec<String>,
    /// Table version after the write (table formats only, presumably Delta).
    pub table_version: Option<i64>,
    /// Snapshot id after the write (table formats only, presumably Iceberg).
    pub snapshot_id: Option<i64>,
    /// Root URI of the written table, when applicable.
    pub table_root_uri: Option<String>,
    // Iceberg catalog coordinates, populated only when a catalog was involved.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    // Delta catalog coordinates, populated only when a catalog was involved.
    pub delta_catalog_name: Option<String>,
    pub delta_catalog_schema: Option<String>,
    pub delta_catalog_table: Option<String>,
    /// Size statistics of the write.
    pub metrics: AcceptedWriteMetrics,
    /// Merge statistics, present only for merge writes.
    pub merge: Option<AcceptedMergeMetrics>,
    /// Schema-evolution outcome.
    pub schema_evolution: AcceptedSchemaEvolution,
    /// Optional per-phase timing breakdown.
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
107
/// Reader for one source file format.
///
/// Adapters are looked up by format name via [`input_adapter`], which hands
/// them out as `&'static dyn InputAdapter` (hence `Send + Sync`).
pub trait InputAdapter: Send + Sync {
    /// Format name handled by this adapter (e.g. `"csv"`). The default
    /// `default_globs` and `suffixes` are derived from it.
    fn format(&self) -> &'static str;

    /// File-glob patterns for this format, from
    /// `io::storage::extensions::glob_patterns_for_format`.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes for this format, from
    /// `io::storage::extensions::suffixes_for_format`.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves local input files for `entity_name` by delegating to
    /// `io::storage::local::resolve_local_inputs` with this format's
    /// default globs.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Returns the input column names of a single file, or a per-file
    /// `FileReadError`. NOTE(review): whether this is a header read or a full
    /// schema read is implementation-defined.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &LocalInputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads `files` into one `ReadInput` per file (presumably in order — confirm
    /// per adapter). `collect_raw` requests the all-string raw frame.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like `read_inputs`, but may reuse column names the caller already read.
    ///
    /// The default implementation ignores `prechecked_input_columns` and
    /// simply calls `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        // Explicitly discard to silence the unused-parameter lint in the default body.
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
165
/// Writer for accepted (valid) rows in one sink format, looked up via
/// [`accepted_sink_adapter`].
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` using `mode` and reports what was written.
    ///
    /// `df` is taken mutably, so an implementation may modify it while writing.
    // The write needs many independent collaborators, which exceeds clippy's
    // `too_many_arguments` threshold; the lint is silenced deliberately.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
181
/// Arguments of `RejectedSinkAdapter::write_rejected`, bundled into one struct
/// instead of a long parameter list.
pub struct RejectedWriteRequest<'a> {
    /// Where the rejected rows go.
    pub target: &'a Target,
    /// Rejected rows to write (mutable, so the writer may modify it).
    pub df: &'a mut DataFrame,
    /// Stem of the source file the rows came from (presumably used to name output).
    pub source_stem: &'a str,
    /// Optional scratch directory.
    pub temp_dir: Option<&'a Path>,
    /// Cloud storage client.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver from configuration.
    pub resolver: &'a config::StorageResolver,
    /// Entity being processed.
    pub entity: &'a config::EntityConfig,
    /// Write mode to apply.
    pub mode: config::WriteMode,
}
192
/// Writer for rejected rows in one sink format, looked up via [`rejected_sink_adapter`].
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes the rejected rows in `request`; the returned string is presumably
    /// the written location — confirm per implementation.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
196
/// Which configuration slot a format string came from; selects the wording of
/// "unsupported format" errors.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// The `source.format` setting.
    Source,
    /// The `sink.accepted.format` setting.
    SinkAccepted,
    /// The `sink.rejected.format` setting.
    SinkRejected,
}

impl FormatKind {
    /// Both labels for this kind: `(config field path, human description)`.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::Source => ("source.format", "source format"),
            Self::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            Self::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }

    /// Dotted configuration path of the field, e.g. `source.format`.
    fn field_path(self) -> &'static str {
        let (path, _) = self.labels();
        path
    }

    /// Human-readable description, e.g. `source format`.
    fn description(self) -> &'static str {
        let (_, text) = self.labels();
        text
    }
}
221
222fn unsupported_format_error(
223    kind: FormatKind,
224    format: &str,
225    entity_name: Option<&str>,
226) -> ConfigError {
227    if let Some(entity_name) = entity_name {
228        return ConfigError(format!(
229            "entity.name={} {}={} is unsupported",
230            entity_name,
231            kind.field_path(),
232            format
233        ));
234    }
235    ConfigError(format!("unsupported {}: {format}", kind.description()))
236}
237
238pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
239    if input_adapter(format).is_err() {
240        return Err(Box::new(unsupported_format_error(
241            FormatKind::Source,
242            format,
243            Some(entity_name),
244        )));
245    }
246    Ok(())
247}
248
249pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
250    if accepted_sink_adapter(format).is_err() {
251        return Err(Box::new(unsupported_format_error(
252            FormatKind::SinkAccepted,
253            format,
254            Some(entity_name),
255        )));
256    }
257    Ok(())
258}
259
260pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
261    if rejected_sink_adapter(format).is_err() {
262        return Err(Box::new(unsupported_format_error(
263            FormatKind::SinkRejected,
264            format,
265            Some(entity_name),
266        )));
267    }
268    Ok(())
269}
270
271pub fn resolve_read_columns(
272    entity: &config::EntityConfig,
273    normalized_columns: &[config::ColumnConfig],
274    normalize_strategy: Option<&str>,
275) -> FloeResult<Vec<config::ColumnConfig>> {
276    if entity.source.format == "json" || entity.source.format == "xml" {
277        check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
278    } else {
279        Ok(normalized_columns.to_vec())
280    }
281}
282
283pub fn sink_options_warning(
284    entity_name: &str,
285    format: &str,
286    options: Option<&config::SinkOptions>,
287) -> Option<String> {
288    let options = options?;
289    if format == "parquet" {
290        return None;
291    }
292    let mut keys = Vec::new();
293    if options.compression.is_some() {
294        keys.push("compression");
295    }
296    if options.row_group_size.is_some() {
297        keys.push("row_group_size");
298    }
299    if options.max_size_per_file.is_some() {
300        keys.push("max_size_per_file");
301    }
302    let detail = if keys.is_empty() {
303        "options".to_string()
304    } else {
305        keys.join(", ")
306    };
307    Some(format!(
308        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
309        entity_name, format
310    ))
311}
312
313pub fn validate_sink_options(
314    entity_name: &str,
315    format: &str,
316    options: Option<&config::SinkOptions>,
317) -> FloeResult<()> {
318    let options = match options {
319        Some(options) => options,
320        None => return Ok(()),
321    };
322    if format != "parquet" {
323        return Ok(());
324    }
325    if let Some(compression) = &options.compression {
326        match compression.as_str() {
327            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
328            _ => {
329                return Err(Box::new(ConfigError(format!(
330                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
331                    entity_name, compression
332                ))))
333            }
334        }
335    }
336    if let Some(row_group_size) = options.row_group_size {
337        if row_group_size == 0 {
338            return Err(Box::new(ConfigError(format!(
339                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
340                entity_name
341            ))));
342        }
343    }
344    if let Some(max_size_per_file) = options.max_size_per_file {
345        if max_size_per_file == 0 {
346            return Err(Box::new(ConfigError(format!(
347                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
348                entity_name
349            ))));
350        }
351    }
352    Ok(())
353}
354
355pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
356    match format {
357        "csv" => Ok(io::read::csv::csv_input_adapter()),
358        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
359        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
360        "orc" => Ok(io::read::orc::orc_input_adapter()),
361        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
362        "json" => Ok(io::read::json::json_input_adapter()),
363        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
364        "avro" => Ok(io::read::avro::avro_input_adapter()),
365        "xml" => Ok(io::read::xml::xml_input_adapter()),
366        _ => Err(Box::new(unsupported_format_error(
367            FormatKind::Source,
368            format,
369            None,
370        ))),
371    }
372}
373
374pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
375    match format {
376        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
377        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
378        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
379        _ => Err(Box::new(unsupported_format_error(
380            FormatKind::SinkAccepted,
381            format,
382            None,
383        ))),
384    }
385}
386
387pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
388    match format {
389        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
390        _ => Err(Box::new(unsupported_format_error(
391            FormatKind::SinkRejected,
392            format,
393            None,
394        ))),
395    }
396}
397
398pub(crate) fn read_input_from_df(
399    input_file: &LocalInputFile,
400    df: &DataFrame,
401    columns: &[config::ColumnConfig],
402    normalize_strategy: Option<&str>,
403    collect_raw: bool,
404) -> FloeResult<ReadInput> {
405    let input_columns = df
406        .get_column_names()
407        .iter()
408        .map(|name| name.to_string())
409        .collect::<Vec<_>>();
410    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
411    let raw_df = if collect_raw {
412        Some(cast_df_to_string(df)?)
413    } else {
414        None
415    };
416    let typed_df = cast_df_to_schema(df, &typed_schema)?;
417    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
418}
419
420pub(crate) fn finalize_read_input(
421    input_file: &LocalInputFile,
422    mut raw_df: Option<DataFrame>,
423    mut typed_df: DataFrame,
424    normalize_strategy: Option<&str>,
425) -> FloeResult<ReadInput> {
426    if let Some(strategy) = normalize_strategy {
427        if let Some(raw_df) = raw_df.as_mut() {
428            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
429        }
430        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
431    }
432    Ok(ReadInput::Data {
433        input_file: input_file.file.clone(),
434        raw_df,
435        typed_df,
436    })
437}
438
439pub(crate) fn build_typed_schema(
440    input_columns: &[String],
441    declared_columns: &[config::ColumnConfig],
442    normalize_strategy: Option<&str>,
443) -> FloeResult<Schema> {
444    let mut declared_types = HashMap::new();
445    for column in declared_columns {
446        declared_types.insert(
447            column.name.as_str(),
448            config::parse_data_type(&column.column_type)?,
449        );
450    }
451
452    let mut schema = Schema::with_capacity(input_columns.len());
453    for name in input_columns {
454        let normalized = if let Some(strategy) = normalize_strategy {
455            crate::checks::normalize::normalize_name(name, strategy)
456        } else {
457            name.to_string()
458        };
459        let dtype = declared_types
460            .get(normalized.as_str())
461            .cloned()
462            .unwrap_or(DataType::String);
463        schema.insert(name.as_str().into(), dtype);
464    }
465    Ok(schema)
466}
467
468pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
469    cast_df_with_type(df, &DataType::String)
470}
471
472pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
473    let mut columns = Vec::with_capacity(schema.len());
474    for (name, dtype) in schema.iter() {
475        let series = df.column(name.as_str()).map_err(|err| {
476            Box::new(ConfigError(format!(
477                "input column {} not found: {err}",
478                name.as_str()
479            )))
480        })?;
481        let casted =
482            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
483                cast_string_to_bool(name.as_str(), series)?
484            } else {
485                series
486                    .cast_with_options(dtype, CastOptions::NonStrict)
487                    .map_err(|err| {
488                        Box::new(ConfigError(format!(
489                            "failed to cast input column {}: {err}",
490                            name.as_str()
491                        )))
492                    })?
493            };
494        columns.push(casted);
495    }
496    DataFrame::new(columns).map_err(|err| {
497        Box::new(ConfigError(format!(
498            "failed to build typed dataframe: {err}"
499        ))) as Box<dyn std::error::Error + Send + Sync>
500    })
501}
502
503fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
504    let string_values = series.as_materialized_series().str().map_err(|err| {
505        Box::new(ConfigError(format!(
506            "failed to read boolean column {} as string: {err}",
507            name
508        )))
509    })?;
510    let mut values = Vec::with_capacity(series.len());
511    for value in string_values {
512        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
513            "true" | "1" => Some(true),
514            "false" | "0" => Some(false),
515            _ => None,
516        });
517        values.push(parsed);
518    }
519    Ok(Series::new(name.into(), values).into())
520}
521
522fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
523    let mut out = df.clone();
524    let names = out
525        .get_column_names()
526        .iter()
527        .map(|name| name.to_string())
528        .collect::<Vec<_>>();
529    for name in names {
530        let series = out.column(&name).map_err(|err| {
531            Box::new(ConfigError(format!(
532                "input column {} not found: {err}",
533                name
534            )))
535        })?;
536        let casted = series
537            .cast_with_options(dtype, CastOptions::NonStrict)
538            .map_err(|err| {
539                Box::new(ConfigError(format!(
540                    "failed to cast input column {}: {err}",
541                    name
542                )))
543            })?;
544        let idx = out.get_column_index(&name).ok_or_else(|| {
545            Box::new(ConfigError(format!(
546                "input column {} not found for update",
547                name
548            )))
549        })?;
550        out.replace_column(idx, casted).map_err(|err| {
551            Box::new(ConfigError(format!(
552                "failed to update input column {}: {err}",
553                name
554            )))
555        })?;
556    }
557    Ok(out)
558}
559pub fn collect_row_errors(
560    raw_df: &DataFrame,
561    typed_df: &DataFrame,
562    required_cols: &[String],
563    columns: &[config::ColumnConfig],
564    track_cast_errors: bool,
565    raw_indices: &check::ColumnIndex,
566    typed_indices: &check::ColumnIndex,
567) -> FloeResult<Vec<Vec<check::RowError>>> {
568    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
569    if track_cast_errors {
570        let cast_errors =
571            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
572        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
573            errors.extend(cast);
574        }
575    }
576    Ok(error_lists)
577}