adaptive_pipeline_domain/services/file_processor_service.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # File Processor Service Interface
9//!
10//! This module defines the domain service interface for file processing
11//! operations within the adaptive pipeline system. It provides abstractions for
12//! coordinating file processing workflows, chunk management, and processing
13//! statistics.
14//!
15//! ## Overview
16//!
17//! The file processor service provides:
18//!
19//! - **File Processing Coordination**: Orchestrates file processing workflows
20//! - **Chunk Management**: Manages file chunking and chunk processing
21//! - **Processing Statistics**: Collects and reports processing metrics
22//! - **Error Handling**: Comprehensive error handling and recovery
23//! - **Parallel Processing**: Support for parallel chunk processing
24//!
25//! ## Architecture
26//!
27//! The service follows Domain-Driven Design principles:
28//!
29//! - **Domain Interface**: `FileProcessorService` trait defines the contract
30//! - **Configuration**: `FileProcessorConfig` encapsulates processing
31//! parameters
32//! - **Chunk Processing**: `ChunkProcessor` trait for pluggable processing
33//! logic
34//! - **Statistics**: Comprehensive processing statistics and metrics
35//!
36//! ## Key Features
37//!
38//! ### File Processing Workflow
39//!
40//! - **File Analysis**: Analyze files to determine optimal processing strategy
41//! - **Chunk Creation**: Divide files into appropriately sized chunks
42//! - **Parallel Processing**: Process chunks concurrently for better
43//! performance
44//! - **Result Aggregation**: Collect and aggregate processing results
45//!
46//! ### Chunk Processing
47//!
48//! - **Pluggable Processors**: Support for custom chunk processing logic
49//! - **Processing Pipeline**: Chain multiple processors for complex workflows
50//! - **Error Isolation**: Isolate errors to individual chunks when possible
51//! - **Progress Tracking**: Real-time progress monitoring and reporting
52//!
53//! ### Performance Optimization
54//!
55//! - **Adaptive Chunking**: Dynamic chunk size adjustment based on performance
56//! - **Memory Management**: Efficient memory usage with chunk recycling
57//! - **Parallel Execution**: Configurable parallel processing capabilities
58//! - **Resource Management**: Intelligent resource allocation and cleanup
59//!
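//! ## Usage Examples
//!
//! ### Basic File Processing
//!
//! Hand a file and a boxed [`ChunkProcessor`] to a [`FileProcessorService`]
//! implementation and inspect the returned [`FileProcessingResult`]:
//!
//! ```ignore
//! let result = service
//!     .process_file(Path::new("input.bin"), Some(Path::new("output.bin")), Box::new(processor))
//!     .await?;
//! println!("{} chunks, {} bytes", result.chunks_processed, result.bytes_processed);
//! ```
//!
//! ### Custom Chunk Processor
//!
//! Implement [`ChunkProcessor`] to plug in custom per-chunk logic. Several
//! processors can be combined with [`ChainProcessor`], which runs them in order:
//!
//! ```ignore
//! let chain = ChainProcessor {
//!     processors: vec![Box::new(first_stage), Box::new(second_stage)],
//! };
//! ```
//!
//! ### Parallel Processing
//!
//! [`FileProcessorService::process_files_batch`] processes several
//! `(input, optional output)` path pairs concurrently. A processor whose
//! [`ChunkProcessor::requires_sequential_processing`] returns `true` signals
//! that it requires sequential processing.
//!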
71//! ## Configuration
72//!
73//! ### File Processor Configuration
74//!
75//! The service behavior is controlled through `FileProcessorConfig`:
76//!
77//! - **File Size Limits**: Maximum file size for processing
78//! - **Chunk Size**: Preferred chunk size for processing
79//! - **Memory Mapping**: Enable/disable memory mapping for large files
80//! - **Concurrency**: Maximum number of concurrent file operations
81//! - **Integrity Verification**: Enable/disable file integrity checks
82//! - **Temporary Directory**: Location for intermediate processing files
83//!
84//! ### Performance Tuning
85//!
86//! - **Chunk Size**: Optimize chunk size based on processing characteristics
87//! - **Concurrency**: Balance concurrency with system resources
88//! - **Memory Mapping**: Use memory mapping for large files
89//! - **Buffer Management**: Efficient buffer allocation and reuse
90//!
91//! ## Processing Statistics
92//!
93//! ### Collected Metrics
94//!
95//! - **Processing Time**: Total and per-chunk processing times
96//! - **Throughput**: Processing throughput in bytes/second
97//! - **Chunk Statistics**: Number of chunks processed and their sizes
98//! - **Error Rates**: Processing error rates and failure analysis
99//! - **Resource Usage**: Memory and CPU usage during processing
100//!
101//! ### Performance Analysis
102//!
103//! - **Bottleneck Identification**: Identify processing bottlenecks
104//! - **Optimization Recommendations**: Suggest configuration optimizations
105//! - **Trend Analysis**: Track performance trends over time
106//!
107//! ## Error Handling
108//!
109//! ### Processing Errors
110//!
111//! - **Chunk-Level Errors**: Isolate errors to individual chunks
112//! - **File-Level Errors**: Handle file-level processing failures
113//! - **System Errors**: Handle system resource and I/O errors
114//! - **Configuration Errors**: Validate configuration parameters
115//!
116//! ### Recovery Strategies
117//!
118//! - **Retry Logic**: Automatic retry for transient failures
119//! - **Partial Processing**: Continue processing unaffected chunks
120//! - **Fallback Processing**: Alternative processing strategies
121//! - **Error Reporting**: Detailed error context and suggestions
122//!
123//! ## Integration
124//!
125//! The file processor service integrates with:
126//!
127//! - **File I/O Service**: Uses file I/O service for reading and writing
128//! - **Chunk Processors**: Coordinates with pluggable chunk processors
129//! - **Pipeline Service**: Integrated into pipeline processing workflow
130//! - **Metrics Service**: Reports processing metrics and statistics
131//!
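//! ## Thread Safety
//!
//! The service interfaces are designed for concurrent use:
//!
//! - **Concurrent Processing**: `FileProcessorService` and `ChunkProcessor` both
//!   require `Send + Sync`, so implementations can be shared across threads
//! - **Shared Resources**: The processing methods take `&self`, so one service
//!   instance can be shared by several callers
//! - **State Management**: `reset_processing_stats` and `update_config` take
//!   `&mut self` and therefore require exclusive access to the service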
139//!
140//! ## Future Enhancements
141//!
142//! Planned enhancements include:
143//!
144//! - **Streaming Processing**: Real-time streaming file processing
145//! - **Distributed Processing**: Support for distributed chunk processing
146//! - **Adaptive Optimization**: Automatic optimization based on performance
147//! - **Advanced Scheduling**: Sophisticated chunk scheduling strategies
148
149use crate::{FileChunk, PipelineError};
150use async_trait::async_trait;
151use std::collections::HashMap;
152use std::path::Path;
153// Note: FileIOService imports moved to infrastructure layer to maintain Clean
154// Architecture
155
156// NOTE: FileProcessorService is async (infrastructure port - involves I/O
157// operations). ChunkProcessor is synchronous (domain service - CPU-bound
158// processing). See file_io_service.rs for explanation of I/O-bound vs CPU-bound
159// async decisions.
160
/// Configuration for file processing operations.
///
/// This struct encapsulates all configuration parameters for file processing
/// operations, providing control over performance, resource usage, and
/// behavior.
///
/// # Key Configuration Areas
///
/// - **File Limits**: Maximum file size and processing constraints
/// - **Chunk Processing**: Chunk size and chunking behavior
/// - **Memory Management**: Memory mapping and resource allocation
/// - **Concurrency**: Parallel processing and resource limits
/// - **Integrity**: File integrity verification settings
/// - **Storage**: Temporary file and directory management
///
/// # Defaults
///
/// `FileProcessorConfig::default()` yields:
///
/// - `max_file_size`: 10 GiB (`10 * 1024^3` bytes)
/// - `processing_chunk_size`: 1 MiB (`1024^2` bytes)
/// - `use_memory_mapping`: `true`
/// - `max_concurrent_files`: `4`
/// - `verify_integrity`: `true`
/// - `temp_dir`: `None`
///
/// # Examples
///
/// Override selected fields and keep the rest at their defaults with
/// struct-update syntax:
///
/// ```ignore
/// let config = FileProcessorConfig {
///     max_concurrent_files: 8,
///     verify_integrity: false,
///     ..FileProcessorConfig::default()
/// };
/// assert_eq!(config.processing_chunk_size, 1024 * 1024);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileProcessorConfig {
    /// Maximum file size to process, in bytes.
    pub max_file_size: u64,
    /// Preferred chunk size for processing, in bytes.
    pub processing_chunk_size: usize,
    /// Whether to use memory mapping for large files.
    pub use_memory_mapping: bool,
    /// Maximum number of concurrent file operations.
    pub max_concurrent_files: usize,
    /// Whether to verify file integrity before processing.
    pub verify_integrity: bool,
    /// Temporary directory for intermediate files (`None` by default).
    pub temp_dir: Option<std::path::PathBuf>,
}

impl Default for FileProcessorConfig {
    fn default() -> Self {
        Self {
            max_file_size: 10 * 1024 * 1024 * 1024, // 10 GiB
            processing_chunk_size: 1024 * 1024,     // 1 MiB
            use_memory_mapping: true,
            max_concurrent_files: 4,
            verify_integrity: true,
            temp_dir: None,
        }
    }
}
205
/// Aggregate counters describing the file processing work done so far.
///
/// Every counter starts at zero through [`Default`]. A copy of this struct is
/// returned by value from `FileProcessorService::get_processing_stats`.
#[derive(Default, Clone, Debug)]
pub struct FileProcessingStats {
    /// Number of files processed in total.
    pub files_processed: u64,
    /// Number of bytes processed in total.
    pub bytes_processed: u64,
    /// Cumulative processing time, in milliseconds.
    pub total_processing_time_ms: u64,
    /// How many of the processed files were memory mapped.
    pub memory_mapped_files: u64,
    /// How many integrity checks failed.
    pub integrity_failures: u64,
    /// How many processing errors occurred.
    pub processing_errors: u64,
    /// Mean processing throughput, in bytes per second.
    pub avg_processing_speed: f64,
}
224
/// Outcome of running a single file through the processing pipeline.
///
/// Returned by `FileProcessorService::process_file` and
/// `FileProcessorService::process_file_in_place`. `process_files_batch`
/// returns one of these per processed file.
#[derive(Debug)]
pub struct FileProcessingResult {
    /// Path of the file that was read.
    pub input_path: std::path::PathBuf,
    /// Path of the file that was written, when an output was produced.
    pub output_path: Option<std::path::PathBuf>,
    /// How many chunks were processed.
    pub chunks_processed: u64,
    /// How many bytes were processed in total.
    pub bytes_processed: u64,
    /// Wall-clock processing time, in milliseconds.
    pub processing_time_ms: u64,
    /// `true` when the file was accessed through memory mapping.
    pub used_memory_mapping: bool,
    /// Integrity status of the file.
    pub integrity_verified: bool,
    /// Free-form key/value metadata attached during processing.
    pub metadata: HashMap<String, String>,
}
245
246/// Trait for processing files with the pipeline system
247#[async_trait]
248pub trait FileProcessorService: Send + Sync {
249 /// Processes a single file through the pipeline
250 async fn process_file(
251 &self,
252 input_path: &Path,
253 output_path: Option<&Path>,
254 processor: Box<dyn ChunkProcessor>,
255 ) -> Result<FileProcessingResult, PipelineError>;
256
257 /// Processes multiple files concurrently
258 async fn process_files_batch(
259 &self,
260 file_pairs: Vec<(std::path::PathBuf, Option<std::path::PathBuf>)>,
261 processor: Box<dyn ChunkProcessor>,
262 ) -> Result<Vec<FileProcessingResult>, PipelineError>;
263
264 /// Processes a file in-place (modifying the original)
265 async fn process_file_in_place(
266 &self,
267 file_path: &Path,
268 processor: Box<dyn ChunkProcessor>,
269 ) -> Result<FileProcessingResult, PipelineError>;
270
271 /// Validates file integrity before processing
272 async fn validate_file_before_processing(&self, file_path: &Path) -> Result<bool, PipelineError>;
273
274 /// Gets processing statistics
275 fn get_processing_stats(&self) -> FileProcessingStats;
276
277 /// Resets processing statistics
278 fn reset_processing_stats(&mut self);
279
280 /// Gets the current configuration
281 fn get_config(&self) -> FileProcessorConfig;
282
283 /// Updates the configuration
284 fn update_config(&mut self, config: FileProcessorConfig);
285}
286
287/// Trait for processing individual file chunks
288///
289/// ## Developer Notes - Immutable Processing Pattern
290/// This trait follows DDD Value Object principles where FileChunk is immutable.
291/// Instead of mutating chunks, processors return new chunk instances.
292/// This ensures data integrity and prevents accidental mutations.
293///
294/// ## Architecture Note - Synchronous Domain Service
295///
296/// This trait is **synchronous** following DDD principles. Chunk processing
297/// is CPU-bound (compression, encryption, checksums), not I/O-bound.
298/// The domain layer defines *what* operations exist, not *how* they execute.
299///
300/// For async contexts, infrastructure adapters can wrap chunk processors
301/// using `tokio::spawn_blocking` or similar mechanisms.
302///
303/// ### Usage Pattern:
304pub trait ChunkProcessor: Send + Sync {
305 /// Processes a single chunk of data and returns a new processed chunk
306 ///
307 /// # Arguments
308 /// * `chunk` - The input chunk to process (immutable reference)
309 ///
310 /// # Returns
311 /// * `Ok(FileChunk)` - New chunk with processing results
312 /// * `Err(PipelineError)` - Processing failed
313 ///
314 /// # Developer Notes
315 /// - Input chunk is never modified (immutability)
316 /// - Return new chunk with changes applied
317 /// - Use chunk.with_data() or chunk.with_checksum() for modifications
318 ///
319 /// # Note on Async
320 ///
321 /// This method is synchronous (CPU-bound operations). For async contexts,
322 /// use infrastructure adapters that wrap this in `tokio::spawn_blocking`.
323 fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError>;
324
325 /// Returns the processor name for logging/debugging
326 fn name(&self) -> &str;
327
328 /// Returns whether this processor modifies chunk data
329 fn modifies_data(&self) -> bool;
330
331 /// Returns whether this processor requires sequential processing
332 fn requires_sequential_processing(&self) -> bool {
333 false
334 }
335}
336
/// Generic adapter that pairs a service with a name so it can be used as a
/// `ChunkProcessor`.
///
/// This module only provides the container. Concrete `ChunkProcessor`
/// implementations for specific services are meant to live in the
/// infrastructure layer. Because the fields are private, the accessors below
/// are the only way for code outside this module to reach the wrapped service
/// and its name.
#[derive(Debug, Clone)]
pub struct ServiceAdapter<T> {
    service: T,
    name: String,
}

impl<T> ServiceAdapter<T> {
    /// Wraps `service` and labels it with `name`.
    pub fn new(service: T, name: String) -> Self {
        Self { service, name }
    }

    /// Borrows the wrapped service.
    pub fn service(&self) -> &T {
        &self.service
    }

    /// Returns the adapter's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Consumes the adapter and returns the wrapped service.
    pub fn into_inner(self) -> T {
        self.service
    }
}
351
352// Note: Specific ChunkProcessor implementations for
353// ServiceAdapter<CompressionService> and ServiceAdapter<EncryptionService>
354// should be implemented in the infrastructure layer to maintain proper
355// dependency direction and Clean Architecture principles.
356
357/// Processor that applies multiple processors in sequence
358pub struct ChainProcessor {
359 pub processors: Vec<Box<dyn ChunkProcessor>>,
360}
361
362impl ChunkProcessor for ChainProcessor {
363 fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
364 let mut current_chunk = chunk.clone();
365 for processor in &self.processors {
366 current_chunk = processor.process_chunk(¤t_chunk)?;
367 }
368 Ok(current_chunk)
369 }
370
371 fn name(&self) -> &str {
372 "ChainProcessor"
373 }
374
375 fn modifies_data(&self) -> bool {
376 self.processors.iter().any(|p| p.modifies_data())
377 }
378
379 fn requires_sequential_processing(&self) -> bool {
380 self.processors.iter().any(|p| p.requires_sequential_processing())
381 }
382}