use crossbeam_channel::{bounded, Receiver, Sender};
use rayon::prelude::*;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::{Path, PathBuf};
use std::thread;
#[derive(Debug, Clone)]
pub struct IoConfig {
pub buffer_size: usize,
pub prefetch_enabled: bool,
pub prefetch_queue_size: usize,
pub mmap_threshold: usize,
pub parallel_io: bool,
pub max_parallel_files: usize,
}
impl Default for IoConfig {
fn default() -> Self {
Self {
buffer_size: 64 * 1024,
prefetch_enabled: true,
prefetch_queue_size: 10,
mmap_threshold: 16 * 1024 * 1024,
parallel_io: true,
max_parallel_files: 4,
}
}
}
pub struct OptimizedFileReader {
config: IoConfig,
prefetch_sender: Option<Sender<PathBuf>>,
prefetch_receiver: Option<Receiver<PathBuf>>,
}
impl OptimizedFileReader {
pub fn new(config: IoConfig) -> Self {
let (prefetch_sender, prefetch_receiver) = if config.prefetch_enabled {
let (tx, rx) = bounded(config.prefetch_queue_size);
(Some(tx), Some(rx))
} else {
(None, None)
};
Self {
config,
prefetch_sender,
prefetch_receiver,
}
}
pub fn start_prefetching(&self) {
if let (Some(_sender), Some(_receiver)) = (&self.prefetch_sender, &self.prefetch_receiver) {
let _config = self.config.clone();
let receiver = _receiver.clone();
thread::spawn(move || {
let mut prefetched_files = VecDeque::new();
while let Ok(path) = receiver.recv() {
if let Ok(file) = File::open(&path) {
prefetched_files.push_back((path, file));
}
}
});
}
}
pub fn read_file(&self, path: &Path) -> std::io::Result<Vec<u8>> {
let metadata = std::fs::metadata(path)?;
let file_size = metadata.len() as usize;
if file_size > self.config.mmap_threshold {
self.read_file_mmap(path)
} else {
self.read_file_buffered(path)
}
}
fn read_file_mmap(&self, path: &Path) -> std::io::Result<Vec<u8>> {
use memmap2::Mmap;
let file = File::open(path)?;
let mmap = unsafe { Mmap::map(&file)? };
Ok(mmap.to_vec())
}
fn read_file_buffered(&self, path: &Path) -> std::io::Result<Vec<u8>> {
let file = File::open(path)?;
let mut reader = BufReader::with_capacity(self.config.buffer_size, file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer)?;
Ok(buffer)
}
pub fn read_lines(&self, path: &Path) -> std::io::Result<Vec<String>> {
let file = File::open(path)?;
let reader = BufReader::with_capacity(self.config.buffer_size, file);
let mut lines = Vec::new();
for line in reader.lines() {
lines.push(line?);
}
Ok(lines)
}
pub fn should_process_file(&self, path: &Path) -> bool {
if let Ok(metadata) = std::fs::metadata(path) {
let file_size = metadata.len() as usize;
if file_size > 100 * 1024 * 1024 {
return false;
}
}
if let Some(extension) = path.extension() {
if let Some(ext_str) = extension.to_str() {
match ext_str.to_lowercase().as_str() {
"exe" | "dll" | "so" | "bin" => return false,
_ => {}
}
}
}
true
}
}
pub struct ParallelFileProcessor {
config: IoConfig,
thread_pool: rayon::ThreadPool,
}
impl ParallelFileProcessor {
pub fn new(config: IoConfig) -> Self {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(config.max_parallel_files)
.build()
.unwrap();
Self {
config,
thread_pool,
}
}
pub fn process_files<F, R>(&self, paths: Vec<PathBuf>, processor: F) -> Vec<R>
where
F: Fn(PathBuf) -> R + Send + Sync,
R: Send,
{
self.thread_pool
.install(|| paths.into_par_iter().map(processor).collect())
}
pub fn process_files_with_error<F, R, E>(
&self,
paths: Vec<PathBuf>,
processor: F,
) -> Vec<Result<R, E>>
where
F: Fn(PathBuf) -> Result<R, E> + Send + Sync,
R: Send,
E: Send,
{
self.thread_pool
.install(|| paths.into_par_iter().map(processor).collect())
}
}
pub struct AdaptiveIoStrategy {
config: IoConfig,
file_reader: OptimizedFileReader,
parallel_processor: ParallelFileProcessor,
}
impl AdaptiveIoStrategy {
pub fn new(config: IoConfig) -> Self {
let file_reader = OptimizedFileReader::new(config.clone());
let parallel_processor = ParallelFileProcessor::new(config.clone());
Self {
config,
file_reader,
parallel_processor,
}
}
pub fn choose_strategy(&self, path: &Path) -> IoStrategy {
if let Ok(metadata) = std::fs::metadata(path) {
let file_size = metadata.len() as usize;
if file_size > self.config.mmap_threshold {
IoStrategy::MemoryMapped
} else if file_size > self.config.buffer_size * 4 {
IoStrategy::Buffered
} else {
IoStrategy::Direct
}
} else {
IoStrategy::Direct
}
}
pub fn process_file<F, R>(&self, path: PathBuf, processor: F) -> Result<R, std::io::Error>
where
F: FnOnce(Vec<u8>) -> R,
{
let strategy = self.choose_strategy(&path);
match strategy {
IoStrategy::MemoryMapped => {
let content = self.file_reader.read_file_mmap(&path)?;
Ok(processor(content))
}
IoStrategy::Buffered => {
let content = self.file_reader.read_file_buffered(&path)?;
Ok(processor(content))
}
IoStrategy::Direct => {
let content = std::fs::read(&path)?;
Ok(processor(content))
}
}
}
pub fn process_files_parallel<F, R>(
&self,
paths: Vec<PathBuf>,
processor: F,
) -> Vec<Result<R, std::io::Error>>
where
F: Fn(Vec<u8>) -> R + Send + Sync,
R: Send,
{
self.parallel_processor
.process_files_with_error(paths, |path| self.process_file(path, &processor))
}
}
#[derive(Debug, Clone, Copy)]
pub enum IoStrategy {
Direct,
Buffered,
MemoryMapped,
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
#[test]
fn test_io_config_default() {
let config = IoConfig::default();
assert_eq!(config.buffer_size, 64 * 1024);
assert!(config.prefetch_enabled);
assert_eq!(config.prefetch_queue_size, 10);
}
#[test]
fn test_optimized_file_reader() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("test.txt");
fs::write(&test_file, "test content").unwrap();
let config = IoConfig::default();
let reader = OptimizedFileReader::new(config);
let content = reader.read_file(&test_file).unwrap();
assert_eq!(content, b"test content");
}
#[test]
fn test_adaptive_io_strategy() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("test.txt");
fs::write(&test_file, "test content").unwrap();
let config = IoConfig::default();
let strategy = AdaptiveIoStrategy::new(config);
let result = strategy
.process_file(test_file, |content| String::from_utf8(content).unwrap())
.unwrap();
assert_eq!(result, "test content");
}
#[test]
fn test_should_process_file() {
let config = IoConfig::default();
let reader = OptimizedFileReader::new(config);
let temp_dir = TempDir::new().unwrap();
let text_file = temp_dir.path().join("test.txt");
let exe_file = temp_dir.path().join("test.exe");
fs::write(&text_file, "content").unwrap();
fs::write(&exe_file, b"\x00\x01\x02").unwrap();
assert!(reader.should_process_file(&text_file));
assert!(!reader.should_process_file(&exe_file));
}
}