1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One input file discovered for an entity: its original location plus the
/// local path it is actually read from.
#[derive(Debug, Clone)]
pub struct InputFile {
    // Original source location (URI form) — kept for reporting/traceability.
    pub source_uri: String,
    // Local filesystem path the reader adapter opens.
    pub source_local_path: PathBuf,
    // File name component (with extension).
    pub source_name: String,
    // File name without extension; presumably used to derive output names
    // (see `output_stem`/`source_stem` in the sink APIs below) — confirm.
    pub source_stem: String,
}
17
/// A per-file read failure (non-fatal: recorded instead of aborting the run).
#[derive(Debug, Clone)]
pub struct FileReadError {
    // Identifier of the rule/check that failed.
    pub rule: String,
    // Human-readable failure detail.
    pub message: String,
}
23
/// Outcome of reading one input file.
pub enum ReadInput {
    /// File was read successfully.
    Data {
        input_file: InputFile,
        // All-string copy of the frame; only present when raw collection was
        // requested (see `collect_raw` in `InputAdapter::read_inputs`).
        raw_df: Option<DataFrame>,
        // Frame cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File could not be read; carries the per-file error.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Optional size metrics for an accepted write; a field is `None` when the
/// sink did not report it.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    // Total bytes written across all output files.
    pub total_bytes_written: Option<u64>,
    // Average output file size in megabytes.
    pub avg_file_size_mb: Option<f64>,
    // Count of "small" output files (threshold not defined in this file —
    // see the sink implementations).
    pub small_files_count: Option<u64>,
}
42
/// Metrics reported when an accepted write performed a merge/upsert into an
/// existing table.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    // Columns forming the merge/join key.
    pub merge_key: Vec<String>,
    // Rows newly inserted by the merge.
    pub inserted_count: u64,
    // Existing rows updated by the merge.
    pub updated_count: u64,
    // Target table row count before the merge.
    pub target_rows_before: u64,
    // Target table row count after the merge.
    pub target_rows_after: u64,
    // Wall-clock merge duration in milliseconds.
    pub merge_elapsed_ms: u64,
}
52
/// Result of an accepted-sink write: counts, optional table-format metadata
/// (populated by the Delta/Iceberg sinks), and metrics.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    // Number of data files written.
    pub files_written: u64,
    // Number of parts written.
    pub parts_written: u64,
    // Paths/URIs of the written part files.
    pub part_files: Vec<String>,
    // Table version after the write, when the sink is a versioned table
    // format (e.g. Delta); otherwise `None`.
    pub table_version: Option<i64>,
    // Snapshot id after the write (Iceberg-style), when applicable.
    pub snapshot_id: Option<i64>,
    // Root URI of the written table, when applicable.
    pub table_root_uri: Option<String>,
    // Iceberg coordinates of the written table, when the sink is Iceberg.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    // Size metrics for the write.
    pub metrics: AcceptedWriteMetrics,
    // Present only when the write performed a merge.
    pub merge: Option<AcceptedMergeMetrics>,
}
68
/// Reader for one source format: discovers input files and loads them into
/// raw/typed dataframes.
pub trait InputAdapter: Send + Sync {
    /// Format identifier this adapter handles (e.g. "csv").
    fn format(&self) -> &'static str;

