use memmap2::{Mmap, MmapOptions};
use rayon::prelude::*;
use std::cmp::min;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
use crate::errors::{Result, WinxError};
/// Files strictly smaller than this many bytes are read with buffered I/O by
/// `read_file_optimized`; `read_file_segment` also reads segments shorter than
/// this directly instead of memory-mapping them.
pub const DIRECT_READ_THRESHOLD: u64 = 10_000_000;
/// Files below this size (and at or above `DIRECT_READ_THRESHOLD`) are read
/// through a single memory map by `read_file_optimized`.
pub const MAX_MMAP_SIZE: u64 = 1_000_000_000;
/// Files below this size (and at or above `MAX_MMAP_SIZE`) are read through a
/// sequence of `SEGMENT_SIZE`-byte memory maps; anything larger is streamed.
pub const MAX_SEGMENTED_MMAP_SIZE: u64 = 4_000_000_000;
/// Size in bytes of each mapping used by the segmented memory-map reader.
pub const SEGMENT_SIZE: u64 = 256_000_000;
/// Chunk size (1 MiB) of the chunked read loop in the direct reader.
const DIRECT_READ_CHUNK_SIZE: usize = 1_048_576;
/// Chunk size (1 MiB) used to split the parallel copy out of a large memory map.
const MMAP_PARALLEL_CHUNK_SIZE: usize = 1_048_576;
/// Read-buffer and chunk size (4 MiB) of the streaming reader.
const STREAMING_CHUNK_SIZE: usize = 4_194_304;
/// Reads the whole file at `path` into memory, picking a strategy by file size.
///
/// | file size                     | strategy                   |
/// |-------------------------------|----------------------------|
/// | `< DIRECT_READ_THRESHOLD`     | buffered read              |
/// | `< MAX_MMAP_SIZE`             | single memory map          |
/// | `< MAX_SEGMENTED_MMAP_SIZE`   | segmented memory maps      |
/// | anything larger               | streaming read             |
///
/// # Errors
/// - `WinxError::FileAccessError` if the file cannot be opened, its metadata
///   cannot be read, or the selected reader fails.
/// - `WinxError::FileTooLarge` if the file is larger than `max_file_size`.
pub fn read_file_optimized(path: &Path, max_file_size: u64) -> Result<Vec<u8>> {
    let access_error = |message: String| WinxError::FileAccessError {
        path: path.to_path_buf(),
        message,
    };

    let file = File::open(path).map_err(|e| access_error(format!("Error opening file: {e}")))?;
    let file_size = file
        .metadata()
        .map_err(|e| access_error(format!("Failed to get file metadata: {e}")))?
        .len();

    if file_size > max_file_size {
        return Err(WinxError::FileTooLarge {
            path: path.to_path_buf(),
            size: file_size,
            max_size: max_file_size,
        });
    }

    match file_size {
        size if size < DIRECT_READ_THRESHOLD => {
            debug!("Using direct read for file: {}", path.display());
            read_direct(&file, size, path)
        }
        size if size < MAX_MMAP_SIZE => {
            debug!("Using memory-mapped read for file: {}", path.display());
            read_mmap(&file, path)
        }
        size if size < MAX_SEGMENTED_MMAP_SIZE => {
            debug!("Using segmented memory-mapped read for file: {}", path.display());
            read_segmented_mmap(&file, size, path)
        }
        size => {
            debug!("Using streaming read for extremely large file: {}", path.display());
            read_streaming(&file, size, path)
        }
    }
}
/// Reads the entire contents of `file` into memory with buffered reads.
///
/// The handle is cloned and the clone is rewound, so reading always starts at
/// offset 0. `file_size` serves three purposes:
/// - a capacity hint;
/// - choosing between a single `read_to_end` (under 1 MB) and a chunked loop;
/// - progress tracing.
///
/// The returned buffer holds whatever the file contains at read time.
///
/// # Errors
/// Returns `WinxError::FileAccessError` if the handle cannot be cloned or
/// rewound, or if a read fails with any error other than `Interrupted`.
fn read_direct(file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
    let access_error = |message: String| WinxError::FileAccessError {
        path: path.to_path_buf(),
        message,
    };

    let mut file_handle = file
        .try_clone()
        .map_err(|e| access_error(format!("Error cloning file handle: {e}")))?;
    file_handle
        .seek(SeekFrom::Start(0))
        .map_err(|e| access_error(format!("Error seeking to start of file: {e}")))?;
    let mut buffer = Vec::with_capacity(file_size as usize);

    if file_size < 1_000_000 {
        // Small file: a single `read_to_end`, which already retries on `Interrupted`.
        let mut reader = BufReader::with_capacity(min(file_size as usize, 64 * 1024), file_handle);
        reader
            .read_to_end(&mut buffer)
            .map_err(|e| access_error(format!("Error reading file: {e}")))?;
        return Ok(buffer);
    }

    // Larger file: read fixed-size chunks so progress can be traced along the way.
    let mut reader = BufReader::with_capacity(262_144, file_handle);
    let mut chunk = vec![0; DIRECT_READ_CHUNK_SIZE];
    let mut bytes_read: u64 = 0;
    loop {
        let n = match reader.read(&mut chunk) {
            Ok(0) => break,
            Ok(n) => n,
            // A signal can interrupt `read` before any data arrives; the `Read`
            // contract says the call should simply be retried.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(access_error(format!("Error reading file chunk: {e}"))),
        };
        buffer.extend_from_slice(&chunk[..n]);
        bytes_read += n as u64;
        // Trace near each 5 MB boundary; only for files over 5 MB, which also
        // keeps the percentage division away from zero.
        if file_size > 5_000_000 && bytes_read % 5_000_000 < DIRECT_READ_CHUNK_SIZE as u64 {
            trace!(
                "Read progress for {}: {}MB/{}MB ({}%)",
                path.display(),
                bytes_read / 1_000_000,
                file_size / 1_000_000,
                bytes_read * 100 / file_size
            );
        }
    }
    Ok(buffer)
}
fn read_mmap(file: &File, path: &Path) -> Result<Vec<u8>> {
if file.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
return Ok(Vec::new());
}
let mmap = unsafe { MmapOptions::new().map(file) }.map_err(|e| WinxError::FileAccessError {
path: path.to_path_buf(),
message: format!("Failed to memory-map file: {e}"),
})?;
if mmap.len() > 10_000_000 {
debug!("Using parallel processing for large mmap file: {}", path.display());
let chunk_count = mmap.len().div_ceil(MMAP_PARALLEL_CHUNK_SIZE);
let mut result = vec![0; mmap.len()];
let chunks: Vec<_> = (0..chunk_count)
.into_par_iter()
.map(|i| {
let start = i * MMAP_PARALLEL_CHUNK_SIZE;
let end = min((i + 1) * MMAP_PARALLEL_CHUNK_SIZE, mmap.len());
if start < mmap.len() {
let src = &mmap[start..end];
(start, end, src.to_vec())
} else {
(start, start, Vec::new())
}
})
.collect();
for (start, end, chunk) in chunks {
if start < end {
result[start..end].copy_from_slice(&chunk);
}
}
Ok(result)
} else {
Ok(mmap.to_vec())
}
}
/// Reads `file` into memory through a series of `SEGMENT_SIZE`-byte memory maps.
///
/// Each segment is mapped from the caller's already-open handle and appended
/// to the result. The segment's map is dropped at the end of its loop
/// iteration, before the next one is created. Progress is logged at `info`
/// level once per segment.
///
/// # Errors
/// Returns `WinxError::FileAccessError` if mapping any segment fails.
fn read_segmented_mmap(file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
    let segment_count = file_size.div_ceil(SEGMENT_SIZE);
    debug!(
        "Reading file {} in {} segments of {}MB each",
        path.display(),
        segment_count,
        SEGMENT_SIZE / 1_000_000
    );
    let mut result = Vec::with_capacity(file_size as usize);
    for i in 0..segment_count {
        let segment_start = i * SEGMENT_SIZE;
        // The final segment may be shorter than SEGMENT_SIZE.
        let segment_size = min(SEGMENT_SIZE, file_size - segment_start);
        info!(
            "Processing segment {}/{} of file {} ({:.1}%)",
            i + 1,
            segment_count,
            path.display(),
            (segment_start as f64 / file_size as f64) * 100.0
        );
        // Map through the handle the caller opened and measured, rather than
        // re-opening `path` per segment. Re-opening costs extra syscalls, and it
        // could pick up a different file if `path` was replaced, leaving
        // `file_size` stale. The handle's cursor is irrelevant to mmap, so no
        // seek is needed.
        // SAFETY: read-only mapping, copied into `result` before it is dropped.
        // Soundness still assumes the file is not truncated concurrently.
        let segment_mmap = unsafe {
            MmapOptions::new().offset(segment_start).len(segment_size as usize).map(file)
        }
        .map_err(|e| WinxError::FileAccessError {
            path: path.to_path_buf(),
            message: format!("Failed to memory-map file segment {i}: {e}"),
        })?;
        result.extend_from_slice(&segment_mmap);
    }
    Ok(result)
}
fn read_streaming(file: &File, file_size: u64, path: &Path) -> Result<Vec<u8>> {
warn!(
"Reading extremely large file ({}GB) with streaming approach: {}",
file_size / 1_000_000_000,
path.display()
);
let initial_capacity = min(file_size as usize, 1_000_000_000); let mut buffer = Vec::with_capacity(initial_capacity);
let mut reader = BufReader::with_capacity(STREAMING_CHUNK_SIZE, file);
let mut chunk = vec![0; STREAMING_CHUNK_SIZE];
let mut bytes_read = 0;
loop {
match reader.read(&mut chunk) {
Ok(0) => break, Ok(n) => {
buffer.extend_from_slice(&chunk[..n]);
bytes_read += n as u64;
if bytes_read % 100_000_000 < STREAMING_CHUNK_SIZE as u64 {
info!(
"Read progress for large file {}: {:.2}GB/{:.2}GB ({:.1}%)",
path.display(),
bytes_read as f64 / 1_000_000_000.0,
file_size as f64 / 1_000_000_000.0,
bytes_read as f64 * 100.0 / file_size as f64
);
}
}
Err(e) => {
return Err(WinxError::FileAccessError {
path: path.to_path_buf(),
message: format!("Error reading file chunk at position {bytes_read}: {e}"),
});
}
}
}
Ok(buffer)
}
/// Reads up to `length` bytes starting at `offset` from the file at `path`.
///
/// The requested length is clamped to the end of the file. Segments shorter
/// than `DIRECT_READ_THRESHOLD` bytes are read with buffered I/O; longer ones
/// are copied out of a memory map.
///
/// # Errors
/// - `WinxError::FileAccessError` if the file cannot be opened or inspected, if
///   `offset` is at or beyond the end of the file, or if reading fails.
/// - `WinxError::FileTooLarge` if the whole file exceeds `max_file_size`. The
///   limit applies to the file, not to the requested segment.
pub fn read_file_segment(
    path: &Path,
    offset: u64,
    length: u64,
    max_file_size: u64,
) -> Result<Vec<u8>> {
    let access_error = |message: String| WinxError::FileAccessError {
        path: path.to_path_buf(),
        message,
    };

    let file = File::open(path).map_err(|e| access_error(format!("Error opening file: {e}")))?;
    let file_size = file
        .metadata()
        .map_err(|e| access_error(format!("Failed to get file metadata: {e}")))?
        .len();

    if file_size > max_file_size {
        return Err(WinxError::FileTooLarge {
            path: path.to_path_buf(),
            size: file_size,
            max_size: max_file_size,
        });
    }

    // `offset < file_size` exactly when at least one byte remains after it.
    let Some(remaining) = file_size.checked_sub(offset).filter(|&left| left > 0) else {
        return Err(access_error(format!("Offset {offset} exceeds file size {file_size}")));
    };
    let length = length.min(remaining);

    if length >= DIRECT_READ_THRESHOLD {
        debug!("Using memory-mapped read for file segment: {}", path.display());
        read_segment_mmap(&file, offset, length, path)
    } else {
        debug!("Using direct read for file segment: {}", path.display());
        read_segment_direct(&file, offset, length, path)
    }
}
/// Reads at most `length` bytes starting at `offset` using buffered I/O.
///
/// A cloned handle is positioned at `offset`, and the read is capped with
/// `Read::take`. Fewer bytes are returned if the file ends sooner.
///
/// # Errors
/// Returns `WinxError::FileAccessError` if the handle cannot be cloned, the seek
/// fails, or reading fails.
fn read_segment_direct(file: &File, offset: u64, length: u64, path: &Path) -> Result<Vec<u8>> {
    let access_error = |message: String| WinxError::FileAccessError {
        path: path.to_path_buf(),
        message,
    };

    let mut handle = file
        .try_clone()
        .map_err(|e| access_error(format!("Failed to clone file handle: {e}")))?;
    handle
        .seek(SeekFrom::Start(offset))
        .map_err(|e| access_error(format!("Failed to seek to offset {offset}: {e}")))?;

    let buffered = BufReader::with_capacity(min(length as usize, 64 * 1024), handle);
    let mut segment = Vec::with_capacity(length as usize);
    buffered
        .take(length)
        .read_to_end(&mut segment)
        .map_err(|e| access_error(format!("Error reading file segment: {e}")))?;
    Ok(segment)
}
/// Copies `length` bytes starting at `offset` out of a temporary memory map of `file`.
///
/// This function does not check the range against the file size. Callers are
/// expected to clamp `offset + length` to the file first, as `read_file_segment`
/// does.
///
/// # Errors
/// Returns `WinxError::FileAccessError` if the segment cannot be memory-mapped.
fn read_segment_mmap(file: &File, offset: u64, length: u64, path: &Path) -> Result<Vec<u8>> {
    let map_len = length as usize;
    // SAFETY: read-only mapping that is copied into an owned `Vec` before being
    // dropped. Soundness assumes the range lies within the file and the file is
    // not truncated concurrently.
    let mapped = unsafe { MmapOptions::new().offset(offset).len(map_len).map(file) };
    let segment = mapped.map_err(|e| WinxError::FileAccessError {
        path: path.to_path_buf(),
        message: format!("Failed to memory-map file segment: {e}"),
    })?;
    Ok(segment.to_vec())
}
/// Reads the whole file at `path` (via `read_file_optimized`) and decodes it as UTF-8.
///
/// # Errors
/// Propagates any error from `read_file_optimized`. Returns
/// `WinxError::FileAccessError` if the contents are not valid UTF-8.
pub fn read_file_to_string(path: &Path, max_file_size: u64) -> Result<String> {
    let raw = read_file_optimized(path, max_file_size)?;
    match String::from_utf8(raw) {
        Ok(text) => Ok(text),
        Err(e) => Err(WinxError::FileAccessError {
            path: path.to_path_buf(),
            message: format!("Failed to decode file as UTF-8: {e}"),
        }),
    }
}
/// Reads the file at `path` as UTF-8 and calls `line_processor` once per line.
///
/// Lines are split as by `str::lines`, without the trailing `\n` / `\r\n`.
/// Content longer than 1,000,000 bytes is processed with rayon's `par_lines`,
/// so `line_processor` may run concurrently and in no particular order.
/// Smaller content is processed sequentially in file order.
///
/// # Errors
/// Propagates any error from `read_file_to_string`.
pub fn process_text_file_parallel<F>(
    path: &Path,
    max_file_size: u64,
    line_processor: F,
) -> Result<()>
where
    F: Fn(&str) + Sync,
{
    let text = read_file_to_string(path, max_file_size)?;
    let go_parallel = text.len() > 1_000_000;
    if go_parallel {
        text.par_lines().for_each(&line_processor);
    } else {
        for line in text.lines() {
            line_processor(line);
        }
    }
    Ok(())
}
/// Reads a byte range of the file at `path` (via `read_file_segment`) and decodes it as UTF-8.
///
/// `String::from_utf8` validates the whole range. A segment boundary that
/// falls inside a multi-byte character therefore makes the decode fail.
///
/// # Errors
/// Propagates any error from `read_file_segment`. Returns
/// `WinxError::FileAccessError` if the bytes are not valid UTF-8.
pub fn read_file_segment_to_string(
    path: &Path,
    offset: u64,
    length: u64,
    max_file_size: u64,
) -> Result<String> {
    let segment = read_file_segment(path, offset, length, max_file_size)?;
    match String::from_utf8(segment) {
        Ok(text) => Ok(text),
        Err(e) => Err(WinxError::FileAccessError {
            path: path.to_path_buf(),
            message: format!("Failed to decode file segment as UTF-8: {e}"),
        }),
    }
}
/// A cheaply clonable, read-only memory map of a file or of a segment of one.
///
/// Clones share the same mapping through an [`Arc`]. Cloning therefore never
/// remaps or copies the data, and the mapping is released only when the last
/// clone is dropped.
#[derive(Clone, Debug)]
pub struct ShareableMap {
    /// The shared mapping, exposed read-only through `as_slice`.
    data: Arc<Mmap>,
    /// Path of the file the mapping was created from.
    path: PathBuf,
}
impl ShareableMap {
    /// Memory-maps the whole file at `path` read-only.
    ///
    /// # Errors
    /// Returns `WinxError::FileAccessError` if the file cannot be opened, its
    /// metadata cannot be read, it is empty, or the mapping fails.
    pub fn new(path: &Path) -> Result<Self> {
        let (file, file_len) = Self::open_with_len(path)?;
        if file_len == 0 {
            return Err(WinxError::FileAccessError {
                path: path.to_path_buf(),
                message: "Cannot memory map empty file".to_string(),
            });
        }
        // SAFETY: the mapping is read-only and only ever exposed as `&[u8]`. As
        // with any file-backed mapping, soundness still assumes no other process
        // truncates or rewrites the file while the map is alive.
        let mmap =
            unsafe { MmapOptions::new().map(&file) }.map_err(|e| WinxError::FileAccessError {
                path: path.to_path_buf(),
                message: format!("Failed to memory-map file: {e}"),
            })?;
        Ok(Self { data: Arc::new(mmap), path: path.to_path_buf() })
    }

