1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use polars::chunked_array::cast::CastOptions;
5use polars::prelude::{Column, DataFrame, DataType, NamedFrom, Schema, Series};
6
7use crate::io::storage::Target;
8use crate::{check, config, io, ConfigError, FloeResult};
9
/// Identity of a single input file as it flows through the read pipeline.
#[derive(Debug, Clone)]
pub struct InputFile {
    // Original URI of the source (may be remote; see io::storage).
    pub source_uri: String,
    // Local filesystem path the file is read from.
    pub source_local_path: PathBuf,
    // File name component, e.g. "orders.csv".
    pub source_name: String,
    // File name without extension — presumably used to derive output stems;
    // confirm against callers of write_accepted.
    pub source_stem: String,
}
17
/// A per-file read failure that is reported instead of aborting the run
/// (see `ReadInput::FileError`).
#[derive(Debug, Clone)]
pub struct FileReadError {
    // Identifier of the rule/check that rejected the file.
    pub rule: String,
    // Human-readable failure detail.
    pub message: String,
}
23
/// Outcome of reading one input file: either parsed data or a file-level error.
pub enum ReadInput {
    /// Successfully read and typed data.
    Data {
        input_file: InputFile,
        // All-string view of the input, present only when the caller asked
        // for it (`collect_raw`); used for cast-mismatch reporting.
        raw_df: Option<DataFrame>,
        // Input cast to the declared column types.
        typed_df: DataFrame,
    },
    /// The file could not be read; processing continues with other files.
    FileError {
        input_file: InputFile,
        error: FileReadError,
    },
}
35
/// Result summary returned by `AcceptedSinkAdapter::write_accepted`.
#[derive(Debug, Clone)]
pub struct AcceptedWriteOutput {
    // Number of part files produced.
    pub parts_written: u64,
    // Names/paths of the written part files.
    pub part_files: Vec<String>,
    // Table version after the write — presumably set only by table formats
    // (delta/iceberg); confirm in the adapter implementations.
    pub table_version: Option<i64>,
}
42
/// Format-specific reader (csv, json, parquet, ...). Implementations are
/// registered in `input_adapter`.
pub trait InputAdapter: Send + Sync {
    /// Canonical format name; keys the extension/glob lookups below.
    fn format(&self) -> &'static str;

    /// Default glob patterns for this format, derived from its registered
    /// file extensions.
    fn default_globs(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::glob_patterns_for_format(self.format())
    }

    /// File-name suffixes recognized for this format.
    fn suffixes(&self) -> FloeResult<Vec<String>> {
        io::storage::extensions::suffixes_for_format(self.format())
    }

    /// Resolve the concrete local input files for an entity, using the
    /// format's default globs when the source config does not override them.
    fn resolve_local_inputs(
        &self,
        config_dir: &Path,
        entity_name: &str,
        source: &config::SourceConfig,
        storage: &str,
    ) -> FloeResult<io::storage::local::ResolvedLocalInputs> {
        let default_globs = self.default_globs()?;
        io::storage::local::resolve_local_inputs(
            config_dir,
            entity_name,
            source,
            storage,
            &default_globs,
        )
    }

    /// Return the column names present in one input file, or a per-file
    /// error that the caller reports without aborting.
    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &InputFile,
        columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError>;

