// oxirs_vec/compression_io.rs

//! Streaming compress/decompress, block I/O, `AdaptiveCompressor`, and factory.
2
3use super::compression_codecs::{
4    NoOpCompressor, PcaCompressor, ProductQuantizer, ScalarQuantizer, ZstdCompressor,
5};
6use super::compression_types::{
7    AdaptiveQuality, CompressionMethod, CompressionMetrics, VectorAnalysis, VectorCompressor,
8};
9use crate::{Vector, VectorError};
10
11// ─────────────────────────────────────────────────────────────────────────────
12// Method equivalence check
13// ─────────────────────────────────────────────────────────────────────────────
14
15pub(crate) fn methods_equivalent(method1: &CompressionMethod, method2: &CompressionMethod) -> bool {
16    match (method1, method2) {
17        (CompressionMethod::None, CompressionMethod::None) => true,
18        (CompressionMethod::Zstd { level: l1 }, CompressionMethod::Zstd { level: l2 }) => {
19            (l1 - l2).abs() <= 2
20        }
21        (
22            CompressionMethod::Quantization { bits: b1 },
23            CompressionMethod::Quantization { bits: b2 },
24        ) => b1 == b2,
25        (CompressionMethod::Pca { components: c1 }, CompressionMethod::Pca { components: c2 }) => {
26            ((*c1 as i32) - (*c2 as i32)).abs() <= (*c1 as i32) / 10
27        }
28        _ => false,
29    }
30}
31
32// ─────────────────────────────────────────────────────────────────────────────
33// Factory
34// ─────────────────────────────────────────────────────────────────────────────
35
36/// Create a boxed compressor from the given method description
37pub fn create_compressor(method: &CompressionMethod) -> Box<dyn VectorCompressor> {
38    match method {
39        CompressionMethod::None => Box::new(NoOpCompressor),
40        CompressionMethod::Zstd { level } => Box::new(ZstdCompressor::new(*level)),
41        CompressionMethod::Quantization { bits } => Box::new(ScalarQuantizer::new(*bits)),
42        CompressionMethod::Pca { components } => Box::new(PcaCompressor::new(*components)),
43        CompressionMethod::ProductQuantization {
44            subvectors,
45            codebook_size,
46        } => Box::new(ProductQuantizer::new(*subvectors, *codebook_size)),
47        CompressionMethod::Adaptive {
48            quality_level,
49            analysis_samples,
50        } => Box::new(AdaptiveCompressor::new(
51            quality_level.clone(),
52            *analysis_samples,
53        )),
54    }
55}
56
57// ─────────────────────────────────────────────────────────────────────────────
58// AdaptiveCompressor
59// ─────────────────────────────────────────────────────────────────────────────
60
61/// Adaptive compressor that automatically selects the best compression method
62pub struct AdaptiveCompressor {
63    pub(crate) quality_level: AdaptiveQuality,
64    pub(crate) analysis_samples: usize,
65    pub(crate) current_method: Option<Box<dyn VectorCompressor>>,
66    pub(crate) analysis_cache: Option<VectorAnalysis>,
67    pub(crate) performance_metrics: CompressionMetrics,
68}
69
70impl AdaptiveCompressor {
71    pub fn new(quality_level: AdaptiveQuality, analysis_samples: usize) -> Self {
72        Self {
73            quality_level,
74            analysis_samples: analysis_samples.max(10),
75            current_method: None,
76            analysis_cache: None,
77            performance_metrics: CompressionMetrics::default(),
78        }
79    }
80
81    pub fn with_fast_quality() -> Self {
82        Self::new(AdaptiveQuality::Fast, 50)
83    }
84
85    pub fn with_balanced_quality() -> Self {
86        Self::new(AdaptiveQuality::Balanced, 100)
87    }
88
89    pub fn with_best_ratio() -> Self {
90        Self::new(AdaptiveQuality::BestRatio, 200)
91    }
92
93    /// Analyze sample vectors and optimize compression method
94    pub fn optimize_for_vectors(&mut self, sample_vectors: &[Vector]) -> Result<(), VectorError> {
95        if sample_vectors.is_empty() {
96            return Ok(());
97        }
98
99        let start_time = std::time::Instant::now();
100
101        let samples_to_analyze = sample_vectors.len().min(self.analysis_samples);
102        let analysis_vectors = &sample_vectors[..samples_to_analyze];
103
104        let analysis = VectorAnalysis::analyze(analysis_vectors, &self.quality_level)?;
105
106        let should_switch = match (&self.current_method, &self.analysis_cache) {
107            (Some(_), Some(cached)) => {
108                !methods_equivalent(&cached.recommended_method, &analysis.recommended_method)
109            }
110            _ => true,
111        };
112
113        if should_switch {
114            self.current_method = Some(create_compressor(&analysis.recommended_method));
115            self.performance_metrics.method_switches += 1;
116        }
117
118        self.analysis_cache = Some(analysis);
119
120        let analysis_time = start_time.elapsed().as_secs_f64() * 1000.0;
121        tracing::debug!("Adaptive compression analysis took {:.2}ms", analysis_time);
122
123        Ok(())
124    }
125
126    pub fn get_metrics(&self) -> &CompressionMetrics {
127        &self.performance_metrics
128    }
129
130    pub fn get_analysis(&self) -> Option<&VectorAnalysis> {
131        self.analysis_cache.as_ref()
132    }
133
134    /// Adaptively re-analyze and potentially switch compression method
135    pub fn adaptive_reanalysis(&mut self, recent_vectors: &[Vector]) -> Result<bool, VectorError> {
136        if recent_vectors.len() < self.analysis_samples / 4 {
137            return Ok(false);
138        }
139
140        let old_method = self
141            .analysis_cache
142            .as_ref()
143            .map(|a| a.recommended_method.clone());
144
145        self.optimize_for_vectors(recent_vectors)?;
146
147        let method_changed = match (old_method, &self.analysis_cache) {
148            (Some(old), Some(new)) => !methods_equivalent(&old, &new.recommended_method),
149            _ => false,
150        };
151
152        Ok(method_changed)
153    }
154}
155
156impl VectorCompressor for AdaptiveCompressor {
157    fn compress(&self, vector: &Vector) -> Result<Vec<u8>, VectorError> {
158        if let Some(compressor) = &self.current_method {
159            let start = std::time::Instant::now();
160            let result = compressor.compress(vector);
161            let _compression_time = start.elapsed().as_secs_f64() * 1000.0;
162            result
163        } else {
164            let no_op = NoOpCompressor;
165            no_op.compress(vector)
166        }
167    }
168
169    fn decompress(&self, data: &[u8], dimensions: usize) -> Result<Vector, VectorError> {
170        if let Some(compressor) = &self.current_method {
171            let start = std::time::Instant::now();
172            let result = compressor.decompress(data, dimensions);
173            let _decompression_time = start.elapsed().as_secs_f64() * 1000.0;
174            result
175        } else {
176            let no_op = NoOpCompressor;
177            no_op.decompress(data, dimensions)
178        }
179    }
180
181    fn compression_ratio(&self) -> f32 {
182        if let Some(compressor) = &self.current_method {
183            compressor.compression_ratio()
184        } else {
185            1.0
186        }
187    }
188}