// adaptive_pipeline/infrastructure/adapters/async_compression.rs
use adaptive_pipeline_domain::entities::ProcessingContext;
use adaptive_pipeline_domain::services::compression_service::{
    CompressionAlgorithm, CompressionBenchmark, CompressionConfig, CompressionPriority, CompressionService,
};
use adaptive_pipeline_domain::value_objects::FileChunk;
use adaptive_pipeline_domain::PipelineError;
use std::sync::Arc;
47pub struct AsyncCompressionAdapter<T: CompressionService + 'static> {
61 inner: Arc<T>,
62}
63
64impl<T: CompressionService + 'static> AsyncCompressionAdapter<T> {
65 pub fn new(service: Arc<T>) -> Self {
67 Self { inner: service }
68 }
69
70 pub async fn compress_chunk_async(
75 &self,
76 chunk: FileChunk,
77 config: &CompressionConfig,
78 context: &mut ProcessingContext,
79 ) -> Result<FileChunk, PipelineError> {
80 let service = self.inner.clone();
81 let config = config.clone();
82
83 let mut context_clone = context.clone();
86
87 tokio::task::spawn_blocking(move || service.compress_chunk(chunk, &config, &mut context_clone))
88 .await
89 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
90 }
91
92 pub async fn decompress_chunk_async(
94 &self,
95 chunk: FileChunk,
96 config: &CompressionConfig,
97 context: &mut ProcessingContext,
98 ) -> Result<FileChunk, PipelineError> {
99 let service = self.inner.clone();
100 let config = config.clone();
101 let mut context_clone = context.clone();
102
103 tokio::task::spawn_blocking(move || service.decompress_chunk(chunk, &config, &mut context_clone))
104 .await
105 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
106 }
107
108 pub async fn compress_chunks_parallel(
118 &self,
119 chunks: Vec<FileChunk>,
120 config: &CompressionConfig,
121 context: &mut ProcessingContext,
122 ) -> Result<Vec<FileChunk>, PipelineError> {
123 use crate::infrastructure::config::rayon_config::RAYON_POOLS;
124 use rayon::prelude::*;
125
126 let service = self.inner.clone();
127 let config = config.clone();
128 let context_clone = context.clone();
129
130 tokio::task::spawn_blocking(move || {
132 RAYON_POOLS.cpu_bound_pool().install(|| {
134 chunks
136 .into_par_iter()
137 .map(|chunk| {
138 let mut local_context = context_clone.clone();
139 service.compress_chunk(chunk, &config, &mut local_context)
140 })
141 .collect::<Result<Vec<_>, _>>()
142 })
143 })
144 .await
145 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
146 }
147
148 pub fn estimate_compression_ratio(
150 &self,
151 data_sample: &[u8],
152 algorithm: &CompressionAlgorithm,
153 ) -> Result<f64, PipelineError> {
154 self.inner.estimate_compression_ratio(data_sample, algorithm)
155 }
156
157 pub fn get_optimal_config(
159 &self,
160 file_extension: &str,
161 data_sample: &[u8],
162 performance_priority: CompressionPriority,
163 ) -> Result<CompressionConfig, PipelineError> {
164 self.inner
165 .get_optimal_config(file_extension, data_sample, performance_priority)
166 }
167
168 pub fn validate_config(&self, config: &CompressionConfig) -> Result<(), PipelineError> {
170 self.inner.validate_config(config)
171 }
172
173 pub fn supported_algorithms(&self) -> Vec<CompressionAlgorithm> {
175 self.inner.supported_algorithms()
176 }
177
178 pub async fn benchmark_algorithm_async(
180 &self,
181 algorithm: &CompressionAlgorithm,
182 test_data: &[u8],
183 ) -> Result<CompressionBenchmark, PipelineError> {
184 let service = self.inner.clone();
185 let algorithm = algorithm.clone();
186 let test_data = test_data.to_vec();
187
188 tokio::task::spawn_blocking(move || service.benchmark_algorithm(&algorithm, &test_data))
189 .await
190 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
191 }
192}
193
194impl<T: CompressionService + 'static> Clone for AsyncCompressionAdapter<T> {
195 fn clone(&self) -> Self {
196 Self {
197 inner: self.inner.clone(),
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration;
    use adaptive_pipeline_domain::entities::{Operation, StagePosition, StageType};
    use adaptive_pipeline_domain::services::{FromParameters, StageService};

    /// No-op stand-in: compression and decompression hand the chunk back
    /// unchanged, letting the adapter's async plumbing be tested alone.
    struct FakeCompressionService;

    impl CompressionService for FakeCompressionService {
        fn compress_chunk(
            &self,
            chunk: FileChunk,
            _config: &CompressionConfig,
            _context: &mut ProcessingContext,
        ) -> Result<FileChunk, PipelineError> {
            Ok(chunk)
        }

        fn decompress_chunk(
            &self,
            chunk: FileChunk,
            _config: &CompressionConfig,
            _context: &mut ProcessingContext,
        ) -> Result<FileChunk, PipelineError> {
            Ok(chunk)
        }

        fn estimate_compression_ratio(
            &self,
            _data_sample: &[u8],
            _algorithm: &CompressionAlgorithm,
        ) -> Result<f64, PipelineError> {
            Ok(0.5)
        }

        fn get_optimal_config(
            &self,
            _file_extension: &str,
            _data_sample: &[u8],
            _performance_priority: CompressionPriority,
        ) -> Result<CompressionConfig, PipelineError> {
            Ok(CompressionConfig::default())
        }

        fn validate_config(&self, _config: &CompressionConfig) -> Result<(), PipelineError> {
            Ok(())
        }

        fn supported_algorithms(&self) -> Vec<CompressionAlgorithm> {
            vec![CompressionAlgorithm::Gzip]
        }

        fn benchmark_algorithm(
            &self,
            _algorithm: &CompressionAlgorithm,
            _test_data: &[u8],
        ) -> Result<CompressionBenchmark, PipelineError> {
            Ok(CompressionBenchmark::default())
        }
    }

    impl StageService for FakeCompressionService {
        fn process_chunk(
            &self,
            chunk: FileChunk,
            config: &StageConfiguration,
            context: &mut ProcessingContext,
        ) -> Result<FileChunk, PipelineError> {
            let compression_config = CompressionConfig::from_parameters(&config.parameters)?;
            match config.operation {
                Operation::Forward => self.compress_chunk(chunk, &compression_config, context),
                Operation::Reverse => self.decompress_chunk(chunk, &compression_config, context),
            }
        }

        fn position(&self) -> StagePosition {
            StagePosition::PreBinary
        }

        fn is_reversible(&self) -> bool {
            true
        }

        fn stage_type(&self) -> StageType {
            StageType::Compression
        }
    }

    #[tokio::test]
    async fn test_async_adapter_pattern() {
        // Wrap the sync fake and confirm the passthrough path works.
        let adapter = AsyncCompressionAdapter::new(Arc::new(FakeCompressionService));
        assert_eq!(adapter.supported_algorithms().len(), 1);
    }
}