use common::{NamespaceId, Vector, VectorId};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Tuning knobs for segment sizing and background compaction.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Live-vector count at which the active segment is sealed and rotated.
    pub target_segment_size: usize,
    /// Sealed segments with fewer live vectors than this are merge candidates.
    pub min_segment_size: usize,
    /// Upper bound on how many small segments one merge may combine.
    pub max_merge_segments: usize,
    /// Deleted/total ratio at which a sealed segment needs compaction (0.0–1.0).
    pub garbage_threshold: f32,
    /// Whether compaction should run automatically.
    pub auto_compact: bool,
    /// Seconds between automatic compaction sweeps.
    pub compaction_interval_secs: u64,
    /// Maximum compaction jobs allowed to run at once.
    pub max_concurrent_jobs: usize,
    /// Seconds a tombstone is retained before it may be collected.
    pub tombstone_ttl_secs: u64,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
target_segment_size: 10_000,
min_segment_size: 1_000,
max_merge_segments: 5,
garbage_threshold: 0.3, auto_compact: true,
compaction_interval_secs: 300, max_concurrent_jobs: 2,
tombstone_ttl_secs: 3600, }
}
}
impl CompactionConfig {
    /// Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the live-vector count at which the active segment rolls over.
    pub fn with_target_size(mut self, size: usize) -> Self {
        self.target_segment_size = size;
        self
    }

    /// Sets the size below which sealed segments become merge candidates.
    pub fn with_min_size(mut self, size: usize) -> Self {
        self.min_segment_size = size;
        self
    }

    /// Sets the garbage ratio that triggers compaction, clamped into [0, 1].
    pub fn with_garbage_threshold(mut self, threshold: f32) -> Self {
        self.garbage_threshold = threshold.clamp(0.0, 1.0);
        self
    }

    /// Turns automatic background compaction off.
    pub fn without_auto_compact(mut self) -> Self {
        self.auto_compact = false;
        self
    }

