use crate::collection::stats::{CollectionStats, IndexStats, StatsCollector};
use crate::collection::Collection;
use crate::error::Error;
use crate::storage::PayloadStorage;
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
/// Widens a `usize` into a `u64`, clamping to `u64::MAX` if the value does not fit.
fn saturating_u64(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        Err(_) => u64::MAX,
    }
}
/// How long a cached `CollectionStats` snapshot is reused by `get_stats` before recomputation.
const STATS_TTL: Duration = Duration::from_millis(30_000);
impl Collection {
    /// Computes fresh statistics for this collection, without consulting or
    /// updating the stats cache.
    ///
    /// * Row count: `point_count` taken from the collection config.
    /// * Column statistics (distinct / null counts) and total payload size:
    ///   derived from at most the first 1 000 ids yielded by the payload
    ///   storage, so on larger collections they describe that sample only.
    ///   Payload size is the byte length of each payload's JSON serialization.
    /// * Index statistics: entry count of the primary HNSW index, plus the BM25
    ///   text index when it is non-empty.
    ///
    /// Payloads that are missing, fail to load, or fail to serialize are
    /// skipped rather than aborting the analysis.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` leaves
    /// room for fallible statistics sources.
    pub fn analyze(&self) -> Result<CollectionStats, Error> {
        use crate::collection::stats::ColumnStats;

        // Upper bound on the number of payloads inspected per analysis.
        const PAYLOAD_SAMPLE_LIMIT: usize = 1_000;

        let mut collector = StatsCollector::new();

        // Copy the count out so the config read lock is released right away.
        let point_count = self.config.read().point_count;
        collector.set_row_count(saturating_u64(point_count));

        let mut distinct_values: HashMap<String, HashSet<String>> = HashMap::new();
        let mut null_counts: HashMap<String, u64> = HashMap::new();
        let mut payload_size_bytes = 0u64;

        let payload_storage = self.payload_storage.read();
        let ids = payload_storage.ids();
        for id in ids.into_iter().take(PAYLOAD_SAMPLE_LIMIT) {
            // Best-effort: unreadable or absent payloads are skipped.
            if let Ok(Some(payload)) = payload_storage.retrieve(id) {
                if let Ok(payload_bytes) = serde_json::to_vec(&payload) {
                    payload_size_bytes =
                        payload_size_bytes.saturating_add(saturating_u64(payload_bytes.len()));
                }
                if let Some(obj) = payload.as_object() {
                    for (key, value) in obj {
                        if value.is_null() {
                            *null_counts.entry(key.clone()).or_insert(0) += 1;
                        } else {
                            // Non-null values are compared by their JSON text.
                            distinct_values
                                .entry(key.clone())
                                .or_default()
                                .insert(value.to_string());
                        }
                    }
                }
            }
        }
        drop(payload_storage);

        collector.set_total_size(payload_size_bytes);

        for (field, values) in distinct_values {
            // Take the null count out of the map so that only null-only fields
            // remain in `null_counts` after this loop.
            let null_count = null_counts.remove(&field);
            let mut col = ColumnStats::new(field).with_distinct_count(saturating_u64(values.len()));
            if let Some(null_count) = null_count {
                col = col.with_null_count(null_count);
            }
            collector.add_column_stats(col);
        }

        // Fields that were null in every sampled payload never reached
        // `distinct_values`; emit them as well so their null counts are not
        // lost. Their distinct count is left at `ColumnStats::new`'s default.
        for (field, null_count) in null_counts {
            collector.add_column_stats(ColumnStats::new(field).with_null_count(null_count));
        }

        let hnsw_len = self.index.len();
        let hnsw_stats =
            IndexStats::new("hnsw_primary", "HNSW").with_entry_count(saturating_u64(hnsw_len));
        collector.add_index_stats(hnsw_stats);

        // The BM25 index is only reported when it holds at least one entry.
        let bm25_len = self.text_index.len();
        if bm25_len > 0 {
            let bm25_stats =
                IndexStats::new("bm25_text", "BM25").with_entry_count(saturating_u64(bm25_len));
            collector.add_index_stats(bm25_stats);
        }

        Ok(collector.build())
    }

