1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// One resolved input file: the URI it was discovered at, the local path it
/// can be read from, and name components derived from it.
#[derive(Debug, Clone)]
pub struct InputFile {
    /// Original URI the file was resolved from.
    pub source_uri: String,
    /// Local filesystem path the adapters read from.
    pub source_local_path: PathBuf,
    /// File name including extension.
    pub source_name: String,
    /// File name without extension — presumably used to derive output file
    /// names (see `RejectedWriteRequest::source_stem`); confirm with callers.
    pub source_stem: String,
}
17
/// Per-file read failure carried alongside the file instead of aborting the
/// whole batch (see `ReadInput::FileError`).
#[derive(Debug, Clone)]
pub struct FileReadError {
    /// Identifier of the rule/check that rejected the file.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}
23
/// Outcome of reading a single input file.
pub enum ReadInput {
    /// File was read successfully.
    Data {
        input_file: InputFile,
        /// All-string snapshot of the data; populated only when raw
        /// collection was requested (`collect_raw` in `read_inputs`).
        raw_df: Option<DataFrame>,
        /// Data cast to the declared column types.
        typed_df: DataFrame,
    },
    /// File could not be read; carries the failing rule and message so the
    /// caller can report it without stopping other files.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Result summary returned by `AcceptedSinkAdapter::write_accepted`.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    /// Number of part files written.
    pub parts_written: u64,
    /// Paths/names of the written part files.
    pub part_files: Vec<String>,
    /// Table version produced by the write, when the sink is a versioned
    /// table format (e.g. delta); `None` otherwise.
    pub table_version: Option<i64>,
    /// Snapshot id produced by the write, when the sink tracks snapshots
    /// (e.g. iceberg); `None` otherwise.
    pub snapshot_id: Option<i64>,
}
43
/// Format-specific reader. One static instance exists per supported source
/// format (see `input_adapter`).
pub trait InputAdapter: Send + Sync {
    /// Canonical format name (e.g. "csv", "parquet") used for glob/suffix
    /// lookups and error messages.
    fn format(&self) -> &'static str;

