Skip to main content

sochdb_vector/
segment_compaction.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Drift-Resilient Segment & Compaction Strategy (Task 9)
19//!
20//! Provides immutable segment management with:
21//! - Quantizer error tracking and threshold-based retraining
22//! - Segment lifecycle governance
23//! - Atomic version transitions
24//!
25//! ## Philosophy
26//!
27//! 1. Segments are immutable once written
28//! 2. Deletes accumulate in tombstone bitvec
29//! 3. Compaction merges small segments, removes tombstones
30//! 4. Quantizer is retrained when drift exceeds threshold
31//!
32//! ## Usage
33//!
34//! ```rust,ignore
35//! use sochdb_vector::segment_compaction::{SegmentManager, CompactionPolicy, Segment};
36//!
37//! let manager = SegmentManager::new(policy);
38//! manager.add_segment(segment);
39//! manager.maybe_compact();
40//! ```
41
42use std::collections::HashMap;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::time::{Duration, Instant, SystemTime};
45
46// ============================================================================
47// Segment Metadata
48// ============================================================================
49
/// Identifier of a segment, wrapping a raw `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SegmentId(pub u64);

impl SegmentId {
    /// Allocates the next identifier from a process-wide counter.
    ///
    /// The counter starts at 1 and is advanced atomically, so concurrent
    /// callers never receive the same value from this function.
    pub fn next() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        SegmentId(raw)
    }
}
61
/// Lifecycle stage of a segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentState {
    /// Still under construction; not searchable yet.
    Building,
    /// Live and searchable.
    Active,
    /// Currently being rewritten into a new segment by a compaction.
    Compacting,
    /// Superseded by a compaction and marked for deletion.
    Tombstoned,
    /// Deleted; waiting for garbage collection.
    Deleted,
}
76
/// Bookkeeping about the quantizer a segment was encoded with.
#[derive(Debug, Clone)]
pub struct QuantizerMeta {
    /// Version number of the quantizer.
    pub version: u32,
    /// How many samples the quantizer was trained on.
    pub n_training_samples: usize,
    /// Error (MSE) measured at training time.
    pub training_error: f32,
    /// Most recent estimate of the error.
    pub current_error: f32,
    /// When this metadata record was created.
    pub created_at: SystemTime,
}

