1use memmap2::{Mmap, MmapOptions};
2use rayon::prelude::*;
3use std::cmp::min;
4use std::fs::File;
5use std::io::{BufReader, Read, Seek, SeekFrom};
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tracing::{debug, info, trace, warn};
9
10use crate::errors::{Result, WinxError};
11
/// Files smaller than this many bytes are read with ordinary buffered I/O.
pub const DIRECT_READ_THRESHOLD: u64 = 10 * 1_000_000;

/// Files below this size (and at least [`DIRECT_READ_THRESHOLD`]) are read
/// through a single memory map.
pub const MAX_MMAP_SIZE: u64 = 1_000 * 1_000_000;

/// Files below this size (and at least [`MAX_MMAP_SIZE`]) are read through a
/// sequence of memory-mapped segments; anything larger is streamed.
pub const MAX_SEGMENTED_MMAP_SIZE: u64 = 4_000 * 1_000_000;

/// Size in bytes of each window mapped by the segmented strategy.
pub const SEGMENT_SIZE: u64 = 256 * 1_000_000;

/// Scratch-buffer size (1 MiB) used by the chunked direct read loop.
const DIRECT_READ_CHUNK_SIZE: usize = 1024 * 1024;
/// Size (1 MiB) of the pieces a large memory map is split into for parallel copying.
const MMAP_PARALLEL_CHUNK_SIZE: usize = 1024 * 1024;
/// Scratch-buffer and `BufReader` size (4 MiB) used by the streaming read loop.
const STREAMING_CHUNK_SIZE: usize = 4 * 1024 * 1024;
27
28pub fn read_file_optimized(path: &Path, max_file_size: u64) -> Result<Vec<u8>> {
49 let file = File::open(path).map_err(|e| WinxError::FileAccessError {
51 path: path.to_path_buf(),
52 message: format!("Error opening file: {e}"),
53 })?;
54
55 let metadata = file.metadata().map_err(|e| WinxError::FileAccessError {
56 path: path.to_path_buf(),
57 message: format!("Failed to get file metadata: {e}"),
58 })?;
59
60 let file_size = metadata.len();
62 if file_size > max_file_size {
63 return Err(WinxError::FileTooLarge {
64 path: path.to_path_buf(),
65 size: file_size,
66 max_size: max_file_size,
67 });
68 }
69
70 if file_size < DIRECT_READ_THRESHOLD {
72 debug!("Using direct read for file: {}", path.display());
73 read_direct(&file, file_size, path)
74 } else if file_size < MAX_MMAP_SIZE {
75 debug!("Using memory-mapped read for file: {}", path.display());
76 read_mmap(&file, path)
77 } else if file_size < MAX_SEGMENTED_MMAP_SIZE {
78 debug!("Using segmented memory-mapped read for file: {}", path.display());
79 read_segmented_mmap(&file, file_size, path)
80 } else {
81 debug!("Using streaming read for extremely large file: {}", path.display());
82 read_streaming(&file, file_size, path)
83 }
84}
85
86fn read_direct(file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
104 if file_size < 1_000_000 {
106 let mut buffer = Vec::with_capacity(file_size as usize);
108
109 let mut file_handle = file.try_clone().map_err(|e| WinxError::FileAccessError {
111 path: path.to_path_buf(),
112 message: format!("Error cloning file handle: {e}"),
113 })?;
114
115 file_handle.seek(SeekFrom::Start(0)).map_err(|e| WinxError::FileAccessError {
116 path: path.to_path_buf(),
117 message: format!("Error seeking to start of file: {e}"),
118 })?;
119
120 let mut reader = BufReader::with_capacity(min(file_size as usize, 64 * 1024), file_handle);
122
123 reader.read_to_end(&mut buffer).map_err(|e| WinxError::FileAccessError {
125 path: path.to_path_buf(),
126 message: format!("Error reading file: {e}"),
127 })?;
128
129 return Ok(buffer);
130 }
131
132 let mut buffer = Vec::with_capacity(file_size as usize);
134
135 let mut file_handle = file.try_clone().map_err(|e| WinxError::FileAccessError {
137 path: path.to_path_buf(),
138 message: format!("Error cloning file handle: {e}"),
139 })?;
140
141 file_handle.seek(SeekFrom::Start(0)).map_err(|e| WinxError::FileAccessError {
142 path: path.to_path_buf(),
143 message: format!("Error seeking to start of file: {e}"),
144 })?;
145
146 let mut reader = BufReader::with_capacity(262_144, file_handle); let mut chunk = vec![0; DIRECT_READ_CHUNK_SIZE];
149 let mut bytes_read = 0;
150
151 loop {
152 match reader.read(&mut chunk) {
153 Ok(0) => break, Ok(n) => {
155 buffer.extend_from_slice(&chunk[..n]);
156 bytes_read += n as u64;
157
158 if file_size > 5_000_000 && bytes_read % 5_000_000 < DIRECT_READ_CHUNK_SIZE as u64 {
160 trace!(
161 "Read progress for {}: {}MB/{}MB ({}%)",
162 path.display(),
163 bytes_read / 1_000_000,
164 file_size / 1_000_000,
165 bytes_read * 100 / file_size
166 );
167 }
168 }
169 Err(e) => {
170 return Err(WinxError::FileAccessError {
171 path: path.to_path_buf(),
172 message: format!("Error reading file chunk: {e}"),
173 });
174 }
175 }
176 }
177
178 Ok(buffer)
179}
180
181fn read_mmap(file: &File, path: &Path) -> Result<Vec<u8>> {
199 if file.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
201 return Ok(Vec::new());
202 }
203
204 let mmap = unsafe { MmapOptions::new().map(file) }.map_err(|e| WinxError::FileAccessError {
212 path: path.to_path_buf(),
213 message: format!("Failed to memory-map file: {e}"),
214 })?;
215
216 if mmap.len() > 10_000_000 {
218 debug!("Using parallel processing for large mmap file: {}", path.display());
220
221 let chunk_count = mmap.len().div_ceil(MMAP_PARALLEL_CHUNK_SIZE);
223 let mut result = vec![0; mmap.len()];
224
225 let chunks: Vec<_> = (0..chunk_count)
227 .into_par_iter()
228 .map(|i| {
229 let start = i * MMAP_PARALLEL_CHUNK_SIZE;
230 let end = min((i + 1) * MMAP_PARALLEL_CHUNK_SIZE, mmap.len());
231
232 if start < mmap.len() {
233 let src = &mmap[start..end];
235 (start, end, src.to_vec())
236 } else {
237 (start, start, Vec::new())
238 }
239 })
240 .collect();
241
242 for (start, end, chunk) in chunks {
244 if start < end {
245 result[start..end].copy_from_slice(&chunk);
246 }
247 }
248
249 Ok(result)
250 } else {
251 Ok(mmap.to_vec())
253 }
254}
255
256fn read_segmented_mmap(_file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
275 let segment_count = file_size.div_ceil(SEGMENT_SIZE);
277 debug!(
278 "Reading file {} in {} segments of {}MB each",
279 path.display(),
280 segment_count,
281 SEGMENT_SIZE / 1_000_000
282 );
283
284 let mut result = Vec::with_capacity(file_size as usize);
286
287 for i in 0..segment_count {
289 let segment_start = i * SEGMENT_SIZE;
290 let segment_size = min(SEGMENT_SIZE, file_size - segment_start);
291
292 info!(
293 "Processing segment {}/{} of file {} ({:.1}%)",
294 i + 1,
295 segment_count,
296 path.display(),
297 (segment_start as f64 / file_size as f64) * 100.0
298 );
299
300 let segment_file = File::open(path).map_err(|e| WinxError::FileAccessError {
302 path: path.to_path_buf(),
303 message: format!("Error opening file for segment {i}: {e}"),
304 })?;
305
306 let mut segment_file = segment_file;
308 segment_file.seek(SeekFrom::Start(segment_start)).map_err(|e| {
309 WinxError::FileAccessError {
310 path: path.to_path_buf(),
311 message: format!("Error seeking to segment start: {e}"),
312 }
313 })?;
314
315 let segment_mmap = unsafe {
320 MmapOptions::new().offset(segment_start).len(segment_size as usize).map(&segment_file)
321 }
322 .map_err(|e| WinxError::FileAccessError {
323 path: path.to_path_buf(),
324 message: format!("Failed to memory-map file segment {i}: {e}"),
325 })?;
326
327 result.extend_from_slice(&segment_mmap);
329 }
330
331 Ok(result)
332}
333
334fn read_streaming(file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
353 warn!(
354 "Reading extremely large file ({}GB) with streaming approach: {}",
355 file_size / 1_000_000_000,
356 path.display()
357 );
358
359 let initial_capacity = min(file_size as usize, 1_000_000_000); let mut buffer = Vec::with_capacity(initial_capacity);
362
363 let mut reader = BufReader::with_capacity(STREAMING_CHUNK_SIZE, file);
364 let mut chunk = vec![0; STREAMING_CHUNK_SIZE];
365 let mut bytes_read = 0;
366
367 loop {
368 match reader.read(&mut chunk) {
369 Ok(0) => break, Ok(n) => {
371 buffer.extend_from_slice(&chunk[..n]);
372 bytes_read += n as u64;
373
374 if bytes_read % 100_000_000 < STREAMING_CHUNK_SIZE as u64 {
376 info!(
377 "Read progress for large file {}: {:.2}GB/{:.2}GB ({:.1}%)",
378 path.display(),
379 bytes_read as f64 / 1_000_000_000.0,
380 file_size as f64 / 1_000_000_000.0,
381 bytes_read as f64 * 100.0 / file_size as f64
382 );
383 }
384 }
385 Err(e) => {
386 return Err(WinxError::FileAccessError {
387 path: path.to_path_buf(),
388 message: format!("Error reading file chunk at position {bytes_read}: {e}"),
389 });
390 }
391 }
392 }
393
394 Ok(buffer)
395}
396
397pub fn read_file_segment(
417 path: &Path,
418 offset: u64,
419 length: u64,
420 max_file_size: u64,
421) -> Result<Vec<u8>> {
422 let file = File::open(path).map_err(|e| WinxError::FileAccessError {
424 path: path.to_path_buf(),
425 message: format!("Error opening file: {e}"),
426 })?;
427
428 let metadata = file.metadata().map_err(|e| WinxError::FileAccessError {
429 path: path.to_path_buf(),
430 message: format!("Failed to get file metadata: {e}"),
431 })?;
432
433 let file_size = metadata.len();
435 if file_size > max_file_size {
436 return Err(WinxError::FileTooLarge {
437 path: path.to_path_buf(),
438 size: file_size,
439 max_size: max_file_size,
440 });
441 }
442
443 if offset >= file_size {
445 return Err(WinxError::FileAccessError {
446 path: path.to_path_buf(),
447 message: format!("Offset {offset} exceeds file size {file_size}"),
448 });
449 }
450
451 let length = min(length, file_size - offset);
453
454 if length < DIRECT_READ_THRESHOLD {
456 debug!("Using direct read for file segment: {}", path.display());
457 read_segment_direct(&file, offset, length, path)
458 } else {
459 debug!("Using memory-mapped read for file segment: {}", path.display());
460 read_segment_mmap(&file, offset, length, path)
461 }
462}
463
464fn read_segment_direct(file: &File, offset: u64, length: u64, path: &Path) -> Result<Vec<u8>> {
481 let mut seekable_file = file.try_clone().map_err(|e| WinxError::FileAccessError {
483 path: path.to_path_buf(),
484 message: format!("Failed to clone file handle: {e}"),
485 })?;
486
487 seekable_file.seek(SeekFrom::Start(offset)).map_err(|e| WinxError::FileAccessError {
489 path: path.to_path_buf(),
490 message: format!("Failed to seek to offset {offset}: {e}"),
491 })?;
492
493 let mut buffer = Vec::with_capacity(length as usize);
495 let reader = BufReader::with_capacity(min(length as usize, 64 * 1024), seekable_file);
496
497 reader.take(length).read_to_end(&mut buffer).map_err(|e| WinxError::FileAccessError {
499 path: path.to_path_buf(),
500 message: format!("Error reading file segment: {e}"),
501 })?;
502
503 Ok(buffer)
504}
505
506fn read_segment_mmap(file: &File, offset: u64, length: u64, path: &Path) -> Result<Vec<u8>> {
523 let segment_mmap = unsafe { MmapOptions::new().offset(offset).len(length as usize).map(file) }
528 .map_err(|e| WinxError::FileAccessError {
529 path: path.to_path_buf(),
530 message: format!("Failed to memory-map file segment: {e}"),
531 })?;
532
533 Ok(segment_mmap.to_vec())
535}
536
537pub fn read_file_to_string(path: &Path, max_file_size: u64) -> Result<String> {
555 let bytes = read_file_optimized(path, max_file_size)?;
556
557 String::from_utf8(bytes).map_err(|e| WinxError::FileAccessError {
558 path: path.to_path_buf(),
559 message: format!("Failed to decode file as UTF-8: {e}"),
560 })
561}
562
563pub fn process_text_file_parallel<F>(
582 path: &Path,
583 max_file_size: u64,
584 line_processor: F,
585) -> Result<()>
586where
587 F: Fn(&str) + Sync,
588{
589 let content = read_file_to_string(path, max_file_size)?;
590
591 if content.len() > 1_000_000 {
593 content.par_lines().for_each(|line| {
595 line_processor(line);
596 });
597 } else {
598 content.lines().for_each(|line| {
600 line_processor(line);
601 });
602 }
603
604 Ok(())
605}
606
607pub fn read_file_segment_to_string(
624 path: &Path,
625 offset: u64,
626 length: u64,
627 max_file_size: u64,
628) -> Result<String> {
629 let bytes = read_file_segment(path, offset, length, max_file_size)?;
630
631 String::from_utf8(bytes).map_err(|e| WinxError::FileAccessError {
632 path: path.to_path_buf(),
633 message: format!("Failed to decode file segment as UTF-8: {e}"),
634 })
635}
636
637#[derive(Clone)]
642pub struct ShareableMap {
643 data: Arc<Mmap>,
645 path: PathBuf,
647}
648
649impl ShareableMap {
650 pub fn new(path: &Path) -> Result<Self> {
664 let file = File::open(path).map_err(|e| WinxError::FileAccessError {
665 path: path.to_path_buf(),
666 message: format!("Error opening file: {e}"),
667 })?;
668
669 if file
671 .metadata()
672 .map_err(|e| WinxError::FileAccessError {
673 path: path.to_path_buf(),
674 message: format!("Failed to get metadata: {e}"),
675 })?
676 .len()
677 == 0
678 {
679 return Err(WinxError::FileAccessError {
680 path: path.to_path_buf(),
681 message: "Cannot memory map empty file".to_string(),
682 });
683 }
684
685 let mmap =
690 unsafe { MmapOptions::new().map(&file) }.map_err(|e| WinxError::FileAccessError {
691 path: path.to_path_buf(),
692 message: format!("Failed to memory-map file: {e}"),
693 })?;
694
695 Ok(Self { data: Arc::new(mmap), path: path.to_path_buf() })
696 }
697
698 pub fn new_segment(path: &Path, offset: u64, length: u64) -> Result<Self> {
714 if length == 0 {
715 return Err(WinxError::FileAccessError {
716 path: path.to_path_buf(),
717 message: "Cannot memory map segment of length 0".to_string(),
718 });
719 }
720
721 let file = File::open(path).map_err(|e| WinxError::FileAccessError {
722 path: path.to_path_buf(),
723 message: format!("Error opening file: {e}"),
724 })?;
725
726 let mmap = unsafe { MmapOptions::new().offset(offset).len(length as usize).map(&file) }
730 .map_err(|e| WinxError::FileAccessError {
731 path: path.to_path_buf(),
732 message: format!("Failed to memory-map file segment: {e}"),
733 })?;
734
735 Ok(Self { data: Arc::new(mmap), path: path.to_path_buf() })
736 }
737
738 pub fn as_slice(&self) -> &[u8] {
740 &self.data
741 }
742
743 pub fn path(&self) -> &Path {
745 &self.path
746 }
747
748 pub fn len(&self) -> usize {
750 self.data.len()
751 }
752
753 pub fn is_empty(&self) -> bool {
755 self.data.is_empty()
756 }
757}
758
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file of `size` bytes filled with the repeating
    /// pattern 0, 1, ..., 255 and returns it together with its contents.
    fn create_test_file(size: usize) -> Result<(NamedTempFile, Vec<u8>)> {
        let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();

        let mut file = NamedTempFile::new()?;
        file.write_all(&data)?;
        file.flush()?;

        Ok((file, data))
    }

    /// A 10 KiB file goes through the small-file branch of `read_direct`.
    #[test]
    fn test_direct_read_small_file() -> Result<()> {
        let size = 10 * 1024;
        let (file, expected_data) = create_test_file(size)?;

        let actual = read_direct(file.as_file(), size as u64, file.path())?;

        assert_eq!(actual, expected_data);
        Ok(())
    }

    /// A 1 MiB file is read back intact through `read_mmap`.
    #[test]
    fn test_mmap_read() -> Result<()> {
        let (file, expected_data) = create_test_file(1024 * 1024)?;

        let actual = read_mmap(file.as_file(), file.path())?;

        assert_eq!(actual, expected_data);
        Ok(())
    }

    /// Both segment readers return the same byte range of a 1 MiB file.
    #[test]
    fn test_file_segment_read() -> Result<()> {
        let (file, data) = create_test_file(1024 * 1024)?;

        let offset: u64 = 100 * 1024;
        let length: u64 = 200 * 1024;
        let expected_segment = &data[offset as usize..(offset + length) as usize];

        let via_direct = read_segment_direct(file.as_file(), offset, length, file.path())?;
        assert_eq!(via_direct, expected_segment);

        let via_mmap = read_segment_mmap(file.as_file(), offset, length, file.path())?;
        assert_eq!(via_mmap, expected_segment);
        Ok(())
    }

    /// `ShareableMap` exposes the whole file and an interior segment.
    #[test]
    fn test_shareable_map() -> Result<()> {
        let (file, data) = create_test_file(100 * 1024)?;

        let whole = ShareableMap::new(file.path())?;
        assert_eq!(whole.as_slice(), data.as_slice());

        let offset: u64 = 10 * 1024;
        let length: u64 = 20 * 1024;
        let window = ShareableMap::new_segment(file.path(), offset, length)?;
        assert_eq!(window.as_slice(), &data[offset as usize..(offset + length) as usize]);
        Ok(())
    }

    /// Every line of a 1000-line file reaches the line processor exactly once
    /// (order is not checked).
    #[test]
    fn test_parallel_processing() -> Result<()> {
        let expected: Vec<String> = (0..1000).map(|i| format!("Line {i}")).collect();

        let mut file = NamedTempFile::new()?;
        for line in &expected {
            file.write_all(format!("{line}\n").as_bytes())?;
        }
        file.flush()?;

        let processed_lines = std::sync::Mutex::new(Vec::new());

        process_text_file_parallel(file.path(), 1_000_000, |line| {
            if let Ok(mut seen) = processed_lines.lock() {
                seen.push(line.to_string());
            }
        })?;

        let seen = processed_lines.lock().map_err(|error| WinxError::ResourceAllocationError {
            message: format!("Failed to lock processed lines: {error}"),
        })?;

        assert_eq!(seen.len(), expected.len());
        assert!(expected.iter().all(|line| seen.contains(line)));
        Ok(())
    }
}