use std::path::Path;
use std::time::Instant;
use polars::prelude::DataFrame;
use crate::errors::RunError;
use crate::{config, io, report, FloeResult};
use super::super::output::{write_accepted_output, AcceptedOutputContext};
use super::EntityPhaseTimings;
use crate::run::RunContext;
use io::storage::Target;
#[derive(Debug, Default)]
pub(super) struct AcceptedWriteReportState {
pub(super) parts_written: u64,
pub(super) files_written: u64,
pub(super) part_files: Vec<String>,
pub(super) table_version: Option<i64>,
pub(super) snapshot_id: Option<i64>,
pub(super) table_root_uri: Option<String>,
pub(super) iceberg_catalog_name: Option<String>,
pub(super) iceberg_database: Option<String>,
pub(super) iceberg_namespace: Option<String>,
pub(super) iceberg_table: Option<String>,
pub(super) total_bytes_written: Option<u64>,
pub(super) avg_file_size_mb: Option<f64>,
pub(super) small_files_count: Option<u64>,
pub(super) merge_key: Vec<String>,
pub(super) inserted_count: Option<u64>,
pub(super) updated_count: Option<u64>,
pub(super) target_rows_before: Option<u64>,
pub(super) target_rows_after: Option<u64>,
pub(super) merge_elapsed_ms: Option<u64>,
}
impl AcceptedWriteReportState {
pub(super) fn from_write_output(output: io::format::AcceptedWriteOutput) -> Self {
Self {
parts_written: output.parts_written,
files_written: output.files_written,
part_files: output.part_files,
table_version: output.table_version,
snapshot_id: output.snapshot_id,
table_root_uri: output.table_root_uri,
iceberg_catalog_name: output.iceberg_catalog_name,
iceberg_database: output.iceberg_database,
iceberg_namespace: output.iceberg_namespace,
iceberg_table: output.iceberg_table,
total_bytes_written: output.metrics.total_bytes_written,
avg_file_size_mb: output.metrics.avg_file_size_mb,
small_files_count: output.metrics.small_files_count,
merge_key: output
.merge
.as_ref()
.map(|merge| merge.merge_key.clone())
.unwrap_or_default(),
inserted_count: output.merge.as_ref().map(|merge| merge.inserted_count),
updated_count: output.merge.as_ref().map(|merge| merge.updated_count),
target_rows_before: output.merge.as_ref().map(|merge| merge.target_rows_before),
target_rows_after: output.merge.as_ref().map(|merge| merge.target_rows_after),
merge_elapsed_ms: output.merge.as_ref().map(|merge| merge.merge_elapsed_ms),
}
}
pub(super) fn apply_accepted_path_to_file_reports(
&self,
file_reports: &mut [report::FileReport],
accepted_target_uri: &str,
) {
if self.parts_written == 0 {
return;
}
let accepted_path = accepted_target_uri.to_string();
for file_report in file_reports {
file_report.output.accepted_path = Some(accepted_path.clone());
}
}
}
pub(super) struct AcceptedWritePhaseContext<'a> {
pub(super) run_context: &'a RunContext,
pub(super) runtime: &'a mut dyn crate::runtime::Runtime,
pub(super) entity: &'a config::EntityConfig,
pub(super) accepted_target: &'a Target,
pub(super) temp_dir: Option<&'a Path>,
pub(super) write_mode: config::WriteMode,
pub(super) perf_enabled: bool,
pub(super) phase_timings: &'a mut EntityPhaseTimings,
pub(super) accepted_accum: Vec<DataFrame>,
}
pub(super) fn run_accepted_write_phase(
context: AcceptedWritePhaseContext<'_>,
) -> FloeResult<AcceptedWriteReportState> {
let AcceptedWritePhaseContext {
run_context,
runtime,
entity,
accepted_target,
temp_dir,
write_mode,
perf_enabled,
phase_timings,
mut accepted_accum,
} = context;
let mut accepted_write_report = AcceptedWriteReportState::default();
if accepted_accum.is_empty() {
return Ok(accepted_write_report);
}
let concat_start = perf_enabled.then(Instant::now);
let mut accepted_df = accepted_accum
.pop()
.ok_or_else(|| Box::new(RunError("missing accepted dataframe".to_string())))?;
for frame in accepted_accum {
accepted_df
.vstack_mut(&frame)
.map_err(|err| Box::new(RunError(format!("failed to concat accepted rows: {err}"))))?;
}
if let Some(start) = concat_start {
phase_timings.concat_accepted_ms += start.elapsed().as_millis() as u64;
}
let output_stem = io::storage::paths::build_part_stem(0);
let accepted_adapter = runtime.accepted_sink_adapter(entity.sink.accepted.format.as_str())?;
let write_accepted_start = perf_enabled.then(Instant::now);
let accepted_output = write_accepted_output(AcceptedOutputContext {
adapter: accepted_adapter,
target: accepted_target,
df: &mut accepted_df,
output_stem: &output_stem,
temp_dir,
cloud: runtime.storage(),
resolver: &run_context.storage_resolver,
catalogs: &run_context.catalog_resolver,
entity,
mode: write_mode,
})?;
if let Some(start) = write_accepted_start {
phase_timings.write_accepted_ms += start.elapsed().as_millis() as u64;
}
accepted_write_report = AcceptedWriteReportState::from_write_output(accepted_output);
Ok(accepted_write_report)
}