impl Default for QuantizerMeta {
    /// Version 1, no training samples, zero errors, created "now".
    fn default() -> Self {
        let now = SystemTime::now();
        QuantizerMeta {
            version: 1,
            n_training_samples: 0,
            training_error: 0.0,
            current_error: 0.0,
            created_at: now,
        }
    }
}
103
104/// Segment statistics for compaction decisions
105#[derive(Debug, Clone)]
106pub struct SegmentStats {
107    /// Total vectors in segment
108    pub n_vectors: usize,
109    /// Deleted vectors (tombstones)
110    pub n_deleted: usize,
111    /// Segment size in bytes
112    pub size_bytes: u64,
113    /// Created timestamp
114    pub created_at: SystemTime,
115    /// Last access timestamp
116    pub last_accessed: SystemTime,
117    /// Number of accesses
118    pub access_count: u64,
119    /// Quantizer metadata
120    pub quantizer_meta: QuantizerMeta,
121    /// Quantization error samples (for drift detection)
122    pub error_samples: Vec<f32>,
123}
124
125impl SegmentStats {
126    /// Create new stats
127    pub fn new(n_vectors: usize, size_bytes: u64) -> Self {
128        Self {
129            n_vectors,
130            n_deleted: 0,
131            size_bytes,
132            created_at: SystemTime::now(),
133            last_accessed: SystemTime::now(),
134            access_count: 0,
135            quantizer_meta: QuantizerMeta::default(),
136            error_samples: Vec::new(),
137        }
138    }
139
140    /// Get deletion ratio
141    pub fn deletion_ratio(&self) -> f32 {
142        if self.n_vectors == 0 {
143            0.0
144        } else {
145            self.n_deleted as f32 / self.n_vectors as f32
146        }
147    }
148
149    /// Get live vector count
150    pub fn live_vectors(&self) -> usize {
151        self.n_vectors.saturating_sub(self.n_deleted)
152    }
153
154    /// Record quantizer error sample
155    pub fn record_error(&mut self, error: f32) {
156        self.error_samples.push(error);
157        // Keep only recent samples
158        if self.error_samples.len() > 1000 {
159            self.error_samples.remove(0);
160        }
161    }
162
163    /// Get current estimated quantizer error
164    pub fn estimated_error(&self) -> f32 {
165        if self.error_samples.is_empty() {
166            self.quantizer_meta.current_error
167        } else {
168            let sum: f32 = self.error_samples.iter().sum();
169            sum / self.error_samples.len() as f32
170        }
171    }
172
173    /// Check if quantizer needs retraining
174    pub fn needs_retraining(&self, threshold: f32) -> bool {
175        let current = self.estimated_error();
176        let original = self.quantizer_meta.training_error;
177
178        if original == 0.0 {
179            false
180        } else {
181            (current - original) / original > threshold
182        }
183    }
184}
185
186// ============================================================================
187// Segment
188// ============================================================================
189
190/// Immutable segment
191#[derive(Debug, Clone)]
192pub struct Segment {
193    /// Segment ID
194    pub id: SegmentId,
195    /// Segment state
196    pub state: SegmentState,
197    /// Statistics
198    pub stats: SegmentStats,
199    /// Data file path
200    pub data_path: String,
201    /// Index file path
202    pub index_path: String,
203    /// Tombstone bitvec path
204    pub tombstone_path: String,
205    /// Segment generation (increments with each compaction)
206    pub generation: u32,
207}
208
209impl Segment {
210    /// Create new segment
211    pub fn new(id: SegmentId, n_vectors: usize, size_bytes: u64, data_path: String) -> Self {
212        Self {
213            id,
214            state: SegmentState::Building,
215            stats: SegmentStats::new(n_vectors, size_bytes),
216            data_path: data_path.clone(),
217            index_path: format!("{}.idx", data_path),
218            tombstone_path: format!("{}.tomb", data_path),
219            generation: 1,
220        }
221    }
222
223    /// Mark segment as active
224    pub fn activate(&mut self) {
225        self.state = SegmentState::Active;
226    }
227
228    /// Mark vector as deleted
229    pub fn mark_deleted(&mut self, count: usize) {
230        self.stats.n_deleted += count;
231    }
232
233    /// Record access
234    pub fn record_access(&mut self) {
235        self.stats.access_count += 1;
236        self.stats.last_accessed = SystemTime::now();
237    }
238}
239
240// ============================================================================
241// Compaction Policy
242// ============================================================================
243
/// Thresholds and limits that determine when compaction is triggered.
#[derive(Debug, Clone)]
pub struct CompactionPolicy {
    /// Deletion ratio above which a segment is scheduled for compaction.
    pub deletion_ratio_threshold: f32,

    /// Size in bytes above which a segment is split.
    pub max_segment_size: u64,

    /// Size in bytes below which a segment is a candidate for merging.
    pub min_segment_size: u64,

    /// Size in bytes aimed for when grouping segments into a new one.
    pub target_segment_size: u64,

    /// Segment count above which a forced compaction is planned.
    pub max_segments: usize,

    /// Relative quantizer-error increase that triggers retraining.
    pub quantizer_drift_threshold: f32,

    /// Minimum time between compaction planning rounds.
    pub compaction_cooldown: Duration,

    /// Maximum concurrent compaction threads.
    pub max_compaction_threads: usize,
}

