1use crate::config::PolicySeverity;
2use crate::{check, io, report, ConfigError, FloeResult};
3
4impl From<PolicySeverity> for report::Severity {
5 fn from(s: PolicySeverity) -> Self {
6 match s {
7 PolicySeverity::Warn => report::Severity::Warn,
8 PolicySeverity::Reject => report::Severity::Reject,
9 PolicySeverity::Abort => report::Severity::Abort,
10 }
11 }
12}
13use serde_json::json;
14use std::collections::HashSet;
15use std::time::Instant;
16
17use super::file::required_columns;
18use super::{EntityOutcome, RunContext, MAX_RESOLVED_INPUTS};
19use crate::checks::normalize::{
20 output_column_mapping, pii_schema_to_runtime_mapping, resolve_normalize_strategy,
21 resolve_source_columns, source_column_mapping,
22};
23use io::storage::Target;
24
25mod accepted_write;
26mod incremental;
27mod pii;
28mod precheck;
29mod process;
30mod resolve;
31mod validate_split;
32pub(crate) use resolve::{resolve_entity_targets, ResolvedEntityTargets};
33
34use crate::report::entity::{build_run_report, RunReportContext};
35use crate::run::events::{event_time_ms, RunEvent, RunObserver};
36use accepted_write::{run_accepted_write_phase, AcceptedWritePhaseContext};
37use precheck::{run_precheck, PrecheckContext};
38use process::sink_options_warning;
39use validate_split::{run_validate_split_phase, ValidateSplitPhaseContext};
40
/// Value returned by `run_entity` for a single entity.
pub(super) struct EntityRunResult {
    /// The entity's run report together with the per-file timings.
    pub outcome: EntityOutcome,
    /// Abort flag as left by the precheck and validate/split phases.
    /// NOTE(review): presumably tells the caller to stop processing further
    /// entities — confirm against the caller.
    pub abort_run: bool,
}
45
/// Accumulated wall-clock time, in milliseconds, spent in each phase of an
/// entity run.
///
/// All counters start at zero (`Default`). Within this file only `precheck_ms`
/// and `report_write_ms` are written directly; the struct is handed by mutable
/// reference to the validate/split and accepted-write phases, which presumably
/// fill in the remaining counters — confirm in those modules.
#[derive(Debug, Default)]
struct EntityPhaseTimings {
    precheck_ms: u64,
    read_parse_ms: u64,
    checks_validation_ms: u64,
    accept_reject_split_ms: u64,
    write_rejected_ms: u64,
    // Serialized under the key "archive_input" by `into_json`.
    archive_ms: u64,
    concat_accepted_ms: u64,
    write_accepted_ms: u64,
    write_delta_ms: u64,
    write_iceberg_ms: u64,
    // Serialized under the key "write_entity_report" by `into_json`.
    report_write_ms: u64,
}
60
61impl EntityPhaseTimings {
62 fn into_json(self) -> serde_json::Value {
63 json!({
64 "precheck": self.precheck_ms,
65 "read_parse": self.read_parse_ms,
66 "checks_validation": self.checks_validation_ms,
67 "accept_reject_split": self.accept_reject_split_ms,
68 "write_rejected": self.write_rejected_ms,
69 "archive_input": self.archive_ms,
70 "concat_accepted": self.concat_accepted_ms,
71 "write_accepted": self.write_accepted_ms,
72 "write_delta": self.write_delta_ms,
73 "write_iceberg": self.write_iceberg_ms,
74 "write_entity_report": self.report_write_ms,
75 })
76 }
77}
78
/// Runs the pipeline for a single entity and returns its report and abort flag.
///
/// Steps, in the order they appear below:
/// 1. Resolve the input adapter, column mappings, required columns and unique
///    constraints from the entity configuration.
/// 2. Prepare the incremental context; files it reports as skipped are
///    recorded immediately with a `FileFinished` event of status `"skipped"`.
/// 3. Run the precheck over the pending input files.
/// 4. Seed the unique tracker, then run the validate/split phase.
/// 5. Run the accepted-write phase and stamp the accepted target URI on the
///    file reports when at least one part was written.
/// 6. Build the run report and, if a report target is configured, write it.
/// 7. Commit or release pending incremental state based on the run status.
/// 8. When phase timing is enabled, emit a `perf_entity_phase_timings` log.
///
/// # Errors
///
/// Any error returned by the helpers called with `?` is propagated unchanged;
/// nothing is caught or retried here.
pub(super) fn run_entity(
    context: &RunContext,
    runtime: &mut dyn crate::runtime::Runtime,
    observer: &dyn RunObserver,
    plan: super::EntityRunPlan<'_>,
) -> FloeResult<EntityRunResult> {
    let entity = plan.entity;
    let perf_enabled = crate::run::perf::phase_timing_enabled();
    // Timers are only created when phase timing is enabled; otherwise `None`.
    let entity_start = perf_enabled.then(Instant::now);
    let mut phase_timings = EntityPhaseTimings::default();
    let input = &entity.source;
    let write_mode = entity.sink.write_mode;
    let input_adapter = runtime.input_adapter(input.format.as_str())?;
    let resolved_targets = plan.resolved_targets;
    // Report formatter name; falls back to "json" when none is configured.
    let formatter_name = context
        .config
        .report
        .as_ref()
        .and_then(|report| report.formatter.as_deref())
        .unwrap_or("json");

    let normalize_strategy = resolve_normalize_strategy(entity)?;
    let normalized_columns =
        resolve_source_columns(&entity.schema.columns, normalize_strategy.as_deref(), false)?;
    let source_column_map =
        source_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    // An empty source-column map is passed to the formatter as `None`.
    let row_error_formatter = if source_column_map.is_empty() {
        check::row_error_formatter(formatter_name, None)?
    } else {
        check::row_error_formatter(formatter_name, Some(&source_column_map))?
    };
    let read_columns = io::format::resolve_read_columns(
        entity,
        &normalized_columns,
        normalize_strategy.as_deref(),
    )?;
    let output_column_map =
        output_column_mapping(&entity.schema.columns, normalize_strategy.as_deref())?;
    let pii_runtime_map =
        pii_schema_to_runtime_mapping(&entity.schema.columns, normalize_strategy.as_deref());
    // Required columns from the schema, extended with any primary-key columns
    // that are not already present.
    let mut required_cols = required_columns(&normalized_columns);
    append_primary_key_required_columns(&mut required_cols, entity, normalize_strategy.as_deref())?;
    let unique_constraints =
        resolve_unique_constraints(entity, normalize_strategy.as_deref(), write_mode)?;
    let accepted_target = resolved_targets.accepted.clone();
    let rejected_target = resolved_targets.rejected.clone();
    let temp_dir = plan.temp_dir;
    let io::storage::inputs::ResolvedInputs {
        files: input_files,
        listed: resolved_files,
        mode: resolved_mode,
    } = plan.resolved_inputs;
    // The incremental context takes ownership of the resolved inputs and hands
    // back the subset still pending; `input_files` is shadowed with that subset.
    let mut incremental =
        incremental::prepare_incremental_context(context, runtime.storage(), entity, input_files)?;
    let input_files = incremental.pending_inputs;
    let pending_input_count = input_files.len();

    let severity = report::Severity::from(entity.policy.severity);
    // Cast errors are tracked unless the source cast mode is exactly "coerce".
    let track_cast_errors = !matches!(input.cast_mode.as_deref(), Some("coerce"));

    // Only the first MAX_RESOLVED_INPUTS listed files are carried into the report.
    let reported_files = resolved_files
        .iter()
        .take(MAX_RESOLVED_INPUTS)
        .cloned()
        .collect::<Vec<_>>();

    let mut file_reports =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    let mut totals = report::ResultsTotals {
        files_total: 0,
        files_skipped: 0,
        rows_total: 0,
        accepted_total: 0,
        rejected_total: 0,
        warnings_total: 0,
        errors_total: 0,
    };
    // Resolve the archive target only when archiving is enabled; the archive
    // storage falls back to the source storage when none is set on the archive.
    let archive_target = if entity.archive_enabled() {
        entity
            .sink
            .archive
            .as_ref()
            .map(|archive| {
                let storage_name = archive
                    .storage
                    .as_deref()
                    .or(entity.source.storage.as_deref());
                let resolved = context.storage_resolver.resolve_path(
                    &entity.name,
                    "sink.archive.storage",
                    storage_name,
                    &archive.path,
                )?;
                Target::from_resolved(&resolved)
            })
            .transpose()?
    } else {
        None
    };
    let mut file_timings_ms =
        Vec::with_capacity(input_files.len() + incremental.skipped_reports.len());
    // Record every file the incremental context skipped: emit a finished event
    // with zeroed counters, update totals, and keep its report and a 0 ms timing.
    for skipped in incremental.skipped_reports {
        observer.on_event(RunEvent::FileFinished {
            run_id: context.run_id.clone(),
            entity: entity.name.clone(),
            input: skipped.input_file.clone(),
            status: "skipped".to_string(),
            skip_reason: skipped.skip_reason.clone(),
            rows: 0,
            accepted: 0,
            rejected: 0,
            elapsed_ms: 0,
            ts_ms: event_time_ms(),
        });
        totals.files_total += 1;
        totals.files_skipped += 1;
        totals.warnings_total += skipped.validation.warnings;
        file_reports.push(skipped);
        file_timings_ms.push(Some(0));
    }
    let sink_options_warning = sink_options_warning(entity);
    let precheck_start = perf_enabled.then(Instant::now);
    // Precheck consumes the pending input files and may append to the shared
    // file reports, timings and totals through the mutable references.
    let precheck = run_precheck(
        PrecheckContext {
            context,
            entity,
            input_adapter,
            normalized_columns: &normalized_columns,
            resolved_targets: &resolved_targets,
            archive_target: archive_target.as_ref(),
            temp_dir: temp_dir.as_ref(),
            cloud: runtime.storage(),
            observer,
            file_reports: &mut file_reports,
            file_timings_ms: &mut file_timings_ms,
            totals: &mut totals,
        },
        input_files,
    )?;
    if let Some(start) = precheck_start {
        phase_timings.precheck_ms += start.elapsed().as_millis() as u64;
    }
    let mut abort_run = precheck.abort_run;
    let prechecked_inputs = precheck.prechecked;

    let mut accepted_accum = Vec::new();
    let temp_dir_path = temp_dir.as_ref().map(|dir| dir.path());
    let mut unique_tracker = check::UniqueTracker::with_constraints(unique_constraints);
    // NOTE(review): by its name this seeds the tracker from the existing
    // accepted target for append writes — confirm in `io::unique_seed`.
    io::unique_seed::seed_unique_tracker_for_append(
        &mut unique_tracker,
        write_mode,
        entity.sink.accepted.format.as_str(),
        &accepted_target,
        temp_dir.as_ref().map(|dir| dir.path()),
        runtime.storage(),
        &context.storage_resolver,
        &context.catalog_resolver,
        entity,
    )?;
    // The validate/split phase receives the abort flag from precheck and
    // returns the updated flag plus accumulation counters.
    let phase_b = run_validate_split_phase(ValidateSplitPhaseContext {
        run_context: context,
        runtime,
        observer,
        entity,
        input_adapter,
        prechecked_inputs,
        read_columns: &read_columns,
        normalize_strategy: normalize_strategy.as_deref(),
        normalized_columns: &normalized_columns,
        required_cols: &required_cols,
        source_column_map: &source_column_map,
        output_column_map: &output_column_map,
        pii_runtime_map: &pii_runtime_map,
        row_error_formatter: row_error_formatter.as_ref(),
        severity,
        track_cast_errors,
        write_mode,
        rejected_target: rejected_target.as_ref(),
        archive_target: archive_target.as_ref(),
        temp_dir: temp_dir_path,
        sink_options_warning: sink_options_warning.as_deref(),
        perf_enabled,
        phase_timings: &mut phase_timings,
        file_reports: &mut file_reports,
        file_timings_ms: &mut file_timings_ms,
        totals: &mut totals,
        unique_tracker: &mut unique_tracker,
        accepted_accum: &mut accepted_accum,
        initial_abort_run: abort_run,
    })?;
    abort_run = phase_b.abort_run;
    let accepted_accum_rows = phase_b.accepted_accum_rows;
    let accepted_accum_frames = phase_b.accepted_accum_frames;
    // Shadows the constraint definitions with the tracker's results for the report.
    let unique_constraints = unique_tracker.results();

    // Recomputed from the collected reports; this overwrites the increments
    // made in the skipped-file loop above.
    totals.files_total = file_reports.len() as u64;

    let accepted_target_uri = accepted_target.target_uri().to_string();
    let aw = run_accepted_write_phase(AcceptedWritePhaseContext {
        run_context: context,
        observer,
        runtime,
        entity,
        accepted_target: &accepted_target,
        temp_dir: temp_dir_path,
        write_mode,
        perf_enabled,
        phase_timings: &mut phase_timings,
        pending_input_count,
        accepted_accum,
    })?;
    // When anything was written, every file report (skipped ones included)
    // gets the accepted target URI as its accepted path.
    if aw.parts_written > 0 {
        for file_report in &mut file_reports {
            file_report.output.accepted_path = Some(accepted_target_uri.clone());
        }
    }

    // Captured now because `totals` is moved into the report context below.
    let perf_files_total = totals.files_total;
    let perf_rows_total = totals.rows_total;

    let run_report = build_run_report(RunReportContext {
        context,
        entity,
        input,
        resolved_targets: &resolved_targets,
        resolved_mode,
        resolved_files: &resolved_files,
        reported_files,
        totals,
        file_reports,
        severity,
        accepted_write_mode: write_mode,
        accepted_parts_written: aw.parts_written,
        accepted_files_written: aw.files_written,
        accepted_part_files: aw.part_files,
        accepted_table_version: aw.table_version,
        accepted_snapshot_id: aw.snapshot_id,
        accepted_table_root_uri: aw.table_root_uri,
        accepted_catalog: aw.catalog,
        accepted_total_bytes_written: aw.metrics.total_bytes_written,
        accepted_avg_file_size_mb: aw.metrics.avg_file_size_mb,
        accepted_small_files_count: aw.metrics.small_files_count,
        // Merge statistics are only present when the write phase reported a merge.
        accepted_merge_key: aw
            .merge
            .as_ref()
            .map(|m| m.merge_key.clone())
            .unwrap_or_default(),
        accepted_inserted_count: aw.merge.as_ref().map(|m| m.inserted_count),
        accepted_updated_count: aw.merge.as_ref().map(|m| m.updated_count),
        accepted_closed_count: aw.merge.as_ref().and_then(|m| m.closed_count),
        accepted_unchanged_count: aw.merge.as_ref().and_then(|m| m.unchanged_count),
        accepted_target_rows_before: aw.merge.as_ref().map(|m| m.target_rows_before),
        accepted_target_rows_after: aw.merge.as_ref().map(|m| m.target_rows_after),
        accepted_merge_elapsed_ms: aw.merge.as_ref().map(|m| m.merge_elapsed_ms),
        accepted_schema_evolution: report::SchemaEvolutionSummary {
            enabled: aw.schema_evolution.enabled,
            mode: aw.schema_evolution.mode,
            applied: aw.schema_evolution.applied,
            added_columns: aw.schema_evolution.added_columns,
            incompatible_changes_detected: aw.schema_evolution.incompatible_changes_detected,
        },
        unique_constraints,
    });

    // The entity report is written only when a report target is configured.
    if let Some(report_target) = &context.report_target {
        let report_write_start = perf_enabled.then(Instant::now);
        crate::report::output::write_entity_report(
            report_target,
            &context.run_id,
            entity,
            &run_report,
            runtime.storage(),
            &context.storage_resolver,
        )?;
        if let Some(start) = report_write_start {
            phase_timings.report_write_ms += start.elapsed().as_millis() as u64;
        }
    }

    // Pending incremental state is committed when the overall status is
    // Success, SuccessWithWarnings or Rejected, and released for any other status.
    if let Some(pending_state) = incremental.pending_state.as_mut() {
        let (status, _) = report::compute_run_outcome(
            &run_report
                .files
                .iter()
                .map(|file| file.status)
                .collect::<Vec<_>>(),
        );
        if matches!(
            status,
            report::RunStatus::Success
                | report::RunStatus::SuccessWithWarnings
                | report::RunStatus::Rejected
        ) {
            pending_state.commit(context, runtime.storage(), entity)?;
        } else {
            pending_state.release(context, runtime.storage(), entity)?;
        }
    }

    // `entity_start` is `Some` only when phase timing is enabled, so the perf
    // log is emitted only in that case.
    if let Some(start) = entity_start {
        crate::run::perf::emit_perf_log(
            observer,
            &context.run_id,
            Some(&entity.name),
            "perf_entity_phase_timings",
            json!({
                "entity": entity.name,
                "elapsed_ms": start.elapsed().as_millis() as u64,
                "files_total": perf_files_total,
                "rows_total": perf_rows_total,
                "accepted_rows_accumulated": accepted_accum_rows,
                "accepted_frames_accumulated": accepted_accum_frames,
                "write_sink_format": entity.sink.accepted.format,
                "write_sink_breakdown_ms": aw.perf.as_ref().map(write_perf_breakdown_json),
                "phases_ms": phase_timings.into_json(),
            }),
        );
    }

    Ok(EntityRunResult {
        outcome: EntityOutcome {
            report: run_report,
            file_timings_ms,
        },
        abort_run,
    })
}
408
409fn write_perf_breakdown_json(
410 perf: &crate::io::format::AcceptedWritePerfBreakdown,
411) -> serde_json::Value {
412 json!({
413 "conversion": perf.conversion_ms,
414 "source_df_build": perf.source_df_build_ms,
415 "merge_exec": perf.merge_exec_ms,
416 "data_write": perf.data_write_ms,
417 "commit": perf.commit_ms,
418 "metrics_read": perf.metrics_read_ms,
419 })
420}
421
422fn resolve_unique_constraints(
423 entity: &crate::config::EntityConfig,
424 normalize_strategy: Option<&str>,
425 write_mode: crate::config::WriteMode,
426) -> FloeResult<Vec<check::UniqueConstraint>> {
427 let unique_keys = check::resolve_schema_unique_keys(&entity.schema);
428 if unique_keys.is_empty() {
429 return Ok(Vec::new());
430 }
431 let merge_primary_key = if matches!(
432 write_mode,
433 crate::config::WriteMode::MergeScd1 | crate::config::WriteMode::MergeScd2
434 ) {
435 entity.schema.primary_key.as_ref().map(|primary_key| {
436 primary_key
437 .iter()
438 .map(|column| column.trim().to_string())
439 .collect::<Vec<_>>()
440 })
441 } else {
442 None
443 };
444 let mut constraints = Vec::with_capacity(unique_keys.len());
445 for key in unique_keys {
446 let mut runtime_columns = Vec::with_capacity(key.len());
447 for name in &key {
448 let column = entity
449 .schema
450 .columns
451 .iter()
452 .find(|column| column.name == *name)
453 .ok_or_else(|| {
454 Box::new(ConfigError(format!(
455 "entity.name={} schema unique key references unknown column {}",
456 entity.name, name
457 )))
458 })?;
459 runtime_columns.push(runtime_column_name(column, normalize_strategy));
460 }
461 let enforce_reject = merge_primary_key
462 .as_ref()
463 .map(|primary_key| primary_key == &key)
464 .unwrap_or(false);
465 constraints.push(check::UniqueConstraint {
466 runtime_columns,
467 report_columns: key,
468 enforce_reject,
469 });
470 }
471 Ok(constraints)
472}
473
474fn append_primary_key_required_columns(
475 required_cols: &mut Vec<String>,
476 entity: &crate::config::EntityConfig,
477 normalize_strategy: Option<&str>,
478) -> FloeResult<()> {
479 let Some(primary_key) = entity.schema.primary_key.as_ref() else {
480 return Ok(());
481 };
482 if primary_key.is_empty() {
483 return Ok(());
484 }
485 let mut seen = required_cols.iter().cloned().collect::<HashSet<_>>();
486 for key_column in primary_key {
487 let column = entity
488 .schema
489 .columns
490 .iter()
491 .find(|column| column.name == *key_column)
492 .ok_or_else(|| {
493 Box::new(ConfigError(format!(
494 "entity.name={} schema.primary_key references unknown column {}",
495 entity.name, key_column
496 )))
497 })?;
498 let runtime = runtime_column_name(column, normalize_strategy);
499 if seen.insert(runtime.clone()) {
500 required_cols.push(runtime);
501 }
502 }
503 Ok(())
504}
505
506fn runtime_column_name(
507 column: &crate::config::ColumnConfig,
508 normalize_strategy: Option<&str>,
509) -> String {
510 let source_name = column.source_or_name();
511 if let Some(strategy) = normalize_strategy {
512 check::normalize::normalize_name(source_name, strategy)
513 } else {
514 source_name.to_string()
515 }
516}