1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Identifying metadata for a single source file.
///
/// Every field is an owned value, so the struct carries no handle to the
/// file itself and can be cloned freely.
#[derive(Clone, Debug)]
pub struct InputFile {
    /// URI of the source file.
    pub source_uri: String,
    /// Name of the source file (presumably the file name including its
    /// extension; confirm against the code that builds this struct).
    pub source_name: String,
    /// Stem of the source file (presumably the name without its extension;
    /// confirm against the code that builds this struct).
    pub source_stem: String,
    /// Size of the source when known. The unit is not defined in this file
    /// (likely bytes).
    pub source_size: Option<u64>,
    /// Modification time of the source, as a string, when known. The string
    /// format is not defined in this file.
    pub source_mtime: Option<String>,
}
18
19#[derive(Debug, Clone)]
25pub struct LocalInputFile {
26 pub file: InputFile,
27 pub local_path: PathBuf,
28 pub is_ephemeral: bool,
29}
30
/// File-level read failure, reported as data rather than as a Rust error.
///
/// Carried by [`ReadInput::FileError`] and by the `Err` side of
/// [`InputAdapter::read_input_columns`].
#[derive(Clone, Debug)]
pub struct FileReadError {
    /// Identifier of the rule that failed (the set of rule names is not
    /// defined in this file).
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
36
37pub enum ReadInput {
38 Data {
39 input_file: InputFile,
40 raw_df: Option<DataFrame>,
41 typed_df: DataFrame,
42 },
43 FileError {
44 input_file: InputFile,
45 error: FileReadError,
46 },
47}
48
/// Optional size statistics for an accepted-data write.
///
/// Every metric is `None` by default, i.e. "not measured".
#[derive(Clone, Debug, Default)]
pub struct AcceptedWriteMetrics {
    /// Total number of bytes written, when known.
    pub total_bytes_written: Option<u64>,
    /// Average size of the written files in megabytes, when known.
    pub avg_file_size_mb: Option<f64>,
    /// Number of files counted as "small", when known (the threshold is not
    /// defined in this file).
    pub small_files_count: Option<u64>,
}
55
/// Report of what schema evolution did during an accepted-data write.
///
/// The default value has every flag `false`, an empty `mode` and no added
/// columns.
#[derive(Clone, Debug, Default)]
pub struct AcceptedSchemaEvolution {
    /// Whether schema evolution was enabled.
    pub enabled: bool,
    /// Name of the evolution mode (valid values are not defined in this file).
    pub mode: String,
    /// Whether an evolution was actually applied.
    pub applied: bool,
    /// Names of the columns that were added.
    pub added_columns: Vec<String>,
    /// Whether incompatible schema changes were detected.
    pub incompatible_changes_detected: bool,
}
64
/// Optional per-phase timings, in milliseconds, of an accepted-data write.
///
/// Each phase defaults to `None`, i.e. "not measured".
#[derive(Clone, Debug, Default)]
pub struct AcceptedWritePerfBreakdown {
    /// Time spent in conversion.
    pub conversion_ms: Option<u64>,
    /// Time spent building the source dataframe.
    pub source_df_build_ms: Option<u64>,
    /// Time spent executing the merge.
    pub merge_exec_ms: Option<u64>,
    /// Time spent writing data.
    pub data_write_ms: Option<u64>,
    /// Time spent committing.
    pub commit_ms: Option<u64>,
    /// Time spent reading metrics.
    pub metrics_read_ms: Option<u64>,
}
74
/// Row-level statistics of a merge into the accepted target.
#[derive(Clone, Debug)]
pub struct AcceptedMergeMetrics {
    /// Column names that make up the merge key.
    pub merge_key: Vec<String>,
    /// Number of rows inserted.
    pub inserted_count: u64,
    /// Number of rows updated.
    pub updated_count: u64,
    /// Number of rows closed, when the merge reports it.
    pub closed_count: Option<u64>,
    /// Number of rows left unchanged, when the merge reports it.
    pub unchanged_count: Option<u64>,
    /// Row count of the target before the merge.
    pub target_rows_before: u64,
    /// Row count of the target after the merge.
    pub target_rows_after: u64,
    /// Elapsed merge time in milliseconds.
    pub merge_elapsed_ms: u64,
}
86
/// Where a written table was registered, one variant per catalog flavour.
#[derive(Clone, Debug)]
pub enum CatalogRegistration {
    /// Registration identified by catalog, schema and table.
    UnityDelta {
        /// Name of the catalog.
        catalog_name: String,
        /// Schema inside the catalog.
        schema: String,
        /// Table name.
        table: String,
    },
    /// Registration identified by catalog, optional database, namespace and
    /// table.
    IcebergGlue {
        /// Name of the catalog.
        catalog_name: String,
        /// Database, when one applies.
        database: Option<String>,
        /// Namespace of the table.
        namespace: String,
        /// Table name.
        table: String,
    },
    /// Registration identified by catalog, namespace and table.
    IcebergRest {
        /// Name of the catalog.
        catalog_name: String,
        /// Namespace of the table.
        namespace: String,
        /// Table name.
        table: String,
    },
}
106
107#[derive(Debug, Clone, Default)]
108pub struct AcceptedWriteOutput {
109 pub files_written: Option<u64>,
110 pub parts_written: u64,
111 pub part_files: Vec<String>,
112 pub table_version: Option<i64>,
113 pub snapshot_id: Option<i64>,
114 pub table_root_uri: Option<String>,
115 pub catalog: Option<CatalogRegistration>,
116 pub metrics: AcceptedWriteMetrics,
117 pub merge: Option<AcceptedMergeMetrics>,
118 pub schema_evolution: AcceptedSchemaEvolution,
119 pub perf: Option<AcceptedWritePerfBreakdown>,
120}
121
122pub trait InputAdapter: Send + Sync {
123 fn format(&self) -> &'static str;
124
125 fn default_globs(&self) -> FloeResult<Vec<String>> {
126 io::storage::extensions::glob_patterns_for_format(self.format())
127 }
128
129 fn suffixes(&self) -> FloeResult<Vec<String>> {
130 io::storage::extensions::suffixes_for_format(self.format())
131 }
132
133 fn resolve_local_inputs(
134 &self,
135 config_dir: &Path,
136 entity_name: &str,
137 source: &config::SourceConfig,
138 storage: &str,
139 ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
140 let default_globs = self.default_globs()?;
141 io::storage::local::resolve_local_inputs(
142 config_dir,
143 entity_name,
144 source,
145 storage,
146 &default_globs,
147 )
148 }
149
150 fn read_input_columns(
151 &self,
152 entity: &config::EntityConfig,
153 input_file: &LocalInputFile,
154 columns: &[config::ColumnConfig],
155 ) -> Result<Vec<String>, FileReadError>;
156
157 fn read_inputs(
158 &self,
159 entity: &config::EntityConfig,
160 files: &[LocalInputFile],
161 columns: &[config::ColumnConfig],
162 normalize_strategy: Option<&str>,
163 collect_raw: bool,
164 ) -> FloeResult<Vec<ReadInput>>;
165
166 fn read_inputs_with_prechecked_columns(
167 &self,
168 entity: &config::EntityConfig,
169 files: &[LocalInputFile],
170 columns: &[config::ColumnConfig],
171 normalize_strategy: Option<&str>,
172 collect_raw: bool,
173 prechecked_input_columns: Option<&[String]>,
174 ) -> FloeResult<Vec<ReadInput>> {
175 let _ = prechecked_input_columns;
176 self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
177 }
178}
179
180pub struct AcceptedWriteRequest<'a> {
181 pub target: &'a Target,
182 pub df: &'a mut DataFrame,
183 pub mode: config::WriteMode,
184 pub output_stem: &'a str,
185 pub temp_dir: Option<&'a Path>,
186 pub cloud: &'a mut io::storage::CloudClient,
187 pub resolver: &'a config::StorageResolver,
188 pub catalogs: &'a config::CatalogResolver,
189 pub entity: &'a config::EntityConfig,
190}
191
192pub struct RejectedWriteRequest<'a> {
193 pub target: &'a Target,
194 pub df: &'a mut DataFrame,
195 pub source_stem: &'a str,
196 pub temp_dir: Option<&'a Path>,
197 pub cloud: &'a mut io::storage::CloudClient,
198 pub resolver: &'a config::StorageResolver,
199 pub entity: &'a config::EntityConfig,
200 pub mode: config::WriteMode,
201}
202
203pub trait RejectedSinkAdapter: Send + Sync {
204 fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
205}
206
/// Which configured format an "unsupported format" message refers to.
#[derive(Clone, Copy, Debug)]
pub enum FormatKind {
    /// The format of the source.
    Source,
    /// The format of the accepted sink.
    SinkAccepted,
    /// The format of the rejected sink.
    SinkRejected,
}