    /// Memory-maps `length` bytes of the file at `path`, starting at `offset`.
    ///
    /// # Errors
    /// Returns `WinxError::FileAccessError` if:
    /// - `length` is zero;
    /// - the file cannot be opened or its metadata cannot be read;
    /// - `offset..offset + length` is not entirely inside the file;
    /// - `length` does not fit in `usize`;
    /// - the mapping fails.
    pub fn new_segment(path: &Path, offset: u64, length: u64) -> Result<Self> {
        let access_error = |message: String| WinxError::FileAccessError {
            path: path.to_path_buf(),
            message,
        };
        if length == 0 {
            return Err(access_error("Cannot memory map segment of length 0".to_string()));
        }
        let (file, file_len) = Self::open_with_len(path)?;
        // An explicit-length mmap is not checked against the file size. On POSIX
        // systems, touching pages wholly past EOF raises SIGBUS, which any safe
        // caller of `as_slice` could trigger, so reject such ranges (and
        // `offset + length` overflow) up front.
        let in_bounds = offset.checked_add(length).is_some_and(|end| end <= file_len);
        if !in_bounds {
            return Err(access_error(format!(
                "Segment of {length} bytes at offset {offset} exceeds file size {file_len}"
            )));
        }
        let map_len = usize::try_from(length)
            .map_err(|_| access_error(format!("Segment length {length} does not fit in usize")))?;
        // SAFETY: read-only mapping of a range verified above to lie within the
        // file. Soundness still assumes the file is not truncated concurrently.
        let mmap = unsafe { MmapOptions::new().offset(offset).len(map_len).map(&file) }
            .map_err(|e| access_error(format!("Failed to memory-map file segment: {e}")))?;
        Ok(Self { data: Arc::new(mmap), path: path.to_path_buf() })
    }