    /// Sets the seconds between automatic compaction sweeps.
    pub fn with_interval(mut self, secs: u64) -> Self {
        self.compaction_interval_secs = secs;
        self
    }
}
/// Identifier of a segment; formatted as `<namespace>_<16-hex-digit counter>`.
pub type SegmentId = String;
/// Lifecycle state of a segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentState {
    /// Accepting writes.
    Active,
    /// Read-only; eligible for compaction.
    Sealed,
    /// Currently being merged by a compaction.
    Compacting,
    /// Superseded; awaiting removal.
    Tombstone,
}
/// Bookkeeping for one segment: counts, sizes, id range, and lifecycle state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Unique segment id within the namespace.
    pub id: SegmentId,
    /// Namespace the segment belongs to.
    pub namespace: NamespaceId,
    /// Current lifecycle state.
    pub state: SegmentState,
    /// Number of live (non-deleted) vectors stored.
    pub live_count: usize,
    /// Number of deletions recorded as tombstones.
    pub deleted_count: usize,
    /// Estimated bytes of live payload (see `estimate_vector_size`).
    pub size_bytes: usize,
    /// Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    /// Last mutation time, seconds since the Unix epoch.
    pub updated_at: u64,
    /// Smallest vector id ever added; `None` until the first add.
    pub min_id: Option<VectorId>,
    /// Largest vector id ever added; `None` until the first add.
    pub max_id: Option<VectorId>,
    /// Compaction level: 0 for fresh segments, deepest input + 1 after a merge.
    pub level: u32,
}
impl SegmentMetadata {
pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
id,
namespace,
state: SegmentState::Active,
live_count: 0,
deleted_count: 0,
size_bytes: 0,
created_at: now,
updated_at: now,
min_id: None,
max_id: None,
level: 0,
}
}
pub fn garbage_ratio(&self) -> f32 {
let total = self.live_count + self.deleted_count;
if total == 0 {
0.0
} else {
self.deleted_count as f32 / total as f32
}
}
pub fn is_empty(&self) -> bool {
self.live_count == 0
}
pub fn total_count(&self) -> usize {
self.live_count + self.deleted_count
}
pub fn needs_compaction(&self, threshold: f32) -> bool {
self.garbage_ratio() >= threshold
}
pub fn update_id_range(&mut self, id: &VectorId) {
match &self.min_id {
None => self.min_id = Some(id.clone()),
Some(min) if id < min => self.min_id = Some(id.clone()),
_ => {}
}
match &self.max_id {
None => self.max_id = Some(id.clone()),
Some(max) if id > max => self.max_id = Some(id.clone()),
_ => {}
}
}
}
/// A batch of vectors plus tombstones for ids deleted from it.
#[derive(Debug, Clone)]
pub struct Segment {
    /// Counts, sizes, id range, and lifecycle state for this segment.
    pub metadata: SegmentMetadata,
    // Stored vectors keyed by id. `delete` removes entries, so this is
    // normally disjoint from `tombstones`; readers still filter defensively.
    vectors: HashMap<VectorId, Vector>,
    // Ids deleted from this segment since it was created.
    tombstones: HashSet<VectorId>,
}
impl Segment {
pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
Self {
metadata: SegmentMetadata::new(id, namespace),
vectors: HashMap::new(),
tombstones: HashSet::new(),
}
}
pub fn add(&mut self, vector: Vector) {
let size = estimate_vector_size(&vector);
self.metadata.update_id_range(&vector.id);
if self.tombstones.remove(&vector.id) {
self.metadata.deleted_count = self.metadata.deleted_count.saturating_sub(1);
}
if self.vectors.contains_key(&vector.id) {
let old_size = self
.vectors
.get(&vector.id)
.map(estimate_vector_size)
.unwrap_or(0);
self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(old_size);
} else {
self.metadata.live_count += 1;
}
self.metadata.size_bytes += size;
self.metadata.updated_at = current_timestamp();
self.vectors.insert(vector.id.clone(), vector);
}
pub fn delete(&mut self, id: &VectorId) -> bool {
if let Some(vector) = self.vectors.remove(id) {
let size = estimate_vector_size(&vector);
self.metadata.live_count = self.metadata.live_count.saturating_sub(1);
self.metadata.deleted_count += 1;
self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(size);
self.metadata.updated_at = current_timestamp();
self.tombstones.insert(id.clone());
true
} else {
false
}
}
pub fn get(&self, id: &VectorId) -> Option<&Vector> {
if self.tombstones.contains(id) {
None
} else {
self.vectors.get(id)
}
}
pub fn contains(&self, id: &VectorId) -> bool {
!self.tombstones.contains(id) && self.vectors.contains_key(id)
}
pub fn is_tombstoned(&self, id: &VectorId) -> bool {
self.tombstones.contains(id)
}
pub fn live_vectors(&self) -> impl Iterator<Item = &Vector> {
self.vectors
.values()
.filter(|v| !self.tombstones.contains(&v.id))
}
pub fn all_ids(&self) -> impl Iterator<Item = &VectorId> {
self.vectors.keys()
}
pub fn tombstone_ids(&self) -> impl Iterator<Item = &VectorId> {
self.tombstones.iter()
}
pub fn seal(&mut self) {
self.metadata.state = SegmentState::Sealed;
self.metadata.updated_at = current_timestamp();
}
pub fn mark_compacting(&mut self) {
self.metadata.state = SegmentState::Compacting;
self.metadata.updated_at = current_timestamp();
}
pub fn mark_tombstone(&mut self) {
self.metadata.state = SegmentState::Tombstone;
self.metadata.updated_at = current_timestamp();
}
}
/// Outcome of a single compaction run.
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// Ids of the segments consumed by the merge.
    pub merged_segments: Vec<SegmentId>,
    /// Id of the segment the merge produced, when one was created.
    pub new_segment: Option<SegmentId>,
    /// Live vectors copied into the new segment.
    pub vectors_compacted: usize,
    /// Tombstones discarded by the merge.
    pub tombstones_removed: usize,
    /// Estimated bytes freed (input sizes minus output size).
    pub bytes_reclaimed: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
}
/// Record of a scheduled or running compaction.
/// NOTE(review): not constructed anywhere in this file — presumably driven by
/// a scheduler elsewhere; confirm before removing.
#[derive(Debug, Clone)]
pub struct CompactionJob {
    /// Unique job id.
    pub id: String,
    /// Namespace the job operates on.
    pub namespace: NamespaceId,
    /// Segments to be merged.
    pub source_segments: Vec<SegmentId>,
    /// Segment the merge writes into.
    pub target_segment: SegmentId,
    /// Current job state.
    pub state: CompactionJobState,
    /// Completion fraction, presumably 0.0–1.0 — TODO confirm with the driver.
    pub progress: f32,
    /// Start time, seconds since the Unix epoch.
    pub started_at: u64,
    /// Finish time, if the job has ended.
    pub completed_at: Option<u64>,
    /// Failure description when `state` is `Failed`.
    pub error: Option<String>,
}
/// Lifecycle of a [`CompactionJob`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionJobState {
    /// Queued, not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Stopped before completion.
    Cancelled,
}
/// Segment storage and compaction for a single namespace.
pub struct NamespaceSegmentManager {
    // Namespace this manager owns; segment ids are prefixed with it.
    namespace: NamespaceId,
    // Sizing/threshold configuration.
    config: CompactionConfig,
    // Segment currently accepting writes; `None` until the first add.
    active_segment: RwLock<Option<Segment>>,
    // Read-only segments keyed by segment id.
    sealed_segments: RwLock<HashMap<SegmentId, Segment>>,
    // Maps each vector id to the segment recorded as owning it.
    vector_index: RwLock<HashMap<VectorId, SegmentId>>,
    // Monotonic counter used to mint unique segment ids.
    segment_counter: AtomicU64,
    // Write/delete/compaction counters for this namespace.
    stats: CompactionStats,
}
impl NamespaceSegmentManager {
pub fn new(namespace: NamespaceId, config: CompactionConfig) -> Self {
Self {
namespace,
config,
active_segment: RwLock::new(None),
sealed_segments: RwLock::new(HashMap::new()),
vector_index: RwLock::new(HashMap::new()),
segment_counter: AtomicU64::new(0),
stats: CompactionStats::default(),
}
}
fn generate_segment_id(&self) -> SegmentId {
let counter = self.segment_counter.fetch_add(1, Ordering::SeqCst);
format!("{}_{:016x}", self.namespace, counter)
}
fn ensure_active_segment(&self) -> SegmentId {
let mut active = self.active_segment.write();
if active.is_none() {
let id = self.generate_segment_id();
*active = Some(Segment::new(id.clone(), self.namespace.clone()));
return id;
}
let segment = active.as_ref().unwrap();
if segment.metadata.live_count >= self.config.target_segment_size {
let mut sealed = active.take().unwrap();
sealed.seal();
let sealed_id = sealed.metadata.id.clone();
self.sealed_segments.write().insert(sealed_id, sealed);
let id = self.generate_segment_id();
*active = Some(Segment::new(id.clone(), self.namespace.clone()));
return id;
}
segment.metadata.id.clone()
}
pub fn add(&self, vector: Vector) {
let segment_id = self.ensure_active_segment();
self.vector_index
.write()
.insert(vector.id.clone(), segment_id.clone());
if let Some(ref mut segment) = *self.active_segment.write() {
segment.add(vector);
}
self.stats.vectors_written.fetch_add(1, Ordering::Relaxed);
}
pub fn get(&self, id: &VectorId) -> Option<Vector> {
let segment_id = self.vector_index.read().get(id)?.clone();
if let Some(ref segment) = *self.active_segment.read() {
if segment.metadata.id == segment_id {
return segment.get(id).cloned();
}
}
self.sealed_segments
.read()
.get(&segment_id)
.and_then(|s| s.get(id).cloned())
}
pub fn delete(&self, id: &VectorId) -> bool {
let segment_id = match self.vector_index.read().get(id) {
Some(id) => id.clone(),
None => return false,
};
if let Some(ref mut segment) = *self.active_segment.write() {
if segment.metadata.id == segment_id && segment.delete(id) {
self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
return true;
}
}
if let Some(segment) = self.sealed_segments.write().get_mut(&segment_id) {
if segment.delete(id) {
self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
return true;
}
}
false
}
pub fn get_all(&self) -> Vec<Vector> {
let mut result = Vec::new();
if let Some(ref segment) = *self.active_segment.read() {
result.extend(segment.live_vectors().cloned());
}
for segment in self.sealed_segments.read().values() {
result.extend(segment.live_vectors().cloned());
}
result
}
pub fn segments_needing_compaction(&self) -> Vec<SegmentMetadata> {
let sealed = self.sealed_segments.read();
sealed
.values()
.filter(|s| {
s.metadata.state == SegmentState::Sealed
&& s.metadata.needs_compaction(self.config.garbage_threshold)
})
.map(|s| s.metadata.clone())
.collect()
}
pub fn small_segments(&self) -> Vec<SegmentMetadata> {
let sealed = self.sealed_segments.read();
sealed
.values()
.filter(|s| {
s.metadata.state == SegmentState::Sealed
&& s.metadata.live_count < self.config.min_segment_size
})
.map(|s| s.metadata.clone())
.collect()
}
pub fn compact(&self, segment_ids: &[SegmentId]) -> Option<CompactionResult> {
if segment_ids.is_empty() {
return None;
}
let start = std::time::Instant::now();
let mut sealed = self.sealed_segments.write();
let mut vector_index = self.vector_index.write();
let mut segments_to_merge: Vec<Segment> = Vec::new();
for id in segment_ids {
if let Some(mut segment) = sealed.remove(id) {
segment.mark_compacting();
segments_to_merge.push(segment);
}
}
if segments_to_merge.is_empty() {
return None;
}
let new_id = self.generate_segment_id();
let mut new_segment = Segment::new(new_id.clone(), self.namespace.clone());
new_segment.metadata.level = segments_to_merge
.iter()
.map(|s| s.metadata.level)
.max()
.unwrap_or(0)
+ 1;
let mut vectors_compacted = 0;
let mut tombstones_removed = 0;
let mut bytes_reclaimed = 0;
for segment in &segments_to_merge {
bytes_reclaimed += segment.metadata.size_bytes;
tombstones_removed += segment.tombstones.len();
for vector in segment.live_vectors() {
new_segment.add(vector.clone());
vector_index.insert(vector.id.clone(), new_id.clone());
vectors_compacted += 1;
}
for tombstone_id in segment.tombstone_ids() {
vector_index.remove(tombstone_id);
}
}
if new_segment.metadata.live_count >= self.config.min_segment_size {
new_segment.seal();
}
bytes_reclaimed = bytes_reclaimed.saturating_sub(new_segment.metadata.size_bytes);
let merged_ids: Vec<_> = segments_to_merge
.iter()
.map(|s| s.metadata.id.clone())
.collect();
sealed.insert(new_id.clone(), new_segment);
self.stats
.compactions_completed
.fetch_add(1, Ordering::Relaxed);
self.stats
.bytes_reclaimed
.fetch_add(bytes_reclaimed as u64, Ordering::Relaxed);
self.stats
.tombstones_collected
.fetch_add(tombstones_removed as u64, Ordering::Relaxed);
Some(CompactionResult {
merged_segments: merged_ids,
new_segment: Some(new_id),
vectors_compacted,
tombstones_removed,
bytes_reclaimed,
duration_ms: start.elapsed().as_millis() as u64,
})
}
pub fn auto_compact(&self) -> Vec<CompactionResult> {
let mut results = Vec::new();
let garbage_segments = self.segments_needing_compaction();
for segment in garbage_segments {
if let Some(result) = self.compact(&[segment.id]) {
results.push(result);
}
}
let small_segs = self.small_segments();
if small_segs.len() >= 2 {
let ids: Vec<_> = small_segs
.into_iter()
.take(self.config.max_merge_segments)
.map(|s| s.id)
.collect();
if let Some(result) = self.compact(&ids) {
results.push(result);
}
}
results
}
pub fn stats(&self) -> SegmentStats {
let active_count = if self.active_segment.read().is_some() {
1
} else {
0
};
let sealed = self.sealed_segments.read();
let total_live: usize = sealed.values().map(|s| s.metadata.live_count).sum();
let total_deleted: usize = sealed.values().map(|s| s.metadata.deleted_count).sum();
let total_size: usize = sealed.values().map(|s| s.metadata.size_bytes).sum();
let active_live = self
.active_segment
.read()
.as_ref()
.map(|s| s.metadata.live_count)
.unwrap_or(0);
let active_size = self
.active_segment
.read()
.as_ref()
.map(|s| s.metadata.size_bytes)
.unwrap_or(0);
SegmentStats {
namespace: self.namespace.clone(),
active_segments: active_count,
sealed_segments: sealed.len(),
total_live_vectors: total_live + active_live,
total_deleted_vectors: total_deleted,
total_size_bytes: total_size + active_size,
average_segment_size: if !sealed.is_empty() {
total_live / sealed.len()
} else {
0
},
garbage_ratio: if total_live + total_deleted > 0 {
total_deleted as f32 / (total_live + total_deleted) as f32
} else {
0.0
},
}
}
pub fn clear(&self) {
*self.active_segment.write() = None;
self.sealed_segments.write().clear();
self.vector_index.write().clear();
}
}
/// Routes storage and compaction operations to per-namespace managers.
pub struct CompactionManager {
    // Configuration copied into each namespace manager on creation.
    config: CompactionConfig,
    // Lazily created per-namespace managers, shared via Arc.
    namespaces: RwLock<HashMap<NamespaceId, Arc<NamespaceSegmentManager>>>,
    // Manager-level counters; note nothing in this file increments them.
    global_stats: CompactionStats,
}
impl CompactionManager {
    /// Creates a manager with no namespaces registered.
    pub fn new(config: CompactionConfig) -> Self {
        Self {
            config,
            namespaces: RwLock::new(HashMap::new()),
            global_stats: CompactionStats::default(),
        }
    }

