1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5 fn from(s: PolicySeverity) -> Self {
6 match s {
7 PolicySeverity::Warn => report::Severity::Warn,
8 PolicySeverity::Reject => report::Severity::Reject,
9 PolicySeverity::Abort => report::Severity::Abort,
10 }
11 }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20 output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21 resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::RunObserver;
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
/// Value returned by `run_entity` for a single entity.
pub(super) struct EntityRunResult {
    /// The entity's run report together with its per-file timings.
    pub outcome: EntityOutcome,
    /// Abort flag as it stood after the validate/split phase.
    /// NOTE(review): presumably tells the caller to stop processing further
    /// entities — confirm against the caller.
    pub abort_run: bool,
}
45
/// Elapsed time per pipeline phase for one entity, in milliseconds.
///
/// All counters start at zero (`Default`). `run_entity` adds to `precheck_ms`
/// and `report_write_ms` itself and hands a mutable reference to the
/// validate/split and accepted-write phases.
/// NOTE(review): the remaining counters are presumably filled in by those
/// phases — confirm in the phase modules.
#[derive(Debug, Default)]
struct EntityPhaseTimings {
    // Time spent in `run_precheck`.
    precheck_ms: u64,
    read_parse_ms: u64,
    checks_validation_ms: u64,
    accept_reject_split_ms: u64,
    write_rejected_ms: u64,
    // Serialized under the key "archive_input" by `into_json`.
    archive_ms: u64,
    concat_accepted_ms: u64,
    write_accepted_ms: u64,
    write_delta_ms: u64,
    write_iceberg_ms: u64,
    // Time spent writing the entity report; serialized as "write_entity_report".
    report_write_ms: u64,
}
60
61impl EntityPhaseTimings {
62 fn into_json(self) -> serde_json::Value {
63 json!({
64 "precheck": self.precheck_ms,
65 "read_parse": self.read_parse_ms,
66 "checks_validation": self.checks_validation_ms,
67 "accept_reject_split": self.accept_reject_split_ms,
68 "write_rejected": self.write_rejected_ms,
69 "archive_input": self.archive_ms,
70 "concat_accepted": self.concat_accepted_ms,
71 "write_accepted": self.write_accepted_ms,
72 "write_delta": self.write_delta_ms,
73 "write_iceberg": self.write_iceberg_ms,
74 "write_entity_report": self.report_write_ms,
75 })
76 }
77}
78
/// Runs the whole pipeline for one entity and returns its report, per-file
/// timings, and the abort flag.
///
/// Steps, in the order they appear below:
/// 1. Resolve the normalize strategy, column mappings, required columns, and
///    unique constraints from the entity schema.
/// 2. Prepare the incremental context, which yields the pending inputs and
///    the reports for inputs it skipped.
/// 3. Resolve the optional archive target and run the precheck.
/// 4. Seed the unique tracker and run the validate/split phase.
/// 5. Run the accepted-write phase and build the run report.
/// 6. Write the entity report when `context.report_target` is set.
/// 7. Commit or release the pending incremental state, depending on the
///    run status computed from the per-file statuses.
/// 8. Emit a perf log entry when phase timing is enabled.
///
/// # Errors
///
/// Returns the first error produced by any helper invoked with `?`.
pub(super) fn run_entity(
    context: &RunContext,
    runtime: &mut dyn crate::runtime::Runtime,
    observer: &dyn RunObserver,
    plan: super::EntityRunPlan<'_>,
) -> FloeResult<EntityRunResult> {
    let entity = plan.entity;
    // Timers are only started when phase timing is enabled; otherwise every
    // `*_start` value below is `None` and no timing is recorded.
    let perf_enabled = crate::run::perf::phase_timing_enabled();
    let entity_start = perf_enabled.then(Instant::now);
    let mut phase_timings = EntityPhaseTimings::default();
    let input = &entity.source;
    let write_mode = entity.sink.write_mode;
    let input_adapter = runtime.input_adapter(input.format.as_str())?;
    let resolved_targets = plan.resolved_targets;
    // Row-error formatter name; "json" when the report config is absent or
    // does not name a formatter.
    let formatter_name = context
        .config
        .report
        .as_ref()
        .and_then(|report| report.formatter.as_deref())
        .unwrap_or("json");

    let normalize_strategy = resolve_normalize_strategy(entity)?;
    let normalized_columns =
        resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
    let source_column_map =
        source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    // Only hand the mapping to the formatter when it has entries.
    let row_error_formatter = if source_column_map.is_empty() {
        check::row_error_formatter(formatter_name, None)?
    } else {
        check::row_error_formatter(formatter_name, Some(&source_column_map))?
    };
    let read_columns = io::format::resolve_read_columns(
        entity,
        &normalized_columns,
        normalize_strategy.as_deref(),
    )?;
    let output_column_map =
        output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    let pii_runtime_map =
        pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
    // Required columns from the schema, extended with the primary-key columns.
    let mut required_cols = required_columns(&normalized_columns);
    append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
    let unique_constraints =
        resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
    let accepted_target = resolved_targets.accepted.clone();
    let rejected_target = resolved_targets.rejected.clone();
    let temp_dir = plan.temp_dir;
    let io::storage::inputs::ResolvedInputs {
        files: input_files,
        listed: resolved_files,
        mode: resolved_mode,
    } = plan.resolved_inputs;
    // The incremental context splits the resolved inputs into the ones still
    // pending and reports for the ones it skipped.
    let mut incremental =
        incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
    // Shadows the full input list: from here on only pending inputs are processed.
    let input_files = incremental.pending_inputs;
    let pending_input_count = input_files.len();

    let severity = report::Severity::from(entity.policy.severity);
    // Cast errors are tracked unless the source cast mode is exactly "coerce".
    let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));

    // The report lists at most MAX_RESOLVED_INPUTS of the resolved files.
    let reported_files = resolved_files
        .iter()
        .take(MAX_RESOLVED_INPUTS)
        .cloned()
        .collect::<Vec<_>>();

    let mut file_reports =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    let mut totals = report::ResultsTotals {
        files_total: 0,
        rows_total: 0,
        accepted_total: 0,
        rejected_total: 0,
        warnings_total: 0,
        errors_total: 0,
    };
    // Resolve the archive target only when archiving is enabled and an archive
    // section is configured; otherwise there is no archive target.
    let archive_target = if entity.archive_enabled() {
        entity
            .sink
            .archive
            .as_ref()
            .map(|archive| {
                // Fall back to the source storage when the archive names none.
                let storage_name = archive
                    .storage
                    .as_deref()
                    .or(entity.source.storage.as_deref());
                let resolved = context.storage_resolver.resolve_path(
                    &entity.name,
                    "sink.archive.storage",
                    storage_name,
                    &archive.path,
                )?;
                Target::from_resolved(&resolved)
            })
            .transpose()?
    } else {
        None
    };
    let mut file_timings_ms =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    // Skipped inputs still count as files and contribute their warnings; they
    // get a zero timing entry so reports and timings stay aligned.
    for skipped in incremental.skipped_reports {
        totals.files_total += 1;
        totals.warnings_total += skipped.validation.warnings;
        file_reports.push(skipped);
        file_timings_ms.push(Some(0));
    }
    let sink_options_warning = sink_options_warning(entity);
    let precheck_start = perf_enabled.then(Instant::now);
    let precheck = run_precheck(
        PrecheckContext {
            context,
            entity,
            input_adapter,
            normalized_columns: &normalized_columns,
            resolved_targets: &resolved_targets,
            archive_target: archive_target.as_ref(),
            temp_dir: temp_dir.as_ref(),
            cloud: runtime.storage(),
            observer,
            file_reports: &mut file_reports,
            file_timings_ms: &mut file_timings_ms,
            totals: &mut totals,
        },
        input_files,
    )?;
    if let Some(start) = precheck_start {
        phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
    }
    let mut abort_run = precheck.abort_run;
    let prechecked_inputs = precheck.prechecked;

    let mut accepted_accum = Vec::new();
    let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
    let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
    // NOTE(review): judging by the helper's name this pre-loads existing keys
    // for append writes — confirm in `io::unique_seed`.
    io::unique_seed::seed_unique_tracker_for_append(
        &mut unique_tracker,
        write_mode,
        entity.sink.accepted.format.as_str(),
        &accepted_target,
        temp_dir.as_ref().map(|dir| dir.path()),
        runtime.storage(),
        &context.storage_resolver,
        &context.catalog_resolver,
        entity,
    )?;
    // The validate/split phase receives the abort flag from the precheck and
    // returns the updated one.
    let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
        run_context: context,
        runtime,
        observer,
        entity,
        input_adapter,
        prechecked_inputs,
        read_columns: &read_columns,
        normalize_strategy: normalize_strategy.as_deref(),
        normalized_columns: &normalized_columns,
        required_cols: &required_cols,
        source_column_map: &source_column_map,
        output_column_map: &output_column_map,
        pii_runtime_map: &pii_runtime_map,
        row_error_formatter: row_error_formatter.as_ref(),
        severity,
        track_cast_errors,
        write_mode,
        rejected_target: rejected_target.as_ref(),
        archive_target: archive_target.as_ref(),
        temp_dir: temp_dir_path,
        sink_options_warning: sink_options_warning.as_deref(),
        perf_enabled,
        phase_timings: &mut phase_timings,
        file_reports: &mut file_reports,
        file_timings_ms: &mut file_timings_ms,
        totals: &mut totals,
        unique_tracker: &mut unique_tracker,
        accepted_accum: &mut accepted_accum,
        initial_abort_run: abort_run,
    })?;
    abort_run = phase_b.abort_run;
    let accepted_accum_rows = phase_b.accepted_accum_rows;
    let accepted_accum_frames = phase_b.accepted_accum_frames;
    // Shadows the constraint definitions with the tracker's results for the report.
    let unique_constraints = unique_tracker.results();

    // Overwrite the running count with the number of file reports collected.
    totals.files_total = file_reports.len() as u64;

    let accepted_target_uri = accepted_target.target_uri().to_string();
    let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
        run_context: context,
        observer,
        runtime,
        entity,
        accepted_target: &accepted_target,
        temp_dir: temp_dir_path,
        write_mode,
        perf_enabled,
        phase_timings: &mut phase_timings,
        pending_input_count,
        accepted_accum,
    })?;
    // When at least one part was written, every file report (including the
    // skipped ones pushed earlier) is pointed at the accepted target URI.
    if aw.parts_written > 0 {
        for file_report in &mut file_reports {
            file_report.output.accepted_path = Some(accepted_target_uri.clone());
        }
    }

    // Copy the counters needed for the perf log before `totals` is moved into
    // the report context.
    let perf_files_total = totals.files_total;
    let perf_rows_total = totals.rows_total;

    let run_report = build_run_report(RunReportContext {
        context,
        entity,
        input,
        resolved_targets: &resolved_targets,
        resolved_mode,
        resolved_files: &resolved_files,
        reported_files,
        totals,
        file_reports,
        severity,
        accepted_write_mode: write_mode,
        accepted_parts_written: aw.parts_written,
        accepted_files_written: aw.files_written,
        accepted_part_files: aw.part_files,
        accepted_table_version: aw.table_version,
        accepted_snapshot_id: aw.snapshot_id,
        accepted_table_root_uri: aw.table_root_uri,
        accepted_catalog: aw.catalog,
        accepted_total_bytes_written: aw.metrics.total_bytes_written,
        accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
        accepted_small_files_count: aw.metrics.small_files_count,
        // Merge details are only present when the write phase reported a
        // merge; the merge key falls back to its default value otherwise.
        accepted_merge_key: aw
            .merge
            .as_ref()
            .map(|m| m.merge_key.clone())
            .unwrap_or_default(),
        accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
        accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
        accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
        accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
        accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
        accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
        accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
        accepted_schema_evolution: report::SchemaEvolutionSummary {
            enabled: aw.schema_evolution.enabled,
            mode: aw.schema_evolution.mode,
            applied: aw.schema_evolution.applied,
            added_columns: aw.schema_evolution.added_columns,
            incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
        },
        unique_constraints,
    });

    // NOTE(review): an error from `write_entity_report` returns here, before
    // the pending incremental state below is committed or released — confirm
    // that this case is handled elsewhere.
    if let Some(report_target) = &context.report_target {
        let report_write_start = perf_enabled.then(Instant::now);
        crate::report::output::write_entity_report(
            report_target,
            &context.run_id,
            entity,
            &run_report,
            runtime.storage(),
            &context.storage_resolver,
        )?;
        if let Some(start) = report_write_start {
            phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
        }
    }

    // Commit the pending incremental state only when the status computed from
    // the per-file statuses is a success (with or without warnings);
    // otherwise release it.
    if let Some(pending_state) = incremental.pending_state.as_mut() {
        let (status, _) = report::compute_run_outcome(
            &run_report
                .files
                .iter()
                .map(|file| file.status)
                .collect::<Vec<_>>(),
        );
        if matches!(
            status,
            report::RunStatus::Success | report::RunStatus::SuccessWithWarnings
        ) {
            pending_state.commit(context, runtime.storage(), entity)?;
        } else {
            pending_state.release(context, runtime.storage(), entity)?;
        }
    }

    // `entity_start` is `Some` only when phase timing is enabled, so the perf
    // log is emitted only in that case.
    if let Some(start) = entity_start {
        crate::run::perf::emit_perf_log(
            observer,
            &context.run_id,
            Some(&entity.name),
            "perf_entity_phase_timings",
            json!({
                "entity": entity.name,
                "elapsed_ms": start.elapsed().as_millis() as u64,
                "files_total": perf_files_total,
                "rows_total": perf_rows_total,
                "accepted_rows_accumulated": accepted_accum_rows,
                "accepted_frames_accumulated": accepted_accum_frames,
                "write_sink_format": entity.sink.accepted.format,
                "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
                "phases_ms": phase_timings.into_json(),
            }),
        );
    }

    Ok(EntityRunResult {
        outcome: EntityOutcome {
            report: run_report,
            file_timings_ms,
        },
        abort_run,
    })
}
392
393fn write_perf_breakdown_json(
394 perf: &crate::io::format::AcceptedWritePerfBreakdown,
395) -> serde_json::Value {
396 json!({
397 "conversion": perf.conversion_ms,
398 "source_df_build": perf.source_df_build_ms,
399 "merge_exec": perf.merge_exec_ms,
400 "data_write": perf.data_write_ms,
401 "commit": perf.commit_ms,
402 "metrics_read": perf.metrics_read_ms,
403 })
404}
405
406fn resolve_unique_constraints(
407 entity: &crate::config::EntityConfig,
408 normalize_strategy: Option<&str>,
409 write_mode: crate::config::WriteMode,
410) -> FloeResult<Vec<check::UniqueConstraint>> {
411 let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
412 if unique_keys.is_empty() {
413 return Ok(Vec::new());
414 }
415 let merge_primary_key = if matches!(
416 write_mode,
417 crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
418 ) {
419 entity.schema.primary_key.as_ref().map(|primary_key| {
420 primary_key
421 .iter()
422 .map(|column| column.trim().to_string())
423 .collect::<Vec<_>>()
424 })
425 } else {
426 None
427 };
428 let mut constraints = Vec::with_capacity(unique_keys.len());
429 for key in unique_keys {
430 let mut runtime_columns = Vec::with_capacity(key.len());
431 for name in &key {
432 let column = entity
433 .schema
434 .columns
435 .iter()
436 .find(|column| column.name == *name)
437 .ok_or_else(|| {
438 Box::new(ConfigError(format!(
439 "entity.name={} schema unique key references unknown column {}",
440 entity.name, name
441 )))
442 })?;
443 runtime_columns.push(runtime_column_name(column, normalize_strategy));
444 }
445 let enforce_reject = merge_primary_key
446 .as_ref()
447 .map(|primary_key| primary_key == &key)
448 .unwrap_or(false);
449 constraints.push(check::UniqueConstraint {
450 runtime_columns,
451 report_columns: key,
452 enforce_reject,
453 });
454 }
455 Ok(constraints)
456}
457
458fn append_primary_key_required_columns(
459 required_cols: &mut Vec<String>,
460 entity: &crate::config::EntityConfig,
461 normalize_strategy: Option<&str>,
462) -> FloeResult<()> {
463 let Some(primary_key) = entity.schema.primary_key.as_ref() else {
464 return Ok(());
465 };
466 if primary_key.is_empty() {
467 return Ok(());
468 }
469 let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
470 for key_column in primary_key {
471 let column = entity
472 .schema
473 .columns
474 .iter()
475 .find(|column| column.name == *key_column)
476 .ok_or_else(|| {
477 Box::new(ConfigError(format!(
478 "entity.name={} schema.primary_key references unknown column {}",
479 entity.name, key_column
480 )))
481 })?;
482 let runtime = runtime_column_name(column, normalize_strategy);
483 if seen.insert(runtime.clone()) {
484 required_cols.push(runtime);
485 }
486 }
487 Ok(())
488}
489
490fn runtime_column_name(
491 column: &crate::config::ColumnConfig,
492 normalize_strategy: Option<&str>,
493) -> String {
494 let source_name = column.source_or_name();
495 if let Some(strategy) = normalize_strategy {
496 check::normalize::normalize_name(source_name, strategy)
497 } else {
498 source_name.to_string()
499 }
500}