1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One resolved input file: its origin plus the names used for outputs.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Original URI of the source object (as configured/discovered).
    pub source_uri: String,
    /// Local filesystem path the bytes are actually read from.
    pub source_local_path: PathBuf,
    /// File name component of the source.
    pub source_name: String,
    /// File name without its extension (stem); used when naming outputs.
    pub source_stem: String,
}
17
/// File-level read failure: which rule rejected the file and why.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that failed.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading one input file: parsed data or a per-file error.
pub enum ReadInput {
    /// The file was read successfully.
    Data {
        input_file: InputFile,
        /// All-string copy of the data; present only when raw collection
        /// was requested (used for cast-mismatch reporting).
        raw_df: Option<DataFrame>,
        /// Data cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read; processing continues with other files.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Size metrics reported by an accepted-sink write, when the sink can supply them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written, if known.
    pub total_bytes_written: Option<u64>,
    /// Average output file size in MB, if known.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files considered "small" by the sink, if known.
    pub small_files_count: Option<u64>,
}
42
/// Schema-evolution report for an accepted-sink write.
#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for this write.
    pub enabled: bool,
    /// Configured evolution mode (sink-specific string).
    pub mode: String,
    /// Whether any evolution was actually applied.
    pub applied: bool,
    /// Columns added to the target schema during this write.
    pub added_columns: Vec<String>,
    /// True when changes were detected that the mode could not apply.
    pub incompatible_changes_detected: bool,
}
51
/// Optional per-phase timing breakdown (milliseconds) of an accepted write.
/// All fields are `None` when the sink does not measure that phase.
#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}
61
/// Row-level statistics produced by a merge (upsert) into the target table.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    /// Key columns the merge matched on.
    pub merge_key: Vec<String>,
    /// Rows inserted by the merge.
    pub inserted_count: u64,
    /// Rows updated by the merge.
    pub updated_count: u64,
    /// Rows closed (e.g. end-dated), if the merge strategy tracks this.
    pub closed_count: Option<u64>,
    /// Rows left unchanged, if tracked.
    pub unchanged_count: Option<u64>,
    /// Target row count before the merge.
    pub target_rows_before: u64,
    /// Target row count after the merge.
    pub target_rows_after: u64,
    /// Wall-clock duration of the merge in milliseconds.
    pub merge_elapsed_ms: u64,
}
73
/// Everything an accepted-sink adapter reports back after a write.
/// Table-format specific fields (delta/iceberg) are `None` for plain sinks.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Files written, if the sink reports it.
    pub files_written: Option<u64>,
    /// Number of output parts produced.
    pub parts_written: u64,
    /// Identifiers/paths of the written parts.
    pub part_files: Vec<String>,
    /// Resulting table version (e.g. for table formats), if any.
    pub table_version: Option<i64>,
    /// Resulting snapshot id, if any.
    pub snapshot_id: Option<i64>,
    /// Root URI of the target table, if known.
    pub table_root_uri: Option<String>,
    // Iceberg-specific catalog coordinates; populated only by the iceberg sink.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    /// Size metrics for the write.
    pub metrics: AcceptedWriteMetrics,
    /// Merge statistics when the write mode performed a merge.
    pub merge: Option<AcceptedMergeMetrics>,
    /// Schema-evolution outcome for this write.
    pub schema_evolution: AcceptedSchemaEvolution,
    /// Optional per-phase timings.
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
91
/// Reader for one input format (csv, json, parquet, ...).
///
/// Implementations are looked up by [`input_adapter`] and shared as
/// `&'static`, hence the `Send + Sync` bound.
pub trait InputAdapter: Send + Sync {
    /// Canonical format name (e.g. "csv"), used for glob/suffix lookups.
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the entity's local input files, falling back to this
    /// format's default globs.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Column names present in `input_file`. The `Err` side is a per-file
    /// read error, not a pipeline failure.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, producing one [`ReadInput`] per file. `collect_raw`
    /// requests the all-string raw copy alongside the typed frame.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like [`InputAdapter::read_inputs`], but the caller may pass column
    /// names it already validated so adapters can skip re-checking.
    /// The default implementation ignores the hint.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
149
/// Writer for the accepted-rows sink of one output format.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` and report what was written.
    ///
    /// `output_stem` names the output; `temp_dir` (if any) is scratch space;
    /// `cloud`, `resolver` and `catalogs` supply storage/catalog access.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
165
/// Borrowed bundle of everything a rejected-sink write needs; groups the
/// arguments so [`RejectedSinkAdapter::write_rejected`] stays a single-arg call.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file; used to name the rejected output.
    pub source_stem: &'a str,
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
176
/// Writer for the rejected-rows sink; returns an identifier/path of the
/// written output (see the adapter implementations for the exact meaning).
pub trait RejectedSinkAdapter: Send + Sync {
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
180
/// Which config slot a format string came from; drives error-message wording.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}

impl FormatKind {
    /// Dotted config path of the format field, e.g. "source.format".
    fn field_path(self) -> &'static str {
        self.labels().0
    }

