adaptive_pipeline_domain/services/
file_io_service.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # File I/O Service Interface
//!
//! Infrastructure port (async service trait) for efficient file operations:
//! chunked reading, memory mapping for large files, streaming, and async I/O.
//! Chunk size, buffer size, and concurrency limits are configurable through
//! [`FileIOConfig`]. The service also provides checksum verification, metadata
//! extraction, and error reporting through [`PipelineError`]. Implementations
//! must be thread-safe (the trait requires `Send + Sync`). See the project's
//! mdBook documentation for configuration and optimization strategies.
//!
//! ## Error Handling
//!
//! - **Partial Results**: Return partial results when possible
//! - **Resource Cleanup**: Automatic cleanup on errors
//!
//! ## Performance Considerations
//!
//! ### Memory Usage
//!
//! - **Streaming**: Process files without loading them entirely into memory
//! - **Memory Mapping**: Efficient memory usage for large files
//! - **Buffer Management**: Efficient buffer allocation and reuse
//!
//! ### I/O Optimization
//!
//! - **Sequential Access**: Optimize for sequential file access patterns
//! - **Prefetching**: Intelligent prefetching for better performance
//! - **Caching**: File system cache utilization
//!
//! ## Integration
//!
//! The file I/O service integrates with:
//!
//! - **File Processor**: Used by the file processor for chunk-based processing
//! - **Pipeline Service**: Integrated into the pipeline processing workflow
//! - **Storage Systems**: Abstracts various storage backend implementations
//! - **Monitoring**: Provides metrics for I/O operations
//!
//! ## Thread Safety
//!
//! The service interface is designed for thread safety:
//!
//! - **Concurrent Operations**: Safe concurrent access to file operations
//! - **Resource Sharing**: Safe sharing of file handles and resources
//! - **State Management**: Configuration and statistics change only through
//!   `&mut self` methods, which require exclusive access to the service
//!
//! ## Future Enhancements
//!
//! Planned enhancements include:
//!
//! - **Compression**: Built-in compression for file operations
//! - **Encryption**: Transparent encryption/decryption during I/O
//! - **Network Storage**: Support for network-based storage systems
//! - **Caching**: Intelligent caching layer for frequently accessed files
//!
//! ## Architecture Note - Infrastructure Port
//!
//! **Important:** This service trait is **async** and represents an
//! **infrastructure port**, not a pure domain service. This is an intentional
//! exception to the "domain traits should be sync" principle.
//!
//! ### Why FileIOService is Async
//!
//! File I/O operations are inherently I/O-bound, not CPU-bound:
//!
//! - **I/O-Bound Operations**: File operations involve waiting for disk I/O
//! - **Non-Blocking Benefits**: Async I/O prevents blocking the runtime
//! - **tokio Integration**: Async file operations integrate naturally with
//!   tokio
//! - **Performance**: Async I/O provides better concurrency for I/O operations
//!
//! ### Architectural Classification
//!
//! This trait is classified as an **infrastructure port** rather than a domain
//! service:
//!
//! - **Domain Services**: CPU-bound business logic (compression, encryption,
//!   checksums)
//! - **Infrastructure Ports**: I/O-bound operations (file I/O, network,
//!   database)
//!
//! ### Design Trade-offs
//!
//! We considered making this trait sync (using `std::fs`) but chose async
//! because:
//!
//! 1. Most of the application runs on the tokio async runtime
//! 2. File operations benefit from non-blocking I/O
//! 3. The alternative would be a blocking thread pool, which adds complexity
//! 4. The trait is already an infrastructure concern (port/interface)
//!
//! ### References
//!
//! See REFACTORING_STATUS.md, Phase 1, item 2 for the full discussion.

