// floe_core/run/entity/mod.rs

1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5    fn from(s: PolicySeverity) -> Self {
6        match s {
7            PolicySeverity::Warn => report::Severity::Warn,
8            PolicySeverity::Reject => report::Severity::Reject,
9            PolicySeverity::Abort => report::Severity::Abort,
10        }
11    }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20    output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21    resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::{event_time_ms, RunEvent, RunObserver};
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
41pub(super) struct EntityRunResult {
42    pub outcome: EntityOutcome,
43    pub abort_run: bool,
44}
45
46#[derive(Debug, Default)]
47struct EntityPhaseTimings {
48    precheck_ms: u64,
49    read_parse_ms: u64,
50    checks_validation_ms: u64,
51    accept_reject_split_ms: u64,
52    write_rejected_ms: u64,
53    archive_ms: u64,
54    concat_accepted_ms: u64,
55    write_accepted_ms: u64,
56    write_delta_ms: u64,
57    write_iceberg_ms: u64,
58    report_write_ms: u64,
59}
60
61impl EntityPhaseTimings {
62    fn into_json(self) -> serde_json::Value {
63        json!({
64            "precheck": self.precheck_ms,
65            "read_parse": self.read_parse_ms,
66            "checks_validation": self.checks_validation_ms,
67            "accept_reject_split": self.accept_reject_split_ms,
68            "write_rejected": self.write_rejected_ms,
69            "archive_input": self.archive_ms,
70            "concat_accepted": self.concat_accepted_ms,
71            "write_accepted": self.write_accepted_ms,
72            "write_delta": self.write_delta_ms,
73            "write_iceberg": self.write_iceberg_ms,
74            "write_entity_report": self.report_write_ms,
75        })
76    }
77}
78
79pub(super) fn run_entity(
80    context: &RunContext,
81    runtime: &mut dyn crate::runtime::Runtime,
82    observer: &dyn RunObserver,
83    plan: super::EntityRunPlan<'_>,
84) -> FloeResult<EntityRunResult> {
85    let entity = plan.entity;
86    let perf_enabled = crate::run::perf::phase_timing_enabled();
87    let entity_start = perf_enabled.then(Instant::now);
88    let mut phase_timings = EntityPhaseTimings::default();
89    let input = &entity.source;
90    let write_mode =
91        if context.full_refresh && entity.sink.write_mode == crate::config::WriteMode::Append {
92            crate::config::WriteMode::Overwrite
93        } else {
94            entity.sink.write_mode
95        };
96    let input_adapter = runtime.input_adapter(input.format.as_str())?;
97    let resolved_targets = plan.resolved_targets;
98    let formatter_name = context
99        .config
100        .report
101        .as_ref()
102        .and_then(|report| report.formatter.as_deref())
103        .unwrap_or("json");
104
105    let normalize_strategy = resolve_normalize_strategy(entity)?;
106    let normalized_columns =
107        resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
108    let source_column_map =
109        source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
110    let row_error_formatter = if source_column_map.is_empty() {
111        check::row_error_formatter(formatter_name, None)?
112    } else {
113        check::row_error_formatter(formatter_name, Some(&source_column_map))?
114    };
115    let read_columns = io::format::resolve_read_columns(
116        entity,
117        &normalized_columns,
118        normalize_strategy.as_deref(),
119    )?;
120    let output_column_map =
121        output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
122    let pii_runtime_map =
123        pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
124    let mut required_cols = required_columns(&normalized_columns);
125    append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
126    let unique_constraints =
127        resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
128    let accepted_target = resolved_targets.accepted.clone();
129    let rejected_target = resolved_targets.rejected.clone();
130    let temp_dir = plan.temp_dir;
131    let io::storage::inputs::ResolvedInputs {
132        files: input_files,
133        listed: resolved_files,
134        mode: resolved_mode,
135    } = plan.resolved_inputs;
136    let mut incremental =
137        incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
138    let input_files = incremental.pending_inputs;
139    let pending_input_count = input_files.len();
140
141    let severity = report::Severity::from(entity.policy.severity);
142    let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));
143
144    let reported_files = resolved_files
145        .iter()
146        .take(MAX_RESOLVED_INPUTS)
147        .cloned()
148        .collect::<Vec<_>>();
149
150    let mut file_reports =
151        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
152    let mut totals = report::ResultsTotals {
153        files_total: 0,
154        files_skipped: 0,
155        rows_total: 0,
156        accepted_total: 0,
157        rejected_total: 0,
158        warnings_total: 0,
159        errors_total: 0,
160    };
161    let archive_target = if entity.archive_enabled() {
162        entity
163            .sink
164            .archive
165            .as_ref()
166            .map(|archive| {
167                let storage_name = archive
168                    .storage
169                    .as_deref()
170                    .or(entity.source.storage.as_deref());
171                let resolved = context.storage_resolver.resolve_path(
172                    &entity.name,
173                    "sink.archive.storage",
174                    storage_name,
175                    &archive.path,
176                )?;
177                Target::from_resolved(&resolved)
178            })
179            .transpose()?
180    } else {
181        None
182    };
183    let mut file_timings_ms =
184        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
185    for skipped in incremental.skipped_reports {
186        observer.on_event(RunEvent::FileFinished {
187            run_id: context.run_id.clone(),
188            entity: entity.name.clone(),
189            input: skipped.input_file.clone(),
190            status: "skipped".to_string(),
191            skip_reason: skipped.skip_reason.clone(),
192            rows: 0,
193            accepted: 0,
194            rejected: 0,
195            elapsed_ms: 0,
196            ts_ms: event_time_ms(),
197        });
198        totals.files_total += 1;
199        totals.files_skipped += 1;
200        totals.warnings_total += skipped.validation.warnings;
201        file_reports.push(skipped);
202        file_timings_ms.push(Some(0));
203    }
204    let sink_options_warning = sink_options_warning(entity);
205    // Phase A: per-file precheck (schema mismatch / early rejection).
206    let precheck_start = perf_enabled.then(Instant::now);
207    let precheck = run_precheck(
208        PrecheckContext {
209            context,
210            entity,
211            input_adapter,
212            normalized_columns: &normalized_columns,
213            resolved_targets: &resolved_targets,
214            archive_target: archive_target.as_ref(),
215            temp_dir: temp_dir.as_ref(),
216            cloud: runtime.storage(),
217            observer,
218            file_reports: &mut file_reports,
219            file_timings_ms: &mut file_timings_ms,
220            totals: &mut totals,
221        },
222        input_files,
223    )?;
224    if let Some(start) = precheck_start {
225        phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
226    }
227    let mut abort_run = precheck.abort_run;
228    let prechecked_inputs = precheck.prechecked;
229
230    let mut accepted_accum = Vec::new();
231    let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
232    let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
233    io::unique_seed::seed_unique_tracker_for_append(
234        &mut unique_tracker,
235        write_mode,
236        entity.sink.accepted.format.as_str(),
237        &accepted_target,
238        temp_dir.as_ref().map(|dir| dir.path()),
239        runtime.storage(),
240        &context.storage_resolver,
241        &context.catalog_resolver,
242        entity,
243    )?;
244    // Phase B: row-level validation + entity-level accumulation.
245    let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
246        run_context: context,
247        runtime,
248        observer,
249        entity,
250        input_adapter,
251        prechecked_inputs,
252        read_columns: &read_columns,
253        normalize_strategy: normalize_strategy.as_deref(),
254        normalized_columns: &normalized_columns,
255        required_cols: &required_cols,
256        source_column_map: &source_column_map,
257        output_column_map: &output_column_map,
258        pii_runtime_map: &pii_runtime_map,
259        row_error_formatter: row_error_formatter.as_ref(),
260        severity,
261        track_cast_errors,
262        write_mode,
263        rejected_target: rejected_target.as_ref(),
264        archive_target: archive_target.as_ref(),
265        temp_dir: temp_dir_path,
266        sink_options_warning: sink_options_warning.as_deref(),
267        perf_enabled,
268        phase_timings: &mut phase_timings,
269        file_reports: &mut file_reports,
270        file_timings_ms: &mut file_timings_ms,
271        totals: &mut totals,
272        unique_tracker: &mut unique_tracker,
273        accepted_accum: &mut accepted_accum,
274        initial_abort_run: abort_run,
275    })?;
276    abort_run = phase_b.abort_run;
277    let accepted_accum_rows = phase_b.accepted_accum_rows;
278    let accepted_accum_frames = phase_b.accepted_accum_frames;
279    let unique_constraints = unique_tracker.results();
280
281    totals.files_total = file_reports.len() as u64;
282
283    let accepted_target_uri = accepted_target.target_uri().to_string();
284    let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
285        run_context: context,
286        observer,
287        runtime,
288        entity,
289        accepted_target: &accepted_target,
290        temp_dir: temp_dir_path,
291        write_mode,
292        full_refresh: context.full_refresh,
293        perf_enabled,
294        phase_timings: &mut phase_timings,
295        pending_input_count,
296        accepted_accum,
297    })?;
298    if aw.parts_written > 0 {
299        for file_report in &mut file_reports {
300            file_report.output.accepted_path = Some(accepted_target_uri.clone());
301        }
302    }
303
304    let perf_files_total = totals.files_total;
305    let perf_rows_total = totals.rows_total;
306
307    let run_report = build_run_report(RunReportContext {
308        context,
309        entity,
310        input,
311        resolved_targets: &resolved_targets,
312        resolved_mode,
313        resolved_files: &resolved_files,
314        reported_files,
315        totals,
316        file_reports,
317        severity,
318        accepted_write_mode: write_mode,
319        accepted_parts_written: aw.parts_written,
320        accepted_files_written: aw.files_written,
321        accepted_part_files: aw.part_files,
322        accepted_table_version: aw.table_version,
323        accepted_snapshot_id: aw.snapshot_id,
324        accepted_table_root_uri: aw.table_root_uri,
325        accepted_catalog: aw.catalog,
326        accepted_total_bytes_written: aw.metrics.total_bytes_written,
327        accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
328        accepted_small_files_count: aw.metrics.small_files_count,
329        accepted_merge_key: aw
330            .merge
331            .as_ref()
332            .map(|m| m.merge_key.clone())
333            .unwrap_or_default(),
334        accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
335        accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
336        accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
337        accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
338        accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
339        accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
340        accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
341        accepted_schema_evolution: report::SchemaEvolutionSummary {
342            enabled: aw.schema_evolution.enabled,
343            mode: aw.schema_evolution.mode,
344            applied: aw.schema_evolution.applied,
345            added_columns: aw.schema_evolution.added_columns,
346            incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
347        },
348        unique_constraints,
349    });
350
351    if let Some(report_target) = &context.report_target {
352        let report_write_start = perf_enabled.then(Instant::now);
353        crate::report::output::write_entity_report(
354            report_target,
355            &context.run_id,
356            entity,
357            &run_report,
358            runtime.storage(),
359            &context.storage_resolver,
360        )?;
361        if let Some(start) = report_write_start {
362            phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
363        }
364    }
365
366    if let Some(pending_state) = incremental.pending_state.as_mut() {
367        let (status, _) = report::compute_run_outcome(
368            &run_report
369                .files
370                .iter()
371                .map(|file| file.status)
372                .collect::<Vec<_>>(),
373        );
374        if matches!(
375            status,
376            report::RunStatus::Success
377                | report::RunStatus::SuccessWithWarnings
378                | report::RunStatus::Rejected
379        ) {
380            pending_state.commit(context, runtime.storage(), entity)?;
381        } else {
382            pending_state.release(context, runtime.storage(), entity)?;
383        }
384    }
385
386    if let Some(start) = entity_start {
387        crate::run::perf::emit_perf_log(
388            observer,
389            &context.run_id,
390            Some(&entity.name),
391            "perf_entity_phase_timings",
392            json!({
393                "entity": entity.name,
394                "elapsed_ms": start.elapsed().as_millis() as u64,
395                "files_total": perf_files_total,
396                "rows_total": perf_rows_total,
397                "accepted_rows_accumulated": accepted_accum_rows,
398                "accepted_frames_accumulated": accepted_accum_frames,
399                "write_sink_format": entity.sink.accepted.format,
400                "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
401                "phases_ms": phase_timings.into_json(),
402            }),
403        );
404    }
405
406    Ok(EntityRunResult {
407        outcome: EntityOutcome {
408            report: run_report,
409            file_timings_ms,
410        },
411        abort_run,
412    })
413}
414
415fn write_perf_breakdown_json(
416    perf: &crate::io::format::AcceptedWritePerfBreakdown,
417) -> serde_json::Value {
418    json!({
419        "conversion": perf.conversion_ms,
420        "source_df_build": perf.source_df_build_ms,
421        "merge_exec": perf.merge_exec_ms,
422        "data_write": perf.data_write_ms,
423        "commit": perf.commit_ms,
424        "metrics_read": perf.metrics_read_ms,
425    })
426}
427
428fn resolve_unique_constraints(
429    entity: &crate::config::EntityConfig,
430    normalize_strategy: Option<&str>,
431    write_mode: crate::config::WriteMode,
432) -> FloeResult<Vec<check::UniqueConstraint>> {
433    let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
434    if unique_keys.is_empty() {
435        return Ok(Vec::new());
436    }
437    let merge_primary_key = if matches!(
438        write_mode,
439        crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
440    ) {
441        entity.schema.primary_key.as_ref().map(|primary_key| {
442            primary_key
443                .iter()
444                .map(|column| column.trim().to_string())
445                .collect::<Vec<_>>()
446        })
447    } else {
448        None
449    };
450    let mut constraints = Vec::with_capacity(unique_keys.len());
451    for key in unique_keys {
452        let mut runtime_columns = Vec::with_capacity(key.len());
453        for name in &key {
454            let column = entity
455                .schema
456                .columns
457                .iter()
458                .find(|column| column.name == *name)
459                .ok_or_else(|| {
460                    Box::new(ConfigError(format!(
461                        "entity.name={} schema unique key references unknown column {}",
462                        entity.name, name
463                    )))
464                })?;
465            runtime_columns.push(runtime_column_name(column, normalize_strategy));
466        }
467        let enforce_reject = merge_primary_key
468            .as_ref()
469            .map(|primary_key| primary_key == &key)
470            .unwrap_or(false);
471        constraints.push(check::UniqueConstraint {
472            runtime_columns,
473            report_columns: key,
474            enforce_reject,
475        });
476    }
477    Ok(constraints)
478}
479
480fn append_primary_key_required_columns(
481    required_cols: &mut Vec<String>,
482    entity: &crate::config::EntityConfig,
483    normalize_strategy: Option<&str>,
484) -> FloeResult<()> {
485    let Some(primary_key) = entity.schema.primary_key.as_ref() else {
486        return Ok(());
487    };
488    if primary_key.is_empty() {
489        return Ok(());
490    }
491    let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
492    for key_column in primary_key {
493        let column = entity
494            .schema
495            .columns
496            .iter()
497            .find(|column| column.name == *key_column)
498            .ok_or_else(|| {
499                Box::new(ConfigError(format!(
500                    "entity.name={} schema.primary_key references unknown column {}",
501                    entity.name, key_column
502                )))
503            })?;
504        let runtime = runtime_column_name(column, normalize_strategy);
505        if seen.insert(runtime.clone()) {
506            required_cols.push(runtime);
507        }
508    }
509    Ok(())
510}
511
512fn runtime_column_name(
513    column: &crate::config::ColumnConfig,
514    normalize_strategy: Option<&str>,
515) -> String {
516    let source_name = column.source_or_name();
517    if let Some(strategy) = normalize_strategy {
518        check::normalize::normalize_name(source_name, strategy)
519    } else {
520        source_name.to_string()
521    }
522}