use crate::error::Result;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
/// Buffered line reader that sizes its internal buffer from the input
/// length and helps callers decide between in-memory and streaming
/// processing.
pub struct OptimizedFileReader {
    // Boxed so both opened files and locked stdin can back the reader.
    reader: Box<dyn BufRead>,
    // Known for files; `None` for stdin, where the size cannot be determined.
    file_size: Option<u64>,
    // Capacity (bytes) chosen for the underlying buffer.
    buffer_size: usize,
}

impl OptimizedFileReader {
    /// Opens `path` and wraps it in a `BufReader` whose capacity is scaled
    /// to the file's size (see [`Self::optimal_buffer_size`]).
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or its metadata read.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(&path)?;
        let file_size = file.metadata()?.len();
        let buffer_size = Self::optimal_buffer_size(file_size);
        Ok(Self {
            reader: Box::new(BufReader::with_capacity(buffer_size, file)),
            file_size: Some(file_size),
            buffer_size,
        })
    }

    /// Builds a reader over locked stdin with a fixed 64 KiB buffer.
    /// The input size is unknown, so `file_size` is `None` and
    /// [`Self::should_use_streaming`] always reports `true`.
    pub fn from_stdin() -> Self {
        let stdin = std::io::stdin();
        Self {
            reader: Box::new(stdin.lock()),
            file_size: None,
            buffer_size: 64 * 1024,
        }
    }

    /// Heuristic buffer capacity: ≤ 1 MB files get 8 KiB, ≤ 10 MB get
    /// 32 KiB, anything larger gets 128 KiB.
    fn optimal_buffer_size(file_size: u64) -> usize {
        match file_size {
            0..=1_000_000 => 8 * 1024,
            1_000_001..=10_000_000 => 32 * 1024,
            _ => 128 * 1024,
        }
    }

    /// Returns `true` when the input should be streamed rather than loaded
    /// whole: unknown sizes (stdin) always stream; files stream above 100 MiB.
    pub fn should_use_streaming(&self) -> bool {
        const MAX_MEMORY_SIZE: u64 = 100 * 1024 * 1024; // 100 MiB
        self.file_size.map_or(true, |size| size > MAX_MEMORY_SIZE)
    }

    /// Streams the input line by line, passing each trimmed line to
    /// `processor` and collecting the `Some` results in input order.
    ///
    /// # Errors
    /// Propagates I/O errors from reading and any error from `processor`.
    pub fn read_lines_streaming<F, T>(&mut self, mut processor: F) -> Result<Vec<T>>
    where
        F: FnMut(String) -> Result<Option<T>>,
    {
        let mut results = Vec::new();
        self.for_each_trimmed_line(|line| {
            if let Some(result) = processor(line)? {
                results.push(result);
            }
            Ok(())
        })?;
        Ok(results)
    }

    /// Streams the input in batches of up to `batch_size` trimmed lines,
    /// invoking `processor` on each full batch and once more on any
    /// trailing partial batch. Results are concatenated in input order.
    ///
    /// # Errors
    /// Propagates I/O errors from reading and any error from `processor`.
    pub fn read_lines_batched<F, T>(
        &mut self,
        batch_size: usize,
        mut processor: F,
    ) -> Result<Vec<T>>
    where
        F: FnMut(Vec<String>) -> Result<Vec<T>>,
    {
        let mut results = Vec::new();
        let mut batch = Vec::with_capacity(batch_size);
        self.for_each_trimmed_line(|line| {
            batch.push(line);
            if batch.len() >= batch_size {
                // Hand the full batch over and start a fresh one.
                let full = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
                results.append(&mut processor(full)?);
            }
            Ok(())
        })?;
        // Flush the trailing partial batch, if any.
        if !batch.is_empty() {
            results.append(&mut processor(batch)?);
        }
        Ok(results)
    }

    /// Shared read loop for the two `read_lines_*` methods: reuses a single
    /// line buffer, strips trailing whitespace (including the newline), and
    /// hands each line to `f` until EOF.
    fn for_each_trimmed_line<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(String) -> Result<()>,
    {
        let mut line = String::new();
        // `read_line` returns 0 bytes read only at EOF.
        while self.reader.read_line(&mut line)? != 0 {
            f(line.trim_end().to_string())?;
            line.clear();
        }
        Ok(())
    }

    /// Size of the underlying file in bytes, or `None` for stdin.
    pub fn file_size(&self) -> Option<u64> {
        self.file_size
    }

    /// Capacity chosen for the internal buffer, in bytes.
    pub fn buffer_size(&self) -> usize {
        self.buffer_size
    }
}
/// Rough estimate of peak memory (bytes) needed to process the input.
///
/// With a known `file_size`, the working set is assumed to be a multiple of
/// the file size — larger files (over 10 MB) use a smaller 1.5× factor,
/// smaller files 2×. Without a known size, the estimate falls back to
/// ~32 bytes per expected data point. A fixed 1 MiB baseline overhead is
/// always added.
pub fn estimate_memory_usage_for_processing(file_size: Option<u64>, data_points: usize) -> usize {
    const BASELINE_OVERHEAD: usize = 1024 * 1024; // 1 MiB
    let file_memory = match file_size {
        Some(size) => {
            // Big files are assumed to stream, so their blow-up factor is lower.
            let multiplier = if size > 10_000_000 { 1.5 } else { 2.0 };
            (size as f64 * multiplier) as usize
        }
        None => data_points * 32,
    };
    BASELINE_OVERHEAD + file_memory
}
/// How the input should be processed, chosen from its size.
#[derive(Debug, Clone)]
pub enum ProcessingStrategy {
    /// Load the whole input into memory (small inputs).
    InMemory,
    /// Process line by line without materializing the whole input.
    Streaming,
    /// Stream in fixed-size batches (very large inputs).
    BatchedStreaming {
        /// Number of lines handed to the processor per batch.
        batch_size: usize,
    },
}

impl ProcessingStrategy {
    /// Picks a strategy from the file size when it is known, otherwise from
    /// the estimated number of data points.
    ///
    /// Known sizes: below 1 MB → in-memory; below 100 MB → streaming;
    /// larger → batched streaming (10 000-line batches above 1 GB, else
    /// 5 000). Unknown sizes: fewer than 10 000 points → in-memory,
    /// otherwise streaming.
    pub fn select_optimal(file_size: Option<u64>, estimated_data_points: usize) -> Self {
        const SMALL_THRESHOLD: u64 = 1_000_000;
        const LARGE_THRESHOLD: u64 = 100_000_000;
        match file_size {
            Some(size) if size < SMALL_THRESHOLD => ProcessingStrategy::InMemory,
            Some(size) if size < LARGE_THRESHOLD => ProcessingStrategy::Streaming,
            Some(size) => {
                let batch_size = if size > 1_000_000_000 { 10000 } else { 5000 };
                ProcessingStrategy::BatchedStreaming { batch_size }
            }
            None if estimated_data_points < 10000 => ProcessingStrategy::InMemory,
            None => ProcessingStrategy::Streaming,
        }
    }
}