impl Default for CompactionPolicy {
    /// 30% deletion threshold, 64 MB / 256 MB / 1 GB min / target / max sizes,
    /// 100 segments, 20% drift threshold, 60 s cooldown, 2 threads.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        CompactionPolicy {
            deletion_ratio_threshold: 0.3,
            max_segment_size: 1024 * MIB,
            min_segment_size: 64 * MIB,
            target_segment_size: 256 * MIB,
            max_segments: 100,
            // A 20% error increase triggers retraining.
            quantizer_drift_threshold: 0.2,
            compaction_cooldown: Duration::from_secs(60),
            max_compaction_threads: 2,
        }
    }
}

impl CompactionPolicy {
    /// Preset for SSD storage: reclaims space earlier (25% deletions) and
    /// targets larger 512 MB segments; everything else is the default.
    pub fn ssd_optimized() -> Self {
        let mut policy = Self::default();
        policy.deletion_ratio_threshold = 0.25;
        policy.target_segment_size = 512 * 1024 * 1024;
        policy
    }

    /// Preset for RAM storage: tolerates more deletions (40%), targets smaller
    /// 64 MB segments and caps the segment count at 50; everything else is
    /// the default.
    pub fn ram_optimized() -> Self {
        let mut policy = Self::default();
        policy.deletion_ratio_threshold = 0.4;
        policy.target_segment_size = 64 * 1024 * 1024;
        policy.max_segments = 50;
        policy
    }
}
307
308// ============================================================================
309// Compaction Decision
310// ============================================================================
311
312/// Compaction decision for a set of segments
313#[derive(Debug)]
314pub enum CompactionDecision {
315    /// No compaction needed
316    None,
317    /// Merge segments into one
318    Merge(Vec<SegmentId>),
319    /// Split segment
320    Split(SegmentId),
321    /// Retrain quantizer for segments
322    Retrain(Vec<SegmentId>),
323    /// Full recompaction
324    FullRecompact(Vec<SegmentId>),
325}
326
327/// Compaction job
328#[derive(Debug)]
329pub struct CompactionJob {
330    /// Job ID
331    pub id: u64,
332    /// Decision
333    pub decision: CompactionDecision,
334    /// Source segments
335    pub source_segments: Vec<SegmentId>,
336    /// Created time
337    pub created_at: Instant,
338    /// Priority (lower = higher priority)
339    pub priority: u32,
340}
341
342// ============================================================================
343// Compaction Planner
344// ============================================================================
345
346/// Plans compaction jobs
347pub struct CompactionPlanner {
348    policy: CompactionPolicy,
349}
350
351impl CompactionPlanner {
352    /// Create new planner
353    pub fn new(policy: CompactionPolicy) -> Self {
354        Self { policy }
355    }
356
357    /// Analyze segments and decide on compaction
358    pub fn plan(&self, segments: &[&Segment]) -> Vec<CompactionDecision> {
359        let mut decisions = Vec::new();
360
361        // Check for high deletion ratio segments
362        let high_deletion: Vec<_> = segments
363            .iter()
364            .filter(|s| s.stats.deletion_ratio() > self.policy.deletion_ratio_threshold)
365            .map(|s| s.id)
366            .collect();
367
368        if !high_deletion.is_empty() {
369            decisions.push(CompactionDecision::Merge(high_deletion));
370        }
371
372        // Check for small segments to merge
373        let small_segments: Vec<_> = segments
374            .iter()
375            .filter(|s| s.stats.size_bytes < self.policy.min_segment_size)
376            .collect();
377
378        if small_segments.len() >= 2 {
379            // Group small segments for merging
380            let mut current_group: Vec<SegmentId> = Vec::new();
381            let mut current_size = 0u64;
382
383            for seg in small_segments {
384                if current_size + seg.stats.size_bytes <= self.policy.target_segment_size {
385                    current_group.push(seg.id);
386                    current_size += seg.stats.size_bytes;
387                } else {
388                    if current_group.len() >= 2 {
389                        decisions.push(CompactionDecision::Merge(current_group.clone()));
390                    }
391                    current_group.clear();
392                    current_group.push(seg.id);
393                    current_size = seg.stats.size_bytes;
394                }
395            }
396
397            if current_group.len() >= 2 {
398                decisions.push(CompactionDecision::Merge(current_group));
399            }
400        }
401
402        // Check for oversized segments
403        for seg in segments {
404            if seg.stats.size_bytes > self.policy.max_segment_size {
405                decisions.push(CompactionDecision::Split(seg.id));
406            }
407        }
408
409        // Check for quantizer drift
410        let drifted: Vec<_> = segments
411            .iter()
412            .filter(|s| {
413                s.stats
414                    .needs_retraining(self.policy.quantizer_drift_threshold)
415            })
416            .map(|s| s.id)
417            .collect();
418
419        if !drifted.is_empty() {
420            decisions.push(CompactionDecision::Retrain(drifted));
421        }
422
423        // Check if too many segments
424        if segments.len() > self.policy.max_segments {
425            // Aggressive merge of oldest/smallest segments
426            let mut sorted: Vec<_> = segments.iter().collect();
427            sorted.sort_by_key(|s| s.stats.live_vectors());
428
429            let to_merge: Vec<_> = sorted
430                .iter()
431                .take(segments.len() / 2)
432                .map(|s| s.id)
433                .collect();
434
435            if to_merge.len() >= 2 {
436                decisions.push(CompactionDecision::FullRecompact(to_merge));
437            }
438        }
439
440        decisions
441    }
442
443    /// Get policy
444    pub fn policy(&self) -> &CompactionPolicy {
445        &self.policy
446    }
447}
448
449// ============================================================================
450// Version Manager
451// ============================================================================
452
453/// Manages segment versions for atomic transitions
454pub struct VersionManager {
455    /// Current version
456    current_version: AtomicU64,
457    /// Version to segments mapping
458    versions: parking_lot::RwLock<HashMap<u64, Vec<SegmentId>>>,
459}
460
461impl VersionManager {
462    /// Create new version manager
463    pub fn new() -> Self {
464        Self {
465            current_version: AtomicU64::new(1),
466            versions: parking_lot::RwLock::new(HashMap::new()),
467        }
468    }
469
470    /// Get current version
471    pub fn current(&self) -> u64 {
472        self.current_version.load(Ordering::SeqCst)
473    }
474
475    /// Create new version with segments
476    pub fn create_version(&self, segments: Vec<SegmentId>) -> u64 {
477        let version = self.current_version.fetch_add(1, Ordering::SeqCst) + 1;
478        self.versions.write().insert(version, segments);
479        version
480    }
481
482    /// Switch to new version atomically
483    pub fn switch_to(&self, version: u64) -> bool {
484        let versions = self.versions.read();
485        if versions.contains_key(&version) {
486            self.current_version.store(version, Ordering::SeqCst);
487            true
488        } else {
489            false
490        }
491    }
492
493    /// Get segments for version
494    pub fn get_segments(&self, version: u64) -> Option<Vec<SegmentId>> {
495        self.versions.read().get(&version).cloned()
496    }
497
498    /// Rollback to previous version
499    pub fn rollback(&self) -> bool {
500        let current = self.current_version.load(Ordering::SeqCst);
501        if current > 1 {
502            self.current_version.store(current - 1, Ordering::SeqCst);
503            true
504        } else {
505            false
506        }
507    }
508
509    /// Clean old versions
510    pub fn clean_old_versions(&self, keep_n: usize) {
511        let current = self.current();
512        let mut versions = self.versions.write();
513
514        let to_remove: Vec<_> = versions
515            .keys()
516            .filter(|&&v| v + keep_n as u64 <= current)
517            .cloned()
518            .collect();
519
520        for v in to_remove {
521            versions.remove(&v);
522        }
523    }
524}
525
526impl Default for VersionManager {
527    fn default() -> Self {
528        Self::new()
529    }
530}
531
532// ============================================================================
533// Segment Manager
534// ============================================================================
535
536/// Manages segment lifecycle
537pub struct SegmentManager {
538    /// All segments
539    segments: parking_lot::RwLock<HashMap<SegmentId, Segment>>,
540    /// Compaction planner
541    planner: CompactionPlanner,
542    /// Version manager
543    versions: VersionManager,
544    /// Last compaction time
545    last_compaction: parking_lot::Mutex<Option<Instant>>,
546    /// Job counter
547    job_counter: AtomicU64,
548}
549
550impl SegmentManager {
551    /// Create new segment manager
552    pub fn new(policy: CompactionPolicy) -> Self {
553        Self {
554            segments: parking_lot::RwLock::new(HashMap::new()),
555            planner: CompactionPlanner::new(policy),
556            versions: VersionManager::new(),
557            last_compaction: parking_lot::Mutex::new(None),
558            job_counter: AtomicU64::new(0),
559        }
560    }
561
562    /// Add segment
563    pub fn add_segment(&self, segment: Segment) {
564        let id = segment.id;
565        self.segments.write().insert(id, segment);
566
567        // Update version
568        let current_segments: Vec<_> = self
569            .segments
570            .read()
571            .iter()
572            .filter(|(_, s)| s.state == SegmentState::Active)
573            .map(|(id, _)| *id)
574            .collect();
575
576        self.versions.create_version(current_segments);
577    }
578
579    /// Get segment
580    pub fn get_segment(&self, id: SegmentId) -> Option<Segment> {
581        self.segments.read().get(&id).cloned()
582    }
583
584    /// Mark vectors as deleted in segment
585    pub fn mark_deleted(&self, id: SegmentId, count: usize) {
586        if let Some(segment) = self.segments.write().get_mut(&id) {
587            segment.mark_deleted(count);
588        }
589    }
590
591    /// Record quantizer error for segment
592    pub fn record_quantizer_error(&self, id: SegmentId, error: f32) {
593        if let Some(segment) = self.segments.write().get_mut(&id) {
594            segment.stats.record_error(error);
595        }
596    }
597
598    /// Check if compaction is needed and return jobs
599    pub fn maybe_compact(&self) -> Vec<CompactionJob> {
600        // Check cooldown
601        let mut last = self.last_compaction.lock();
602        if let Some(last_time) = *last {
603            if last_time.elapsed() < self.planner.policy().compaction_cooldown {
604                return Vec::new();
605            }
606        }
607
608        // Get active segments
609        let segments = self.segments.read();
610        let active: Vec<_> = segments
611            .values()
612            .filter(|s| s.state == SegmentState::Active)
613            .collect();
614
615        let decisions = self.planner.plan(&active);
616
617        if !decisions.is_empty() {
618            *last = Some(Instant::now());
619        }
620
621        decisions
622            .into_iter()
623            .map(|d| {
624                let source_segments = match &d {
625                    CompactionDecision::None => Vec::new(),
626                    CompactionDecision::Merge(ids) => ids.clone(),
627                    CompactionDecision::Split(id) => vec![*id],
628                    CompactionDecision::Retrain(ids) => ids.clone(),
629                    CompactionDecision::FullRecompact(ids) => ids.clone(),
630                };
631
632                CompactionJob {
633                    id: self.job_counter.fetch_add(1, Ordering::SeqCst),
634                    decision: d,
635                    source_segments,
636                    created_at: Instant::now(),
637                    priority: 0,
638                }
639            })
640            .collect()
641    }
642
643    /// Execute compaction job
644    pub fn execute_compaction(
645        &self,
646        job: &CompactionJob,
647    ) -> Result<Option<Segment>, CompactionError> {
648        match &job.decision {
649            CompactionDecision::None => Ok(None),
650
651            CompactionDecision::Merge(ids) => {
652                // Mark source segments as compacting
653                {
654                    let mut segments = self.segments.write();
655                    for id in ids {
656                        if let Some(seg) = segments.get_mut(id) {
657                            seg.state = SegmentState::Compacting;
658                        }
659                    }
660                }
661
662                // Create merged segment (placeholder implementation)
663                let merged_id = SegmentId::next();
664                let segments = self.segments.read();
665
666                let total_size: u64 = ids
667                    .iter()
668                    .filter_map(|id| segments.get(id))
669                    .map(|s| s.stats.size_bytes)
670                    .sum();
671
672                let total_live: usize = ids
673                    .iter()
674                    .filter_map(|id| segments.get(id))
675                    .map(|s| s.stats.live_vectors())
676                    .sum();
677
678                let max_gen = ids
679                    .iter()
680                    .filter_map(|id| segments.get(id))
681                    .map(|s| s.generation)
682                    .max()
683                    .unwrap_or(0);
684
685                drop(segments);
686
687                let mut merged = Segment::new(
688                    merged_id,
689                    total_live,
690                    total_size,
691                    format!("/segments/{}", merged_id.0),
692                );
693                merged.generation = max_gen + 1;
694                merged.state = SegmentState::Active;
695
696                // Mark source segments as tombstoned
697                {
698                    let mut segments = self.segments.write();
699                    for id in ids {
700                        if let Some(seg) = segments.get_mut(id) {
701                            seg.state = SegmentState::Tombstoned;
702                        }
703                    }
704                }
705
706                self.add_segment(merged.clone());
707                Ok(Some(merged))
708            }
709
710            CompactionDecision::Split(id) => {
711                // Split implementation
712                let segment = self
713                    .get_segment(*id)
714                    .ok_or(CompactionError::SegmentNotFound(*id))?;
715
716                let half_size = segment.stats.size_bytes / 2;
717                let half_vectors = segment.stats.n_vectors / 2;
718
719                let seg1_id = SegmentId::next();
720                let seg2_id = SegmentId::next();
721
722                let mut seg1 = Segment::new(
723                    seg1_id,
724                    half_vectors,
725                    half_size,
726                    format!("/segments/{}", seg1_id.0),
727                );
728                seg1.generation = segment.generation + 1;
729                seg1.state = SegmentState::Active;
730
731                let mut seg2 = Segment::new(
732                    seg2_id,
733                    segment.stats.n_vectors - half_vectors,
734                    segment.stats.size_bytes - half_size,
735                    format!("/segments/{}", seg2_id.0),
736                );
737                seg2.generation = segment.generation + 1;
738                seg2.state = SegmentState::Active;
739
740                // Mark original as tombstoned
741                if let Some(seg) = self.segments.write().get_mut(id) {
742                    seg.state = SegmentState::Tombstoned;
743                }
744
745                self.add_segment(seg1);
746                self.add_segment(seg2.clone());
747
748                Ok(Some(seg2))
749            }
750
751            CompactionDecision::Retrain(_ids) => {
752                // Retraining would involve:
753                // 1. Sample vectors from segments
754                // 2. Train new quantizer
755                // 3. Re-encode vectors
756                // 4. Create new segments
757                // Placeholder for now
758                Ok(None)
759            }
760
761            CompactionDecision::FullRecompact(ids) => {
762                // Full recompaction with new quantizer
763                self.execute_compaction(&CompactionJob {
764                    id: job.id,
765                    decision: CompactionDecision::Merge(ids.clone()),
766                    source_segments: ids.clone(),
767                    created_at: job.created_at,
768                    priority: job.priority,
769                })
770            }
771        }
772    }
773
774    /// Clean tombstoned segments
775    pub fn clean_tombstones(&self) -> Vec<SegmentId> {
776        let mut segments = self.segments.write();
777        let tombstoned: Vec<_> = segments
778            .iter()
779            .filter(|(_, s)| s.state == SegmentState::Tombstoned)
780            .map(|(id, _)| *id)
781            .collect();
782
783        for id in &tombstoned {
784            if let Some(seg) = segments.get_mut(id) {
785                seg.state = SegmentState::Deleted;
786            }
787        }
788
789        tombstoned
790    }
791
792    /// Get statistics
793    pub fn stats(&self) -> ManagerStats {
794        let segments = self.segments.read();
795
796        let total_segments = segments.len();
797        let active_segments = segments
798            .values()
799            .filter(|s| s.state == SegmentState::Active)
800            .count();
801
802        let total_vectors: usize = segments
803            .values()
804            .filter(|s| s.state == SegmentState::Active)
805            .map(|s| s.stats.n_vectors)
806            .sum();
807
808        let total_deleted: usize = segments
809            .values()
810            .filter(|s| s.state == SegmentState::Active)
811            .map(|s| s.stats.n_deleted)
812            .sum();
813
814        let total_size: u64 = segments
815            .values()
816            .filter(|s| s.state == SegmentState::Active)
817            .map(|s| s.stats.size_bytes)
818            .sum();
819
820        let avg_deletion_ratio = if active_segments > 0 {
821            segments
822                .values()
823                .filter(|s| s.state == SegmentState::Active)
824                .map(|s| s.stats.deletion_ratio())
825                .sum::<f32>()
826                / active_segments as f32
827        } else {
828            0.0
829        };
830
831        ManagerStats {
832            total_segments,
833            active_segments,
834            total_vectors,
835            live_vectors: total_vectors - total_deleted,
836            deleted_vectors: total_deleted,
837            total_size_bytes: total_size,
838            avg_deletion_ratio,
839            current_version: self.versions.current(),
840        }
841    }
842
843    /// Get version manager
844    pub fn versions(&self) -> &VersionManager {
845        &self.versions
846    }
847}
848
/// Aggregate figures reported by the segment manager.
#[derive(Debug, Clone)]
pub struct ManagerStats {
    /// Number of segments in the table, in any state.
    pub total_segments: usize,
    /// Number of segments in the `Active` state.
    pub active_segments: usize,
    /// Vectors held by active segments, including deleted ones.
    pub total_vectors: usize,
    /// Vectors in active segments that are not deleted.
    pub live_vectors: usize,
    /// Deleted vectors in active segments.
    pub deleted_vectors: usize,
    /// Combined byte size of active segments.
    pub total_size_bytes: u64,
    /// Mean deletion ratio across active segments.
    pub avg_deletion_ratio: f32,
    /// Current version number.
    pub current_version: u64,
}
861
862/// Compaction error
863#[derive(Debug)]
864pub enum CompactionError {
865    SegmentNotFound(SegmentId),
866    IoError(std::io::Error),
867    InvalidState(String),
868}
869
870impl std::fmt::Display for CompactionError {
871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872        match self {
873            Self::SegmentNotFound(id) => write!(f, "Segment not found: {:?}", id),
874            Self::IoError(e) => write!(f, "IO error: {}", e),
875            Self::InvalidState(s) => write!(f, "Invalid state: {}", s),
876        }
877    }
878}
879
880impl std::error::Error for CompactionError {}
881
#[cfg(test)]
mod tests {
    use super::*;