    /// Returns collection statistics, reusing the cached snapshot while it is
    /// younger than `STATS_TTL`.
    ///
    /// When the cache is empty or expired, statistics are recomputed with
    /// [`Collection::analyze`] while the `cached_stats` lock is held for the
    /// rest of the call. A successful result is stored in the cache with the
    /// current timestamp. On failure a warning is logged,
    /// `CollectionStats::default()` is returned, and the cache is left as it was.
    #[must_use]
    pub fn get_stats(&self) -> CollectionStats {
        let mut cached = self.cached_stats.lock();
        if let Some((ref stats, ts)) = *cached {
            if ts.elapsed() < STATS_TTL {
                return stats.clone();
            }
        }
        match self.analyze() {
            Ok(stats) => {
                *cached = Some((stats.clone(), Instant::now()));
                stats
            }
            Err(e) => {
                tracing::warn!(
                    "Failed to compute collection statistics: {}. Returning defaults.",
                    e
                );
                CollectionStats::default()
            }
        }
    }

    /// Estimates the selectivity of `column` by delegating to
    /// `CollectionStats::estimate_selectivity` on the (possibly cached)
    /// statistics from [`Collection::get_stats`].
    #[must_use]
    pub fn estimate_column_selectivity(&self, column: &str) -> f64 {
        let stats = self.get_stats();
        stats.estimate_selectivity(column)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distance::DistanceMetric;
    use tempfile::TempDir;

    /// A brand-new collection reports zero rows and zero deletions, and still
    /// lists the primary HNSW index.
    #[test]
    fn test_analyze_empty_collection() {
        let dir = TempDir::new().unwrap();
        let coll =
            Collection::create(dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        let analyzed = coll.analyze().unwrap();

        assert_eq!(analyzed.row_count, 0);
        assert_eq!(analyzed.deleted_count, 0);
        assert!(analyzed.index_stats.contains_key("hnsw_primary"));
    }

    /// After upserting ten points, the row count and HNSW entry count reflect them.
    #[test]
    fn test_analyze_with_data() {
        use crate::point::Point;

        let dir = TempDir::new().unwrap();
        let coll =
            Collection::create(dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        let mut batch: Vec<Point> = Vec::with_capacity(10);
        for i in 0..10 {
            #[allow(clippy::cast_precision_loss)]
            batch.push(Point::new(
                i,
                vec![i as f32; 4],
                Some(serde_json::json!({"category": format!("cat_{}", i % 3)})),
            ));
        }
        coll.upsert(batch).unwrap();

        let analyzed = coll.analyze().unwrap();
        assert_eq!(analyzed.row_count, 10);
        let hnsw = analyzed.index_stats.get("hnsw_primary").unwrap();
        assert!(hnsw.entry_count >= 10);
    }

    /// `get_stats` on an empty collection yields zero live rows.
    #[test]
    fn test_get_stats_returns_defaults_on_error() {
        let dir = TempDir::new().unwrap();
        let coll =
            Collection::create(dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

        assert_eq!(coll.get_stats().live_row_count(), 0);
    }

    /// Two back-to-back `get_stats` calls agree on the row count.
    #[test]
    fn test_get_stats_uses_cache_within_ttl() {
        let dir = TempDir::new().unwrap();
        let coll =
            Collection::create(dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        let first = coll.get_stats();
        assert_eq!(first.row_count, 0);

        let second = coll.get_stats();
        assert_eq!(
            first.row_count, second.row_count,
            "get_stats should return cached value within TTL"
        );
    }

    /// An upsert between two `get_stats` calls must be visible in the second call.
    #[test]
    fn test_get_stats_invalidated_after_upsert() {
        use crate::point::Point;

        let dir = TempDir::new().unwrap();
        let coll =
            Collection::create(dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();

        assert_eq!(coll.get_stats().row_count, 0);

        coll.upsert(vec![Point::new(1, vec![0.1, 0.2, 0.3, 0.4], None)])
            .unwrap();

        let refreshed = coll.get_stats();
        assert_eq!(
            refreshed.row_count, 1,
            "get_stats should recompute after upsert invalidates the cache"
        );
    }
}