adaptive_pipeline_domain/services/compression_service.rs
// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Compression Service
//!
//! Domain service trait for data compression/decompression with support for
//! multiple algorithms (Brotli, Gzip, Zstd, Lz4) and configurable levels.
//! Provides chunk-by-chunk streaming for large files, algorithm selection,
//! performance optimization, and benchmarking. All operations are stateless
//! and thread-safe. See the mdBook for algorithm characteristics and usage
//! examples.

use crate::{FileChunk, PipelineError, ProcessingContext};

// NOTE: Domain traits are synchronous. Async execution is an infrastructure
// concern. Infrastructure can provide async adapters that wrap sync
// implementations.

/// Compression algorithms supported by the adaptive pipeline system
///
/// This enum provides type-safe selection of compression algorithms for
/// different use cases. Each algorithm offers a different trade-off between
/// compression speed, compression ratio, and memory usage.
///
/// # Algorithm Characteristics
///
/// - **Brotli**: Best compression ratio, slower processing, higher memory
///   usage
/// - **Gzip**: Good balance of speed and compression, widely supported
/// - **Zstd**: Modern algorithm with excellent speed/ratio balance
/// - **Lz4**: Fastest compression, good for real-time processing
/// - **Custom**: User-defined algorithms for specialized requirements
///
/// # Examples
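///
/// A brief example of selecting an algorithm and rendering it via the
/// `Display` impl defined later in this file (the import path follows the
/// `FromParameters` example below):
///
/// ```rust
/// use adaptive_pipeline_domain::services::CompressionAlgorithm;
///
/// let algorithm = CompressionAlgorithm::Zstd;
/// assert_eq!(algorithm.to_string(), "Zstd");
/// assert_ne!(algorithm, CompressionAlgorithm::Lz4);
/// ```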
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionAlgorithm {
    Brotli,
    Gzip,
    Zstd,
    Lz4,
    Custom(String),
}

/// Compression level settings that balance processing speed vs. compression
/// ratio
///
/// This enum provides predefined compression levels optimized for different
/// use cases, allowing users to choose the appropriate trade-off between
/// compression speed and the resulting compression ratio.
///
/// # Level Characteristics
///
/// - **Fastest**: Minimal compression for maximum speed (numeric level 1)
/// - **Fast**: Light compression with good speed (numeric level 3)
/// - **Balanced**: Optimal balance of speed and compression (numeric level
///   6-9, depending on algorithm)
/// - **Best**: Maximum compression ratio, slower processing (numeric level
///   9-19, depending on algorithm)
/// - **Custom**: User-defined level for fine-tuned control
///
/// # Performance Impact
///
/// Higher compression levels generally result in:
/// - Better compression ratios (smaller output)
/// - Increased processing time
/// - Higher memory usage
/// - More CPU utilization
///
/// # Examples
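///
/// A short example of mapping levels to algorithm-specific numeric values via
/// [`CompressionLevel::to_numeric`]:
///
/// ```rust
/// use adaptive_pipeline_domain::services::{CompressionAlgorithm, CompressionLevel};
///
/// assert_eq!(CompressionLevel::Best.to_numeric(&CompressionAlgorithm::Gzip), 9);
/// assert_eq!(CompressionLevel::Best.to_numeric(&CompressionAlgorithm::Zstd), 19);
/// assert_eq!(CompressionLevel::Custom(4).to_numeric(&CompressionAlgorithm::Brotli), 4);
/// ```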
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionLevel {
    /// Fastest compression with minimal ratio optimization
    /// Suitable for real-time processing and streaming applications
    Fastest,

    /// Fast compression with light optimization
    /// Good for interactive applications requiring quick response
    Fast,

    /// Balanced compression optimizing both speed and ratio
    /// Recommended for most general-purpose applications
    Balanced,

    /// Best compression ratio with slower processing
    /// Ideal for archival storage and bandwidth-limited scenarios
    Best,

    /// Custom compression level for fine-tuned control
    /// Value interpretation depends on the specific algorithm
    Custom(u32),
}

/// Compression configuration that encapsulates all parameters for compression
/// operations
///
/// This configuration struct provides comprehensive control over compression
/// behavior, allowing fine-tuning of compression parameters for optimal
/// performance in different scenarios. The configuration is immutable and
/// thread-safe.
///
/// # Configuration Parameters
///
/// - **Algorithm**: The compression algorithm to use
/// - **Level**: Compression level balancing speed vs. ratio
/// - **Dictionary**: Optional pre-trained dictionary for better compression
/// - **Window Size**: Sliding window size for compression algorithms
/// - **Parallel Processing**: Enable multi-threaded compression when supported
///
/// # Examples
///
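/// A brief builder-style example using the constructors and presets
/// implemented below:
///
/// ```rust
/// use adaptive_pipeline_domain::services::{
///     CompressionAlgorithm, CompressionConfig, CompressionLevel,
/// };
///
/// let config = CompressionConfig::new(CompressionAlgorithm::Zstd)
///     .with_level(CompressionLevel::Fast)
///     .with_parallel_processing(true);
/// assert_eq!(config.level, CompressionLevel::Fast);
///
/// // Preset constructors capture common trade-offs:
/// let fast = CompressionConfig::for_speed(CompressionAlgorithm::Lz4);
/// assert_eq!(fast.level, CompressionLevel::Fastest);
/// ```
///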
/// # Performance Considerations
///
/// - **Dictionary**: Pre-trained dictionaries can significantly improve
///   compression ratios for similar data patterns but require additional
///   memory
/// - **Window Size**: Larger windows generally improve compression but use
///   more memory
/// - **Parallel Processing**: Can improve throughput on multi-core systems
///   but may increase memory usage and complexity
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// The compression algorithm to use for processing
    pub algorithm: CompressionAlgorithm,

    /// Compression level balancing speed vs. compression ratio
    pub level: CompressionLevel,

    /// Optional pre-trained dictionary for improved compression of similar
    /// data
    pub dictionary: Option<Vec<u8>>,

    /// Optional sliding window size for compression algorithms
    /// (algorithm-specific)
    pub window_size: Option<u32>,

    /// Enable parallel processing for supported algorithms
    pub parallel_processing: bool,
}

/// Domain service interface for compression operations in the adaptive
/// pipeline system
///
/// This trait defines the contract for compression services that handle data
/// compression and decompression operations. Implementations provide
/// algorithm-specific compression logic while maintaining consistent
/// interfaces across different compression algorithms.
///
/// # Design Principles
///
/// - **Stateless Operations**: All methods are stateless and thread-safe
/// - **Chunk-Based Processing**: Operates on file chunks for streaming support
/// - **Configuration-Driven**: Behavior controlled through configuration
///   objects
/// - **Error Handling**: Comprehensive error reporting through `PipelineError`
/// - **Context Integration**: Integrates with processing context for state
///   management
///
/// # Implementation Requirements
///
/// Implementations must:
/// - Be thread-safe (`Send + Sync`)
/// - Handle all supported compression algorithms
/// - Provide consistent error handling
/// - Support streaming operations through chunk processing
/// - Maintain compression metadata and statistics
///
/// # Usage Examples
///
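/// A hedged sketch of driving a compress/decompress round trip through the
/// trait; it assumes some concrete `impl CompressionService` is supplied by
/// the infrastructure layer, so the snippet is not compiled as a doctest:
///
/// ```rust,ignore
/// use adaptive_pipeline_domain::services::{CompressionConfig, CompressionService};
/// use adaptive_pipeline_domain::{FileChunk, PipelineError, ProcessingContext};
///
/// fn roundtrip(
///     service: &impl CompressionService,
///     chunk: FileChunk,
///     context: &mut ProcessingContext,
/// ) -> Result<FileChunk, PipelineError> {
///     let config = CompressionConfig::default();
///     let compressed = service.compress_chunk(chunk, &config, context)?;
///     service.decompress_chunk(compressed, &config, context)
/// }
/// ```
///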
/// # Architecture Note
///
/// This trait is **synchronous** following DDD principles. The domain layer
/// defines *what* operations exist, not *how* they execute. Async execution
/// is an infrastructure concern. Infrastructure adapters can wrap this trait
/// to provide async interfaces when needed.
///
/// # Unified Stage Interface
///
/// This trait extends `StageService`, providing the unified `process_chunk()`
/// method that all stages implement. The specialized `compress_chunk()` and
/// `decompress_chunk()` methods are maintained for backward compatibility and
/// internal use, but `process_chunk()` is the primary interface used by the
/// pipeline system.
pub trait CompressionService: super::stage_service::StageService {
    /// Compresses a file chunk using the specified configuration
    ///
    /// This method compresses the data contained in a file chunk according to
    /// the provided compression configuration. The operation is stateless
    /// and can be called concurrently from multiple threads.
    ///
    /// # Parameters
    ///
    /// - `chunk`: The file chunk containing data to compress
    /// - `config`: Compression configuration specifying algorithm and
    ///   parameters
    /// - `context`: Processing context for state management and metadata
    ///
    /// # Returns
    ///
    /// Returns a new `FileChunk` containing the compressed data, or a
    /// `PipelineError` if compression fails.
    ///
    /// # Errors
    ///
    /// - `CompressionError`: Algorithm-specific compression failures
    /// - `ConfigurationError`: Invalid compression configuration
    /// - `MemoryError`: Insufficient memory for compression operation
    /// - `DataError`: Invalid or corrupted input data
    ///
    /// # Note on Async
    ///
    /// This method is synchronous in the domain. For async contexts,
    /// use `AsyncCompressionAdapter` from the infrastructure layer.
    fn compress_chunk(
        &self,
        chunk: FileChunk,
        config: &CompressionConfig,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError>;

    /// Decompresses a file chunk using the specified configuration
    ///
    /// This method decompresses the data contained in a file chunk that was
    /// previously compressed using a compatible compression algorithm. The
    /// decompression parameters must match those used during compression.
    ///
    /// # Parameters
    ///
    /// - `chunk`: The file chunk containing compressed data to decompress
    /// - `config`: Compression configuration specifying algorithm and
    ///   parameters
    /// - `context`: Processing context for state management and metadata
    ///
    /// # Returns
    ///
    /// Returns a new `FileChunk` containing the decompressed data, or a
    /// `PipelineError` if decompression fails.
    ///
    /// # Errors
    ///
    /// - `DecompressionError`: Algorithm-specific decompression failures
    /// - `ConfigurationError`: Mismatched compression configuration
    /// - `MemoryError`: Insufficient memory for decompression operation
    /// - `DataCorruptionError`: Corrupted or invalid compressed data
    ///
    /// # Note on Async
    ///
    /// This method is synchronous in the domain. For async contexts,
    /// use `AsyncCompressionAdapter` from the infrastructure layer.
    fn decompress_chunk(
        &self,
        chunk: FileChunk,
        config: &CompressionConfig,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError>;

    /// Estimates compression ratio for given data
    ///
    /// # Note
    ///
    /// Parallel processing of chunks is an infrastructure concern.
    /// Use infrastructure adapters for batch/parallel operations.
    fn estimate_compression_ratio(
        &self,
        data_sample: &[u8],
        algorithm: &CompressionAlgorithm,
    ) -> Result<f64, PipelineError>;

    /// Gets optimal compression configuration for file type
    ///
    /// Analyzes file characteristics and recommends configuration.
    fn get_optimal_config(
        &self,
        file_extension: &str,
        data_sample: &[u8],
        performance_priority: CompressionPriority,
    ) -> Result<CompressionConfig, PipelineError>;

    /// Validates compression configuration
    ///
    /// Checks if the configuration is valid and supported.
    fn validate_config(&self, config: &CompressionConfig) -> Result<(), PipelineError>;

    /// Gets supported algorithms
    ///
    /// Returns the list of compression algorithms supported by this
    /// implementation.
    fn supported_algorithms(&self) -> Vec<CompressionAlgorithm>;

    /// Benchmarks compression performance
    ///
    /// Tests compression performance with sample data.
    fn benchmark_algorithm(
        &self,
        algorithm: &CompressionAlgorithm,
        test_data: &[u8],
    ) -> Result<CompressionBenchmark, PipelineError>;
}

/// Compression priority for optimization
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionPriority {
    Speed,
    Ratio,
    Balanced,
}

/// Compression benchmark results
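///
/// Real numbers come from [`CompressionService::benchmark_algorithm`]; the
/// [`Default`] impl below supplies placeholder values, shown in this brief
/// example:
///
/// ```rust
/// use adaptive_pipeline_domain::services::CompressionBenchmark;
///
/// let baseline = CompressionBenchmark::default();
/// assert_eq!(baseline.compression_ratio, 0.5);
/// assert_eq!(baseline.memory_usage_mb, 64.0);
/// ```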
#[derive(Debug, Clone)]
pub struct CompressionBenchmark {
    pub algorithm: CompressionAlgorithm,
    pub compression_ratio: f64,
    pub compression_speed_mbps: f64,
    pub decompression_speed_mbps: f64,
    pub memory_usage_mb: f64,
    pub cpu_usage_percent: f64,
}

impl Default for CompressionBenchmark {
    fn default() -> Self {
        Self {
            algorithm: CompressionAlgorithm::Brotli,
            compression_ratio: 0.5,
            compression_speed_mbps: 100.0,
            decompression_speed_mbps: 200.0,
            memory_usage_mb: 64.0,
            cpu_usage_percent: 50.0,
        }
    }
}

impl Default for CompressionConfig {
    fn default() -> Self {
        Self {
            algorithm: CompressionAlgorithm::Brotli,
            level: CompressionLevel::Balanced,
            dictionary: None,
            window_size: None,
            parallel_processing: true,
        }
    }
}

impl CompressionConfig {
    /// Creates a new compression configuration
    pub fn new(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            ..Default::default()
        }
    }

    /// Sets the compression level
    pub fn with_level(mut self, level: CompressionLevel) -> Self {
        self.level = level;
        self
    }

    /// Sets the compression dictionary
    pub fn with_dictionary(mut self, dictionary: Vec<u8>) -> Self {
        self.dictionary = Some(dictionary);
        self
    }

    /// Sets the window size
    pub fn with_window_size(mut self, size: u32) -> Self {
        self.window_size = Some(size);
        self
    }

    /// Enables or disables parallel processing
    pub fn with_parallel_processing(mut self, enabled: bool) -> Self {
        self.parallel_processing = enabled;
        self
    }

    /// Creates a speed-optimized configuration
    pub fn for_speed(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            level: CompressionLevel::Fastest,
            dictionary: None,
            window_size: None,
            parallel_processing: true,
        }
    }

    /// Creates a ratio-optimized configuration
    pub fn for_ratio(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            level: CompressionLevel::Best,
            dictionary: None,
            window_size: None,
            parallel_processing: false, // Better compression with a single thread
        }
    }
}

/// Implementation of `FromParameters` for type-safe config extraction.
///
/// This implementation converts the `StageConfiguration.parameters` HashMap
/// into a typed `CompressionConfig` object.
///
/// ## Expected Parameters
///
/// - **algorithm** (required): Compression algorithm name
///   - Valid values: "brotli", "gzip", "zstd", "lz4" (anything else becomes
///     `CompressionAlgorithm::Custom`)
///   - Example: `"algorithm" => "brotli"`
///
/// - **level** (optional): Numeric compression level (1-19 depending on
///   algorithm), parsed into `CompressionLevel::Custom`
///   - Default: `CompressionLevel::Balanced`
///   - Example: `"level" => "9"`
///
/// - **parallel_processing** (optional): Enable parallel compression
///   - Valid values: "true", "false"
///   - Default: false
///   - Example: `"parallel_processing" => "true"`
///
/// ## Usage Example
///
/// ```rust
/// use adaptive_pipeline_domain::services::{CompressionConfig, FromParameters};
/// use std::collections::HashMap;
///
/// let mut params = HashMap::new();
/// params.insert("algorithm".to_string(), "brotli".to_string());
/// params.insert("level".to_string(), "9".to_string());
///
/// let config = CompressionConfig::from_parameters(&params).unwrap();
/// ```
impl super::stage_service::FromParameters for CompressionConfig {
    fn from_parameters(params: &std::collections::HashMap<String, String>) -> Result<Self, PipelineError> {
        // Required: algorithm
        let algorithm_str = params
            .get("algorithm")
            .ok_or_else(|| PipelineError::MissingParameter("algorithm".into()))?;

        let algorithm = match algorithm_str.to_lowercase().as_str() {
            "brotli" => CompressionAlgorithm::Brotli,
            "gzip" => CompressionAlgorithm::Gzip,
            "zstd" => CompressionAlgorithm::Zstd,
            "lz4" => CompressionAlgorithm::Lz4,
            other => CompressionAlgorithm::Custom(other.to_string()),
        };

        // Optional: level (defaults to Balanced)
        let level = params
            .get("level")
            .and_then(|s| s.parse::<u32>().ok())
            .map(CompressionLevel::Custom)
            .unwrap_or(CompressionLevel::Balanced);

        // Optional: parallel_processing (defaults to false)
        let parallel_processing = params
            .get("parallel_processing")
            .and_then(|s| s.parse::<bool>().ok())
            .unwrap_or(false);

        Ok(Self {
            algorithm,
            level,
            dictionary: None, // Not supported via parameters yet
            window_size: None, // Not supported via parameters yet
            parallel_processing,
        })
    }
}

impl CompressionLevel {
    /// Gets the numeric level for the given compression algorithm
    pub fn to_numeric(&self, algorithm: &CompressionAlgorithm) -> u32 {
        match (self, algorithm) {
            (CompressionLevel::Fastest, CompressionAlgorithm::Brotli) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Brotli) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Brotli) => 6,
            (CompressionLevel::Best, CompressionAlgorithm::Brotli) => 11,

            (CompressionLevel::Fastest, CompressionAlgorithm::Gzip) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Gzip) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Gzip) => 6,
            (CompressionLevel::Best, CompressionAlgorithm::Gzip) => 9,

            (CompressionLevel::Fastest, CompressionAlgorithm::Zstd) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Zstd) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Zstd) => 9,
            (CompressionLevel::Best, CompressionAlgorithm::Zstd) => 19,

            (CompressionLevel::Custom(level), _) => *level,

            // Lz4 and Custom algorithms fall back to a balanced default
            _ => 6,
        }
    }
}

impl std::fmt::Display for CompressionAlgorithm {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CompressionAlgorithm::Brotli => write!(f, "Brotli"),
            CompressionAlgorithm::Gzip => write!(f, "Gzip"),
            CompressionAlgorithm::Zstd => write!(f, "Zstd"),
            CompressionAlgorithm::Lz4 => write!(f, "LZ4"),
            CompressionAlgorithm::Custom(name) => write!(f, "Custom({})", name),
        }
    }
}

impl std::fmt::Display for CompressionLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CompressionLevel::Fastest => write!(f, "Fastest"),
            CompressionLevel::Fast => write!(f, "Fast"),
            CompressionLevel::Balanced => write!(f, "Balanced"),
            CompressionLevel::Best => write!(f, "Best"),
            CompressionLevel::Custom(level) => write!(f, "Custom({})", level),
        }
    }
}