floe_core/io/write/delta/
commit_metrics.rs1use std::path::Path;
2
3use deltalake::logstore::read_commit_entry;
4use deltalake::table::builder::DeltaTableBuilder;
5use serde_json::Value;
6
7use crate::errors::RunError;
8use crate::io::format::AcceptedWriteMetrics;
9use crate::io::storage::{object_store, Target};
10use crate::io::write::metrics;
11use crate::{config, FloeResult};
12
13pub(super) fn delta_commit_metrics_for_target(
14 runtime: &tokio::runtime::Runtime,
15 target: &Target,
16 resolver: &config::StorageResolver,
17 entity: &config::EntityConfig,
18 version: i64,
19 small_file_threshold_bytes: u64,
20) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
21 match target {
22 Target::Local { base_path, .. } => {
23 let stats = delta_commit_add_stats(Path::new(base_path), version)?;
24 Ok(delta_commit_stats_to_output(
25 stats,
26 small_file_threshold_bytes,
27 ))
28 }
29 Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
32 match delta_commit_add_stats_via_object_store(
33 runtime, target, resolver, entity, version,
34 ) {
35 Ok(stats) => Ok(delta_commit_stats_to_output(
36 stats,
37 small_file_threshold_bytes,
38 )),
39 Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
40 }
41 }
42 }
43}
44
/// Tally of the `add` actions found in a single Delta commit log entry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DeltaCommitAddStats {
    /// Number of `add` actions seen in the commit.
    files_written: u64,
    /// Display names (final path component) of added files. The commit-log parser
    /// bounds how many names it records.
    part_files: Vec<String>,
    /// Byte sizes of the `add` actions whose `size` field was an unsigned integer.
    file_sizes: Vec<u64>,
}
51
52fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
53 let log_path = table_root
54 .join("_delta_log")
55 .join(format!("{version:020}.json"));
56 let bytes = std::fs::read(&log_path).map_err(|err| {
57 Box::new(RunError(format!(
58 "delta metrics failed to open commit log {}: {err}",
59 log_path.display()
60 )))
61 })?;
62 parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
63}
64
65fn delta_commit_add_stats_via_object_store(
66 runtime: &tokio::runtime::Runtime,
67 target: &Target,
68 resolver: &config::StorageResolver,
69 entity: &config::EntityConfig,
70 version: i64,
71) -> FloeResult<DeltaCommitAddStats> {
72 let store = object_store::delta_store_config(target, resolver, entity)?;
73 let builder = DeltaTableBuilder::from_url(store.table_url.clone())
74 .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
75 .with_storage_options(store.storage_options);
76 let log_store = builder.build_storage().map_err(|err| {
77 Box::new(RunError(format!(
78 "delta metrics log store init failed: {err}"
79 )))
80 })?;
81 let bytes = runtime
82 .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
83 .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
84 .ok_or_else(|| {
85 Box::new(RunError(format!(
86 "delta metrics commit log missing for version {version}"
87 ))) as Box<dyn std::error::Error + Send + Sync>
88 })?;
89 parse_delta_commit_add_stats_bytes_with_context(
90 bytes.as_ref(),
91 &format!("remote delta commit version {version}"),
92 )
93}
94
95#[doc(hidden)]
96pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
97 parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
98}
99
100fn parse_delta_commit_add_stats_bytes_with_context(
101 bytes: &[u8],
102 context: &str,
103) -> FloeResult<DeltaCommitAddStats> {
104 let content = std::str::from_utf8(bytes).map_err(|err| {
105 Box::new(RunError(format!(
106 "delta metrics failed to decode {context} as utf-8: {err}"
107 )))
108 })?;
109 let mut stats = DeltaCommitAddStats::default();
110 for line in content.lines() {
111 let record: Value = serde_json::from_str(line).map_err(|err| {
112 Box::new(RunError(format!(
113 "delta metrics failed to parse {context}: {err}"
114 )))
115 })?;
116 let Some(add) = record.get("add") else {
117 continue;
118 };
119 stats.files_written += 1;
120 if stats.part_files.len() < 50 {
121 if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
122 let display_name = Path::new(path)
123 .file_name()
124 .and_then(|name| name.to_str())
125 .map(ToOwned::to_owned)
126 .unwrap_or_else(|| path.to_string());
127 stats.part_files.push(display_name);
128 }
129 }
130 if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
131 stats.file_sizes.push(size);
132 }
133 }
134 Ok(stats)
135}
136
137#[doc(hidden)]
138pub fn delta_commit_metrics_from_log_bytes(
139 bytes: &[u8],
140 small_file_threshold_bytes: u64,
141) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
142 let stats = parse_delta_commit_add_stats_bytes(bytes)?;
143 Ok(delta_commit_stats_to_output(
144 stats,
145 small_file_threshold_bytes,
146 ))
147}
148
149#[doc(hidden)]
150pub fn delta_commit_metrics_from_log_bytes_best_effort(
151 bytes: &[u8],
152 small_file_threshold_bytes: u64,
153) -> (u64, Vec<String>, AcceptedWriteMetrics) {
154 match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
155 Ok(output) => output,
156 Err(_) => delta_commit_metrics_fallback_unknown(),
157 }
158}
159
160fn delta_commit_stats_to_output(
161 stats: DeltaCommitAddStats,
162 small_file_threshold_bytes: u64,
163) -> (u64, Vec<String>, AcceptedWriteMetrics) {
164 let metrics = if stats.file_sizes.len() == stats.files_written as usize {
165 metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes)
166 } else {
167 null_accepted_write_metrics()
168 };
169 (stats.files_written, stats.part_files, metrics)
170}
171
172fn delta_commit_metrics_fallback_unknown() -> (u64, Vec<String>, AcceptedWriteMetrics) {
173 (0, Vec::new(), null_accepted_write_metrics())
174}
175
176fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
177 AcceptedWriteMetrics {
178 total_bytes_written: None,
179 avg_file_size_mb: None,
180 small_files_count: None,
181 }
182}