Skip to main content

velesdb_core/collection/core/
statistics.rs

1//! Collection statistics methods (EPIC-046 US-001).
2//!
//! Provides `analyze()` for collecting runtime statistics, plus a TTL-cached
//! `get_stats()` accessor and per-column selectivity estimates, to support
//! cost-based query planning.
5
6use crate::collection::stats::{CollectionStats, IndexStats, StatsCollector};
7use crate::collection::Collection;
8use crate::error::Error;
9use crate::storage::PayloadStorage;
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
/// Maximum age of the statistics snapshot cached by [`Collection::get_stats`]
/// before it is recomputed; consulted by the cost-based query planner.
const STATS_TTL: Duration = Duration::from_millis(30_000);
15
16impl Collection {
17    /// Analyzes the collection and returns statistics.
18    ///
19    /// This method collects:
20    /// - Row count and deleted count
21    /// - Index statistics (HNSW entry count)
22    ///
23    /// # Example
24    ///
25    /// ```ignore
26    /// let stats = collection.analyze()?;
27    /// println!("Row count: {}", stats.row_count);
28    /// println!("Deletion ratio: {:.1}%", stats.deletion_ratio() * 100.0);
29    /// ```
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if statistics cannot be collected.
34    ///
35    /// # Panics
36    ///
37    /// Panics if `point_count` exceeds `u64::MAX` (extremely unlikely on 64-bit systems).
38    pub fn analyze(&self) -> Result<CollectionStats, Error> {
39        let mut collector = StatsCollector::new();
40
41        // Basic counts from config
42        // Note: deleted_count and column_stats are placeholders for future tombstone tracking
43        // and per-column cardinality analysis (EPIC-046 future work)
44        let config = self.config.read();
45        // Reason: Collection sizes are bounded by available memory, always < u64::MAX on 64-bit systems
46        collector.set_row_count(u64::try_from(config.point_count).unwrap_or(u64::MAX));
47        drop(config);
48
49        let mut distinct_values: HashMap<String, HashSet<String>> = HashMap::new();
50        let mut null_counts: HashMap<String, u64> = HashMap::new();
51        let mut payload_size_bytes = 0u64;
52
53        let payload_storage = self.payload_storage.read();
54        let ids = payload_storage.ids();
55        for id in ids.into_iter().take(1_000) {
56            if let Ok(Some(payload)) = payload_storage.retrieve(id) {
57                if let Ok(payload_bytes) = serde_json::to_vec(&payload) {
58                    payload_size_bytes = payload_size_bytes
59                        .saturating_add(u64::try_from(payload_bytes.len()).unwrap_or(u64::MAX));
60                }
61
62                if let Some(obj) = payload.as_object() {
63                    for (key, value) in obj {
64                        if value.is_null() {
65                            *null_counts.entry(key.clone()).or_insert(0) += 1;
66                        } else {
67                            distinct_values
68                                .entry(key.clone())
69                                .or_default()
70                                .insert(value.to_string());
71                        }
72                    }
73                }
74            }
75        }
76        drop(payload_storage);
77
78        collector.set_total_size(payload_size_bytes);
79
80        for (field, values) in distinct_values {
81            let mut col = crate::collection::stats::ColumnStats::new(field.clone())
82                .with_distinct_count(u64::try_from(values.len()).unwrap_or(u64::MAX));
83            if let Some(null_count) = null_counts.get(&field) {
84                col = col.with_null_count(*null_count);
85            }
86            collector.add_column_stats(col);
87        }
88
89        // HNSW index statistics
90        let hnsw_len = self.index.len();
91        let hnsw_stats = IndexStats::new("hnsw_primary", "HNSW")
92            .with_entry_count(u64::try_from(hnsw_len).unwrap_or(u64::MAX));
93        collector.add_index_stats(hnsw_stats);
94
95        // BM25 index statistics - use len() if available
96        let bm25_len = self.text_index.len();
97        if bm25_len > 0 {
98            let bm25_stats = IndexStats::new("bm25_text", "BM25")
99                .with_entry_count(u64::try_from(bm25_len).unwrap_or(u64::MAX));
100            collector.add_index_stats(bm25_stats);
101        }
102
103        Ok(collector.build())
104    }
105
106    /// Returns cached statistics if available and fresh, otherwise recomputes.
107    ///
108    /// Results are cached for 30 seconds (`STATS_TTL`) to avoid re-scanning payload
109    /// storage on every `execute_query()` call. Mutating methods (`upsert`,
110    /// `delete`, etc.) invalidate the cache so the next call always recomputes.
111    ///
112    /// # Note
113    /// Returns default stats on error (intentional for convenience).
114    /// Use `analyze()` directly if error handling is required.
115    #[must_use]
116    pub fn get_stats(&self) -> CollectionStats {
117        let mut cached = self.cached_stats.lock();
118        if let Some((ref stats, ts)) = *cached {
119            if ts.elapsed() < STATS_TTL {
120                return stats.clone();
121            }
122        }
123        match self.analyze() {
124            Ok(stats) => {
125                *cached = Some((stats.clone(), Instant::now()));
126                stats
127            }
128            Err(e) => {
129                tracing::warn!(
130                    "Failed to compute collection statistics: {}. Returning defaults.",
131                    e
132                );
133                CollectionStats::default()
134            }
135        }
136    }
137
138    /// Returns the selectivity estimate for a column.
139    ///
140    /// Selectivity is 1/cardinality, representing the probability
141    /// that a random row matches a specific value.
142    #[must_use]
143    pub fn estimate_column_selectivity(&self, column: &str) -> f64 {
144        let stats = self.get_stats();
145        stats.estimate_selectivity(column)
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use tempfile::TempDir;

    #[test]
    fn test_analyze_empty_collection() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 0);
        assert_eq!(stats.deleted_count, 0);
        assert!(stats.index_stats.contains_key("hnsw_primary"));
    }

    #[test]
    fn test_analyze_with_data() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // Insert some vectors using Point
        let points: Vec<Point> = (0..10)
            .map(|i| {
                #[allow(clippy::cast_precision_loss)] // Reason: i < 10 in test; u64→f32 exact.
                Point::new(
                    i,
                    vec![i as f32; 4],
                    Some(serde_json::json!({"category": format!("cat_{}", i % 3)})),
                )
            })
            .collect();
        collection.upsert(points).unwrap();

        let stats = collection.analyze().unwrap();

        assert_eq!(stats.row_count, 10);
        assert!(stats.index_stats.get("hnsw_primary").unwrap().entry_count >= 10);
    }

    #[test]
    fn test_get_stats_returns_defaults_on_error() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let stats = collection.get_stats();

        // The error branch cannot be triggered from here; this only checks that
        // an empty collection yields zero live rows without panicking.
        assert_eq!(stats.live_row_count(), 0);
    }

    #[test]
    fn test_get_stats_uses_cache_within_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // First call populates the cache.
        let stats1 = collection.get_stats();
        assert_eq!(stats1.row_count, 0);

        // Change the row count behind the cache's back: writing the config
        // directly skips the invalidation performed by mutating methods.
        collection.config.write().point_count = 5;
        assert_eq!(
            collection.analyze().unwrap().row_count,
            5,
            "analyze() must observe the updated point count"
        );

        // Within STATS_TTL, get_stats() must still serve the cached snapshot.
        let stats2 = collection.get_stats();
        assert_eq!(
            stats1.row_count, stats2.row_count,
            "get_stats should return cached value within TTL"
        );
    }

    #[test]
    fn test_get_stats_invalidated_after_upsert() {
        use crate::point::Point;

        let temp_dir = TempDir::new().unwrap();
        let collection =
            Collection::create(temp_dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        // Warm the cache.
        let stats_before = collection.get_stats();
        assert_eq!(stats_before.row_count, 0);

        // upsert() must invalidate the cache.
        let points = vec![Point::new(1, vec![0.1, 0.2, 0.3, 0.4], None)];
        collection.upsert(points).unwrap();

        // Next get_stats() should recompute and reflect the new point.
        let stats_after = collection.get_stats();
        assert_eq!(
            stats_after.row_count, 1,
            "get_stats should recompute after upsert invalidates the cache"
        );
    }
}