1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Descriptive record for a single input file.
///
/// Holds only plain values, so cloning it never touches the file itself.
#[derive(Clone, Debug)]
pub struct InputFile {
    /// URI identifying where the file came from.
    pub source_uri: String,
    /// File name (presumably the last path segment — confirm with the producer).
    pub source_name: String,
    /// File stem (presumably the name without its extension — confirm with the producer).
    pub source_stem: String,
    /// Size of the file when known (presumably bytes — confirm).
    pub source_size: Option<u64>,
    /// Modification time as text when known; the text format is not defined here.
    pub source_mtime: Option<String>,
}
18
19#[derive(Debug, Clone)]
25pub struct LocalInputFile {
26 pub file: InputFile,
27 pub local_path: PathBuf,
28 pub is_ephemeral: bool,
29}
30
/// File-level read failure, carried inside [`ReadInput::FileError`] and
/// returned by `InputAdapter::read_input_columns`.
#[derive(Clone, Debug)]
pub struct FileReadError {
    /// Identifier of the rule that failed.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
36
37pub enum ReadInput {
38 Data {
39 input_file: InputFile,
40 raw_df: Option<DataFrame>,
41 typed_df: DataFrame,
42 },
43 FileError {
44 input_file: InputFile,
45 error: FileReadError,
46 },
47}
48
/// Size statistics reported by an accepted-sink write.
///
/// Every field is optional; `None` means the value was not reported.
#[derive(Clone, Debug)]
pub struct AcceptedWriteMetrics {
    /// Total number of bytes written.
    pub total_bytes_written: Option<u64>,
    /// Average size of the written files, in megabytes.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files counted as "small"; the threshold is defined by the writer.
    pub small_files_count: Option<u64>,
}
55
/// Schema-evolution summary attached to an accepted-sink write.
///
/// `Default` yields a value with every flag `false`, an empty mode, and no
/// added columns.
#[derive(Clone, Debug, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for the write.
    pub enabled: bool,
    /// Name of the evolution mode; valid values are defined by the writers.
    pub mode: String,
    /// Whether a schema change was actually applied.
    pub applied: bool,
    /// Names of the columns that were added.
    pub added_columns: Vec<String>,
    /// Whether changes that could not be applied were detected.
    pub incompatible_changes_detected: bool,
}
64
/// Optional per-phase timings of an accepted-sink write, in milliseconds.
///
/// `Default` leaves every phase as `None`. Which phases a given writer fills
/// in is decided by that writer.
#[derive(Clone, Debug, Default)]
pub struct AcceptedWritePerfBreakdown {
    /// Time spent in the conversion phase.
    pub conversion_ms: Option<u64>,
    /// Time spent building the source dataframe.
    pub source_df_build_ms: Option<u64>,
    /// Time spent executing the merge.
    pub merge_exec_ms: Option<u64>,
    /// Time spent writing data.
    pub data_write_ms: Option<u64>,
    /// Time spent committing.
    pub commit_ms: Option<u64>,
    /// Time spent reading metrics back.
    pub metrics_read_ms: Option<u64>,
}
74
/// Row-level statistics of a merge performed by an accepted-sink write.
#[derive(Clone, Debug)]
pub struct AcceptedMergeMetrics {
    /// Column names forming the merge key.
    pub merge_key: Vec<String>,
    /// Number of rows inserted.
    pub inserted_count: u64,
    /// Number of rows updated.
    pub updated_count: u64,
    /// Number of rows closed, when reported (presumably by history-keeping
    /// merge modes — confirm with the writers).
    pub closed_count: Option<u64>,
    /// Number of rows left unchanged, when reported.
    pub unchanged_count: Option<u64>,
    /// Row count of the target before the merge.
    pub target_rows_before: u64,
    /// Row count of the target after the merge.
    pub target_rows_after: u64,
    /// Duration of the merge in milliseconds.
    pub merge_elapsed_ms: u64,
}
86
87#[derive(Debug, Clone)]
88pub struct AcceptedWriteOutput {
89 pub files_written: Option<u64>,
90 pub parts_written: u64,
91 pub part_files: Vec<String>,
92 pub table_version: Option<i64>,
93 pub snapshot_id: Option<i64>,
94 pub table_root_uri: Option<String>,
95 pub iceberg_catalog_name: Option<String>,
96 pub iceberg_database: Option<String>,
97 pub iceberg_namespace: Option<String>,
98 pub iceberg_table: Option<String>,
99 pub delta_catalog_name: Option<String>,
100 pub delta_catalog_schema: Option<String>,
101 pub delta_catalog_table: Option<String>,
102 pub metrics: AcceptedWriteMetrics,
103 pub merge: Option<AcceptedMergeMetrics>,
104 pub schema_evolution: AcceptedSchemaEvolution,
105 pub perf: Option<AcceptedWritePerfBreakdown>,
106}
107
108pub trait InputAdapter: Send + Sync {
109 fn format(&self) -> &'static str;
110
111 fn default_globs(&self) -> FloeResult<Vec<String>> {
112 io::storage::extensions::glob_patterns_for_format(self.format())
113 }
114
115 fn suffixes(&self) -> FloeResult<Vec<String>> {
116 io::storage::extensions::suffixes_for_format(self.format())
117 }
118
119 fn resolve_local_inputs(
120 &self,
121 config_dir: &Path,
122 entity_name: &str,
123 source: &config::SourceConfig,
124 storage: &str,
125 ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
126 let default_globs = self.default_globs()?;
127 io::storage::local::resolve_local_inputs(
128 config_dir,
129 entity_name,
130 source,
131 storage,
132 &default_globs,
133 )
134 }
135
136 fn read_input_columns(
137 &self,
138 entity: &config::EntityConfig,
139 input_file: &LocalInputFile,
140 columns: &[config::ColumnConfig],
141 ) -> Result<Vec<String>, FileReadError>;
142
143 fn read_inputs(
144 &self,
145 entity: &config::EntityConfig,
146 files: &[LocalInputFile],
147 columns: &[config::ColumnConfig],
148 normalize_strategy: Option<&str>,
149 collect_raw: bool,
150 ) -> FloeResult<Vec<ReadInput>>;
151
152 fn read_inputs_with_prechecked_columns(
153 &self,
154 entity: &config::EntityConfig,
155 files: &[LocalInputFile],
156 columns: &[config::ColumnConfig],
157 normalize_strategy: Option<&str>,
158 collect_raw: bool,
159 prechecked_input_columns: Option<&[String]>,
160 ) -> FloeResult<Vec<ReadInput>> {
161 let _ = prechecked_input_columns;
162 self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
163 }
164}
165
166pub trait AcceptedSinkAdapter: Send + Sync {
167 #[allow(clippy::too_many_arguments)]
168 fn write_accepted(
169 &self,
170 target: &Target,
171 df: &mut DataFrame,
172 mode: config::WriteMode,
173 output_stem: &str,
174 temp_dir: Option<&Path>,
175 cloud: &mut io::storage::CloudClient,
176 resolver: &config::StorageResolver,
177 catalogs: &config::CatalogResolver,
178 entity: &config::EntityConfig,
179 ) -> FloeResult<AcceptedWriteOutput>;
180}
181
182pub struct RejectedWriteRequest<'a> {
183 pub target: &'a Target,
184 pub df: &'a mut DataFrame,
185 pub source_stem: &'a str,
186 pub temp_dir: Option<&'a Path>,
187 pub cloud: &'a mut io::storage::CloudClient,
188 pub resolver: &'a config::StorageResolver,
189 pub entity: &'a config::EntityConfig,
190 pub mode: config::WriteMode,
191}
192
193pub trait RejectedSinkAdapter: Send + Sync {
194 fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
195}
196
/// Identifies which configuration slot a format string belongs to.
#[derive(Clone, Copy, Debug)]
pub enum FormatKind {
    /// The `source.format` setting.
    Source,
    /// The `sink.accepted.format` setting.
    SinkAccepted,
    /// The `sink.rejected.format` setting.
    SinkRejected,
}