    /// Returns the per-namespace manager, creating it on first use.
    pub fn namespace(&self, namespace: &NamespaceId) -> Arc<NamespaceSegmentManager> {
        let mut namespaces = self.namespaces.write();
        if let Some(manager) = namespaces.get(namespace) {
            return Arc::clone(manager);
        }
        let manager = Arc::new(NamespaceSegmentManager::new(
            namespace.clone(),
            self.config.clone(),
        ));
        namespaces.insert(namespace.clone(), Arc::clone(&manager));
        manager
    }

    /// Adds `vector` to `namespace`, creating the namespace if needed.
    pub fn add(&self, namespace: &NamespaceId, vector: Vector) {
        self.namespace(namespace).add(vector);
    }

    /// Looks up a vector; `None` when the namespace or id is unknown.
    pub fn get(&self, namespace: &NamespaceId, id: &VectorId) -> Option<Vector> {
        self.namespaces.read().get(namespace)?.get(id)
    }

    /// Deletes a vector; `false` when the namespace or id is unknown.
    pub fn delete(&self, namespace: &NamespaceId, id: &VectorId) -> bool {
        match self.namespaces.read().get(namespace) {
            Some(manager) => manager.delete(id),
            None => false,
        }
    }

    /// All live vectors in `namespace`; empty when unknown.
    pub fn get_all(&self, namespace: &NamespaceId) -> Vec<Vector> {
        match self.namespaces.read().get(namespace) {
            Some(manager) => manager.get_all(),
            None => Vec::new(),
        }
    }