    /// Read all files into `ReadInput`s. `collect_raw` also captures an
    /// all-string copy of each frame for cast-error reporting.
    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[InputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>>;
}
87
/// Writer for accepted (validated) rows; implementations are registered in
/// `accepted_sink_adapter` (parquet, delta, iceberg).
pub trait AcceptedSinkAdapter: Send + Sync {
    /// Write `df` to `target`.
    ///
    /// `output_stem` names the output part files; `temp_dir`, when given, is
    /// a local staging area before upload via `cloud`. Returns a summary of
    /// what was written.
    #[allow(clippy::too_many_arguments)]
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput>;
}
102
/// Bundled arguments for `RejectedSinkAdapter::write_rejected`; mirrors the
/// parameter list of `write_accepted` but grouped in a struct.
pub struct RejectedWriteRequest<'a> {
    pub target: &'a Target,
    pub df: &'a mut DataFrame,
    // Stem of the originating source file, used to name the rejected output.
    pub source_stem: &'a str,
    // Optional local staging directory before upload.
    pub temp_dir: Option<&'a Path>,
    pub cloud: &'a mut io::storage::CloudClient,
    pub resolver: &'a config::StorageResolver,
    pub entity: &'a config::EntityConfig,
    pub mode: config::WriteMode,
}
113
/// Writer for rejected rows; see `rejected_sink_adapter` (currently csv only).
pub trait RejectedSinkAdapter: Send + Sync {
    /// Write the rejected rows and return the written output's identifier —
    /// presumably a path/URI; confirm against the csv adapter.
    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String>;
}
117
/// Which format setting an error message refers to; selects the config field
/// path and wording in `unsupported_format_error`.
#[derive(Debug, Clone, Copy)]
pub enum FormatKind {
    Source,
    SinkAccepted,
    SinkRejected,
}
124
125impl FormatKind {
126 fn field_path(self) -> &'static str {
127 match self {
128 FormatKind::Source => "source.format",
129 FormatKind::SinkAccepted => "sink.accepted.format",
130 FormatKind::SinkRejected => "sink.rejected.format",
131 }
132 }
133
134 fn description(self) -> &'static str {
135 match self {
136 FormatKind::Source => "source format",
137 FormatKind::SinkAccepted => "accepted sink format",
138 FormatKind::SinkRejected => "rejected sink format",
139 }
140 }
141}
142
143fn unsupported_format_error(
144 kind: FormatKind,
145 format: &str,
146 entity_name: Option<&str>,
147) -> ConfigError {
148 if let Some(entity_name) = entity_name {
149 return ConfigError(format!(
150 "entity.name={} {}={} is unsupported",
151 entity_name,
152 kind.field_path(),
153 format
154 ));
155 }
156 ConfigError(format!("unsupported {}: {format}", kind.description()))
157}
158
159pub fn ensure_input_format(entity_name: &str, format: &str) -> FloeResult<()> {
160 if input_adapter(format).is_err() {
161 return Err(Box::new(unsupported_format_error(
162 FormatKind::Source,
163 format,
164 Some(entity_name),
165 )));
166 }
167 Ok(())
168}
169
170pub fn ensure_accepted_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
171 if accepted_sink_adapter(format).is_err() {
172 return Err(Box::new(unsupported_format_error(
173 FormatKind::SinkAccepted,
174 format,
175 Some(entity_name),
176 )));
177 }
178 Ok(())
179}
180
181pub fn ensure_rejected_sink_format(entity_name: &str, format: &str) -> FloeResult<()> {
182 if rejected_sink_adapter(format).is_err() {
183 return Err(Box::new(unsupported_format_error(
184 FormatKind::SinkRejected,
185 format,
186 Some(entity_name),
187 )));
188 }
189 Ok(())
190}
191
192pub fn resolve_read_columns(
193 entity: &config::EntityConfig,
194 normalized_columns: &[config::ColumnConfig],
195 normalize_strategy: Option<&str>,
196) -> FloeResult<Vec<config::ColumnConfig>> {
197 if entity.source.format == "json" || entity.source.format == "xml" {
198 check::normalize::resolve_source_columns(&entity.schema.columns, normalize_strategy, true)
199 } else {
200 Ok(normalized_columns.to_vec())
201 }
202}
203
204pub fn sink_options_warning(
205 entity_name: &str,
206 format: &str,
207 options: Option<&config::SinkOptions>,
208) -> Option<String> {
209 let options = options?;
210 if format == "parquet" {
211 return None;
212 }
213 let mut keys = Vec::new();
214 if options.compression.is_some() {
215 keys.push("compression");
216 }
217 if options.row_group_size.is_some() {
218 keys.push("row_group_size");
219 }
220 if options.max_size_per_file.is_some() {
221 keys.push("max_size_per_file");
222 }
223 let detail = if keys.is_empty() {
224 "options".to_string()
225 } else {
226 keys.join(", ")
227 };
228 Some(format!(
229 "entity.name={} sink.accepted.options ({detail}) ignored for format={}",
230 entity_name, format
231 ))
232}
233
234pub fn validate_sink_options(
235 entity_name: &str,
236 format: &str,
237 options: Option<&config::SinkOptions>,
238) -> FloeResult<()> {
239 let options = match options {
240 Some(options) => options,
241 None => return Ok(()),
242 };
243 if format != "parquet" {
244 return Ok(());
245 }
246 if let Some(compression) = &options.compression {
247 match compression.as_str() {
248 "snappy" | "gzip" | "zstd" | "uncompressed" => {}
249 _ => {
250 return Err(Box::new(ConfigError(format!(
251 "entity.name={} sink.accepted.options.compression={} is unsupported (allowed: snappy, gzip, zstd, uncompressed)",
252 entity_name, compression
253 ))))
254 }
255 }
256 }
257 if let Some(row_group_size) = options.row_group_size {
258 if row_group_size == 0 {
259 return Err(Box::new(ConfigError(format!(
260 "entity.name={} sink.accepted.options.row_group_size must be greater than 0",
261 entity_name
262 ))));
263 }
264 }
265 if let Some(max_size_per_file) = options.max_size_per_file {
266 if max_size_per_file == 0 {
267 return Err(Box::new(ConfigError(format!(
268 "entity.name={} sink.accepted.options.max_size_per_file must be greater than 0",
269 entity_name
270 ))));
271 }
272 }
273 Ok(())
274}
275
276pub fn input_adapter(format: &str) -> FloeResult<&'static dyn InputAdapter> {
277 match format {
278 "csv" => Ok(io::read::csv::csv_input_adapter()),
279 "tsv" => Ok(io::read::csv::tsv_input_adapter()),
280 "fixed" => Ok(io::read::fixed_width::fixed_width_input_adapter()),
281 "orc" => Ok(io::read::orc::orc_input_adapter()),
282 "parquet" => Ok(io::read::parquet::parquet_input_adapter()),
283 "json" => Ok(io::read::json::json_input_adapter()),
284 "xlsx" => Ok(io::read::xlsx::xlsx_input_adapter()),
285 "avro" => Ok(io::read::avro::avro_input_adapter()),
286 "xml" => Ok(io::read::xml::xml_input_adapter()),
287 _ => Err(Box::new(unsupported_format_error(
288 FormatKind::Source,
289 format,
290 None,
291 ))),
292 }
293}
294
295pub fn accepted_sink_adapter(format: &str) -> FloeResult<&'static dyn AcceptedSinkAdapter> {
296 match format {
297 "parquet" => Ok(io::write::parquet::parquet_accepted_adapter()),
298 "delta" => Ok(io::write::delta::delta_accepted_adapter()),
299 "iceberg" => Ok(io::write::iceberg::iceberg_accepted_adapter()),
300 _ => Err(Box::new(unsupported_format_error(
301 FormatKind::SinkAccepted,
302 format,
303 None,
304 ))),
305 }
306}
307
308pub fn rejected_sink_adapter(format: &str) -> FloeResult<&'static dyn RejectedSinkAdapter> {
309 match format {
310 "csv" => Ok(io::write::csv::csv_rejected_adapter()),
311 _ => Err(Box::new(unsupported_format_error(
312 FormatKind::SinkRejected,
313 format,
314 None,
315 ))),
316 }
317}
318
319pub(crate) fn read_input_from_df(
320 input_file: &InputFile,
321 df: &DataFrame,
322 columns: &[config::ColumnConfig],
323 normalize_strategy: Option<&str>,
324 collect_raw: bool,
325) -> FloeResult<ReadInput> {
326 let input_columns = df
327 .get_column_names()
328 .iter()
329 .map(|name| name.to_string())
330 .collect::<Vec<_>>();
331 let typed_schema = build_typed_schema(&input_columns, columns, normalize_strategy)?;
332 let raw_df = if collect_raw {
333 Some(cast_df_to_string(df)?)
334 } else {
335 None
336 };
337 let typed_df = cast_df_to_schema(df, &typed_schema)?;
338 finalize_read_input(input_file, raw_df, typed_df, normalize_strategy)
339}
340
341pub(crate) fn finalize_read_input(
342 input_file: &InputFile,
343 mut raw_df: Option<DataFrame>,
344 mut typed_df: DataFrame,
345 normalize_strategy: Option<&str>,
346) -> FloeResult<ReadInput> {
347 if let Some(strategy) = normalize_strategy {
348 if let Some(raw_df) = raw_df.as_mut() {
349 crate::checks::normalize::normalize_dataframe_columns(raw_df, strategy)?;
350 }
351 crate::checks::normalize::normalize_dataframe_columns(&mut typed_df, strategy)?;
352 }
353 Ok(ReadInput::Data {
354 input_file: input_file.clone(),
355 raw_df,
356 typed_df,
357 })
358}
359
360pub(crate) fn build_typed_schema(
361 input_columns: &[String],
362 declared_columns: &[config::ColumnConfig],
363 normalize_strategy: Option<&str>,
364) -> FloeResult<Schema> {
365 let mut declared_types = HashMap::new();
366 for column in declared_columns {
367 declared_types.insert(
368 column.name.as_str(),
369 config::parse_data_type(&column.column_type)?,
370 );
371 }
372
373 let mut schema = Schema::with_capacity(input_columns.len());
374 for name in input_columns {
375 let normalized = if let Some(strategy) = normalize_strategy {
376 crate::checks::normalize::normalize_name(name, strategy)
377 } else {
378 name.to_string()
379 };
380 let dtype = declared_types
381 .get(normalized.as_str())
382 .cloned()
383 .unwrap_or(DataType::String);
384 schema.insert(name.as_str().into(), dtype);
385 }
386 Ok(schema)
387}
388
/// Cast every column of `df` to `String`, preserving column order; used to
/// keep a raw (pre-typing) view of the input for cast-error reporting.
pub(crate) fn cast_df_to_string(df: &DataFrame) -> FloeResult<DataFrame> {
    cast_df_with_type(df, &DataType::String)
}
392
393pub(crate) fn cast_df_to_schema(df: &DataFrame, schema: &Schema) -> FloeResult<DataFrame> {
394 let mut columns = Vec::with_capacity(schema.len());
395 for (name, dtype) in schema.iter() {
396 let series = df.column(name.as_str()).map_err(|err| {
397 Box::new(ConfigError(format!(
398 "input column {} not found: {err}",
399 name.as_str()
400 )))
401 })?;
402 let casted =
403 if matches!(dtype, DataType::Boolean) && matches!(series.dtype(), DataType::String) {
404 cast_string_to_bool(name.as_str(), series)?
405 } else {
406 series
407 .cast_with_options(dtype, CastOptions::NonStrict)
408 .map_err(|err| {
409 Box::new(ConfigError(format!(
410 "failed to cast input column {}: {err}",
411 name.as_str()
412 )))
413 })?
414 };
415 columns.push(casted);
416 }
417 DataFrame::new(columns).map_err(|err| {
418 Box::new(ConfigError(format!(
419 "failed to build typed dataframe: {err}"
420 ))) as Box<dyn std::error::Error + Send + Sync>
421 })
422}
423
424fn cast_string_to_bool(name: &str, series: &Column) -> FloeResult<Column> {
425 let string_values = series.as_materialized_series().str().map_err(|err| {
426 Box::new(ConfigError(format!(
427 "failed to read boolean column {} as string: {err}",
428 name
429 )))
430 })?;
431 let mut values = Vec::with_capacity(series.len());
432 for value in string_values {
433 let parsed = value.and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() {
434 "true" | "1" => Some(true),
435 "false" | "0" => Some(false),
436 _ => None,
437 });
438 values.push(parsed);
439 }
440 Ok(Series::new(name.into(), values).into())
441}
442
443fn cast_df_with_type(df: &DataFrame, dtype: &DataType) -> FloeResult<DataFrame> {
444 let mut out = df.clone();
445 let names = out
446 .get_column_names()
447 .iter()
448 .map(|name| name.to_string())
449 .collect::<Vec<_>>();
450 for name in names {
451 let series = out.column(&name).map_err(|err| {
452 Box::new(ConfigError(format!(
453 "input column {} not found: {err}",
454 name
455 )))
456 })?;
457 let casted = series
458 .cast_with_options(dtype, CastOptions::NonStrict)
459 .map_err(|err| {
460 Box::new(ConfigError(format!(
461 "failed to cast input column {}: {err}",
462 name
463 )))
464 })?;
465 let idx = out.get_column_index(&name).ok_or_else(|| {
466 Box::new(ConfigError(format!(
467 "input column {} not found for update",
468 name
469 )))
470 })?;
471 out.replace_column(idx, casted).map_err(|err| {
472 Box::new(ConfigError(format!(
473 "failed to update input column {}: {err}",
474 name
475 )))
476 })?;
477 }
478 Ok(out)
479}
480pub fn collect_row_errors(
481 raw_df: &DataFrame,
482 typed_df: &DataFrame,
483 required_cols: &[String],
484 columns: &[config::ColumnConfig],
485 track_cast_errors: bool,
486 raw_indices: &check::ColumnIndex,
487 typed_indices: &check::ColumnIndex,
488) -> FloeResult<Vec<Vec<check::RowError>>> {
489 let mut error_lists = check::not_null_errors(typed_df, required_cols, typed_indices)?;
490 if track_cast_errors {
491 let cast_errors =
492 check::cast_mismatch_errors(raw_df, typed_df, columns, raw_indices, typed_indices)?;
493 for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
494 errors.extend(cast);
495 }
496 }
497 Ok(error_lists)
498}