oxirs_vec/
compression_io.rs1use super::compression_codecs::{
4 NoOpCompressor, PcaCompressor, ProductQuantizer, ScalarQuantizer, ZstdCompressor,
5};
6use super::compression_types::{
7 AdaptiveQuality, CompressionMethod, CompressionMetrics, VectorAnalysis, VectorCompressor,
8};
9use crate::{Vector, VectorError};
10
11pub(crate) fn methods_equivalent(method1: &CompressionMethod, method2: &CompressionMethod) -> bool {
16 match (method1, method2) {
17 (CompressionMethod::None, CompressionMethod::None) => true,
18 (CompressionMethod::Zstd { level: l1 }, CompressionMethod::Zstd { level: l2 }) => {
19 (l1 - l2).abs() <= 2
20 }
21 (
22 CompressionMethod::Quantization { bits: b1 },
23 CompressionMethod::Quantization { bits: b2 },
24 ) => b1 == b2,
25 (CompressionMethod::Pca { components: c1 }, CompressionMethod::Pca { components: c2 }) => {
26 ((*c1 as i32) - (*c2 as i32)).abs() <= (*c1 as i32) / 10
27 }
28 _ => false,
29 }
30}
31
32pub fn create_compressor(method: &CompressionMethod) -> Box<dyn VectorCompressor> {
38 match method {
39 CompressionMethod::None => Box::new(NoOpCompressor),
40 CompressionMethod::Zstd { level } => Box::new(ZstdCompressor::new(*level)),
41 CompressionMethod::Quantization { bits } => Box::new(ScalarQuantizer::new(*bits)),
42 CompressionMethod::Pca { components } => Box::new(PcaCompressor::new(*components)),
43 CompressionMethod::ProductQuantization {
44 subvectors,
45 codebook_size,
46 } => Box::new(ProductQuantizer::new(*subvectors, *codebook_size)),
47 CompressionMethod::Adaptive {
48 quality_level,
49 analysis_samples,
50 } => Box::new(AdaptiveCompressor::new(
51 quality_level.clone(),
52 *analysis_samples,
53 )),
54 }
55}
56
57pub struct AdaptiveCompressor {
63 pub(crate) quality_level: AdaptiveQuality,
64 pub(crate) analysis_samples: usize,
65 pub(crate) current_method: Option<Box<dyn VectorCompressor>>,
66 pub(crate) analysis_cache: Option<VectorAnalysis>,
67 pub(crate) performance_metrics: CompressionMetrics,
68}
69
70impl AdaptiveCompressor {
71 pub fn new(quality_level: AdaptiveQuality, analysis_samples: usize) -> Self {
72 Self {
73 quality_level,
74 analysis_samples: analysis_samples.max(10),
75 current_method: None,
76 analysis_cache: None,
77 performance_metrics: CompressionMetrics::default(),
78 }
79 }
80
81 pub fn with_fast_quality() -> Self {
82 Self::new(AdaptiveQuality::Fast, 50)
83 }
84
85 pub fn with_balanced_quality() -> Self {
86 Self::new(AdaptiveQuality::Balanced, 100)
87 }
88
89 pub fn with_best_ratio() -> Self {
90 Self::new(AdaptiveQuality::BestRatio, 200)
91 }
92
93 pub fn optimize_for_vectors(&mut self, sample_vectors: &[Vector]) -> Result<(), VectorError> {
95 if sample_vectors.is_empty() {
96 return Ok(());
97 }
98
99 let start_time = std::time::Instant::now();
100
101 let samples_to_analyze = sample_vectors.len().min(self.analysis_samples);
102 let analysis_vectors = &sample_vectors[..samples_to_analyze];
103
104 let analysis = VectorAnalysis::analyze(analysis_vectors, &self.quality_level)?;
105
106 let should_switch = match (&self.current_method, &self.analysis_cache) {
107 (Some(_), Some(cached)) => {
108 !methods_equivalent(&cached.recommended_method, &analysis.recommended_method)
109 }
110 _ => true,
111 };
112
113 if should_switch {
114 self.current_method = Some(create_compressor(&analysis.recommended_method));
115 self.performance_metrics.method_switches += 1;
116 }
117
118 self.analysis_cache = Some(analysis);
119
120 let analysis_time = start_time.elapsed().as_secs_f64() * 1000.0;
121 tracing::debug!("Adaptive compression analysis took {:.2}ms", analysis_time);
122
123 Ok(())
124 }
125
126 pub fn get_metrics(&self) -> &CompressionMetrics {
127 &self.performance_metrics
128 }
129
130 pub fn get_analysis(&self) -> Option<&VectorAnalysis> {
131 self.analysis_cache.as_ref()
132 }
133
134 pub fn adaptive_reanalysis(&mut self, recent_vectors: &[Vector]) -> Result<bool, VectorError> {
136 if recent_vectors.len() < self.analysis_samples / 4 {
137 return Ok(false);
138 }
139
140 let old_method = self
141 .analysis_cache
142 .as_ref()
143 .map(|a| a.recommended_method.clone());
144
145 self.optimize_for_vectors(recent_vectors)?;
146
147 let method_changed = match (old_method, &self.analysis_cache) {
148 (Some(old), Some(new)) => !methods_equivalent(&old, &new.recommended_method),
149 _ => false,
150 };
151
152 Ok(method_changed)
153 }
154}
155
156impl VectorCompressor for AdaptiveCompressor {
157 fn compress(&self, vector: &Vector) -> Result<Vec<u8>, VectorError> {
158 if let Some(compressor) = &self.current_method {
159 let start = std::time::Instant::now();
160 let result = compressor.compress(vector);
161 let _compression_time = start.elapsed().as_secs_f64() * 1000.0;
162 result
163 } else {
164 let no_op = NoOpCompressor;
165 no_op.compress(vector)
166 }
167 }
168
169 fn decompress(&self, data: &[u8], dimensions: usize) -> Result<Vector, VectorError> {
170 if let Some(compressor) = &self.current_method {
171 let start = std::time::Instant::now();
172 let result = compressor.decompress(data, dimensions);
173 let _decompression_time = start.elapsed().as_secs_f64() * 1000.0;
174 result
175 } else {
176 let no_op = NoOpCompressor;
177 no_op.decompress(data, dimensions)
178 }
179 }
180
181 fn compression_ratio(&self) -> f32 {
182 if let Some(compressor) = &self.current_method {
183 compressor.compression_ratio()
184 } else {
185 1.0
186 }
187 }
188}