    /// Runs auto-compaction for one namespace.
    pub fn compact_namespace(&self, namespace: &NamespaceId) -> Vec<CompactionResult> {
        match self.namespaces.read().get(namespace) {
            Some(manager) => manager.auto_compact(),
            None => Vec::new(),
        }
    }

    /// Runs auto-compaction everywhere; namespaces with no work are omitted.
    pub fn compact_all(&self) -> HashMap<NamespaceId, Vec<CompactionResult>> {
        let namespaces = self.namespaces.read();
        let mut results = HashMap::new();
        for (namespace, manager) in namespaces.iter() {
            let namespace_results = manager.auto_compact();
            if !namespace_results.is_empty() {
                results.insert(namespace.clone(), namespace_results);
            }
        }
        results
    }

    /// Segment statistics for one namespace, if it exists.
    pub fn namespace_stats(&self, namespace: &NamespaceId) -> Option<SegmentStats> {
        self.namespaces.read().get(namespace).map(|m| m.stats())
    }

    /// Aggregated statistics across every registered namespace.
    pub fn global_stats(&self) -> GlobalCompactionStats {
        let namespaces = self.namespaces.read();
        let mut total_live = 0usize;
        let mut total_deleted = 0usize;
        let mut total_size = 0usize;
        let mut total_segments = 0usize;
        // Compaction work is recorded on each namespace manager's own
        // counters; nothing in this module ever increments the manager-level
        // `global_stats`, so the two compaction fields previously always
        // reported zero. Seed with the manager-level counters (kept for any
        // external writer) and fold in the per-namespace ones.
        let mut compactions_completed = self
            .global_stats
            .compactions_completed
            .load(Ordering::Relaxed);
        let mut bytes_reclaimed = self.global_stats.bytes_reclaimed.load(Ordering::Relaxed);
        for manager in namespaces.values() {
            let stats = manager.stats();
            total_live += stats.total_live_vectors;
            total_deleted += stats.total_deleted_vectors;
            total_size += stats.total_size_bytes;
            total_segments += stats.active_segments + stats.sealed_segments;
            // Same-module private-field access to the namespace counters.
            compactions_completed += manager
                .stats
                .compactions_completed
                .load(Ordering::Relaxed);
            bytes_reclaimed += manager.stats.bytes_reclaimed.load(Ordering::Relaxed);
        }
        GlobalCompactionStats {
            total_namespaces: namespaces.len(),
            total_segments,
            total_live_vectors: total_live,
            total_deleted_vectors: total_deleted,
            total_size_bytes: total_size,
            overall_garbage_ratio: if total_live + total_deleted > 0 {
                total_deleted as f32 / (total_live + total_deleted) as f32
            } else {
                0.0
            },
            compactions_completed,
            bytes_reclaimed,
        }
    }