    /// Human-readable description used in entity-less error messages.
    fn description(self) -> &'static str {
        self.labels().1
    }

    /// (config path, description) pair for this kind; single source of truth
    /// for both accessors.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            FormatKind::Source => ("source.format", "source format"),
            FormatKind::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            FormatKind::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }
}
205
206fn unsupported_format_error(
207 kind: FormatKind,
208 format: &str,
209 entity_name: Option<&str>,
210) -> ConfigError {
211 if let Some(entity_name) = entity_name {
212 return ConfigError(format!(
213 "entity.name={} {}={} is unsupported",
214 entity_name,
215 kind.field_path(),
216 format
217 ));
218 }
219 ConfigError(format!("unsupported {}: {format}", kind.description()))
220}
221
222pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
223 if input_adapter(format).is_err() {
224 return Err(Box::new(unsupported_format_error(
225 FormatKind::Source,
226 format,
227 Some(entity_name),
228 )));
229 }
230 Ok(())
231}
232
233pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
234 if accepted_sink_adapter(format).is_err() {
235 return Err(Box::new(unsupported_format_error(
236 FormatKind::SinkAccepted,
237 format,
238 Some(entity_name),
239 )));
240 }
241 Ok(())
242}
243
244pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
245 if rejected_sink_adapter(format).is_err() {
246 return Err(Box::new(unsupported_format_error(
247 FormatKind::SinkRejected,
248 format,
249 Some(entity_name),
250 )));
251 }
252 Ok(())
253}
254
255pub fn resolve_read_columns(
256 entity: &config::EntityConfig,
257 normalized_columns: &[config::ColumnConfig],
258 normalize_strategy: Option<&str>,
259) -> FloeResult<Vec<config::ColumnConfig>> {
260 if entity.source.format == "json" || entity.source.format == "xml" {
261 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
262 } else {
263 Ok(normalized_columns.to_vec())
264 }
265}
266
267pub fn sink_options_warning(
268 entity_name: &str,
269 format: &str,
270 options: Option<&config::SinkOptions>,
271) -> Option<String> {
272 let options = options?;
273 if format == "parquet" {
274 return None;
275 }
276 let mut keys = Vec::new();
277 if options.compression.is_some() {
278 keys.push("compression");
279 }
280 if options.row_group_size.is_some() {
281 keys.push("row_group_size");
282 }
283 if options.max_size_per_file.is_some() {
284 keys.push("max_size_per_file");
285 }
286 let detail = if keys.is_empty() {
287 "options".to_string()
288 } else {
289 keys.join(", ")
290 };
291 Some(format!(
292 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
293 entity_name, format
294 ))
295}
296
297pub fn validate_sink_options(
298 entity_name: &str,
299 format: &str,
300 options: Option<&config::SinkOptions>,
301) -> FloeResult<()> {
302 let options = match options {
303 Some(options) => options,
304 None => return Ok(()),
305 };
306 if format != "parquet" {
307 return Ok(());
308 }
309 if let Some(compression) = &options.compression {
310 match compression.as_str() {
311 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
312 _ => {
313 return Err(Box::new(ConfigError(format!(
314 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
315 entity_name, compression
316 ))))
317 }
318 }
319 }
320 if let Some(row_group_size) = options.row_group_size {
321 if row_group_size == 0 {
322 return Err(Box::new(ConfigError(format!(
323 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
324 entity_name
325 ))));
326 }
327 }
328 if let Some(max_size_per_file) = options.max_size_per_file {
329 if max_size_per_file == 0 {
330 return Err(Box::new(ConfigError(format!(
331 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
332 entity_name
333 ))));
334 }
335 }
336 Ok(())
337}
338
/// Look up the input adapter registered for `format`.
///
/// Returns an entity-less "unsupported source format" error for unknown
/// formats; callers with entity context use [`ensure_input_format`] instead.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
357
/// Look up the accepted-sink adapter registered for `format`.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
370
/// Look up the rejected-sink adapter registered for `format`.
/// Currently only csv is supported for rejected output.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
381
382pub(crate) fn read_input_from_df(
383 input_file: &InputFile,
384 df: &DataFrame,
385 columns: &[config::ColumnConfig],
386 normalize_strategy: Option<&str>,
387 collect_raw: bool,
388) -> FloeResult<ReadInput> {
389 let input_columns = df
390 .get_column_names()
391 .iter()
392 .map(|name| name.to_string())
393 .collect::<Vec<_>>();
394 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
395 let raw_df = if collect_raw {
396 Some(cast_df_to_string(df)?)
397 } else {
398 None
399 };
400 let typed_df = cast_df_to_schema(df, &typed_schema)?;
401 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
402}
403
404pub(crate) fn finalize_read_input(
405 input_file: &InputFile,
406 mut raw_df: Option<DataFrame>,
407 mut typed_df: DataFrame,
408 normalize_strategy: Option<&str>,
409) -> FloeResult<ReadInput> {
410 if let Some(strategy) = normalize_strategy {
411 if let Some(raw_df) = raw_df.as_mut() {
412 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
413 }
414 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
415 }
416 Ok(ReadInput::Data {
417 input_file: input_file.clone(),
418 raw_df,
419 typed_df,
420 })
421}
422
423pub(crate) fn build_typed_schema(
424 input_columns: &[String],
425 declared_columns: &[config::ColumnConfig],
426 normalize_strategy: Option<&str>,
427) -> FloeResult<Schema> {
428 let mut declared_types = HashMap::new();
429 for column in declared_columns {
430 declared_types.insert(
431 column.name.as_str(),
432 config::parse_data_type(&column.column_type)?,
433 );
434 }
435
436 let mut schema = Schema::with_capacity(input_columns.len());
437 for name in input_columns {
438 let normalized = if let Some(strategy) = normalize_strategy {
439 crate::checks::normalize::normalize_name(name, strategy)
440 } else {
441 name.to_string()
442 };
443 let dtype = declared_types
444 .get(normalized.as_str())
445 .cloned()
446 .unwrap_or(DataType::String);
447 schema.insert(name.as_str().into(), dtype);
448 }
449 Ok(schema)
450}
451
/// Cast every column of `df` to `String` (used for the raw copy kept for
/// cast-mismatch reporting).
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
455
456pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
457 let mut columns = Vec::with_capacity(schema.len());
458 for (name, dtype) in schema.iter() {
459 let series = df.column(name.as_str()).map_err(|err| {
460 Box::new(ConfigError(format!(
461 "input column {} not found: {err}",
462 name.as_str()
463 )))
464 })?;
465 let casted =
466 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
467 cast_string_to_bool(name.as_str(), series)?
468 } else {
469 series
470 .cast_with_options(dtype, CastOptions::NonStrict)
471 .map_err(|err| {
472 Box::new(ConfigError(format!(
473 "failed to cast input column {}: {err}",
474 name.as_str()
475 )))
476 })?
477 };
478 columns.push(casted);
479 }
480 DataFrame::new(columns).map_err(|err| {
481 Box::new(ConfigError(format!(
482 "failed to build typed dataframe: {err}"
483 ))) as Box<dyn std::error::Error + Send + Sync>
484 })
485}
486
487fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
488 let string_values = series.as_materialized_series().str().map_err(|err| {
489 Box::new(ConfigError(format!(
490 "failed to read boolean column {} as string: {err}",
491 name
492 )))
493 })?;
494 let mut values = Vec::with_capacity(series.len());
495 for value in string_values {
496 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
497 "true" | "1" => Some(true),
498 "false" | "0" => Some(false),
499 _ => None,
500 });
501 values.push(parsed);
502 }
503 Ok(Series::new(name.into(), values).into())
504}
505
506fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
507 let mut out = df.clone();
508 let names = out
509 .get_column_names()
510 .iter()
511 .map(|name| name.to_string())
512 .collect::<Vec<_>>();
513 for name in names {
514 let series = out.column(&name).map_err(|err| {
515 Box::new(ConfigError(format!(
516 "input column {} not found: {err}",
517 name
518 )))
519 })?;
520 let casted = series
521 .cast_with_options(dtype, CastOptions::NonStrict)
522 .map_err(|err| {
523 Box::new(ConfigError(format!(
524 "failed to cast input column {}: {err}",
525 name
526 )))
527 })?;
528 let idx = out.get_column_index(&name).ok_or_else(|| {
529 Box::new(ConfigError(format!(
530 "input column {} not found for update",
531 name
532 )))
533 })?;
534 out.replace_column(idx, casted).map_err(|err| {
535 Box::new(ConfigError(format!(
536 "failed to update input column {}: {err}",
537 name
538 )))
539 })?;
540 }
541 Ok(out)
542}
543pub fn collect_row_errors(
544 raw_df: &DataFrame,
545 typed_df: &DataFrame,
546 required_cols: &[String],
547 columns: &[config::ColumnConfig],
548 track_cast_errors: bool,
549 raw_indices: &check::ColumnIndex,
550 typed_indices: &check::ColumnIndex,
551) -> FloeResult<Vec<Vec<check::RowError>>> {
552 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
553 if track_cast_errors {
554 let cast_errors =
555 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
556 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
557 errors.extend(cast);
558 }
559 }
560 Ok(error_lists)
561}