Skip to main content

floe_core/io/
format.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One input file handed to an [`InputAdapter`] for reading.
///
/// Derives `PartialEq`/`Eq` so inputs can be compared directly (e.g. in tests or when
/// de-duplicating resolved files).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputFile {
    /// Location of the file as a URI string — NOTE(review): exact scheme/format set by the resolver.
    pub source_uri: String,
    /// Path of the file on the local filesystem that readers open.
    pub source_local_path: PathBuf,
    /// File name of the input — presumably including its extension; confirm against resolver.
    pub source_name: String,
    /// File stem of the input — presumably the name without extension; confirm against resolver.
    pub source_stem: String,
}
17
/// Failure to read a single input file, carried in [`ReadInput::FileError`] or returned by
/// [`InputAdapter::read_input_columns`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileReadError {
    /// Identifier of the rule that failed — NOTE(review): confirm the set of rule names used.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
24pub enum ReadInput {
25    Data {
26        input_file: InputFile,
27        raw_df: Option<DataFrame>,
28        typed_df: DataFrame,
29    },
30    FileError {
31        input_file: InputFile,
32        error: FileReadError,
33    },
34}
35
/// Summary returned by [`AcceptedSinkAdapter::write_accepted`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcceptedWriteOutput {
    /// Number of output parts written.
    pub parts_written: u64,
    /// Identifiers of the written part files — NOTE(review): confirm whether these are names,
    /// paths, or URIs.
    pub part_files: Vec<String>,
    /// Table version after the write — presumably only set by table formats (delta/iceberg);
    /// confirm per adapter.
    pub table_version: Option<i64>,
}
42
43pub trait InputAdapter: Send + Sync {
44    fn format(&self) -> &'static str;
45
46    fn default_globs(&self) -> FloeResult<Vec<String>> {
47        io::storage::extensions::glob_patterns_for_format(self.format())
48    }
49
50    fn suffixes(&self) -> FloeResult<Vec<String>> {
51        io::storage::extensions::suffixes_for_format(self.format())
52    }
53
54    fn resolve_local_inputs(
55        &self,
56        config_dir: &Path,
57        entity_name: &str,
58        source: &config::SourceConfig,
59        storage: &str,
60    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
61        let default_globs = self.default_globs()?;
62        io::storage::local::resolve_local_inputs(
63            config_dir,
64            entity_name,
65            source,
66            storage,
67            &default_globs,
68        )
69    }
70
71    fn read_input_columns(
72        &self,
73        entity: &config::EntityConfig,
74        input_file: &InputFile,
75        columns: &[config::ColumnConfig],
76    ) -> Result<Vec<String>, FileReadError>;
77
78    fn read_inputs(
79        &self,
80        entity: &config::EntityConfig,
81        files: &[InputFile],
82        columns: &[config::ColumnConfig],
83        normalize_strategy: Option<&str>,
84        collect_raw: bool,
85    ) -> FloeResult<Vec<ReadInput>>;
86}
87
/// Writer for the accepted output of an entity; looked up by format via
/// [`accepted_sink_adapter`] (`parquet`, `delta`, `iceberg`).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` and reports what was written as an [`AcceptedWriteOutput`].
    ///
    /// `df` is taken mutably — NOTE(review): confirm whether implementations modify it.
    /// `output_stem` is presumably the base name for output files, and `temp_dir`, `cloud`,
    /// and `resolver` presumably support staging and remote storage — TODO confirm per adapter.
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
100
/// Writer for the rejected output of an entity; looked up by format via
/// [`rejected_sink_adapter`] (currently only `csv`).
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` for the input identified by `source_stem`.
    ///
    /// NOTE(review): the returned `String` is presumably the location of the written file —
    /// confirm against implementations.
    fn write_rejected(
        &self,
        target: &Target,
        df: &mut DataFrame,
        source_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<String>;
}
113
/// Configuration slot a format string was read from; used to phrase unsupported-format errors.
///
/// Derives `PartialEq`/`Eq`/`Hash` in addition to `Copy` so kinds can be compared and used as
/// map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FormatKind {
    /// The `source.format` field.
    Source,
    /// The `sink.accepted.format` field.
    SinkAccepted,
    /// The `sink.rejected.format` field.
    SinkRejected,
}

impl FormatKind {
    /// Dotted config path of the field holding this format, e.g. `"source.format"`.
    fn field_path(self) -> &'static str {
        match self {
            FormatKind::Source => "source.format",
            FormatKind::SinkAccepted => "sink.accepted.format",
            FormatKind::SinkRejected => "sink.rejected.format",
        }
    }

    /// Human-readable name of this format slot, e.g. `"source format"`.
    fn description(self) -> &'static str {
        match self {
            FormatKind::Source => "source format",
            FormatKind::SinkAccepted => "accepted sink format",
            FormatKind::SinkRejected => "rejected sink format",
        }
    }
}
138
139fn unsupported_format_error(
140    kind: FormatKind,
141    format: &str,
142    entity_name: Option<&str>,
143) -> ConfigError {
144    if let Some(entity_name) = entity_name {
145        return ConfigError(format!(
146            "entity.name={} {}={} is unsupported",
147            entity_name,
148            kind.field_path(),
149            format
150        ));
151    }
152    ConfigError(format!("unsupported {}: {format}", kind.description()))
153}
154
155pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
156    if input_adapter(format).is_err() {
157        return Err(Box::new(unsupported_format_error(
158            FormatKind::Source,
159            format,
160            Some(entity_name),
161        )));
162    }
163    Ok(())
164}
165
166pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
167    if accepted_sink_adapter(format).is_err() {
168        return Err(Box::new(unsupported_format_error(
169            FormatKind::SinkAccepted,
170            format,
171            Some(entity_name),
172        )));
173    }
174    Ok(())
175}
176
177pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
178    if rejected_sink_adapter(format).is_err() {
179        return Err(Box::new(unsupported_format_error(
180            FormatKind::SinkRejected,
181            format,
182            Some(entity_name),
183        )));
184    }
185    Ok(())
186}
187
188pub fn sink_options_warning(
189    entity_name: &str,
190    format: &str,
191    options: Option<&config::SinkOptions>,
192) -> Option<String> {
193    let options = options?;
194    if format == "parquet" {
195        return None;
196    }
197    let mut keys = Vec::new();
198    if options.compression.is_some() {
199        keys.push("compression");
200    }
201    if options.row_group_size.is_some() {
202        keys.push("row_group_size");
203    }
204    if options.max_size_per_file.is_some() {
205        keys.push("max_size_per_file");
206    }
207    let detail = if keys.is_empty() {
208        "options".to_string()
209    } else {
210        keys.join(", ")
211    };
212    Some(format!(
213        "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
214        entity_name, format
215    ))
216}
217
218pub fn validate_sink_options(
219    entity_name: &str,
220    format: &str,
221    options: Option<&config::SinkOptions>,
222) -> FloeResult<()> {
223    let options = match options {
224        Some(options) => options,
225        None => return Ok(()),
226    };
227    if format != "parquet" {
228        return Ok(());
229    }
230    if let Some(compression) = &options.compression {
231        match compression.as_str() {
232            "snappy" | "gzip" | "zstd" | "uncompressed" => {}
233            _ => {
234                return Err(Box::new(ConfigError(format!(
235                    "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
236                    entity_name, compression
237                ))))
238            }
239        }
240    }
241    if let Some(row_group_size) = options.row_group_size {
242        if row_group_size == 0 {
243            return Err(Box::new(ConfigError(format!(
244                "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
245                entity_name
246            ))));
247        }
248    }
249    if let Some(max_size_per_file) = options.max_size_per_file {
250        if max_size_per_file == 0 {
251            return Err(Box::new(ConfigError(format!(
252                "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
253                entity_name
254            ))));
255        }
256    }
257    Ok(())
258}
259
260pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
261    match format {
262        "csv" => Ok(io::read::csv::csv_input_adapter()),
263        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
264        "json" => Ok(io::read::json::json_input_adapter()),
265        _ => Err(Box::new(unsupported_format_error(
266            FormatKind::Source,
267            format,
268            None,
269        ))),
270    }
271}
272
273pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
274    match format {
275        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
276        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
277        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
278        _ => Err(Box::new(unsupported_format_error(
279            FormatKind::SinkAccepted,
280            format,
281            None,
282        ))),
283    }
284}
285
286pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
287    match format {
288        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
289        _ => Err(Box::new(unsupported_format_error(
290            FormatKind::SinkRejected,
291            format,
292            None,
293        ))),
294    }
295}
296
297pub(crate) fn read_input_from_df(
298    input_file: &InputFile,
299    df: &DataFrame,
300    columns: &[config::ColumnConfig],
301    normalize_strategy: Option<&str>,
302    collect_raw: bool,
303) -> FloeResult<ReadInput> {
304    let input_columns = df
305        .get_column_names()
306        .iter()
307        .map(|name| name.to_string())
308        .collect::<Vec<_>>();
309    let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
310    let raw_df = if collect_raw {
311        Some(cast_df_to_string(df)?)
312    } else {
313        None
314    };
315    let typed_df = cast_df_to_schema(df, &typed_schema)?;
316    finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
317}
318
319pub(crate) fn finalize_read_input(
320    input_file: &InputFile,
321    mut raw_df: Option<DataFrame>,
322    mut typed_df: DataFrame,
323    normalize_strategy: Option<&str>,
324) -> FloeResult<ReadInput> {
325    if let Some(strategy) = normalize_strategy {
326        if let Some(raw_df) = raw_df.as_mut() {
327            crate::run::normalize::normalize_dataframe_columns(raw_df, strategy)?;
328        }
329        crate::run::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
330    }
331    Ok(ReadInput::Data {
332        input_file: input_file.clone(),
333        raw_df,
334        typed_df,
335    })
336}
337
338pub(crate) fn build_typed_schema(
339    input_columns: &[String],
340    declared_columns: &[config::ColumnConfig],
341    normalize_strategy: Option<&str>,
342) -> FloeResult<Schema> {
343    let mut declared_types = HashMap::new();
344    for column in declared_columns {
345        declared_types.insert(
346            column.name.as_str(),
347            config::parse_data_type(&column.column_type)?,
348        );
349    }
350
351    let mut schema = Schema::with_capacity(input_columns.len());
352    for name in input_columns {
353        let normalized = if let Some(strategy) = normalize_strategy {
354            crate::run::normalize::normalize_name(name, strategy)
355        } else {
356            name.to_string()
357        };
358        let dtype = declared_types
359            .get(normalized.as_str())
360            .cloned()
361            .unwrap_or(DataType::String);
362        schema.insert(name.as_str().into(), dtype);
363    }
364    Ok(schema)
365}
366
367pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
368    cast_df_with_type(df, &DataType::String)
369}
370
371pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
372    let mut columns = Vec::with_capacity(schema.len());
373    for (name, dtype) in schema.iter() {
374        let series = df.column(name.as_str()).map_err(|err| {
375            Box::new(ConfigError(format!(
376                "input column {} not found: {err}",
377                name.as_str()
378            )))
379        })?;
380        let casted =
381            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
382                cast_string_to_bool(name.as_str(), series)?
383            } else {
384                series
385                    .cast_with_options(dtype, CastOptions::NonStrict)
386                    .map_err(|err| {
387                        Box::new(ConfigError(format!(
388                            "failed to cast input column {}: {err}",
389                            name.as_str()
390                        )))
391                    })?
392            };
393        columns.push(casted);
394    }
395    DataFrame::new(columns).map_err(|err| {
396        Box::new(ConfigError(format!(
397            "failed to build typed dataframe: {err}"
398        ))) as Box<dyn std::error::Error + Send + Sync>
399    })
400}
401
402fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
403    let string_values = series.as_materialized_series().str().map_err(|err| {
404        Box::new(ConfigError(format!(
405            "failed to read boolean column {} as string: {err}",
406            name
407        )))
408    })?;
409    let mut values = Vec::with_capacity(series.len());
410    for value in string_values {
411        let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
412            "true" | "1" => Some(true),
413            "false" | "0" => Some(false),
414            _ => None,
415        });
416        values.push(parsed);
417    }
418    Ok(Series::new(name.into(), values).into())
419}
420
421fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
422    let mut out = df.clone();
423    let names = out
424        .get_column_names()
425        .iter()
426        .map(|name| name.to_string())
427        .collect::<Vec<_>>();
428    for name in names {
429        let series = out.column(&name).map_err(|err| {
430            Box::new(ConfigError(format!(
431                "input column {} not found: {err}",
432                name
433            )))
434        })?;
435        let casted = series
436            .cast_with_options(dtype, CastOptions::NonStrict)
437            .map_err(|err| {
438                Box::new(ConfigError(format!(
439                    "failed to cast input column {}: {err}",
440                    name
441                )))
442            })?;
443        let idx = out.get_column_index(&name).ok_or_else(|| {
444            Box::new(ConfigError(format!(
445                "input column {} not found for update",
446                name
447            )))
448        })?;
449        out.replace_column(idx, casted).map_err(|err| {
450            Box::new(ConfigError(format!(
451                "failed to update input column {}: {err}",
452                name
453            )))
454        })?;
455    }
456    Ok(out)
457}
458pub fn collect_row_errors(
459    raw_df: &DataFrame,
460    typed_df: &DataFrame,
461    required_cols: &[String],
462    columns: &[config::ColumnConfig],
463    track_cast_errors: bool,
464    raw_indices: &check::ColumnIndex,
465    typed_indices: &check::ColumnIndex,
466) -> FloeResult<Vec<Vec<check::RowError>>> {
467    let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
468    if track_cast_errors {
469        let cast_errors =
470            check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
471        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
472            errors.extend(cast);
473        }
474    }
475    Ok(error_lists)
476}