floe_core/run/mod.rs

use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::time::Instant;

use polars::prelude::DataFrame;
use serde_json::{Map, Value};

use crate::{check, config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};

mod normalize;
use normalize::{
    normalize_dataframe_columns, normalize_schema_columns, resolve_normalize_strategy,
};

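// Report bookkeeping: at most MAX_EXAMPLES_PER_RULE examples are kept per rule,
// RULE_COUNT matches the rules handled by `rule_index`, and CAST_ERROR_INDEX is
// the accumulator slot used for cast-mismatch errors.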
const MAX_EXAMPLES_PER_RULE: u64 = 3;
const RULE_COUNT: usize = 4;
const CAST_ERROR_INDEX: usize = 1;

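// Per-row validation state: accept flags, JSON-encoded error payloads, and the
// raw `RowError` lists, all indexed by row.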
type ValidationCollect = (Vec<bool>, Vec<Option<String>>, Vec<Vec<check::RowError>>);

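/// Outcome of a full run: the run identifier, the base directory reports are
/// written under, and one outcome per processed entity.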
#[derive(Debug, Clone)]
pub struct RunOutcome {
    pub run_id: String,
    pub report_base_path: String,
    pub entity_outcomes: Vec<EntityOutcome>,
}

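/// Per-entity result: the generated run report plus per-file processing times
/// in milliseconds.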
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    pub report: report::RunReport,
    pub file_timings_ms: Vec<Option<u64>>,
}

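/// Ensures every selected entity name exists in the configuration, returning a
/// `ConfigError` that lists the missing names otherwise.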
pub(crate) fn validate_entities(
    config: &config::RootConfig,
    selected: &[String],
) -> FloeResult<()> {
    let missing: Vec<String> = selected
        .iter()
        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
        .cloned()
        .collect();

    if !missing.is_empty() {
        return Err(Box::new(ConfigError(format!(
            "entities not found: {}",
            missing.join(", ")
        ))));
    }
    Ok(())
}

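/// Validates the configuration, then processes each entity: inputs are read,
/// checked against the schema rules, routed according to the policy severity
/// (`warn`, `reject`, or `abort`), and summarized in a per-entity report.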
pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
    let validate_options = ValidateOptions {
        entities: options.entities.clone(),
    };
    crate::validate(config_path, validate_options)?;
    let config = config::parse_config(config_path)?;
    if !options.entities.is_empty() {
        validate_entities(&config, &options.entities)?;
    }
    let report_base_path = config.report.path.clone();
    let started_at = report::now_rfc3339();
    let run_id = options
        .run_id
        .clone()
        .unwrap_or_else(|| report::run_id_from_timestamp(&started_at));
    let run_timer = Instant::now();
    let mut entity_outcomes = Vec::new();

    for entity in &config.entities {
        let input = &entity.source;
        let input_path = Path::new(&input.path);
        let normalize_strategy = resolve_normalize_strategy(entity)?;
        let normalized_columns = if let Some(strategy) = normalize_strategy.as_deref() {
            normalize_schema_columns(&entity.schema.columns, strategy)?
        } else {
            entity.schema.columns.clone()
        };
        let required_cols = required_columns(&normalized_columns);

        let inputs = read_inputs(
            entity,
            input_path,
            &normalized_columns,
            normalize_strategy.as_deref(),
        )?;
        let resolved_files = inputs
            .iter()
            .map(|(path, _, _)| path.display().to_string())
            .collect::<Vec<_>>();
        let resolved_mode = if input_path.is_dir() {
            report::ResolvedInputMode::Directory
        } else {
            report::ResolvedInputMode::File
        };
        let severity = match entity.policy.severity.as_str() {
            "warn" => report::Severity::Warn,
            "reject" => report::Severity::Reject,
            "abort" => report::Severity::Abort,
            severity => {
                return Err(Box::new(ConfigError(format!(
                    "unsupported policy severity: {severity}"
                ))))
            }
        };

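        // When cast_mode is "coerce", cast mismatches are not collected as errors.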
        let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));
        let mut file_reports = Vec::with_capacity(inputs.len());
        let mut file_statuses = Vec::with_capacity(inputs.len());
        let mut totals = report::ResultsTotals {
            files_total: 0,
            rows_total: 0,
            accepted_total: 0,
            rejected_total: 0,
            warnings_total: 0,
            errors_total: 0,
        };
        let archive_enabled = entity.sink.archive.is_some();
        let archive_dir = entity
            .sink
            .archive
            .as_ref()
            .map(|archive| PathBuf::from(&archive.path));

        let mut file_timings_ms = Vec::with_capacity(inputs.len());
        for (source_path, raw_df, mut df) in inputs {
            let file_timer = Instant::now();
            let source_stem = source_path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .unwrap_or(entity.name.as_str());
            let (accept_rows, errors_json, error_lists) = collect_errors(
                &raw_df,
                &df,
                &required_cols,
                &normalized_columns,
                track_cast_errors,
            )?;
            let row_count = raw_df.height() as u64;
            let row_error_count = error_lists
                .iter()
                .filter(|errors| !errors.is_empty())
                .count() as u64;
            let violation_count = error_lists
                .iter()
                .map(|errors| errors.len() as u64)
                .sum::<u64>();
            let accept_count = accept_rows.iter().filter(|accepted| **accepted).count() as u64;
            let reject_count = row_count.saturating_sub(accept_count);
            let has_errors = row_error_count > 0;
            let mut accepted_path = None;
            let mut rejected_path = None;
            let mut errors_path = None;
            let mut archived_path = None;
            let (rules, examples) =
                summarize_validation(&error_lists, &normalized_columns, severity);

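            // Route the file by policy severity: `warn` writes every row to the
            // accepted sink, `reject` splits accepted and rejected rows, and
            // `abort` forwards the raw file plus an error report when any row fails.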
            match entity.policy.severity.as_str() {
                "warn" => {
                    let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                    accepted_path = Some(output_path.display().to_string());
                }
                "reject" => {
                    if has_errors {
                        let rejected_target = validate_rejected_target(entity, "reject")?;

                        let (accept_mask, reject_mask) = check::build_row_masks(&accept_rows);
                        let mut accepted_df = df.filter(&accept_mask).map_err(|err| {
                            Box::new(ConfigError(format!(
                                "failed to filter accepted rows: {err}"
                            )))
                        })?;
                        let mut rejected_df = df.filter(&reject_mask).map_err(|err| {
                            Box::new(ConfigError(format!(
                                "failed to filter rejected rows: {err}"
                            )))
                        })?;
                        append_rejection_columns(&mut rejected_df, &errors_json, false)?;

                        let output_path =
                            write_accepted_output(entity, &mut accepted_df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                        let rejected_path_buf = io::write::write_rejected_csv(
                            &mut rejected_df,
                            &rejected_target.path,
                            source_stem,
                        )?;
                        rejected_path = Some(rejected_path_buf.display().to_string());
                    } else {
                        let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                    }
                }
                "abort" => {
                    if has_errors {
                        let rejected_target = validate_rejected_target(entity, "abort")?;
                        let rejected_path_buf =
                            io::write::write_rejected_raw(&source_path, &rejected_target.path)?;
                        let report_path = io::write::write_error_report(
                            &rejected_target.path,
                            source_stem,
                            &errors_json,
                        )?;
                        rejected_path = Some(rejected_path_buf.display().to_string());
                        errors_path = Some(report_path.display().to_string());
                    } else {
                        let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                    }
                }
                severity => {
                    return Err(Box::new(ConfigError(format!(
                        "unsupported policy severity: {severity}"
                    ))))
                }
            }

            if archive_enabled {
                if let Some(dir) = &archive_dir {
                    let archived_path_buf = io::write::archive_input(&source_path, dir)?;
                    archived_path = Some(archived_path_buf.display().to_string());
                }
            }

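            // Attribute the file's status and accepted/rejected/error/warning
            // counts according to the same severity rules.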
            let (status, accepted_count, rejected_count, errors, warnings) =
                match entity.policy.severity.as_str() {
                    "warn" => (
                        report::FileStatus::Success,
                        row_count,
                        0,
                        0,
                        violation_count,
                    ),
                    "reject" => {
                        if has_errors {
                            (
                                report::FileStatus::Rejected,
                                accept_count,
                                reject_count,
                                violation_count,
                                0,
                            )
                        } else {
                            (report::FileStatus::Success, row_count, 0, 0, 0)
                        }
                    }
                    "abort" => {
                        if has_errors {
                            (
                                report::FileStatus::Aborted,
                                0,
                                row_count,
                                violation_count,
                                0,
                            )
                        } else {
                            (report::FileStatus::Success, row_count, 0, 0, 0)
                        }
                    }
                    _ => unreachable!("severity validated earlier"),
                };

            let file_report = report::FileReport {
                input_file: source_path.display().to_string(),
                status,
                row_count,
                accepted_count,
                rejected_count,
                output: report::FileOutput {
                    accepted_path,
                    rejected_path,
                    errors_path,
                    archived_path,
                },
                validation: report::FileValidation {
                    errors,
                    warnings,
                    rules,
                    examples,
                },
            };

            totals.rows_total += row_count;
            totals.accepted_total += accepted_count;
            totals.rejected_total += rejected_count;
            totals.errors_total += errors;
            totals.warnings_total += warnings;
            file_statuses.push(status);
            file_reports.push(file_report);
            file_timings_ms.push(Some(file_timer.elapsed().as_millis() as u64));
        }

        totals.files_total = file_reports.len() as u64;

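        // Derive the entity-level run status and exit code; a successful run
        // that produced warnings is reported as SuccessWithWarnings.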
        let (mut run_status, exit_code) = report::compute_run_outcome(&file_statuses);
        if run_status == report::RunStatus::Success && totals.warnings_total > 0 {
            run_status = report::RunStatus::SuccessWithWarnings;
        }

        let report_dir = Path::new(&config.report.path);
        let report_path = report::ReportWriter::report_path(report_dir, &run_id, &entity.name);
        let finished_at = report::now_rfc3339();
        let duration_ms = run_timer.elapsed().as_millis() as u64;
        let run_report =
            report::RunReport {
                spec_version: config.version.clone(),
                tool: report::ToolInfo {
                    name: "floe".to_string(),
                    version: env!("CARGO_PKG_VERSION").to_string(),
                    git: None,
                },
                run: report::RunInfo {
                    run_id: run_id.clone(),
                    started_at: started_at.clone(),
                    finished_at,
                    duration_ms,
                    status: run_status,
                    exit_code,
                },
                config: report::ConfigEcho {
                    path: config_path.display().to_string(),
                    version: config.version.clone(),
                    metadata: config.metadata.as_ref().map(project_metadata_json),
                },
                entity: report::EntityEcho {
                    name: entity.name.clone(),
                    metadata: entity.metadata.as_ref().map(entity_metadata_json),
                },
                source: report::SourceEcho {
                    format: input.format.clone(),
                    path: input.path.clone(),
                    options: input.options.as_ref().map(source_options_json),
                    cast_mode: input.cast_mode.clone(),
                    read_plan: report::SourceReadPlan::RawAndTyped,
                    resolved_inputs: report::ResolvedInputs {
                        mode: resolved_mode,
                        file_count: resolved_files.len() as u64,
                        files: resolved_files,
                    },
                },
                sink: report::SinkEcho {
                    accepted: report::SinkTargetEcho {
                        format: entity.sink.accepted.format.clone(),
                        path: entity.sink.accepted.path.clone(),
                    },
                    rejected: entity.sink.rejected.as_ref().map(|rejected| {
                        report::SinkTargetEcho {
                            format: rejected.format.clone(),
                            path: rejected.path.clone(),
                        }
                    }),
                    archive: report::SinkArchiveEcho {
                        enabled: entity.sink.archive.is_some(),
                        path: entity
                            .sink
                            .archive
                            .as_ref()
                            .map(|archive| archive.path.clone()),
                    },
                },
                report: report::ReportEcho {
                    path: config.report.path.clone(),
                    report_file: report_path.display().to_string(),
                },
                policy: report::PolicyEcho { severity },
                results: totals,
                files: file_reports,
            };
        report::ReportWriter::write_report(report_dir, &run_id, &entity.name, &run_report)?;
        entity_outcomes.push(EntityOutcome {
            report: run_report,
            file_timings_ms,
        });
    }

    Ok(RunOutcome {
        run_id,
        report_base_path,
        entity_outcomes,
    })
}

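/// Names of the schema columns explicitly marked `nullable: false`.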
fn required_columns(columns: &[config::ColumnConfig]) -> Vec<String> {
    columns
        .iter()
        .filter(|col| col.nullable == Some(false))
        .map(|col| col.name.clone())
        .collect()
}

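/// Reads every input file twice: once with an all-string schema (raw) and once
/// with the typed schema (permissive), so cast mismatches can be detected by
/// comparing the two frames. Only CSV sources are supported for now.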
fn read_inputs(
    entity: &config::EntityConfig,
    input_path: &Path,
    columns: &[config::ColumnConfig],
    normalize_strategy: Option<&str>,
) -> FloeResult<Vec<(PathBuf, DataFrame, DataFrame)>> {
    let input = &entity.source;
    match input.format.as_str() {
        "csv" => {
            let default_options = config::SourceOptions::default();
            let source_options = input.options.as_ref().unwrap_or(&default_options);
            let normalized_schema = config::SchemaConfig {
                normalize_columns: None,
                columns: columns.to_vec(),
            };
            let typed_schema = normalized_schema.to_polars_schema()?;
            let raw_schema = normalized_schema.to_polars_string_schema()?;
            let files = io::read_csv::list_csv_files(input_path)?;
            let mut inputs = Vec::with_capacity(files.len());
            let raw_plan = io::read_csv::CsvReadPlan::strict(raw_schema);
            let typed_plan = io::read_csv::CsvReadPlan::permissive(typed_schema);
            for path in files {
                let mut raw_df = io::read_csv::read_csv_file(&path, source_options, &raw_plan)?;
                let mut typed_df = io::read_csv::read_csv_file(&path, source_options, &typed_plan)?;
                if let Some(strategy) = normalize_strategy {
                    normalize_dataframe_columns(&mut raw_df, strategy)?;
                    normalize_dataframe_columns(&mut typed_df, strategy)?;
                }
                inputs.push((path, raw_df, typed_df));
            }
            Ok(inputs)
        }
        format => Err(Box::new(ConfigError(format!(
            "unsupported source format for now: {format}"
        )))),
    }
}

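/// Collects per-row errors from the not-null, cast-mismatch (unless coerced
/// away), and uniqueness checks, then derives the accept mask and JSON error
/// payloads.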
fn collect_errors(
    raw_df: &DataFrame,
    typed_df: &DataFrame,
    required_cols: &[String],
    columns: &[config::ColumnConfig],
    track_cast_errors: bool,
) -> FloeResult<ValidationCollect> {
    let mut error_lists = check::not_null_errors(typed_df, required_cols)?;
    if track_cast_errors {
        let cast_errors = check::cast_mismatch_errors(raw_df, typed_df, columns)?;
        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
            errors.extend(cast);
        }
    }
    let unique_errors = check::unique_errors(typed_df, columns)?;
    for (errors, unique) in error_lists.iter_mut().zip(unique_errors) {
        errors.extend(unique);
    }
    let (accept_rows, errors_json) = check::build_error_state(&error_lists);
    Ok((accept_rows, errors_json, error_lists))
}

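/// Writes accepted rows to the configured accepted sink; only Parquet is
/// supported for now.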
fn write_accepted_output(
    entity: &config::EntityConfig,
    df: &mut DataFrame,
    source_stem: &str,
) -> FloeResult<PathBuf> {
    match entity.sink.accepted.format.as_str() {
        "parquet" => {
            let output_path =
                io::write::write_parquet(df, &entity.sink.accepted.path, source_stem)?;
            Ok(output_path)
        }
        format => Err(Box::new(ConfigError(format!(
            "unsupported sink format for now: {format}"
        )))),
    }
}

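/// Requires a `sink.rejected` target for the given severity and checks that it
/// uses a supported (CSV) format.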
fn validate_rejected_target<'a>(
    entity: &'a config::EntityConfig,
    severity: &str,
) -> FloeResult<&'a config::SinkTarget> {
    let rejected_target = entity.sink.rejected.as_ref().ok_or_else(|| {
        Box::new(ConfigError(format!(
            "sink.rejected is required for {severity} severity"
        )))
    })?;
    match rejected_target.format.as_str() {
        "csv" => Ok(rejected_target),
        format => Err(Box::new(ConfigError(format!(
            "unsupported rejected sink format for now: {format}"
        )))),
    }
}

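/// Appends the `__floe_row_index` and `__floe_errors` columns describing why
/// each row was rejected.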
fn append_rejection_columns(
    df: &mut DataFrame,
    errors_per_row: &[Option<String>],
    include_all_rows: bool,
) -> FloeResult<()> {
    let (row_index, errors) = check::rejected_error_columns(errors_per_row, include_all_rows);
    df.with_column(row_index).map_err(|err| {
        Box::new(ConfigError(format!(
            "failed to add __floe_row_index: {err}"
        )))
    })?;
    df.with_column(errors)
        .map_err(|err| Box::new(ConfigError(format!("failed to add __floe_errors: {err}"))))?;
    Ok(())
}

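/// Aggregates violations per rule and per column, keeping up to
/// `MAX_EXAMPLES_PER_RULE` example violations for each rule.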
fn summarize_validation(
    errors_per_row: &[Vec<check::RowError>],
    columns: &[config::ColumnConfig],
    severity: report::Severity,
) -> (Vec<report::RuleSummary>, report::ExampleSummary) {
    if errors_per_row.iter().all(|errors| errors.is_empty()) {
        return (
            Vec::new(),
            report::ExampleSummary {
                max_examples_per_rule: MAX_EXAMPLES_PER_RULE,
                items: Vec::new(),
            },
        );
    }

    let mut column_types = HashMap::new();
    for column in columns {
        column_types.insert(column.name.clone(), column.column_type.clone());
    }

    let mut accumulators = vec![RuleAccumulator::default(); RULE_COUNT];
    let mut examples: Vec<Vec<report::ValidationExample>> = vec![Vec::new(); RULE_COUNT];

    for (row_idx, errors) in errors_per_row.iter().enumerate() {
        for error in errors {
            let idx = rule_index(&error.rule);
            let accumulator = &mut accumulators[idx];
            accumulator.violations += 1;
            let target_type = if idx == CAST_ERROR_INDEX {
                column_types.get(&error.column).cloned()
            } else {
                None
            };
            let entry = accumulator
                .columns
                .entry(error.column.clone())
                .or_insert_with(|| ColumnAccumulator {
                    violations: 0,
                    target_type,
                });
            entry.violations += 1;

            if examples[idx].len() < MAX_EXAMPLES_PER_RULE as usize {
                examples[idx].push(report::ValidationExample {
                    rule: rule_from_index(idx),
                    column: error.column.clone(),
                    row_index: row_idx as u64,
                    message: error.message.clone(),
                });
            }
        }
    }

    let mut rules = Vec::new();
    for (idx, accumulator) in accumulators.iter().enumerate() {
        if accumulator.violations == 0 {
            continue;
        }
        let mut columns = Vec::with_capacity(accumulator.columns.len());
        for (name, column_acc) in &accumulator.columns {
            columns.push(report::ColumnSummary {
                column: name.clone(),
                violations: column_acc.violations,
                target_type: column_acc.target_type.clone(),
            });
        }
        rules.push(report::RuleSummary {
            rule: rule_from_index(idx),
            severity,
            violations: accumulator.violations,
            columns,
        });
    }

    let mut items = Vec::new();
    for example_list in &examples {
        items.extend(example_list.iter().cloned());
    }

    (
        rules,
        report::ExampleSummary {
            max_examples_per_rule: MAX_EXAMPLES_PER_RULE,
            items,
        },
    )
}

#[derive(Debug, Default, Clone)]
struct RuleAccumulator {
    violations: u64,
    columns: BTreeMap<String, ColumnAccumulator>,
}

#[derive(Debug, Default, Clone)]
struct ColumnAccumulator {
    violations: u64,
    target_type: Option<String>,
}

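// Fixed mapping between rule names and accumulator slots; unknown rules fall
// back to the schema_error slot.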
fn rule_index(rule: &str) -> usize {
    match rule {
        "not_null" => 0,
        "cast_error" => 1,
        "unique" => 2,
        "schema_error" => 3,
        _ => 3,
    }
}

fn rule_from_index(idx: usize) -> report::RuleName {
    match idx {
        0 => report::RuleName::NotNull,
        1 => report::RuleName::CastError,
        2 => report::RuleName::Unique,
        _ => report::RuleName::SchemaError,
    }
}

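// Helpers that echo config metadata and source options into the report as JSON
// objects, omitting fields that were not set.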
fn project_metadata_json(meta: &config::ProjectMetadata) -> Value {
    let mut map = Map::new();
    map.insert("project".to_string(), Value::String(meta.project.clone()));
    if let Some(description) = &meta.description {
        map.insert(
            "description".to_string(),
            Value::String(description.clone()),
        );
    }
    if let Some(owner) = &meta.owner {
        map.insert("owner".to_string(), Value::String(owner.clone()));
    }
    if let Some(tags) = &meta.tags {
        map.insert("tags".to_string(), string_array(tags));
    }
    Value::Object(map)
}

fn entity_metadata_json(meta: &config::EntityMetadata) -> Value {
    let mut map = Map::new();
    if let Some(data_product) = &meta.data_product {
        map.insert(
            "data_product".to_string(),
            Value::String(data_product.clone()),
        );
    }
    if let Some(domain) = &meta.domain {
        map.insert("domain".to_string(), Value::String(domain.clone()));
    }
    if let Some(owner) = &meta.owner {
        map.insert("owner".to_string(), Value::String(owner.clone()));
    }
    if let Some(description) = &meta.description {
        map.insert(
            "description".to_string(),
            Value::String(description.clone()),
        );
    }
    if let Some(tags) = &meta.tags {
        map.insert("tags".to_string(), string_array(tags));
    }
    Value::Object(map)
}

fn source_options_json(options: &config::SourceOptions) -> Value {
    let mut map = Map::new();
    if let Some(header) = options.header {
        map.insert("header".to_string(), Value::Bool(header));
    }
    if let Some(separator) = &options.separator {
        map.insert("separator".to_string(), Value::String(separator.clone()));
    }
    if let Some(encoding) = &options.encoding {
        map.insert("encoding".to_string(), Value::String(encoding.clone()));
    }
    if let Some(null_values) = &options.null_values {
        map.insert("null_values".to_string(), string_array(null_values));
    }
    Value::Object(map)
}

fn string_array(values: &[String]) -> Value {
    Value::Array(values.iter().cloned().map(Value::String).collect())
}