floe_core/io/write/delta/
commit_metrics.rs1use std::path::Path;
2
3use deltalake::logstore::read_commit_entry;
4use deltalake::table::builder::DeltaTableBuilder;
5use serde_json::Value;
6
7use crate::errors::RunError;
8use crate::io::storage::{object_store, Target};
9use crate::io::write::metrics;
10use crate::{config, FloeResult};
11
12pub(super) fn delta_commit_metrics_for_target(
13 runtime: &tokio::runtime::Runtime,
14 target: &Target,
15 resolver: &config::StorageResolver,
16 entity: &config::EntityConfig,
17 version: i64,
18 small_file_threshold_bytes: u64,
19) -> FloeResult<(
20 Option<u64>,
21 Vec<String>,
22 crate::io::format::AcceptedWriteMetrics,
23)> {
24 match target {
25 Target::Local { base_path, .. } => {
26 let stats = delta_commit_add_stats(Path::new(base_path), version)?;
27 Ok(delta_commit_stats_to_output(
28 stats,
29 small_file_threshold_bytes,
30 ))
31 }
32 Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
35 match delta_commit_add_stats_via_object_store(
36 runtime, target, resolver, entity, version,
37 ) {
38 Ok(stats) => Ok(delta_commit_stats_to_output(
39 stats,
40 small_file_threshold_bytes,
41 )),
42 Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
43 }
44 }
45 }
46}
47
/// Tallies collected from the `add` actions of a single Delta commit log entry.
///
/// The default value is the empty tally: zero files and two empty lists.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DeltaCommitAddStats {
    /// Number of records in the commit that carry an `add` action.
    files_written: u64,
    /// Display names taken from `add.path`; the parser in this file caps how many are kept.
    part_files: Vec<String>,
    /// Values of `add.size`, one per `add` action that provides an unsigned integer size.
    file_sizes: Vec<u64>,
}
54
55fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
56 let log_path = table_root
57 .join("_delta_log")
58 .join(format!("{version:020}.json"));
59 let bytes = std::fs::read(&log_path).map_err(|err| {
60 Box::new(RunError(format!(
61 "delta metrics failed to open commit log {}: {err}",
62 log_path.display()
63 )))
64 })?;
65 parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
66}
67
68fn delta_commit_add_stats_via_object_store(
69 runtime: &tokio::runtime::Runtime,
70 target: &Target,
71 resolver: &config::StorageResolver,
72 entity: &config::EntityConfig,
73 version: i64,
74) -> FloeResult<DeltaCommitAddStats> {
75 let store = object_store::delta_store_config(target, resolver, entity)?;
76 let builder = DeltaTableBuilder::from_url(store.table_url.clone())
77 .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
78 .with_storage_options(store.storage_options);
79 let log_store = builder.build_storage().map_err(|err| {
80 Box::new(RunError(format!(
81 "delta metrics log store init failed: {err}"
82 )))
83 })?;
84 let bytes = runtime
85 .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
86 .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
87 .ok_or_else(|| {
88 Box::new(RunError(format!(
89 "delta metrics commit log missing for version {version}"
90 ))) as Box<dyn std::error::Error + Send + Sync>
91 })?;
92 parse_delta_commit_add_stats_bytes_with_context(
93 bytes.as_ref(),
94 &format!("remote delta commit version {version}"),
95 )
96}
97
98#[doc(hidden)]
99pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
100 parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
101}
102
103fn parse_delta_commit_add_stats_bytes_with_context(
104 bytes: &[u8],
105 context: &str,
106) -> FloeResult<DeltaCommitAddStats> {
107 let content = std::str::from_utf8(bytes).map_err(|err| {
108 Box::new(RunError(format!(
109 "delta metrics failed to decode {context} as utf-8: {err}"
110 )))
111 })?;
112 let mut stats = DeltaCommitAddStats::default();
113 for line in content.lines() {
114 let record: Value = serde_json::from_str(line).map_err(|err| {
115 Box::new(RunError(format!(
116 "delta metrics failed to parse {context}: {err}"
117 )))
118 })?;
119 let Some(add) = record.get("add") else {
120 continue;
121 };
122 stats.files_written += 1;
123 if stats.part_files.len() < 50 {
124 if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
125 let display_name = Path::new(path)
126 .file_name()
127 .and_then(|name| name.to_str())
128 .map(ToOwned::to_owned)
129 .unwrap_or_else(|| path.to_string());
130 stats.part_files.push(display_name);
131 }
132 }
133 if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
134 stats.file_sizes.push(size);
135 }
136 }
137 Ok(stats)
138}
139
140#[doc(hidden)]
141pub fn delta_commit_metrics_from_log_bytes(
142 bytes: &[u8],
143 small_file_threshold_bytes: u64,
144) -> FloeResult<(
145 Option<u64>,
146 Vec<String>,
147 crate::io::format::AcceptedWriteMetrics,
148)> {
149 let stats = parse_delta_commit_add_stats_bytes(bytes)?;
150 Ok(delta_commit_stats_to_output(
151 stats,
152 small_file_threshold_bytes,
153 ))
154}
155
156#[doc(hidden)]
157pub fn delta_commit_metrics_from_log_bytes_best_effort(
158 bytes: &[u8],
159 small_file_threshold_bytes: u64,
160) -> (
161 Option<u64>,
162 Vec<String>,
163 crate::io::format::AcceptedWriteMetrics,
164) {
165 match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
166 Ok(output) => output,
167 Err(_) => delta_commit_metrics_fallback_unknown(),
168 }
169}
170
171fn delta_commit_stats_to_output(
172 stats: DeltaCommitAddStats,
173 small_file_threshold_bytes: u64,
174) -> (
175 Option<u64>,
176 Vec<String>,
177 crate::io::format::AcceptedWriteMetrics,
178) {
179 let metrics = metrics::summarize_written_file_sizes(
180 &stats.file_sizes,
181 stats.files_written,
182 small_file_threshold_bytes,
183 );
184 (Some(stats.files_written), stats.part_files, metrics)
185}
186
187fn delta_commit_metrics_fallback_unknown() -> (
188 Option<u64>,
189 Vec<String>,
190 crate::io::format::AcceptedWriteMetrics,
191) {
192 (None, Vec::new(), metrics::null_accepted_write_metrics())
193}