scouter_dataframe/parquet/bifrost/
stats.rs1use crate::error::DatasetEngineError;
2use crate::parquet::utils::register_cloud_logstore_factories;
3use crate::storage::ObjectStore;
4use deltalake::DeltaTableBuilder;
5use scouter_types::dataset::DatasetNamespace;
6use tracing::debug;
7use url::Url;
8
/// Summary statistics for the Delta table backing a dataset.
///
/// Every field is optional: `None` means the value could not be determined
/// (for example because the table does not exist yet, or its metadata does
/// not carry that information).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TableStats {
    /// Total number of rows in the table, when known.
    pub row_count: Option<u64>,
    /// Number of data files in the current snapshot, when known.
    pub file_count: Option<u64>,
    /// Combined size of the data files in bytes, when known.
    pub size_bytes: Option<u64>,
    /// Delta log version the statistics were taken from, when known.
    pub delta_version: Option<u64>,
}
18
19fn build_table_url(
20 object_store: &ObjectStore,
21 namespace: &DatasetNamespace,
22) -> Result<Url, DatasetEngineError> {
23 let mut base = object_store.get_base_url()?;
24 let mut path = base.path().to_string();
25 if !path.ends_with('/') {
26 path.push('/');
27 }
28 path.push_str(&namespace.storage_path());
29 base.set_path(&path);
30 Ok(base)
31}
32
33pub async fn load_table_stats(
38 object_store: &ObjectStore,
39 namespace: &DatasetNamespace,
40) -> Result<TableStats, DatasetEngineError> {
41 register_cloud_logstore_factories();
42 let table_url = build_table_url(object_store, namespace)?;
43
44 let store = object_store.as_dyn_object_store();
45 let builder =
46 DeltaTableBuilder::from_url(table_url.clone())?.with_storage_backend(store, table_url);
47
48 let table = match builder.load().await {
49 Ok(t) => t,
50 Err(e) => {
51 let msg = e.to_string().to_lowercase();
53 if msg.contains("not a delta table")
54 || msg.contains("no such file")
55 || msg.contains("does not exist")
56 {
57 return Ok(TableStats::default());
58 }
59 return Err(DatasetEngineError::DeltaTableError(e));
60 }
61 };
62
63 extract_stats_from_snapshot(&table)
64}
65
66pub fn extract_stats_from_snapshot(
68 table: &deltalake::DeltaTable,
69) -> Result<TableStats, DatasetEngineError> {
70 let snapshot = table.snapshot()?;
71 let version = snapshot.version();
72 let log_data = snapshot.log_data();
73
74 let file_count = log_data.num_files() as u64;
75 let mut row_count: u64 = 0;
76 let mut size_bytes: u64 = 0;
77 let mut has_row_stats = false;
78
79 for file_view in log_data.iter() {
80 size_bytes = size_bytes.saturating_add(file_view.size().max(0) as u64);
81 if let Some(n) = file_view.num_records() {
82 row_count += n as u64;
83 has_row_stats = true;
84 }
85 }
86
87 debug!(
88 "Delta stats: version={}, files={}, size={}B, rows={}",
89 version,
90 file_count,
91 size_bytes,
92 if has_row_stats {
93 row_count.to_string()
94 } else {
95 "unknown".to_string()
96 }
97 );
98
99 Ok(TableStats {
100 row_count: if has_row_stats { Some(row_count) } else { None },
101 file_count: Some(file_count),
102 size_bytes: Some(size_bytes),
103 delta_version: if version >= 0 {
104 Some(version as u64)
105 } else {
106 None
107 },
108 })
109}