// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

// Infrastructure module - contains future features not yet fully utilized
#![allow(dead_code, unused_imports, unused_variables)]
//! # Chunk Processor Adapters
//!
//! This module provides adapter implementations that bridge domain services
//! with the chunk processing interface. These adapters enable domain services
//! to be used as chunk processors in the file processing pipeline.
//!
//! ## Overview
//!
//! The chunk processor adapters provide:
//!
//! - **Service Integration**: Bridge domain services with chunk processing
//! - **Type Safety**: Generic adapters with compile-time type checking
//! - **Configuration**: Flexible configuration for different service types
//! - **Reusability**: Reusable adapters for common service patterns
//! - **Performance**: Efficient adaptation with minimal overhead
//!
//! ## Architecture
//!
//! The adapters follow the Adapter pattern:
//!
//! - **Generic Design**: Generic adapters that work with any service type
//! - **Service Wrapping**: Wrap domain services to implement `ChunkProcessor`
//! - **Configuration-Driven**: Behavior controlled through configuration
//! - **Synchronous Interface**: `ChunkProcessor::process_chunk` is implemented
//!   as a plain synchronous call in this module
//!
//! ## Adapter Types
//!
//! ### Service Chunk Adapter
//!
//! Generic adapter that can wrap any service:
//! - **Compression Services**: Adapt compression services for chunk processing
//! - **Encryption Services**: Adapt encryption services for chunk processing
//! - **Custom Services**: Adapt any domain service with appropriate interface
//!
//! ### Specialized Adapters
//!
//! - **Compression Adapter**: `CompressionChunkAdapter` wraps a
//!   `CompressionService`
//! - **Encryption Adapter**: `EncryptionChunkAdapter` wraps an
//!   `EncryptionService`
//! - **Validation Adapter**: Specialized adapter for validation services (not
//!   implemented in this module)
//!
//! ## Usage Examples
//!
//! ### Basic Service Adaptation
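//!
//! As a minimal sketch of the adapter idea, assuming nothing about this
//! crate's real signatures: `Chunk`, `Processor`, `UppercaseService`, and
//! `UppercaseAdapter` below are hypothetical stand-ins for `FileChunk`,
//! `ChunkProcessor`, a domain service, and an adapter. The service knows
//! nothing about chunks; the adapter shares it via `Arc` and exposes it
//! through the processor trait.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! // Hypothetical stand-in for `FileChunk`.
//! #[derive(Debug, Clone, PartialEq)]
//! struct Chunk(Vec<u8>);
//!
//! // Hypothetical stand-in for the `ChunkProcessor` trait.
//! trait Processor {
//!     fn process_chunk(&self, chunk: &Chunk) -> Result<Chunk, String>;
//!     fn name(&self) -> &str;
//! }
//!
//! // Hypothetical domain service that only understands raw bytes.
//! struct UppercaseService;
//!
//! impl UppercaseService {
//!     fn transform(&self, bytes: &[u8]) -> Vec<u8> {
//!         bytes.to_ascii_uppercase()
//!     }
//! }
//!
//! // The adapter holds the shared service and a display name.
//! struct UppercaseAdapter {
//!     service: Arc<UppercaseService>,
//!     name: String,
//! }
//!
//! impl Processor for UppercaseAdapter {
//!     fn process_chunk(&self, chunk: &Chunk) -> Result<Chunk, String> {
//!         // Translate between the chunk interface and the service interface.
//!         Ok(Chunk(self.service.transform(&chunk.0)))
//!     }
//!
//!     fn name(&self) -> &str {
//!         &self.name
//!     }
//! }
//!
//! // Wraps the service, processes one chunk, and returns the resulting bytes.
//! fn adapt_basic(input: &[u8]) -> Result<Vec<u8>, String> {
//!     let adapter = UppercaseAdapter {
//!         service: Arc::new(UppercaseService),
//!         name: "Uppercase".to_string(),
//!     };
//!     adapter.process_chunk(&Chunk(input.to_vec())).map(|c| c.0)
//! }
//! ```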
//!
//! ### Compression Service Adaptation
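//!
//! The dependency-injection shape used by `CompressionChunkAdapter` can be
//! sketched as follows. The toy run-length encoder, `ToyCompressionConfig`,
//! and `ToyCompressionService` are hypothetical stand-ins, not the real
//! `CompressionConfig` or `CompressionService` API; the point is only that the
//! adapter receives its configuration explicitly and passes it to the shared
//! service on every call.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! // Hypothetical stand-in for `CompressionConfig`.
//! #[derive(Debug, Clone)]
//! struct ToyCompressionConfig {
//!     max_run: u8,
//! }
//!
//! // Hypothetical stand-in for the `CompressionService` trait.
//! trait ToyCompressionService: Send + Sync {
//!     fn compress(&self, data: &[u8], config: &ToyCompressionConfig)
//!         -> Result<Vec<u8>, String>;
//! }
//!
//! // Toy run-length encoder: emits (count, byte) pairs.
//! struct RunLengthService;
//!
//! impl ToyCompressionService for RunLengthService {
//!     fn compress(&self, data: &[u8], config: &ToyCompressionConfig)
//!         -> Result<Vec<u8>, String>
//!     {
//!         if config.max_run == 0 {
//!             return Err("max_run must be at least 1".to_string());
//!         }
//!         let mut out = Vec::new();
//!         let mut iter = data.iter().copied().peekable();
//!         while let Some(byte) = iter.next() {
//!             // Extend the run while the next byte matches and the cap allows.
//!             let mut run = 1u8;
//!             while run < config.max_run && iter.peek() == Some(&byte) {
//!                 iter.next();
//!                 run += 1;
//!             }
//!             out.push(run);
//!             out.push(byte);
//!         }
//!         Ok(out)
//!     }
//! }
//!
//! // Adapter holding the shared service plus the injected configuration.
//! struct ToyCompressionAdapter {
//!     service: Arc<dyn ToyCompressionService>,
//!     config: ToyCompressionConfig,
//! }
//!
//! impl ToyCompressionAdapter {
//!     fn process(&self, data: &[u8]) -> Result<Vec<u8>, String> {
//!         self.service.compress(data, &self.config)
//!     }
//! }
//!
//! fn compress_with_adapter(data: &[u8], max_run: u8) -> Result<Vec<u8>, String> {
//!     let adapter = ToyCompressionAdapter {
//!         service: Arc::new(RunLengthService),
//!         config: ToyCompressionConfig { max_run },
//!     };
//!     adapter.process(data)
//! }
//! ```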
//!
//! ### Encryption Service Adaptation
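//!
//! A sketch of the no-insecure-defaults rule, under stated assumptions:
//! `ToyKey`, `ToyCipher`, and the XOR transform are hypothetical placeholders
//! (XOR is NOT real encryption), and the `try_new` check is only a guess at
//! what a caller-side guard might look like. The adapters in this module store
//! key material as given.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! // Hypothetical stand-in for `KeyMaterial`.
//! #[derive(Clone)]
//! struct ToyKey(Vec<u8>);
//!
//! // Hypothetical stand-in for the `EncryptionService` trait.
//! trait ToyCipher: Send + Sync {
//!     fn encrypt(&self, data: &[u8], key: &ToyKey) -> Vec<u8>;
//! }
//!
//! // Placeholder XOR transform. NOT real encryption; illustration only.
//! struct XorCipher;
//!
//! impl ToyCipher for XorCipher {
//!     fn encrypt(&self, data: &[u8], key: &ToyKey) -> Vec<u8> {
//!         data.iter()
//!             .zip(key.0.iter().cycle())
//!             .map(|(byte, k)| byte ^ k)
//!             .collect()
//!     }
//! }
//!
//! struct ToyEncryptionAdapter {
//!     service: Arc<dyn ToyCipher>,
//!     key: ToyKey,
//! }
//!
//! impl ToyEncryptionAdapter {
//!     // Hypothetical guard: refuse empty or all-zero key material instead of
//!     // silently falling back to a default key.
//!     fn try_new(service: Arc<dyn ToyCipher>, key: ToyKey) -> Result<Self, String> {
//!         if key.0.is_empty() || key.0.iter().all(|b| *b == 0) {
//!             return Err("key material must be non-empty and not all zeros".to_string());
//!         }
//!         Ok(Self { service, key })
//!     }
//!
//!     fn process(&self, data: &[u8]) -> Vec<u8> {
//!         self.service.encrypt(data, &self.key)
//!     }
//! }
//!
//! fn encrypt_with_key(data: &[u8], key: Vec<u8>) -> Result<Vec<u8>, String> {
//!     let adapter = ToyEncryptionAdapter::try_new(Arc::new(XorCipher), ToyKey(key))?;
//!     Ok(adapter.process(data))
//! }
//! ```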
//!
//! ### Pipeline Integration
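//!
//! `ServiceAdapterFactory` returns `Box<dyn ChunkProcessor>`, which suggests
//! that stages can be stored in one collection and run in order. A minimal
//! sketch, assuming a hypothetical `Stage` trait in place of `ChunkProcessor`
//! and plain byte vectors in place of `FileChunk`:
//!
//! ```rust
//! // Hypothetical stand-in for `ChunkProcessor`, reduced to byte vectors.
//! trait Stage {
//!     fn process(&self, data: Vec<u8>) -> Result<Vec<u8>, String>;
//!     fn name(&self) -> &str;
//! }
//!
//! struct Reverse;
//! struct AppendChecksum;
//!
//! impl Stage for Reverse {
//!     fn process(&self, mut data: Vec<u8>) -> Result<Vec<u8>, String> {
//!         data.reverse();
//!         Ok(data)
//!     }
//!
//!     fn name(&self) -> &str {
//!         "reverse"
//!     }
//! }
//!
//! impl Stage for AppendChecksum {
//!     fn process(&self, mut data: Vec<u8>) -> Result<Vec<u8>, String> {
//!         // A wrapping byte sum stands in for a real integrity check.
//!         let sum = data.iter().fold(0u8, |acc, b| acc.wrapping_add(*b));
//!         data.push(sum);
//!         Ok(data)
//!     }
//!
//!     fn name(&self) -> &str {
//!         "checksum"
//!     }
//! }
//!
//! // Runs the stages in order; the first error aborts the run and is tagged
//! // with the name of the stage that produced it.
//! fn run_pipeline(stages: &[Box<dyn Stage>], input: Vec<u8>) -> Result<Vec<u8>, String> {
//!     stages.iter().try_fold(input, |data, stage| {
//!         stage.process(data).map_err(|e| format!("{}: {}", stage.name(), e))
//!     })
//! }
//!
//! fn example_pipeline(input: Vec<u8>) -> Result<Vec<u8>, String> {
//!     let stages: Vec<Box<dyn Stage>> = vec![Box::new(Reverse), Box::new(AppendChecksum)];
//!     run_pipeline(&stages, input)
//! }
//! ```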
//!
//! ## Adapter Configuration
//!
//! ### Configuration Options
//!
//! - **modifies_data**: Value returned by `ChunkProcessor::modifies_data`; the
//!   factory functions in this module set it to `true`
//!
//! ### Performance Tuning
//!
//! The adapters process one chunk per synchronous call, so the following are
//! left to the calling code:
//!
//! - **Batch Processing**: Process multiple chunks in batches
//! - **Memory Management**: Efficient memory usage and cleanup
//! - **Async Operations**: Non-blocking operations for better throughput
//!
//! ## Performance Considerations
//!
//! ### Adaptation Overhead
//!
//! - **Per-Chunk Context**: Each `process_chunk` call builds a fresh
//!   `SecurityContext` and `ProcessingContext` before calling the service
//! - **Dispatch**: The generic adapter is monomorphized per service type; the
//!   specialized adapters call their service through `Arc<dyn ...>`
//! - **Efficient Wrapping**: Direct service calls without unnecessary
//!   indirection
//!
//! ### Memory Usage
//!
//! - **Shared Services**: Services are shared via `Arc` for memory efficiency
//! - **Chunk Copying**: Each `process_chunk` call clones the input chunk once
//!   before handing it to the service
//! - **Resource Cleanup**: Automatic cleanup of adapter resources
//!
//! ### Concurrency
//!
//! - **Thread Safety**: All adapters are thread-safe
//! - **Concurrent Processing**: Support for concurrent chunk processing
//! - **Lock-Free Operations**: Lock-free operations where possible
//!
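//! ## Error Handling
//!
//! ### Adapter Errors
//!
//! - **Service Errors**: Errors from the wrapped service are propagated to the
//!   caller with `?` as `PipelineError`
//! - **Configuration Errors**: Constructors store the configuration as given
//!   and perform no validation of their own
//! - **Processing Errors**: Comprehensive error context and recovery
//!
//! ### Error Recovery
//!
//! The adapters in this module do not retry or fall back on their own; a
//! failed service call fails the chunk. Any recovery strategy has to be
//! applied by the caller:
//!
//! - **Graceful Degradation**: Graceful handling of service failures
//! - **Fallback Processing**: Alternative processing strategies
//!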
//! ## Integration
//!
//! The adapters integrate with:
//!
//! - **Domain Services**: Bridge domain services with chunk processing
//! - **File Processor**: Used by file processor service for chunk processing
//! - **Pipeline System**: Integrate with pipeline processing workflow
//! - **Configuration System**: Support for runtime configuration
//!
//! ## Thread Safety
//!
//! All adapters are fully thread-safe:
//!
//! - **Shared Services**: Services are shared safely via Arc
//! - **Concurrent Access**: Safe concurrent access to adapter methods
//! - **Immutable Configuration**: Configuration is immutable after creation
//!
//! ## Future Enhancements
//!
//! Planned enhancements include:
//!
//! - **Retry Logic**: Configurable retry logic for transient failures
//! - **Security Context Enforcement**: Permission-based operation validation
//! - **Dynamic Adaptation**: Runtime adaptation of service behavior
//! - **Metrics Integration**: Built-in metrics collection and reporting
//! - **Caching**: Intelligent caching of processing results
//! - **Load Balancing**: Load balancing across multiple service instances

use adaptive_pipeline_domain::services::compression_service::{
    CompressionAlgorithm,
    CompressionConfig,
    CompressionLevel,
    CompressionService,
};
use adaptive_pipeline_domain::services::encryption_service::{
    EncryptionAlgorithm,
    EncryptionConfig,
    EncryptionService,
    KeyDerivationFunction,
    KeyMaterial,
};
use adaptive_pipeline_domain::services::file_processor_service::ChunkProcessor;
use adaptive_pipeline_domain::{FileChunk, PipelineError, ProcessingContext, SecurityContext};
use std::sync::Arc;

/// Generic adapter that wraps any service as a ChunkProcessor
///
/// This adapter provides a generic way to wrap any domain service and use it
/// as a chunk processor in the file processing pipeline. It uses generics to
/// provide type-safe, reusable chunk processing capabilities.
///
/// # Key Features
///
/// - **Generic Design**: Works with any service type that implements required
///   traits
/// - **Type Safety**: Compile-time type checking for service compatibility
/// - **Configuration**: Flexible configuration for different service behaviors
/// - **Performance**: Minimal overhead adaptation with efficient service calls
/// - **Thread Safety**: Full thread safety with Arc-based service sharing
///
/// # Examples
///
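/// A minimal sketch of why the `?Sized` bound matters, using a hypothetical
/// `Greeter` trait and a `Wrapper` type that mirrors the shape of this struct
/// without its config field; neither name exists in this crate.
///
/// ```rust
/// use std::sync::Arc;
///
/// // Hypothetical service trait, for illustration only.
/// trait Greeter {
///     fn greet(&self) -> String;
/// }
///
/// struct English;
///
/// impl Greeter for English {
///     fn greet(&self) -> String {
///         "hello".to_string()
///     }
/// }
///
/// // Same shape as the adapter: a shared service plus a name.
/// struct Wrapper<T: ?Sized> {
///     service: Arc<T>,
///     name: String,
/// }
///
/// impl<T: ?Sized> Wrapper<T> {
///     fn new(service: Arc<T>, name: String) -> Self {
///         Self { service, name }
///     }
/// }
///
/// fn wrap_trait_object() -> (String, String) {
///     // `T = dyn Greeter` is accepted only because of the `?Sized` bound.
///     let service: Arc<dyn Greeter> = Arc::new(English);
///     let wrapper: Wrapper<dyn Greeter> = Wrapper::new(service, "greeter".to_string());
///     (wrapper.name.clone(), wrapper.service.greet())
/// }
/// ```
///
/// Dropping `?Sized` from `Wrapper` would make `Wrapper<dyn Greeter>` a
/// compile error, since type parameters are `Sized` by default.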
///
/// # Generic Type Parameter
///
/// The type parameter `T` represents the service being adapted:
/// - Can be any domain service (compression, encryption, validation, etc.)
/// - Uses `?Sized` so that trait objects such as `dyn CompressionService` can
///   be wrapped
/// - Carries no trait bounds on the struct itself; this module does not
///   provide a `ChunkProcessor` impl for the generic adapter
pub struct ServiceChunkAdapter<T: ?Sized> {
    service: Arc<T>,
    name: String,
    config: AdapterConfig,
}

/// Configuration for service adapters (generic)
#[derive(Debug, Clone)]
pub struct AdapterConfig {
    /// Whether the adapter modifies chunk data
    pub modifies_data: bool,

    // TODO(v2.0 - Security Context Enforcement): Implement security validation
    // Currently defined but not enforced anywhere in the codebase.
    //
    // To implement this feature:
    // 1. Accept SecurityContext via adapter configuration or constructor
    // 2. Validate SecurityContext.can_encrypt()/can_compress() before operations
    // 3. Return PipelineError::SecurityViolation if permissions insufficient
    // 4. Document security requirements in adapter method docs
    //
    // Related:
    // - Code review Comments 5 & 6
    // - See docs/roadmap.md for security enforcement design
    // - ProcessingContext already carries SecurityContext (can be used)
    //
    // pub requires_security_context: bool,
}

/// Configuration for compression adapters with typed configuration
#[derive(Debug, Clone)]
pub struct CompressionAdapterConfig {
    /// Whether the adapter modifies chunk data
    pub modifies_data: bool,
    /// Compression settings passed to the service on every call
    pub compression_config: CompressionConfig,
}

/// Configuration for encryption adapters with required key material
#[derive(Debug, Clone)]
pub struct EncryptionAdapterConfig {
    /// Whether the adapter modifies chunk data
    pub modifies_data: bool,
    /// Encryption settings passed to the service on every call
    pub encryption_config: EncryptionConfig,
    /// Key material passed to the service on every call
    pub key_material: KeyMaterial,
}

impl<T: ?Sized> ServiceChunkAdapter<T> {
    /// Creates a new generic adapter from a shared service, a name, and a
    /// configuration
    pub fn new(service: Arc<T>, name: String, config: AdapterConfig) -> Self {
        Self { service, name, config }
    }
}

/// Compression service adapter with typed configuration
///
/// This adapter requires compression configuration to be provided explicitly
/// following the dependency injection pattern.
pub struct CompressionChunkAdapter {
    service: Arc<dyn CompressionService>,
    name: String,
    config: CompressionAdapterConfig,
}

impl CompressionChunkAdapter {
    /// Creates a new compression adapter with required configuration
    ///
    /// # Arguments
    ///
    /// * `service` - The compression service implementation
    /// * `name` - Name for this adapter instance
    /// * `config` - Configuration including compression settings
    pub fn new(
        service: Arc<dyn CompressionService>,
        name: String,
        config: CompressionAdapterConfig,
    ) -> Self {
        Self { service, name, config }
    }
}

impl ChunkProcessor for CompressionChunkAdapter {
    fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
        // Create a minimal processing context for the service
        // NOTE: File paths are managed by CpuWorkerContext (DI pattern), not here
        let security_context = SecurityContext::new(
            Some("chunk_processor".to_string()),
            adaptive_pipeline_domain::entities::security_context::SecurityLevel::Internal,
        );
        let mut processing_context = ProcessingContext::new(
            chunk.data().len() as u64,
            security_context,
        );

        // Use the compression config provided via configuration (DI pattern)
        let compressed_chunk = self.service.compress_chunk(
            chunk.clone(),
            &self.config.compression_config,
            &mut processing_context,
        )?;

        // Return the compressed chunk (already processed by the service)
        Ok(compressed_chunk)
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn modifies_data(&self) -> bool {
        self.config.modifies_data
    }
}

/// Encryption service adapter with required configuration
///
/// SECURITY: This adapter requires encryption configuration and key material
/// to be provided explicitly. It will NOT create insecure default keys.
pub struct EncryptionChunkAdapter {
    service: Arc<dyn EncryptionService>,
    name: String,
    config: EncryptionAdapterConfig,
}

impl EncryptionChunkAdapter {
    /// Creates a new encryption adapter with required configuration
    ///
    /// # Security
    ///
    /// - Requires explicit `EncryptionConfig` with algorithm settings
    /// - Requires explicit `KeyMaterial` - will NOT generate insecure defaults
    /// - Stores the configuration as given and performs no validation of its
    ///   own, so callers must validate it beforehand
    ///
    /// # Arguments
    ///
    /// * `service` - The encryption service implementation
    /// * `name` - Name for this adapter instance
    /// * `config` - Configuration including encryption settings and key material
    pub fn new(
        service: Arc<dyn EncryptionService>,
        name: String,
        config: EncryptionAdapterConfig,
    ) -> Self {
        Self { service, name, config }
    }
}

impl ChunkProcessor for EncryptionChunkAdapter {
    fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
        // Create a minimal security context for the service
        // NOTE: File paths are managed by CpuWorkerContext (DI pattern), not here
        let security_context = SecurityContext::new(
            Some("chunk_processor".to_string()),
            adaptive_pipeline_domain::entities::security_context::SecurityLevel::Internal,
        );
        let mut processing_context = ProcessingContext::new(
            chunk.data().len() as u64,
            security_context,
        );

        // Use the encryption config and key material provided via configuration
        // SECURITY: No insecure defaults - config must be provided explicitly
        let encrypted_chunk = self.service.encrypt_chunk(
            chunk.clone(),
            &self.config.encryption_config,
            &self.config.key_material,
            &mut processing_context,
        )?;

        // Return the encrypted chunk (already processed by the service)
        Ok(encrypted_chunk)
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn modifies_data(&self) -> bool {
        self.config.modifies_data
    }
}

/// Factory functions for creating service adapters
impl CompressionChunkAdapter {
    /// Creates a new compression adapter with provided configuration
    ///
    /// # Arguments
    ///
    /// * `service` - The compression service implementation
    /// * `name` - Optional name for this adapter (defaults to "CompressionAdapter")
    /// * `compression_config` - Compression algorithm and parameters
    pub fn new_compression_adapter(
        service: Arc<dyn CompressionService>,
        name: Option<String>,
        compression_config: CompressionConfig,
    ) -> Self {
        Self::new(
            service,
            name.unwrap_or_else(|| "CompressionAdapter".to_string()),
            CompressionAdapterConfig {
                modifies_data: true,
                compression_config,
            },
        )
    }
}

impl EncryptionChunkAdapter {
    /// Creates a new encryption adapter with provided configuration
    ///
    /// # Security
    ///
    /// Caller MUST provide:
    /// - Valid encryption configuration
    /// - Secure key material (NOT zero-filled or weak keys)
    ///
    /// # Arguments
    ///
    /// * `service` - The encryption service implementation
    /// * `name` - Optional name for this adapter (defaults to "EncryptionAdapter")
    /// * `encryption_config` - Encryption algorithm and parameters
    /// * `key_material` - Cryptographic key material (must be secure!)
    pub fn new_encryption_adapter(
        service: Arc<dyn EncryptionService>,
        name: Option<String>,
        encryption_config: EncryptionConfig,
        key_material: KeyMaterial,
    ) -> Self {
        Self::new(
            service,
            name.unwrap_or_else(|| "EncryptionAdapter".to_string()),
            EncryptionAdapterConfig {
                modifies_data: true,
                encryption_config,
                key_material,
            },
        )
    }
}

/// Generic factory for creating any service adapter
pub struct ServiceAdapterFactory;

impl ServiceAdapterFactory {
    /// Create a compression chunk adapter with required configuration
    ///
    /// # Arguments
    ///
    /// * `service` - The compression service implementation
    /// * `compression_config` - Compression algorithm and parameters
    pub fn create_compression_adapter(
        service: Arc<dyn CompressionService>,
        compression_config: CompressionConfig,
    ) -> Box<dyn ChunkProcessor> {
        Box::new(CompressionChunkAdapter::new_compression_adapter(
            service,
            None,
            compression_config,
        ))
    }

    /// Create an encryption chunk adapter with required configuration
    ///
    /// # Security
    ///
    /// Caller MUST provide secure key material. This factory will NOT
    /// generate insecure defaults.
    ///
    /// # Arguments
    ///
    /// * `service` - The encryption service implementation
    /// * `encryption_config` - Encryption algorithm and parameters
    /// * `key_material` - Cryptographic key material (must be secure!)
    pub fn create_encryption_adapter(
        service: Arc<dyn EncryptionService>,
        encryption_config: EncryptionConfig,
        key_material: KeyMaterial,
    ) -> Box<dyn ChunkProcessor> {
        Box::new(EncryptionChunkAdapter::new_encryption_adapter(
            service,
            None,
            encryption_config,
            key_material,
        ))
    }

    /// Create a custom service adapter with specific configuration
    pub fn create_custom_adapter<T: Send + Sync + 'static>(
        service: Arc<T>,
        name: String,
        config: AdapterConfig,
    ) -> ServiceChunkAdapter<T> {
        ServiceChunkAdapter::new(service, name, config)
    }
}