velesdb_core/collection/core/
statistics.rs1use crate::collection::stats::{CollectionStats, IndexStats, StatsCollector};
7use crate::collection::Collection;
8use crate::error::Error;
9use crate::storage::PayloadStorage;
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
/// Widens a `usize` to `u64`, clamping to `u64::MAX` if the value does not fit.
fn saturating_u64(value: usize) -> u64 {
    let widened: Result<u64, _> = value.try_into();
    widened.unwrap_or(u64::MAX)
}
20
/// Maximum age of a cached [`CollectionStats`] snapshot that `Collection::get_stats`
/// will still return without recomputing it.
const STATS_TTL: Duration = Duration::from_secs(30);
23
24impl Collection {
25 pub fn analyze(&self) -> Result<CollectionStats, Error> {
47 let mut collector = StatsCollector::new();
48
49 let config = self.config.read();
53 collector.set_row_count(saturating_u64(config.point_count));
54 drop(config);
55
56 let mut distinct_values: HashMap<String, HashSet<String>> = HashMap::new();
57 let mut null_counts: HashMap<String, u64> = HashMap::new();
58 let mut payload_size_bytes = 0u64;
59
60 let payload_storage = self.payload_storage.read();
61 let ids = payload_storage.ids();
62 for id in ids.into_iter().take(1_000) {
63 if let Ok(Some(payload)) = payload_storage.retrieve(id) {
64 if let Ok(payload_bytes) = serde_json::to_vec(&payload) {
65 payload_size_bytes =
66 payload_size_bytes.saturating_add(saturating_u64(payload_bytes.len()));
67 }
68
69 if let Some(obj) = payload.as_object() {
70 for (key, value) in obj {
71 if value.is_null() {
72 *null_counts.entry(key.clone()).or_insert(0) += 1;
73 } else {
74 distinct_values
75 .entry(key.clone())
76 .or_default()
77 .insert(value.to_string());
78 }
79 }
80 }
81 }
82 }
83 drop(payload_storage);
84
85 collector.set_total_size(payload_size_bytes);
86
87 for (field, values) in distinct_values {
88 let mut col = crate::collection::stats::ColumnStats::new(field.clone())
89 .with_distinct_count(saturating_u64(values.len()));
90 if let Some(null_count) = null_counts.get(&field) {
91 col = col.with_null_count(*null_count);
92 }
93 collector.add_column_stats(col);
94 }
95
96 let hnsw_len = self.index.len();
98 let hnsw_stats =
99 IndexStats::new("hnsw_primary", "HNSW").with_entry_count(saturating_u64(hnsw_len));
100 collector.add_index_stats(hnsw_stats);
101
102 let bm25_len = self.text_index.len();
104 if bm25_len > 0 {
105 let bm25_stats =
106 IndexStats::new("bm25_text", "BM25").with_entry_count(saturating_u64(bm25_len));
107 collector.add_index_stats(bm25_stats);
108 }
109
110 Ok(collector.build())
111 }
112
113 #[must_use]
123 pub fn get_stats(&self) -> CollectionStats {
124 let mut cached = self.cached_stats.lock();
125 if let Some((ref stats, ts)) = *cached {
126 if ts.elapsed() < STATS_TTL {
127 return stats.clone();
128 }
129 }
130 match self.analyze() {
131 Ok(stats) => {
132 *cached = Some((stats.clone(), Instant::now()));
133 stats
134 }
135 Err(e) => {
136 tracing::warn!(
137 "Failed to compute collection statistics: {}. Returning defaults.",
138 e
139 );
140 CollectionStats::default()
141 }
142 }
143 }
144
145 #[must_use]
150 pub fn estimate_column_selectivity(&self, column: &str) -> f64 {
151 let stats = self.get_stats();
152 stats.estimate_selectivity(column)
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use crate::point::Point;
    use tempfile::TempDir;

    /// A brand-new collection reports zero rows and still lists the primary index.
    #[test]
    fn test_analyze_empty_collection() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let collection = Collection::create(root, 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 0);
        assert_eq!(stats.deleted_count, 0);
        assert!(stats.index_stats.contains_key("hnsw_primary"));
    }

    /// After inserting ten points, `analyze` sees ten rows and at least ten index entries.
    #[test]
    fn test_analyze_with_data() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let collection = Collection::create(root, 4, DistanceMetric::Cosine).unwrap();

        let points: Vec<Point> = (0..10)
            .map(|i| {
                let payload = serde_json::json!({"category": format!("cat_{}", i % 3)});
                // The ids used here are tiny, so converting them to f32 is exact.
                #[allow(clippy::cast_precision_loss)]
                let vector = vec![i as f32; 4];
                Point::new(i, vector, Some(payload))
            })
            .collect();
        collection.upsert(points).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 10);
        let hnsw = stats.index_stats.get("hnsw_primary").unwrap();
        assert!(hnsw.entry_count >= 10);
    }

    /// `get_stats` on an empty collection yields zero live rows.
    ///
    /// Note: this only exercises the empty-collection path; it does not force
    /// `analyze` to fail.
    #[test]
    fn test_get_stats_returns_defaults_on_error() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let collection = Collection::create(root, 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.get_stats();

        assert_eq!(stats.live_row_count(), 0);
    }

    /// Two back-to-back `get_stats` calls report the same row count.
    #[test]
    fn test_get_stats_uses_cache_within_ttl() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let collection = Collection::create(root, 4, DistanceMetric::Cosine).unwrap();

        let first = collection.get_stats();
        assert_eq!(first.row_count, 0);

        let second = collection.get_stats();
        assert_eq!(
            first.row_count, second.row_count,
            "get_stats should return cached value within TTL"
        );
    }

    /// An upsert performed after a cached `get_stats` call is reflected by the next call.
    #[test]
    fn test_get_stats_invalidated_after_upsert() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();
        let collection = Collection::create(root, 4, DistanceMetric::Cosine).unwrap();

        let before = collection.get_stats();
        assert_eq!(before.row_count, 0);

        let single_point = Point::new(1, vec![0.1, 0.2, 0.3, 0.4], None);
        collection.upsert(vec![single_point]).unwrap();

        let after = collection.get_stats();
        assert_eq!(
            after.row_count, 1,
            "get_stats should recompute after upsert invalidates the cache"
        );
    }
}