Skip to main content

floe_core/io/write/delta/
commit_metrics.rs

1use std::path::Path;
2
3use deltalake::logstore::read_commit_entry;
4use deltalake::table::builder::DeltaTableBuilder;
5use serde_json::Value;
6
7use crate::errors::RunError;
8use crate::io::format::AcceptedWriteMetrics;
9use crate::io::storage::{object_store, Target};
10use crate::io::write::metrics;
11use crate::{config, FloeResult};
12
13pub(super) fn delta_commit_metrics_for_target(
14    runtime: &tokio::runtime::Runtime,
15    target: &Target,
16    resolver: &config::StorageResolver,
17    entity: &config::EntityConfig,
18    version: i64,
19    small_file_threshold_bytes: u64,
20) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
21    match target {
22        Target::Local { base_path, .. } => {
23            let stats = delta_commit_add_stats(Path::new(base_path), version)?;
24            Ok(delta_commit_stats_to_output(
25                stats,
26                small_file_threshold_bytes,
27            ))
28        }
29        // Best-effort metrics for remote targets: never fail a successful write because the
30        // commit log could not be read or parsed after commit.
31        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
32            match delta_commit_add_stats_via_object_store(
33                runtime, target, resolver, entity, version,
34            ) {
35                Ok(stats) => Ok(delta_commit_stats_to_output(
36                    stats,
37                    small_file_threshold_bytes,
38                )),
39                Err(_) => Ok(delta_commit_metrics_fallback_unknown()),
40            }
41        }
42    }
43}
44
/// Statistics gathered from the `add` actions of a single Delta commit log entry.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DeltaCommitAddStats {
    /// Number of `add` actions found in the commit.
    files_written: u64,
    /// Display names of the added files; the parser in this file stops recording names once
    /// its cap is reached, so this may be shorter than `files_written`.
    part_files: Vec<String>,
    /// Sizes taken from the `size` field of the `add` actions that carried a usable one.
    file_sizes: Vec<u64>,
}
51
52fn delta_commit_add_stats(table_root: &Path, version: i64) -> FloeResult<DeltaCommitAddStats> {
53    let log_path = table_root
54        .join("_delta_log")
55        .join(format!("{version:020}.json"));
56    let bytes = std::fs::read(&log_path).map_err(|err| {
57        Box::new(RunError(format!(
58            "delta metrics failed to open commit log {}: {err}",
59            log_path.display()
60        )))
61    })?;
62    parse_delta_commit_add_stats_bytes_with_context(&bytes, &log_path.display().to_string())
63}
64
65fn delta_commit_add_stats_via_object_store(
66    runtime: &tokio::runtime::Runtime,
67    target: &Target,
68    resolver: &config::StorageResolver,
69    entity: &config::EntityConfig,
70    version: i64,
71) -> FloeResult<DeltaCommitAddStats> {
72    let store = object_store::delta_store_config(target, resolver, entity)?;
73    let builder = DeltaTableBuilder::from_url(store.table_url.clone())
74        .map_err(|err| Box::new(RunError(format!("delta metrics builder failed: {err}"))))?
75        .with_storage_options(store.storage_options);
76    let log_store = builder.build_storage().map_err(|err| {
77        Box::new(RunError(format!(
78            "delta metrics log store init failed: {err}"
79        )))
80    })?;
81    let bytes = runtime
82        .block_on(async { read_commit_entry(log_store.object_store(None).as_ref(), version).await })
83        .map_err(|err| Box::new(RunError(format!("delta metrics commit read failed: {err}"))))?
84        .ok_or_else(|| {
85            Box::new(RunError(format!(
86                "delta metrics commit log missing for version {version}"
87            ))) as Box<dyn std::error::Error + Send + Sync>
88        })?;
89    parse_delta_commit_add_stats_bytes_with_context(
90        bytes.as_ref(),
91        &format!("remote delta commit version {version}"),
92    )
93}
94
95#[doc(hidden)]
96pub fn parse_delta_commit_add_stats_bytes(bytes: &[u8]) -> FloeResult<DeltaCommitAddStats> {
97    parse_delta_commit_add_stats_bytes_with_context(bytes, "delta commit log bytes")
98}
99
100fn parse_delta_commit_add_stats_bytes_with_context(
101    bytes: &[u8],
102    context: &str,
103) -> FloeResult<DeltaCommitAddStats> {
104    let content = std::str::from_utf8(bytes).map_err(|err| {
105        Box::new(RunError(format!(
106            "delta metrics failed to decode {context} as utf-8: {err}"
107        )))
108    })?;
109    let mut stats = DeltaCommitAddStats::default();
110    for line in content.lines() {
111        let record: Value = serde_json::from_str(line).map_err(|err| {
112            Box::new(RunError(format!(
113                "delta metrics failed to parse {context}: {err}"
114            )))
115        })?;
116        let Some(add) = record.get("add") else {
117            continue;
118        };
119        stats.files_written += 1;
120        if stats.part_files.len() < 50 {
121            if let Some(path) = add.get("path").and_then(|value| value.as_str()) {
122                let display_name = Path::new(path)
123                    .file_name()
124                    .and_then(|name| name.to_str())
125                    .map(ToOwned::to_owned)
126                    .unwrap_or_else(|| path.to_string());
127                stats.part_files.push(display_name);
128            }
129        }
130        if let Some(size) = add.get("size").and_then(|value| value.as_u64()) {
131            stats.file_sizes.push(size);
132        }
133    }
134    Ok(stats)
135}
136
137#[doc(hidden)]
138pub fn delta_commit_metrics_from_log_bytes(
139    bytes: &[u8],
140    small_file_threshold_bytes: u64,
141) -> FloeResult<(u64, Vec<String>, AcceptedWriteMetrics)> {
142    let stats = parse_delta_commit_add_stats_bytes(bytes)?;
143    Ok(delta_commit_stats_to_output(
144        stats,
145        small_file_threshold_bytes,
146    ))
147}
148
149#[doc(hidden)]
150pub fn delta_commit_metrics_from_log_bytes_best_effort(
151    bytes: &[u8],
152    small_file_threshold_bytes: u64,
153) -> (u64, Vec<String>, AcceptedWriteMetrics) {
154    match delta_commit_metrics_from_log_bytes(bytes, small_file_threshold_bytes) {
155        Ok(output) => output,
156        Err(_) => delta_commit_metrics_fallback_unknown(),
157    }
158}
159
160fn delta_commit_stats_to_output(
161    stats: DeltaCommitAddStats,
162    small_file_threshold_bytes: u64,
163) -> (u64, Vec<String>, AcceptedWriteMetrics) {
164    let metrics = if stats.file_sizes.len() == stats.files_written as usize {
165        metrics::summarize_written_file_sizes(&stats.file_sizes, small_file_threshold_bytes)
166    } else {
167        null_accepted_write_metrics()
168    };
169    (stats.files_written, stats.part_files, metrics)
170}
171
172fn delta_commit_metrics_fallback_unknown() -> (u64, Vec<String>, AcceptedWriteMetrics) {
173    (0, Vec::new(), null_accepted_write_metrics())
174}
175
176fn null_accepted_write_metrics() -> AcceptedWriteMetrics {
177    AcceptedWriteMetrics {
178        total_bytes_written: None,
179        avg_file_size_mb: None,
180        small_files_count: None,
181    }
182}