95use crate::{FileChunk, PipelineError};
96use async_trait::async_trait;
97use std::path::Path;
98use std::sync::Arc;
99
/// Configuration for file I/O operations.
///
/// Groups the tunables that control the performance, memory usage, and
/// behavior of a [`FileIOService`] implementation.
///
/// # Key Configuration Areas
///
/// - **Chunk Processing**: default chunk size used when reading files
/// - **Memory Management**: memory-mapping threshold and streaming buffer size
/// - **Concurrency**: limit on concurrently running file operations
/// - **Verification**: whether checksums are verified during reads
///
/// # Examples
///
/// Override individual settings and keep the defaults for everything else
/// with struct-update syntax, e.g.
/// `FileIOConfig { buffer_size: 64 * 1024, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct FileIOConfig {
    /// Default chunk size, in bytes, used when reading files.
    pub default_chunk_size: usize,
    /// Maximum file size, in bytes, eligible for memory mapping.
    pub max_mmap_size: u64,
    /// Whether memory mapping may be used for large files.
    pub enable_memory_mapping: bool,
    /// Buffer size, in bytes, for streaming operations.
    pub buffer_size: usize,
    /// Whether checksums are verified during read operations.
    pub verify_checksums: bool,
    /// Maximum number of file operations allowed to run concurrently.
    pub max_concurrent_operations: usize,
}

impl Default for FileIOConfig {
    /// Baseline configuration: 1 MiB chunks, memory mapping enabled for files
    /// up to 1 GiB, an 8 KiB streaming buffer, checksum verification on, and
    /// at most 10 concurrent operations.
    fn default() -> Self {
        Self {
            default_chunk_size: 1024 * 1024,   // 1 MiB (matches ChunkSize minimum)
            max_mmap_size: 1024 * 1024 * 1024, // 1 GiB
            enable_memory_mapping: true,
            buffer_size: 8 * 1024, // 8 KiB
            verify_checksums: true,
            max_concurrent_operations: 10,
        }
    }
}

/// Metadata describing a file handled by a [`FileIOService`].
#[derive(Clone, Debug)]
pub struct FileInfo {
    /// Location of the file.
    pub path: std::path::PathBuf,
    /// Size of the file in bytes.
    pub size: u64,
    /// `true` when the file is accessed through a memory map.
    pub is_memory_mapped: bool,
    /// Timestamp of the last modification.
    pub modified_at: std::time::SystemTime,
    /// Timestamp of creation.
    pub created_at: std::time::SystemTime,
    /// Permission bits, Unix-style.
    pub permissions: u32,
    /// Detected MIME type, or `None` when it could not be determined.
    pub mime_type: Option<String>,
}

/// Aggregate counters describing file I/O activity.
///
/// `Default` yields all counters at zero.
#[derive(Default, Clone, Debug)]
pub struct FileIOStats {
    /// Bytes read in total.
    pub bytes_read: u64,
    /// Bytes written in total.
    pub bytes_written: u64,
    /// Chunks processed.
    pub chunks_processed: u64,
    /// Files processed.
    pub files_processed: u64,
    /// Files that were memory-mapped.
    pub memory_mapped_files: u64,
    /// Cumulative processing time, in milliseconds.
    pub total_processing_time_ms: u64,
    /// Checksum verifications performed.
    pub checksum_verifications: u64,
    /// Operations that failed.
    pub failed_operations: u64,
}

/// Per-call settings for reading a file.
#[derive(Clone, Debug)]
pub struct ReadOptions {
    /// Chunk size to read with; `None` means no explicit size was requested.
    pub chunk_size: Option<usize>,
    /// Offset at which reading starts, if any.
    pub start_offset: Option<u64>,
    /// Upper bound on the number of bytes to read, if any.
    pub max_bytes: Option<u64>,
    /// When `true`, checksums are calculated while reading.
    pub calculate_checksums: bool,
    /// When `true`, memory mapping is used if it is available.
    pub use_memory_mapping: bool,
}

