use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::time::Instant;

use polars::prelude::DataFrame;
use serde_json::{Map, Value};

use crate::{check, config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};

mod normalize;
use normalize::{
    normalize_dataframe_columns, normalize_schema_columns, resolve_normalize_strategy,
};

const MAX_EXAMPLES_PER_RULE: u64 = 3;
const RULE_COUNT: usize = 4;
const CAST_ERROR_INDEX: usize = 1;

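/// Per-row validation output: whether each row is accepted, the serialized
/// error payload for each row (if any), and the full list of rule errors per row.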
type ValidationCollect = (Vec<bool>, Vec<Option<String>>, Vec<Vec<check::RowError>>);

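/// Result of a full run: the run identifier, the base directory reports were
/// written under, and one outcome per configured entity.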
#[derive(Debug, Clone)]
pub struct RunOutcome {
    pub run_id: String,
    pub report_base_path: String,
    pub entity_outcomes: Vec<EntityOutcome>,
}

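/// Per-entity result: the written run report plus wall-clock timings (in
/// milliseconds) for each processed input file.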
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    pub report: report::RunReport,
    pub file_timings_ms: Vec<Option<u64>>,
}

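/// Checks that every selected entity name exists in the config, returning a
/// single `ConfigError` that lists all missing names.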
pub(crate) fn validate_entities(
    config: &config::RootConfig,
    selected: &[String],
) -> FloeResult<()> {
    let missing: Vec<String> = selected
        .iter()
        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
        .cloned()
        .collect();

    if !missing.is_empty() {
        return Err(Box::new(ConfigError(format!(
            "entities not found: {}",
            missing.join(", ")
        ))));
    }
    Ok(())
}

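/// Validates the config, then runs the full pipeline for each configured
/// entity (optionally restricted to `options.entities`): read inputs, check
/// rows, write accepted/rejected outputs per the policy severity, and emit a
/// per-entity run report.
///
/// A minimal usage sketch (assumes `RunOptions` fields are constructible as
/// shown; the config path is illustrative):
///
/// ```ignore
/// let outcome = run(
///     Path::new("floe.yaml"),
///     RunOptions { entities: vec![], run_id: None },
/// )?;
/// println!("run {} reported under {}", outcome.run_id, outcome.report_base_path);
/// ```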
pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
    let validate_options = ValidateOptions {
        entities: options.entities.clone(),
    };
    crate::validate(config_path, validate_options)?;
    let config = config::parse_config(config_path)?;
    if !options.entities.is_empty() {
        validate_entities(&config, &options.entities)?;
    }
    let report_base_path = config.report.path.clone();
    let started_at = report::now_rfc3339();
    let run_id = options
        .run_id
        .clone()
        .unwrap_or_else(|| report::run_id_from_timestamp(&started_at));
    let run_timer = Instant::now();
    let mut entity_outcomes = Vec::new();

    for entity in &config.entities {
        let input = &entity.source;
        let input_path = Path::new(&input.path);
        let normalize_strategy = resolve_normalize_strategy(entity)?;
        let normalized_columns = if let Some(strategy) = normalize_strategy.as_deref() {
            normalize_schema_columns(&entity.schema.columns, strategy)?
        } else {
            entity.schema.columns.clone()
        };
        let required_cols = required_columns(&normalized_columns);

        let inputs = read_inputs(
            entity,
            input_path,
            &normalized_columns,
            normalize_strategy.as_deref(),
        )?;
        let resolved_files = inputs
            .iter()
            .map(|(path, _, _)| path.display().to_string())
            .collect::<Vec<_>>();
        let resolved_mode = if input_path.is_dir() {
            report::ResolvedInputMode::Directory
        } else {
            report::ResolvedInputMode::File
        };
        let severity = match entity.policy.severity.as_str() {
            "warn" => report::Severity::Warn,
            "reject" => report::Severity::Reject,
            "abort" => report::Severity::Abort,
            severity => {
                return Err(Box::new(ConfigError(format!(
                    "unsupported policy severity: {severity}"
                ))))
            }
        };

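        // Cast-mismatch tracking is skipped when cast_mode is "coerce"; any
        // other (or absent) cast_mode compares the raw and typed reads.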
        let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));
        let mut file_reports = Vec::with_capacity(inputs.len());
        let mut file_statuses = Vec::with_capacity(inputs.len());
        let mut totals = report::ResultsTotals {
            files_total: 0,
            rows_total: 0,
            accepted_total: 0,
            rejected_total: 0,
            warnings_total: 0,
            errors_total: 0,
        };
        let archive_enabled = entity.sink.archive.is_some();
        let archive_dir = entity
            .sink
            .archive
            .as_ref()
            .map(|archive| PathBuf::from(&archive.path));

        let mut file_timings_ms = Vec::with_capacity(inputs.len());
        for (source_path, raw_df, mut df) in inputs {
            let file_timer = Instant::now();
            let source_stem = source_path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .unwrap_or(entity.name.as_str());
            let (accept_rows, errors_json, error_lists) = collect_errors(
                &raw_df,
                &df,
                &required_cols,
                &normalized_columns,
                track_cast_errors,
            )?;
            let row_count = raw_df.height() as u64;
            let row_error_count = error_lists
                .iter()
                .filter(|errors| !errors.is_empty())
                .count() as u64;
            let violation_count = error_lists
                .iter()
                .map(|errors| errors.len() as u64)
                .sum::<u64>();
            let accept_count = accept_rows.iter().filter(|accepted| **accepted).count() as u64;
            let reject_count = row_count.saturating_sub(accept_count);
            let has_errors = row_error_count > 0;
            let mut accepted_path = None;
            let mut rejected_path = None;
            let mut errors_path = None;
            let mut archived_path = None;
            let (rules, examples) =
                summarize_validation(&error_lists, &normalized_columns, severity);

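            // Route rows by policy: "warn" writes everything as accepted,
            // "reject" splits accepted from rejected rows, and "abort"
            // quarantines the raw file alongside an error report.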
            match entity.policy.severity.as_str() {
                "warn" => {
                    let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                    accepted_path = Some(output_path.display().to_string());
                }
                "reject" => {
                    if has_errors {
                        let rejected_target = validate_rejected_target(entity, "reject")?;

                        let (accept_mask, reject_mask) = check::build_row_masks(&accept_rows);
                        let mut accepted_df = df.filter(&accept_mask).map_err(|err| {
                            Box::new(ConfigError(format!(
                                "failed to filter accepted rows: {err}"
                            )))
                        })?;
                        let mut rejected_df = df.filter(&reject_mask).map_err(|err| {
                            Box::new(ConfigError(format!(
                                "failed to filter rejected rows: {err}"
                            )))
                        })?;
                        append_rejection_columns(&mut rejected_df, &errors_json, false)?;

                        let output_path =
                            write_accepted_output(entity, &mut accepted_df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                        let rejected_path_buf = io::write::write_rejected_csv(
                            &mut rejected_df,
                            &rejected_target.path,
                            source_stem,
                        )?;
                        rejected_path = Some(rejected_path_buf.display().to_string());
                    } else {
                        let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                    }
                }
                "abort" => {
                    if has_errors {
                        let rejected_target = validate_rejected_target(entity, "abort")?;
                        let rejected_path_buf =
                            io::write::write_rejected_raw(&source_path, &rejected_target.path)?;
                        let report_path = io::write::write_error_report(
                            &rejected_target.path,
                            source_stem,
                            &errors_json,
                        )?;
                        rejected_path = Some(rejected_path_buf.display().to_string());
                        errors_path = Some(report_path.display().to_string());
                    } else {
                        let output_path = write_accepted_output(entity, &mut df, source_stem)?;
                        accepted_path = Some(output_path.display().to_string());
                    }
                }
                severity => {
                    return Err(Box::new(ConfigError(format!(
                        "unsupported policy severity: {severity}"
                    ))))
                }
            }

            if archive_enabled {
                if let Some(dir) = &archive_dir {
                    let archived_path_buf = io::write::archive_input(&source_path, dir)?;
                    archived_path = Some(archived_path_buf.display().to_string());
                }
            }

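            // Fold the same policy decision into per-file status and counts:
            // warn counts violations as warnings, reject splits the row counts,
            // and abort rejects the whole file.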
            let (status, accepted_count, rejected_count, errors, warnings) =
                match entity.policy.severity.as_str() {
                    "warn" => (
                        report::FileStatus::Success,
                        row_count,
                        0,
                        0,
                        violation_count,
                    ),
                    "reject" => {
                        if has_errors {
                            (
                                report::FileStatus::Rejected,
                                accept_count,
                                reject_count,
                                violation_count,
                                0,
                            )
                        } else {
                            (report::FileStatus::Success, row_count, 0, 0, 0)
                        }
                    }
                    "abort" => {
                        if has_errors {
                            (
                                report::FileStatus::Aborted,
                                0,
                                row_count,
                                violation_count,
                                0,
                            )
                        } else {
                            (report::FileStatus::Success, row_count, 0, 0, 0)
                        }
                    }
                    _ => unreachable!("severity validated earlier"),
                };

            let file_report = report::FileReport {
                input_file: source_path.display().to_string(),
                status,
                row_count,
                accepted_count,
                rejected_count,
                output: report::FileOutput {
                    accepted_path,
                    rejected_path,
                    errors_path,
                    archived_path,
                },
                validation: report::FileValidation {
                    errors,
                    warnings,
                    rules,
                    examples,
                },
            };

            totals.rows_total += row_count;
            totals.accepted_total += accepted_count;
            totals.rejected_total += rejected_count;
            totals.errors_total += errors;
            totals.warnings_total += warnings;
            file_statuses.push(status);
            file_reports.push(file_report);
            file_timings_ms.push(Some(file_timer.elapsed().as_millis() as u64));
        }

        totals.files_total = file_reports.len() as u64;

        let (mut run_status, exit_code) = report::compute_run_outcome(&file_statuses);
        if run_status == report::RunStatus::Success && totals.warnings_total > 0 {
            run_status = report::RunStatus::SuccessWithWarnings;
        }

        let report_dir = Path::new(&config.report.path);
        let report_path = report::ReportWriter::report_path(report_dir, &run_id, &entity.name);
        let finished_at = report::now_rfc3339();
        let duration_ms = run_timer.elapsed().as_millis() as u64;
        let run_report = report::RunReport {
            spec_version: config.version.clone(),
            tool: report::ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: report::RunInfo {
                run_id: run_id.clone(),
                started_at: started_at.clone(),
                finished_at,
                duration_ms,
                status: run_status,
                exit_code,
            },
            config: report::ConfigEcho {
                path: config_path.display().to_string(),
                version: config.version.clone(),
                metadata: config.metadata.as_ref().map(project_metadata_json),
            },
            entity: report::EntityEcho {
                name: entity.name.clone(),
                metadata: entity.metadata.as_ref().map(entity_metadata_json),
            },
            source: report::SourceEcho {
                format: input.format.clone(),
                path: input.path.clone(),
                options: input.options.as_ref().map(source_options_json),
                cast_mode: input.cast_mode.clone(),
                read_plan: report::SourceReadPlan::RawAndTyped,
                resolved_inputs: report::ResolvedInputs {
                    mode: resolved_mode,
                    file_count: resolved_files.len() as u64,
                    files: resolved_files,
                },
            },
            sink: report::SinkEcho {
                accepted: report::SinkTargetEcho {
                    format: entity.sink.accepted.format.clone(),
                    path: entity.sink.accepted.path.clone(),
                },
                rejected: entity.sink.rejected.as_ref().map(|rejected| {
                    report::SinkTargetEcho {
                        format: rejected.format.clone(),
                        path: rejected.path.clone(),
                    }
                }),
                archive: report::SinkArchiveEcho {
                    enabled: entity.sink.archive.is_some(),
                    path: entity
                        .sink
                        .archive
                        .as_ref()
                        .map(|archive| archive.path.clone()),
                },
            },
            report: report::ReportEcho {
                path: config.report.path.clone(),
                report_file: report_path.display().to_string(),
            },
            policy: report::PolicyEcho { severity },
            results: totals,
            files: file_reports,
        };
        report::ReportWriter::write_report(report_dir, &run_id, &entity.name, &run_report)?;
        entity_outcomes.push(EntityOutcome {
            report: run_report,
            file_timings_ms,
        });
    }

    Ok(RunOutcome {
        run_id,
        report_base_path,
        entity_outcomes,
    })
}

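/// Names of columns explicitly declared `nullable: false`; columns without a
/// `nullable` setting are treated as optional.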
fn required_columns(columns: &[config::ColumnConfig]) -> Vec<String> {
    columns
        .iter()
        .filter(|col| col.nullable == Some(false))
        .map(|col| col.name.clone())
        .collect()
}

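/// Resolves and reads every input file for an entity. Each CSV is read twice:
/// once strictly with an all-string schema (the "raw" frame, used to detect
/// cast mismatches) and once permissively with the typed schema. Only the
/// `csv` source format is supported for now.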
fn read_inputs(
    entity: &config::EntityConfig,
    input_path: &Path,
    columns: &[config::ColumnConfig],
    normalize_strategy: Option<&str>,
) -> FloeResult<Vec<(PathBuf, DataFrame, DataFrame)>> {
    let input = &entity.source;
    match input.format.as_str() {
        "csv" => {
            let default_options = config::SourceOptions::default();
            let source_options = input.options.as_ref().unwrap_or(&default_options);
            let normalized_schema = config::SchemaConfig {
                normalize_columns: None,
                columns: columns.to_vec(),
            };
            let typed_schema = normalized_schema.to_polars_schema()?;
            let raw_schema = normalized_schema.to_polars_string_schema()?;
            let files = io::read_csv::list_csv_files(input_path)?;
            let mut inputs = Vec::with_capacity(files.len());
            let raw_plan = io::read_csv::CsvReadPlan::strict(raw_schema);
            let typed_plan = io::read_csv::CsvReadPlan::permissive(typed_schema);
            for path in files {
                let mut raw_df = io::read_csv::read_csv_file(&path, source_options, &raw_plan)?;
                let mut typed_df =
                    io::read_csv::read_csv_file(&path, source_options, &typed_plan)?;
                if let Some(strategy) = normalize_strategy {
                    normalize_dataframe_columns(&mut raw_df, strategy)?;
                    normalize_dataframe_columns(&mut typed_df, strategy)?;
                }
                inputs.push((path, raw_df, typed_df));
            }
            Ok(inputs)
        }
        format => Err(Box::new(ConfigError(format!(
            "unsupported source format for now: {format}"
        )))),
    }
}

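/// Runs the row-level checks (not-null, optional cast-mismatch, unique) and
/// folds them into per-row accept flags, serialized error payloads, and the
/// raw error lists.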
fn collect_errors(
    raw_df: &DataFrame,
    typed_df: &DataFrame,
    required_cols: &[String],
    columns: &[config::ColumnConfig],
    track_cast_errors: bool,
) -> FloeResult<ValidationCollect> {
    let mut error_lists = check::not_null_errors(typed_df, required_cols)?;
    if track_cast_errors {
        let cast_errors = check::cast_mismatch_errors(raw_df, typed_df, columns)?;
        for (errors, cast) in error_lists.iter_mut().zip(cast_errors) {
            errors.extend(cast);
        }
    }
    let unique_errors = check::unique_errors(typed_df, columns)?;
    for (errors, unique) in error_lists.iter_mut().zip(unique_errors) {
        errors.extend(unique);
    }
    let (accept_rows, errors_json) = check::build_error_state(&error_lists);
    Ok((accept_rows, errors_json, error_lists))
}

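/// Writes the accepted rows to the entity's accepted sink; only the `parquet`
/// format is supported for now.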
fn write_accepted_output(
    entity: &config::EntityConfig,
    df: &mut DataFrame,
    source_stem: &str,
) -> FloeResult<PathBuf> {
    match entity.sink.accepted.format.as_str() {
        "parquet" => {
            let output_path =
                io::write::write_parquet(df, &entity.sink.accepted.path, source_stem)?;
            Ok(output_path)
        }
        format => Err(Box::new(ConfigError(format!(
            "unsupported sink format for now: {format}"
        )))),
    }
}

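/// Returns the entity's rejected sink target, erroring if it is missing or
/// uses a format other than `csv`; called only when a `reject` or `abort`
/// run actually hits errors.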
468
469fn validate_rejected_target<'a>(
470 entity: &'a config::EntityConfig,
471 severity: &str,
472) -> FloeResult<&'a config::SinkTarget> {
473 let rejected_target = entity.sink.rejected.as_ref().ok_or_else(|| {
474 Box::new(ConfigError(format!(
475 "sink.rejected is required for {severity} severity"
476 )))
477 })?;
478 match rejected_target.format.as_str() {
479 "csv" => Ok(rejected_target),
480 format => Err(Box::new(ConfigError(format!(
481 "unsupported rejected sink format for now: {format}"
482 )))),
483 }
484}
485
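/// Appends the `__floe_row_index` and `__floe_errors` columns so each
/// rejected row carries its row index and error details.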
fn append_rejection_columns(
    df: &mut DataFrame,
    errors_per_row: &[Option<String>],
    include_all_rows: bool,
) -> FloeResult<()> {
    let (row_index, errors) = check::rejected_error_columns(errors_per_row, include_all_rows);
    df.with_column(row_index).map_err(|err| {
        Box::new(ConfigError(format!(
            "failed to add __floe_row_index: {err}"
        )))
    })?;
    df.with_column(errors)
        .map_err(|err| Box::new(ConfigError(format!("failed to add __floe_errors: {err}"))))?;
    Ok(())
}

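/// Aggregates per-row errors into per-rule summaries (per-column violation
/// counts, plus the target type for cast errors) and collects up to
/// `MAX_EXAMPLES_PER_RULE` concrete examples per rule, in rule-index order.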
fn summarize_validation(
    errors_per_row: &[Vec<check::RowError>],
    columns: &[config::ColumnConfig],
    severity: report::Severity,
) -> (Vec<report::RuleSummary>, report::ExampleSummary) {
    if errors_per_row.iter().all(|errors| errors.is_empty()) {
        return (
            Vec::new(),
            report::ExampleSummary {
                max_examples_per_rule: MAX_EXAMPLES_PER_RULE,
                items: Vec::new(),
            },
        );
    }

    let mut column_types = HashMap::new();
    for column in columns {
        column_types.insert(column.name.clone(), column.column_type.clone());
    }

    let mut accumulators = vec![RuleAccumulator::default(); RULE_COUNT];
    let mut examples: Vec<Vec<report::ValidationExample>> = vec![Vec::new(); RULE_COUNT];

    for (row_idx, errors) in errors_per_row.iter().enumerate() {
        for error in errors {
            let idx = rule_index(&error.rule);
            let accumulator = &mut accumulators[idx];
            accumulator.violations += 1;
            let target_type = if idx == CAST_ERROR_INDEX {
                column_types.get(&error.column).cloned()
            } else {
                None
            };
            let entry = accumulator
                .columns
                .entry(error.column.clone())
                .or_insert_with(|| ColumnAccumulator {
                    violations: 0,
                    target_type,
                });
            entry.violations += 1;

            if examples[idx].len() < MAX_EXAMPLES_PER_RULE as usize {
                examples[idx].push(report::ValidationExample {
                    rule: rule_from_index(idx),
                    column: error.column.clone(),
                    row_index: row_idx as u64,
                    message: error.message.clone(),
                });
            }
        }
    }

    let mut rules = Vec::new();
    for (idx, accumulator) in accumulators.iter().enumerate() {
        if accumulator.violations == 0 {
            continue;
        }
        let mut columns = Vec::with_capacity(accumulator.columns.len());
        for (name, column_acc) in &accumulator.columns {
            columns.push(report::ColumnSummary {
                column: name.clone(),
                violations: column_acc.violations,
                target_type: column_acc.target_type.clone(),
            });
        }
        rules.push(report::RuleSummary {
            rule: rule_from_index(idx),
            severity,
            violations: accumulator.violations,
            columns,
        });
    }

    let mut items = Vec::new();
    for example_list in &examples {
        items.extend(example_list.iter().cloned());
    }

    (
        rules,
        report::ExampleSummary {
            max_examples_per_rule: MAX_EXAMPLES_PER_RULE,
            items,
        },
    )
}

#[derive(Debug, Default, Clone)]
struct RuleAccumulator {
    violations: u64,
    columns: BTreeMap<String, ColumnAccumulator>,
}

#[derive(Debug, Default, Clone)]
struct ColumnAccumulator {
    violations: u64,
    target_type: Option<String>,
}

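/// Maps a rule name to its fixed accumulator slot; unknown rule names fold
/// into the `schema_error` slot. Must stay in sync with `rule_from_index`:
///
/// ```ignore
/// assert!(matches!(
///     rule_from_index(rule_index("unique")),
///     report::RuleName::Unique
/// ));
/// ```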
fn rule_index(rule: &str) -> usize {
    match rule {
        "not_null" => 0,
        "cast_error" => 1,
        "unique" => 2,
        "schema_error" => 3,
        _ => 3,
    }
}

fn rule_from_index(idx: usize) -> report::RuleName {
    match idx {
        0 => report::RuleName::NotNull,
        1 => report::RuleName::CastError,
        2 => report::RuleName::Unique,
        _ => report::RuleName::SchemaError,
    }
}

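/// Echoes project-level metadata into the report as JSON, emitting only the
/// fields present in the config (the entity and source-options helpers below
/// follow the same pattern).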
fn project_metadata_json(meta: &config::ProjectMetadata) -> Value {
    let mut map = Map::new();
    map.insert("project".to_string(), Value::String(meta.project.clone()));
    if let Some(description) = &meta.description {
        map.insert(
            "description".to_string(),
            Value::String(description.clone()),
        );
    }
    if let Some(owner) = &meta.owner {
        map.insert("owner".to_string(), Value::String(owner.clone()));
    }
    if let Some(tags) = &meta.tags {
        map.insert("tags".to_string(), string_array(tags));
    }
    Value::Object(map)
}

fn entity_metadata_json(meta: &config::EntityMetadata) -> Value {
    let mut map = Map::new();
    if let Some(data_product) = &meta.data_product {
        map.insert(
            "data_product".to_string(),
            Value::String(data_product.clone()),
        );
    }
    if let Some(domain) = &meta.domain {
        map.insert("domain".to_string(), Value::String(domain.clone()));
    }
    if let Some(owner) = &meta.owner {
        map.insert("owner".to_string(), Value::String(owner.clone()));
    }
    if let Some(description) = &meta.description {
        map.insert(
            "description".to_string(),
            Value::String(description.clone()),
        );
    }
    if let Some(tags) = &meta.tags {
        map.insert("tags".to_string(), string_array(tags));
    }
    Value::Object(map)
}

fn source_options_json(options: &config::SourceOptions) -> Value {
    let mut map = Map::new();
    if let Some(header) = options.header {
        map.insert("header".to_string(), Value::Bool(header));
    }
    if let Some(separator) = &options.separator {
        map.insert("separator".to_string(), Value::String(separator.clone()));
    }
    if let Some(encoding) = &options.encoding {
        map.insert("encoding".to_string(), Value::String(encoding.clone()));
    }
    if let Some(null_values) = &options.null_values {
        map.insert("null_values".to_string(), string_array(null_values));
    }
    Value::Object(map)
}

fn string_array(values: &[String]) -> Value {
    Value::Array(values.iter().cloned().map(Value::String).collect())
}