impl FormatKind {
    /// Returns `(configuration field path, human-readable description)`.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::Source => ("source.format", "source format"),
            Self::SinkAccepted => ("sink.accepted.format", "accepted sink format"),
            Self::SinkRejected => ("sink.rejected.format", "rejected sink format"),
        }
    }

    /// Dotted configuration path of the field holding this format.
    fn field_path(self) -> &'static str {
        self.labels().0
    }

    /// Human-readable description used in error messages.
    fn description(self) -> &'static str {
        self.labels().1
    }
}
231
232fn unsupported_format_error(
233 kind: FormatKind,
234 format: &str,
235 entity_name: Option<&str>,
236) -> ConfigError {
237 if let Some(entity_name) = entity_name {
238 return ConfigError(format!(
239 "entity.name={} {}={} is unsupported",
240 entity_name,
241 kind.field_path(),
242 format
243 ));
244 }
245 ConfigError(format!("unsupported {}: {format}", kind.description()))
246}
247
248pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
249 if input_adapter(format).is_err() {
250 return Err(Box::new(unsupported_format_error(
251 FormatKind::Source,
252 format,
253 Some(entity_name),
254 )));
255 }
256 Ok(())
257}
258
259pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
260 if crate::io::write::sink_format::sink_format(format).is_err() {
261 return Err(Box::new(unsupported_format_error(
262 FormatKind::SinkAccepted,
263 format,
264 Some(entity_name),
265 )));
266 }
267 Ok(())
268}
269
270pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
271 if rejected_sink_adapter(format).is_err() {
272 return Err(Box::new(unsupported_format_error(
273 FormatKind::SinkRejected,
274 format,
275 Some(entity_name),
276 )));
277 }
278 Ok(())
279}
280
281pub fn resolve_read_columns(
282 entity: &config::EntityConfig,
283 normalized_columns: &[config::ColumnConfig],
284 normalize_strategy: Option<&str>,
285) -> FloeResult<Vec<config::ColumnConfig>> {
286 if entity.source.format == "json" || entity.source.format == "xml" {
287 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
288 } else {
289 Ok(normalized_columns.to_vec())
290 }
291}
292
293pub fn sink_options_warning(
294 entity_name: &str,
295 format: &str,
296 options: Option<&config::SinkOptions>,
297) -> Option<String> {
298 let options = options?;
299 if format == "parquet" {
300 return None;
301 }
302 let mut keys = Vec::new();
303 if options.compression.is_some() {
304 keys.push("compression");
305 }
306 if options.row_group_size.is_some() {
307 keys.push("row_group_size");
308 }
309 if options.max_size_per_file.is_some() {
310 keys.push("max_size_per_file");
311 }
312 let detail = if keys.is_empty() {
313 "options".to_string()
314 } else {
315 keys.join(", ")
316 };
317 Some(format!(
318 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
319 entity_name, format
320 ))
321}
322
323pub fn validate_sink_options(
324 entity_name: &str,
325 format: &str,
326 options: Option<&config::SinkOptions>,
327) -> FloeResult<()> {
328 let options = match options {
329 Some(options) => options,
330 None => return Ok(()),
331 };
332 if format != "parquet" {
333 return Ok(());
334 }
335 if let Some(compression) = &options.compression {
336 match compression.as_str() {
337 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
338 _ => {
339 return Err(Box::new(ConfigError(format!(
340 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
341 entity_name, compression
342 ))))
343 }
344 }
345 }
346 if let Some(row_group_size) = options.row_group_size {
347 if row_group_size == 0 {
348 return Err(Box::new(ConfigError(format!(
349 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
350 entity_name
351 ))));
352 }
353 }
354 if let Some(max_size_per_file) = options.max_size_per_file {
355 if max_size_per_file == 0 {
356 return Err(Box::new(ConfigError(format!(
357 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
358 entity_name
359 ))));
360 }
361 }
362 Ok(())
363}
364
365pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
366 match format {
367 "csv" => Ok(io::read::csv::csv_input_adapter()),
368 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
369 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
370 "orc" => Ok(io::read::orc::orc_input_adapter()),
371 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
372 "json" => Ok(io::read::json::json_input_adapter()),
373 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
374 "avro" => Ok(io::read::avro::avro_input_adapter()),
375 "xml" => Ok(io::read::xml::xml_input_adapter()),
376 _ => Err(Box::new(unsupported_format_error(
377 FormatKind::Source,
378 format,
379 None,
380 ))),
381 }
382}
383
384pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
385 match format {
386 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
387 _ => Err(Box::new(unsupported_format_error(
388 FormatKind::SinkRejected,
389 format,
390 None,
391 ))),
392 }
393}
394
395pub(crate) fn read_input_from_df(
396 input_file: &LocalInputFile,
397 df: &DataFrame,
398 columns: &[config::ColumnConfig],
399 normalize_strategy: Option<&str>,
400 collect_raw: bool,
401) -> FloeResult<ReadInput> {
402 let input_columns = df
403 .get_column_names()
404 .iter()
405 .map(|name| name.to_string())
406 .collect::<Vec<_>>();
407 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
408 let raw_df = if collect_raw {
409 Some(cast_df_to_string(df)?)
410 } else {
411 None
412 };
413 let typed_df = cast_df_to_schema(df, &typed_schema)?;
414 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
415}
416
417pub(crate) fn finalize_read_input(
418 input_file: &LocalInputFile,
419 mut raw_df: Option<DataFrame>,
420 mut typed_df: DataFrame,
421 normalize_strategy: Option<&str>,
422) -> FloeResult<ReadInput> {
423 if let Some(strategy) = normalize_strategy {
424 if let Some(raw_df) = raw_df.as_mut() {
425 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
426 }
427 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
428 }
429 Ok(ReadInput::Data {
430 input_file: input_file.file.clone(),
431 raw_df,
432 typed_df,
433 })
434}
435
436pub(crate) fn build_typed_schema(
437 input_columns: &[String],
438 declared_columns: &[config::ColumnConfig],
439 normalize_strategy: Option<&str>,
440) -> FloeResult<Schema> {
441 let mut declared_types = HashMap::new();
442 for column in declared_columns {
443 declared_types.insert(
444 column.name.as_str(),
445 config::parse_data_type(&column.column_type)?,
446 );
447 }
448
449 let mut schema = Schema::with_capacity(input_columns.len());
450 for name in input_columns {
451 let normalized = if let Some(strategy) = normalize_strategy {
452 crate::checks::normalize::normalize_name(name, strategy)
453 } else {
454 name.to_string()
455 };
456 let dtype = declared_types
457 .get(normalized.as_str())
458 .cloned()
459 .unwrap_or(DataType::String);
460 schema.insert(name.as_str().into(), dtype);
461 }
462 Ok(schema)
463}
464
465pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
466 cast_df_with_type(df, &DataType::String)
467}
468
469pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
470 let mut columns = Vec::with_capacity(schema.len());
471 for (name, dtype) in schema.iter() {
472 let series = df.column(name.as_str()).map_err(|err| {
473 Box::new(ConfigError(format!(
474 "input column {} not found: {err}",
475 name.as_str()
476 )))
477 })?;
478 let casted =
479 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
480 cast_string_to_bool(name.as_str(), series)?
481 } else {
482 series
483 .cast_with_options(dtype, CastOptions::NonStrict)
484 .map_err(|err| {
485 Box::new(ConfigError(format!(
486 "failed to cast input column {}: {err}",
487 name.as_str()
488 )))
489 })?
490 };
491 columns.push(casted);
492 }
493 DataFrame::new(columns).map_err(|err| {
494 Box::new(ConfigError(format!(
495 "failed to build typed dataframe: {err}"
496 ))) as Box<dyn std::error::Error + Send + Sync>
497 })
498}
499
500fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
501 let string_values = series.as_materialized_series().str().map_err(|err| {
502 Box::new(ConfigError(format!(
503 "failed to read boolean column {} as string: {err}",
504 name
505 )))
506 })?;
507 let mut values = Vec::with_capacity(series.len());
508 for value in string_values {
509 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
510 "true" | "1" => Some(true),
511 "false" | "0" => Some(false),
512 _ => None,
513 });
514 values.push(parsed);
515 }
516 Ok(Series::new(name.into(), values).into())
517}
518
519fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
520 let mut out = df.clone();
521 let names = out
522 .get_column_names()
523 .iter()
524 .map(|name| name.to_string())
525 .collect::<Vec<_>>();
526 for name in names {
527 let series = out.column(&name).map_err(|err| {
528 Box::new(ConfigError(format!(
529 "input column {} not found: {err}",
530 name
531 )))
532 })?;
533 let casted = series
534 .cast_with_options(dtype, CastOptions::NonStrict)
535 .map_err(|err| {
536 Box::new(ConfigError(format!(
537 "failed to cast input column {}: {err}",
538 name
539 )))
540 })?;
541 let idx = out.get_column_index(&name).ok_or_else(|| {
542 Box::new(ConfigError(format!(
543 "input column {} not found for update",
544 name
545 )))
546 })?;
547 out.replace_column(idx, casted).map_err(|err| {
548 Box::new(ConfigError(format!(
549 "failed to update input column {}: {err}",
550 name
551 )))
552 })?;
553 }
554 Ok(out)
555}
556pub fn collect_row_errors(
557 raw_df: &DataFrame,
558 typed_df: &DataFrame,
559 required_cols: &[String],
560 columns: &[config::ColumnConfig],
561 track_cast_errors: bool,
562 raw_indices: &check::ColumnIndex,
563 typed_indices: &check::ColumnIndex,
564) -> FloeResult<Vec<Vec<check::RowError>>> {
565 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
566 if track_cast_errors {
567 let cast_errors =
568 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
569 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
570 errors.extend(cast);
571 }
572 }
573 Ok(error_lists)
574}