    /// A segment starts in `Building`, can be activated, and tracks deletions.
    #[test]
    fn test_segment_lifecycle() {
        let mut segment = Segment::new(
            SegmentId::next(),
            1000,
            1024 * 1024,
            "/data/segment1".to_string(),
        );

        assert_eq!(segment.state, SegmentState::Building);
        assert_eq!(segment.generation, 1);
        assert_eq!(segment.index_path, "/data/segment1.idx");
        assert_eq!(segment.tombstone_path, "/data/segment1.tomb");

        segment.activate();
        assert_eq!(segment.state, SegmentState::Active);

        segment.mark_deleted(100);
        assert_eq!(segment.stats.n_deleted, 100);
        assert_eq!(segment.stats.live_vectors(), 900);

        let ratio = segment.stats.deletion_ratio();
        assert!((ratio - 0.1).abs() < 0.001);
    }

    /// The planner schedules the high-deletion segment and the pair of small
    /// segments for merging, and nothing for splitting.
    #[test]
    fn test_compaction_planner() {
        let policy = CompactionPolicy {
            deletion_ratio_threshold: 0.3,
            min_segment_size: 1024,
            max_segment_size: 1024 * 1024,
            ..Default::default()
        };

        let planner = CompactionPlanner::new(policy);

        // Create segment with high deletion
        let mut seg1 = Segment::new(SegmentId(1), 1000, 2048, "/seg1".to_string());
        seg1.state = SegmentState::Active;
        seg1.stats.n_deleted = 400; // 40% deleted

        // Create small segments
        let mut seg2 = Segment::new(SegmentId(2), 100, 512, "/seg2".to_string());
        seg2.state = SegmentState::Active;

        let mut seg3 = Segment::new(SegmentId(3), 100, 512, "/seg3".to_string());
        seg3.state = SegmentState::Active;

        let segments: Vec<&Segment> = vec![&seg1, &seg2, &seg3];
        let decisions = planner.plan(&segments);

        // Should recommend merging high-deletion and small segments
        assert!(!decisions.is_empty());

        let merges: Vec<&Vec<SegmentId>> = decisions
            .iter()
            .filter_map(|d| match d {
                CompactionDecision::Merge(ids) => Some(ids),
                _ => None,
            })
            .collect();
        assert!(merges.iter().any(|ids| ids.contains(&SegmentId(1))));
        assert!(merges
            .iter()
            .any(|ids| ids.contains(&SegmentId(2)) && ids.contains(&SegmentId(3))));

        // Nothing exceeds the 1 MiB maximum and no training error is set.
        assert!(!decisions
            .iter()
            .any(|d| matches!(d, CompactionDecision::Split(_))));
        assert!(!decisions
            .iter()
            .any(|d| matches!(d, CompactionDecision::Retrain(_))));
    }

