adaptive_pipeline_domain/services/file_io_service.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # File I/O Service Interface
9//!
10//! Domain service trait for efficient file operations with chunked reading,
11//! memory mapping for large files, streaming support, and async I/O.
12//! Configurable chunk size, buffer size, and concurrency limits. Provides
13//! checksum verification, metadata extraction, and comprehensive error
14//! handling. Thread-safe operations. See mdBook for configuration and
15//! optimization strategies.
16//! - **Partial Results**: Return partial results when possible
17//! - **Resource Cleanup**: Automatic cleanup on errors
18//!
19//! ## Performance Considerations
20//!
21//! ### Memory Usage
22//!
23//! - **Streaming**: Process files without loading entirely into memory
24//! - **Memory Mapping**: Efficient memory usage for large files
25//! - **Buffer Management**: Efficient buffer allocation and reuse
26//!
27//! ### I/O Optimization
28//!
29//! - **Sequential Access**: Optimize for sequential file access patterns
30//! - **Prefetching**: Intelligent prefetching for better performance
31//! - **Caching**: File system cache utilization
32//!
33//! ## Integration
34//!
35//! The file I/O service integrates with:
36//!
37//! - **File Processor**: Used by file processor for chunk-based processing
38//! - **Pipeline Service**: Integrated into pipeline processing workflow
39//! - **Storage Systems**: Abstracts various storage backend implementations
40//! - **Monitoring**: Provides metrics for I/O operations
41//!
42//! ## Thread Safety
43//!
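//! The service interface is designed for thread safety. The trait has
//! `Send + Sync` as supertraits, so implementations can be shared across
//! threads (for example as `Arc<dyn FileIOService>`):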
45//!
46//! - **Concurrent Operations**: Safe concurrent access to file operations
47//! - **Resource Sharing**: Safe sharing of file handles and resources
48//! - **State Management**: Thread-safe state management
49//!
50//! ## Future Enhancements
51//!
52//! Planned enhancements include:
53//!
54//! - **Compression**: Built-in compression for file operations
55//! - **Encryption**: Transparent encryption/decryption during I/O
56//! - **Network Storage**: Support for network-based storage systems
57//! - **Caching**: Intelligent caching layer for frequently accessed files
58//!
59//! ## Architecture Note - Infrastructure Port
60//!
61//! **Important:** This service trait is **async** and represents an
62//! **infrastructure port**, not a pure domain service. This is an intentional
63//! exception to the "domain traits should be sync" principle.
64//!
65//! ### Why FileIOService is Async
66//!
67//! File I/O operations are inherently I/O-bound, not CPU-bound:
68//! - **I/O-Bound Operations**: File operations involve waiting for disk I/O
69//! - **Non-Blocking Benefits**: Async I/O prevents blocking the runtime
70//! - **tokio Integration**: Async file operations integrate naturally with
71//! tokio
72//! - **Performance**: Async I/O provides better concurrency for I/O operations
73//!
74//! ### Architectural Classification
75//!
76//! This trait is classified as an **infrastructure port** rather than a domain
77//! service:
78//! - **Domain Services**: CPU-bound business logic (compression, encryption,
79//! checksums)
80//! - **Infrastructure Ports**: I/O-bound operations (file I/O, network,
81//! database)
82//!
83//! ### Design Trade-offs
84//!
//! We considered making this trait synchronous (using `std::fs`) but chose
//! async because:
//!
//! 1. Most of the application uses the tokio async runtime
//! 2. File operations benefit from non-blocking I/O
//! 3. The alternative, a blocking thread pool, would add complexity
89//! 4. The trait is already an infrastructure concern (port/interface)
90//!
91//! ### References
92//!
93//! See REFACTORING_STATUS.md Phase 1, item 2 for full discussion.
94
95use crate::{FileChunk, PipelineError};
96use async_trait::async_trait;
97use std::path::Path;
98use std::sync::Arc;
99
/// Configuration for file I/O operations.
///
/// This struct encapsulates all configuration parameters for file I/O
/// operations, providing fine-grained control over performance, memory usage,
/// and behavior.
///
/// # Key Configuration Areas
///
/// - **Chunk Processing**: Default chunk size and chunking behavior
/// - **Memory Management**: Memory mapping thresholds and buffer sizes
/// - **Concurrency**: Limits on concurrent operations
/// - **Verification**: Checksum verification settings
/// - **Performance**: Various performance optimization settings
///
/// # Defaults
///
/// `FileIOConfig::default()` yields:
///
/// - a 1 MiB chunk size,
/// - a 1 GiB memory-mapping ceiling, with memory mapping enabled,
/// - an 8 KiB streaming buffer,
/// - checksum verification enabled,
/// - at most 10 concurrent operations.
///
/// Override individual fields with struct-update syntax:
///
/// ```ignore
/// let config = FileIOConfig {
///     default_chunk_size: 4 * 1024 * 1024,
///     ..FileIOConfig::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct FileIOConfig {
    /// Default chunk size for reading files, in bytes.
    pub default_chunk_size: usize,
    /// Maximum file size for memory mapping, in bytes.
    pub max_mmap_size: u64,
    /// Whether to use memory mapping for large files.
    pub enable_memory_mapping: bool,
    /// Buffer size for streaming operations, in bytes.
    pub buffer_size: usize,
    /// Whether to verify checksums during read operations.
    pub verify_checksums: bool,
    /// Maximum number of concurrent file operations.
    pub max_concurrent_operations: usize,
}
130
131impl Default for FileIOConfig {
132 fn default() -> Self {
133 Self {
134 default_chunk_size: 1024 * 1024, // 1MB (matches ChunkSize minimum)
135 max_mmap_size: 1024 * 1024 * 1024, // 1GB
136 enable_memory_mapping: true,
137 buffer_size: 8192, // 8KB
138 verify_checksums: true,
139 max_concurrent_operations: 10,
140 }
141 }
142}
143
/// Metadata describing a file that the service reads, writes, or lists.
#[derive(Debug, Clone)]
pub struct FileInfo {
    /// Path to the file.
    pub path: std::path::PathBuf,
    /// Length of the file, in bytes.
    pub size: u64,
    /// `true` when the file is accessed through a memory map.
    pub is_memory_mapped: bool,
    /// Time of the most recent modification.
    pub modified_at: std::time::SystemTime,
    /// Time the file was created.
    pub created_at: std::time::SystemTime,
    /// Permission bits in Unix-style form.
    pub permissions: u32,
    /// Detected MIME type, or `None` when it could not be determined.
    pub mime_type: Option<String>,
}
162
/// Running counters for the I/O work a service has performed.
///
/// The derived `Default` starts every counter at zero.
#[derive(Debug, Clone, Default)]
pub struct FileIOStats {
    /// Cumulative number of bytes read.
    pub bytes_read: u64,
    /// Cumulative number of bytes written.
    pub bytes_written: u64,
    /// How many chunks have been processed.
    pub chunks_processed: u64,
    /// How many files have been processed.
    pub files_processed: u64,
    /// How many files were accessed through a memory map.
    pub memory_mapped_files: u64,
    /// Accumulated processing time, in milliseconds.
    pub total_processing_time_ms: u64,
    /// How many checksum verifications were performed.
    pub checksum_verifications: u64,
    /// How many operations ended in failure.
    pub failed_operations: u64,
}
183
/// Per-call settings that control how a file is read.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Chunk size for this read, in bytes. `None` presumably defers to the
    /// service configuration (`FileIOConfig::default_chunk_size`); confirm
    /// against implementations.
    pub chunk_size: Option<usize>,
    /// Byte offset at which reading begins, if any.
    pub start_offset: Option<u64>,
    /// Upper bound on the number of bytes to read, if any.
    pub max_bytes: Option<u64>,
    /// When `true`, checksums are calculated for the data read.
    pub calculate_checksums: bool,
    /// When `true`, memory mapping may be used if it is available.
    pub use_memory_mapping: bool,
}
198
199impl Default for ReadOptions {
200 fn default() -> Self {
201 Self {
202 chunk_size: None,
203 start_offset: None,
204 max_bytes: None,
205 calculate_checksums: true,
206 use_memory_mapping: true,
207 }
208 }
209}
210
/// Per-call settings that control how a file is written.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// When `true`, data is appended to an existing file.
    pub append: bool,
    /// When `true`, missing parent directories are created.
    pub create_dirs: bool,
    /// Permission bits to apply to the written file, if any.
    pub permissions: Option<u32>,
    /// When `true`, data is synced to disk immediately.
    pub sync: bool,
    /// When `true`, checksums are calculated for the data written.
    pub calculate_checksums: bool,
}
225
226impl Default for WriteOptions {
227 fn default() -> Self {
228 Self {
229 append: false,
230 create_dirs: true,
231 permissions: None,
232 sync: false,
233 calculate_checksums: true,
234 }
235 }
236}
237
238/// Result of a file read operation
239#[derive(Debug)]
240pub struct ReadResult {
241 /// File chunks read
242 pub chunks: Vec<FileChunk>,
243 /// File information
244 pub file_info: FileInfo,
245 /// Total bytes read
246 pub bytes_read: u64,
247 /// Whether the entire file was read
248 pub complete: bool,
249}
250
/// Outcome of a write operation.
#[derive(Debug)]
pub struct WriteResult {
    /// Path of the file that was written.
    pub path: std::path::PathBuf,
    /// Number of bytes written in total.
    pub bytes_written: u64,
    /// Checksum of the written file, when one was calculated.
    pub checksum: Option<String>,
    /// `true` when the operation succeeded.
    pub success: bool,
}
263
/// Port for file I/O operations with memory mapping support.
///
/// Implementations must be `Send + Sync`, so a single service can be shared
/// across threads and tasks, for example as `Arc<dyn FileIOService>`. The I/O
/// methods are `async` (via `async_trait`) and report failures as
/// [`PipelineError`].
///
/// [`update_config`](Self::update_config) and
/// [`reset_stats`](Self::reset_stats) take `&mut self`. They therefore require
/// exclusive access to the service, which a shared `Arc<dyn FileIOService>`
/// cannot provide while other owners exist.
#[async_trait]
pub trait FileIOService: Send + Sync {
    /// Reads the file at `path` and returns its contents as chunks, together
    /// with the file's metadata.
    ///
    /// `options` carries the chunk size, starting offset, byte limit,
    /// checksum and memory-mapping settings. [`ReadResult::complete`] reports
    /// whether the whole file was read.
    ///
    /// # Errors
    ///
    /// Returns a [`PipelineError`] if the read fails.
    async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError>;

    /// Reads the file at `path` like
    /// [`read_file_chunks`](Self::read_file_chunks), but uses memory mapping
    /// when possible.
    ///
    /// What happens when mapping is not possible (fallback or error) is
    /// implementation-defined.
    async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError>;

    /// Writes `chunks` to the file at `path`.
    ///
    /// `options` governs append mode, parent-directory creation, permissions,
    /// sync and checksum calculation.
    async fn write_file_chunks(
        &self,
        path: &Path,
        chunks: &[FileChunk],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError>;

    /// Writes the raw bytes `data` to the file at `path`, honoring `options`.
    async fn write_file_data(
        &self,
        path: &Path,
        data: &[u8],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError>;

    /// Returns metadata for the file at `path`.
    async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError>;

    /// Returns `Ok(true)` if a file exists at `path`.
    async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError>;

    /// Deletes the file at `path`.
    async fn delete_file(&self, path: &Path) -> Result<(), PipelineError>;

    /// Copies `source` to `destination`.
    ///
    /// `options` governs how the destination is written.
    async fn copy_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError>;

    /// Moves `source` to `destination`.
    ///
    /// `options` governs how the destination is written.
    async fn move_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError>;

    /// Creates a directory at `path`.
    ///
    /// Whether missing parent directories are also created is
    /// implementation-defined.
    async fn create_directory(&self, path: &Path) -> Result<(), PipelineError>;

    /// Returns `Ok(true)` if a directory exists at `path`.
    async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError>;

    /// Lists the files in the directory at `path` as [`FileInfo`] values.
    async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError>;

    /// Returns a copy of the current configuration.
    fn get_config(&self) -> FileIOConfig;

    /// Replaces the current configuration with `config`.
    ///
    /// This requires exclusive (`&mut`) access to the service.
    fn update_config(&mut self, config: FileIOConfig);

    /// Returns a snapshot of the accumulated I/O statistics.
    fn get_stats(&self) -> FileIOStats;

    /// Resets the accumulated I/O statistics.
    ///
    /// This requires exclusive (`&mut`) access to the service.
    fn reset_stats(&mut self);

    /// Checks the file at `path` against `expected_checksum` and reports
    /// whether its integrity holds.
    ///
    /// NOTE(review): `expected_checksum` presumably must use the same
    /// algorithm and encoding as
    /// [`calculate_file_checksum`](Self::calculate_file_checksum). Confirm
    /// against implementations.
    async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError>;

    /// Computes a checksum of the file at `path` and returns it as a string.
    ///
    /// The algorithm and encoding are implementation-defined.
    async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError>;

    /// Returns a `Send` stream of chunks read from `path`, for processing
    /// without collecting every chunk up front.
    ///
    /// The outer `Result` reports failures before streaming begins. Each
    /// stream item is its own `Result`, so errors can also surface mid-stream.
    async fn stream_file_chunks(
        &self,
        path: &Path,
        options: ReadOptions,
    ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>;

    /// Writes a single chunk to `path` as one step of a streaming write.
    ///
    /// `is_first_chunk` marks the first chunk of the sequence.
    ///
    /// NOTE(review): implementations presumably create or truncate the file
    /// on the first chunk and append afterwards. Confirm against
    /// implementations.
    async fn write_chunk_to_file(
        &self,
        path: &Path,
        chunk: &FileChunk,
        options: WriteOptions,
        is_first_chunk: bool,
    ) -> Result<WriteResult, PipelineError>;
}
357
358/// Implementation of FileIOService for `Arc<dyn FileIOService>`
359/// This enables shared ownership of FileIOService trait objects
360#[async_trait]
361impl FileIOService for Arc<dyn FileIOService> {
362 async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
363 (**self).read_file_chunks(path, options).await
364 }
365
366 async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
367 (**self).read_file_mmap(path, options).await
368 }
369
370 async fn write_file_chunks(
371 &self,
372 path: &Path,
373 chunks: &[FileChunk],
374 options: WriteOptions,
375 ) -> Result<WriteResult, PipelineError> {
376 (**self).write_file_chunks(path, chunks, options).await
377 }
378
379 async fn write_file_data(
380 &self,
381 path: &Path,
382 data: &[u8],
383 options: WriteOptions,
384 ) -> Result<WriteResult, PipelineError> {
385 (**self).write_file_data(path, data, options).await
386 }
387
388 async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError> {
389 (**self).get_file_info(path).await
390 }
391
392 async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError> {
393 (**self).file_exists(path).await
394 }
395
396 async fn delete_file(&self, path: &Path) -> Result<(), PipelineError> {
397 (**self).delete_file(path).await
398 }
399
400 async fn copy_file(
401 &self,
402 source: &Path,
403 destination: &Path,
404 options: WriteOptions,
405 ) -> Result<WriteResult, PipelineError> {
406 (**self).copy_file(source, destination, options).await
407 }
408
409 async fn move_file(
410 &self,
411 source: &Path,
412 destination: &Path,
413 options: WriteOptions,
414 ) -> Result<WriteResult, PipelineError> {
415 (**self).move_file(source, destination, options).await
416 }
417
418 async fn create_directory(&self, path: &Path) -> Result<(), PipelineError> {
419 (**self).create_directory(path).await
420 }
421
422 async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError> {
423 (**self).directory_exists(path).await
424 }
425
426 async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError> {
427 (**self).list_directory(path).await
428 }
429
430 fn get_config(&self) -> FileIOConfig {
431 (**self).get_config()
432 }
433
434 fn update_config(&mut self, _config: FileIOConfig) {
435 // Note: This is a no-op for Arc since we can't get mutable access
436 // In practice, config updates should be done through the concrete type
437 // This is intentionally a no-op to avoid panicking in production code
438 }
439
440 fn get_stats(&self) -> FileIOStats {
441 (**self).get_stats()
442 }
443
444 fn reset_stats(&mut self) {
445 // Note: This is a no-op for Arc since we can't get mutable access
446 // In practice, stats resets should be done through the concrete type
447 // This is intentionally a no-op to avoid panicking in production code
448 }
449
450 async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError> {
451 (**self).validate_file_integrity(path, expected_checksum).await
452 }
453
454 async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError> {
455 (**self).calculate_file_checksum(path).await
456 }
457
458 async fn stream_file_chunks(
459 &self,
460 path: &Path,
461 options: ReadOptions,
462 ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>
463 {
464 (**self).stream_file_chunks(path, options).await
465 }
466
467 async fn write_chunk_to_file(
468 &self,
469 path: &Path,
470 chunk: &FileChunk,
471 options: WriteOptions,
472 is_first_chunk: bool,
473 ) -> Result<WriteResult, PipelineError> {
474 (**self).write_chunk_to_file(path, chunk, options, is_first_chunk).await
475 }
476}