impl Default for ReadOptions {
    /// Checksums and memory mapping enabled; no chunk size, start offset, or
    /// byte limit specified.
    fn default() -> ReadOptions {
        ReadOptions {
            calculate_checksums: true,
            use_memory_mapping: true,
            chunk_size: None,
            start_offset: None,
            max_bytes: None,
        }
    }
}

/// Per-call settings for writing a file.
#[derive(Clone, Debug)]
pub struct WriteOptions {
    /// When `true`, data is appended to an existing file.
    pub append: bool,
    /// When `true`, missing parent directories are created.
    pub create_dirs: bool,
    /// Permission bits to apply to the file, if any.
    pub permissions: Option<u32>,
    /// When `true`, data is synced to disk immediately.
    pub sync: bool,
    /// When `true`, checksums are calculated while writing.
    pub calculate_checksums: bool,
}

impl Default for WriteOptions {
    /// No appending, parent directories created, no explicit permissions,
    /// no immediate sync, and checksum calculation enabled.
    fn default() -> WriteOptions {
        WriteOptions {
            create_dirs: true,
            calculate_checksums: true,
            append: false,
            sync: false,
            permissions: None,
        }
    }
}

238/// Result of a file read operation
239#[derive(Debug)]
240pub struct ReadResult {
241    /// File chunks read
242    pub chunks: Vec<FileChunk>,
243    /// File information
244    pub file_info: FileInfo,
245    /// Total bytes read
246    pub bytes_read: u64,
247    /// Whether the entire file was read
248    pub complete: bool,
249}
250
/// Outcome of writing a file through a [`FileIOService`].
#[derive(Debug)]
pub struct WriteResult {
    /// Destination that was written to.
    pub path: std::path::PathBuf,
    /// Number of bytes written in total.
    pub bytes_written: u64,
    /// Checksum of the file, present only when one was calculated.
    pub checksum: Option<String>,
    /// Whether the write completed successfully.
    pub success: bool,
}

