1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Metadata describing a single input file as discovered at its source
/// location (before any local staging).
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Full URI of the source file.
    pub source_uri: String,
    /// File name component of the URI.
    pub source_name: String,
    /// File name stem (presumably the name without its extension — confirm
    /// against the code that populates it).
    pub source_stem: String,
    /// Size in bytes, when the storage backend reports it.
    pub source_size: Option<u64>,
    /// Last-modified timestamp (string-encoded), when available.
    pub source_mtime: Option<String>,
}
18
/// An [`InputFile`] that has been made readable on the local filesystem.
#[derive(Debug, Clone)]
pub struct LocalInputFile {
    /// Source-level metadata for this file.
    pub file: InputFile,
    /// Path of the locally readable copy.
    pub local_path: PathBuf,
    /// True when `local_path` is a temporary staged copy rather than the
    /// original location — NOTE(review): confirm who is responsible for
    /// cleaning it up.
    pub is_ephemeral: bool,
}
30
/// A file-level read failure, attributed to a named rule for reporting.
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that produced this error.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
36
/// Outcome of reading one input file: parsed data or a file-level error.
pub enum ReadInput {
    /// The file was read successfully.
    Data {
        /// Metadata of the file the frames came from.
        input_file: InputFile,
        /// All-string ("raw") view of the data; present only when the
        /// caller requested raw collection.
        raw_df: Option<DataFrame>,
        /// Data cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
48
/// Post-write size metrics for the accepted sink, when the sink can
/// report them.
#[derive(Debug, Clone)]
pub struct AcceptedWriteMetrics {
    /// Total bytes written across all output files.
    pub total_bytes_written: Option<u64>,
    /// Mean output file size in megabytes.
    pub avg_file_size_mb: Option<f64>,
    /// Number of "small" files produced — the threshold is defined by the
    /// sink adapter; confirm against the implementations.
    pub small_files_count: Option<u64>,
}
55
/// Result of schema-evolution handling during an accepted-sink write.
#[derive(Debug, Clone, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled for this write.
    pub enabled: bool,
    /// Configured evolution mode (string-encoded).
    pub mode: String,
    /// Whether an evolution was actually applied.
    pub applied: bool,
    /// Names of columns added to the target schema.
    pub added_columns: Vec<String>,
    /// True when changes were detected that evolution could not apply.
    pub incompatible_changes_detected: bool,
}
64
/// Per-phase wall-clock timings (milliseconds) for an accepted-sink write.
/// Phases that did not run, or were not measured, are `None`.
#[derive(Debug, Clone, Default)]
pub struct AcceptedWritePerfBreakdown {
    pub conversion_ms: Option<u64>,
    pub source_df_build_ms: Option<u64>,
    pub merge_exec_ms: Option<u64>,
    pub data_write_ms: Option<u64>,
    pub commit_ms: Option<u64>,
    pub metrics_read_ms: Option<u64>,
}
74
/// Row-level statistics reported after a merge-mode write.
#[derive(Debug, Clone)]
pub struct AcceptedMergeMetrics {
    /// Columns used as the merge key.
    pub merge_key: Vec<String>,
    /// Rows inserted into the target.
    pub inserted_count: u64,
    /// Rows updated in the target.
    pub updated_count: u64,
    /// Rows "closed" by the merge — NOTE(review): semantics depend on the
    /// merge strategy; not all strategies report this.
    pub closed_count: Option<u64>,
    /// Rows left unchanged, when the strategy reports them.
    pub unchanged_count: Option<u64>,
    /// Target row count before the merge.
    pub target_rows_before: u64,
    /// Target row count after the merge.
    pub target_rows_after: u64,
    /// Wall-clock duration of the merge in milliseconds.
    pub merge_elapsed_ms: u64,
}
86
/// Everything an [`AcceptedSinkAdapter`] reports back after writing the
/// accepted rows.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of data files written, when the sink reports it.
    pub files_written: Option<u64>,
    /// Number of output parts produced.
    pub parts_written: u64,
    /// Names/paths of the produced part files.
    pub part_files: Vec<String>,
    /// Resulting table version — presumably Delta-specific; confirm
    /// against the delta adapter.
    pub table_version: Option<i64>,
    /// Resulting snapshot id — presumably Iceberg-specific; confirm
    /// against the iceberg adapter.
    pub snapshot_id: Option<i64>,
    /// Root URI of the written table, when applicable.
    pub table_root_uri: Option<String>,
    // Iceberg catalog coordinates; presumably populated only by the
    // iceberg adapter — verify against its implementation.
    pub iceberg_catalog_name: Option<String>,
    pub iceberg_database: Option<String>,
    pub iceberg_namespace: Option<String>,
    pub iceberg_table: Option<String>,
    /// Size metrics for the written data.
    pub metrics: AcceptedWriteMetrics,
    /// Merge statistics, present when the write used merge mode.
    pub merge: Option<AcceptedMergeMetrics>,
    /// Outcome of schema-evolution handling for this write.
    pub schema_evolution: AcceptedSchemaEvolution,
    /// Optional per-phase timing breakdown.
    pub perf: Option<AcceptedWritePerfBreakdown>,
}
104
/// Format-specific reader used to discover and load source files.
pub trait InputAdapter: Send + Sync {
    /// Identifier of the format this adapter reads (e.g. "csv").
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes recognized for this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the entity's configured source into concrete local input
    /// files, supplying this adapter's default globs to the resolver.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Read only the column names present in `input_file`, or a file-level
    /// error when the file cannot be inspected. Implemented per format.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &LocalInputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, producing one [`ReadInput`] per file. When
    /// `collect_raw` is true, implementations also keep an all-string copy
    /// of each frame alongside the typed one.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;

