adaptive_pipeline_domain/services/
file_processor_service.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # File Processor Service Interface
9//!
10//! This module defines the domain service interface for file processing
11//! operations within the adaptive pipeline system. It provides abstractions for
12//! coordinating file processing workflows, chunk management, and processing
13//! statistics.
14//!
15//! ## Overview
16//!
17//! The file processor service provides:
18//!
19//! - **File Processing Coordination**: Orchestrates file processing workflows
20//! - **Chunk Management**: Manages file chunking and chunk processing
21//! - **Processing Statistics**: Collects and reports processing metrics
22//! - **Error Handling**: Comprehensive error handling and recovery
23//! - **Parallel Processing**: Support for parallel chunk processing
24//!
25//! ## Architecture
26//!
27//! The service follows Domain-Driven Design principles:
28//!
29//! - **Domain Interface**: `FileProcessorService` trait defines the contract
30//! - **Configuration**: `FileProcessorConfig` encapsulates processing
31//!   parameters
32//! - **Chunk Processing**: `ChunkProcessor` trait for pluggable processing
33//!   logic
34//! - **Statistics**: Comprehensive processing statistics and metrics
35//!
36//! ## Key Features
37//!
38//! ### File Processing Workflow
39//!
40//! - **File Analysis**: Analyze files to determine optimal processing strategy
41//! - **Chunk Creation**: Divide files into appropriately sized chunks
42//! - **Parallel Processing**: Process chunks concurrently for better
43//!   performance
44//! - **Result Aggregation**: Collect and aggregate processing results
45//!
46//! ### Chunk Processing
47//!
48//! - **Pluggable Processors**: Support for custom chunk processing logic
49//! - **Processing Pipeline**: Chain multiple processors for complex workflows
50//! - **Error Isolation**: Isolate errors to individual chunks when possible
51//! - **Progress Tracking**: Real-time progress monitoring and reporting
52//!
53//! ### Performance Optimization
54//!
55//! - **Adaptive Chunking**: Dynamic chunk size adjustment based on performance
56//! - **Memory Management**: Efficient memory usage with chunk recycling
57//! - **Parallel Execution**: Configurable parallel processing capabilities
58//! - **Resource Management**: Intelligent resource allocation and cleanup
59//!
60//! ## Usage Examples
61//!
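//! ### Basic File Processing
//!
//! Call [`FileProcessorService::process_file`] with an input path, an optional
//! output path, and a boxed [`ChunkProcessor`]; the returned
//! [`FileProcessingResult`] describes what was processed.
//!
//! ### Custom Chunk Processor
//!
//! Implement [`ChunkProcessor`] for your own type. Use [`ChainProcessor`] to
//! apply several processors to each chunk in sequence.
//!
//! ### Parallel Processing
//!
//! [`FileProcessorService::process_files_batch`] accepts several input/output
//! path pairs at once. A processor reports through
//! [`ChunkProcessor::requires_sequential_processing`] whether it requires
//! sequential processing.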
70//!
71//! ## Configuration
72//!
73//! ### File Processor Configuration
74//!
75//! The service behavior is controlled through `FileProcessorConfig`:
76//!
77//! - **File Size Limits**: Maximum file size for processing
78//! - **Chunk Size**: Preferred chunk size for processing
79//! - **Memory Mapping**: Enable/disable memory mapping for large files
80//! - **Concurrency**: Maximum number of concurrent file operations
81//! - **Integrity Verification**: Enable/disable file integrity checks
82//! - **Temporary Directory**: Location for intermediate processing files
83//!
84//! ### Performance Tuning
85//!
86//! - **Chunk Size**: Optimize chunk size based on processing characteristics
87//! - **Concurrency**: Balance concurrency with system resources
88//! - **Memory Mapping**: Use memory mapping for large files
89//! - **Buffer Management**: Efficient buffer allocation and reuse
90//!
91//! ## Processing Statistics
92//!
93//! ### Collected Metrics
94//!
95//! - **Processing Time**: Total and per-chunk processing times
96//! - **Throughput**: Processing throughput in bytes/second
97//! - **Chunk Statistics**: Number of chunks processed and their sizes
98//! - **Error Rates**: Processing error rates and failure analysis
99//! - **Resource Usage**: Memory and CPU usage during processing
100//!
101//! ### Performance Analysis
102//!
103//! - **Bottleneck Identification**: Identify processing bottlenecks
104//! - **Optimization Recommendations**: Suggest configuration optimizations
105//! - **Trend Analysis**: Track performance trends over time
106//!
107//! ## Error Handling
108//!
109//! ### Processing Errors
110//!
111//! - **Chunk-Level Errors**: Isolate errors to individual chunks
112//! - **File-Level Errors**: Handle file-level processing failures
113//! - **System Errors**: Handle system resource and I/O errors
114//! - **Configuration Errors**: Validate configuration parameters
115//!
116//! ### Recovery Strategies
117//!
118//! - **Retry Logic**: Automatic retry for transient failures
119//! - **Partial Processing**: Continue processing unaffected chunks
120//! - **Fallback Processing**: Alternative processing strategies
121//! - **Error Reporting**: Detailed error context and suggestions
122//!
123//! ## Integration
124//!
125//! The file processor service integrates with:
126//!
127//! - **File I/O Service**: Uses file I/O service for reading and writing
128//! - **Chunk Processors**: Coordinates with pluggable chunk processors
129//! - **Pipeline Service**: Integrated into pipeline processing workflow
130//! - **Metrics Service**: Reports processing metrics and statistics
131//!
132//! ## Thread Safety
133//!
134//! The service interface is designed for thread safety:
135//!
136//! - **Concurrent Processing**: Safe concurrent processing of multiple files
137//! - **Shared Resources**: Safe sharing of processing resources
138//! - **State Management**: Thread-safe state management and coordination
139//!
140//! ## Future Enhancements
141//!
142//! Planned enhancements include:
143//!
144//! - **Streaming Processing**: Real-time streaming file processing
145//! - **Distributed Processing**: Support for distributed chunk processing
146//! - **Adaptive Optimization**: Automatic optimization based on performance
147//! - **Advanced Scheduling**: Sophisticated chunk scheduling strategies
148
149use crate::{FileChunk, PipelineError};
150use async_trait::async_trait;
151use std::collections::HashMap;
152use std::path::Path;
153// Note: FileIOService imports moved to infrastructure layer to maintain Clean
154// Architecture
155
156// NOTE: FileProcessorService is async (infrastructure port - involves I/O
157// operations). ChunkProcessor is synchronous (domain service - CPU-bound
158// processing). See file_io_service.rs for explanation of I/O-bound vs CPU-bound
159// async decisions.
160
/// Tunable settings for file processing operations.
///
/// Every knob a file processor consults lives in this one cloneable value, so
/// performance, resource usage, and behavior can be adjusted together.
///
/// # Key Configuration Areas
///
/// - **File Limits**: `max_file_size`
/// - **Chunk Processing**: `processing_chunk_size`
/// - **Memory Management**: `use_memory_mapping`
/// - **Concurrency**: `max_concurrent_files`
/// - **Integrity**: `verify_integrity`
/// - **Storage**: `temp_dir`
///
/// # Examples
///
/// Override one setting and keep the remaining defaults:
///
/// ```rust,ignore
/// let config = FileProcessorConfig {
///     processing_chunk_size: 4 * 1024 * 1024,
///     ..FileProcessorConfig::default()
/// };
/// assert_eq!(config.max_concurrent_files, 4);
/// ```
#[derive(Debug, Clone)]
pub struct FileProcessorConfig {
    /// Maximum file size to process (in bytes)
    pub max_file_size: u64,
    /// Preferred chunk size for processing
    pub processing_chunk_size: usize,
    /// Whether to use memory mapping for large files
    pub use_memory_mapping: bool,
    /// Maximum number of concurrent file operations
    pub max_concurrent_files: usize,
    /// Whether to verify file integrity before processing
    pub verify_integrity: bool,
    /// Temporary directory for intermediate files
    pub temp_dir: Option<std::path::PathBuf>,
}

impl Default for FileProcessorConfig {
    /// Builds the baseline configuration: a 10 GiB file-size limit, 1 MiB
    /// chunks, memory mapping and integrity verification enabled, four
    /// concurrent files, and no temporary directory set.
    fn default() -> Self {
        // Binary size units, named so the limits below read as "10 GiB" and
        // "1 MiB" instead of chained multiplications.
        const MIB: usize = 1 << 20;
        const GIB: u64 = 1 << 30;

        Self {
            max_file_size: 10 * GIB,
            processing_chunk_size: MIB,
            use_memory_mapping: true,
            max_concurrent_files: 4,
            verify_integrity: true,
            temp_dir: None,
        }
    }
}
205
/// Statistics for file processing operations
///
/// Aggregate counters returned by
/// `FileProcessorService::get_processing_stats`. The struct derives
/// `Default`, so a fresh value starts with every counter at `0` and the
/// average speed at `0.0`.
///
/// This type is a plain data carrier: nothing in this module updates the
/// fields or keeps them mutually consistent; that is left to whoever
/// produces the value.
#[derive(Debug, Clone, Default)]
pub struct FileProcessingStats {
    /// Total files processed
    pub files_processed: u64,
    /// Total bytes processed
    pub bytes_processed: u64,
    /// Total processing time in milliseconds
    pub total_processing_time_ms: u64,
    /// Number of files that used memory mapping
    pub memory_mapped_files: u64,
    /// Number of integrity check failures
    pub integrity_failures: u64,
    /// Number of processing errors
    pub processing_errors: u64,
    /// Average processing speed (bytes per second)
    ///
    /// NOTE(review): presumably derived from `bytes_processed` and
    /// `total_processing_time_ms`, but the formula is not defined here —
    /// confirm against the implementation that fills this in.
    pub avg_processing_speed: f64,
}
224
/// Result of file processing operation
///
/// Per-file summary returned by the `FileProcessorService` processing
/// methods. Paths are stored as owned `PathBuf`s, so the result does not
/// borrow from the arguments of the call that produced it.
///
/// Only `Debug` is derived; the type is not `Clone`.
#[derive(Debug)]
pub struct FileProcessingResult {
    /// Input file path
    pub input_path: std::path::PathBuf,
    /// Output file path (if applicable)
    pub output_path: Option<std::path::PathBuf>,
    /// Number of chunks processed
    pub chunks_processed: u64,
    /// Total bytes processed
    pub bytes_processed: u64,
    /// Processing time in milliseconds
    pub processing_time_ms: u64,
    /// Whether memory mapping was used
    pub used_memory_mapping: bool,
    /// File integrity status
    pub integrity_verified: bool,
    /// Processing metadata
    ///
    /// Free-form string key/value pairs; no keys are defined in this module.
    pub metadata: HashMap<String, String>,
}
245
/// Trait for processing files with the pipeline system
///
/// The file-oriented methods are `async` (via `#[async_trait]`); the
/// statistics and configuration accessors are synchronous. Implementors must
/// be `Send + Sync`.
///
/// Every processing method receives the chunk logic as an owned
/// `Box<dyn ChunkProcessor>`, so the caller hands the processor over for the
/// duration of the call.
///
/// NOTE(review): `reset_processing_stats` and `update_config` take
/// `&mut self` while every other method takes `&self`. Callers that hold the
/// service behind a shared handle (for example `Arc<dyn FileProcessorService>`)
/// cannot call those two without exclusive access — confirm this is intended.
#[async_trait]
pub trait FileProcessorService: Send + Sync {
    /// Processes a single file through the pipeline
    ///
    /// # Arguments
    /// * `input_path` - File to read
    /// * `output_path` - Destination file, or `None` when no output path
    ///   applies
    /// * `processor` - Chunk processor applied to the file's chunks
    ///
    /// # Errors
    /// Returns a `PipelineError` when processing fails.
    async fn process_file(
        &self,
        input_path: &Path,
        output_path: Option<&Path>,
        processor: Box<dyn ChunkProcessor>,
    ) -> Result<FileProcessingResult, PipelineError>;

    /// Processes multiple files concurrently
    ///
    /// # Arguments
    /// * `file_pairs` - `(input, optional output)` path pairs, one per file
    /// * `processor` - A single chunk processor supplied for the whole batch
    ///
    /// # Errors
    /// Returns a `PipelineError` when the batch fails.
    ///
    /// NOTE(review): the signature yields one `Result` for the entire batch;
    /// whether a single failing file aborts the batch, and whether the
    /// returned vector follows the order of `file_pairs`, is left to the
    /// implementation — confirm.
    async fn process_files_batch(
        &self,
        file_pairs: Vec<(std::path::PathBuf, Option<std::path::PathBuf>)>,
        processor: Box<dyn ChunkProcessor>,
    ) -> Result<Vec<FileProcessingResult>, PipelineError>;

    /// Processes a file in-place (modifying the original)
    ///
    /// # Arguments
    /// * `file_path` - File that is both read and overwritten
    /// * `processor` - Chunk processor applied to the file's chunks
    ///
    /// # Errors
    /// Returns a `PipelineError` when processing fails.
    async fn process_file_in_place(
        &self,
        file_path: &Path,
        processor: Box<dyn ChunkProcessor>,
    ) -> Result<FileProcessingResult, PipelineError>;

    /// Validates file integrity before processing
    ///
    /// NOTE(review): the contract separating `Ok(false)` from `Err(_)` is not
    /// defined here — presumably `Ok(false)` means "checked and invalid" and
    /// `Err` means "could not check"; confirm against implementations.
    async fn validate_file_before_processing(&self, file_path: &Path) -> Result<bool, PipelineError>;

    /// Gets processing statistics
    ///
    /// Returns the statistics by value.
    fn get_processing_stats(&self) -> FileProcessingStats;

    /// Resets processing statistics
    ///
    /// Requires exclusive (`&mut`) access to the service.
    fn reset_processing_stats(&mut self);

    /// Gets the current configuration
    ///
    /// Returns the configuration by value.
    fn get_config(&self) -> FileProcessorConfig;

    /// Updates the configuration
    ///
    /// Takes ownership of `config`; requires exclusive (`&mut`) access to the
    /// service.
    fn update_config(&mut self, config: FileProcessorConfig);
}
286
/// Trait for processing individual file chunks
///
/// ## Developer Notes - Immutable Processing Pattern
/// This trait follows DDD Value Object principles where FileChunk is immutable.
/// Instead of mutating chunks, processors return new chunk instances.
/// This ensures data integrity and prevents accidental mutations.
///
/// ## Architecture Note - Synchronous Domain Service
///
/// This trait is **synchronous** following DDD principles. Chunk processing
/// is CPU-bound (compression, encryption, checksums), not I/O-bound.
/// The domain layer defines *what* operations exist, not *how* they execute.
///
/// For async contexts, infrastructure adapters can wrap chunk processors
/// using `tokio::spawn_blocking` or similar mechanisms.
///
/// ### Usage Pattern:
///
/// Implement the three required methods; `requires_sequential_processing`
/// has a default of `false`. A minimal pass-through processor:
///
/// ```rust,ignore
/// struct PassThrough;
///
/// impl ChunkProcessor for PassThrough {
///     fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
///         // Never mutate the input; hand back a new chunk.
///         Ok(chunk.clone())
///     }
///
///     fn name(&self) -> &str {
///         "PassThrough"
///     }
///
///     fn modifies_data(&self) -> bool {
///         false
///     }
/// }
/// ```
pub trait ChunkProcessor: Send + Sync {
    /// Processes a single chunk of data and returns a new processed chunk
    ///
    /// # Arguments
    /// * `chunk` - The input chunk to process (immutable reference)
    ///
    /// # Returns
    /// * `Ok(FileChunk)` - New chunk with processing results
    /// * `Err(PipelineError)` - Processing failed
    ///
    /// # Developer Notes
    /// - Input chunk is never modified (immutability)
    /// - Return new chunk with changes applied
    /// - Use chunk.with_data() or chunk.with_checksum() for modifications
    ///
    /// # Note on Async
    ///
    /// This method is synchronous (CPU-bound operations). For async contexts,
    /// use infrastructure adapters that wrap this in `tokio::spawn_blocking`.
    fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError>;

    /// Returns the processor name for logging/debugging
    ///
    /// The returned string borrows from `self`.
    fn name(&self) -> &str;

    /// Returns whether this processor modifies chunk data
    fn modifies_data(&self) -> bool;

    /// Returns whether this processor requires sequential processing
    ///
    /// Defaults to `false`; override when the processor requires sequential
    /// processing.
    fn requires_sequential_processing(&self) -> bool {
        false
    }
}
336
/// Generic service adapter for chunk processing
///
/// Pairs an arbitrary service value with a name so that the pair can be
/// given a `ChunkProcessor` implementation. This module deliberately
/// provides no such implementation; concrete ones belong with the concrete
/// services.
///
/// Both fields are private. The accessors below give read access to them, so
/// code outside this module can actually use the wrapped service and the
/// fields are no longer dead code.
pub struct ServiceAdapter<T> {
    service: T,
    name: String,
}

impl<T> ServiceAdapter<T> {
    /// Wraps `service` under the given `name`.
    ///
    /// # Arguments
    /// * `service` - The service value to adapt; the adapter takes ownership
    /// * `name` - Name stored alongside the service
    pub fn new(service: T, name: String) -> Self {
        Self { service, name }
    }

    /// Borrows the wrapped service.
    pub fn service(&self) -> &T {
        &self.service
    }

    /// Returns the name supplied at construction.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Consumes the adapter and returns the wrapped service, discarding the
    /// name.
    pub fn into_inner(self) -> T {
        self.service
    }
}
351
352// Note: Specific ChunkProcessor implementations for
353// ServiceAdapter<CompressionService> and ServiceAdapter<EncryptionService>
354// should be implemented in the infrastructure layer to maintain proper
355// dependency direction and Clean Architecture principles.
356
357/// Processor that applies multiple processors in sequence
358pub struct ChainProcessor {
359    pub processors: Vec<Box<dyn ChunkProcessor>>,
360}
361
362impl ChunkProcessor for ChainProcessor {
363    fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
364        let mut current_chunk = chunk.clone();
365        for processor in &self.processors {
366            current_chunk = processor.process_chunk(&current_chunk)?;
367        }
368        Ok(current_chunk)
369    }
370
371    fn name(&self) -> &str {
372        "ChainProcessor"
373    }
374
375    fn modifies_data(&self) -> bool {
376        self.processors.iter().any(|p| p.modifies_data())
377    }
378
379    fn requires_sequential_processing(&self) -> bool {
380        self.processors.iter().any(|p| p.requires_sequential_processing())
381    }
382}