1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Metadata for a single resolved input file awaiting read.
#[derive(Debug, Clone)]
pub struct InputFile {
    // URI the file was discovered under (local or remote).
    pub source_uri: String,
    // Path to a locally readable copy of the file.
    pub source_local_path: PathBuf,
    // File name including extension.
    pub source_name: String,
    // File name without extension; used to derive output names.
    pub source_stem: String,
    // File size in bytes, when the storage layer reports it.
    pub source_size: Option<u64>,
    // Last-modified timestamp as a string, when available.
    pub source_mtime: Option<String>,
}
19
/// A file-level read failure, attributed to the rule that raised it.
#[derive(Debug, Clone)]
pub struct FileReadError {
    // Name of the rule associated with this failure.
    pub rule: String,
    // Human-readable description of what went wrong.
    pub message: String,
}
25
/// Outcome of reading one input file: parsed data or a file-level error.
pub enum ReadInput {
    /// The file was read successfully.
    Data {
        input_file: InputFile,
        // All-string copy of the data; present only when raw collection
        // was requested (see `read_input_from_df`'s `collect_raw`).
        raw_df: Option<DataFrame>,
        // Data cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read; carries the originating file and error.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
37
/// Optional size metrics reported after writing accepted rows.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    pub total_bytes_written: Option<u64>,
    // Average written file size in megabytes, when computable.
    pub avg_file_size_mb: Option<f64>,
    // Count of files considered "small" — threshold defined by the sink
    // implementation, not visible here.
    pub small_files_count: Option<u64>,
}
44
/// Schema-evolution status reported by an accepted sink write.
#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
    // Whether schema evolution was enabled for this write.
    pub enabled: bool,
    // Evolution mode name, as configured (semantics defined by the sink).
    pub mode: String,
    // Whether any evolution was actually applied.
    pub applied: bool,
    // Columns added to the target schema during this write.
    pub added_columns: Vec<String>,
    // True when changes were detected that could not be applied compatibly.
    pub incompatible_changes_detected: bool,
}
53
/// Per-phase timing breakdown (milliseconds) for an accepted sink write.
/// Each phase is optional — sinks report only the phases they execute.
#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}
63
/// Row-count metrics produced by a merge (upsert) into the target table.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    // Columns used as the merge key.
    pub merge_key: Vec<String>,
    pub inserted_count: u64,
    pub updated_count: u64,
    // Counts only some merge strategies report (hence optional).
    pub closed_count: Option<u64>,
    pub unchanged_count: Option<u64>,
    pub target_rows_before: u64,
    pub target_rows_after: u64,
    // Wall-clock duration of the merge, in milliseconds.
    pub merge_elapsed_ms: u64,
}
75
/// Everything an accepted sink reports back after a write.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    pub files_written: Option<u64>,
    pub parts_written: u64,
    // Identifiers/paths of the written parts.
    pub part_files: Vec<String>,
    // Table/snapshot identifiers — populated only by table-format sinks
    // (delta/iceberg); plain-file sinks leave these None.
    pub table_version: Option<i64>,
    pub snapshot_id: Option<i64>,
    pub table_root_uri: Option<String>,
    // Iceberg catalog coordinates, when the iceberg sink was used.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    pub metrics: AcceptedWriteMetrics,
    // Present only when the write performed a merge.
    pub merge: Option<AcceptedMergeMetrics>,
    pub schema_evolution: AcceptedSchemaEvolution,
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
93
/// Reader for one input format; implementations are looked up by format
/// string via [`input_adapter`].
pub trait InputAdapter: Send + Sync {
    /// Canonical format identifier (e.g. "csv", "json").
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the concrete local input files for `source`, supplying this
    /// format's default glob patterns to the resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// List the column names present in `input_file`, or a file-level error
    /// if the file cannot be inspected.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, producing one [`ReadInput`] per file. When
    /// `collect_raw` is true, implementations also retain an all-string
    /// copy of each frame (see `cast_df_to_string`).
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like [`InputAdapter::read_inputs`], but callers may pass column names
    /// they already validated. The default implementation ignores the hint
    /// and delegates to `read_inputs`.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
151
/// Writer for the accepted-rows sink; implementations are looked up by
/// format string via [`accepted_sink_adapter`] (parquet, delta, iceberg).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` according to `mode` and report what was written.
    ///
    /// `output_stem` is the base name for outputs; `temp_dir` is an optional
    /// staging location; `cloud`, `resolver`, and `catalogs` give access to
    /// remote storage and catalog configuration.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
167
/// Bundled arguments for [`RejectedSinkAdapter::write_rejected`], grouped
/// into a struct to keep the trait method signature small.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    // Rejected rows to write.
    pub df: &'a mut DataFrame,
    // Stem of the source file these rejections came from.
    pub source_stem: &'a str,
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
178
/// Writer for the rejected-rows sink; looked up via [`rejected_sink_adapter`].
pub trait RejectedSinkAdapter: Send + Sync {
    // Returns a String describing the write — presumably the written file's
    // path/URI; confirm against the csv implementation.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
182
/// Which config slot a format string came from; used only to phrase
/// unsupported-format error messages.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}
189
190impl FormatKind {
191 fn field_path(self) -> &'static str {
192 match self {
193 FormatKind::Source => "source.format",
194 FormatKind::SinkAccepted => "sink.accepted.format",
195 FormatKind::SinkRejected => "sink.rejected.format",
196 }
197 }
198
199 fn description(self) -> &'static str {
200 match self {
201 FormatKind::Source => "source format",
202 FormatKind::SinkAccepted => "accepted sink format",
203 FormatKind::SinkRejected => "rejected sink format",
204 }
205 }
206}
207
208fn unsupported_format_error(
209 kind: FormatKind,
210 format: &str,
211 entity_name: Option<&str>,
212) -> ConfigError {
213 if let Some(entity_name) = entity_name {
214 return ConfigError(format!(
215 "entity.name={} {}={} is unsupported",
216 entity_name,
217 kind.field_path(),
218 format
219 ));
220 }
221 ConfigError(format!("unsupported {}: {format}", kind.description()))
222}
223
224pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
225 if input_adapter(format).is_err() {
226 return Err(Box::new(unsupported_format_error(
227 FormatKind::Source,
228 format,
229 Some(entity_name),
230 )));
231 }
232 Ok(())
233}
234
235pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
236 if accepted_sink_adapter(format).is_err() {
237 return Err(Box::new(unsupported_format_error(
238 FormatKind::SinkAccepted,
239 format,
240 Some(entity_name),
241 )));
242 }
243 Ok(())
244}
245
246pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
247 if rejected_sink_adapter(format).is_err() {
248 return Err(Box::new(unsupported_format_error(
249 FormatKind::SinkRejected,
250 format,
251 Some(entity_name),
252 )));
253 }
254 Ok(())
255}
256
257pub fn resolve_read_columns(
258 entity: &config::EntityConfig,
259 normalized_columns: &[config::ColumnConfig],
260 normalize_strategy: Option<&str>,
261) -> FloeResult<Vec<config::ColumnConfig>> {
262 if entity.source.format == "json" || entity.source.format == "xml" {
263 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
264 } else {
265 Ok(normalized_columns.to_vec())
266 }
267}
268
269pub fn sink_options_warning(
270 entity_name: &str,
271 format: &str,
272 options: Option<&config::SinkOptions>,
273) -> Option<String> {
274 let options = options?;
275 if format == "parquet" {
276 return None;
277 }
278 let mut keys = Vec::new();
279 if options.compression.is_some() {
280 keys.push("compression");
281 }
282 if options.row_group_size.is_some() {
283 keys.push("row_group_size");
284 }
285 if options.max_size_per_file.is_some() {
286 keys.push("max_size_per_file");
287 }
288 let detail = if keys.is_empty() {
289 "options".to_string()
290 } else {
291 keys.join(", ")
292 };
293 Some(format!(
294 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
295 entity_name, format
296 ))
297}
298
299pub fn validate_sink_options(
300 entity_name: &str,
301 format: &str,
302 options: Option<&config::SinkOptions>,
303) -> FloeResult<()> {
304 let options = match options {
305 Some(options) => options,
306 None => return Ok(()),
307 };
308 if format != "parquet" {
309 return Ok(());
310 }
311 if let Some(compression) = &options.compression {
312 match compression.as_str() {
313 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
314 _ => {
315 return Err(Box::new(ConfigError(format!(
316 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
317 entity_name, compression
318 ))))
319 }
320 }
321 }
322 if let Some(row_group_size) = options.row_group_size {
323 if row_group_size == 0 {
324 return Err(Box::new(ConfigError(format!(
325 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
326 entity_name
327 ))));
328 }
329 }
330 if let Some(max_size_per_file) = options.max_size_per_file {
331 if max_size_per_file == 0 {
332 return Err(Box::new(ConfigError(format!(
333 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
334 entity_name
335 ))));
336 }
337 }
338 Ok(())
339}
340
/// Look up the reader implementation for a source `format` string.
/// Unknown formats yield an unsupported-format error without entity context.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
359
/// Look up the writer implementation for an accepted-sink `format` string.
/// Unknown formats yield an unsupported-format error without entity context.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
372
/// Look up the writer implementation for a rejected-sink `format` string.
/// Only csv is currently supported.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
383
384pub(crate) fn read_input_from_df(
385 input_file: &InputFile,
386 df: &DataFrame,
387 columns: &[config::ColumnConfig],
388 normalize_strategy: Option<&str>,
389 collect_raw: bool,
390) -> FloeResult<ReadInput> {
391 let input_columns = df
392 .get_column_names()
393 .iter()
394 .map(|name| name.to_string())
395 .collect::<Vec<_>>();
396 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
397 let raw_df = if collect_raw {
398 Some(cast_df_to_string(df)?)
399 } else {
400 None
401 };
402 let typed_df = cast_df_to_schema(df, &typed_schema)?;
403 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
404}
405
/// Optionally normalize column names on both frames, then wrap them in
/// [`ReadInput::Data`].
pub(crate) fn finalize_read_input(
    input_file: &InputFile,
    mut raw_df: Option<DataFrame>,
    mut typed_df: DataFrame,
    normalize_strategy: Option<&str>,
) -> FloeResult<ReadInput> {
    if let Some(strategy) = normalize_strategy {
        // NOTE(review): this path uses `crate::checks::normalize` while the
        // rest of this file (e.g. `resolve_read_columns`) uses the imported
        // `check` module — confirm these refer to the same module or unify.
        if let Some(raw_df) = raw_df.as_mut() {
            crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
        }
        crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
    }
    Ok(ReadInput::Data {
        input_file: input_file.clone(),
        raw_df,
        typed_df,
    })
}
424
425pub(crate) fn build_typed_schema(
426 input_columns: &[String],
427 declared_columns: &[config::ColumnConfig],
428 normalize_strategy: Option<&str>,
429) -> FloeResult<Schema> {
430 let mut declared_types = HashMap::new();
431 for column in declared_columns {
432 declared_types.insert(
433 column.name.as_str(),
434 config::parse_data_type(&column.column_type)?,
435 );
436 }
437
438 let mut schema = Schema::with_capacity(input_columns.len());
439 for name in input_columns {
440 let normalized = if let Some(strategy) = normalize_strategy {
441 crate::checks::normalize::normalize_name(name, strategy)
442 } else {
443 name.to_string()
444 };
445 let dtype = declared_types
446 .get(normalized.as_str())
447 .cloned()
448 .unwrap_or(DataType::String);
449 schema.insert(name.as_str().into(), dtype);
450 }
451 Ok(schema)
452}
453
/// Cast every column of `df` to `String` (non-strict); used to build the
/// raw, all-string view of the input.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
457
458pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
459 let mut columns = Vec::with_capacity(schema.len());
460 for (name, dtype) in schema.iter() {
461 let series = df.column(name.as_str()).map_err(|err| {
462 Box::new(ConfigError(format!(
463 "input column {} not found: {err}",
464 name.as_str()
465 )))
466 })?;
467 let casted =
468 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
469 cast_string_to_bool(name.as_str(), series)?
470 } else {
471 series
472 .cast_with_options(dtype, CastOptions::NonStrict)
473 .map_err(|err| {
474 Box::new(ConfigError(format!(
475 "failed to cast input column {}: {err}",
476 name.as_str()
477 )))
478 })?
479 };
480 columns.push(casted);
481 }
482 DataFrame::new(columns).map_err(|err| {
483 Box::new(ConfigError(format!(
484 "failed to build typed dataframe: {err}"
485 ))) as Box<dyn std::error::Error + Send + Sync>
486 })
487}
488
489fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
490 let string_values = series.as_materialized_series().str().map_err(|err| {
491 Box::new(ConfigError(format!(
492 "failed to read boolean column {} as string: {err}",
493 name
494 )))
495 })?;
496 let mut values = Vec::with_capacity(series.len());
497 for value in string_values {
498 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
499 "true" | "1" => Some(true),
500 "false" | "0" => Some(false),
501 _ => None,
502 });
503 values.push(parsed);
504 }
505 Ok(Series::new(name.into(), values).into())
506}
507
508fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
509 let mut out = df.clone();
510 let names = out
511 .get_column_names()
512 .iter()
513 .map(|name| name.to_string())
514 .collect::<Vec<_>>();
515 for name in names {
516 let series = out.column(&name).map_err(|err| {
517 Box::new(ConfigError(format!(
518 "input column {} not found: {err}",
519 name
520 )))
521 })?;
522 let casted = series
523 .cast_with_options(dtype, CastOptions::NonStrict)
524 .map_err(|err| {
525 Box::new(ConfigError(format!(
526 "failed to cast input column {}: {err}",
527 name
528 )))
529 })?;
530 let idx = out.get_column_index(&name).ok_or_else(|| {
531 Box::new(ConfigError(format!(
532 "input column {} not found for update",
533 name
534 )))
535 })?;
536 out.replace_column(idx, casted).map_err(|err| {
537 Box::new(ConfigError(format!(
538 "failed to update input column {}: {err}",
539 name
540 )))
541 })?;
542 }
543 Ok(out)
544}
545pub fn collect_row_errors(
546 raw_df: &DataFrame,
547 typed_df: &DataFrame,
548 required_cols: &[String],
549 columns: &[config::ColumnConfig],
550 track_cast_errors: bool,
551 raw_indices: &check::ColumnIndex,
552 typed_indices: &check::ColumnIndex,
553) -> FloeResult<Vec<Vec<check::RowError>>> {
554 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
555 if track_cast_errors {
556 let cast_errors =
557 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
558 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
559 errors.extend(cast);
560 }
561 }
562 Ok(error_lists)
563}