1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// A single resolved input file to be read by an [`InputAdapter`].
#[derive(Debug, Clone)]
pub struct InputFile {
    /// URI of the source as originally referenced in configuration.
    pub source_uri: String,
    /// Local filesystem path the source resolves to for reading.
    pub source_local_path: PathBuf,
    /// File name of the source (with extension).
    pub source_name: String,
    /// File name without extension — presumably used to derive output names
    /// (see `RejectedWriteRequest::source_stem`); confirm against callers.
    pub source_stem: String,
}
17
/// File-level error raised while reading a single input file.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule that flagged the failure.
    pub rule: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}
23
/// Outcome of reading one input file: parsed data or a file-level error.
pub enum ReadInput {
    /// The file was read successfully.
    Data {
        input_file: InputFile,
        /// All-string copy of the frame, present only when raw collection
        /// was requested (see `read_input_from_df`'s `collect_raw`).
        raw_df: Option<DataFrame>,
        /// Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read; no data is available.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Optional size metrics reported after writing accepted rows.
/// All fields are `None` when the sink does not report them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    // Total bytes written across all output files.
    pub total_bytes_written: Option<u64>,
    // Average output file size in megabytes.
    pub avg_file_size_mb: Option<f64>,
    // Count of files considered "small" — threshold defined by the sink
    // implementation, not visible here.
    pub small_files_count: Option<u64>,
}
42
/// Result of an accepted-sink write (see [`AcceptedSinkAdapter::write_accepted`]).
/// The `Option` fields are populated only by sinks where they apply
/// (e.g. table version / snapshot for delta or iceberg tables).
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    pub files_written: u64,
    pub parts_written: u64,
    pub part_files: Vec<String>,
    // Table-format metadata; `None` for plain-file sinks.
    pub table_version: Option<i64>,
    pub snapshot_id: Option<i64>,
    pub table_root_uri: Option<String>,
    // Iceberg-specific identifiers; `None` for non-iceberg sinks.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    pub metrics: AcceptedWriteMetrics,
}
57
/// Reads files of one specific source format into dataframes.
/// Obtain an implementation for a format string via [`input_adapter`].
pub trait InputAdapter: Send + Sync {
    /// Canonical format name (e.g. "csv"), used for glob/suffix lookups.
    fn format(&self) -> &'static str;