264/// Trait for file I/O operations with memory mapping support
265#[async_trait]
266pub trait FileIOService: Send + Sync {
267    /// Reads a file and returns it as chunks
268    async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError>;
269
270    /// Reads a file using memory mapping if possible
271    async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError>;
272
273    /// Writes chunks to a file
274    async fn write_file_chunks(
275        &self,
276        path: &Path,
277        chunks: &[FileChunk],
278        options: WriteOptions,
279    ) -> Result<WriteResult, PipelineError>;
280
281    /// Writes data directly to a file
282    async fn write_file_data(
283        &self,
284        path: &Path,
285        data: &[u8],
286        options: WriteOptions,
287    ) -> Result<WriteResult, PipelineError>;
288
289    /// Gets information about a file
290    async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError>;
291
292    /// Checks if a file exists
293    async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError>;
294
295    /// Deletes a file
296    async fn delete_file(&self, path: &Path) -> Result<(), PipelineError>;
297
298    /// Copies a file
299    async fn copy_file(
300        &self,
301        source: &Path,
302        destination: &Path,
303        options: WriteOptions,
304    ) -> Result<WriteResult, PipelineError>;
305
306    /// Moves a file
307    async fn move_file(
308        &self,
309        source: &Path,
310        destination: &Path,
311        options: WriteOptions,
312    ) -> Result<WriteResult, PipelineError>;
313
314    /// Creates a directory
315    async fn create_directory(&self, path: &Path) -> Result<(), PipelineError>;
316
317    /// Checks if a directory exists
318    async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError>;
319
320    /// Lists files in a directory
321    async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError>;
322
323    /// Gets the current configuration
324    fn get_config(&self) -> FileIOConfig;
325
326    /// Updates the configuration
327    fn update_config(&mut self, config: FileIOConfig);
328
329    /// Gets I/O statistics
330    fn get_stats(&self) -> FileIOStats;
331
332    /// Resets I/O statistics
333    fn reset_stats(&mut self);
334
335    /// Validates file integrity using checksums
336    async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError>;
337
338    /// Calculates file checksum
339    async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError>;
340
341    /// Streams file chunks for processing
342    async fn stream_file_chunks(
343        &self,
344        path: &Path,
345        options: ReadOptions,
346    ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>;
347
348    /// Writes a single chunk to a file (for streaming writes)
349    async fn write_chunk_to_file(
350        &self,
351        path: &Path,
352        chunk: &FileChunk,
353        options: WriteOptions,
354        is_first_chunk: bool,
355    ) -> Result<WriteResult, PipelineError>;
356}
357
358/// Implementation of FileIOService for `Arc<dyn FileIOService>`
359/// This enables shared ownership of FileIOService trait objects
360#[async_trait]
361impl FileIOService for Arc<dyn FileIOService> {
362    async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
363        (**self).read_file_chunks(path, options).await
364    }
365
366    async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
367        (**self).read_file_mmap(path, options).await
368    }
369
370    async fn write_file_chunks(
371        &self,
372        path: &Path,
373        chunks: &[FileChunk],
374        options: WriteOptions,
375    ) -> Result<WriteResult, PipelineError> {
376        (**self).write_file_chunks(path, chunks, options).await
377    }
378
379    async fn write_file_data(
380        &self,
381        path: &Path,
382        data: &[u8],
383        options: WriteOptions,
384    ) -> Result<WriteResult, PipelineError> {
385        (**self).write_file_data(path, data, options).await
386    }
387
388    async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError> {
389        (**self).get_file_info(path).await
390    }
391
392    async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError> {
393        (**self).file_exists(path).await
394    }
395
396    async fn delete_file(&self, path: &Path) -> Result<(), PipelineError> {
397        (**self).delete_file(path).await
398    }
399
400    async fn copy_file(
401        &self,
402        source: &Path,
403        destination: &Path,
404        options: WriteOptions,
405    ) -> Result<WriteResult, PipelineError> {
406        (**self).copy_file(source, destination, options).await
407    }
408
409    async fn move_file(
410        &self,
411        source: &Path,
412        destination: &Path,
413        options: WriteOptions,
414    ) -> Result<WriteResult, PipelineError> {
415        (**self).move_file(source, destination, options).await
416    }
417
418    async fn create_directory(&self, path: &Path) -> Result<(), PipelineError> {
419        (**self).create_directory(path).await
420    }
421
422    async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError> {
423        (**self).directory_exists(path).await
424    }
425
426    async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError> {
427        (**self).list_directory(path).await
428    }
429
430    fn get_config(&self) -> FileIOConfig {
431        (**self).get_config()
432    }
433
434    fn update_config(&mut self, _config: FileIOConfig) {
435        // Note: This is a no-op for Arc since we can't get mutable access
436        // In practice, config updates should be done through the concrete type
437        // This is intentionally a no-op to avoid panicking in production code
438    }
439
440    fn get_stats(&self) -> FileIOStats {
441        (**self).get_stats()
442    }
443
444    fn reset_stats(&mut self) {
445        // Note: This is a no-op for Arc since we can't get mutable access
446        // In practice, stats resets should be done through the concrete type
447        // This is intentionally a no-op to avoid panicking in production code
448    }
449
450    async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError> {
451        (**self).validate_file_integrity(path, expected_checksum).await
452    }
453
454    async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError> {
455        (**self).calculate_file_checksum(path).await
456    }
457
458    async fn stream_file_chunks(
459        &self,
460        path: &Path,
461        options: ReadOptions,
462    ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>
463    {
464        (**self).stream_file_chunks(path, options).await
465    }
466
467    async fn write_chunk_to_file(
468        &self,
469        path: &Path,
470        chunk: &FileChunk,
471        options: WriteOptions,
472        is_first_chunk: bool,
473    ) -> Result<WriteResult, PipelineError> {
474        (**self).write_chunk_to_file(path, chunk, options, is_first_chunk).await
475    }
476}