    /// Drops a namespace and all its data; `true` if it existed.
    pub fn delete_namespace(&self, namespace: &NamespaceId) -> bool {
        self.namespaces.write().remove(namespace).is_some()
    }

    /// Names of all registered namespaces, in arbitrary order.
    pub fn list_namespaces(&self) -> Vec<NamespaceId> {
        self.namespaces.read().keys().cloned().collect()
    }
}
/// Atomic counters for write, delete, and compaction activity.
#[derive(Debug, Default)]
pub struct CompactionStats {
    /// Vectors added (including overwrites).
    pub vectors_written: AtomicU64,
    /// Vectors deleted.
    pub vectors_deleted: AtomicU64,
    /// Compaction runs that completed.
    pub compactions_completed: AtomicU64,
    /// Estimated bytes freed by compaction.
    pub bytes_reclaimed: AtomicU64,
    /// Tombstones discarded by compaction.
    pub tombstones_collected: AtomicU64,
}
/// Point-in-time statistics for one namespace's segments.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Namespace these figures describe.
    pub namespace: NamespaceId,
    /// Active (writable) segments: 0 or 1.
    pub active_segments: usize,
    /// Number of sealed segments.
    pub sealed_segments: usize,
    /// Live vectors across active and sealed segments.
    pub total_live_vectors: usize,
    /// Deleted entries across sealed segments.
    pub total_deleted_vectors: usize,
    /// Estimated bytes across active and sealed segments.
    pub total_size_bytes: usize,
    /// Mean live vectors per sealed segment (0 when none are sealed).
    pub average_segment_size: usize,
    /// Deleted / (live + deleted), 0.0 when empty.
    pub garbage_ratio: f32,
}
/// Statistics aggregated across all namespaces.
#[derive(Debug, Clone)]
pub struct GlobalCompactionStats {
    /// Number of registered namespaces.
    pub total_namespaces: usize,
    /// Active plus sealed segments over all namespaces.
    pub total_segments: usize,
    /// Live vectors over all namespaces.
    pub total_live_vectors: usize,
    /// Deleted entries over all namespaces.
    pub total_deleted_vectors: usize,
    /// Estimated bytes over all namespaces.
    pub total_size_bytes: usize,
    /// Deleted / (live + deleted) over all namespaces, 0.0 when empty.
    pub overall_garbage_ratio: f32,
    /// Completed compaction runs, as reported by the owning manager.
    pub compactions_completed: u64,
    /// Bytes reclaimed by compaction, as reported by the owning manager.
    pub bytes_reclaimed: u64,
}
/// Seconds since the Unix epoch; 0 if the system clock reads before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
/// Rough in-memory footprint of a vector: 4 bytes per f32 component, the id
/// string's byte length, the JSON-serialized metadata length (0 when absent or
/// unserializable), plus a fixed 32-byte overhead.
fn estimate_vector_size(vector: &Vector) -> usize {
    const FIXED_OVERHEAD: usize = 32;
    let values_bytes = vector.values.len() * std::mem::size_of::<f32>();
    let metadata_bytes = match vector.metadata.as_ref() {
        Some(meta) => serde_json::to_string(meta).map_or(0, |json| json.len()),
        None => 0,
    };
    values_bytes + vector.id.len() + metadata_bytes + FIXED_OVERHEAD
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a vector with the given id and `dim` components 0.0..dim-1.
    fn make_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: (0..dim).map(|i| i as f32).collect(),
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    // Builder methods set their field; untouched fields keep defaults.
    #[test]
    fn test_compaction_config_builder() {
        let config = CompactionConfig::new()
            .with_target_size(5000)
            .with_min_size(500)
            .with_garbage_threshold(0.4)
            .without_auto_compact();
        assert_eq!(config.target_segment_size, 5000);
        assert_eq!(config.min_segment_size, 500);
        assert!((config.garbage_threshold - 0.4).abs() < 0.001);
        assert!(!config.auto_compact);
    }

    // garbage_ratio / total_count / needs_compaction arithmetic.
    #[test]
    fn test_segment_metadata() {
        let mut meta = SegmentMetadata::new("seg1".to_string(), "ns1".to_string());
        assert_eq!(meta.state, SegmentState::Active);
        assert_eq!(meta.live_count, 0);
        assert!(meta.is_empty());
        assert!((meta.garbage_ratio() - 0.0).abs() < 0.001);
        meta.live_count = 100;
        meta.deleted_count = 50;
        assert!(!meta.is_empty());
        assert_eq!(meta.total_count(), 150);
        // 50 / 150 ≈ 0.333
        assert!((meta.garbage_ratio() - 0.333).abs() < 0.01);
        assert!(meta.needs_compaction(0.3));
        assert!(!meta.needs_compaction(0.5));
    }

    // add/delete keep live/deleted counts and tombstones consistent.
    #[test]
    fn test_segment_operations() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());
        segment.add(make_vector("v1", 128));
        segment.add(make_vector("v2", 128));
        segment.add(make_vector("v3", 128));
        assert_eq!(segment.metadata.live_count, 3);
        assert!(segment.contains(&"v1".to_string()));
        assert!(segment.get(&"v1".to_string()).is_some());
        assert!(segment.delete(&"v2".to_string()));
        assert_eq!(segment.metadata.live_count, 2);
        assert_eq!(segment.metadata.deleted_count, 1);
        assert!(!segment.contains(&"v2".to_string()));
        assert!(segment.is_tombstoned(&"v2".to_string()));
        let live: Vec<_> = segment.live_vectors().collect();
        assert_eq!(live.len(), 2);
    }

    // seal() flips the lifecycle state from Active to Sealed.
    #[test]
    fn test_segment_seal() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());
        segment.add(make_vector("v1", 128));
        assert_eq!(segment.metadata.state, SegmentState::Active);
        segment.seal();
        assert_eq!(segment.metadata.state, SegmentState::Sealed);
    }

    // Basic add/get/delete round-trip through the namespace manager.
    #[test]
    fn test_namespace_segment_manager() {
        let config = CompactionConfig::new().with_target_size(100);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..50 {
            manager.add(make_vector(&format!("v{}", i), 128));
        }
        assert_eq!(manager.get_all().len(), 50);
        assert!(manager.get(&"v10".to_string()).is_some());
        assert!(manager.delete(&"v5".to_string()));
        assert!(manager.delete(&"v15".to_string()));
        assert_eq!(manager.get_all().len(), 48);
        assert!(manager.get(&"v5".to_string()).is_none());
    }

    // Adding past target_segment_size seals segments automatically.
    #[test]
    fn test_namespace_manager_auto_seal() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..25 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }
        let stats = manager.stats();
        assert!(stats.sealed_segments >= 1);
        assert_eq!(stats.total_live_vectors, 25);
    }

    // auto_compact removes deleted entries without losing live vectors.
    #[test]
    fn test_compaction() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5)
            .with_garbage_threshold(0.2);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }
        for i in 0..10 {
            manager.delete(&format!("v{}", i));
        }
        let _results = manager.auto_compact();
        assert_eq!(manager.get_all().len(), 20);
        let stats = manager.stats();
        assert_eq!(stats.total_live_vectors, 20);
    }

    // Multi-namespace routing, lookup, delete, and global aggregation.
    #[test]
    fn test_compaction_manager() {
        let config = CompactionConfig::new().with_target_size(50);
        let manager = CompactionManager::new(config);
        for i in 0..20 {
            manager.add(&"ns1".to_string(), make_vector(&format!("v{}", i), 128));
            manager.add(&"ns2".to_string(), make_vector(&format!("v{}", i), 128));
        }
        assert_eq!(manager.get_all(&"ns1".to_string()).len(), 20);
        assert_eq!(manager.get_all(&"ns2".to_string()).len(), 20);
        assert_eq!(manager.list_namespaces().len(), 2);
        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_some());
        assert!(manager.delete(&"ns1".to_string(), &"v5".to_string()));
        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_none());
        let global = manager.global_stats();
        assert_eq!(global.total_namespaces, 2);
        // 20 + 20 vectors minus the one delete above.
        assert_eq!(global.total_live_vectors, 39);
    }

    // Survivors of a compaction remain retrievable by id.
    #[test]
    fn test_compaction_result() {
        let config = CompactionConfig::new()
            .with_target_size(5)
            .with_min_size(2)
            .with_garbage_threshold(0.1);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..15 {
            manager.add(make_vector(&format!("v{}", i), 32));
        }
        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }
        let _results = manager.auto_compact();
        assert_eq!(manager.get_all().len(), 10);
        for i in 5..15 {
            assert!(manager.get(&format!("v{}", i)).is_some());
        }
    }

    // stats() reflects live/deleted counts and a positive garbage ratio.
    #[test]
    fn test_segment_stats() {
        let config = CompactionConfig::new().with_target_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }
        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }
        let stats = manager.stats();
        assert_eq!(stats.namespace, "ns1");
        assert_eq!(stats.total_live_vectors, 25);
        assert_eq!(stats.total_deleted_vectors, 5);
        assert!(stats.garbage_ratio > 0.0);
    }

    // delete_namespace removes exactly the named namespace; second call fails.
    #[test]
    fn test_delete_namespace() {
        let config = CompactionConfig::new();
        let manager = CompactionManager::new(config);
        manager.add(&"ns1".to_string(), make_vector("v1", 128));
        manager.add(&"ns2".to_string(), make_vector("v1", 128));
        assert_eq!(manager.list_namespaces().len(), 2);
        assert!(manager.delete_namespace(&"ns1".to_string()));
        assert_eq!(manager.list_namespaces().len(), 1);
        assert!(!manager.delete_namespace(&"ns1".to_string()));
    }

    // small_segments() query and data visibility with few, small segments.
    #[test]
    fn test_small_segment_merge() {
        let config = CompactionConfig::new()
            .with_target_size(100)
            .with_min_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        for i in 0..10 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }
        let _small = manager.small_segments();
        assert_eq!(manager.get_all().len(), 10);
    }

    // Re-adding an existing id overwrites in place (count stays 1).
    #[test]
    fn test_vector_update() {
        let config = CompactionConfig::new();
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);
        let v1 = Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1);
        let v1_updated = Vector {
            id: "v1".to_string(),
            values: vec![4.0, 5.0, 6.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1_updated);
        assert_eq!(manager.get_all().len(), 1);
        let retrieved = manager.get(&"v1".to_string()).unwrap();
        assert_eq!(retrieved.values, vec![4.0, 5.0, 6.0]);
    }
}