    /// Opens `path` and returns the handle together with the file's length in bytes.
    fn open_with_len(path: &Path) -> Result<(File, u64)> {
        let file = File::open(path).map_err(|e| WinxError::FileAccessError {
            path: path.to_path_buf(),
            message: format!("Error opening file: {e}"),
        })?;
        let len = file
            .metadata()
            .map_err(|e| WinxError::FileAccessError {
                path: path.to_path_buf(),
                message: format!("Failed to get metadata: {e}"),
            })?
            .len();
        Ok((file, len))
    }

    /// Returns the mapped bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data
    }

    /// Returns the path the mapping was created from.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Returns the number of mapped bytes.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if the mapping contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temp file of `size` bytes where byte `i` is `i % 256`, and
    /// returns it together with its expected contents.
    fn create_test_file(size: usize) -> Result<(NamedTempFile, Vec<u8>)> {
        let mut file = NamedTempFile::new()?;
        let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
        file.write_all(&data)?;
        file.flush()?;
        Ok((file, data))
    }

    #[test]
    fn test_direct_read_small_file() -> Result<()> {
        let size = 10 * 1024;
        let (file, expected_data) = create_test_file(size)?;
        let result = read_direct(file.as_file(), size as u64, file.path())?;
        assert_eq!(result, expected_data);
        Ok(())
    }

    #[test]
    fn test_mmap_read() -> Result<()> {
        let size = 1024 * 1024;
        let (file, expected_data) = create_test_file(size)?;
        let result = read_mmap(file.as_file(), file.path())?;
        assert_eq!(result, expected_data);
        Ok(())
    }

    #[test]
    fn test_segmented_mmap_read() -> Result<()> {
        let size = 512 * 1024;
        let (file, expected_data) = create_test_file(size)?;
        let handle = File::open(file.path())?;
        let result = read_segmented_mmap(&handle, size as u64, file.path())?;
        assert_eq!(result, expected_data);
        Ok(())
    }

    #[test]
    fn test_streaming_read() -> Result<()> {
        let size = 256 * 1024;
        let (file, expected_data) = create_test_file(size)?;
        // Fresh handle so the cursor starts at 0.
        let handle = File::open(file.path())?;
        let result = read_streaming(&handle, size as u64, file.path())?;
        assert_eq!(result, expected_data);
        Ok(())
    }

    #[test]
    fn test_empty_file_reads_as_empty() -> Result<()> {
        let file = NamedTempFile::new()?;
        assert!(read_file_optimized(file.path(), 1024)?.is_empty());
        Ok(())
    }

    #[test]
    fn test_file_too_large_is_rejected() -> Result<()> {
        let (file, _) = create_test_file(100)?;
        let result = read_file_optimized(file.path(), 10);
        assert!(matches!(result, Err(WinxError::FileTooLarge { size: 100, max_size: 10, .. })));
        Ok(())
    }

    #[test]
    fn test_file_segment_read() -> Result<()> {
        let size = 1024 * 1024;
        let (file, data) = create_test_file(size)?;
        let offset = 100 * 1024;
        let length = 200 * 1024;
        let expected_segment = &data[offset as usize..(offset + length) as usize];
        let result = read_segment_direct(file.as_file(), offset, length, file.path())?;
        assert_eq!(result, expected_segment);
        let result = read_segment_mmap(file.as_file(), offset, length, file.path())?;
        assert_eq!(result, expected_segment);
        Ok(())
    }

    #[test]
    fn test_segment_length_is_clamped_to_file_end() -> Result<()> {
        let (file, data) = create_test_file(1024)?;
        let result = read_file_segment(file.path(), 1000, 500, u64::MAX)?;
        assert_eq!(result, &data[1000..]);
        Ok(())
    }

    #[test]
    fn test_segment_offset_past_end_is_rejected() -> Result<()> {
        let (file, _) = create_test_file(1024)?;
        assert!(read_file_segment(file.path(), 1024, 16, u64::MAX).is_err());
        Ok(())
    }

    #[test]
    fn test_shareable_map() -> Result<()> {
        let size = 100 * 1024;
        let (file, data) = create_test_file(size)?;
        let map = ShareableMap::new(file.path())?;
        assert_eq!(map.as_slice(), &data);
        let offset = 10 * 1024;
        let length = 20 * 1024;
        let segment_map = ShareableMap::new_segment(file.path(), offset, length)?;
        assert_eq!(segment_map.as_slice(), &data[offset as usize..(offset + length) as usize]);
        Ok(())
    }

    #[test]
    fn test_parallel_processing() -> Result<()> {
        let mut file = NamedTempFile::new()?;
        let mut lines = Vec::new();
        for i in 0..1000 {
            let line = format!("Line {i}\n");
            file.write_all(line.as_bytes())?;
            lines.push(format!("Line {i}"));
        }
        file.flush()?;
        let processed_lines = std::sync::Mutex::new(Vec::new());
        process_text_file_parallel(file.path(), 1_000_000, |line| {
            if let Ok(mut lines) = processed_lines.lock() {
                lines.push(line.to_string());
            }
        })?;
        let result =
            processed_lines.lock().map_err(|error| WinxError::ResourceAllocationError {
                message: format!("Failed to lock processed lines: {error}"),
            })?;
        assert_eq!(result.len(), lines.len());
        for line in &lines {
            assert!(result.contains(line));
        }
        Ok(())
    }

    #[test]
    fn test_parallel_processing_large_file() -> Result<()> {
        // ~2.3 MB of text: large enough to take the `par_lines` branch (> 1 MB).
        let mut expected: Vec<String> = (0..200_000).map(|i| format!("Line {i}")).collect();
        let mut contents = expected.join("\n");
        contents.push('\n');
        assert!(contents.len() > 1_000_000);

        let mut file = NamedTempFile::new()?;
        file.write_all(contents.as_bytes())?;
        file.flush()?;

        let processed = std::sync::Mutex::new(Vec::with_capacity(expected.len()));
        process_text_file_parallel(file.path(), 10_000_000, |line| {
            if let Ok(mut lines) = processed.lock() {
                lines.push(line.to_string());
            }
        })?;
        let mut result =
            processed.into_inner().map_err(|error| WinxError::ResourceAllocationError {
                message: format!("Failed to lock processed lines: {error}"),
            })?;

        // Parallel processing has no ordering guarantee; compare as multisets.
        result.sort_unstable();
        expected.sort_unstable();
        assert_eq!(result, expected);
        Ok(())
    }
}