impl FormatKind {
    /// Configuration path of the setting, used in entity-scoped error messages.
    fn field_path(self) -> &'static str {
        match self {
            Self::Source => "source.format",
            Self::SinkAccepted => "sink.accepted.format",
            Self::SinkRejected => "sink.rejected.format",
        }
    }

    /// Short human-readable label, used in error messages without an entity.
    fn description(self) -> &'static str {
        match self {
            Self::Source => "source format",
            Self::SinkAccepted => "accepted sink format",
            Self::SinkRejected => "rejected sink format",
        }
    }
}
221
222fn unsupported_format_error(
223 kind: FormatKind,
224 format: &str,
225 entity_name: Option<&str>,
226) -> ConfigError {
227 if let Some(entity_name) = entity_name {
228 return ConfigError(format!(
229 "entity.name={} {}={} is unsupported",
230 entity_name,
231 kind.field_path(),
232 format
233 ));
234 }
235 ConfigError(format!("unsupported {}: {format}", kind.description()))
236}
237
238pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
239 if input_adapter(format).is_err() {
240 return Err(Box::new(unsupported_format_error(
241 FormatKind::Source,
242 format,
243 Some(entity_name),
244 )));
245 }
246 Ok(())
247}
248
249pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
250 if accepted_sink_adapter(format).is_err() {
251 return Err(Box::new(unsupported_format_error(
252 FormatKind::SinkAccepted,
253 format,
254 Some(entity_name),
255 )));
256 }
257 Ok(())
258}
259
260pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
261 if rejected_sink_adapter(format).is_err() {
262 return Err(Box::new(unsupported_format_error(
263 FormatKind::SinkRejected,
264 format,
265 Some(entity_name),
266 )));
267 }
268 Ok(())
269}
270
271pub fn resolve_read_columns(
272 entity: &config::EntityConfig,
273 normalized_columns: &[config::ColumnConfig],
274 normalize_strategy: Option<&str>,
275) -> FloeResult<Vec<config::ColumnConfig>> {
276 if entity.source.format == "json" || entity.source.format == "xml" {
277 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
278 } else {
279 Ok(normalized_columns.to_vec())
280 }
281}
282
283pub fn sink_options_warning(
284 entity_name: &str,
285 format: &str,
286 options: Option<&config::SinkOptions>,
287) -> Option<String> {
288 let options = options?;
289 if format == "parquet" {
290 return None;
291 }
292 let mut keys = Vec::new();
293 if options.compression.is_some() {
294 keys.push("compression");
295 }
296 if options.row_group_size.is_some() {
297 keys.push("row_group_size");
298 }
299 if options.max_size_per_file.is_some() {
300 keys.push("max_size_per_file");
301 }
302 let detail = if keys.is_empty() {
303 "options".to_string()
304 } else {
305 keys.join(", ")
306 };
307 Some(format!(
308 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
309 entity_name, format
310 ))
311}
312
313pub fn validate_sink_options(
314 entity_name: &str,
315 format: &str,
316 options: Option<&config::SinkOptions>,
317) -> FloeResult<()> {
318 let options = match options {
319 Some(options) => options,
320 None => return Ok(()),
321 };
322 if format != "parquet" {
323 return Ok(());
324 }
325 if let Some(compression) = &options.compression {
326 match compression.as_str() {
327 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
328 _ => {
329 return Err(Box::new(ConfigError(format!(
330 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
331 entity_name, compression
332 ))))
333 }
334 }
335 }
336 if let Some(row_group_size) = options.row_group_size {
337 if row_group_size == 0 {
338 return Err(Box::new(ConfigError(format!(
339 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
340 entity_name
341 ))));
342 }
343 }
344 if let Some(max_size_per_file) = options.max_size_per_file {
345 if max_size_per_file == 0 {
346 return Err(Box::new(ConfigError(format!(
347 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
348 entity_name
349 ))));
350 }
351 }
352 Ok(())
353}
354
355pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
356 match format {
357 "csv" => Ok(io::read::csv::csv_input_adapter()),
358 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
359 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
360 "orc" => Ok(io::read::orc::orc_input_adapter()),
361 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
362 "json" => Ok(io::read::json::json_input_adapter()),
363 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
364 "avro" => Ok(io::read::avro::avro_input_adapter()),
365 "xml" => Ok(io::read::xml::xml_input_adapter()),
366 _ => Err(Box::new(unsupported_format_error(
367 FormatKind::Source,
368 format,
369 None,
370 ))),
371 }
372}
373
374pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
375 match format {
376 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
377 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
378 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
379 _ => Err(Box::new(unsupported_format_error(
380 FormatKind::SinkAccepted,
381 format,
382 None,
383 ))),
384 }
385}
386
387pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
388 match format {
389 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
390 _ => Err(Box::new(unsupported_format_error(
391 FormatKind::SinkRejected,
392 format,
393 None,
394 ))),
395 }
396}
397
398pub(crate) fn read_input_from_df(
399 input_file: &LocalInputFile,
400 df: &DataFrame,
401 columns: &[config::ColumnConfig],
402 normalize_strategy: Option<&str>,
403 collect_raw: bool,
404) -> FloeResult<ReadInput> {
405 let input_columns = df
406 .get_column_names()
407 .iter()
408 .map(|name| name.to_string())
409 .collect::<Vec<_>>();
410 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
411 let raw_df = if collect_raw {
412 Some(cast_df_to_string(df)?)
413 } else {
414 None
415 };
416 let typed_df = cast_df_to_schema(df, &typed_schema)?;
417 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
418}
419
420pub(crate) fn finalize_read_input(
421 input_file: &LocalInputFile,
422 mut raw_df: Option<DataFrame>,
423 mut typed_df: DataFrame,
424 normalize_strategy: Option<&str>,
425) -> FloeResult<ReadInput> {
426 if let Some(strategy) = normalize_strategy {
427 if let Some(raw_df) = raw_df.as_mut() {
428 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
429 }
430 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
431 }
432 Ok(ReadInput::Data {
433 input_file: input_file.file.clone(),
434 raw_df,
435 typed_df,
436 })
437}
438
439pub(crate) fn build_typed_schema(
440 input_columns: &[String],
441 declared_columns: &[config::ColumnConfig],
442 normalize_strategy: Option<&str>,
443) -> FloeResult<Schema> {
444 let mut declared_types = HashMap::new();
445 for column in declared_columns {
446 declared_types.insert(
447 column.name.as_str(),
448 config::parse_data_type(&column.column_type)?,
449 );
450 }
451
452 let mut schema = Schema::with_capacity(input_columns.len());
453 for name in input_columns {
454 let normalized = if let Some(strategy) = normalize_strategy {
455 crate::checks::normalize::normalize_name(name, strategy)
456 } else {
457 name.to_string()
458 };
459 let dtype = declared_types
460 .get(normalized.as_str())
461 .cloned()
462 .unwrap_or(DataType::String);
463 schema.insert(name.as_str().into(), dtype);
464 }
465 Ok(schema)
466}
467
468pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
469 cast_df_with_type(df, &DataType::String)
470}
471
472pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
473 let mut columns = Vec::with_capacity(schema.len());
474 for (name, dtype) in schema.iter() {
475 let series = df.column(name.as_str()).map_err(|err| {
476 Box::new(ConfigError(format!(
477 "input column {} not found: {err}",
478 name.as_str()
479 )))
480 })?;
481 let casted =
482 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
483 cast_string_to_bool(name.as_str(), series)?
484 } else {
485 series
486 .cast_with_options(dtype, CastOptions::NonStrict)
487 .map_err(|err| {
488 Box::new(ConfigError(format!(
489 "failed to cast input column {}: {err}",
490 name.as_str()
491 )))
492 })?
493 };
494 columns.push(casted);
495 }
496 DataFrame::new(columns).map_err(|err| {
497 Box::new(ConfigError(format!(
498 "failed to build typed dataframe: {err}"
499 ))) as Box<dyn std::error::Error + Send + Sync>
500 })
501}
502
503fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
504 let string_values = series.as_materialized_series().str().map_err(|err| {
505 Box::new(ConfigError(format!(
506 "failed to read boolean column {} as string: {err}",
507 name
508 )))
509 })?;
510 let mut values = Vec::with_capacity(series.len());
511 for value in string_values {
512 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
513 "true" | "1" => Some(true),
514 "false" | "0" => Some(false),
515 _ => None,
516 });
517 values.push(parsed);
518 }
519 Ok(Series::new(name.into(), values).into())
520}
521
522fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
523 let mut out = df.clone();
524 let names = out
525 .get_column_names()
526 .iter()
527 .map(|name| name.to_string())
528 .collect::<Vec<_>>();
529 for name in names {
530 let series = out.column(&name).map_err(|err| {
531 Box::new(ConfigError(format!(
532 "input column {} not found: {err}",
533 name
534 )))
535 })?;
536 let casted = series
537 .cast_with_options(dtype, CastOptions::NonStrict)
538 .map_err(|err| {
539 Box::new(ConfigError(format!(
540 "failed to cast input column {}: {err}",
541 name
542 )))
543 })?;
544 let idx = out.get_column_index(&name).ok_or_else(|| {
545 Box::new(ConfigError(format!(
546 "input column {} not found for update",
547 name
548 )))
549 })?;
550 out.replace_column(idx, casted).map_err(|err| {
551 Box::new(ConfigError(format!(
552 "failed to update input column {}: {err}",
553 name
554 )))
555 })?;
556 }
557 Ok(out)
558}
559pub fn collect_row_errors(
560 raw_df: &DataFrame,
561 typed_df: &DataFrame,
562 required_cols: &[String],
563 columns: &[config::ColumnConfig],
564 track_cast_errors: bool,
565 raw_indices: &check::ColumnIndex,
566 typed_indices: &check::ColumnIndex,
567) -> FloeResult<Vec<Vec<check::RowError>>> {
568 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
569 if track_cast_errors {
570 let cast_errors =
571 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
572 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
573 errors.extend(cast);
574 }
575 }
576 Ok(error_lists)
577}