Skip to main content

storage/
compaction.rs

1//! Compaction Module for Object Storage
2//!
3//! This module provides segment management and compaction for object storage backends.
4//! It helps maintain storage efficiency by:
5//!
6//! - Organizing vectors into segments for batch I/O
7//! - Merging small segments into larger ones
8//! - Garbage collecting deleted vectors
9//! - Running background compaction based on configurable thresholds
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────┐
15//! │                  CompactionManager                   │
16//! │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
17//! │  │  Segment 1  │  │  Segment 2  │  │  Segment N  │ │
18//! │  │  (1000 vec) │  │  (500 vec)  │  │  (750 vec)  │ │
19//! │  └─────────────┘  └─────────────┘  └─────────────┘ │
20//! │         │                │                │         │
21//! │         └────────────────┼────────────────┘         │
22//! │                          ▼                          │
23//! │                   Compaction Job                    │
24//! │                          │                          │
25//! │                          ▼                          │
26//! │              ┌─────────────────────┐               │
27//! │              │   Merged Segment    │               │
28//! │              │    (2250 vec)       │               │
29//! │              └─────────────────────┘               │
30//! └─────────────────────────────────────────────────────┘
31//! ```
32
33use common::{NamespaceId, Vector, VectorId};
34use parking_lot::RwLock;
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::sync::Arc;
39
40// ============================================================================
41// Compaction Configuration
42// ============================================================================
43
/// Configuration for compaction operations.
///
/// Start from [`CompactionConfig::new`] (equivalent to `Default::default()`)
/// and adjust individual settings with the `with_*` / `without_*` builders.
///
/// NOTE(review): `auto_compact`, `compaction_interval_secs`,
/// `max_concurrent_jobs` and `tombstone_ttl_secs` are not read by the segment
/// managers in this module; presumably a background scheduler consumes them —
/// confirm.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Target segment size (number of live vectors). A write that finds the
    /// active segment at or above this size seals it and starts a new one.
    pub target_segment_size: usize,
    /// Sealed segments with fewer live vectors than this are merge candidates
    pub min_segment_size: usize,
    /// Maximum segments to merge in one operation
    pub max_merge_segments: usize,
    /// Garbage ratio threshold to trigger compaction (0.0 - 1.0)
    pub garbage_threshold: f32,
    /// Enable automatic background compaction
    pub auto_compact: bool,
    /// Interval between compaction checks (seconds)
    pub compaction_interval_secs: u64,
    /// Maximum concurrent compaction jobs
    pub max_concurrent_jobs: usize,
    /// Preserve tombstones for this duration (seconds) for consistency
    pub tombstone_ttl_secs: u64,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            target_segment_size: 10_000,
            min_segment_size: 1_000,
            max_merge_segments: 5,
            garbage_threshold: 0.3, // Compact when 30% is garbage
            auto_compact: true,
            compaction_interval_secs: 300, // 5 minutes
            max_concurrent_jobs: 2,
            tombstone_ttl_secs: 3600, // 1 hour
        }
    }
}

impl CompactionConfig {
    /// Create a config with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the target segment size (live vectors per active segment).
    pub fn with_target_size(mut self, size: usize) -> Self {
        self.target_segment_size = size;
        self
    }

    /// Set the live-vector count below which a sealed segment is a merge
    /// candidate.
    pub fn with_min_size(mut self, size: usize) -> Self {
        self.min_segment_size = size;
        self
    }

    /// Set the maximum number of segments merged in one operation.
    pub fn with_max_merge_segments(mut self, count: usize) -> Self {
        self.max_merge_segments = count;
        self
    }

    /// Set the garbage-ratio threshold, clamped to `[0.0, 1.0]`.
    ///
    /// A NaN `threshold` is ignored and the current value is kept: `f32::clamp`
    /// passes NaN through, and a NaN threshold would make every
    /// `ratio >= threshold` check false, silently disabling garbage-driven
    /// compaction.
    pub fn with_garbage_threshold(mut self, threshold: f32) -> Self {
        if !threshold.is_nan() {
            self.garbage_threshold = threshold.clamp(0.0, 1.0);
        }
        self
    }

    /// Disable automatic compaction.
    pub fn without_auto_compact(mut self) -> Self {
        self.auto_compact = false;
        self
    }

    /// Set the interval between compaction checks, in seconds.
    pub fn with_interval(mut self, secs: u64) -> Self {
        self.compaction_interval_secs = secs;
        self
    }

    /// Set the maximum number of concurrent compaction jobs.
    pub fn with_max_concurrent_jobs(mut self, jobs: usize) -> Self {
        self.max_concurrent_jobs = jobs;
        self
    }

