velesdb_core/collection/core/
statistics.rs1use crate::collection::stats::{CollectionStats, IndexStats, StatsCollector};
7use crate::collection::Collection;
8use crate::error::Error;
9use crate::storage::PayloadStorage;
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
/// Maximum age of the snapshot cached by `Collection::get_stats`; a snapshot
/// this old (or older) is recomputed on the next call.
const STATS_TTL: Duration = Duration::from_millis(30_000);
15
16impl Collection {
17 pub fn analyze(&self) -> Result<CollectionStats, Error> {
39 let mut collector = StatsCollector::new();
40
41 let config = self.config.read();
45 collector.set_row_count(u64::try_from(config.point_count).unwrap_or(u64::MAX));
47 drop(config);
48
49 let mut distinct_values: HashMap<String, HashSet<String>> = HashMap::new();
50 let mut null_counts: HashMap<String, u64> = HashMap::new();
51 let mut payload_size_bytes = 0u64;
52
53 let payload_storage = self.payload_storage.read();
54 let ids = payload_storage.ids();
55 for id in ids.into_iter().take(1_000) {
56 if let Ok(Some(payload)) = payload_storage.retrieve(id) {
57 if let Ok(payload_bytes) = serde_json::to_vec(&payload) {
58 payload_size_bytes = payload_size_bytes
59 .saturating_add(u64::try_from(payload_bytes.len()).unwrap_or(u64::MAX));
60 }
61
62 if let Some(obj) = payload.as_object() {
63 for (key, value) in obj {
64 if value.is_null() {
65 *null_counts.entry(key.clone()).or_insert(0) += 1;
66 } else {
67 distinct_values
68 .entry(key.clone())
69 .or_default()
70 .insert(value.to_string());
71 }
72 }
73 }
74 }
75 }
76 drop(payload_storage);
77
78 collector.set_total_size(payload_size_bytes);
79
80 for (field, values) in distinct_values {
81 let mut col = crate::collection::stats::ColumnStats::new(field.clone())
82 .with_distinct_count(u64::try_from(values.len()).unwrap_or(u64::MAX));
83 if let Some(null_count) = null_counts.get(&field) {
84 col = col.with_null_count(*null_count);
85 }
86 collector.add_column_stats(col);
87 }
88
89 let hnsw_len = self.index.len();
91 let hnsw_stats = IndexStats::new("hnsw_primary", "HNSW")
92 .with_entry_count(u64::try_from(hnsw_len).unwrap_or(u64::MAX));
93 collector.add_index_stats(hnsw_stats);
94
95 let bm25_len = self.text_index.len();
97 if bm25_len > 0 {
98 let bm25_stats = IndexStats::new("bm25_text", "BM25")
99 .with_entry_count(u64::try_from(bm25_len).unwrap_or(u64::MAX));
100 collector.add_index_stats(bm25_stats);
101 }
102
103 Ok(collector.build())
104 }
105
106 #[must_use]
116 pub fn get_stats(&self) -> CollectionStats {
117 let mut cached = self.cached_stats.lock();
118 if let Some((ref stats, ts)) = *cached {
119 if ts.elapsed() < STATS_TTL {
120 return stats.clone();
121 }
122 }
123 match self.analyze() {
124 Ok(stats) => {
125 *cached = Some((stats.clone(), Instant::now()));
126 stats
127 }
128 Err(e) => {
129 tracing::warn!(
130 "Failed to compute collection statistics: {}. Returning defaults.",
131 e
132 );
133 CollectionStats::default()
134 }
135 }
136 }
137
138 #[must_use]
143 pub fn estimate_column_selectivity(&self, column: &str) -> f64 {
144 let stats = self.get_stats();
145 stats.estimate_selectivity(column)
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use tempfile::TempDir;

    /// An empty collection reports zero rows and always lists the HNSW index.
    #[test]
    fn test_analyze_empty_collection() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 0);
        assert_eq!(stats.deleted_count, 0);
        assert!(stats.index_stats.contains_key("hnsw_primary"));
    }

    /// After upserting ten points with payloads, `analyze` reports ten rows,
    /// and the HNSW index holds at least that many entries.
    #[test]
    fn test_analyze_with_data() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // Iterating `u8` lets both the id and the vector components be built
        // with lossless conversions, so no precision-loss cast is needed.
        let points: Vec<Point> = (0u8..10)
            .map(|i| {
                Point::new(
                    i.into(),
                    vec![f32::from(i); 4],
                    Some(serde_json::json!({ "category": format!("cat_{}", i % 3) })),
                )
            })
            .collect();
        collection.upsert(points).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 10);
        let hnsw = stats
            .index_stats
            .get("hnsw_primary")
            .expect("analyze always reports hnsw_primary");
        assert!(hnsw.entry_count >= 10);
    }

    /// `get_stats` on a freshly created collection reports no live rows.
    #[test]
    fn test_get_stats_on_empty_collection() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.get_stats();

        assert_eq!(stats.live_row_count(), 0);
    }

    /// Two back-to-back `get_stats` calls (well within `STATS_TTL`) agree.
    #[test]
    fn test_get_stats_uses_cache_within_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        let first = collection.get_stats();
        assert_eq!(first.row_count, 0);

        let second = collection.get_stats();
        assert_eq!(
            first.row_count, second.row_count,
            "get_stats should return cached value within TTL"
        );
    }

    /// An upsert made after stats were cached is visible in the next
    /// `get_stats` call.
    #[test]
    fn test_get_stats_invalidated_after_upsert() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        let before = collection.get_stats();
        assert_eq!(before.row_count, 0);

        collection
            .upsert(vec![Point::new(1, vec![0.1, 0.2, 0.3, 0.4], None)])
            .unwrap();

        let after = collection.get_stats();
        assert_eq!(
            after.row_count, 1,
            "get_stats should recompute after upsert invalidates the cache"
        );
    }
}