adaptive_pipeline_domain/services/compression_service.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Compression Service
//!
//! Domain service trait for data compression/decompression with support for
//! multiple algorithms (Brotli, Gzip, Zstd, Lz4) and configurable levels.
//! Provides chunk-by-chunk streaming for large files, algorithm selection,
//! performance optimization, and benchmarking. Thread-safe, stateless
//! operations. See the mdBook for algorithm characteristics and usage
//! examples.

use crate::{FileChunk, PipelineError, ProcessingContext};

// NOTE: Domain traits are synchronous. Async execution is an infrastructure
// concern. Infrastructure can provide async adapters that wrap sync
// implementations.

/// Compression algorithms supported by the adaptive pipeline system
///
/// This enum provides type-safe selection of compression algorithms with
/// distinct performance characteristics and use cases. Each algorithm strikes
/// a different balance between compression speed, compression ratio, and
/// memory usage.
///
/// # Algorithm Characteristics
///
/// - **Brotli**: Best compression ratio, slower processing, higher memory usage
/// - **Gzip**: Good balance of speed and compression, widely supported
/// - **Zstd**: Modern algorithm with excellent speed/ratio balance
/// - **Lz4**: Fastest compression, good for real-time processing
/// - **Custom**: User-defined algorithms for specialized requirements
///
/// # Examples
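///
/// A minimal selection sketch (the import path follows the existing doctest
/// later in this file, and assumes these types are re-exported from
/// `services`):
///
/// ```rust
/// use adaptive_pipeline_domain::services::CompressionAlgorithm;
///
/// let algorithm = CompressionAlgorithm::Zstd;
/// assert_eq!(algorithm.to_string(), "Zstd");
///
/// // Custom algorithms carry a user-supplied name.
/// let custom = CompressionAlgorithm::Custom("my-codec".to_string());
/// assert_eq!(custom.to_string(), "Custom(my-codec)");
/// ```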
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionAlgorithm {
    Brotli,
    Gzip,
    Zstd,
    Lz4,
    Custom(String),
}

/// Compression level settings that balance processing speed vs. compression
/// ratio
///
/// This enum provides predefined compression levels optimized for different
/// use cases, allowing users to choose the appropriate trade-off between
/// compression speed and the resulting compression ratio.
///
/// # Level Characteristics
///
/// - **Fastest**: Minimal compression for maximum speed (typically level 1)
/// - **Fast**: Light compression with good speed (typically level 3)
/// - **Balanced**: Optimal balance of speed and compression (level 6-9,
///   algorithm-dependent)
/// - **Best**: Maximum compression ratio, slower processing (level 9-19,
///   algorithm-dependent)
/// - **Custom**: User-defined level for fine-tuned control
///
/// # Performance Impact
///
/// Higher compression levels generally result in:
/// - Better compression ratios (smaller output)
/// - Increased processing time
/// - Higher memory usage
/// - More CPU utilization
///
/// # Examples
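///
/// A short sketch mapping a level to its algorithm-specific numeric value via
/// `to_numeric` (values mirror the mapping implemented later in this file;
/// import path follows the existing doctest below):
///
/// ```rust
/// use adaptive_pipeline_domain::services::{CompressionAlgorithm, CompressionLevel};
///
/// let level = CompressionLevel::Balanced;
/// assert_eq!(level.to_numeric(&CompressionAlgorithm::Gzip), 6);
/// assert_eq!(level.to_numeric(&CompressionAlgorithm::Zstd), 9);
///
/// // Custom levels pass through unchanged.
/// assert_eq!(CompressionLevel::Custom(4).to_numeric(&CompressionAlgorithm::Zstd), 4);
/// ```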
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionLevel {
    /// Fastest compression with minimal ratio optimization
    /// Suitable for real-time processing and streaming applications
    Fastest,

    /// Fast compression with light optimization
    /// Good for interactive applications requiring quick response
    Fast,

    /// Balanced compression optimizing both speed and ratio
    /// Recommended for most general-purpose applications
    Balanced,

    /// Best compression ratio with slower processing
    /// Ideal for archival storage and bandwidth-limited scenarios
    Best,

    /// Custom compression level for fine-tuned control
    /// Value interpretation depends on the specific algorithm
    Custom(u32),
}

/// Compression configuration that encapsulates all parameters for compression
/// operations
///
/// This configuration struct provides comprehensive control over compression
/// behavior, allowing fine-tuning of compression parameters for optimal
/// performance in different scenarios. The configuration is immutable and
/// thread-safe.
///
/// # Configuration Parameters
///
/// - **Algorithm**: The compression algorithm to use
/// - **Level**: Compression level balancing speed vs. ratio
/// - **Dictionary**: Optional pre-trained dictionary for better compression
/// - **Window Size**: Sliding window size for compression algorithms
/// - **Parallel Processing**: Enable multi-threaded compression when supported
///
/// # Examples
///
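/// A builder-style construction sketch (import path follows the existing
/// doctest below):
///
/// ```rust
/// use adaptive_pipeline_domain::services::{
///     CompressionAlgorithm, CompressionConfig, CompressionLevel,
/// };
///
/// let config = CompressionConfig::new(CompressionAlgorithm::Zstd)
///     .with_level(CompressionLevel::Best)
///     .with_parallel_processing(false);
///
/// assert_eq!(config.level, CompressionLevel::Best);
/// assert!(!config.parallel_processing);
/// ```
///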
/// # Performance Considerations
///
/// - **Dictionary**: Pre-trained dictionaries can significantly improve
///   compression ratios for similar data patterns but require additional
///   memory
/// - **Window Size**: Larger windows generally improve compression but use
///   more memory
/// - **Parallel Processing**: Can improve throughput on multi-core systems but
///   may increase memory usage and complexity
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// The compression algorithm to use for processing
    pub algorithm: CompressionAlgorithm,

    /// Compression level balancing speed vs. compression ratio
    pub level: CompressionLevel,

    /// Optional pre-trained dictionary for improved compression of similar
    /// data
    pub dictionary: Option<Vec<u8>>,

    /// Optional sliding window size for compression algorithms
    /// (algorithm-specific)
    pub window_size: Option<u32>,

    /// Enable parallel processing for supported algorithms
    pub parallel_processing: bool,
}

/// Domain service interface for compression operations in the adaptive
/// pipeline system
///
/// This trait defines the contract for compression services that handle data
/// compression and decompression operations. Implementations provide
/// algorithm-specific compression logic while maintaining consistent
/// interfaces across different compression algorithms.
///
/// # Design Principles
///
/// - **Stateless Operations**: All methods are stateless and thread-safe
/// - **Chunk-Based Processing**: Operates on file chunks for streaming support
/// - **Configuration-Driven**: Behavior controlled through configuration
///   objects
/// - **Error Handling**: Comprehensive error reporting through `PipelineError`
/// - **Context Integration**: Integrates with processing context for state
///   management
///
/// # Implementation Requirements
///
/// Implementations must:
/// - Be thread-safe (`Send + Sync`)
/// - Handle all supported compression algorithms
/// - Provide consistent error handling
/// - Support streaming operations through chunk processing
/// - Maintain compression metadata and statistics
///
/// # Usage Examples
///
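/// A minimal round-trip sketch against this trait. Illustrative only: it
/// assumes a `service` implementing `CompressionService` and a `chunk` and
/// `context` obtained elsewhere, so it is not compiled as a doctest.
///
/// ```rust,ignore
/// use adaptive_pipeline_domain::services::{CompressionConfig, CompressionService};
/// use adaptive_pipeline_domain::{FileChunk, PipelineError, ProcessingContext};
///
/// fn roundtrip(
///     service: &dyn CompressionService,
///     chunk: FileChunk,
///     context: &mut ProcessingContext,
/// ) -> Result<FileChunk, PipelineError> {
///     let config = CompressionConfig::default();
///     let compressed = service.compress_chunk(chunk, &config, context)?;
///     service.decompress_chunk(compressed, &config, context)
/// }
/// ```
///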
/// # Architecture Note
///
/// This trait is **synchronous** following DDD principles. The domain layer
/// defines *what* operations exist, not *how* they execute. Async execution
/// is an infrastructure concern. Infrastructure adapters can wrap this trait
/// to provide async interfaces when needed.
///
/// # Unified Stage Interface
///
/// This trait extends `StageService`, providing the unified `process_chunk()`
/// method that all stages implement. The specialized `compress_chunk()` and
/// `decompress_chunk()` methods are maintained for backward compatibility and
/// internal use, but `process_chunk()` is the primary interface used by the
/// pipeline system.
pub trait CompressionService: super::stage_service::StageService {
    /// Compresses a file chunk using the specified configuration
    ///
    /// This method compresses the data contained in a file chunk according to
    /// the provided compression configuration. The operation is stateless
    /// and can be called concurrently from multiple threads.
    ///
    /// # Parameters
    ///
    /// - `chunk`: The file chunk containing data to compress
    /// - `config`: Compression configuration specifying algorithm and
    ///   parameters
    /// - `context`: Processing context for state management and metadata
    ///
    /// # Returns
    ///
    /// Returns a new `FileChunk` containing the compressed data, or a
    /// `PipelineError` if compression fails.
    ///
    /// # Errors
    ///
    /// - `CompressionError`: Algorithm-specific compression failures
    /// - `ConfigurationError`: Invalid compression configuration
    /// - `MemoryError`: Insufficient memory for compression operation
    /// - `DataError`: Invalid or corrupted input data
    ///
    /// # Note on Async
    ///
    /// This method is synchronous in the domain. For async contexts,
    /// use `AsyncCompressionAdapter` from the infrastructure layer.
    fn compress_chunk(
        &self,
        chunk: FileChunk,
        config: &CompressionConfig,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError>;

    /// Decompresses a file chunk using the specified configuration
    ///
    /// This method decompresses the data contained in a file chunk that was
    /// previously compressed using a compatible compression algorithm. The
    /// decompression parameters must match those used during compression.
    ///
    /// # Parameters
    ///
    /// - `chunk`: The file chunk containing compressed data to decompress
    /// - `config`: Compression configuration specifying algorithm and
    ///   parameters
    /// - `context`: Processing context for state management and metadata
    ///
    /// # Returns
    ///
    /// Returns a new `FileChunk` containing the decompressed data, or a
    /// `PipelineError` if decompression fails.
    ///
    /// # Errors
    ///
    /// - `DecompressionError`: Algorithm-specific decompression failures
    /// - `ConfigurationError`: Mismatched compression configuration
    /// - `MemoryError`: Insufficient memory for decompression operation
    /// - `DataCorruptionError`: Corrupted or invalid compressed data
    ///
    /// # Note on Async
    ///
    /// This method is synchronous in the domain. For async contexts,
    /// use `AsyncCompressionAdapter` from the infrastructure layer.
    fn decompress_chunk(
        &self,
        chunk: FileChunk,
        config: &CompressionConfig,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError>;

    /// Estimates compression ratio for given data
    ///
    /// # Note
    ///
    /// Parallel processing of chunks is an infrastructure concern.
    /// Use infrastructure adapters for batch/parallel operations.
    fn estimate_compression_ratio(
        &self,
        data_sample: &[u8],
        algorithm: &CompressionAlgorithm,
    ) -> Result<f64, PipelineError>;

    /// Gets optimal compression configuration for file type
    ///
    /// Analyzes file characteristics and recommends configuration.
    fn get_optimal_config(
        &self,
        file_extension: &str,
        data_sample: &[u8],
        performance_priority: CompressionPriority,
    ) -> Result<CompressionConfig, PipelineError>;

    /// Validates compression configuration
    ///
    /// Checks if the configuration is valid and supported.
    fn validate_config(&self, config: &CompressionConfig) -> Result<(), PipelineError>;

    /// Gets supported algorithms
    ///
    /// Returns the list of compression algorithms supported by this
    /// implementation.
    fn supported_algorithms(&self) -> Vec<CompressionAlgorithm>;

    /// Benchmarks compression performance
    ///
    /// Tests compression performance with sample data.
    fn benchmark_algorithm(
        &self,
        algorithm: &CompressionAlgorithm,
        test_data: &[u8],
    ) -> Result<CompressionBenchmark, PipelineError>;
}

/// Compression priority for optimization
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionPriority {
    Speed,
    Ratio,
    Balanced,
}

/// Compression benchmark results
#[derive(Debug, Clone)]
pub struct CompressionBenchmark {
    pub algorithm: CompressionAlgorithm,
    pub compression_ratio: f64,
    pub compression_speed_mbps: f64,
    pub decompression_speed_mbps: f64,
    pub memory_usage_mb: f64,
    pub cpu_usage_percent: f64,
}

impl Default for CompressionBenchmark {
    fn default() -> Self {
        Self {
            algorithm: CompressionAlgorithm::Brotli,
            compression_ratio: 0.5,
            compression_speed_mbps: 100.0,
            decompression_speed_mbps: 200.0,
            memory_usage_mb: 64.0,
            cpu_usage_percent: 50.0,
        }
    }
}

impl Default for CompressionConfig {
    fn default() -> Self {
        Self {
            algorithm: CompressionAlgorithm::Brotli,
            level: CompressionLevel::Balanced,
            dictionary: None,
            window_size: None,
            parallel_processing: true,
        }
    }
}

impl CompressionConfig {
    /// Creates a new compression configuration
    pub fn new(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            ..Default::default()
        }
    }

    /// Sets compression level
    pub fn with_level(mut self, level: CompressionLevel) -> Self {
        self.level = level;
        self
    }

    /// Sets dictionary
    pub fn with_dictionary(mut self, dictionary: Vec<u8>) -> Self {
        self.dictionary = Some(dictionary);
        self
    }

    /// Sets window size
    pub fn with_window_size(mut self, size: u32) -> Self {
        self.window_size = Some(size);
        self
    }

    /// Sets parallel processing
    pub fn with_parallel_processing(mut self, enabled: bool) -> Self {
        self.parallel_processing = enabled;
        self
    }

    /// Creates a speed-optimized configuration
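    ///
    /// A small sketch; the preset values mirror the constructor body below
    /// (import path follows the existing doctest in this file):
    ///
    /// ```rust
    /// use adaptive_pipeline_domain::services::{
    ///     CompressionAlgorithm, CompressionConfig, CompressionLevel,
    /// };
    ///
    /// let cfg = CompressionConfig::for_speed(CompressionAlgorithm::Lz4);
    /// assert_eq!(cfg.level, CompressionLevel::Fastest);
    /// assert!(cfg.parallel_processing);
    /// ```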
    pub fn for_speed(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            level: CompressionLevel::Fastest,
            dictionary: None,
            window_size: None,
            parallel_processing: true,
        }
    }

    /// Creates a ratio-optimized configuration
    pub fn for_ratio(algorithm: CompressionAlgorithm) -> Self {
        Self {
            algorithm,
            level: CompressionLevel::Best,
            dictionary: None,
            window_size: None,
            parallel_processing: false, // Better compression with single thread
        }
    }
}

/// Implementation of `FromParameters` for type-safe config extraction.
///
/// This implementation converts the `StageConfiguration.parameters` `HashMap`
/// into a typed `CompressionConfig` object.
///
/// ## Expected Parameters
///
/// - **algorithm** (required): Compression algorithm name
///   - Valid values: "brotli", "gzip", "zstd", "lz4" (anything else becomes
///     `Custom`)
///   - Example: `"algorithm" => "brotli"`
///
/// - **level** (optional): Numeric compression level (1-19 depending on
///   algorithm)
///   - Default: `Balanced` (level 6-9 depending on algorithm)
///   - Example: `"level" => "9"`
///
/// - **parallel_processing** (optional): Enable parallel compression
///   - Valid values: "true", "false"
///   - Default: false
///   - Example: `"parallel_processing" => "true"`
///
/// ## Usage Example
///
/// ```rust
/// use adaptive_pipeline_domain::services::{CompressionConfig, FromParameters};
/// use std::collections::HashMap;
///
/// let mut params = HashMap::new();
/// params.insert("algorithm".to_string(), "brotli".to_string());
/// params.insert("level".to_string(), "9".to_string());
///
/// let config = CompressionConfig::from_parameters(&params).unwrap();
/// ```
impl super::stage_service::FromParameters for CompressionConfig {
    fn from_parameters(params: &std::collections::HashMap<String, String>) -> Result<Self, PipelineError> {
        // Required: algorithm
        let algorithm_str = params
            .get("algorithm")
            .ok_or_else(|| PipelineError::MissingParameter("algorithm".into()))?;

        let algorithm = match algorithm_str.to_lowercase().as_str() {
            "brotli" => CompressionAlgorithm::Brotli,
            "gzip" => CompressionAlgorithm::Gzip,
            "zstd" => CompressionAlgorithm::Zstd,
            "lz4" => CompressionAlgorithm::Lz4,
            other => CompressionAlgorithm::Custom(other.to_string()),
        };

        // Optional: level (numeric values are wrapped in `Custom`; missing or
        // unparsable values fall back to `Balanced`)
        let level = params
            .get("level")
            .and_then(|s| s.parse::<u32>().ok())
            .map(CompressionLevel::Custom)
            .unwrap_or(CompressionLevel::Balanced);

        // Optional: parallel_processing (default to false)
        let parallel_processing = params
            .get("parallel_processing")
            .and_then(|s| s.parse::<bool>().ok())
            .unwrap_or(false);

        Ok(Self {
            algorithm,
            level,
            dictionary: None,  // Not supported via parameters yet
            window_size: None, // Not supported via parameters yet
            parallel_processing,
        })
    }
}

impl CompressionLevel {
    /// Gets the numeric level for the compression algorithm
    pub fn to_numeric(&self, algorithm: &CompressionAlgorithm) -> u32 {
        match (self, algorithm) {
            (CompressionLevel::Fastest, CompressionAlgorithm::Brotli) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Brotli) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Brotli) => 6,
            (CompressionLevel::Best, CompressionAlgorithm::Brotli) => 11,

            (CompressionLevel::Fastest, CompressionAlgorithm::Gzip) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Gzip) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Gzip) => 6,
            (CompressionLevel::Best, CompressionAlgorithm::Gzip) => 9,

            (CompressionLevel::Fastest, CompressionAlgorithm::Zstd) => 1,
            (CompressionLevel::Fast, CompressionAlgorithm::Zstd) => 3,
            (CompressionLevel::Balanced, CompressionAlgorithm::Zstd) => 9,
            (CompressionLevel::Best, CompressionAlgorithm::Zstd) => 19,

            (CompressionLevel::Custom(level), _) => *level,

            // Lz4 and Custom algorithms fall back to a balanced default
            _ => 6,
        }
    }
}

impl std::fmt::Display for CompressionAlgorithm {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CompressionAlgorithm::Brotli => write!(f, "Brotli"),
            CompressionAlgorithm::Gzip => write!(f, "Gzip"),
            CompressionAlgorithm::Zstd => write!(f, "Zstd"),
            CompressionAlgorithm::Lz4 => write!(f, "LZ4"),
            CompressionAlgorithm::Custom(name) => write!(f, "Custom({})", name),
        }
    }
}

impl std::fmt::Display for CompressionLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CompressionLevel::Fastest => write!(f, "Fastest"),
            CompressionLevel::Fast => write!(f, "Fast"),
            CompressionLevel::Balanced => write!(f, "Balanced"),
            CompressionLevel::Best => write!(f, "Best"),
            CompressionLevel::Custom(level) => write!(f, "Custom({})", level),
        }
    }
}
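
// A small illustrative test module: a sketch added alongside the original
// code, exercising only behavior defined in this file. The `super::super`
// path to `FromParameters` assumes this file sits next to `stage_service`,
// as the `impl` above implies.
#[cfg(test)]
mod compression_config_sketch_tests {
    use super::super::stage_service::FromParameters;
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn default_config_is_balanced_brotli() {
        let config = CompressionConfig::default();
        assert_eq!(config.algorithm, CompressionAlgorithm::Brotli);
        assert_eq!(config.level, CompressionLevel::Balanced);
        assert!(config.parallel_processing);
    }

    #[test]
    fn from_parameters_parses_algorithm_and_level() {
        let mut params = HashMap::new();
        params.insert("algorithm".to_string(), "zstd".to_string());
        params.insert("level".to_string(), "19".to_string());

        let config = CompressionConfig::from_parameters(&params).unwrap();
        assert_eq!(config.algorithm, CompressionAlgorithm::Zstd);
        // Numeric levels are wrapped in `Custom` by `from_parameters`.
        assert_eq!(config.level, CompressionLevel::Custom(19));
    }

    #[test]
    fn missing_algorithm_is_rejected() {
        let params: HashMap<String, String> = HashMap::new();
        assert!(CompressionConfig::from_parameters(&params).is_err());
    }
}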