    /// Set how long tombstones are preserved, in seconds.
    pub fn with_tombstone_ttl(mut self, secs: u64) -> Self {
        self.tombstone_ttl_secs = secs;
        self
    }
}
116
117// ============================================================================
118// Segment Types
119// ============================================================================
120
/// Unique identifier for a segment.
///
/// The namespace segment manager in this module generates these as
/// `"{namespace}_{counter:016x}"`.
pub type SegmentId = String;
123
/// Lifecycle state of a segment
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentState {
    /// Segment is active and accepting writes (new segments start here)
    Active,
    /// Segment is sealed and read-only; automatic compaction only selects
    /// segments in this state
    Sealed,
    /// Segment is being compacted (set on source segments once they have
    /// been pulled out of the sealed set for merging)
    Compacting,
    /// Segment is marked for deletion
    Tombstone,
}
136
/// Metadata for a segment: lifecycle state, counters, size estimate and the
/// range of vector IDs it has held.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Segment ID
    pub id: SegmentId,
    /// Namespace this segment belongs to
    pub namespace: NamespaceId,
    /// Current lifecycle state
    pub state: SegmentState,
    /// Number of live vectors
    pub live_count: usize,
    /// Number of deleted vectors (garbage) still tracked as tombstones
    pub deleted_count: usize,
    /// Estimated size in bytes of the live vectors (deleted vectors are
    /// subtracted when they are removed)
    pub size_bytes: usize,
    /// Creation time, in seconds since the Unix epoch
    pub created_at: u64,
    /// Last modification time, in seconds since the Unix epoch
    pub updated_at: u64,
    /// Smallest vector ID added to this segment (for range queries); not
    /// shrunk when vectors are deleted
    pub min_id: Option<VectorId>,
    /// Largest vector ID added to this segment (for range queries); not
    /// shrunk when vectors are deleted
    pub max_id: Option<VectorId>,
    /// Level in LSM-like hierarchy: 0 for freshly written segments; a merged
    /// segment gets one more than the highest level among its sources
    pub level: u32,
}
163
164impl SegmentMetadata {
165    /// Create new segment metadata
166    pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
167        let now = std::time::SystemTime::now()
168            .duration_since(std::time::UNIX_EPOCH)
169            .unwrap_or_default()
170            .as_secs();
171
172        Self {
173            id,
174            namespace,
175            state: SegmentState::Active,
176            live_count: 0,
177            deleted_count: 0,
178            size_bytes: 0,
179            created_at: now,
180            updated_at: now,
181            min_id: None,
182            max_id: None,
183            level: 0,
184        }
185    }
186
187    /// Calculate garbage ratio
188    pub fn garbage_ratio(&self) -> f32 {
189        let total = self.live_count + self.deleted_count;
190        if total == 0 {
191            0.0
192        } else {
193            self.deleted_count as f32 / total as f32
194        }
195    }
196
197    /// Check if segment is empty
198    pub fn is_empty(&self) -> bool {
199        self.live_count == 0
200    }
201
202    /// Total vector count (live + deleted)
203    pub fn total_count(&self) -> usize {
204        self.live_count + self.deleted_count
205    }
206
207    /// Check if segment should be compacted based on garbage ratio
208    pub fn needs_compaction(&self, threshold: f32) -> bool {
209        self.garbage_ratio() >= threshold
210    }
211
212    /// Update the ID range
213    pub fn update_id_range(&mut self, id: &VectorId) {
214        match &self.min_id {
215            None => self.min_id = Some(id.clone()),
216            Some(min) if id < min => self.min_id = Some(id.clone()),
217            _ => {}
218        }
219        match &self.max_id {
220            None => self.max_id = Some(id.clone()),
221            Some(max) if id > max => self.max_id = Some(id.clone()),
222            _ => {}
223        }
224    }
225}
226
227/// A segment containing vectors
228#[derive(Debug, Clone)]
229pub struct Segment {
230    /// Segment metadata
231    pub metadata: SegmentMetadata,
232    /// Vectors in this segment
233    vectors: HashMap<VectorId, Vector>,
234    /// Set of deleted vector IDs (tombstones)
235    tombstones: HashSet<VectorId>,
236}
237
238impl Segment {
239    /// Create a new segment
240    pub fn new(id: SegmentId, namespace: NamespaceId) -> Self {
241        Self {
242            metadata: SegmentMetadata::new(id, namespace),
243            vectors: HashMap::new(),
244            tombstones: HashSet::new(),
245        }
246    }
247
248    /// Add a vector to the segment
249    pub fn add(&mut self, vector: Vector) {
250        let size = estimate_vector_size(&vector);
251        self.metadata.update_id_range(&vector.id);
252
253        // Remove from tombstones if re-adding
254        if self.tombstones.remove(&vector.id) {
255            self.metadata.deleted_count = self.metadata.deleted_count.saturating_sub(1);
256        }
257
258        // Check if updating existing
259        if self.vectors.contains_key(&vector.id) {
260            // Update existing - size change
261            let old_size = self
262                .vectors
263                .get(&vector.id)
264                .map(estimate_vector_size)
265                .unwrap_or(0);
266            self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(old_size);
267        } else {
268            self.metadata.live_count += 1;
269        }
270
271        self.metadata.size_bytes += size;
272        self.metadata.updated_at = current_timestamp();
273        self.vectors.insert(vector.id.clone(), vector);
274    }
275
276    /// Delete a vector from the segment (mark as tombstone)
277    pub fn delete(&mut self, id: &VectorId) -> bool {
278        if let Some(vector) = self.vectors.remove(id) {
279            let size = estimate_vector_size(&vector);
280            self.metadata.live_count = self.metadata.live_count.saturating_sub(1);
281            self.metadata.deleted_count += 1;
282            self.metadata.size_bytes = self.metadata.size_bytes.saturating_sub(size);
283            self.metadata.updated_at = current_timestamp();
284            self.tombstones.insert(id.clone());
285            true
286        } else {
287            false
288        }
289    }
290
291    /// Get a vector by ID
292    pub fn get(&self, id: &VectorId) -> Option<&Vector> {
293        if self.tombstones.contains(id) {
294            None
295        } else {
296            self.vectors.get(id)
297        }
298    }
299
300    /// Check if a vector exists (and is not deleted)
301    pub fn contains(&self, id: &VectorId) -> bool {
302        !self.tombstones.contains(id) && self.vectors.contains_key(id)
303    }
304
305    /// Check if a vector is tombstoned
306    pub fn is_tombstoned(&self, id: &VectorId) -> bool {
307        self.tombstones.contains(id)
308    }
309
310    /// Get all live vectors
311    pub fn live_vectors(&self) -> impl Iterator<Item = &Vector> {
312        self.vectors
313            .values()
314            .filter(|v| !self.tombstones.contains(&v.id))
315    }
316
317    /// Get all vector IDs (including tombstoned)
318    pub fn all_ids(&self) -> impl Iterator<Item = &VectorId> {
319        self.vectors.keys()
320    }
321
322    /// Get tombstone IDs
323    pub fn tombstone_ids(&self) -> impl Iterator<Item = &VectorId> {
324        self.tombstones.iter()
325    }
326
327    /// Seal the segment (make read-only)
328    pub fn seal(&mut self) {
329        self.metadata.state = SegmentState::Sealed;
330        self.metadata.updated_at = current_timestamp();
331    }
332
333    /// Mark as compacting
334    pub fn mark_compacting(&mut self) {
335        self.metadata.state = SegmentState::Compacting;
336        self.metadata.updated_at = current_timestamp();
337    }
338
339    /// Mark for deletion
340    pub fn mark_tombstone(&mut self) {
341        self.metadata.state = SegmentState::Tombstone;
342        self.metadata.updated_at = current_timestamp();
343    }
344}
345
346// ============================================================================
347// Compaction Operations
348// ============================================================================
349
/// Outcome of a single compaction (merge) operation
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// IDs of the source segments that were merged (and removed)
    pub merged_segments: Vec<SegmentId>,
    /// ID of the segment created by the merge, if any
    pub new_segment: Option<SegmentId>,
    /// Number of live vectors copied into the new segment
    pub vectors_compacted: usize,
    /// Number of tombstones discarded from the source segments
    pub tombstones_removed: usize,
    /// Estimated bytes reclaimed: the sources' combined `size_bytes` minus
    /// the new segment's `size_bytes` (saturating at 0)
    pub bytes_reclaimed: usize,
    /// Wall-clock duration of the compaction (milliseconds)
    pub duration_ms: u64,
}
366
/// Compaction job status
///
/// NOTE(review): not constructed by the code visible in this module —
/// presumably tracked by a job scheduler elsewhere; confirm.
#[derive(Debug, Clone)]
pub struct CompactionJob {
    /// Job ID
    pub id: String,
    /// Namespace being compacted
    pub namespace: NamespaceId,
    /// Segments being merged
    pub source_segments: Vec<SegmentId>,
    /// Target segment ID
    pub target_segment: SegmentId,
    /// Job state
    pub state: CompactionJobState,
    /// Progress (0.0 - 1.0)
    pub progress: f32,
    /// Start time
    pub started_at: u64,
    /// Completion time (if finished)
    pub completed_at: Option<u64>,
    /// Error message (if failed)
    pub error: Option<String>,
}
389
/// State of a compaction job (see `CompactionJob::state`)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionJobState {
    /// Job is queued
    Pending,
    /// Job is running
    Running,
    /// Job completed successfully
    Completed,
    /// Job failed (details in `CompactionJob::error`)
    Failed,
    /// Job was cancelled
    Cancelled,
}
404
405// ============================================================================
406// Namespace Segment Manager
407// ============================================================================
408
409/// Manages segments for a namespace
410pub struct NamespaceSegmentManager {
411    /// Namespace ID
412    namespace: NamespaceId,
413    /// Configuration
414    config: CompactionConfig,
415    /// Active segment (for writes)
416    active_segment: RwLock<Option<Segment>>,
417    /// Sealed segments
418    sealed_segments: RwLock<HashMap<SegmentId, Segment>>,
419    /// Vector ID to segment ID mapping
420    vector_index: RwLock<HashMap<VectorId, SegmentId>>,
421    /// Segment counter for ID generation
422    segment_counter: AtomicU64,
423    /// Statistics
424    stats: CompactionStats,
425}
426
427impl NamespaceSegmentManager {
428    /// Create a new namespace segment manager
429    pub fn new(namespace: NamespaceId, config: CompactionConfig) -> Self {
430        Self {
431            namespace,
432            config,
433            active_segment: RwLock::new(None),
434            sealed_segments: RwLock::new(HashMap::new()),
435            vector_index: RwLock::new(HashMap::new()),
436            segment_counter: AtomicU64::new(0),
437            stats: CompactionStats::default(),
438        }
439    }
440
441    /// Generate a new segment ID
442    fn generate_segment_id(&self) -> SegmentId {
443        let counter = self.segment_counter.fetch_add(1, Ordering::SeqCst);
444        format!("{}_{:016x}", self.namespace, counter)
445    }
446
447    /// Get or create the active segment
448    fn ensure_active_segment(&self) -> SegmentId {
449        let mut active = self.active_segment.write();
450        if active.is_none() {
451            let id = self.generate_segment_id();
452            *active = Some(Segment::new(id.clone(), self.namespace.clone()));
453            return id;
454        }
455
456        let segment = active.as_ref().unwrap();
457
458        // Check if we need to seal and create new segment
459        if segment.metadata.live_count >= self.config.target_segment_size {
460            // Seal current segment
461            let mut sealed = active.take().unwrap();
462            sealed.seal();
463            let sealed_id = sealed.metadata.id.clone();
464
465            // Move to sealed segments
466            self.sealed_segments.write().insert(sealed_id, sealed);
467
468            // Create new active segment
469            let id = self.generate_segment_id();
470            *active = Some(Segment::new(id.clone(), self.namespace.clone()));
471            return id;
472        }
473
474        segment.metadata.id.clone()
475    }
476
477    /// Add a vector
478    pub fn add(&self, vector: Vector) {
479        let segment_id = self.ensure_active_segment();
480
481        // Update vector index
482        self.vector_index
483            .write()
484            .insert(vector.id.clone(), segment_id.clone());
485
486        // Add to active segment
487        if let Some(ref mut segment) = *self.active_segment.write() {
488            segment.add(vector);
489        }
490
491        self.stats.vectors_written.fetch_add(1, Ordering::Relaxed);
492    }
493
494    /// Get a vector by ID
495    pub fn get(&self, id: &VectorId) -> Option<Vector> {
496        // First check vector index
497        let segment_id = self.vector_index.read().get(id)?.clone();
498
499        // Check active segment
500        if let Some(ref segment) = *self.active_segment.read() {
501            if segment.metadata.id == segment_id {
502                return segment.get(id).cloned();
503            }
504        }
505
506        // Check sealed segments
507        self.sealed_segments
508            .read()
509            .get(&segment_id)
510            .and_then(|s| s.get(id).cloned())
511    }
512
513    /// Delete a vector
514    pub fn delete(&self, id: &VectorId) -> bool {
515        // Find the segment
516        let segment_id = match self.vector_index.read().get(id) {
517            Some(id) => id.clone(),
518            None => return false,
519        };
520
521        // Try active segment
522        if let Some(ref mut segment) = *self.active_segment.write() {
523            if segment.metadata.id == segment_id && segment.delete(id) {
524                self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
525                return true;
526            }
527        }
528
529        // Try sealed segments
530        if let Some(segment) = self.sealed_segments.write().get_mut(&segment_id) {
531            if segment.delete(id) {
532                self.stats.vectors_deleted.fetch_add(1, Ordering::Relaxed);
533                return true;
534            }
535        }
536
537        false
538    }
539
540    /// Get all vectors
541    pub fn get_all(&self) -> Vec<Vector> {
542        let mut result = Vec::new();
543
544        // Active segment
545        if let Some(ref segment) = *self.active_segment.read() {
546            result.extend(segment.live_vectors().cloned());
547        }
548
549        // Sealed segments
550        for segment in self.sealed_segments.read().values() {
551            result.extend(segment.live_vectors().cloned());
552        }
553
554        result
555    }
556
557    /// Get segments that need compaction
558    pub fn segments_needing_compaction(&self) -> Vec<SegmentMetadata> {
559        let sealed = self.sealed_segments.read();
560        sealed
561            .values()
562            .filter(|s| {
563                s.metadata.state == SegmentState::Sealed
564                    && s.metadata.needs_compaction(self.config.garbage_threshold)
565            })
566            .map(|s| s.metadata.clone())
567            .collect()
568    }
569
570    /// Get small segments that could be merged
571    pub fn small_segments(&self) -> Vec<SegmentMetadata> {
572        let sealed = self.sealed_segments.read();
573        sealed
574            .values()
575            .filter(|s| {
576                s.metadata.state == SegmentState::Sealed
577                    && s.metadata.live_count < self.config.min_segment_size
578            })
579            .map(|s| s.metadata.clone())
580            .collect()
581    }
582
583    /// Perform compaction on specified segments
584    pub fn compact(&self, segment_ids: &[SegmentId]) -> Option<CompactionResult> {
585        if segment_ids.is_empty() {
586            return None;
587        }
588
589        let start = std::time::Instant::now();
590        let mut sealed = self.sealed_segments.write();
591        let mut vector_index = self.vector_index.write();
592
593        // Collect segments to merge
594        let mut segments_to_merge: Vec<Segment> = Vec::new();
595        for id in segment_ids {
596            if let Some(mut segment) = sealed.remove(id) {
597                segment.mark_compacting();
598                segments_to_merge.push(segment);
599            }
600        }
601
602        if segments_to_merge.is_empty() {
603            return None;
604        }
605
606        // Create new merged segment
607        let new_id = self.generate_segment_id();
608        let mut new_segment = Segment::new(new_id.clone(), self.namespace.clone());
609        new_segment.metadata.level = segments_to_merge
610            .iter()
611            .map(|s| s.metadata.level)
612            .max()
613            .unwrap_or(0)
614            + 1;
615
616        let mut vectors_compacted = 0;
617        let mut tombstones_removed = 0;
618        let mut bytes_reclaimed = 0;
619
620        // Merge all live vectors into new segment
621        for segment in &segments_to_merge {
622            bytes_reclaimed += segment.metadata.size_bytes;
623            tombstones_removed += segment.tombstones.len();
624
625            for vector in segment.live_vectors() {
626                new_segment.add(vector.clone());
627                vector_index.insert(vector.id.clone(), new_id.clone());
628                vectors_compacted += 1;
629            }
630
631            // Remove tombstones from vector index
632            for tombstone_id in segment.tombstone_ids() {
633                vector_index.remove(tombstone_id);
634            }
635        }
636
637        // Seal new segment if it's large enough
638        if new_segment.metadata.live_count >= self.config.min_segment_size {
639            new_segment.seal();
640        }
641
642        // Calculate actual bytes (subtract new segment size from reclaimed)
643        bytes_reclaimed = bytes_reclaimed.saturating_sub(new_segment.metadata.size_bytes);
644
645        // Store new segment
646        let merged_ids: Vec<_> = segments_to_merge
647            .iter()
648            .map(|s| s.metadata.id.clone())
649            .collect();
650        sealed.insert(new_id.clone(), new_segment);
651
652        // Update stats
653        self.stats
654            .compactions_completed
655            .fetch_add(1, Ordering::Relaxed);
656        self.stats
657            .bytes_reclaimed
658            .fetch_add(bytes_reclaimed as u64, Ordering::Relaxed);
659        self.stats
660            .tombstones_collected
661            .fetch_add(tombstones_removed as u64, Ordering::Relaxed);
662
663        Some(CompactionResult {
664            merged_segments: merged_ids,
665            new_segment: Some(new_id),
666            vectors_compacted,
667            tombstones_removed,
668            bytes_reclaimed,
669            duration_ms: start.elapsed().as_millis() as u64,
670        })
671    }
672
673    /// Run automatic compaction based on configuration
674    pub fn auto_compact(&self) -> Vec<CompactionResult> {
675        let mut results = Vec::new();
676
677        // First, compact segments with high garbage ratio
678        let garbage_segments = self.segments_needing_compaction();
679        for segment in garbage_segments {
680            if let Some(result) = self.compact(&[segment.id]) {
681                results.push(result);
682            }
683        }
684
685        // Then, merge small segments
686        let small_segs = self.small_segments();
687        if small_segs.len() >= 2 {
688            let ids: Vec<_> = small_segs
689                .into_iter()
690                .take(self.config.max_merge_segments)
691                .map(|s| s.id)
692                .collect();
693
694            if let Some(result) = self.compact(&ids) {
695                results.push(result);
696            }
697        }
698
699        results
700    }
701
702    /// Get statistics
703    pub fn stats(&self) -> SegmentStats {
704        let active_count = if self.active_segment.read().is_some() {
705            1
706        } else {
707            0
708        };
709        let sealed = self.sealed_segments.read();
710
711        let total_live: usize = sealed.values().map(|s| s.metadata.live_count).sum();
712        let total_deleted: usize = sealed.values().map(|s| s.metadata.deleted_count).sum();
713        let total_size: usize = sealed.values().map(|s| s.metadata.size_bytes).sum();
714
715        let active_live = self
716            .active_segment
717            .read()
718            .as_ref()
719            .map(|s| s.metadata.live_count)
720            .unwrap_or(0);
721        let active_size = self
722            .active_segment
723            .read()
724            .as_ref()
725            .map(|s| s.metadata.size_bytes)
726            .unwrap_or(0);
727
728        SegmentStats {
729            namespace: self.namespace.clone(),
730            active_segments: active_count,
731            sealed_segments: sealed.len(),
732            total_live_vectors: total_live + active_live,
733            total_deleted_vectors: total_deleted,
734            total_size_bytes: total_size + active_size,
735            average_segment_size: if !sealed.is_empty() {
736                total_live / sealed.len()
737            } else {
738                0
739            },
740            garbage_ratio: if total_live + total_deleted > 0 {
741                total_deleted as f32 / (total_live + total_deleted) as f32
742            } else {
743                0.0
744            },
745        }
746    }
747
748    /// Clear all segments
749    pub fn clear(&self) {
750        *self.active_segment.write() = None;
751        self.sealed_segments.write().clear();
752        self.vector_index.write().clear();
753    }
754}
755
756// ============================================================================
757// Compaction Manager
758// ============================================================================
759
760/// Global compaction manager for all namespaces
761pub struct CompactionManager {
762    /// Configuration
763    config: CompactionConfig,
764    /// Per-namespace segment managers
765    namespaces: RwLock<HashMap<NamespaceId, Arc<NamespaceSegmentManager>>>,
766    /// Global statistics
767    global_stats: CompactionStats,
768}
769
770impl CompactionManager {
771    /// Create a new compaction manager
772    pub fn new(config: CompactionConfig) -> Self {
773        Self {
774            config,
775            namespaces: RwLock::new(HashMap::new()),
776            global_stats: CompactionStats::default(),
777        }
778    }
779
780    /// Get or create namespace manager
781    pub fn namespace(&self, namespace: &NamespaceId) -> Arc<NamespaceSegmentManager> {
782        let mut namespaces = self.namespaces.write();
783
784        if let Some(manager) = namespaces.get(namespace) {
785            return Arc::clone(manager);
786        }
787
788        let manager = Arc::new(NamespaceSegmentManager::new(
789            namespace.clone(),
790            self.config.clone(),
791        ));
792        namespaces.insert(namespace.clone(), Arc::clone(&manager));
793        manager
794    }
795
796    /// Add a vector to a namespace
797    pub fn add(&self, namespace: &NamespaceId, vector: Vector) {
798        self.namespace(namespace).add(vector);
799    }
800
801    /// Get a vector from a namespace
802    pub fn get(&self, namespace: &NamespaceId, id: &VectorId) -> Option<Vector> {
803        self.namespaces.read().get(namespace)?.get(id)
804    }
805
806    /// Delete a vector from a namespace
807    pub fn delete(&self, namespace: &NamespaceId, id: &VectorId) -> bool {
808        match self.namespaces.read().get(namespace) {
809            Some(manager) => manager.delete(id),
810            None => false,
811        }
812    }
813
814    /// Get all vectors in a namespace
815    pub fn get_all(&self, namespace: &NamespaceId) -> Vec<Vector> {
816        match self.namespaces.read().get(namespace) {
817            Some(manager) => manager.get_all(),
818            None => Vec::new(),
819        }
820    }
821
822    /// Run compaction on a namespace
823    pub fn compact_namespace(&self, namespace: &NamespaceId) -> Vec<CompactionResult> {
824        match self.namespaces.read().get(namespace) {
825            Some(manager) => manager.auto_compact(),
826            None => Vec::new(),
827        }
828    }
829
830    /// Run compaction on all namespaces
831    pub fn compact_all(&self) -> HashMap<NamespaceId, Vec<CompactionResult>> {
832        let namespaces = self.namespaces.read();
833        let mut results = HashMap::new();
834
835        for (namespace, manager) in namespaces.iter() {
836            let namespace_results = manager.auto_compact();
837            if !namespace_results.is_empty() {
838                results.insert(namespace.clone(), namespace_results);
839            }
840        }
841
842        results
843    }
844
845    /// Get statistics for a namespace
846    pub fn namespace_stats(&self, namespace: &NamespaceId) -> Option<SegmentStats> {
847        self.namespaces.read().get(namespace).map(|m| m.stats())
848    }
849
850    /// Get global statistics
851    pub fn global_stats(&self) -> GlobalCompactionStats {
852        let namespaces = self.namespaces.read();
853
854        let mut total_live = 0usize;
855        let mut total_deleted = 0usize;
856        let mut total_size = 0usize;
857        let mut total_segments = 0usize;
858
859        for manager in namespaces.values() {
860            let stats = manager.stats();
861            total_live += stats.total_live_vectors;
862            total_deleted += stats.total_deleted_vectors;
863            total_size += stats.total_size_bytes;
864            total_segments += stats.active_segments + stats.sealed_segments;
865        }
866
867        GlobalCompactionStats {
868            total_namespaces: namespaces.len(),
869            total_segments,
870            total_live_vectors: total_live,
871            total_deleted_vectors: total_deleted,
872            total_size_bytes: total_size,
873            overall_garbage_ratio: if total_live + total_deleted > 0 {
874                total_deleted as f32 / (total_live + total_deleted) as f32
875            } else {
876                0.0
877            },
878            compactions_completed: self
879                .global_stats
880                .compactions_completed
881                .load(Ordering::Relaxed),
882            bytes_reclaimed: self.global_stats.bytes_reclaimed.load(Ordering::Relaxed),
883        }
884    }
885
886    /// Delete a namespace
887    pub fn delete_namespace(&self, namespace: &NamespaceId) -> bool {
888        self.namespaces.write().remove(namespace).is_some()
889    }
890
891    /// List all namespaces
892    pub fn list_namespaces(&self) -> Vec<NamespaceId> {
893        self.namespaces.read().keys().cloned().collect()
894    }
895}
896
897// ============================================================================
898// Statistics Types
899// ============================================================================
900
/// Cumulative counters for segment and compaction activity.
///
/// All fields are atomics so they can be bumped through `&self`. The code in
/// this module updates them with `Ordering::Relaxed`, so they suit monitoring
/// but do not order any other memory operations.
#[derive(Debug, Default)]
pub struct CompactionStats {
    /// Number of `add` calls (an overwrite counts as a write)
    pub vectors_written: AtomicU64,
    /// Number of successful deletes
    pub vectors_deleted: AtomicU64,
    /// Number of compaction (merge) operations performed
    pub compactions_completed: AtomicU64,
    /// Sum of the `bytes_reclaimed` estimates of all compactions
    pub bytes_reclaimed: AtomicU64,
    /// Number of tombstones discarded by compaction
    pub tombstones_collected: AtomicU64,
}
910
/// Point-in-time statistics for one namespace's segments, produced by
/// `NamespaceSegmentManager::stats`.
#[derive(Debug, Clone)]
pub struct SegmentStats {
    /// Namespace these statistics describe
    pub namespace: NamespaceId,
    /// Number of active (writable) segments: 0 or 1
    pub active_segments: usize,
    /// Number of segments held in the sealed-segment map
    pub sealed_segments: usize,
    /// Live vectors across the active and sealed segments
    pub total_live_vectors: usize,
    /// Deleted vectors still tracked as tombstones
    pub total_deleted_vectors: usize,
    /// Estimated size of the live vectors, in bytes
    pub total_size_bytes: usize,
    /// Average number of live vectors per sealed segment (integer division;
    /// 0 when there are no sealed segments)
    pub average_segment_size: usize,
    /// Fraction of tracked vectors that are deleted (0.0 - 1.0)
    pub garbage_ratio: f32,
}
923
/// Point-in-time compaction statistics aggregated over every namespace.
///
/// Returned by `CompactionManager::global_stats`. That method sums the
/// per-namespace segment statistics and reads the manager-wide counters.
/// `Default` yields an all-zero snapshot. `PartialEq` allows snapshots to be
/// compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct GlobalCompactionStats {
    /// Number of namespaces tracked by the manager.
    pub total_namespaces: usize,
    /// Sum of active and sealed segments over all namespaces.
    pub total_segments: usize,
    /// Sum of live (non-deleted) vectors over all namespaces.
    pub total_live_vectors: usize,
    /// Sum of deleted vectors over all namespaces.
    pub total_deleted_vectors: usize,
    /// Sum of the per-namespace size totals, in bytes.
    pub total_size_bytes: usize,
    /// `total_deleted_vectors / (total_live_vectors + total_deleted_vectors)`,
    /// or `0.0` when both counts are zero.
    pub overall_garbage_ratio: f32,
    /// Completed compactions, read from the manager-wide counter.
    pub compactions_completed: u64,
    /// Bytes reclaimed by compaction, read from the manager-wide counter.
    pub bytes_reclaimed: u64,
}
936
937// ============================================================================
938// Helper Functions
939// ============================================================================
940
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, this returns `0`
/// instead of failing.
fn current_timestamp() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
948
949/// Estimate size of a vector in bytes
950fn estimate_vector_size(vector: &Vector) -> usize {
951    let values_size = vector.values.len() * 4;
952    let id_size = vector.id.len();
953    let metadata_size = vector
954        .metadata
955        .as_ref()
956        .map(|m| serde_json::to_string(m).map(|s| s.len()).unwrap_or(0))
957        .unwrap_or(0);
958
959    values_size + id_size + metadata_size + 32 // overhead for struct fields
960}
961
962// ============================================================================
963// Tests
964// ============================================================================
965
#[cfg(test)]
mod tests {
    //! Unit tests covering:
    //! - the compaction config builder,
    //! - segment metadata and segment operations,
    //! - the per-namespace `NamespaceSegmentManager`,
    //! - the multi-namespace `CompactionManager`.

