Skip to main content

velesdb_core/collection/core/
statistics.rs

//! Collection statistics methods (EPIC-046 US-001).
//!
//! Provides `analyze()` for collecting runtime statistics, plus the cached
//! `get_stats()` and `estimate_column_selectivity()` helpers built on top of it,
//! to support cost-based query planning.
5
6use crate::collection::stats::{CollectionStats, IndexStats, StatsCollector};
7use crate::collection::Collection;
8use crate::error::Error;
9use crate::storage::PayloadStorage;
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
/// Widens a `usize` into a `u64`, clamping to `u64::MAX` if the value does not fit.
///
/// Statistics only need approximate magnitudes, so a value that cannot be
/// represented is reported as the largest `u64` rather than causing a failure.
fn saturating_u64(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        // Only reachable on targets where `usize` is wider than 64 bits.
        Err(_) => u64::MAX,
    }
}
20
/// Time-to-live for the statistics snapshot cached by `get_stats()`: a cached
/// entry younger than this (30 seconds) is returned as-is instead of being
/// recomputed for the cost-based query planner.
const STATS_TTL: Duration = Duration::from_secs(30);
23
24impl Collection {
25    /// Analyzes the collection and returns statistics.
26    ///
27    /// This method collects:
28    /// - Row count and deleted count
29    /// - Index statistics (HNSW entry count)
30    ///
31    /// # Example
32    ///
33    /// ```ignore
34    /// let stats = collection.analyze()?;
35    /// println!("Row count: {}", stats.row_count);
36    /// println!("Deletion ratio: {:.1}%", stats.deletion_ratio() * 100.0);
37    /// ```
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if statistics cannot be collected.
42    ///
43    /// # Panics
44    ///
45    /// Panics if `point_count` exceeds `u64::MAX` (extremely unlikely on 64-bit systems).
46    pub fn analyze(&self) -> Result<CollectionStats, Error> {
47        let mut collector = StatsCollector::new();
48
49        // Basic counts from config
50        // Note: deleted_count and column_stats are placeholders for future tombstone tracking
51        // and per-column cardinality analysis (EPIC-046 future work)
52        let config = self.config.read();
53        collector.set_row_count(saturating_u64(config.point_count));
54        drop(config);
55
56        let mut distinct_values: HashMap<String, HashSet<String>> = HashMap::new();
57        let mut null_counts: HashMap<String, u64> = HashMap::new();
58        let mut payload_size_bytes = 0u64;
59
60        let payload_storage = self.payload_storage.read();
61        let ids = payload_storage.ids();
62        for id in ids.into_iter().take(1_000) {
63            if let Ok(Some(payload)) = payload_storage.retrieve(id) {
64                if let Ok(payload_bytes) = serde_json::to_vec(&payload) {
65                    payload_size_bytes =
66                        payload_size_bytes.saturating_add(saturating_u64(payload_bytes.len()));
67                }
68
69                if let Some(obj) = payload.as_object() {
70                    for (key, value) in obj {
71                        if value.is_null() {
72                            *null_counts.entry(key.clone()).or_insert(0) += 1;
73                        } else {
74                            distinct_values
75                                .entry(key.clone())
76                                .or_default()
77                                .insert(value.to_string());
78                        }
79                    }
80                }
81            }
82        }
83        drop(payload_storage);
84
85        collector.set_total_size(payload_size_bytes);
86
87        for (field, values) in distinct_values {
88            let mut col = crate::collection::stats::ColumnStats::new(field.clone())
89                .with_distinct_count(saturating_u64(values.len()));
90            if let Some(null_count) = null_counts.get(&field) {
91                col = col.with_null_count(*null_count);
92            }
93            collector.add_column_stats(col);
94        }
95
96        // HNSW index statistics
97        let hnsw_len = self.index.len();
98        let hnsw_stats =
99            IndexStats::new("hnsw_primary", "HNSW").with_entry_count(saturating_u64(hnsw_len));
100        collector.add_index_stats(hnsw_stats);
101
102        // BM25 index statistics - use len() if available
103        let bm25_len = self.text_index.len();
104        if bm25_len > 0 {
105            let bm25_stats =
106                IndexStats::new("bm25_text", "BM25").with_entry_count(saturating_u64(bm25_len));
107            collector.add_index_stats(bm25_stats);
108        }
109
110        Ok(collector.build())
111    }
112
113    /// Returns cached statistics if available and fresh, otherwise recomputes.
114    ///
115    /// Results are cached for 30 seconds (`STATS_TTL`) to avoid re-scanning payload
116    /// storage on every `execute_query()` call. Mutating methods (`upsert`,
117    /// `delete`, etc.) invalidate the cache so the next call always recomputes.
118    ///
119    /// # Note
120    /// Returns default stats on error (intentional for convenience).
121    /// Use `analyze()` directly if error handling is required.
122    #[must_use]
123    pub fn get_stats(&self) -> CollectionStats {
124        let mut cached = self.cached_stats.lock();
125        if let Some((ref stats, ts)) = *cached {
126            if ts.elapsed() < STATS_TTL {
127                return stats.clone();
128            }
129        }
130        match self.analyze() {
131            Ok(stats) => {
132                *cached = Some((stats.clone(), Instant::now()));
133                stats
134            }
135            Err(e) => {
136                tracing::warn!(
137                    "Failed to compute collection statistics: {}. Returning defaults.",
138                    e
139                );
140                CollectionStats::default()
141            }
142        }
143    }
144
145    /// Returns the selectivity estimate for a column.
146    ///
147    /// Selectivity is 1/cardinality, representing the probability
148    /// that a random row matches a specific value.
149    #[must_use]
150    pub fn estimate_column_selectivity(&self, column: &str) -> f64 {
151        let stats = self.get_stats();
152        stats.estimate_selectivity(column)
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use tempfile::TempDir;

    /// An empty collection reports zero rows and still lists the HNSW index.
    #[test]
    fn test_analyze_empty_collection() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 0);
        assert_eq!(stats.deleted_count, 0);
        assert!(stats.index_stats.contains_key("hnsw_primary"));
    }

    /// After inserting ten points, row count and HNSW entry count reflect them.
    #[test]
    fn test_analyze_with_data() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // Insert some vectors using Point
        let points: Vec<Point> = (0..10)
            .map(|i| {
                #[allow(clippy::cast_precision_loss)] // Reason: i < 10 in test; u64→f32 exact.
                Point::new(
                    i,
                    vec![i as f32; 4],
                    Some(serde_json::json!({"category": format!("cat_{}", i % 3)})),
                )
            })
            .collect();
        collection.upsert(points).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 10);
        assert!(stats.index_stats.get("hnsw_primary").unwrap().entry_count >= 10);
    }

    /// `get_stats()` must never panic and reports no live rows for an empty
    /// collection.
    ///
    /// NOTE: despite its name, this test does not induce an `analyze()` failure;
    /// it only covers the successful path on an empty collection.
    #[test]
    fn test_get_stats_returns_defaults_on_error() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.get_stats();

        assert_eq!(stats.live_row_count(), 0);
    }

    /// A second `get_stats()` call within the TTL must serve the cached snapshot
    /// rather than recomputing it.
    #[test]
    fn test_get_stats_uses_cache_within_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // First call populates the cache.
        let stats1 = collection.get_stats();
        assert_eq!(stats1.row_count, 0);

        // Tag the cached snapshot with a row count that `analyze()` cannot produce
        // for this empty collection. A recomputation would lose the marker, so
        // seeing it again proves the cache was served. The guard is scoped so it
        // is released before `get_stats()` takes the same lock.
        {
            let mut cached = collection.cached_stats.lock();
            match &mut *cached {
                Some((snapshot, _)) => snapshot.row_count = 42,
                None => panic!("first get_stats() call should populate the cache"),
            }
        }

        let stats2 = collection.get_stats();
        assert_eq!(
            stats2.row_count, 42,
            "get_stats should return cached value within TTL"
        );
    }

    /// `upsert()` must drop the cached snapshot so the next call recomputes.
    #[test]
    fn test_get_stats_invalidated_after_upsert() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // Warm the cache.
        let stats_before = collection.get_stats();
        assert_eq!(stats_before.row_count, 0);

        // upsert() must invalidate the cache.
        let points = vec![Point::new(1, vec![0.1, 0.2, 0.3, 0.4], None)];
        collection.upsert(points).unwrap();

        // Next get_stats() should recompute and reflect the new point.
        let stats_after = collection.get_stats();
        assert_eq!(
            stats_after.row_count, 1,
            "get_stats should recompute after upsert invalidates the cache"
        );
    }
}
258}