1use common::{NamespaceId, Vector, VectorId};
34use parking_lot::RwLock;
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::Arc;
39
/// Tuning knobs for segment sizing and compaction behavior.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Live-vector count at which the active segment is sealed and a new one started.
    pub target_segment_size: usize,
    /// Sealed segments with fewer live vectors than this are merge candidates;
    /// a compacted segment below this size is left unsealed.
    pub min_segment_size: usize,
    /// Upper bound on how many small segments one auto-compaction merge combines.
    pub max_merge_segments: usize,
    /// Deleted/total ratio (clamped to 0.0..=1.0 by the builder) at which a
    /// sealed segment becomes a compaction candidate.
    pub garbage_threshold: f32,
    /// Whether automatic compaction is enabled.
    /// NOTE(review): not consulted anywhere in this file — presumably read by
    /// an external scheduler; confirm before relying on it.
    pub auto_compact: bool,
    /// Seconds between automatic compaction passes.
    /// NOTE(review): not consulted anywhere in this file.
    pub compaction_interval_secs: u64,
    /// Maximum number of compaction jobs running concurrently.
    /// NOTE(review): not consulted anywhere in this file.
    pub max_concurrent_jobs: usize,
    /// Seconds a tombstone is retained before it may be collected.
    /// NOTE(review): not consulted anywhere in this file.
    pub tombstone_ttl_secs: u64,
}
64
65impl Default for CompactionConfig {
66 fn default() -> Self {
67 Self {
68 target_segment_size: 10_000,
69 min_segment_size: 1_000,
70 max_merge_segments: 5,
71 garbage_threshold: 0.3, auto_compact: true,
73 compaction_interval_secs: 300, max_concurrent_jobs: 2,
75 tombstone_ttl_secs: 3600, }
77 }
78}
79
80impl CompactionConfig {
81 pub fn new() -> Self {
83 Self::default()
84 }
85
86 pub fn with_target_size(mut self, size: usize) -> Self {
88 self.target_segment_size = size;
89 self
90 }
91
92 pub fn with_min_size(mut self, size: usize) -> Self {
94 self.min_segment_size = size;
95 self
96 }
97
98 pub fn with_garbage_threshold(mut self, threshold: f32) -> Self {
100 self.garbage_threshold = threshold.clamp(0.0, 1.0);
101 self
102 }
103
104 pub fn without_auto_compact(mut self) -> Self {
106 self.auto_compact = false;
107 self
108 }
109
110 pub fn with_interval(mut self, secs: u64) -> Self {
112 self.compaction_interval_secs = secs;
113 self
114 }
115}
116
/// Identifier for a segment; minted as `"{namespace}_{counter:016x}"` by
/// `NamespaceSegmentManager::generate_segment_id`.
pub type SegmentId = String;
123
/// Lifecycle state of a segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentState {
    /// Accepting new writes (initial state).
    Active,
    /// Full; no longer accepts writes, eligible for compaction.
    Sealed,
    /// Currently being merged into a new segment.
    Compacting,
    /// Retired (set via `mark_tombstone`; not used elsewhere in this file).
    Tombstone,
}
136
/// Bookkeeping about a single segment: counts, sizes, state, and id range.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Unique segment identifier.
    pub id: SegmentId,
    /// Namespace this segment belongs to.
    pub namespace: NamespaceId,
    /// Current lifecycle state.
    pub state: SegmentState,
    /// Number of live (non-deleted) vectors stored.
    pub live_count: usize,
    /// Number of tombstoned (deleted) entries.
    pub deleted_count: usize,
    /// Estimated in-memory footprint of the stored vectors, in bytes.
    pub size_bytes: usize,
    /// Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    /// Last modification time, seconds since the Unix epoch.
    pub updated_at: u64,
    /// Smallest vector id ever added (by `Ord`), if any.
    pub min_id: Option<VectorId>,
    /// Largest vector id ever added (by `Ord`), if any.
    pub max_id: Option<VectorId>,
    /// Compaction level: 0 for freshly written segments, max(source)+1 after a merge.
    pub level: u32,
}
163
164impl SegmentMetadata {
165 pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
167 let now = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_secs();
171
172 Self {
173 id,
174 namespace,
175 state: SegmentState::Active,
176 live_count: 0,
177 deleted_count: 0,
178 size_bytes: 0,
179 created_at: now,
180 updated_at: now,
181 min_id: None,
182 max_id: None,
183 level: 0,
184 }
185 }
186
187 pub fn garbage_ratio(&self) -> f32 {
189 let total = self.live_count + self.deleted_count;
190 if total == 0 {
191 0.0
192 } else {
193 self.deleted_count as f32 / total as f32
194 }
195 }
196
197 pub fn is_empty(&self) -> bool {
199 self.live_count == 0
200 }
201
202 pub fn total_count(&self) -> usize {
204 self.live_count + self.deleted_count
205 }
206
207 pub fn needs_compaction(&self, threshold: f32) -> bool {
209 self.garbage_ratio() >= threshold
210 }
211
212 pub fn update_id_range(&mut self, id: &VectorId) {
214 match &self.min_id {
215 None => self.min_id = Some(id.clone()),
216 Some(min) if id < min => self.min_id = Some(id.clone()),
217 _ => {}
218 }
219 match &self.max_id {
220 None => self.max_id = Some(id.clone()),
221 Some(max) if id > max => self.max_id = Some(id.clone()),
222 _ => {}
223 }
224 }
225}
226
/// An in-memory segment: live vectors keyed by id, plus tombstones recording
/// ids that were deleted from this segment.
#[derive(Debug, Clone)]
pub struct Segment {
    /// Counts, sizes, state, and id-range bookkeeping for this segment.
    pub metadata: SegmentMetadata,
    /// Live payloads keyed by vector id; deleted entries are removed from
    /// here and recorded in `tombstones`.
    vectors: HashMap<VectorId, Vector>,
    /// Ids that have been deleted from this segment.
    tombstones: HashSet<VectorId>,
}
237
238impl Segment {
239 pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
241 Self {
242 metadata: SegmentMetadata::new(id, namespace),
243 vectors: HashMap::new(),
244 tombstones: HashSet::new(),
245 }
246 }
247
248 pub fn add(&mut self, vector: Vector) {
250 let size = estimate_vector_size(&vector);
251 self.metadata.update_id_range(&vector.id);
252
253 if self.tombstones.remove(&vector.id) {
255 self.metadata.deleted_count = self.metadata.deleted_count.saturating_sub(1);
256 }
257
258 if self.vectors.contains_key(&vector.id) {
260 let old_size = self
262 .vectors
263 .get(&vector.id)
264 .map(estimate_vector_size)
265 .unwrap_or(0);
266 self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(old_size);
267 } else {
268 self.metadata.live_count += 1;
269 }
270
271 self.metadata.size_bytes += size;
272 self.metadata.updated_at = current_timestamp();
273 self.vectors.insert(vector.id.clone(), vector);
274 }
275
276 pub fn delete(&mut self, id: &VectorId) -> bool {
278 if let Some(vector) = self.vectors.remove(id) {
279 let size = estimate_vector_size(&vector);
280 self.metadata.live_count = self.metadata.live_count.saturating_sub(1);
281 self.metadata.deleted_count += 1;
282 self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(size);
283 self.metadata.updated_at = current_timestamp();
284 self.tombstones.insert(id.clone());
285 true
286 } else {
287 false
288 }
289 }
290
291 pub fn get(&self, id: &VectorId) -> Option<&Vector> {
293 if self.tombstones.contains(id) {
294 None
295 } else {
296 self.vectors.get(id)
297 }
298 }
299
300 pub fn contains(&self, id: &VectorId) -> bool {
302 !self.tombstones.contains(id) && self.vectors.contains_key(id)
303 }
304
305 pub fn is_tombstoned(&self, id: &VectorId) -> bool {
307 self.tombstones.contains(id)
308 }
309
310 pub fn live_vectors(&self) -> impl Iterator<Item = &Vector> {
312 self.vectors
313 .values()
314 .filter(|v| !self.tombstones.contains(&v.id))
315 }
316
317 pub fn all_ids(&self) -> impl Iterator<Item = &VectorId> {
319 self.vectors.keys()
320 }
321
322 pub fn tombstone_ids(&self) -> impl Iterator<Item = &VectorId> {
324 self.tombstones.iter()
325 }
326
327 pub fn seal(&mut self) {
329 self.metadata.state = SegmentState::Sealed;
330 self.metadata.updated_at = current_timestamp();
331 }
332
333 pub fn mark_compacting(&mut self) {
335 self.metadata.state = SegmentState::Compacting;
336 self.metadata.updated_at = current_timestamp();
337 }
338
339 pub fn mark_tombstone(&mut self) {
341 self.metadata.state = SegmentState::Tombstone;
342 self.metadata.updated_at = current_timestamp();
343 }
344}
345
/// Outcome of one compaction merge.
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// Ids of the source segments that were consumed by the merge.
    pub merged_segments: Vec<SegmentId>,
    /// Id of the segment the survivors were written into, if one was created.
    pub new_segment: Option<SegmentId>,
    /// Number of live vectors copied into the new segment.
    pub vectors_compacted: usize,
    /// Number of tombstones dropped by the merge.
    pub tombstones_removed: usize,
    /// Estimated bytes freed: source sizes minus the new segment's size.
    pub bytes_reclaimed: usize,
    /// Wall-clock duration of the merge, in milliseconds.
    pub duration_ms: u64,
}
366
/// Descriptor for a scheduled compaction job.
/// NOTE(review): never constructed in this file — presumably used by an
/// external scheduler; confirm against callers.
#[derive(Debug, Clone)]
pub struct CompactionJob {
    /// Unique job identifier.
    pub id: String,
    /// Namespace the job operates on.
    pub namespace: NamespaceId,
    /// Segments to be merged.
    pub source_segments: Vec<SegmentId>,
    /// Segment the merge writes into.
    pub target_segment: SegmentId,
    /// Current job state.
    pub state: CompactionJobState,
    /// Completion fraction (presumably 0.0..=1.0 — not set in this file).
    pub progress: f32,
    /// Start time, seconds since the Unix epoch.
    pub started_at: u64,
    /// Completion time, if finished.
    pub completed_at: Option<u64>,
    /// Failure message, if the job failed.
    pub error: Option<String>,
}
389
/// State machine for a `CompactionJob`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionJobState {
    /// Queued, not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Cancelled before completion.
    Cancelled,
}
404
/// Per-namespace segment store: one growable active segment, a map of sealed
/// segments, and an id -> segment index for direct lookups.
pub struct NamespaceSegmentManager {
    /// Namespace this manager owns.
    namespace: NamespaceId,
    /// Sizing and compaction tuning.
    config: CompactionConfig,
    /// Segment currently accepting writes (`None` until the first write).
    active_segment: RwLock<Option<Segment>>,
    /// Read-only segments keyed by segment id.
    sealed_segments: RwLock<HashMap<SegmentId, Segment>>,
    /// Maps each vector id to the id of the segment that stores it.
    vector_index: RwLock<HashMap<VectorId, SegmentId>>,
    /// Monotonic counter used to mint unique segment ids.
    segment_counter: AtomicU64,
    /// Operation counters for this namespace.
    stats: CompactionStats,
}
426
427impl NamespaceSegmentManager {
428 pub fn new(namespace: NamespaceId, config: CompactionConfig) -> Self {
430 Self {
431 namespace,
432 config,
433 active_segment: RwLock::new(None),
434 sealed_segments: RwLock::new(HashMap::new()),
435 vector_index: RwLock::new(HashMap::new()),
436 segment_counter: AtomicU64::new(0),
437 stats: CompactionStats::default(),
438 }
439 }
440
441 fn generate_segment_id(&self) -> SegmentId {
443 let counter = self.segment_counter.fetch_add(1, Ordering::SeqCst);
444 format!("{}_{:016x}", self.namespace, counter)
445 }
446
447 fn ensure_active_segment(&self) -> SegmentId {
449 let mut active = self.active_segment.write();
450 if active.is_none() {
451 let id = self.generate_segment_id();
452 *active = Some(Segment::new(id.clone(), self.namespace.clone()));
453 return id;
454 }
455
456 let segment = active.as_ref().unwrap();
457
458 if segment.metadata.live_count >= self.config.target_segment_size {
460 let mut sealed = active.take().unwrap();
462 sealed.seal();
463 let sealed_id = sealed.metadata.id.clone();
464
465 self.sealed_segments.write().insert(sealed_id, sealed);
467
468 let id = self.generate_segment_id();
470 *active = Some(Segment::new(id.clone(), self.namespace.clone()));
471 return id;
472 }
473
474 segment.metadata.id.clone()
475 }
476
477 pub fn add(&self, vector: Vector) {
479 let segment_id = self.ensure_active_segment();
480
481 self.vector_index
483 .write()
484 .insert(vector.id.clone(), segment_id.clone());
485
486 if let Some(ref mut segment) = *self.active_segment.write() {
488 segment.add(vector);
489 }
490
491 self.stats.vectors_written.fetch_add(1, Ordering::Relaxed);
492 }
493
494 pub fn get(&self, id: &VectorId) -> Option<Vector> {
496 let segment_id = self.vector_index.read().get(id)?.clone();
498
499 if let Some(ref segment) = *self.active_segment.read() {
501 if segment.metadata.id == segment_id {
502 return segment.get(id).cloned();
503 }
504 }
505
506 self.sealed_segments
508 .read()
509 .get(&segment_id)
510 .and_then(|s| s.get(id).cloned())
511 }
512
513 pub fn delete(&self, id: &VectorId) -> bool {
515 let segment_id = match self.vector_index.read().get(id) {
517 Some(id) => id.clone(),
518 None => return false,
519 };
520
521 if let Some(ref mut segment) = *self.active_segment.write() {
523 if segment.metadata.id == segment_id && segment.delete(id) {
524 self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
525 return true;
526 }
527 }
528
529 if let Some(segment) = self.sealed_segments.write().get_mut(&segment_id) {
531 if segment.delete(id) {
532 self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
533 return true;
534 }
535 }
536
537 false
538 }
539
540 pub fn get_all(&self) -> Vec<Vector> {
542 let mut result = Vec::new();
543
544 if let Some(ref segment) = *self.active_segment.read() {
546 result.extend(segment.live_vectors().cloned());
547 }
548
549 for segment in self.sealed_segments.read().values() {
551 result.extend(segment.live_vectors().cloned());
552 }
553
554 result
555 }
556
557 pub fn segments_needing_compaction(&self) -> Vec<SegmentMetadata> {
559 let sealed = self.sealed_segments.read();
560 sealed
561 .values()
562 .filter(|s| {
563 s.metadata.state == SegmentState::Sealed
564 && s.metadata.needs_compaction(self.config.garbage_threshold)
565 })
566 .map(|s| s.metadata.clone())
567 .collect()
568 }
569
570 pub fn small_segments(&self) -> Vec<SegmentMetadata> {
572 let sealed = self.sealed_segments.read();
573 sealed
574 .values()
575 .filter(|s| {
576 s.metadata.state == SegmentState::Sealed
577 && s.metadata.live_count < self.config.min_segment_size
578 })
579 .map(|s| s.metadata.clone())
580 .collect()
581 }
582
583 pub fn compact(&self, segment_ids: &[SegmentId]) -> Option<CompactionResult> {
585 if segment_ids.is_empty() {
586 return None;
587 }
588
589 let start = std::time::Instant::now();
590 let mut sealed = self.sealed_segments.write();
591 let mut vector_index = self.vector_index.write();
592
593 let mut segments_to_merge: Vec<Segment> = Vec::new();
595 for id in segment_ids {
596 if let Some(mut segment) = sealed.remove(id) {
597 segment.mark_compacting();
598 segments_to_merge.push(segment);
599 }
600 }
601
602 if segments_to_merge.is_empty() {
603 return None;
604 }
605
606 let new_id = self.generate_segment_id();
608 let mut new_segment = Segment::new(new_id.clone(), self.namespace.clone());
609 new_segment.metadata.level = segments_to_merge
610 .iter()
611 .map(|s| s.metadata.level)
612 .max()
613 .unwrap_or(0)
614 + 1;
615
616 let mut vectors_compacted = 0;
617 let mut tombstones_removed = 0;
618 let mut bytes_reclaimed = 0;
619
620 for segment in &segments_to_merge {
622 bytes_reclaimed += segment.metadata.size_bytes;
623 tombstones_removed += segment.tombstones.len();
624
625 for vector in segment.live_vectors() {
626 new_segment.add(vector.clone());
627 vector_index.insert(vector.id.clone(), new_id.clone());
628 vectors_compacted += 1;
629 }
630
631 for tombstone_id in segment.tombstone_ids() {
633 vector_index.remove(tombstone_id);
634 }
635 }
636
637 if new_segment.metadata.live_count >= self.config.min_segment_size {
639 new_segment.seal();
640 }
641
642 bytes_reclaimed = bytes_reclaimed.saturating_sub(new_segment.metadata.size_bytes);
644
645 let merged_ids: Vec<_> = segments_to_merge
647 .iter()
648 .map(|s| s.metadata.id.clone())
649 .collect();
650 sealed.insert(new_id.clone(), new_segment);
651
652 self.stats
654 .compactions_completed
655 .fetch_add(1, Ordering::Relaxed);
656 self.stats
657 .bytes_reclaimed
658 .fetch_add(bytes_reclaimed as u64, Ordering::Relaxed);
659 self.stats
660 .tombstones_collected
661 .fetch_add(tombstones_removed as u64, Ordering::Relaxed);
662
663 Some(CompactionResult {
664 merged_segments: merged_ids,
665 new_segment: Some(new_id),
666 vectors_compacted,
667 tombstones_removed,
668 bytes_reclaimed,
669 duration_ms: start.elapsed().as_millis() as u64,
670 })
671 }
672
673 pub fn auto_compact(&self) -> Vec<CompactionResult> {
675 let mut results = Vec::new();
676
677 let garbage_segments = self.segments_needing_compaction();
679 for segment in garbage_segments {
680 if let Some(result) = self.compact(&[segment.id]) {
681 results.push(result);
682 }
683 }
684
685 let small_segs = self.small_segments();
687 if small_segs.len() >= 2 {
688 let ids: Vec<_> = small_segs
689 .into_iter()
690 .take(self.config.max_merge_segments)
691 .map(|s| s.id)
692 .collect();
693
694 if let Some(result) = self.compact(&ids) {
695 results.push(result);
696 }
697 }
698
699 results
700 }
701
702 pub fn stats(&self) -> SegmentStats {
704 let active_count = if self.active_segment.read().is_some() {
705 1
706 } else {
707 0
708 };
709 let sealed = self.sealed_segments.read();
710
711 let total_live: usize = sealed.values().map(|s| s.metadata.live_count).sum();
712 let total_deleted: usize = sealed.values().map(|s| s.metadata.deleted_count).sum();
713 let total_size: usize = sealed.values().map(|s| s.metadata.size_bytes).sum();
714
715 let active_live = self
716 .active_segment
717 .read()
718 .as_ref()
719 .map(|s| s.metadata.live_count)
720 .unwrap_or(0);
721 let active_size = self
722 .active_segment
723 .read()
724 .as_ref()
725 .map(|s| s.metadata.size_bytes)
726 .unwrap_or(0);
727
728 SegmentStats {
729 namespace: self.namespace.clone(),
730 active_segments: active_count,
731 sealed_segments: sealed.len(),
732 total_live_vectors: total_live + active_live,
733 total_deleted_vectors: total_deleted,
734 total_size_bytes: total_size + active_size,
735 average_segment_size: if !sealed.is_empty() {
736 total_live / sealed.len()
737 } else {
738 0
739 },
740 garbage_ratio: if total_live + total_deleted > 0 {
741 total_deleted as f32 / (total_live + total_deleted) as f32
742 } else {
743 0.0
744 },
745 }
746 }
747
748 pub fn clear(&self) {
750 *self.active_segment.write() = None;
751 self.sealed_segments.write().clear();
752 self.vector_index.write().clear();
753 }
754}
755
/// Top-level manager: routes operations to lazily created per-namespace
/// segment managers that all share one `CompactionConfig`.
pub struct CompactionManager {
    /// Config cloned into every new namespace manager.
    config: CompactionConfig,
    /// Per-namespace managers, created on first write.
    namespaces: RwLock<HashMap<NamespaceId, Arc<NamespaceSegmentManager>>>,
    /// Manager-level counters.
    /// NOTE(review): nothing in this file increments these — compaction work
    /// is recorded on each namespace manager's own `stats`; confirm intent.
    global_stats: CompactionStats,
}
769
770impl CompactionManager {
771 pub fn new(config: CompactionConfig) -> Self {
773 Self {
774 config,
775 namespaces: RwLock::new(HashMap::new()),
776 global_stats: CompactionStats::default(),
777 }
778 }
779
780 pub fn namespace(&self, namespace: &NamespaceId) -> Arc<NamespaceSegmentManager> {
782 let mut namespaces = self.namespaces.write();
783
784 if let Some(manager) = namespaces.get(namespace) {
785 return Arc::clone(manager);
786 }
787
788 let manager = Arc::new(NamespaceSegmentManager::new(
789 namespace.clone(),
790 self.config.clone(),
791 ));
792 namespaces.insert(namespace.clone(), Arc::clone(&manager));
793 manager
794 }
795
796 pub fn add(&self, namespace: &NamespaceId, vector: Vector) {
798 self.namespace(namespace).add(vector);
799 }
800
801 pub fn get(&self, namespace: &NamespaceId, id: &VectorId) -> Option<Vector> {
803 self.namespaces.read().get(namespace)?.get(id)
804 }
805
806 pub fn delete(&self, namespace: &NamespaceId, id: &VectorId) -> bool {
808 match self.namespaces.read().get(namespace) {
809 Some(manager) => manager.delete(id),
810 None => false,
811 }
812 }
813
814 pub fn get_all(&self, namespace: &NamespaceId) -> Vec<Vector> {
816 match self.namespaces.read().get(namespace) {
817 Some(manager) => manager.get_all(),
818 None => Vec::new(),
819 }
820 }
821
822 pub fn compact_namespace(&self, namespace: &NamespaceId) -> Vec<CompactionResult> {
824 match self.namespaces.read().get(namespace) {
825 Some(manager) => manager.auto_compact(),
826 None => Vec::new(),
827 }
828 }
829
830 pub fn compact_all(&self) -> HashMap<NamespaceId, Vec<CompactionResult>> {
832 let namespaces = self.namespaces.read();
833 let mut results = HashMap::new();
834
835 for (namespace, manager) in namespaces.iter() {
836 let namespace_results = manager.auto_compact();
837 if !namespace_results.is_empty() {
838 results.insert(namespace.clone(), namespace_results);
839 }
840 }
841
842 results
843 }
844
845 pub fn namespace_stats(&self, namespace: &NamespaceId) -> Option<SegmentStats> {
847 self.namespaces.read().get(namespace).map(|m| m.stats())
848 }
849
850 pub fn global_stats(&self) -> GlobalCompactionStats {
852 let namespaces = self.namespaces.read();
853
854 let mut total_live = 0usize;
855 let mut total_deleted = 0usize;
856 let mut total_size = 0usize;
857 let mut total_segments = 0usize;
858
859 for manager in namespaces.values() {
860 let stats = manager.stats();
861 total_live += stats.total_live_vectors;
862 total_deleted += stats.total_deleted_vectors;
863 total_size += stats.total_size_bytes;
864 total_segments += stats.active_segments + stats.sealed_segments;
865 }
866
867 GlobalCompactionStats {
868 total_namespaces: namespaces.len(),
869 total_segments,
870 total_live_vectors: total_live,
871 total_deleted_vectors: total_deleted,
872 total_size_bytes: total_size,
873 overall_garbage_ratio: if total_live + total_deleted > 0 {
874 total_deleted as f32 / (total_live + total_deleted) as f32
875 } else {
876 0.0
877 },
878 compactions_completed: self
879 .global_stats
880 .compactions_completed
881 .load(Ordering::Relaxed),
882 bytes_reclaimed: self.global_stats.bytes_reclaimed.load(Ordering::Relaxed),
883 }
884 }
885
886 pub fn delete_namespace(&self, namespace: &NamespaceId) -> bool {
888 self.namespaces.write().remove(namespace).is_some()
889 }
890
891 pub fn list_namespaces(&self) -> Vec<NamespaceId> {
893 self.namespaces.read().keys().cloned().collect()
894 }
895}
896
/// Lock-free operation counters (all `Relaxed` increments).
#[derive(Debug, Default)]
pub struct CompactionStats {
    /// Vectors inserted or overwritten.
    pub vectors_written: AtomicU64,
    /// Vectors tombstoned via `delete`.
    pub vectors_deleted: AtomicU64,
    /// Completed compaction merges.
    pub compactions_completed: AtomicU64,
    /// Estimated bytes freed by compaction.
    pub bytes_reclaimed: AtomicU64,
    /// Tombstones dropped during compaction.
    pub tombstones_collected: AtomicU64,
}
910
/// Point-in-time statistics for one namespace's segments.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Namespace the snapshot describes.
    pub namespace: NamespaceId,
    /// 1 if an active segment exists, else 0.
    pub active_segments: usize,
    /// Number of sealed segments.
    pub sealed_segments: usize,
    /// Live vectors across active and sealed segments.
    pub total_live_vectors: usize,
    /// Deleted (tombstoned) entries across sealed segments.
    pub total_deleted_vectors: usize,
    /// Estimated bytes across active and sealed segments.
    pub total_size_bytes: usize,
    /// Mean live-vector count per sealed segment (0 when none).
    pub average_segment_size: usize,
    /// deleted / (live + deleted) over sealed segments (0.0 when empty).
    pub garbage_ratio: f32,
}
923
/// Statistics aggregated across every namespace.
#[derive(Debug, Clone)]
pub struct GlobalCompactionStats {
    /// Number of namespaces with a manager.
    pub total_namespaces: usize,
    /// Active plus sealed segments across all namespaces.
    pub total_segments: usize,
    /// Live vectors across all namespaces.
    pub total_live_vectors: usize,
    /// Deleted entries across all namespaces.
    pub total_deleted_vectors: usize,
    /// Estimated bytes across all namespaces.
    pub total_size_bytes: usize,
    /// deleted / (live + deleted) overall (0.0 when empty).
    pub overall_garbage_ratio: f32,
    /// Completed compaction merges.
    pub compactions_completed: u64,
    /// Estimated bytes freed by compaction.
    pub bytes_reclaimed: u64,
}
936
/// Seconds since the Unix epoch; 0 if the system clock is before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
948
949fn estimate_vector_size(vector: &Vector) -> usize {
951 let values_size = vector.values.len() * 4;
952 let id_size = vector.id.len();
953 let metadata_size = vector
954 .metadata
955 .as_ref()
956 .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0))
957 .unwrap_or(0);
958
959 values_size + id_size + metadata_size + 32 }
961
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper: build a vector with the given id and values `0.0 .. dim as f32`.
    fn make_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: (0..dim).map(|i| i as f32).collect(),
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    // Builder methods set their fields; the threshold survives un-clamped
    // since 0.4 is already inside 0.0..=1.0.
    #[test]
    fn test_compaction_config_builder() {
        let config = CompactionConfig::new()
            .with_target_size(5000)
            .with_min_size(500)
            .with_garbage_threshold(0.4)
            .without_auto_compact();

        assert_eq!(config.target_segment_size, 5000);
        assert_eq!(config.min_segment_size, 500);
        assert!((config.garbage_threshold - 0.4).abs() < 0.001);
        assert!(!config.auto_compact);
    }

    // garbage_ratio / total_count / needs_compaction arithmetic on metadata.
    #[test]
    fn test_segment_metadata() {
        let mut meta = SegmentMetadata::new("seg1".to_string(), "ns1".to_string());

        assert_eq!(meta.state, SegmentState::Active);
        assert_eq!(meta.live_count, 0);
        assert!(meta.is_empty());
        assert!((meta.garbage_ratio() - 0.0).abs() < 0.001);

        meta.live_count = 100;
        meta.deleted_count = 50;

        assert!(!meta.is_empty());
        assert_eq!(meta.total_count(), 150);
        // 50 deleted of 150 total ≈ 0.333.
        assert!((meta.garbage_ratio() - 0.333).abs() < 0.01);
        assert!(meta.needs_compaction(0.3));
        assert!(!meta.needs_compaction(0.5));
    }

    // Add/get/delete/tombstone bookkeeping within a single segment.
    #[test]
    fn test_segment_operations() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());

        segment.add(make_vector("v1", 128));
        segment.add(make_vector("v2", 128));
        segment.add(make_vector("v3", 128));

        assert_eq!(segment.metadata.live_count, 3);
        assert!(segment.contains(&"v1".to_string()));
        assert!(segment.get(&"v1".to_string()).is_some());

        assert!(segment.delete(&"v2".to_string()));
        assert_eq!(segment.metadata.live_count, 2);
        assert_eq!(segment.metadata.deleted_count, 1);
        assert!(!segment.contains(&"v2".to_string()));
        assert!(segment.is_tombstoned(&"v2".to_string()));

        let live: Vec<_> = segment.live_vectors().collect();
        assert_eq!(live.len(), 2);
    }

    // seal() transitions Active -> Sealed.
    #[test]
    fn test_segment_seal() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());
        segment.add(make_vector("v1", 128));

        assert_eq!(segment.metadata.state, SegmentState::Active);

        segment.seal();
        assert_eq!(segment.metadata.state, SegmentState::Sealed);
    }

    // Basic add/get/delete routing through the per-namespace manager.
    #[test]
    fn test_namespace_segment_manager() {
        let config = CompactionConfig::new().with_target_size(100);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..50 {
            manager.add(make_vector(&format!("v{}", i), 128));
        }

        assert_eq!(manager.get_all().len(), 50);
        assert!(manager.get(&"v10".to_string()).is_some());

        assert!(manager.delete(&"v5".to_string()));
        assert!(manager.delete(&"v15".to_string()));

        assert_eq!(manager.get_all().len(), 48);
        assert!(manager.get(&"v5".to_string()).is_none());
    }

    // Writing past target_segment_size (10) must seal segments automatically.
    #[test]
    fn test_namespace_manager_auto_seal() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..25 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        let stats = manager.stats();
        assert!(stats.sealed_segments >= 1);
        assert_eq!(stats.total_live_vectors, 25);
    }

    // Compaction must not change the set of live vectors (30 added, 10 deleted).
    #[test]
    fn test_compaction() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5)
            .with_garbage_threshold(0.2);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        for i in 0..10 {
            manager.delete(&format!("v{}", i));
        }

        let _results = manager.auto_compact();

        assert_eq!(manager.get_all().len(), 20);

        let stats = manager.stats();
        assert_eq!(stats.total_live_vectors, 20);
    }

    // Multi-namespace routing and global aggregation.
    #[test]
    fn test_compaction_manager() {
        let config = CompactionConfig::new().with_target_size(50);
        let manager = CompactionManager::new(config);

        for i in 0..20 {
            manager.add(&"ns1".to_string(), make_vector(&format!("v{}", i), 128));
            manager.add(&"ns2".to_string(), make_vector(&format!("v{}", i), 128));
        }

        assert_eq!(manager.get_all(&"ns1".to_string()).len(), 20);
        assert_eq!(manager.get_all(&"ns2".to_string()).len(), 20);
        assert_eq!(manager.list_namespaces().len(), 2);

        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_some());

        assert!(manager.delete(&"ns1".to_string(), &"v5".to_string()));
        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_none());

        let global = manager.global_stats();
        assert_eq!(global.total_namespaces, 2);
        // 20 + 20 written, minus the one deletion above.
        assert_eq!(global.total_live_vectors, 39);
    }

    // Survivors of compaction must all remain reachable by id.
    #[test]
    fn test_compaction_result() {
        let config = CompactionConfig::new()
            .with_target_size(5)
            .with_min_size(2)
            .with_garbage_threshold(0.1);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..15 {
            manager.add(make_vector(&format!("v{}", i), 32));
        }

        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }

        let _results = manager.auto_compact();

        assert_eq!(manager.get_all().len(), 10);
        for i in 5..15 {
            assert!(manager.get(&format!("v{}", i)).is_some());
        }
    }

    // stats() counts live/deleted totals and a nonzero garbage ratio.
    #[test]
    fn test_segment_stats() {
        let config = CompactionConfig::new().with_target_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }

        let stats = manager.stats();
        assert_eq!(stats.namespace, "ns1");
        assert_eq!(stats.total_live_vectors, 25);
        assert_eq!(stats.total_deleted_vectors, 5);
        assert!(stats.garbage_ratio > 0.0);
    }

    // delete_namespace removes the namespace; a repeat delete reports false.
    #[test]
    fn test_delete_namespace() {
        let config = CompactionConfig::new();
        let manager = CompactionManager::new(config);

        manager.add(&"ns1".to_string(), make_vector("v1", 128));
        manager.add(&"ns2".to_string(), make_vector("v1", 128));

        assert_eq!(manager.list_namespaces().len(), 2);

        assert!(manager.delete_namespace(&"ns1".to_string()));
        assert_eq!(manager.list_namespaces().len(), 1);

        assert!(!manager.delete_namespace(&"ns1".to_string()));
    }

    // small_segments() must not disturb the stored data.
    #[test]
    fn test_small_segment_merge() {
        let config = CompactionConfig::new()
            .with_target_size(100)
            .with_min_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..10 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        let _small = manager.small_segments();
        assert_eq!(manager.get_all().len(), 10);
    }

    // Overwriting an id replaces the payload without duplicating the entry.
    #[test]
    fn test_vector_update() {
        let config = CompactionConfig::new();
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        let v1 = Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1);

        let v1_updated = Vector {
            id: "v1".to_string(),
            values: vec![4.0, 5.0, 6.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1_updated);

        assert_eq!(manager.get_all().len(), 1);

        let retrieved = manager.get(&"v1".to_string()).unwrap();
        assert_eq!(retrieved.values, vec![4.0, 5.0, 6.0]);
    }
}