    use super::*;

    /// Build a test vector.
    ///
    /// The vector has id `id` and `dim` components `0.0, 1.0, …`, with no
    /// metadata and no TTL.
    fn make_vector(id: &str, dim: usize) -> Vector {
        Vector {
            id: id.to_string(),
            values: (0..dim).map(|i| i as f32).collect(),
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[test]
    fn test_compaction_config_builder() {
        let config = CompactionConfig::new()
            .with_target_size(5000)
            .with_min_size(500)
            .with_garbage_threshold(0.4)
            .without_auto_compact();

        assert_eq!(config.target_segment_size, 5000);
        assert_eq!(config.min_segment_size, 500);
        assert!((config.garbage_threshold - 0.4).abs() < 0.001);
        assert!(!config.auto_compact);
    }

    #[test]
    fn test_segment_metadata() {
        let mut meta = SegmentMetadata::new("seg1".to_string(), "ns1".to_string());

        assert_eq!(meta.state, SegmentState::Active);
        assert_eq!(meta.live_count, 0);
        assert!(meta.is_empty());
        assert!((meta.garbage_ratio() - 0.0).abs() < 0.001);

        meta.live_count = 100;
        meta.deleted_count = 50;

        // 50 deleted out of 150 total => ratio ~0.333.
        assert!(!meta.is_empty());
        assert_eq!(meta.total_count(), 150);
        assert!((meta.garbage_ratio() - 0.333).abs() < 0.01);
        assert!(meta.needs_compaction(0.3));
        assert!(!meta.needs_compaction(0.5));
    }

    #[test]
    fn test_segment_operations() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());

        // Add vectors
        segment.add(make_vector("v1", 128));
        segment.add(make_vector("v2", 128));
        segment.add(make_vector("v3", 128));

        assert_eq!(segment.metadata.live_count, 3);
        assert!(segment.contains(&"v1".to_string()));
        assert!(segment.get(&"v1".to_string()).is_some());

        // Delete a vector: it moves from live to tombstoned.
        assert!(segment.delete(&"v2".to_string()));
        assert_eq!(segment.metadata.live_count, 2);
        assert_eq!(segment.metadata.deleted_count, 1);
        assert!(!segment.contains(&"v2".to_string()));
        assert!(segment.is_tombstoned(&"v2".to_string()));

        // Live vectors
        let live: Vec<_> = segment.live_vectors().collect();
        assert_eq!(live.len(), 2);
    }