    /// Like [`InputAdapter::read_inputs`], but callers may pass input
    /// column names that were already checked so implementations can skip
    /// re-reading headers. The default implementation ignores the hint.
    fn read_inputs_with_prechecked_columns(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
        prechecked_input_columns: Option<&[String]>,
    ) -> FloeResult<Vec<ReadInput>> {
        // Default: discard the hint and defer to `read_inputs`.
        let _ = prechecked_input_columns;
        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
    }
}
162
/// Writer for the accepted-rows sink of a specific output format.
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` using `mode`, and report what was written.
    ///
    /// `output_stem` names the output file(s); `temp_dir` optionally hosts
    /// intermediate files; `cloud`, `resolver`, and `catalogs` provide the
    /// storage and catalog plumbing that cloud/table formats need.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
178
/// Bundled arguments for [`RejectedSinkAdapter::write_rejected`], borrowed
/// from the caller for the duration of the write.
pub struct RejectedWriteRequest<'a> {
    /// Destination for the rejected rows.
    pub target: &'a Target,
    /// Rejected rows to write. Mutable — presumably writers may cast or
    /// reorder columns in place; confirm with the csv adapter.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    /// Optional scratch directory for intermediate files.
    pub temp_dir: Option<&'a Path>,
    /// Cloud storage client for remote targets.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Resolves storage names to concrete locations.
    pub resolver: &'a config::StorageResolver,
    /// Entity the rejected rows belong to.
    pub entity: &'a config::EntityConfig,
    /// Write mode for the rejected output.
    pub mode: config::WriteMode,
}
189
/// Writer for the rejected-rows sink of a specific output format.
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`. The returned
    /// `String` is presumably the written file's path/URI — confirm with
    /// the csv adapter implementation.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
193
/// Which configured format a validation error refers to.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// The entity's input format (`source.format`).
    Source,
    /// The accepted-rows output format (`sink.accepted.format`).
    SinkAccepted,
    /// The rejected-rows output format (`sink.rejected.format`).
    SinkRejected,
}

