// floe_core/run/entity/mod.rs

1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5    fn from(s: PolicySeverity) -> Self {
6        match s {
7            PolicySeverity::Warn => report::Severity::Warn,
8            PolicySeverity::Reject => report::Severity::Reject,
9            PolicySeverity::Abort => report::Severity::Abort,
10        }
11    }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20    output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21    resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::{event_time_ms, RunEvent, RunObserver};
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
41pub(super) struct EntityRunResult {
42    pub outcome: EntityOutcome,
43    pub abort_run: bool,
44}
45
/// Wall-clock milliseconds spent in each phase of a single entity run.
///
/// All counters start at zero (`Default`) and are only accumulated when
/// phase timing is enabled.
#[derive(Default, Debug)]
struct EntityPhaseTimings {
    /// Per-file precheck (Phase A).
    precheck_ms: u64,
    /// Reading and parsing input files.
    read_parse_ms: u64,
    /// Row-level checks and validation.
    checks_validation_ms: u64,
    /// Splitting rows into accepted and rejected sets.
    accept_reject_split_ms: u64,
    /// Writing rejected rows.
    write_rejected_ms: u64,
    /// Archiving processed input files.
    archive_ms: u64,
    /// Concatenating accumulated accepted frames.
    concat_accepted_ms: u64,
    /// Writing accepted rows.
    write_accepted_ms: u64,
    /// Delta-specific accepted write work.
    write_delta_ms: u64,
    /// Iceberg-specific accepted write work.
    write_iceberg_ms: u64,
    /// Writing the entity report.
    report_write_ms: u64,
}
60
61impl EntityPhaseTimings {
62    fn into_json(self) -> serde_json::Value {
63        json!({
64            "precheck": self.precheck_ms,
65            "read_parse": self.read_parse_ms,
66            "checks_validation": self.checks_validation_ms,
67            "accept_reject_split": self.accept_reject_split_ms,
68            "write_rejected": self.write_rejected_ms,
69            "archive_input": self.archive_ms,
70            "concat_accepted": self.concat_accepted_ms,
71            "write_accepted": self.write_accepted_ms,
72            "write_delta": self.write_delta_ms,
73            "write_iceberg": self.write_iceberg_ms,
74            "write_entity_report": self.report_write_ms,
75        })
76    }
77}
78
/// Runs a single entity end to end and returns its report.
///
/// Steps, in order:
/// 1. Resolve the normalization strategy and everything derived from it:
///    column mappings, read columns, required columns, unique constraints.
/// 2. Apply incremental filtering via `incremental::prepare_incremental_context`.
///    Inputs it skips are reported as `"skipped"` files with zero rows and time.
/// 3. Phase A: per-file precheck (`run_precheck`).
/// 4. Seed the unique tracker, then Phase B: row-level validation and the
///    accept/reject split (`run_validate_split_phase`).
/// 5. Write the accumulated accepted rows (`run_accepted_write_phase`).
/// 6. Build the entity report and write it when `context.report_target` is set.
/// 7. Commit or release the pending incremental state, based on the file statuses.
/// 8. When phase timing is enabled, emit a `perf_entity_phase_timings` perf log.
///
/// Returns the report with per-file timings, plus the `abort_run` flag coming
/// out of the precheck / validate-split phases.
///
/// # Errors
/// Any error from the steps above is propagated unchanged with `?`.
pub(super) fn run_entity(
    context: &RunContext,
    runtime: &mut dyn crate::runtime::Runtime,
    observer: &dyn RunObserver,
    plan: super::EntityRunPlan<'_>,
) -> FloeResult<EntityRunResult> {
    let entity = plan.entity;
    // Every `*_start` timer below is `Some` only when phase timing is enabled.
    let perf_enabled = crate::run::perf::phase_timing_enabled();
    let entity_start = perf_enabled.then(Instant::now);
    let mut phase_timings = EntityPhaseTimings::default();
    let input = &entity.source;
    let write_mode = entity.sink.write_mode;
    let input_adapter = runtime.input_adapter(input.format.as_str())?;
    let resolved_targets = plan.resolved_targets;
    // Row-error formatter name from the report config, defaulting to "json".
    let formatter_name = context
        .config
        .report
        .as_ref()
        .and_then(|report| report.formatter.as_deref())
        .unwrap_or("json");

    // Column-name resolution: all mappings below are derived from the schema
    // columns under the same (optional) normalization strategy.
    let normalize_strategy = resolve_normalize_strategy(entity)?;
    let normalized_columns =
        resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
    let source_column_map =
        source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    // Only hand the formatter a column map when it actually maps something.
    let row_error_formatter = if source_column_map.is_empty() {
        check::row_error_formatter(formatter_name, None)?
    } else {
        check::row_error_formatter(formatter_name, Some(&source_column_map))?
    };
    let read_columns = io::format::resolve_read_columns(
        entity,
        &normalized_columns,
        normalize_strategy.as_deref(),
    )?;
    let output_column_map =
        output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    let pii_runtime_map =
        pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
    // Primary-key columns are added to the required set (without duplicates).
    let mut required_cols = required_columns(&normalized_columns);
    append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
    let unique_constraints =
        resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
    let accepted_target = resolved_targets.accepted.clone();
    let rejected_target = resolved_targets.rejected.clone();
    let temp_dir = plan.temp_dir;
    let io::storage::inputs::ResolvedInputs {
        files: input_files,
        listed: resolved_files,
        mode: resolved_mode,
    } = plan.resolved_inputs;
    // Incremental filtering: only `pending_inputs` are processed below. Inputs it
    // decided to skip come back as ready-made `skipped_reports`.
    // NOTE(review): from this point on, any early `?` return leaves
    // `incremental.pending_state` neither committed nor released. If that state
    // holds a lock or lease, confirm it is cleaned up elsewhere.
    let mut incremental =
        incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
    let input_files = incremental.pending_inputs;
    let pending_input_count = input_files.len();

    let severity = report::Severity::from(entity.policy.severity);
    // Cast errors are tracked unless the source's cast_mode is "coerce".
    let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));

    // The report lists at most MAX_RESOLVED_INPUTS of the resolved input files.
    let reported_files = resolved_files
        .iter()
        .take(MAX_RESOLVED_INPUTS)
        .cloned()
        .collect::<Vec<_>>();

    let mut file_reports =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    let mut totals = report::ResultsTotals {
        files_total: 0,
        files_skipped: 0,
        rows_total: 0,
        accepted_total: 0,
        rejected_total: 0,
        warnings_total: 0,
        errors_total: 0,
    };
    // The archive target is resolved only when archiving is enabled. Its storage
    // falls back to the source storage when `sink.archive.storage` is unset.
    let archive_target = if entity.archive_enabled() {
        entity
            .sink
            .archive
            .as_ref()
            .map(|archive| {
                let storage_name = archive
                    .storage
                    .as_deref()
                    .or(entity.source.storage.as_deref());
                let resolved = context.storage_resolver.resolve_path(
                    &entity.name,
                    "sink.archive.storage",
                    storage_name,
                    &archive.path,
                )?;
                Target::from_resolved(&resolved)
            })
            .transpose()?
    } else {
        None
    };
    // Per-file timings. Skipped entries are pushed in lockstep with `file_reports`;
    // presumably the later phases keep the two aligned as well (confirm).
    let mut file_timings_ms =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    // Incrementally skipped inputs: emit a "skipped" FileFinished event, count the
    // file (and its warnings), and record a zero timing.
    for skipped in incremental.skipped_reports {
        observer.on_event(RunEvent::FileFinished {
            run_id: context.run_id.clone(),
            entity: entity.name.clone(),
            input: skipped.input_file.clone(),
            status: "skipped".to_string(),
            skip_reason: skipped.skip_reason.clone(),
            rows: 0,
            accepted: 0,
            rejected: 0,
            elapsed_ms: 0,
            ts_ms: event_time_ms(),
        });
        totals.files_total += 1;
        totals.files_skipped += 1;
        totals.warnings_total += skipped.validation.warnings;
        file_reports.push(skipped);
        file_timings_ms.push(Some(0));
    }
    let sink_options_warning = sink_options_warning(entity);
    // Phase A: per-file precheck (schema mismatch / early rejection).
    let precheck_start = perf_enabled.then(Instant::now);
    let precheck = run_precheck(
        PrecheckContext {
            context,
            entity,
            input_adapter,
            normalized_columns: &normalized_columns,
            resolved_targets: &resolved_targets,
            archive_target: archive_target.as_ref(),
            temp_dir: temp_dir.as_ref(),
            cloud: runtime.storage(),
            observer,
            file_reports: &mut file_reports,
            file_timings_ms: &mut file_timings_ms,
            totals: &mut totals,
        },
        input_files,
    )?;
    if let Some(start) = precheck_start {
        phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
    }
    let mut abort_run = precheck.abort_run;
    let prechecked_inputs = precheck.prechecked;

    let mut accepted_accum = Vec::new();
    let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
    let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
    // Seed the unique tracker before validation. Judging by the helper's name, it
    // loads existing keys for append-style writes; see io::unique_seed to confirm.
    io::unique_seed::seed_unique_tracker_for_append(
        &mut unique_tracker,
        write_mode,
        entity.sink.accepted.format.as_str(),
        &accepted_target,
        temp_dir.as_ref().map(|dir| dir.path()),
        runtime.storage(),
        &context.storage_resolver,
        &context.catalog_resolver,
        entity,
    )?;
    // Phase B: row-level validation + entity-level accumulation.
    let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
        run_context: context,
        runtime,
        observer,
        entity,
        input_adapter,
        prechecked_inputs,
        read_columns: &read_columns,
        normalize_strategy: normalize_strategy.as_deref(),
        normalized_columns: &normalized_columns,
        required_cols: &required_cols,
        source_column_map: &source_column_map,
        output_column_map: &output_column_map,
        pii_runtime_map: &pii_runtime_map,
        row_error_formatter: row_error_formatter.as_ref(),
        severity,
        track_cast_errors,
        write_mode,
        rejected_target: rejected_target.as_ref(),
        archive_target: archive_target.as_ref(),
        temp_dir: temp_dir_path,
        sink_options_warning: sink_options_warning.as_deref(),
        perf_enabled,
        phase_timings: &mut phase_timings,
        file_reports: &mut file_reports,
        file_timings_ms: &mut file_timings_ms,
        totals: &mut totals,
        unique_tracker: &mut unique_tracker,
        accepted_accum: &mut accepted_accum,
        initial_abort_run: abort_run,
    })?;
    abort_run = phase_b.abort_run;
    let accepted_accum_rows = phase_b.accepted_accum_rows;
    let accepted_accum_frames = phase_b.accepted_accum_frames;
    let unique_constraints = unique_tracker.results();

    // Recount files from the collected reports (skipped + processed), replacing
    // the running count kept above.
    totals.files_total = file_reports.len() as u64;

    let accepted_target_uri = accepted_target.target_uri().to_string();
    let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
        run_context: context,
        observer,
        runtime,
        entity,
        accepted_target: &accepted_target,
        temp_dir: temp_dir_path,
        write_mode,
        perf_enabled,
        phase_timings: &mut phase_timings,
        pending_input_count,
        accepted_accum,
    })?;
    // If any accepted part was written, point every file report (skipped ones
    // included) at the accepted target.
    if aw.parts_written > 0 {
        for file_report in &mut file_reports {
            file_report.output.accepted_path = Some(accepted_target_uri.clone());
        }
    }

    // Capture these now: `totals` is moved into the report below.
    let perf_files_total = totals.files_total;
    let perf_rows_total = totals.rows_total;

    let run_report = build_run_report(RunReportContext {
        context,
        entity,
        input,
        resolved_targets: &resolved_targets,
        resolved_mode,
        resolved_files: &resolved_files,
        reported_files,
        totals,
        file_reports,
        severity,
        accepted_write_mode: write_mode,
        accepted_parts_written: aw.parts_written,
        accepted_files_written: aw.files_written,
        accepted_part_files: aw.part_files,
        accepted_table_version: aw.table_version,
        accepted_snapshot_id: aw.snapshot_id,
        accepted_table_root_uri: aw.table_root_uri,
        accepted_catalog: aw.catalog,
        accepted_total_bytes_written: aw.metrics.total_bytes_written,
        accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
        accepted_small_files_count: aw.metrics.small_files_count,
        accepted_merge_key: aw
            .merge
            .as_ref()
            .map(|m| m.merge_key.clone())
            .unwrap_or_default(),
        accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
        accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
        accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
        accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
        accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
        accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
        accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
        accepted_schema_evolution: report::SchemaEvolutionSummary {
            enabled: aw.schema_evolution.enabled,
            mode: aw.schema_evolution.mode,
            applied: aw.schema_evolution.applied,
            added_columns: aw.schema_evolution.added_columns,
            incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
        },
        unique_constraints,
    });

    // The entity report is persisted only when a report target is configured.
    if let Some(report_target) = &context.report_target {
        let report_write_start = perf_enabled.then(Instant::now);
        crate::report::output::write_entity_report(
            report_target,
            &context.run_id,
            entity,
            &run_report,
            runtime.storage(),
            &context.storage_resolver,
        )?;
        if let Some(start) = report_write_start {
            phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
        }
    }

    // Incremental bookkeeping: commit the pending state when the outcome derived
    // from the file statuses is Success, SuccessWithWarnings or Rejected;
    // release it for any other status.
    if let Some(pending_state) = incremental.pending_state.as_mut() {
        let (status, _) = report::compute_run_outcome(
            &run_report
                .files
                .iter()
                .map(|file| file.status)
                .collect::<Vec<_>>(),
        );
        if matches!(
            status,
            report::RunStatus::Success
                | report::RunStatus::SuccessWithWarnings
                | report::RunStatus::Rejected
        ) {
            pending_state.commit(context, runtime.storage(), entity)?;
        } else {
            pending_state.release(context, runtime.storage(), entity)?;
        }
    }

    // Perf log: only emitted when phase timing was enabled (`entity_start` is Some).
    if let Some(start) = entity_start {
        crate::run::perf::emit_perf_log(
            observer,
            &context.run_id,
            Some(&entity.name),
            "perf_entity_phase_timings",
            json!({
                "entity": entity.name,
                "elapsed_ms": start.elapsed().as_millis() as u64,
                "files_total": perf_files_total,
                "rows_total": perf_rows_total,
                "accepted_rows_accumulated": accepted_accum_rows,
                "accepted_frames_accumulated": accepted_accum_frames,
                "write_sink_format": entity.sink.accepted.format,
                "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
                "phases_ms": phase_timings.into_json(),
            }),
        );
    }

    Ok(EntityRunResult {
        outcome: EntityOutcome {
            report: run_report,
            file_timings_ms,
        },
        abort_run,
    })
}
408
409fn write_perf_breakdown_json(
410    perf: &crate::io::format::AcceptedWritePerfBreakdown,
411) -> serde_json::Value {
412    json!({
413        "conversion": perf.conversion_ms,
414        "source_df_build": perf.source_df_build_ms,
415        "merge_exec": perf.merge_exec_ms,
416        "data_write": perf.data_write_ms,
417        "commit": perf.commit_ms,
418        "metrics_read": perf.metrics_read_ms,
419    })
420}
421
422fn resolve_unique_constraints(
423    entity: &crate::config::EntityConfig,
424    normalize_strategy: Option<&str>,
425    write_mode: crate::config::WriteMode,
426) -> FloeResult<Vec<check::UniqueConstraint>> {
427    let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
428    if unique_keys.is_empty() {
429        return Ok(Vec::new());
430    }
431    let merge_primary_key = if matches!(
432        write_mode,
433        crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
434    ) {
435        entity.schema.primary_key.as_ref().map(|primary_key| {
436            primary_key
437                .iter()
438                .map(|column| column.trim().to_string())
439                .collect::<Vec<_>>()
440        })
441    } else {
442        None
443    };
444    let mut constraints = Vec::with_capacity(unique_keys.len());
445    for key in unique_keys {
446        let mut runtime_columns = Vec::with_capacity(key.len());
447        for name in &key {
448            let column = entity
449                .schema
450                .columns
451                .iter()
452                .find(|column| column.name == *name)
453                .ok_or_else(|| {
454                    Box::new(ConfigError(format!(
455                        "entity.name={} schema unique key references unknown column {}",
456                        entity.name, name
457                    )))
458                })?;
459            runtime_columns.push(runtime_column_name(column, normalize_strategy));
460        }
461        let enforce_reject = merge_primary_key
462            .as_ref()
463            .map(|primary_key| primary_key == &key)
464            .unwrap_or(false);
465        constraints.push(check::UniqueConstraint {
466            runtime_columns,
467            report_columns: key,
468            enforce_reject,
469        });
470    }
471    Ok(constraints)
472}
473
474fn append_primary_key_required_columns(
475    required_cols: &mut Vec<String>,
476    entity: &crate::config::EntityConfig,
477    normalize_strategy: Option<&str>,
478) -> FloeResult<()> {
479    let Some(primary_key) = entity.schema.primary_key.as_ref() else {
480        return Ok(());
481    };
482    if primary_key.is_empty() {
483        return Ok(());
484    }
485    let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
486    for key_column in primary_key {
487        let column = entity
488            .schema
489            .columns
490            .iter()
491            .find(|column| column.name == *key_column)
492            .ok_or_else(|| {
493                Box::new(ConfigError(format!(
494                    "entity.name={} schema.primary_key references unknown column {}",
495                    entity.name, key_column
496                )))
497            })?;
498        let runtime = runtime_column_name(column, normalize_strategy);
499        if seen.insert(runtime.clone()) {
500            required_cols.push(runtime);
501        }
502    }
503    Ok(())
504}
505
506fn runtime_column_name(
507    column: &crate::config::ColumnConfig,
508    normalize_strategy: Option<&str>,
509) -> String {
510    let source_name = column.source_or_name();
511    if let Some(strategy) = normalize_strategy {
512        check::normalize::normalize_name(source_name, strategy)
513    } else {
514        source_name.to_string()
515    }
516}