    #[test]
    fn test_segment_seal() {
        let mut segment = Segment::new("seg1".to_string(), "ns1".to_string());
        segment.add(make_vector("v1", 128));

        assert_eq!(segment.metadata.state, SegmentState::Active);

        segment.seal();
        assert_eq!(segment.metadata.state, SegmentState::Sealed);
    }

    #[test]
    fn test_namespace_segment_manager() {
        let config = CompactionConfig::new().with_target_size(100);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Add vectors
        for i in 0..50 {
            manager.add(make_vector(&format!("v{}", i), 128));
        }

        assert_eq!(manager.get_all().len(), 50);
        assert!(manager.get(&"v10".to_string()).is_some());

        // Delete some
        assert!(manager.delete(&"v5".to_string()));
        assert!(manager.delete(&"v15".to_string()));

        assert_eq!(manager.get_all().len(), 48);
        assert!(manager.get(&"v5".to_string()).is_none());
    }

    #[test]
    fn test_namespace_manager_auto_seal() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Add more vectors than the target size to trigger a segment seal.
        for i in 0..25 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        let stats = manager.stats();
        // Should have created sealed segments
        assert!(stats.sealed_segments >= 1);
        assert_eq!(stats.total_live_vectors, 25);
    }

    #[test]
    fn test_compaction() {
        let config = CompactionConfig::new()
            .with_target_size(10)
            .with_min_size(5)
            .with_garbage_threshold(0.2);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Add vectors
        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        // Delete some to create garbage
        for i in 0..10 {
            manager.delete(&format!("v{}", i));
        }

        // Run compaction
        let _results = manager.auto_compact();

        // Verify all remaining vectors are accessible
        assert_eq!(manager.get_all().len(), 20);

        // Check stats
        let stats = manager.stats();
        assert_eq!(stats.total_live_vectors, 20);
    }

    #[test]
    fn test_compaction_manager() {
        let config = CompactionConfig::new().with_target_size(50);
        let manager = CompactionManager::new(config);

        // Add the same ids to two namespaces; they must stay independent.
        for i in 0..20 {
            manager.add(&"ns1".to_string(), make_vector(&format!("v{}", i), 128));
            manager.add(&"ns2".to_string(), make_vector(&format!("v{}", i), 128));
        }

        assert_eq!(manager.get_all(&"ns1".to_string()).len(), 20);
        assert_eq!(manager.get_all(&"ns2".to_string()).len(), 20);
        assert_eq!(manager.list_namespaces().len(), 2);

        // Get vector
        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_some());

        // Delete
        assert!(manager.delete(&"ns1".to_string(), &"v5".to_string()));
        assert!(manager.get(&"ns1".to_string(), &"v5".to_string()).is_none());

        // Stats
        let global = manager.global_stats();
        assert_eq!(global.total_namespaces, 2);
        assert_eq!(global.total_live_vectors, 39); // 40 - 1 deleted
    }

    #[test]
    fn test_compaction_result() {
        let config = CompactionConfig::new()
            .with_target_size(5)
            .with_min_size(2)
            .with_garbage_threshold(0.1);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Create segments with garbage
        for i in 0..15 {
            manager.add(make_vector(&format!("v{}", i), 32));
        }

        // Delete to create garbage
        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }

        // Run compaction
        let _results = manager.auto_compact();

        // Verify data integrity: every surviving id is still readable.
        assert_eq!(manager.get_all().len(), 10);
        for i in 5..15 {
            assert!(manager.get(&format!("v{}", i)).is_some());
        }
    }

    #[test]
    fn test_segment_stats() {
        let config = CompactionConfig::new().with_target_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        for i in 0..30 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        // Delete some
        for i in 0..5 {
            manager.delete(&format!("v{}", i));
        }

        let stats = manager.stats();
        assert_eq!(stats.namespace, "ns1");
        assert_eq!(stats.total_live_vectors, 25);
        assert_eq!(stats.total_deleted_vectors, 5);
        assert!(stats.garbage_ratio > 0.0);
    }

    #[test]
    fn test_delete_namespace() {
        let config = CompactionConfig::new();
        let manager = CompactionManager::new(config);

        manager.add(&"ns1".to_string(), make_vector("v1", 128));
        manager.add(&"ns2".to_string(), make_vector("v1", 128));

        assert_eq!(manager.list_namespaces().len(), 2);

        assert!(manager.delete_namespace(&"ns1".to_string()));
        assert_eq!(manager.list_namespaces().len(), 1);

        assert!(!manager.delete_namespace(&"ns1".to_string())); // Already deleted
    }

    #[test]
    fn test_small_segment_merge() {
        let config = CompactionConfig::new()
            .with_target_size(100)
            .with_min_size(20);
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Add a handful of vectors. No explicit seal is performed, so this
        // exercises the small-segment query path rather than an actual merge.
        for i in 0..10 {
            manager.add(make_vector(&format!("v{}", i), 64));
        }

        // May well be empty, since the active segment has not been sealed.
        // The call must not disturb the stored data either way.
        let _small = manager.small_segments();

        // The manager should still report every vector that was added.
        assert_eq!(manager.get_all().len(), 10);
        assert_eq!(manager.stats().total_live_vectors, 10);
    }

    #[test]
    fn test_vector_update() {
        let config = CompactionConfig::new();
        let manager = NamespaceSegmentManager::new("ns1".to_string(), config);

        // Add initial vector
        let v1 = Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1);

        // Re-adding the same id should replace the stored values.
        let v1_updated = Vector {
            id: "v1".to_string(),
            values: vec![4.0, 5.0, 6.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        };
        manager.add(v1_updated);

        // Should still have only one vector
        assert_eq!(manager.get_all().len(), 1);

        // Should have updated values
        let retrieved = manager.get(&"v1".to_string()).unwrap();
        assert_eq!(retrieved.values, vec![4.0, 5.0, 6.0]);
    }
}