impl FormatKind {
    /// Dotted configuration path of the format field for this kind.
    fn field_path(self) -> &'static str {
        match self {
            Self::Source => "source.format",
            Self::SinkAccepted => "sink.accepted.format",
            Self::SinkRejected => "sink.rejected.format",
        }
    }

    /// Short human-readable description used in error messages.
    fn description(self) -> &'static str {
        match self {
            Self::Source => "source format",
            Self::SinkAccepted => "accepted sink format",
            Self::SinkRejected => "rejected sink format",
        }
    }
}
218
219fn unsupported_format_error(
220 kind: FormatKind,
221 format: &str,
222 entity_name: Option<&str>,
223) -> ConfigError {
224 if let Some(entity_name) = entity_name {
225 return ConfigError(format!(
226 "entity.name={} {}={} is unsupported",
227 entity_name,
228 kind.field_path(),
229 format
230 ));
231 }
232 ConfigError(format!("unsupported {}: {format}", kind.description()))
233}
234
235pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
236 if input_adapter(format).is_err() {
237 return Err(Box::new(unsupported_format_error(
238 FormatKind::Source,
239 format,
240 Some(entity_name),
241 )));
242 }
243 Ok(())
244}
245
246pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
247 if accepted_sink_adapter(format).is_err() {
248 return Err(Box::new(unsupported_format_error(
249 FormatKind::SinkAccepted,
250 format,
251 Some(entity_name),
252 )));
253 }
254 Ok(())
255}
256
257pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
258 if rejected_sink_adapter(format).is_err() {
259 return Err(Box::new(unsupported_format_error(
260 FormatKind::SinkRejected,
261 format,
262 Some(entity_name),
263 )));
264 }
265 Ok(())
266}
267
268pub fn resolve_read_columns(
269 entity: &config::EntityConfig,
270 normalized_columns: &[config::ColumnConfig],
271 normalize_strategy: Option<&str>,
272) -> FloeResult<Vec<config::ColumnConfig>> {
273 if entity.source.format == "json" || entity.source.format == "xml" {
274 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
275 } else {
276 Ok(normalized_columns.to_vec())
277 }
278}
279
280pub fn sink_options_warning(
281 entity_name: &str,
282 format: &str,
283 options: Option<&config::SinkOptions>,
284) -> Option<String> {
285 let options = options?;
286 if format == "parquet" {
287 return None;
288 }
289 let mut keys = Vec::new();
290 if options.compression.is_some() {
291 keys.push("compression");
292 }
293 if options.row_group_size.is_some() {
294 keys.push("row_group_size");
295 }
296 if options.max_size_per_file.is_some() {
297 keys.push("max_size_per_file");
298 }
299 let detail = if keys.is_empty() {
300 "options".to_string()
301 } else {
302 keys.join(", ")
303 };
304 Some(format!(
305 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
306 entity_name, format
307 ))
308}
309
310pub fn validate_sink_options(
311 entity_name: &str,
312 format: &str,
313 options: Option<&config::SinkOptions>,
314) -> FloeResult<()> {
315 let options = match options {
316 Some(options) => options,
317 None => return Ok(()),
318 };
319 if format != "parquet" {
320 return Ok(());
321 }
322 if let Some(compression) = &options.compression {
323 match compression.as_str() {
324 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
325 _ => {
326 return Err(Box::new(ConfigError(format!(
327 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
328 entity_name, compression
329 ))))
330 }
331 }
332 }
333 if let Some(row_group_size) = options.row_group_size {
334 if row_group_size == 0 {
335 return Err(Box::new(ConfigError(format!(
336 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
337 entity_name
338 ))));
339 }
340 }
341 if let Some(max_size_per_file) = options.max_size_per_file {
342 if max_size_per_file == 0 {
343 return Err(Box::new(ConfigError(format!(
344 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
345 entity_name
346 ))));
347 }
348 }
349 Ok(())
350}
351
352pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
353 match format {
354 "csv" => Ok(io::read::csv::csv_input_adapter()),
355 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
356 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
357 "orc" => Ok(io::read::orc::orc_input_adapter()),
358 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
359 "json" => Ok(io::read::json::json_input_adapter()),
360 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
361 "avro" => Ok(io::read::avro::avro_input_adapter()),
362 "xml" => Ok(io::read::xml::xml_input_adapter()),
363 _ => Err(Box::new(unsupported_format_error(
364 FormatKind::Source,
365 format,
366 None,
367 ))),
368 }
369}
370
371pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
372 match format {
373 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
374 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
375 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
376 _ => Err(Box::new(unsupported_format_error(
377 FormatKind::SinkAccepted,
378 format,
379 None,
380 ))),
381 }
382}
383
384pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
385 match format {
386 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
387 _ => Err(Box::new(unsupported_format_error(
388 FormatKind::SinkRejected,
389 format,
390 None,
391 ))),
392 }
393}
394
395pub(crate) fn read_input_from_df(
396 input_file: &LocalInputFile,
397 df: &DataFrame,
398 columns: &[config::ColumnConfig],
399 normalize_strategy: Option<&str>,
400 collect_raw: bool,
401) -> FloeResult<ReadInput> {
402 let input_columns = df
403 .get_column_names()
404 .iter()
405 .map(|name| name.to_string())
406 .collect::<Vec<_>>();
407 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
408 let raw_df = if collect_raw {
409 Some(cast_df_to_string(df)?)
410 } else {
411 None
412 };
413 let typed_df = cast_df_to_schema(df, &typed_schema)?;
414 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
415}
416
417pub(crate) fn finalize_read_input(
418 input_file: &LocalInputFile,
419 mut raw_df: Option<DataFrame>,
420 mut typed_df: DataFrame,
421 normalize_strategy: Option<&str>,
422) -> FloeResult<ReadInput> {
423 if let Some(strategy) = normalize_strategy {
424 if let Some(raw_df) = raw_df.as_mut() {
425 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
426 }
427 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
428 }
429 Ok(ReadInput::Data {
430 input_file: input_file.file.clone(),
431 raw_df,
432 typed_df,
433 })
434}
435
436pub(crate) fn build_typed_schema(
437 input_columns: &[String],
438 declared_columns: &[config::ColumnConfig],
439 normalize_strategy: Option<&str>,
440) -> FloeResult<Schema> {
441 let mut declared_types = HashMap::new();
442 for column in declared_columns {
443 declared_types.insert(
444 column.name.as_str(),
445 config::parse_data_type(&column.column_type)?,
446 );
447 }
448
449 let mut schema = Schema::with_capacity(input_columns.len());
450 for name in input_columns {
451 let normalized = if let Some(strategy) = normalize_strategy {
452 crate::checks::normalize::normalize_name(name, strategy)
453 } else {
454 name.to_string()
455 };
456 let dtype = declared_types
457 .get(normalized.as_str())
458 .cloned()
459 .unwrap_or(DataType::String);
460 schema.insert(name.as_str().into(), dtype);
461 }
462 Ok(schema)
463}
464
/// Cast every column of `df` to `String` — the "raw" view kept for
/// cast-mismatch reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
468
/// Cast the columns of `df` to the dtypes in `schema`, producing a new
/// frame with the schema's column order.
///
/// String columns targeted at Boolean get a custom parse (see
/// `cast_string_to_bool`); everything else uses a polars non-strict cast —
/// presumably values that fail to convert become null rather than erroring;
/// confirm against the polars `CastOptions` docs.
///
/// # Errors
/// Fails when a schema column is missing from `df`, a cast fails, or the
/// resulting frame cannot be assembled.
pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
    let mut columns = Vec::with_capacity(schema.len());
    for (name, dtype) in schema.iter() {
        let series = df.column(name.as_str()).map_err(|err| {
            Box::new(ConfigError(format!(
                "input column {} not found: {err}",
                name.as_str()
            )))
        })?;
        // Boolean targets read from strings need lenient "true"/"1" parsing
        // that a plain cast does not provide.
        let casted =
            if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
                cast_string_to_bool(name.as_str(), series)?
            } else {
                series
                    .cast_with_options(dtype, CastOptions::NonStrict)
                    .map_err(|err| {
                        Box::new(ConfigError(format!(
                            "failed to cast input column {}: {err}",
                            name.as_str()
                        )))
                    })?
            };
        columns.push(casted);
    }
    DataFrame::new(columns).map_err(|err| {
        Box::new(ConfigError(format!(
            "failed to build typed dataframe: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })
}
499
500fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
501 let string_values = series.as_materialized_series().str().map_err(|err| {
502 Box::new(ConfigError(format!(
503 "failed to read boolean column {} as string: {err}",
504 name
505 )))
506 })?;
507 let mut values = Vec::with_capacity(series.len());
508 for value in string_values {
509 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
510 "true" | "1" => Some(true),
511 "false" | "0" => Some(false),
512 _ => None,
513 });
514 values.push(parsed);
515 }
516 Ok(Series::new(name.into(), values).into())
517}
518
519fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
520 let mut out = df.clone();
521 let names = out
522 .get_column_names()
523 .iter()
524 .map(|name| name.to_string())
525 .collect::<Vec<_>>();
526 for name in names {
527 let series = out.column(&name).map_err(|err| {
528 Box::new(ConfigError(format!(
529 "input column {} not found: {err}",
530 name
531 )))
532 })?;
533 let casted = series
534 .cast_with_options(dtype, CastOptions::NonStrict)
535 .map_err(|err| {
536 Box::new(ConfigError(format!(
537 "failed to cast input column {}: {err}",
538 name
539 )))
540 })?;
541 let idx = out.get_column_index(&name).ok_or_else(|| {
542 Box::new(ConfigError(format!(
543 "input column {} not found for update",
544 name
545 )))
546 })?;
547 out.replace_column(idx, casted).map_err(|err| {
548 Box::new(ConfigError(format!(
549 "failed to update input column {}: {err}",
550 name
551 )))
552 })?;
553 }
554 Ok(out)
555}
556pub fn collect_row_errors(
557 raw_df: &DataFrame,
558 typed_df: &DataFrame,
559 required_cols: &[String],
560 columns: &[config::ColumnConfig],
561 track_cast_errors: bool,
562 raw_indices: &check::ColumnIndex,
563 typed_indices: &check::ColumnIndex,
564) -> FloeResult<Vec<Vec<check::RowError>>> {
565 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
566 if track_cast_errors {
567 let cast_errors =
568 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
569 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
570 errors.extend(cast);
571 }
572 }
573 Ok(error_lists)
574}