    /// Versions increase, can be switched to and rolled back, and keep their
    /// segment lists.
    #[test]
    fn test_version_manager() {
        let vm = VersionManager::new();

        let v1 = vm.create_version(vec![SegmentId(1), SegmentId(2)]);
        let v2 = vm.create_version(vec![SegmentId(1), SegmentId(2), SegmentId(3)]);

        assert!(v2 > v1);

        assert!(vm.switch_to(v2));
        assert_eq!(vm.current(), v2);

        assert!(vm.rollback());
        assert_eq!(vm.current(), v2 - 1);

        let segments = vm.get_segments(v1).unwrap();
        assert_eq!(segments.len(), 2);

        // Unknown versions cannot be switched to.
        assert!(!vm.switch_to(v2 + 1000));
    }

    /// Added active segments show up in the aggregate statistics.
    #[test]
    fn test_segment_manager() {
        let policy = CompactionPolicy::default();
        let manager = SegmentManager::new(policy);

        // Add segments
        let mut seg1 = Segment::new(SegmentId::next(), 1000, 1024 * 1024, "/seg1".to_string());
        seg1.state = SegmentState::Active;
        manager.add_segment(seg1);

        let mut seg2 = Segment::new(SegmentId::next(), 500, 512 * 1024, "/seg2".to_string());
        seg2.state = SegmentState::Active;
        manager.add_segment(seg2);

        let stats = manager.stats();
        assert_eq!(stats.total_segments, 2);
        assert_eq!(stats.active_segments, 2);
        assert_eq!(stats.total_vectors, 1500);
        assert_eq!(stats.live_vectors, 1500);
        assert_eq!(stats.deleted_vectors, 0);
        assert_eq!(stats.total_size_bytes, 1024 * 1024 + 512 * 1024);
    }

    /// Drift is reported once recorded errors exceed the training error by
    /// more than the threshold.
    #[test]
    fn test_quantizer_drift() {
        let mut stats = SegmentStats::new(1000, 1024);
        stats.quantizer_meta.training_error = 0.1;

        // No drift yet
        assert!(!stats.needs_retraining(0.2));

        // Add error samples showing drift
        for _ in 0..100 {
            stats.record_error(0.15); // 50% higher than training
        }

        assert!(stats.needs_retraining(0.2));
        assert!((stats.estimated_error() - 0.15).abs() < 1e-4);
    }
}