1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5 fn from(s: PolicySeverity) -> Self {
6 match s {
7 PolicySeverity::Warn => report::Severity::Warn,
8 PolicySeverity::Reject => report::Severity::Reject,
9 PolicySeverity::Abort => report::Severity::Abort,
10 }
11 }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20 output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21 resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::{event_time_ms, RunEvent, RunObserver};
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
41pub(super) struct EntityRunResult {
42 pub outcome: EntityOutcome,
43 pub abort_run: bool,
44}
45
/// Per-phase duration counters for one entity run, in milliseconds.
///
/// `run_entity` adds to `precheck_ms` and `report_write_ms` itself and passes
/// the struct by mutable reference to the validate/split and accepted-write
/// phases; the remaining counters are presumably filled in there (those
/// modules are not part of this file).
#[derive(Debug, Default)]
struct EntityPhaseTimings {
    precheck_ms: u64,
    read_parse_ms: u64,
    checks_validation_ms: u64,
    accept_reject_split_ms: u64,
    write_rejected_ms: u64,
    archive_ms: u64,
    concat_accepted_ms: u64,
    write_accepted_ms: u64,
    write_delta_ms: u64,
    write_iceberg_ms: u64,
    report_write_ms: u64,
}
60
61impl EntityPhaseTimings {
62 fn into_json(self) -> serde_json::Value {
63 json!({
64 "precheck": self.precheck_ms,
65 "read_parse": self.read_parse_ms,
66 "checks_validation": self.checks_validation_ms,
67 "accept_reject_split": self.accept_reject_split_ms,
68 "write_rejected": self.write_rejected_ms,
69 "archive_input": self.archive_ms,
70 "concat_accepted": self.concat_accepted_ms,
71 "write_accepted": self.write_accepted_ms,
72 "write_delta": self.write_delta_ms,
73 "write_iceberg": self.write_iceberg_ms,
74 "write_entity_report": self.report_write_ms,
75 })
76 }
77}
78
79pub(super) fn run_entity(
80 context: &RunContext,
81 runtime: &mut dyn crate::runtime::Runtime,
82 observer: &dyn RunObserver,
83 plan: super::EntityRunPlan<'_>,
84) -> FloeResult<EntityRunResult> {
85 let entity = plan.entity;
86 let perf_enabled = crate::run::perf::phase_timing_enabled();
87 let entity_start = perf_enabled.then(Instant::now);
88 let mut phase_timings = EntityPhaseTimings::default();
89 let input = &entity.source;
90 let write_mode =
91 if context.full_refresh && entity.sink.write_mode == crate::config::WriteMode::Append {
92 crate::config::WriteMode::Overwrite
93 } else {
94 entity.sink.write_mode
95 };
96 let input_adapter = runtime.input_adapter(input.format.as_str())?;
97 let resolved_targets = plan.resolved_targets;
98 let formatter_name = context
99 .config
100 .report
101 .as_ref()
102 .and_then(|report| report.formatter.as_deref())
103 .unwrap_or("json");
104
105 let normalize_strategy = resolve_normalize_strategy(entity)?;
106 let normalized_columns =
107 resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
108 let source_column_map =
109 source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
110 let row_error_formatter = if source_column_map.is_empty() {
111 check::row_error_formatter(formatter_name, None)?
112 } else {
113 check::row_error_formatter(formatter_name, Some(&source_column_map))?
114 };
115 let read_columns = io::format::resolve_read_columns(
116 entity,
117 &normalized_columns,
118 normalize_strategy.as_deref(),
119 )?;
120 let output_column_map =
121 output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
122 let pii_runtime_map =
123 pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
124 let mut required_cols = required_columns(&normalized_columns);
125 append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
126 let unique_constraints =
127 resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
128 let accepted_target = resolved_targets.accepted.clone();
129 let rejected_target = resolved_targets.rejected.clone();
130 let temp_dir = plan.temp_dir;
131 let io::storage::inputs::ResolvedInputs {
132 files: input_files,
133 listed: resolved_files,
134 mode: resolved_mode,
135 } = plan.resolved_inputs;
136 let mut incremental =
137 incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
138 let input_files = incremental.pending_inputs;
139 let pending_input_count = input_files.len();
140
141 let severity = report::Severity::from(entity.policy.severity);
142 let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));
143
144 let reported_files = resolved_files
145 .iter()
146 .take(MAX_RESOLVED_INPUTS)
147 .cloned()
148 .collect::<Vec<_>>();
149
150 let mut file_reports =
151 Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
152 let mut totals = report::ResultsTotals {
153 files_total: 0,
154 files_skipped: 0,
155 rows_total: 0,
156 accepted_total: 0,
157 rejected_total: 0,
158 warnings_total: 0,
159 errors_total: 0,
160 };
161 let archive_target = if entity.archive_enabled() {
162 entity
163 .sink
164 .archive
165 .as_ref()
166 .map(|archive| {
167 let storage_name = archive
168 .storage
169 .as_deref()
170 .or(entity.source.storage.as_deref());
171 let resolved = context.storage_resolver.resolve_path(
172 &entity.name,
173 "sink.archive.storage",
174 storage_name,
175 &archive.path,
176 )?;
177 Target::from_resolved(&resolved)
178 })
179 .transpose()?
180 } else {
181 None
182 };
183 let mut file_timings_ms =
184 Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
185 for skipped in incremental.skipped_reports {
186 observer.on_event(RunEvent::FileFinished {
187 run_id: context.run_id.clone(),
188 entity: entity.name.clone(),
189 input: skipped.input_file.clone(),
190 status: "skipped".to_string(),
191 skip_reason: skipped.skip_reason.clone(),
192 rows: 0,
193 accepted: 0,
194 rejected: 0,
195 elapsed_ms: 0,
196 ts_ms: event_time_ms(),
197 });
198 totals.files_total += 1;
199 totals.files_skipped += 1;
200 totals.warnings_total += skipped.validation.warnings;
201 file_reports.push(skipped);
202 file_timings_ms.push(Some(0));
203 }
204 let sink_options_warning = sink_options_warning(entity);
205 let precheck_start = perf_enabled.then(Instant::now);
207 let precheck = run_precheck(
208 PrecheckContext {
209 context,
210 entity,
211 input_adapter,
212 normalized_columns: &normalized_columns,
213 resolved_targets: &resolved_targets,
214 archive_target: archive_target.as_ref(),
215 temp_dir: temp_dir.as_ref(),
216 cloud: runtime.storage(),
217 observer,
218 file_reports: &mut file_reports,
219 file_timings_ms: &mut file_timings_ms,
220 totals: &mut totals,
221 },
222 input_files,
223 )?;
224 if let Some(start) = precheck_start {
225 phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
226 }
227 let mut abort_run = precheck.abort_run;
228 let prechecked_inputs = precheck.prechecked;
229
230 let mut accepted_accum = Vec::new();
231 let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
232 let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
233 io::unique_seed::seed_unique_tracker_for_append(
234 &mut unique_tracker,
235 write_mode,
236 entity.sink.accepted.format.as_str(),
237 &accepted_target,
238 temp_dir.as_ref().map(|dir| dir.path()),
239 runtime.storage(),
240 &context.storage_resolver,
241 &context.catalog_resolver,
242 entity,
243 )?;
244 let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
246 run_context: context,
247 runtime,
248 observer,
249 entity,
250 input_adapter,
251 prechecked_inputs,
252 read_columns: &read_columns,
253 normalize_strategy: normalize_strategy.as_deref(),
254 normalized_columns: &normalized_columns,
255 required_cols: &required_cols,
256 source_column_map: &source_column_map,
257 output_column_map: &output_column_map,
258 pii_runtime_map: &pii_runtime_map,
259 row_error_formatter: row_error_formatter.as_ref(),
260 severity,
261 track_cast_errors,
262 write_mode,
263 rejected_target: rejected_target.as_ref(),
264 archive_target: archive_target.as_ref(),
265 temp_dir: temp_dir_path,
266 sink_options_warning: sink_options_warning.as_deref(),
267 perf_enabled,
268 phase_timings: &mut phase_timings,
269 file_reports: &mut file_reports,
270 file_timings_ms: &mut file_timings_ms,
271 totals: &mut totals,
272 unique_tracker: &mut unique_tracker,
273 accepted_accum: &mut accepted_accum,
274 initial_abort_run: abort_run,
275 })?;
276 abort_run = phase_b.abort_run;
277 let accepted_accum_rows = phase_b.accepted_accum_rows;
278 let accepted_accum_frames = phase_b.accepted_accum_frames;
279 let unique_constraints = unique_tracker.results();
280
281 totals.files_total = file_reports.len() as u64;
282
283 let accepted_target_uri = accepted_target.target_uri().to_string();
284 let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
285 run_context: context,
286 observer,
287 runtime,
288 entity,
289 accepted_target: &accepted_target,
290 temp_dir: temp_dir_path,
291 write_mode,
292 full_refresh: context.full_refresh,
293 perf_enabled,
294 phase_timings: &mut phase_timings,
295 pending_input_count,
296 accepted_accum,
297 })?;
298 if aw.parts_written > 0 {
299 for file_report in &mut file_reports {
300 file_report.output.accepted_path = Some(accepted_target_uri.clone());
301 }
302 }
303
304 let perf_files_total = totals.files_total;
305 let perf_rows_total = totals.rows_total;
306
307 let run_report = build_run_report(RunReportContext {
308 context,
309 entity,
310 input,
311 resolved_targets: &resolved_targets,
312 resolved_mode,
313 resolved_files: &resolved_files,
314 reported_files,
315 totals,
316 file_reports,
317 severity,
318 accepted_write_mode: write_mode,
319 accepted_parts_written: aw.parts_written,
320 accepted_files_written: aw.files_written,
321 accepted_part_files: aw.part_files,
322 accepted_table_version: aw.table_version,
323 accepted_snapshot_id: aw.snapshot_id,
324 accepted_table_root_uri: aw.table_root_uri,
325 accepted_catalog: aw.catalog,
326 accepted_total_bytes_written: aw.metrics.total_bytes_written,
327 accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
328 accepted_small_files_count: aw.metrics.small_files_count,
329 accepted_merge_key: aw
330 .merge
331 .as_ref()
332 .map(|m| m.merge_key.clone())
333 .unwrap_or_default(),
334 accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
335 accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
336 accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
337 accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
338 accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
339 accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
340 accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
341 accepted_schema_evolution: report::SchemaEvolutionSummary {
342 enabled: aw.schema_evolution.enabled,
343 mode: aw.schema_evolution.mode,
344 applied: aw.schema_evolution.applied,
345 added_columns: aw.schema_evolution.added_columns,
346 incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
347 },
348 unique_constraints,
349 });
350
351 if let Some(report_target) = &context.report_target {
352 let report_write_start = perf_enabled.then(Instant::now);
353 crate::report::output::write_entity_report(
354 report_target,
355 &context.run_id,
356 entity,
357 &run_report,
358 runtime.storage(),
359 &context.storage_resolver,
360 )?;
361 if let Some(start) = report_write_start {
362 phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
363 }
364 }
365
366 if let Some(pending_state) = incremental.pending_state.as_mut() {
367 let (status, _) = report::compute_run_outcome(
368 &run_report
369 .files
370 .iter()
371 .map(|file| file.status)
372 .collect::<Vec<_>>(),
373 );
374 if matches!(
375 status,
376 report::RunStatus::Success
377 | report::RunStatus::SuccessWithWarnings
378 | report::RunStatus::Rejected
379 ) {
380 pending_state.commit(context, runtime.storage(), entity)?;
381 } else {
382 pending_state.release(context, runtime.storage(), entity)?;
383 }
384 }
385
386 if let Some(start) = entity_start {
387 crate::run::perf::emit_perf_log(
388 observer,
389 &context.run_id,
390 Some(&entity.name),
391 "perf_entity_phase_timings",
392 json!({
393 "entity": entity.name,
394 "elapsed_ms": start.elapsed().as_millis() as u64,
395 "files_total": perf_files_total,
396 "rows_total": perf_rows_total,
397 "accepted_rows_accumulated": accepted_accum_rows,
398 "accepted_frames_accumulated": accepted_accum_frames,
399 "write_sink_format": entity.sink.accepted.format,
400 "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
401 "phases_ms": phase_timings.into_json(),
402 }),
403 );
404 }
405
406 Ok(EntityRunResult {
407 outcome: EntityOutcome {
408 report: run_report,
409 file_timings_ms,
410 },
411 abort_run,
412 })
413}
414
415fn write_perf_breakdown_json(
416 perf: &crate::io::format::AcceptedWritePerfBreakdown,
417) -> serde_json::Value {
418 json!({
419 "conversion": perf.conversion_ms,
420 "source_df_build": perf.source_df_build_ms,
421 "merge_exec": perf.merge_exec_ms,
422 "data_write": perf.data_write_ms,
423 "commit": perf.commit_ms,
424 "metrics_read": perf.metrics_read_ms,
425 })
426}
427
428fn resolve_unique_constraints(
429 entity: &crate::config::EntityConfig,
430 normalize_strategy: Option<&str>,
431 write_mode: crate::config::WriteMode,
432) -> FloeResult<Vec<check::UniqueConstraint>> {
433 let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
434 if unique_keys.is_empty() {
435 return Ok(Vec::new());
436 }
437 let merge_primary_key = if matches!(
438 write_mode,
439 crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
440 ) {
441 entity.schema.primary_key.as_ref().map(|primary_key| {
442 primary_key
443 .iter()
444 .map(|column| column.trim().to_string())
445 .collect::<Vec<_>>()
446 })
447 } else {
448 None
449 };
450 let mut constraints = Vec::with_capacity(unique_keys.len());
451 for key in unique_keys {
452 let mut runtime_columns = Vec::with_capacity(key.len());
453 for name in &key {
454 let column = entity
455 .schema
456 .columns
457 .iter()
458 .find(|column| column.name == *name)
459 .ok_or_else(|| {
460 Box::new(ConfigError(format!(
461 "entity.name={} schema unique key references unknown column {}",
462 entity.name, name
463 )))
464 })?;
465 runtime_columns.push(runtime_column_name(column, normalize_strategy));
466 }
467 let enforce_reject = merge_primary_key
468 .as_ref()
469 .map(|primary_key| primary_key == &key)
470 .unwrap_or(false);
471 constraints.push(check::UniqueConstraint {
472 runtime_columns,
473 report_columns: key,
474 enforce_reject,
475 });
476 }
477 Ok(constraints)
478}
479
480fn append_primary_key_required_columns(
481 required_cols: &mut Vec<String>,
482 entity: &crate::config::EntityConfig,
483 normalize_strategy: Option<&str>,
484) -> FloeResult<()> {
485 let Some(primary_key) = entity.schema.primary_key.as_ref() else {
486 return Ok(());
487 };
488 if primary_key.is_empty() {
489 return Ok(());
490 }
491 let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
492 for key_column in primary_key {
493 let column = entity
494 .schema
495 .columns
496 .iter()
497 .find(|column| column.name == *key_column)
498 .ok_or_else(|| {
499 Box::new(ConfigError(format!(
500 "entity.name={} schema.primary_key references unknown column {}",
501 entity.name, key_column
502 )))
503 })?;
504 let runtime = runtime_column_name(column, normalize_strategy);
505 if seen.insert(runtime.clone()) {
506 required_cols.push(runtime);
507 }
508 }
509 Ok(())
510}
511
512fn runtime_column_name(
513 column: &crate::config::ColumnConfig,
514 normalize_strategy: Option<&str>,
515) -> String {
516 let source_name = column.source_or_name();
517 if let Some(strategy) = normalize_strategy {
518 check::normalize::normalize_name(source_name, strategy)
519 } else {
520 source_name.to_string()
521 }
522}