    /// Default glob patterns for discovering files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves the configured source into concrete local input paths,
    /// using this format's default globs when the source specifies none.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Returns the column names present in `input_file`.
    /// Errors are file-level (`FileReadError`), not fatal to the run.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads all `files`, producing one [`ReadInput`] per file. When
    /// `collect_raw` is set, implementations also retain an all-string copy
    /// of each frame (see `ReadInput::Data::raw_df`).
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
102
/// Writes accepted (validated) rows to a sink in one specific output format.
/// Obtain an implementation for a format string via [`accepted_sink_adapter`].
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` and reports what was written.
    ///
    /// `output_stem` names the output; `temp_dir`, when given, is a local
    /// staging location. NOTE(review): exact staging/upload semantics live in
    /// the parquet/delta/iceberg implementations — not visible in this file.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
118
/// Bundled arguments for [`RejectedSinkAdapter::write_rejected`];
/// groups the many parameters a rejected-row write needs.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    /// Rejected rows to write; mutable because sinks may need to reorder or
    /// rechunk the frame during writing.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    /// Optional local staging directory.
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
129
/// Writes rejected rows to a sink. Returns a `String` on success —
/// presumably the written file's path/URI; confirm against the csv adapter.
pub trait RejectedSinkAdapter: Send + Sync {
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
133
/// Which configuration slot a format string came from; used to build
/// precise "unsupported format" error messages.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}
140
141impl FormatKind {
142 fn field_path(self) -> &'static str {
143 match self {
144 FormatKind::Source => "source.format",
145 FormatKind::SinkAccepted => "sink.accepted.format",
146 FormatKind::SinkRejected => "sink.rejected.format",
147 }
148 }
149
150 fn description(self) -> &'static str {
151 match self {
152 FormatKind::Source => "source format",
153 FormatKind::SinkAccepted => "accepted sink format",
154 FormatKind::SinkRejected => "rejected sink format",
155 }
156 }
157}
158
159fn unsupported_format_error(
160 kind: FormatKind,
161 format: &str,
162 entity_name: Option<&str>,
163) -> ConfigError {
164 if let Some(entity_name) = entity_name {
165 return ConfigError(format!(
166 "entity.name={} {}={} is unsupported",
167 entity_name,
168 kind.field_path(),
169 format
170 ));
171 }
172 ConfigError(format!("unsupported {}: {format}", kind.description()))
173}
174
175pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
176 if input_adapter(format).is_err() {
177 return Err(Box::new(unsupported_format_error(
178 FormatKind::Source,
179 format,
180 Some(entity_name),
181 )));
182 }
183 Ok(())
184}
185
186pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
187 if accepted_sink_adapter(format).is_err() {
188 return Err(Box::new(unsupported_format_error(
189 FormatKind::SinkAccepted,
190 format,
191 Some(entity_name),
192 )));
193 }
194 Ok(())
195}
196
197pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
198 if rejected_sink_adapter(format).is_err() {
199 return Err(Box::new(unsupported_format_error(
200 FormatKind::SinkRejected,
201 format,
202 Some(entity_name),
203 )));
204 }
205 Ok(())
206}
207
208pub fn resolve_read_columns(
209 entity: &config::EntityConfig,
210 normalized_columns: &[config::ColumnConfig],
211 normalize_strategy: Option<&str>,
212) -> FloeResult<Vec<config::ColumnConfig>> {
213 if entity.source.format == "json" || entity.source.format == "xml" {
214 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
215 } else {
216 Ok(normalized_columns.to_vec())
217 }
218}
219
220pub fn sink_options_warning(
221 entity_name: &str,
222 format: &str,
223 options: Option<&config::SinkOptions>,
224) -> Option<String> {
225 let options = options?;
226 if format == "parquet" {
227 return None;
228 }
229 let mut keys = Vec::new();
230 if options.compression.is_some() {
231 keys.push("compression");
232 }
233 if options.row_group_size.is_some() {
234 keys.push("row_group_size");
235 }
236 if options.max_size_per_file.is_some() {
237 keys.push("max_size_per_file");
238 }
239 let detail = if keys.is_empty() {
240 "options".to_string()
241 } else {
242 keys.join(", ")
243 };
244 Some(format!(
245 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
246 entity_name, format
247 ))
248}
249
250pub fn validate_sink_options(
251 entity_name: &str,
252 format: &str,
253 options: Option<&config::SinkOptions>,
254) -> FloeResult<()> {
255 let options = match options {
256 Some(options) => options,
257 None => return Ok(()),
258 };
259 if format != "parquet" {
260 return Ok(());
261 }
262 if let Some(compression) = &options.compression {
263 match compression.as_str() {
264 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
265 _ => {
266 return Err(Box::new(ConfigError(format!(
267 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
268 entity_name, compression
269 ))))
270 }
271 }
272 }
273 if let Some(row_group_size) = options.row_group_size {
274 if row_group_size == 0 {
275 return Err(Box::new(ConfigError(format!(
276 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
277 entity_name
278 ))));
279 }
280 }
281 if let Some(max_size_per_file) = options.max_size_per_file {
282 if max_size_per_file == 0 {
283 return Err(Box::new(ConfigError(format!(
284 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
285 entity_name
286 ))));
287 }
288 }
289 Ok(())
290}
291
292pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
293 match format {
294 "csv" => Ok(io::read::csv::csv_input_adapter()),
295 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
296 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
297 "orc" => Ok(io::read::orc::orc_input_adapter()),
298 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
299 "json" => Ok(io::read::json::json_input_adapter()),
300 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
301 "avro" => Ok(io::read::avro::avro_input_adapter()),
302 "xml" => Ok(io::read::xml::xml_input_adapter()),
303 _ => Err(Box::new(unsupported_format_error(
304 FormatKind::Source,
305 format,
306 None,
307 ))),
308 }
309}
310
311pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
312 match format {
313 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
314 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
315 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
316 _ => Err(Box::new(unsupported_format_error(
317 FormatKind::SinkAccepted,
318 format,
319 None,
320 ))),
321 }
322}
323
324pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
325 match format {
326 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
327 _ => Err(Box::new(unsupported_format_error(
328 FormatKind::SinkRejected,
329 format,
330 None,
331 ))),
332 }
333}
334
335pub(crate) fn read_input_from_df(
336 input_file: &InputFile,
337 df: &DataFrame,
338 columns: &[config::ColumnConfig],
339 normalize_strategy: Option<&str>,
340 collect_raw: bool,
341) -> FloeResult<ReadInput> {
342 let input_columns = df
343 .get_column_names()
344 .iter()
345 .map(|name| name.to_string())
346 .collect::<Vec<_>>();
347 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
348 let raw_df = if collect_raw {
349 Some(cast_df_to_string(df)?)
350 } else {
351 None
352 };
353 let typed_df = cast_df_to_schema(df, &typed_schema)?;
354 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
355}
356
357pub(crate) fn finalize_read_input(
358 input_file: &InputFile,
359 mut raw_df: Option<DataFrame>,
360 mut typed_df: DataFrame,
361 normalize_strategy: Option<&str>,
362) -> FloeResult<ReadInput> {
363 if let Some(strategy) = normalize_strategy {
364 if let Some(raw_df) = raw_df.as_mut() {
365 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
366 }
367 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
368 }
369 Ok(ReadInput::Data {
370 input_file: input_file.clone(),
371 raw_df,
372 typed_df,
373 })
374}
375
376pub(crate) fn build_typed_schema(
377 input_columns: &[String],
378 declared_columns: &[config::ColumnConfig],
379 normalize_strategy: Option<&str>,
380) -> FloeResult<Schema> {
381 let mut declared_types = HashMap::new();
382 for column in declared_columns {
383 declared_types.insert(
384 column.name.as_str(),
385 config::parse_data_type(&column.column_type)?,
386 );
387 }
388
389 let mut schema = Schema::with_capacity(input_columns.len());
390 for name in input_columns {
391 let normalized = if let Some(strategy) = normalize_strategy {
392 crate::checks::normalize::normalize_name(name, strategy)
393 } else {
394 name.to_string()
395 };
396 let dtype = declared_types
397 .get(normalized.as_str())
398 .cloned()
399 .unwrap_or(DataType::String);
400 schema.insert(name.as_str().into(), dtype);
401 }
402 Ok(schema)
403}
404
/// Casts every column of `df` to String — used to keep an unmodified "raw"
/// view of the input for error reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
408
409pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
410 let mut columns = Vec::with_capacity(schema.len());
411 for (name, dtype) in schema.iter() {
412 let series = df.column(name.as_str()).map_err(|err| {
413 Box::new(ConfigError(format!(
414 "input column {} not found: {err}",
415 name.as_str()
416 )))
417 })?;
418 let casted =
419 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
420 cast_string_to_bool(name.as_str(), series)?
421 } else {
422 series
423 .cast_with_options(dtype, CastOptions::NonStrict)
424 .map_err(|err| {
425 Box::new(ConfigError(format!(
426 "failed to cast input column {}: {err}",
427 name.as_str()
428 )))
429 })?
430 };
431 columns.push(casted);
432 }
433 DataFrame::new(columns).map_err(|err| {
434 Box::new(ConfigError(format!(
435 "failed to build typed dataframe: {err}"
436 ))) as Box<dyn std::error::Error + Send + Sync>
437 })
438}
439
440fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
441 let string_values = series.as_materialized_series().str().map_err(|err| {
442 Box::new(ConfigError(format!(
443 "failed to read boolean column {} as string: {err}",
444 name
445 )))
446 })?;
447 let mut values = Vec::with_capacity(series.len());
448 for value in string_values {
449 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
450 "true" | "1" => Some(true),
451 "false" | "0" => Some(false),
452 _ => None,
453 });
454 values.push(parsed);
455 }
456 Ok(Series::new(name.into(), values).into())
457}
458
459fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
460 let mut out = df.clone();
461 let names = out
462 .get_column_names()
463 .iter()
464 .map(|name| name.to_string())
465 .collect::<Vec<_>>();
466 for name in names {
467 let series = out.column(&name).map_err(|err| {
468 Box::new(ConfigError(format!(
469 "input column {} not found: {err}",
470 name
471 )))
472 })?;
473 let casted = series
474 .cast_with_options(dtype, CastOptions::NonStrict)
475 .map_err(|err| {
476 Box::new(ConfigError(format!(
477 "failed to cast input column {}: {err}",
478 name
479 )))
480 })?;
481 let idx = out.get_column_index(&name).ok_or_else(|| {
482 Box::new(ConfigError(format!(
483 "input column {} not found for update",
484 name
485 )))
486 })?;
487 out.replace_column(idx, casted).map_err(|err| {
488 Box::new(ConfigError(format!(
489 "failed to update input column {}: {err}",
490 name
491 )))
492 })?;
493 }
494 Ok(out)
495}
496pub fn collect_row_errors(
497 raw_df: &DataFrame,
498 typed_df: &DataFrame,
499 required_cols: &[String],
500 columns: &[config::ColumnConfig],
501 track_cast_errors: bool,
502 raw_indices: &check::ColumnIndex,
503 typed_indices: &check::ColumnIndex,
504) -> FloeResult<Vec<Vec<check::RowError>>> {
505 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
506 if track_cast_errors {
507 let cast_errors =
508 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
509 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
510 errors.extend(cast);
511 }
512 }
513 Ok(error_lists)
514}