adaptive_pipeline/infrastructure/adapters/file_io.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # File I/O Service Implementation
//!
//! This module is part of the Infrastructure layer, providing concrete
//! implementations of domain interfaces (ports).
//!
//! This module provides the concrete implementation of the file I/O service
//! interface for the adaptive pipeline system. It offers high-performance file
//! operations with memory mapping support, asynchronous I/O, and comprehensive
//! error handling.
//!
//! ## Overview
//!
//! The file I/O service implementation provides:
//!
//! - **Memory Mapping**: High-performance file access using memory-mapped files
//! - **Asynchronous I/O**: Non-blocking file operations using Tokio
//! - **Chunked Processing**: Efficient processing of large files in chunks
//! - **Statistics Tracking**: Comprehensive I/O performance metrics
//! - **Error Handling**: Robust error handling and recovery
//!
//! ## Architecture
//!
//! The implementation follows the infrastructure layer patterns:
//!
//! - **Service Implementation**: `TokioFileIO` implements the domain's `FileIOService` interface
//! - **Memory Management**: Efficient memory usage with memory mapping
//! - **Concurrency**: Thread-safe operations guarded by `parking_lot::RwLock`
//! - **Configuration**: Flexible configuration for different use cases
//!
//! ## Performance Features
//!
//! ### Memory Mapping
//!
//! Uses memory-mapped files for efficient reads of large files:
//! - **Direct Access**: Chunk data is read straight from the mapped region (one copy per chunk)
//! - **OS Optimization**: Leverages the operating system's virtual memory and page cache
//! - **Cache Efficiency**: Better CPU cache utilization for sequential access
//! - **Large File Support**: Efficient handling of multi-gigabyte files
//!
//! ### Asynchronous Operations
//!
//! All I/O operations are asynchronous:
//! - **Non-Blocking**: Doesn't block the async runtime
//! - **Concurrent Processing**: Multiple files can be processed simultaneously
//! - **Resource Efficiency**: Optimal use of system resources
//! - **Scalability**: Handles high concurrent load
//!
//! ### Chunked Processing
//!
//! Processes files in configurable chunks:
//! - **Memory Efficiency**: Constant memory usage regardless of file size
//! - **Progress Tracking**: Real-time progress monitoring
//! - **Error Recovery**: Granular error handling per chunk
//! - **Parallel Processing**: Chunks can be processed in parallel
//!
//! ## Configuration Options
//!
//! The service supports various configuration options (a configuration sketch
//! follows the lists below):
//!
//! ### Buffer Sizes
//! - **Read Buffer**: Configurable read buffer size
//! - **Write Buffer**: Configurable write buffer size
//! - **Chunk Size**: Default chunk size used when a read or write does not specify one
//!
//! ### Memory Mapping
//! - **Threshold**: Maximum file size eligible for memory mapping
//! - **Alignment**: Memory alignment for optimal performance
//! - **Prefetch**: Prefetch strategies for sequential access
//!
//! ### Concurrency
//! - **Thread Pool**: Configurable thread pool size
//! - **Concurrent Reads**: Maximum concurrent read operations
//! - **Concurrent Writes**: Maximum concurrent write operations
//!
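//! The sketch below shows how a custom configuration might be supplied. Only
//! the fields referenced in this module (`default_chunk_size`,
//! `enable_memory_mapping`, `max_mmap_size`) are spelled out; everything else
//! is left at its default, and the concrete values are illustrative.
//!
//! ```ignore
//! let config = FileIOConfig {
//!     default_chunk_size: 4 * 1024 * 1024, // 4 MiB chunks (illustrative)
//!     enable_memory_mapping: true,
//!     max_mmap_size: 1024 * 1024 * 1024,   // only map files up to 1 GiB (illustrative)
//!     ..FileIOConfig::default()
//! };
//! let service = TokioFileIO::new(config);
//! ```
//!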
//! ## Usage Examples
//!
//! ### Basic File Reading
//!
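//! A minimal sketch of reading a whole file through the service. The `use`
//! path, file name, and surrounding function are assumptions for illustration;
//! the calls themselves (`new_default`, `read_file_chunks`, `ReadOptions`)
//! come from this module and the domain interface.
//!
//! ```ignore
//! use std::path::Path;
//!
//! async fn read_whole_file() -> Result<(), PipelineError> {
//!     let service = TokioFileIO::new_default();
//!     let result = service
//!         .read_file_chunks(Path::new("input.dat"), ReadOptions::default())
//!         .await?;
//!     println!("read {} bytes in {} chunks", result.bytes_read, result.chunks.len());
//!     Ok(())
//! }
//! ```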
//!
//! ### Chunked File Processing
//!
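//! A sketch of bounded-memory processing using the chunk stream. The loop body
//! is a placeholder; `stream_file_chunks` and `ReadOptions` are the real APIs
//! implemented below, while the helper function itself is an assumption.
//!
//! ```ignore
//! use futures::StreamExt;
//! use std::path::Path;
//!
//! async fn process_in_chunks(service: &TokioFileIO, path: &Path) -> Result<(), PipelineError> {
//!     let mut chunks = service.stream_file_chunks(path, ReadOptions::default()).await?;
//!     while let Some(chunk) = chunks.next().await {
//!         let chunk = chunk?;
//!         // Process chunk.data() here; memory stays bounded by the chunk size.
//!     }
//!     Ok(())
//! }
//! ```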
//!
//! ## Error Handling
//!
//! All failures are surfaced as `PipelineError` values, covering (see the
//! sketch after this list):
//! - **File System Errors**: Permission denied, file not found, etc.
//! - **I/O Errors**: Read/write failures, disk full, etc.
//! - **Memory Mapping Errors**: Mapping failures, access violations
//! - **Configuration Errors**: Invalid parameters, resource limits
//!
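//! A hedged sketch of handling these errors at a call site; the path and the
//! recovery logic are illustrative:
//!
//! ```ignore
//! match service.read_file_chunks(Path::new("input.dat"), ReadOptions::default()).await {
//!     Ok(result) => println!("read {} bytes", result.bytes_read),
//!     Err(PipelineError::IoError(msg)) => eprintln!("I/O failure: {}", msg),
//!     Err(other) => return Err(other),
//! }
//! ```
//!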
//! ## Performance Characteristics
//!
//! ### Throughput
//! - **High Bandwidth**: Optimized for maximum I/O throughput
//! - **Low Latency**: Minimal overhead for small operations
//! - **Scalable**: Performance scales with available system resources
//!
//! ### Memory Usage
//! - **Efficient**: Minimal memory overhead
//! - **Predictable**: Constant memory usage for chunked processing
//! - **Configurable**: Tunable memory usage based on requirements
//!
//! ## Thread Safety
//!
//! The implementation is fully thread-safe (see the sketch below):
//! - **Concurrent Access**: Multiple threads can use the service simultaneously
//! - **Shared Reads**: Configuration and statistics reads do not block one another
//! - **Guarded Updates**: Statistics and configuration updates happen under a write lock
//!
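//! A sketch of sharing one service across concurrent tasks; the file names are
//! placeholders and `tokio::join!` is just one way to drive the reads
//! concurrently:
//!
//! ```ignore
//! use std::sync::Arc;
//! use std::path::Path;
//!
//! let service = Arc::new(TokioFileIO::new_default());
//! let (a, b) = (Arc::clone(&service), Arc::clone(&service));
//! let (r1, r2) = tokio::join!(
//!     a.read_file_chunks(Path::new("a.dat"), ReadOptions::default()),
//!     b.read_file_chunks(Path::new("b.dat"), ReadOptions::default()),
//! );
//! ```
//!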
//! ## Integration
//!
//! The service integrates with:
//! - **Domain Layer**: Implements the `FileIOService` trait
//! - **Processing Pipeline**: Provides file access for pipeline stages
//! - **Metrics System**: Reports detailed I/O performance metrics
//! - **Configuration System**: Supports runtime configuration updates

use async_trait::async_trait;
use memmap2::{Mmap, MmapOptions};
use std::fs::File;
use std::io::SeekFrom;
use std::path::Path;

use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

use parking_lot::RwLock;

use adaptive_pipeline_domain::services::file_io_service::{
    FileIOConfig, FileIOService, FileIOStats, FileInfo, ReadOptions, ReadResult, WriteOptions, WriteResult,
};
use adaptive_pipeline_domain::{FileChunk, PipelineError};

/// Implementation of `FileIOService` with memory mapping support
///
/// This struct provides a high-performance implementation of the file I/O
/// service interface, featuring memory-mapped file access, asynchronous
/// operations, and comprehensive statistics tracking.
///
/// # Features
///
/// - **Memory Mapping**: Uses memory-mapped files for reads up to a configurable size
/// - **Async I/O**: All operations are asynchronous and non-blocking
/// - **Thread Safety**: Safe for concurrent access from multiple threads
/// - **Statistics**: Tracks detailed I/O performance metrics
/// - **Configuration**: Runtime configuration updates supported
///
/// # Examples
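///
/// A minimal sketch (the path, `?` error handling, and the surrounding async
/// context are illustrative assumptions):
///
/// ```ignore
/// let service = TokioFileIO::new_default();
/// let info = service.get_file_info(Path::new("input.dat")).await?;
/// println!("{} is {} bytes", info.path.display(), info.size);
///
/// let stats = service.get_stats();
/// println!("files processed so far: {}", stats.files_processed);
/// ```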
pub struct TokioFileIO {
    config: RwLock<FileIOConfig>,
    stats: RwLock<FileIOStats>,
}

impl TokioFileIO {
    /// Creates a new FileIOService instance
    pub fn new(config: FileIOConfig) -> Self {
        Self {
            config: RwLock::new(config),
            stats: RwLock::new(FileIOStats::default()),
        }
    }

    /// Creates a new FileIOService with default configuration
    pub fn new_default() -> Self {
        Self::new(FileIOConfig::default())
    }

    /// Determines if a file should be memory-mapped based on size and config
    fn should_use_mmap(&self, file_size: u64) -> bool {
        let config = self.config.read();
        config.enable_memory_mapping && file_size <= config.max_mmap_size
    }

    /// Creates file chunks from memory-mapped data
    fn create_chunks_from_mmap(
        &self,
        mmap: &Mmap,
        chunk_size: usize,
        calculate_checksums: bool,
        start_offset: u64,
        max_bytes: Option<u64>,
    ) -> Result<Vec<FileChunk>, PipelineError> {
        let mut chunks = Vec::new();
        let data_len = mmap.len() as u64;
        let start = start_offset.min(data_len);
        let end = match max_bytes {
            Some(max) => (start + max).min(data_len),
            None => data_len,
        };

        let mut current_offset = start;
        let mut sequence = 0u64;

        while current_offset < end {
            let chunk_end = (current_offset + (chunk_size as u64)).min(end) as usize;
            let chunk_start = current_offset as usize;
            let chunk_data = mmap[chunk_start..chunk_end].to_vec();
            let is_final = (chunk_end as u64) >= end;

            let chunk = FileChunk::new(sequence, current_offset, chunk_data, is_final)?;

            let chunk = if calculate_checksums {
                chunk.with_calculated_checksum()?
            } else {
                chunk
            };

            chunks.push(chunk);
            current_offset = chunk_end as u64;
            sequence += 1;
        }

        Ok(chunks)
    }

    /// Updates statistics
    fn update_stats<F>(&self, update_fn: F)
    where
        F: FnOnce(&mut FileIOStats),
    {
        let mut stats = self.stats.write();
        update_fn(&mut stats);
    }

    /// Gets file metadata
    async fn get_file_metadata(&self, path: &Path) -> Result<std::fs::Metadata, PipelineError> {
        fs::metadata(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to get file metadata for {}: {}", path.display(), e)))
    }
}

#[async_trait]
impl FileIOService for TokioFileIO {
    async fn read_file_chunks(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
        let start_time = std::time::Instant::now();
        let metadata = self.get_file_metadata(path).await?;
        let file_size = metadata.len();

        // Determine if we should use memory mapping
        if options.use_memory_mapping && self.should_use_mmap(file_size) {
            return self.read_file_mmap(path, options).await;
        }

        // Use regular file I/O
        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);
        let mut file = fs::File::open(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to open file {}: {}", path.display(), e)))?;

        if let Some(offset) = options.start_offset {
            file.seek(SeekFrom::Start(offset))
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to seek to offset {}: {}", offset, e)))?;
        }

        let mut chunks = Vec::new();
        let mut buffer = vec![0u8; chunk_size];
        let mut current_offset = options.start_offset.unwrap_or(0);
        let mut sequence = 0u64;
        let mut total_read = 0u64;

        let max_bytes = options.max_bytes.unwrap_or(file_size);

        loop {
            if total_read >= max_bytes {
                break;
            }

            let bytes_to_read = ((max_bytes - total_read) as usize).min(chunk_size);
            let bytes_read = file
                .read(&mut buffer[..bytes_to_read])
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to read from file: {}", e)))?;

            if bytes_read == 0 {
                break;
            }

            let chunk_data = buffer[..bytes_read].to_vec();
            let is_final = bytes_read < bytes_to_read || total_read + (bytes_read as u64) >= max_bytes;

            let chunk = FileChunk::new(sequence, current_offset, chunk_data, is_final)?;

            let chunk = if options.calculate_checksums {
                chunk.with_calculated_checksum()?
            } else {
                chunk
            };

            chunks.push(chunk);
            current_offset += bytes_read as u64;
            total_read += bytes_read as u64;
            sequence += 1;
        }

        let file_info = FileInfo {
            path: path.to_path_buf(),
            size: file_size,
            is_memory_mapped: false,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644, // Default permissions
            mime_type: None,
        };

        self.update_stats(|stats| {
            stats.bytes_read += total_read;
            stats.chunks_processed += chunks.len() as u64;
            stats.files_processed += 1;
            stats.total_processing_time_ms += start_time.elapsed().as_millis() as u64;
        });

        Ok(ReadResult {
            chunks,
            file_info,
            bytes_read: total_read,
            complete: total_read >= file_size,
        })
    }

    async fn read_file_mmap(&self, path: &Path, options: ReadOptions) -> Result<ReadResult, PipelineError> {
        let start_time = std::time::Instant::now();
        let metadata = self.get_file_metadata(path).await?;
        let file_size = metadata.len();

        let file = File::open(path)
            .map_err(|e| PipelineError::IoError(format!("Failed to open file for mmap {}: {}", path.display(), e)))?;

        let mmap = unsafe {
            MmapOptions::new()
                .map(&file)
                .map_err(|e| PipelineError::IoError(format!("Failed to create memory map: {}", e)))?
        };

        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);
        let start_offset = options.start_offset.unwrap_or(0);

        let chunks = self.create_chunks_from_mmap(
            &mmap,
            chunk_size,
            options.calculate_checksums,
            start_offset,
            options.max_bytes,
        )?;

        let bytes_read = chunks.iter().map(|c| c.data_len() as u64).sum();

        let file_info = FileInfo {
            path: path.to_path_buf(),
            size: file_size,
            is_memory_mapped: true,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644,
            mime_type: None,
        };

        self.update_stats(|stats| {
            stats.bytes_read += bytes_read;
            stats.chunks_processed += chunks.len() as u64;
            stats.files_processed += 1;
            stats.memory_mapped_files += 1;
            stats.total_processing_time_ms += start_time.elapsed().as_millis() as u64;
        });

        Ok(ReadResult {
            chunks,
            file_info,
            bytes_read,
            complete: true,
        })
    }

    async fn write_file_chunks(
        &self,
        path: &Path,
        chunks: &[FileChunk],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        if options.create_dirs {
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent)
                    .await
                    .map_err(|e| PipelineError::IoError(format!("Failed to create directories: {}", e)))?;
            }
        }

        let mut file = (if options.append {
            fs::OpenOptions::new().create(true).append(true).open(path).await
        } else {
            fs::File::create(path).await
        })
        .map_err(|e| PipelineError::IoError(format!("Failed to create/open file {}: {}", path.display(), e)))?;

        let mut total_written = 0u64;
        let mut file_hasher = ring::digest::Context::new(&ring::digest::SHA256);

        for chunk in chunks {
            let data = chunk.data();
            file.write_all(data)
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to write chunk: {}", e)))?;

            if options.calculate_checksums {
                file_hasher.update(data);
            }

            total_written += data.len() as u64;
        }

        if options.sync {
            file.sync_all()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to sync file: {}", e)))?;
        }

        let checksum = if options.calculate_checksums {
            Some(hex::encode(file_hasher.finish().as_ref()))
        } else {
            None
        };

        self.update_stats(|stats| {
            stats.bytes_written += total_written;
            stats.chunks_processed += chunks.len() as u64;
        });

        Ok(WriteResult {
            path: path.to_path_buf(),
            bytes_written: total_written,
            checksum,
            success: true,
        })
    }

    async fn write_file_data(
        &self,
        path: &Path,
        data: &[u8],
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        // Create a single chunk and use write_file_chunks
        let chunk = FileChunk::new(0, 0, data.to_vec(), true)?;
        self.write_file_chunks(path, &[chunk], options).await
    }

    async fn get_file_info(&self, path: &Path) -> Result<FileInfo, PipelineError> {
        let metadata = self.get_file_metadata(path).await?;

        Ok(FileInfo {
            path: path.to_path_buf(),
            size: metadata.len(),
            is_memory_mapped: false,
            modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
            created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
            permissions: 0o644,
            mime_type: None,
        })
    }

    async fn file_exists(&self, path: &Path) -> Result<bool, PipelineError> {
        Ok(fs::metadata(path).await.is_ok())
    }

    async fn delete_file(&self, path: &Path) -> Result<(), PipelineError> {
        fs::remove_file(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to delete file {}: {}", path.display(), e)))
    }

    async fn copy_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        let read_result = self.read_file_chunks(source, ReadOptions::default()).await?;
        self.write_file_chunks(destination, &read_result.chunks, options).await
    }

    async fn move_file(
        &self,
        source: &Path,
        destination: &Path,
        options: WriteOptions,
    ) -> Result<WriteResult, PipelineError> {
        let result = self.copy_file(source, destination, options).await?;
        self.delete_file(source).await?;
        Ok(result)
    }

    async fn create_directory(&self, path: &Path) -> Result<(), PipelineError> {
        fs::create_dir_all(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to create directory {}: {}", path.display(), e)))
    }

    async fn directory_exists(&self, path: &Path) -> Result<bool, PipelineError> {
        match fs::metadata(path).await {
            Ok(metadata) => Ok(metadata.is_dir()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
            Err(e) => Err(PipelineError::IoError(format!(
                "Failed to check directory {}: {}",
                path.display(),
                e
            ))),
        }
    }

    async fn list_directory(&self, path: &Path) -> Result<Vec<FileInfo>, PipelineError> {
        let mut entries = fs::read_dir(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read directory {}: {}", path.display(), e)))?;

        let mut files = Vec::new();
        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to read directory entry: {}", e)))?
        {
            let metadata = entry
                .metadata()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to get entry metadata: {}", e)))?;

            if metadata.is_file() {
                files.push(FileInfo {
                    path: entry.path(),
                    size: metadata.len(),
                    is_memory_mapped: false,
                    modified_at: metadata.modified().unwrap_or(std::time::UNIX_EPOCH),
                    created_at: metadata.created().unwrap_or(std::time::UNIX_EPOCH),
                    permissions: 0o644,
                    mime_type: None,
                });
            }
        }

        Ok(files)
    }

    fn get_config(&self) -> FileIOConfig {
        self.config.read().clone()
    }

    fn update_config(&mut self, config: FileIOConfig) {
        *self.config.write() = config;
    }

    fn get_stats(&self) -> FileIOStats {
        self.stats.read().clone()
    }

    fn reset_stats(&mut self) {
        *self.stats.write() = FileIOStats::default();
    }

    async fn validate_file_integrity(&self, path: &Path, expected_checksum: &str) -> Result<bool, PipelineError> {
        let calculated_checksum = self.calculate_file_checksum(path).await?;
        Ok(calculated_checksum == expected_checksum)
    }

    async fn calculate_file_checksum(&self, path: &Path) -> Result<String, PipelineError> {
        let read_result = self
            .read_file_chunks(
                path,
                ReadOptions {
                    calculate_checksums: false,
                    ..Default::default()
                },
            )
            .await?;

        let mut hasher = ring::digest::Context::new(&ring::digest::SHA256);
        for chunk in &read_result.chunks {
            hasher.update(chunk.data());
        }

        Ok(hex::encode(hasher.finish().as_ref()))
    }

    async fn stream_file_chunks(
        &self,
        path: &Path,
        options: ReadOptions,
    ) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<FileChunk, PipelineError>> + Send>>, PipelineError>
    {
        let chunk_size = options.chunk_size.unwrap_or(self.config.read().default_chunk_size);
        let file = fs::File::open(path)
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to open file {}: {}", path.display(), e)))?;

        let file = if let Some(offset) = options.start_offset {
            let mut f = file;
            f.seek(std::io::SeekFrom::Start(offset))
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to seek to offset {}: {}", offset, e)))?;
            f
        } else {
            file
        };

        // Create state for the stream
        struct StreamState {
            file: fs::File,
            buffer: Vec<u8>,
            current_offset: u64,
            sequence: u64,
            total_read: u64,
            max_bytes: u64,
            calculate_checksums: bool,
        }

        let state = StreamState {
            file,
            buffer: vec![0u8; chunk_size],
            current_offset: options.start_offset.unwrap_or(0),
            sequence: 0,
            total_read: 0,
            max_bytes: options.max_bytes.unwrap_or(u64::MAX),
            calculate_checksums: options.calculate_checksums,
        };

        let stream = futures::stream::unfold(state, |mut state| async move {
            if state.total_read >= state.max_bytes {
                return None;
            }

            let bytes_to_read = std::cmp::min(state.buffer.len(), (state.max_bytes - state.total_read) as usize);
            state.buffer.resize(bytes_to_read, 0);

            match state.file.read(&mut state.buffer[..bytes_to_read]).await {
                Ok(0) => None, // EOF
                Ok(bytes_read) => {
                    state.buffer.truncate(bytes_read);
                    let is_final =
                        bytes_read < bytes_to_read || state.total_read + (bytes_read as u64) >= state.max_bytes;

                    match FileChunk::new(state.sequence, state.current_offset, state.buffer.clone(), is_final) {
                        Ok(chunk) => {
                            let chunk = if state.calculate_checksums {
                                match chunk.with_calculated_checksum() {
                                    Ok(c) => c,
                                    Err(e) => {
                                        return Some((Err(e), state));
                                    }
                                }
                            } else {
                                chunk
                            };

                            state.current_offset += bytes_read as u64;
                            state.sequence += 1;
                            state.total_read += bytes_read as u64;

                            Some((Ok(chunk), state))
                        }
                        Err(e) => Some((Err(e), state)),
                    }
                }
                Err(e) => Some((
                    Err(PipelineError::IoError(format!("Failed to read chunk: {}", e))),
                    state,
                )),
            }
        });

        Ok(Box::pin(stream))
    }

    async fn write_chunk_to_file(
        &self,
        path: &Path,
        chunk: &FileChunk,
        options: WriteOptions,
        is_first_chunk: bool,
    ) -> Result<WriteResult, PipelineError> {
        let start_time = std::time::Instant::now();

        // Create parent directories if needed
        if options.create_dirs {
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent).await.map_err(|e| {
                    PipelineError::IoError(format!("Failed to create directories for {}: {}", path.display(), e))
                })?;
            }
        }

        // Open file in append mode for subsequent chunks, create/truncate for first
        // chunk
        let file = (if is_first_chunk {
            fs::OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(path)
                .await
        } else {
            fs::OpenOptions::new()
                .create(true)
                .write(true)
                .append(true)
                .open(path)
                .await
        })
        .map_err(|e| PipelineError::IoError(format!("Failed to open file {} for writing: {}", path.display(), e)))?;

        let mut file = file;
        file.write_all(chunk.data())
            .await
            .map_err(|e| PipelineError::IoError(format!("Failed to write chunk to {}: {}", path.display(), e)))?;

        if options.sync {
            file.sync_all()
                .await
                .map_err(|e| PipelineError::IoError(format!("Failed to sync file {}: {}", path.display(), e)))?;
        }

        let bytes_written = chunk.data().len() as u64;
        let write_time = start_time.elapsed();

        // Update statistics
        self.update_stats(|stats| {
            stats.bytes_written += bytes_written;
            stats.chunks_processed += 1;
            stats.total_processing_time_ms += write_time.as_millis() as u64;
        });

        Ok(WriteResult {
            path: path.to_path_buf(),
            bytes_written,
            success: true,
            checksum: if options.calculate_checksums {
                chunk.checksum().map(|c| c.to_string())
            } else {
                None
            },
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn test_file_io_basic_operations() {
        let service = TokioFileIO::new_default();

        // Create a temporary file with enough data for 1MB minimum chunk size
        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();
        // Create 2MB of test data to ensure we have multiple chunks
        let test_data = vec![b'A'; 2 * 1024 * 1024]; // 2MB of 'A' characters

        // Write test data asynchronously
        let mut file = tokio::fs::File::create(&temp_path).await.unwrap();
        file.write_all(&test_data).await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        // Test reading
        let read_result = service
            .read_file_chunks(&temp_path, ReadOptions::default())
            .await
            .unwrap();

        assert!(!read_result.chunks.is_empty());
        assert_eq!(read_result.bytes_read, test_data.len() as u64);

        // Test writing
        let copy_path = temp_path.with_extension("copy");
        let write_result = service
            .write_file_data(&copy_path, &test_data, WriteOptions::default())
            .await
            .unwrap();

        assert_eq!(write_result.bytes_written, test_data.len() as u64);
        assert!(write_result.success);
    }

    #[tokio::test]
    async fn test_memory_mapping() {
        let service = TokioFileIO::new_default();

        // Create a temporary file with enough data to trigger memory mapping
        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();
        // Create 3MB of test data to ensure memory mapping is used and we have multiple
        // chunks
        let test_data = vec![0u8; 3 * 1024 * 1024]; // 3MB of data

        // Write test data asynchronously
        let mut file = tokio::fs::File::create(&temp_path).await.unwrap();
        file.write_all(&test_data).await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        // Test memory-mapped reading
        let read_result = service
            .read_file_mmap(&temp_path, ReadOptions::default())
            .await
            .unwrap();

        assert!(!read_result.chunks.is_empty());
        assert!(read_result.file_info.is_memory_mapped);
        assert_eq!(read_result.bytes_read, test_data.len() as u64);
    }
}