//! floe_core/io/write/delta/commit_metrics.rs
//!
//! Write metrics for Delta Lake tables, derived from the `add` actions of a commit log entry.

1use std::path::Path;
2
3use deltalake::logstore::read_commit_entry;
4use deltalake::table::builder::DeltaTableBuilder;
5use serde_json::Value;
6
7use crate::errors::RunError;
8use crate::io::storage::{object_store, Target};
9use crate::io::write::metrics;
10use crate::{config, FloeResult};
11
12pub(super) fn delta_commit_metrics_for_target(
13    runtime: &tokio::runtime::Runtime,
14    target: &Target,
15    resolver: &config::StorageResolver,
16    entity: &config::EntityConfig,
17    version: i64,
18    small_file_threshold_bytes: u64,
19) -> FloeResult<(
20    Option<u64>,
21    Vec<String>,
22    crate::io::format::AcceptedWriteMetrics,
23)> {
24    match target {
25        Target::Local { base_path, .. } => {
26            let stats = delta_commit_add_stats(Path::new(base_path), version)?;
27            Ok(delta_commit_stats_to_output(
28                stats,
29                small_file_threshold_bytes,
30            ))
31        }
32        // Best-effort metrics for remote targets: never fail a successful write because the
33        // commit log could not be read or parsed after commit.
34        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
35            match delta_commit_add_stats_via_object_store(
36                runtime, target, resolver, entity, version,
37            ) {
38                Ok(stats) => Ok(delta_commit_stats_to_output(
39                    stats,
40                    small_file_threshold_bytes,
41                )),
42                Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
43            }
44        }
45    }
46}
47
/// Statistics gathered from the `add` actions of a single Delta commit log entry.
///
/// The fields are private; read them through the accessor methods below.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DeltaCommitAddStats {
    /// Number of `add` actions found in the commit.
    files_written: u64,
    /// Display names (final path component) of added files, at most 50 entries.
    part_files: Vec<String>,
    /// `size` values of `add` actions that carry a non-negative integer size, in log order.
    file_sizes: Vec<u64>,
}

impl DeltaCommitAddStats {
    /// Number of `add` actions counted in the commit.
    pub fn files_written(&self) -> u64 {
        self.files_written
    }

    /// Display names of the added part files (possibly fewer than `files_written`).
    pub fn part_files(&self) -> &[String] {
        &self.part_files
    }

    /// Reported sizes of the added files, in the order they appear in the commit log.
    pub fn file_sizes(&self) -> &[u64] {
        &self.file_sizes
    }
}
54
55fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
56    let log_path = table_root
57        .join("_delta_log")
58        .join(format!("{version:020}.json"));
59    let bytes = std::fs::read(&log_path).map_err(|err| {
60        Box::new(RunError(format!(
61            "delta metrics failed to open commit log {}: {err}",
62            log_path.display()
63        )))
64    })?;
65    parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
66}
67
68fn delta_commit_add_stats_via_object_store(
69    runtime: &tokio::runtime::Runtime,
70    target: &Target,
71    resolver: &config::StorageResolver,
72    entity: &config::EntityConfig,
73    version: i64,
74) -> FloeResult<DeltaCommitAddStats> {
75    let store = object_store::delta_store_config(target, resolver, entity)?;
76    let builder = DeltaTableBuilder::from_url(store.table_url.clone())
77        .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
78        .with_storage_options(store.storage_options);
79    let log_store = builder.build_storage().map_err(|err| {
80        Box::new(RunError(format!(
81            "delta metrics log store init failed: {err}"
82        )))
83    })?;
84    let bytes = runtime
85        .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
86        .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
87        .ok_or_else(|| {
88            Box::new(RunError(format!(
89                "delta metrics commit log missing for version {version}"
90            ))) as Box<dyn std::error::Error + Send + Sync>
91        })?;
92    parse_delta_commit_add_stats_bytes_with_context(
93        bytes.as_ref(),
94        &format!("remote delta commit version {version}"),
95    )
96}
97
98#[doc(hidden)]
99pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
100    parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
101}
102
103fn parse_delta_commit_add_stats_bytes_with_context(
104    bytes: &[u8],
105    context: &str,
106) -> FloeResult<DeltaCommitAddStats> {
107    let content = std::str::from_utf8(bytes).map_err(|err| {
108        Box::new(RunError(format!(
109            "delta metrics failed to decode {context} as utf-8: {err}"
110        )))
111    })?;
112    let mut stats = DeltaCommitAddStats::default();
113    for line in content.lines() {
114        let record: Value = serde_json::from_str(line).map_err(|err| {
115            Box::new(RunError(format!(
116                "delta metrics failed to parse {context}: {err}"
117            )))
118        })?;
119        let Some(add) = record.get("add") else {
120            continue;
121        };
122        stats.files_written += 1;
123        if stats.part_files.len() < 50 {
124            if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
125                let display_name = Path::new(path)
126                    .file_name()
127                    .and_then(|name| name.to_str())
128                    .map(ToOwned::to_owned)
129                    .unwrap_or_else(|| path.to_string());
130                stats.part_files.push(display_name);
131            }
132        }
133        if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
134            stats.file_sizes.push(size);
135        }
136    }
137    Ok(stats)
138}
139
140#[doc(hidden)]
141pub fn delta_commit_metrics_from_log_bytes(
142    bytes: &[u8],
143    small_file_threshold_bytes: u64,
144) -> FloeResult<(
145    Option<u64>,
146    Vec<String>,
147    crate::io::format::AcceptedWriteMetrics,
148)> {
149    let stats = parse_delta_commit_add_stats_bytes(bytes)?;
150    Ok(delta_commit_stats_to_output(
151        stats,
152        small_file_threshold_bytes,
153    ))
154}
155
156#[doc(hidden)]
157pub fn delta_commit_metrics_from_log_bytes_best_effort(
158    bytes: &[u8],
159    small_file_threshold_bytes: u64,
160) -> (
161    Option<u64>,
162    Vec<String>,
163    crate::io::format::AcceptedWriteMetrics,
164) {
165    match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
166        Ok(output) => output,
167        Err(_) => delta_commit_metrics_fallback_unknown(),
168    }
169}
170
171fn delta_commit_stats_to_output(
172    stats: DeltaCommitAddStats,
173    small_file_threshold_bytes: u64,
174) -> (
175    Option<u64>,
176    Vec<String>,
177    crate::io::format::AcceptedWriteMetrics,
178) {
179    let metrics = metrics::summarize_written_file_sizes(
180        &stats.file_sizes,
181        stats.files_written,
182        small_file_threshold_bytes,
183    );
184    (Some(stats.files_written), stats.part_files, metrics)
185}
186
187fn delta_commit_metrics_fallback_unknown() -> (
188    Option<u64>,
189    Vec<String>,
190    crate::io::format::AcceptedWriteMetrics,
191) {
192    (None, Vec::new(), metrics::null_accepted_write_metrics())
193}