// floe_core/run/entity/mod.rs

1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5    fn from(s: PolicySeverity) -> Self {
6        match s {
7            PolicySeverity::Warn => report::Severity::Warn,
8            PolicySeverity::Reject => report::Severity::Reject,
9            PolicySeverity::Abort => report::Severity::Abort,
10        }
11    }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20    output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21    resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::RunObserver;
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
41pub(super) struct EntityRunResult {
42    pub outcome: EntityOutcome,
43    pub abort_run: bool,
44}
45
/// Accumulated per-phase durations for one entity run, in milliseconds.
///
/// All counters start at zero (`Default`). `run_entity` adds to `precheck_ms` and
/// `report_write_ms` itself and passes the struct mutably to the validate/split and
/// accepted-write phases; the remaining counters are presumably filled in there
/// (not visible from this file — confirm in the phase modules).
#[derive(Debug, Default)]
struct EntityPhaseTimings {
    precheck_ms: u64,
    read_parse_ms: u64,
    checks_validation_ms: u64,
    accept_reject_split_ms: u64,
    write_rejected_ms: u64,
    archive_ms: u64,
    concat_accepted_ms: u64,
    write_accepted_ms: u64,
    write_delta_ms: u64,
    write_iceberg_ms: u64,
    report_write_ms: u64,
}
60
61impl EntityPhaseTimings {
62    fn into_json(self) -> serde_json::Value {
63        json!({
64            "precheck": self.precheck_ms,
65            "read_parse": self.read_parse_ms,
66            "checks_validation": self.checks_validation_ms,
67            "accept_reject_split": self.accept_reject_split_ms,
68            "write_rejected": self.write_rejected_ms,
69            "archive_input": self.archive_ms,
70            "concat_accepted": self.concat_accepted_ms,
71            "write_accepted": self.write_accepted_ms,
72            "write_delta": self.write_delta_ms,
73            "write_iceberg": self.write_iceberg_ms,
74            "write_entity_report": self.report_write_ms,
75        })
76    }
77}
78
79pub(super) fn run_entity(
80    context: &RunContext,
81    runtime: &mut dyn crate::runtime::Runtime,
82    observer: &dyn RunObserver,
83    plan: super::EntityRunPlan<'_>,
84) -> FloeResult<EntityRunResult> {
85    let entity = plan.entity;
86    let perf_enabled = crate::run::perf::phase_timing_enabled();
87    let entity_start = perf_enabled.then(Instant::now);
88    let mut phase_timings = EntityPhaseTimings::default();
89    let input = &entity.source;
90    let write_mode = entity.sink.write_mode;
91    let input_adapter = runtime.input_adapter(input.format.as_str())?;
92    let resolved_targets = plan.resolved_targets;
93    let formatter_name = context
94        .config
95        .report
96        .as_ref()
97        .and_then(|report| report.formatter.as_deref())
98        .unwrap_or("json");
99
100    let normalize_strategy = resolve_normalize_strategy(entity)?;
101    let normalized_columns =
102        resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
103    let source_column_map =
104        source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
105    let row_error_formatter = if source_column_map.is_empty() {
106        check::row_error_formatter(formatter_name, None)?
107    } else {
108        check::row_error_formatter(formatter_name, Some(&source_column_map))?
109    };
110    let read_columns = io::format::resolve_read_columns(
111        entity,
112        &normalized_columns,
113        normalize_strategy.as_deref(),
114    )?;
115    let output_column_map =
116        output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
117    let pii_runtime_map =
118        pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
119    let mut required_cols = required_columns(&normalized_columns);
120    append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
121    let unique_constraints =
122        resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
123    let accepted_target = resolved_targets.accepted.clone();
124    let rejected_target = resolved_targets.rejected.clone();
125    let temp_dir = plan.temp_dir;
126    let io::storage::inputs::ResolvedInputs {
127        files: input_files,
128        listed: resolved_files,
129        mode: resolved_mode,
130    } = plan.resolved_inputs;
131    let mut incremental =
132        incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
133    let input_files = incremental.pending_inputs;
134    let pending_input_count = input_files.len();
135
136    let severity = report::Severity::from(entity.policy.severity);
137    let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));
138
139    let reported_files = resolved_files
140        .iter()
141        .take(MAX_RESOLVED_INPUTS)
142        .cloned()
143        .collect::<Vec<_>>();
144
145    let mut file_reports =
146        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
147    let mut totals = report::ResultsTotals {
148        files_total: 0,
149        rows_total: 0,
150        accepted_total: 0,
151        rejected_total: 0,
152        warnings_total: 0,
153        errors_total: 0,
154    };
155    let archive_target = if entity.archive_enabled() {
156        entity
157            .sink
158            .archive
159            .as_ref()
160            .map(|archive| {
161                let storage_name = archive
162                    .storage
163                    .as_deref()
164                    .or(entity.source.storage.as_deref());
165                let resolved = context.storage_resolver.resolve_path(
166                    &entity.name,
167                    "sink.archive.storage",
168                    storage_name,
169                    &archive.path,
170                )?;
171                Target::from_resolved(&resolved)
172            })
173            .transpose()?
174    } else {
175        None
176    };
177    let mut file_timings_ms =
178        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
179    for skipped in incremental.skipped_reports {
180        totals.files_total += 1;
181        totals.warnings_total += skipped.validation.warnings;
182        file_reports.push(skipped);
183        file_timings_ms.push(Some(0));
184    }
185    let sink_options_warning = sink_options_warning(entity);
186    // Phase A: per-file precheck (schema mismatch / early rejection).
187    let precheck_start = perf_enabled.then(Instant::now);
188    let precheck = run_precheck(
189        PrecheckContext {
190            context,
191            entity,
192            input_adapter,
193            normalized_columns: &normalized_columns,
194            resolved_targets: &resolved_targets,
195            archive_target: archive_target.as_ref(),
196            temp_dir: temp_dir.as_ref(),
197            cloud: runtime.storage(),
198            observer,
199            file_reports: &mut file_reports,
200            file_timings_ms: &mut file_timings_ms,
201            totals: &mut totals,
202        },
203        input_files,
204    )?;
205    if let Some(start) = precheck_start {
206        phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
207    }
208    let mut abort_run = precheck.abort_run;
209    let prechecked_inputs = precheck.prechecked;
210
211    let mut accepted_accum = Vec::new();
212    let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
213    let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
214    io::unique_seed::seed_unique_tracker_for_append(
215        &mut unique_tracker,
216        write_mode,
217        entity.sink.accepted.format.as_str(),
218        &accepted_target,
219        temp_dir.as_ref().map(|dir| dir.path()),
220        runtime.storage(),
221        &context.storage_resolver,
222        &context.catalog_resolver,
223        entity,
224    )?;
225    // Phase B: row-level validation + entity-level accumulation.
226    let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
227        run_context: context,
228        runtime,
229        observer,
230        entity,
231        input_adapter,
232        prechecked_inputs,
233        read_columns: &read_columns,
234        normalize_strategy: normalize_strategy.as_deref(),
235        normalized_columns: &normalized_columns,
236        required_cols: &required_cols,
237        source_column_map: &source_column_map,
238        output_column_map: &output_column_map,
239        pii_runtime_map: &pii_runtime_map,
240        row_error_formatter: row_error_formatter.as_ref(),
241        severity,
242        track_cast_errors,
243        write_mode,
244        rejected_target: rejected_target.as_ref(),
245        archive_target: archive_target.as_ref(),
246        temp_dir: temp_dir_path,
247        sink_options_warning: sink_options_warning.as_deref(),
248        perf_enabled,
249        phase_timings: &mut phase_timings,
250        file_reports: &mut file_reports,
251        file_timings_ms: &mut file_timings_ms,
252        totals: &mut totals,
253        unique_tracker: &mut unique_tracker,
254        accepted_accum: &mut accepted_accum,
255        initial_abort_run: abort_run,
256    })?;
257    abort_run = phase_b.abort_run;
258    let accepted_accum_rows = phase_b.accepted_accum_rows;
259    let accepted_accum_frames = phase_b.accepted_accum_frames;
260    let unique_constraints = unique_tracker.results();
261
262    totals.files_total = file_reports.len() as u64;
263
264    let accepted_target_uri = accepted_target.target_uri().to_string();
265    let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
266        run_context: context,
267        observer,
268        runtime,
269        entity,
270        accepted_target: &accepted_target,
271        temp_dir: temp_dir_path,
272        write_mode,
273        perf_enabled,
274        phase_timings: &mut phase_timings,
275        pending_input_count,
276        accepted_accum,
277    })?;
278    if aw.parts_written > 0 {
279        for file_report in &mut file_reports {
280            file_report.output.accepted_path = Some(accepted_target_uri.clone());
281        }
282    }
283
284    let perf_files_total = totals.files_total;
285    let perf_rows_total = totals.rows_total;
286
287    let run_report = build_run_report(RunReportContext {
288        context,
289        entity,
290        input,
291        resolved_targets: &resolved_targets,
292        resolved_mode,
293        resolved_files: &resolved_files,
294        reported_files,
295        totals,
296        file_reports,
297        severity,
298        accepted_write_mode: write_mode,
299        accepted_parts_written: aw.parts_written,
300        accepted_files_written: aw.files_written,
301        accepted_part_files: aw.part_files,
302        accepted_table_version: aw.table_version,
303        accepted_snapshot_id: aw.snapshot_id,
304        accepted_table_root_uri: aw.table_root_uri,
305        accepted_catalog: aw.catalog,
306        accepted_total_bytes_written: aw.metrics.total_bytes_written,
307        accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
308        accepted_small_files_count: aw.metrics.small_files_count,
309        accepted_merge_key: aw
310            .merge
311            .as_ref()
312            .map(|m| m.merge_key.clone())
313            .unwrap_or_default(),
314        accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
315        accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
316        accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
317        accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
318        accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
319        accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
320        accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
321        accepted_schema_evolution: report::SchemaEvolutionSummary {
322            enabled: aw.schema_evolution.enabled,
323            mode: aw.schema_evolution.mode,
324            applied: aw.schema_evolution.applied,
325            added_columns: aw.schema_evolution.added_columns,
326            incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
327        },
328        unique_constraints,
329    });
330
331    if let Some(report_target) = &context.report_target {
332        let report_write_start = perf_enabled.then(Instant::now);
333        crate::report::output::write_entity_report(
334            report_target,
335            &context.run_id,
336            entity,
337            &run_report,
338            runtime.storage(),
339            &context.storage_resolver,
340        )?;
341        if let Some(start) = report_write_start {
342            phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
343        }
344    }
345
346    if let Some(pending_state) = incremental.pending_state.as_mut() {
347        let (status, _) = report::compute_run_outcome(
348            &run_report
349                .files
350                .iter()
351                .map(|file| file.status)
352                .collect::<Vec<_>>(),
353        );
354        if matches!(
355            status,
356            report::RunStatus::Success | report::RunStatus::SuccessWithWarnings
357        ) {
358            pending_state.commit(context, runtime.storage(), entity)?;
359        } else {
360            pending_state.release(context, runtime.storage(), entity)?;
361        }
362    }
363
364    if let Some(start) = entity_start {
365        crate::run::perf::emit_perf_log(
366            observer,
367            &context.run_id,
368            Some(&entity.name),
369            "perf_entity_phase_timings",
370            json!({
371                "entity": entity.name,
372                "elapsed_ms": start.elapsed().as_millis() as u64,
373                "files_total": perf_files_total,
374                "rows_total": perf_rows_total,
375                "accepted_rows_accumulated": accepted_accum_rows,
376                "accepted_frames_accumulated": accepted_accum_frames,
377                "write_sink_format": entity.sink.accepted.format,
378                "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
379                "phases_ms": phase_timings.into_json(),
380            }),
381        );
382    }
383
384    Ok(EntityRunResult {
385        outcome: EntityOutcome {
386            report: run_report,
387            file_timings_ms,
388        },
389        abort_run,
390    })
391}
392
393fn write_perf_breakdown_json(
394    perf: &crate::io::format::AcceptedWritePerfBreakdown,
395) -> serde_json::Value {
396    json!({
397        "conversion": perf.conversion_ms,
398        "source_df_build": perf.source_df_build_ms,
399        "merge_exec": perf.merge_exec_ms,
400        "data_write": perf.data_write_ms,
401        "commit": perf.commit_ms,
402        "metrics_read": perf.metrics_read_ms,
403    })
404}
405
406fn resolve_unique_constraints(
407    entity: &crate::config::EntityConfig,
408    normalize_strategy: Option<&str>,
409    write_mode: crate::config::WriteMode,
410) -> FloeResult<Vec<check::UniqueConstraint>> {
411    let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
412    if unique_keys.is_empty() {
413        return Ok(Vec::new());
414    }
415    let merge_primary_key = if matches!(
416        write_mode,
417        crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
418    ) {
419        entity.schema.primary_key.as_ref().map(|primary_key| {
420            primary_key
421                .iter()
422                .map(|column| column.trim().to_string())
423                .collect::<Vec<_>>()
424        })
425    } else {
426        None
427    };
428    let mut constraints = Vec::with_capacity(unique_keys.len());
429    for key in unique_keys {
430        let mut runtime_columns = Vec::with_capacity(key.len());
431        for name in &key {
432            let column = entity
433                .schema
434                .columns
435                .iter()
436                .find(|column| column.name == *name)
437                .ok_or_else(|| {
438                    Box::new(ConfigError(format!(
439                        "entity.name={} schema unique key references unknown column {}",
440                        entity.name, name
441                    )))
442                })?;
443            runtime_columns.push(runtime_column_name(column, normalize_strategy));
444        }
445        let enforce_reject = merge_primary_key
446            .as_ref()
447            .map(|primary_key| primary_key == &key)
448            .unwrap_or(false);
449        constraints.push(check::UniqueConstraint {
450            runtime_columns,
451            report_columns: key,
452            enforce_reject,
453        });
454    }
455    Ok(constraints)
456}
457
458fn append_primary_key_required_columns(
459    required_cols: &mut Vec<String>,
460    entity: &crate::config::EntityConfig,
461    normalize_strategy: Option<&str>,
462) -> FloeResult<()> {
463    let Some(primary_key) = entity.schema.primary_key.as_ref() else {
464        return Ok(());
465    };
466    if primary_key.is_empty() {
467        return Ok(());
468    }
469    let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
470    for key_column in primary_key {
471        let column = entity
472            .schema
473            .columns
474            .iter()
475            .find(|column| column.name == *key_column)
476            .ok_or_else(|| {
477                Box::new(ConfigError(format!(
478                    "entity.name={} schema.primary_key references unknown column {}",
479                    entity.name, key_column
480                )))
481            })?;
482        let runtime = runtime_column_name(column, normalize_strategy);
483        if seen.insert(runtime.clone()) {
484            required_cols.push(runtime);
485        }
486    }
487    Ok(())
488}
489
490fn runtime_column_name(
491    column: &crate::config::ColumnConfig,
492    normalize_strategy: Option<&str>,
493) -> String {
494    let source_name = column.source_or_name();
495    if let Some(strategy) = normalize_strategy {
496        check::normalize::normalize_name(source_name, strategy)
497    } else {
498        source_name.to_string()
499    }
500}