    /// Default glob patterns for discovering files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolves the concrete local input files for an entity, passing this
    /// adapter's default globs to the local-storage resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Reads only the column names of one file. Failure is reported as a
    /// per-file `FileReadError` rather than a fatal error.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Reads all `files` into `ReadInput`s. When `collect_raw` is true an
    /// all-string copy of each frame is kept alongside the typed one;
    /// `normalize_strategy` names the column-name normalization to apply,
    /// if any.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like `read_inputs`, but callers that already determined the input
    /// columns may pass them as a hint. The default implementation ignores
    /// the hint and simply delegates to `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
126
/// Writer for accepted (validated) rows into one sink format.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Writes `df` to `target` and reports counts, table metadata, and
    /// metrics. `output_stem` seeds output naming; `temp_dir`, when given,
    /// is a local staging location.
    ///
    /// NOTE(review): `df` is `&mut` — implementations may modify the frame
    /// in place while writing; confirm before reusing it afterwards.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
142
/// Bundled arguments for `RejectedSinkAdapter::write_rejected`; grouped in a
/// struct to keep the trait method signature small.
pub struct RejectedWriteRequest<'a> {
    // Destination for the rejected rows.
    pub target: &'a Target,
    // Rows that failed validation. Mutable: the adapter may modify the
    // frame in place while writing.
    pub df: &'a mut DataFrame,
    // Stem of the source file the rejected rows came from.
    pub source_stem: &'a str,
    // Optional local staging directory.
    pub temp_dir: Option<&'a Path>,
    // Cloud client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    // Storage resolver for the target.
    pub resolver: &'a config::StorageResolver,
    // Entity being processed.
    pub entity: &'a config::EntityConfig,
    // Write mode (append/overwrite/…) requested for the sink.
    pub mode: config::WriteMode,
}
153
/// Writer for rejected rows into one sink format.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Writes the rejected rows and returns a `String` on success —
    /// presumably the written path/URI; confirm against implementations.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
