Skip to main content

scouter_dataframe/parquet/bifrost/
stats.rs

1use crate::error::DatasetEngineError;
2use crate::parquet::utils::register_cloud_logstore_factories;
3use crate::storage::ObjectStore;
4use deltalake::DeltaTableBuilder;
5use scouter_types::dataset::DatasetNamespace;
6use tracing::debug;
7use url::Url;
8
/// Table-level statistics read from a Delta Lake transaction log.
///
/// Populated from log metadata only; no data files are read. Every field is an
/// `Option` so that an unknown value (`None`) stays distinct from a genuine zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TableStats {
    /// Total number of rows across the table's files, if known.
    pub row_count: Option<u64>,
    /// Number of files tracked by the table snapshot.
    pub file_count: Option<u64>,
    /// Combined size of the tracked files, in bytes.
    pub size_bytes: Option<u64>,
    /// Delta table version the statistics were taken from.
    pub delta_version: Option<u64>,
}
18
19fn build_table_url(
20    object_store: &ObjectStore,
21    namespace: &DatasetNamespace,
22) -> Result<Url, DatasetEngineError> {
23    let mut base = object_store.get_base_url()?;
24    let mut path = base.path().to_string();
25    if !path.ends_with('/') {
26        path.push('/');
27    }
28    path.push_str(&namespace.storage_path());
29    base.set_path(&path);
30    Ok(base)
31}
32
33/// Load stats from a Delta table's transaction log without scanning data.
34///
35/// For inactive tables (no engine loaded), this transiently loads the Delta log.
36/// For active tables, prefer reading from the engine's `DeltaTable` directly.
37pub async fn load_table_stats(
38    object_store: &ObjectStore,
39    namespace: &DatasetNamespace,
40) -> Result<TableStats, DatasetEngineError> {
41    register_cloud_logstore_factories();
42    let table_url = build_table_url(object_store, namespace)?;
43
44    let store = object_store.as_dyn_object_store();
45    let builder =
46        DeltaTableBuilder::from_url(table_url.clone())?.with_storage_backend(store, table_url);
47
48    let table = match builder.load().await {
49        Ok(t) => t,
50        Err(e) => {
51            // Only treat "table doesn't exist yet" as empty stats
52            let msg = e.to_string().to_lowercase();
53            if msg.contains("not a delta table")
54                || msg.contains("no such file")
55                || msg.contains("does not exist")
56            {
57                return Ok(TableStats::default());
58            }
59            return Err(DatasetEngineError::DeltaTableError(e));
60        }
61    };
62
63    extract_stats_from_snapshot(&table)
64}
65
66/// Extract stats from a loaded DeltaTable's snapshot.
67pub fn extract_stats_from_snapshot(
68    table: &deltalake::DeltaTable,
69) -> Result<TableStats, DatasetEngineError> {
70    let snapshot = table.snapshot()?;
71    let version = snapshot.version();
72    let log_data = snapshot.log_data();
73
74    let file_count = log_data.num_files() as u64;
75    let mut row_count: u64 = 0;
76    let mut size_bytes: u64 = 0;
77    let mut has_row_stats = false;
78
79    for file_view in log_data.iter() {
80        size_bytes = size_bytes.saturating_add(file_view.size().max(0) as u64);
81        if let Some(n) = file_view.num_records() {
82            row_count += n as u64;
83            has_row_stats = true;
84        }
85    }
86
87    debug!(
88        "Delta stats: version={}, files={}, size={}B, rows={}",
89        version,
90        file_count,
91        size_bytes,
92        if has_row_stats {
93            row_count.to_string()
94        } else {
95            "unknown".to_string()
96        }
97    );
98
99    Ok(TableStats {
100        row_count: if has_row_stats { Some(row_count) } else { None },
101        file_count: Some(file_count),
102        size_bytes: Some(size_bytes),
103        delta_version: if version >= 0 {
104            Some(version as u64)
105        } else {
106            None
107        },
108    })
109}