    /// Default glob patterns used to discover files of this format.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File suffixes associated with this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the local input files for `entity_name` from `source`,
    /// falling back to this format's default globs.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Read just the column names present in `input_file`.
    ///
    /// Returns a `FileReadError` (rather than aborting) so a bad file can be
    /// reported per-file.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all `files`, producing one `ReadInput` per file.
    ///
    /// `normalize_strategy` optionally renames columns; `collect_raw` asks
    /// for an additional all-string copy of each frame.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
88
/// Writer for accepted rows. One static instance exists per supported sink
/// format (see `accepted_sink_adapter`).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target` using `mode`, returning a summary of what was
    /// written.
    ///
    /// `temp_dir` is an optional staging directory; `cloud` and `resolver`
    /// provide storage access; `output_stem` names the output files.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
103
/// Bundled arguments for `RejectedSinkAdapter::write_rejected`; a struct is
/// used instead of a long parameter list (cf. `write_accepted`).
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    /// Rejected rows to write.
    pub df: &'a mut DataFrame,
    /// Stem of the originating source file, used to name the output.
    pub source_stem: &'a str,
    /// Optional staging directory.
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
114
/// Writer for rejected rows (see `rejected_sink_adapter`).
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows described by `request`; returns the path (or
    /// identifier) of the written output.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
118
/// Which configuration field a format string came from; shapes the error
/// message for unsupported formats.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    /// `source.format`
    Source,
    /// `sink.accepted.format`
    SinkAccepted,
    /// `sink.rejected.format`
    SinkRejected,
}
125
126impl FormatKind {
127 fn field_path(self) -> &'static str {
128 match self {
129 FormatKind::Source => "source.format",
130 FormatKind::SinkAccepted => "sink.accepted.format",
131 FormatKind::SinkRejected => "sink.rejected.format",
132 }
133 }
134
135 fn description(self) -> &'static str {
136 match self {
137 FormatKind::Source => "source format",
138 FormatKind::SinkAccepted => "accepted sink format",
139 FormatKind::SinkRejected => "rejected sink format",
140 }
141 }
142}
143
144fn unsupported_format_error(
145 kind: FormatKind,
146 format: &str,
147 entity_name: Option<&str>,
148) -> ConfigError {
149 if let Some(entity_name) = entity_name {
150 return ConfigError(format!(
151 "entity.name={} {}={} is unsupported",
152 entity_name,
153 kind.field_path(),
154 format
155 ));
156 }
157 ConfigError(format!("unsupported {}: {format}", kind.description()))
158}
159
160pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
161 if input_adapter(format).is_err() {
162 return Err(Box::new(unsupported_format_error(
163 FormatKind::Source,
164 format,
165 Some(entity_name),
166 )));
167 }
168 Ok(())
169}
170
171pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
172 if accepted_sink_adapter(format).is_err() {
173 return Err(Box::new(unsupported_format_error(
174 FormatKind::SinkAccepted,
175 format,
176 Some(entity_name),
177 )));
178 }
179 Ok(())
180}
181
182pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
183 if rejected_sink_adapter(format).is_err() {
184 return Err(Box::new(unsupported_format_error(
185 FormatKind::SinkRejected,
186 format,
187 Some(entity_name),
188 )));
189 }
190 Ok(())
191}
192
193pub fn resolve_read_columns(
194 entity: &config::EntityConfig,
195 normalized_columns: &[config::ColumnConfig],
196 normalize_strategy: Option<&str>,
197) -> FloeResult<Vec<config::ColumnConfig>> {
198 if entity.source.format == "json" || entity.source.format == "xml" {
199 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
200 } else {
201 Ok(normalized_columns.to_vec())
202 }
203}
204
205pub fn sink_options_warning(
206 entity_name: &str,
207 format: &str,
208 options: Option<&config::SinkOptions>,
209) -> Option<String> {
210 let options = options?;
211 if format == "parquet" {
212 return None;
213 }
214 let mut keys = Vec::new();
215 if options.compression.is_some() {
216 keys.push("compression");
217 }
218 if options.row_group_size.is_some() {
219 keys.push("row_group_size");
220 }
221 if options.max_size_per_file.is_some() {
222 keys.push("max_size_per_file");
223 }
224 let detail = if keys.is_empty() {
225 "options".to_string()
226 } else {
227 keys.join(", ")
228 };
229 Some(format!(
230 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
231 entity_name, format
232 ))
233}
234
235pub fn validate_sink_options(
236 entity_name: &str,
237 format: &str,
238 options: Option<&config::SinkOptions>,
239) -> FloeResult<()> {
240 let options = match options {
241 Some(options) => options,
242 None => return Ok(()),
243 };
244 if format != "parquet" {
245 return Ok(());
246 }
247 if let Some(compression) = &options.compression {
248 match compression.as_str() {
249 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
250 _ => {
251 return Err(Box::new(ConfigError(format!(
252 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
253 entity_name, compression
254 ))))
255 }
256 }
257 }
258 if let Some(row_group_size) = options.row_group_size {
259 if row_group_size == 0 {
260 return Err(Box::new(ConfigError(format!(
261 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
262 entity_name
263 ))));
264 }
265 }
266 if let Some(max_size_per_file) = options.max_size_per_file {
267 if max_size_per_file == 0 {
268 return Err(Box::new(ConfigError(format!(
269 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
270 entity_name
271 ))));
272 }
273 }
274 Ok(())
275}
276
/// Look up the static input adapter for a source `format` string.
///
/// # Errors
/// Returns an "unsupported source format" `ConfigError` (without an entity
/// name) when no adapter is registered for `format`.
pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
    match format {
        "csv" => Ok(io::read::csv::csv_input_adapter()),
        "tsv" => Ok(io::read::csv::tsv_input_adapter()),
        "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
        "orc" => Ok(io::read::orc::orc_input_adapter()),
        "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
        "json" => Ok(io::read::json::json_input_adapter()),
        "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
        "avro" => Ok(io::read::avro::avro_input_adapter()),
        "xml" => Ok(io::read::xml::xml_input_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::Source,
            format,
            None,
        ))),
    }
}
295
/// Look up the static accepted-sink adapter for `format`.
///
/// # Errors
/// Returns an "unsupported accepted sink format" `ConfigError` when no
/// adapter is registered for `format`.
pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
    match format {
        "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
        "delta" => Ok(io::write::delta::delta_accepted_adapter()),
        "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkAccepted,
            format,
            None,
        ))),
    }
}
308
/// Look up the static rejected-sink adapter for `format`.
///
/// # Errors
/// Returns an "unsupported rejected sink format" `ConfigError` when no
/// adapter is registered for `format`; only csv is supported.
pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
    match format {
        "csv" => Ok(io::write::csv::csv_rejected_adapter()),
        _ => Err(Box::new(unsupported_format_error(
            FormatKind::SinkRejected,
            format,
            None,
        ))),
    }
}
319
320pub(crate) fn read_input_from_df(
321 input_file: &InputFile,
322 df: &DataFrame,
323 columns: &[config::ColumnConfig],
324 normalize_strategy: Option<&str>,
325 collect_raw: bool,
326) -> FloeResult<ReadInput> {
327 let input_columns = df
328 .get_column_names()
329 .iter()
330 .map(|name| name.to_string())
331 .collect::<Vec<_>>();
332 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
333 let raw_df = if collect_raw {
334 Some(cast_df_to_string(df)?)
335 } else {
336 None
337 };
338 let typed_df = cast_df_to_schema(df, &typed_schema)?;
339 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
340}
341
342pub(crate) fn finalize_read_input(
343 input_file: &InputFile,
344 mut raw_df: Option<DataFrame>,
345 mut typed_df: DataFrame,
346 normalize_strategy: Option<&str>,
347) -> FloeResult<ReadInput> {
348 if let Some(strategy) = normalize_strategy {
349 if let Some(raw_df) = raw_df.as_mut() {
350 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
351 }
352 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
353 }
354 Ok(ReadInput::Data {
355 input_file: input_file.clone(),
356 raw_df,
357 typed_df,
358 })
359}
360
361pub(crate) fn build_typed_schema(
362 input_columns: &[String],
363 declared_columns: &[config::ColumnConfig],
364 normalize_strategy: Option<&str>,
365) -> FloeResult<Schema> {
366 let mut declared_types = HashMap::new();
367 for column in declared_columns {
368 declared_types.insert(
369 column.name.as_str(),
370 config::parse_data_type(&column.column_type)?,
371 );
372 }
373
374 let mut schema = Schema::with_capacity(input_columns.len());
375 for name in input_columns {
376 let normalized = if let Some(strategy) = normalize_strategy {
377 crate::checks::normalize::normalize_name(name, strategy)
378 } else {
379 name.to_string()
380 };
381 let dtype = declared_types
382 .get(normalized.as_str())
383 .cloned()
384 .unwrap_or(DataType::String);
385 schema.insert(name.as_str().into(), dtype);
386 }
387 Ok(schema)
388}
389
/// Cast every column of `df` to `String` (non-strict); used to retain a raw
/// textual snapshot of the input alongside the typed frame.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
393
394pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
395 let mut columns = Vec::with_capacity(schema.len());
396 for (name, dtype) in schema.iter() {
397 let series = df.column(name.as_str()).map_err(|err| {
398 Box::new(ConfigError(format!(
399 "input column {} not found: {err}",
400 name.as_str()
401 )))
402 })?;
403 let casted =
404 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
405 cast_string_to_bool(name.as_str(), series)?
406 } else {
407 series
408 .cast_with_options(dtype, CastOptions::NonStrict)
409 .map_err(|err| {
410 Box::new(ConfigError(format!(
411 "failed to cast input column {}: {err}",
412 name.as_str()
413 )))
414 })?
415 };
416 columns.push(casted);
417 }
418 DataFrame::new(columns).map_err(|err| {
419 Box::new(ConfigError(format!(
420 "failed to build typed dataframe: {err}"
421 ))) as Box<dyn std::error::Error + Send + Sync>
422 })
423}
424
425fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
426 let string_values = series.as_materialized_series().str().map_err(|err| {
427 Box::new(ConfigError(format!(
428 "failed to read boolean column {} as string: {err}",
429 name
430 )))
431 })?;
432 let mut values = Vec::with_capacity(series.len());
433 for value in string_values {
434 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
435 "true" | "1" => Some(true),
436 "false" | "0" => Some(false),
437 _ => None,
438 });
439 values.push(parsed);
440 }
441 Ok(Series::new(name.into(), values).into())
442}
443
444fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
445 let mut out = df.clone();
446 let names = out
447 .get_column_names()
448 .iter()
449 .map(|name| name.to_string())
450 .collect::<Vec<_>>();
451 for name in names {
452 let series = out.column(&name).map_err(|err| {
453 Box::new(ConfigError(format!(
454 "input column {} not found: {err}",
455 name
456 )))
457 })?;
458 let casted = series
459 .cast_with_options(dtype, CastOptions::NonStrict)
460 .map_err(|err| {
461 Box::new(ConfigError(format!(
462 "failed to cast input column {}: {err}",
463 name
464 )))
465 })?;
466 let idx = out.get_column_index(&name).ok_or_else(|| {
467 Box::new(ConfigError(format!(
468 "input column {} not found for update",
469 name
470 )))
471 })?;
472 out.replace_column(idx, casted).map_err(|err| {
473 Box::new(ConfigError(format!(
474 "failed to update input column {}: {err}",
475 name
476 )))
477 })?;
478 }
479 Ok(out)
480}
481pub fn collect_row_errors(
482 raw_df: &DataFrame,
483 typed_df: &DataFrame,
484 required_cols: &[String],
485 columns: &[config::ColumnConfig],
486 track_cast_errors: bool,
487 raw_indices: &check::ColumnIndex,
488 typed_indices: &check::ColumnIndex,
489) -> FloeResult<Vec<Vec<check::RowError>>> {
490 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
491 if track_cast_errors {
492 let cast_errors =
493 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
494 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
495 errors.extend(cast);
496 }
497 }
498 Ok(error_lists)
499}