157
/// Which config format field an "unsupported format" error refers to.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    // `source.format`
    Source,
    // `sink.accepted.format`
    SinkAccepted,
    // `sink.rejected.format`
    SinkRejected,
}
164
165impl FormatKind {
166 fn field_path(self) -> &'static str {
167 match self {
168 FormatKind::Source => "source.format",
169 FormatKind::SinkAccepted => "sink.accepted.format",
170 FormatKind::SinkRejected => "sink.rejected.format",
171 }
172 }
173
174 fn description(self) -> &'static str {
175 match self {
176 FormatKind::Source => "source format",
177 FormatKind::SinkAccepted => "accepted sink format",
178 FormatKind::SinkRejected => "rejected sink format",
179 }
180 }
181}
182
183fn unsupported_format_error(
184 kind: FormatKind,
185 format: &str,
186 entity_name: Option<&str>,
187) -> ConfigError {
188 if let Some(entity_name) = entity_name {
189 return ConfigError(format!(
190 "entity.name={} {}={} is unsupported",
191 entity_name,
192 kind.field_path(),
193 format
194 ));
195 }
196 ConfigError(format!("unsupported {}: {format}", kind.description()))
197}
198
199pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
200 if input_adapter(format).is_err() {
201 return Err(Box::new(unsupported_format_error(
202 FormatKind::Source,
203 format,
204 Some(entity_name),
205 )));
206 }
207 Ok(())
208}
209
210pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
211 if accepted_sink_adapter(format).is_err() {
212 return Err(Box::new(unsupported_format_error(
213 FormatKind::SinkAccepted,
214 format,
215 Some(entity_name),
216 )));
217 }
218 Ok(())
219}
220
221pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
222 if rejected_sink_adapter(format).is_err() {
223 return Err(Box::new(unsupported_format_error(
224 FormatKind::SinkRejected,
225 format,
226 Some(entity_name),
227 )));
228 }
229 Ok(())
230}
231
232pub fn resolve_read_columns(
233 entity: &config::EntityConfig,
234 normalized_columns: &[config::ColumnConfig],
235 normalize_strategy: Option<&str>,
236) -> FloeResult<Vec<config::ColumnConfig>> {
237 if entity.source.format == "json" || entity.source.format == "xml" {
238 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
239 } else {
240 Ok(normalized_columns.to_vec())
241 }
242}
243
244pub fn sink_options_warning(
245 entity_name: &str,
246 format: &str,
247 options: Option<&config::SinkOptions>,
248) -> Option<String> {
249 let options = options?;
250 if format == "parquet" {
251 return None;
252 }
253 let mut keys = Vec::new();
254 if options.compression.is_some() {
255 keys.push("compression");
256 }
257 if options.row_group_size.is_some() {
258 keys.push("row_group_size");
259 }
260 if options.max_size_per_file.is_some() {
261 keys.push("max_size_per_file");
262 }
263 let detail = if keys.is_empty() {
264 "options".to_string()
265 } else {
266 keys.join(", ")
267 };
268 Some(format!(
269 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
270 entity_name, format
271 ))
272}
273
274pub fn validate_sink_options(
275 entity_name: &str,
276 format: &str,
277 options: Option<&config::SinkOptions>,
278) -> FloeResult<()> {
279 let options = match options {
280 Some(options) => options,
281 None => return Ok(()),
282 };
283 if format != "parquet" {
284 return Ok(());
285 }
286 if let Some(compression) = &options.compression {
287 match compression.as_str() {
288 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
289 _ => {
290 return Err(Box::new(ConfigError(format!(
291 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
292 entity_name, compression
293 ))))
294 }
295 }
296 }
297 if let Some(row_group_size) = options.row_group_size {
298 if row_group_size == 0 {
299 return Err(Box::new(ConfigError(format!(
300 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
301 entity_name
302 ))));
303 }
304 }
305 if let Some(max_size_per_file) = options.max_size_per_file {
306 if max_size_per_file == 0 {
307 return Err(Box::new(ConfigError(format!(
308 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
309 entity_name
310 ))));
311 }
312 }
313 Ok(())
314}
315
316pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
317 match format {
318 "csv" => Ok(io::read::csv::csv_input_adapter()),
319 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
320 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
321 "orc" => Ok(io::read::orc::orc_input_adapter()),
322 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
323 "json" => Ok(io::read::json::json_input_adapter()),
324 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
325 "avro" => Ok(io::read::avro::avro_input_adapter()),
326 "xml" => Ok(io::read::xml::xml_input_adapter()),
327 _ => Err(Box::new(unsupported_format_error(
328 FormatKind::Source,
329 format,
330 None,
331 ))),
332 }
333}
334
335pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
336 match format {
337 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
338 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
339 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
340 _ => Err(Box::new(unsupported_format_error(
341 FormatKind::SinkAccepted,
342 format,
343 None,
344 ))),
345 }
346}
347
348pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
349 match format {
350 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
351 _ => Err(Box::new(unsupported_format_error(
352 FormatKind::SinkRejected,
353 format,
354 None,
355 ))),
356 }
357}
358
359pub(crate) fn read_input_from_df(
360 input_file: &InputFile,
361 df: &DataFrame,
362 columns: &[config::ColumnConfig],
363 normalize_strategy: Option<&str>,
364 collect_raw: bool,
365) -> FloeResult<ReadInput> {
366 let input_columns = df
367 .get_column_names()
368 .iter()
369 .map(|name| name.to_string())
370 .collect::<Vec<_>>();
371 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
372 let raw_df = if collect_raw {
373 Some(cast_df_to_string(df)?)
374 } else {
375 None
376 };
377 let typed_df = cast_df_to_schema(df, &typed_schema)?;
378 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
379}
380
381pub(crate) fn finalize_read_input(
382 input_file: &InputFile,
383 mut raw_df: Option<DataFrame>,
384 mut typed_df: DataFrame,
385 normalize_strategy: Option<&str>,
386) -> FloeResult<ReadInput> {
387 if let Some(strategy) = normalize_strategy {
388 if let Some(raw_df) = raw_df.as_mut() {
389 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
390 }
391 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
392 }
393 Ok(ReadInput::Data {
394 input_file: input_file.clone(),
395 raw_df,
396 typed_df,
397 })
398}
399
400pub(crate) fn build_typed_schema(
401 input_columns: &[String],
402 declared_columns: &[config::ColumnConfig],
403 normalize_strategy: Option<&str>,
404) -> FloeResult<Schema> {
405 let mut declared_types = HashMap::new();
406 for column in declared_columns {
407 declared_types.insert(
408 column.name.as_str(),
409 config::parse_data_type(&column.column_type)?,
410 );
411 }
412
413 let mut schema = Schema::with_capacity(input_columns.len());
414 for name in input_columns {
415 let normalized = if let Some(strategy) = normalize_strategy {
416 crate::checks::normalize::normalize_name(name, strategy)
417 } else {
418 name.to_string()
419 };
420 let dtype = declared_types
421 .get(normalized.as_str())
422 .cloned()
423 .unwrap_or(DataType::String);
424 schema.insert(name.as_str().into(), dtype);
425 }
426 Ok(schema)
427}
428
/// Returns a copy of `df` with every column cast (non-strictly) to
/// `String`; used to keep a raw textual snapshot of the input.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
432
433pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
434 let mut columns = Vec::with_capacity(schema.len());
435 for (name, dtype) in schema.iter() {
436 let series = df.column(name.as_str()).map_err(|err| {
437 Box::new(ConfigError(format!(
438 "input column {} not found: {err}",
439 name.as_str()
440 )))
441 })?;
442 let casted =
443 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
444 cast_string_to_bool(name.as_str(), series)?
445 } else {
446 series
447 .cast_with_options(dtype, CastOptions::NonStrict)
448 .map_err(|err| {
449 Box::new(ConfigError(format!(
450 "failed to cast input column {}: {err}",
451 name.as_str()
452 )))
453 })?
454 };
455 columns.push(casted);
456 }
457 DataFrame::new(columns).map_err(|err| {
458 Box::new(ConfigError(format!(
459 "failed to build typed dataframe: {err}"
460 ))) as Box<dyn std::error::Error + Send + Sync>
461 })
462}
463
464fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
465 let string_values = series.as_materialized_series().str().map_err(|err| {
466 Box::new(ConfigError(format!(
467 "failed to read boolean column {} as string: {err}",
468 name
469 )))
470 })?;
471 let mut values = Vec::with_capacity(series.len());
472 for value in string_values {
473 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
474 "true" | "1" => Some(true),
475 "false" | "0" => Some(false),
476 _ => None,
477 });
478 values.push(parsed);
479 }
480 Ok(Series::new(name.into(), values).into())
481}
482
483fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
484 let mut out = df.clone();
485 let names = out
486 .get_column_names()
487 .iter()
488 .map(|name| name.to_string())
489 .collect::<Vec<_>>();
490 for name in names {
491 let series = out.column(&name).map_err(|err| {
492 Box::new(ConfigError(format!(
493 "input column {} not found: {err}",
494 name
495 )))
496 })?;
497 let casted = series
498 .cast_with_options(dtype, CastOptions::NonStrict)
499 .map_err(|err| {
500 Box::new(ConfigError(format!(
501 "failed to cast input column {}: {err}",
502 name
503 )))
504 })?;
505 let idx = out.get_column_index(&name).ok_or_else(|| {
506 Box::new(ConfigError(format!(
507 "input column {} not found for update",
508 name
509 )))
510 })?;
511 out.replace_column(idx, casted).map_err(|err| {
512 Box::new(ConfigError(format!(
513 "failed to update input column {}: {err}",
514 name
515 )))
516 })?;
517 }
518 Ok(out)
519}
520pub fn collect_row_errors(
521 raw_df: &DataFrame,
522 typed_df: &DataFrame,
523 required_cols: &[String],
524 columns: &[config::ColumnConfig],
525 track_cast_errors: bool,
526 raw_indices: &check::ColumnIndex,
527 typed_indices: &check::ColumnIndex,
528) -> FloeResult<Vec<Vec<check::RowError>>> {
529 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
530 if track_cast_errors {
531 let cast_errors =
532 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
533 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
534 errors.extend(cast);
535 }
536 }
537 Ok(error_lists)
538}