adaptive_pipeline/infrastructure/adapters/chunk_processor_adapters.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8// Infrastructure module - contains future features not yet fully utilized
9#![allow(dead_code, unused_imports, unused_variables)]
10//! # Chunk Processor Adapters
11//!
12//! This module provides adapter implementations that bridge domain services
13//! with the chunk processing interface. These adapters enable domain services
14//! to be used as chunk processors in the file processing pipeline.
15//!
16//! ## Overview
17//!
18//! The chunk processor adapters provide:
19//!
20//! - **Service Integration**: Bridge domain services with chunk processing
21//! - **Type Safety**: Generic adapters with compile-time type checking
22//! - **Configuration**: Flexible configuration for different service types
23//! - **Reusability**: Reusable adapters for common service patterns
24//! - **Performance**: Efficient adaptation with minimal overhead
25//!
26//! ## Architecture
27//!
28//! The adapters follow the Adapter pattern:
29//!
30//! - **Generic Design**: Generic adapters that work with any service type
31//! - **Service Wrapping**: Wrap domain services to implement ChunkProcessor
32//! - **Configuration-Driven**: Behavior controlled through configuration
33//! - **Async Operations**: Full async support for non-blocking operations
34//!
35//! ## Adapter Types
36//!
37//! ### Service Chunk Adapter
38//!
39//! Generic adapter that can wrap any service:
40//! - **Compression Services**: Adapt compression services for chunk processing
41//! - **Encryption Services**: Adapt encryption services for chunk processing
42//! - **Custom Services**: Adapt any domain service with appropriate interface
43//!
44//! ### Specialized Adapters
45//!
46//! - **Compression Adapter**: Specialized adapter for compression services
47//! - **Encryption Adapter**: Specialized adapter for encryption services
48//! - **Validation Adapter**: Specialized adapter for validation services
49//!
50//! ## Usage Examples
51//!
52//! ### Basic Service Adaptation
53
54//!
55//! ### Compression Service Adaptation
56
57//!
58//! ### Encryption Service Adaptation
59
60//!
61//! ### Pipeline Integration
62
63//!
64//! ## Adapter Configuration
65//!
66//! ### Configuration Options
67//!
68//! - **modifies_data**: Whether the adapter modifies chunk data
69//!
70//! ### Performance Tuning
71//!
72//! - **Batch Processing**: Process multiple chunks in batches
73//! - **Memory Management**: Efficient memory usage and cleanup
74//! - **Async Operations**: Non-blocking operations for better throughput
75//!
76//! ## Performance Considerations
77//!
78//! ### Adaptation Overhead
79//!
80//! - **Minimal Overhead**: Adapters add minimal performance overhead
81//! - **Zero-Cost Abstractions**: Generic design enables compiler optimizations
82//! - **Efficient Wrapping**: Direct service calls without unnecessary
83//! indirection
84//!
85//! ### Memory Usage
86//!
87//! - **Shared Services**: Services are shared via Arc for memory efficiency
88//! - **Chunk Copying**: Minimal chunk copying during adaptation
89//! - **Resource Cleanup**: Automatic cleanup of adapter resources
90//!
91//! ### Concurrency
92//!
93//! - **Thread Safety**: All adapters are thread-safe
94//! - **Concurrent Processing**: Support for concurrent chunk processing
95//! - **Lock-Free Operations**: Lock-free operations where possible
96//!
97//! ## Error Handling
98//!
99//! ### Adapter Errors
100//!
101//! - **Service Errors**: Proper propagation of service errors
102//! - **Configuration Errors**: Validation of adapter configuration
103//! - **Processing Errors**: Comprehensive error context and recovery
104//!
105//! ### Error Recovery
106//!
107//! - **Graceful Degradation**: Graceful handling of service failures
108//! - **Fallback Processing**: Alternative processing strategies
109//!
110//! ## Integration
111//!
112//! The adapters integrate with:
113//!
114//! - **Domain Services**: Bridge domain services with chunk processing
115//! - **File Processor**: Used by file processor service for chunk processing
116//! - **Pipeline System**: Integrate with pipeline processing workflow
117//! - **Configuration System**: Support for runtime configuration
118//!
119//! ## Thread Safety
120//!
121//! All adapters are fully thread-safe:
122//!
123//! - **Shared Services**: Services are shared safely via Arc
124//! - **Concurrent Access**: Safe concurrent access to adapter methods
125//! - **Immutable Configuration**: Configuration is immutable after creation
126//!
127//! ## Future Enhancements
128//!
129//! Planned enhancements include:
130//!
131//! - **Retry Logic**: Configurable retry logic for transient failures
132//! - **Security Context Enforcement**: Permission-based operation validation
133//! - **Dynamic Adaptation**: Runtime adaptation of service behavior
134//! - **Metrics Integration**: Built-in metrics collection and reporting
135//! - **Caching**: Intelligent caching of processing results
136//! - **Load Balancing**: Load balancing across multiple service instances
137
138use adaptive_pipeline_domain::services::compression_service::{
139 CompressionAlgorithm,
140 CompressionConfig,
141 CompressionLevel,
142 CompressionService,
143};
144use adaptive_pipeline_domain::services::encryption_service::{
145 EncryptionAlgorithm,
146 EncryptionConfig,
147 EncryptionService,
148 KeyDerivationFunction,
149 KeyMaterial,
150};
151use adaptive_pipeline_domain::services::file_processor_service::ChunkProcessor;
152use adaptive_pipeline_domain::{ FileChunk, PipelineError, ProcessingContext, SecurityContext };
153use std::sync::Arc;
154
155/// Generic adapter that wraps any service as a ChunkProcessor
156///
157/// This adapter provides a generic way to wrap any domain service and use it
158/// as a chunk processor in the file processing pipeline. It uses generics to
159/// provide type-safe, reusable chunk processing capabilities.
160///
161/// # Key Features
162///
163/// - **Generic Design**: Works with any service type that implements required
164/// traits
165/// - **Type Safety**: Compile-time type checking for service compatibility
166/// - **Configuration**: Flexible configuration for different service behaviors
167/// - **Performance**: Minimal overhead adaptation with efficient service calls
168/// - **Thread Safety**: Full thread safety with Arc-based service sharing
169///
170/// # Examples
171///
172///
173/// # Generic Type Parameter
174///
175/// The type parameter `T` represents the service being adapted:
176/// - Must implement the required service interface
177/// - Can be any domain service (compression, encryption, validation, etc.)
178/// - Uses `?Sized` to support trait objects
179///
180/// Uses generics to provide type-safe, reusable chunk processing capabilities
181pub struct ServiceChunkAdapter<T: ?Sized> {
182 service: Arc<T>,
183 name: String,
184 config: AdapterConfig,
185}
186
187/// Configuration for service adapters (generic)
188#[derive(Debug, Clone)]
189pub struct AdapterConfig {
190 pub modifies_data: bool,
191
192 // TODO(v2.0 - Security Context Enforcement): Implement security validation
193 // Currently defined but not enforced anywhere in the codebase.
194 //
195 // To implement this feature:
196 // 1. Accept SecurityContext via adapter configuration or constructor
197 // 2. Validate SecurityContext.can_encrypt()/can_compress() before operations
198 // 3. Return PipelineError::SecurityViolation if permissions insufficient
199 // 4. Document security requirements in adapter method docs
200 //
201 // Related:
202 // - Code review Comments 5 & 6
203 // - See docs/roadmap.md for security enforcement design
204 // - ProcessingContext already carries SecurityContext (can be used)
205 //
206 // pub requires_security_context: bool,
207}
208
209/// Configuration for compression adapters with typed configuration
210#[derive(Debug, Clone)]
211pub struct CompressionAdapterConfig {
212 pub modifies_data: bool,
213 pub compression_config: CompressionConfig,
214}
215
216/// Configuration for encryption adapters with required key material
217#[derive(Debug, Clone)]
218pub struct EncryptionAdapterConfig {
219 pub modifies_data: bool,
220 pub encryption_config: EncryptionConfig,
221 pub key_material: KeyMaterial,
222}
223
224impl<T: ?Sized> ServiceChunkAdapter<T> {
225 pub fn new(service: Arc<T>, name: String, config: AdapterConfig) -> Self {
226 Self { service, name, config }
227 }
228}
229
230/// Compression service adapter with typed configuration
231///
232/// This adapter requires compression configuration to be provided explicitly
233/// following the dependency injection pattern.
234pub struct CompressionChunkAdapter {
235 service: Arc<dyn CompressionService>,
236 name: String,
237 config: CompressionAdapterConfig,
238}
239
240impl CompressionChunkAdapter {
241 /// Creates a new compression adapter with required configuration
242 ///
243 /// # Arguments
244 ///
245 /// * `service` - The compression service implementation
246 /// * `name` - Name for this adapter instance
247 /// * `config` - Configuration including compression settings
248 pub fn new(
249 service: Arc<dyn CompressionService>,
250 name: String,
251 config: CompressionAdapterConfig,
252 ) -> Self {
253 Self { service, name, config }
254 }
255}
256
257impl ChunkProcessor for CompressionChunkAdapter {
258 fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
259 // Create a minimal processing context for the service
260 // NOTE: File paths are managed by CpuWorkerContext (DI pattern), not here
261 let security_context = SecurityContext::new(
262 Some("chunk_processor".to_string()),
263 adaptive_pipeline_domain::entities::security_context::SecurityLevel::Internal
264 );
265 let mut processing_context = ProcessingContext::new(
266 chunk.data().len() as u64,
267 security_context
268 );
269
270 // Use the compression config provided via configuration (DI pattern)
271 let compressed_chunk = self.service.compress_chunk(
272 chunk.clone(),
273 &self.config.compression_config,
274 &mut processing_context
275 )?;
276
277 // Return the compressed chunk (already processed by the service)
278 Ok(compressed_chunk)
279 }
280
281 fn name(&self) -> &str {
282 &self.name
283 }
284
285 fn modifies_data(&self) -> bool {
286 self.config.modifies_data
287 }
288}
289
290/// Encryption service adapter with required configuration
291///
292/// SECURITY: This adapter requires encryption configuration and key material
293/// to be provided explicitly. It will NOT create insecure default keys.
294pub struct EncryptionChunkAdapter {
295 service: Arc<dyn EncryptionService>,
296 name: String,
297 config: EncryptionAdapterConfig,
298}
299
300impl EncryptionChunkAdapter {
301 /// Creates a new encryption adapter with required configuration
302 ///
303 /// # Security
304 ///
305 /// - Requires explicit `EncryptionConfig` with algorithm settings
306 /// - Requires explicit `KeyMaterial` - will NOT generate insecure defaults
307 /// - Validates configuration before accepting
308 ///
309 /// # Arguments
310 ///
311 /// * `service` - The encryption service implementation
312 /// * `name` - Name for this adapter instance
313 /// * `config` - Configuration including encryption settings and key material
314 pub fn new(
315 service: Arc<dyn EncryptionService>,
316 name: String,
317 config: EncryptionAdapterConfig,
318 ) -> Self {
319 Self { service, name, config }
320 }
321}
322
323impl ChunkProcessor for EncryptionChunkAdapter {
324 fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
325 // Create a minimal security context for the service
326 // NOTE: File paths are managed by CpuWorkerContext (DI pattern), not here
327 let security_context = SecurityContext::new(
328 Some("chunk_processor".to_string()),
329 adaptive_pipeline_domain::entities::security_context::SecurityLevel::Internal
330 );
331 let mut processing_context = ProcessingContext::new(
332 chunk.data().len() as u64,
333 security_context
334 );
335
336 // Use the encryption config and key material provided via configuration
337 // SECURITY: No insecure defaults - config must be provided explicitly
338 let encrypted_chunk = self.service.encrypt_chunk(
339 chunk.clone(),
340 &self.config.encryption_config,
341 &self.config.key_material,
342 &mut processing_context
343 )?;
344
345 // Return the encrypted chunk (already processed by the service)
346 Ok(encrypted_chunk)
347 }
348
349 fn name(&self) -> &str {
350 &self.name
351 }
352
353 fn modifies_data(&self) -> bool {
354 self.config.modifies_data
355 }
356}
357
358/// Factory functions for creating service adapters
359impl CompressionChunkAdapter {
360 /// Creates a new compression adapter with provided configuration
361 ///
362 /// # Arguments
363 ///
364 /// * `service` - The compression service implementation
365 /// * `name` - Optional name for this adapter (defaults to "CompressionAdapter")
366 /// * `compression_config` - Compression algorithm and parameters
367 pub fn new_compression_adapter(
368 service: Arc<dyn CompressionService>,
369 name: Option<String>,
370 compression_config: CompressionConfig,
371 ) -> Self {
372 Self::new(
373 service,
374 name.unwrap_or_else(|| "CompressionAdapter".to_string()),
375 CompressionAdapterConfig {
376 modifies_data: true,
377 compression_config,
378 }
379 )
380 }
381}
382
383impl EncryptionChunkAdapter {
384 /// Creates a new encryption adapter with provided configuration
385 ///
386 /// # Security
387 ///
388 /// Caller MUST provide:
389 /// - Valid encryption configuration
390 /// - Secure key material (NOT zero-filled or weak keys)
391 ///
392 /// # Arguments
393 ///
394 /// * `service` - The encryption service implementation
395 /// * `name` - Optional name for this adapter (defaults to "EncryptionAdapter")
396 /// * `encryption_config` - Encryption algorithm and parameters
397 /// * `key_material` - Cryptographic key material (must be secure!)
398 pub fn new_encryption_adapter(
399 service: Arc<dyn EncryptionService>,
400 name: Option<String>,
401 encryption_config: EncryptionConfig,
402 key_material: KeyMaterial,
403 ) -> Self {
404 Self::new(
405 service,
406 name.unwrap_or_else(|| "EncryptionAdapter".to_string()),
407 EncryptionAdapterConfig {
408 modifies_data: true,
409 encryption_config,
410 key_material,
411 }
412 )
413 }
414}
415
416/// Generic factory for creating any service adapter
417pub struct ServiceAdapterFactory;
418
419impl ServiceAdapterFactory {
420 /// Create a compression chunk adapter with required configuration
421 ///
422 /// # Arguments
423 ///
424 /// * `service` - The compression service implementation
425 /// * `compression_config` - Compression algorithm and parameters
426 pub fn create_compression_adapter(
427 service: Arc<dyn CompressionService>,
428 compression_config: CompressionConfig,
429 ) -> Box<dyn ChunkProcessor> {
430 Box::new(CompressionChunkAdapter::new_compression_adapter(
431 service,
432 None,
433 compression_config,
434 ))
435 }
436
437 /// Create an encryption chunk adapter with required configuration
438 ///
439 /// # Security
440 ///
441 /// Caller MUST provide secure key material. This factory will NOT
442 /// generate insecure defaults.
443 ///
444 /// # Arguments
445 ///
446 /// * `service` - The encryption service implementation
447 /// * `encryption_config` - Encryption algorithm and parameters
448 /// * `key_material` - Cryptographic key material (must be secure!)
449 pub fn create_encryption_adapter(
450 service: Arc<dyn EncryptionService>,
451 encryption_config: EncryptionConfig,
452 key_material: KeyMaterial,
453 ) -> Box<dyn ChunkProcessor> {
454 Box::new(EncryptionChunkAdapter::new_encryption_adapter(
455 service,
456 None,
457 encryption_config,
458 key_material,
459 ))
460 }
461
462 /// Create a custom service adapter with specific configuration
463 pub fn create_custom_adapter<T: Send + Sync + 'static>(
464 service: Arc<T>,
465 name: String,
466 config: AdapterConfig
467 ) -> ServiceChunkAdapter<T> {
468 ServiceChunkAdapter::new(service, name, config)
469 }
470}