// lawkit_core/common/streaming_io.rs
use crate::error::Result;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

/// Buffered line reader whose read-buffer capacity is scaled to the
/// size of the input (file) or fixed (stdin).
pub struct OptimizedFileReader {
    /// Underlying buffered source: a `BufReader<File>` or a locked stdin handle.
    reader: Box<dyn BufRead>,
    /// Total input size in bytes when reading a file; `None` for stdin.
    file_size: Option<u64>,
    /// Capacity chosen for the read buffer, in bytes.
    buffer_size: usize,
}
12
13impl OptimizedFileReader {
14 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
16 let file = File::open(&path)?;
17 let file_size = file.metadata()?.len();
18 let buffer_size = Self::optimal_buffer_size(file_size);
19
20 Ok(Self {
21 reader: Box::new(BufReader::with_capacity(buffer_size, file)),
22 file_size: Some(file_size),
23 buffer_size,
24 })
25 }
26
27 pub fn from_stdin() -> Self {
29 let stdin = std::io::stdin();
30 Self {
31 reader: Box::new(stdin.lock()),
32 file_size: None,
33 buffer_size: 64 * 1024, }
35 }
36
37 fn optimal_buffer_size(file_size: u64) -> usize {
39 match file_size {
40 0..=1_000_000 => 8 * 1024, 1_000_001..=10_000_000 => 32 * 1024, _ => 128 * 1024, }
44 }
45
46 pub fn should_use_streaming(&self) -> bool {
48 const MAX_MEMORY_SIZE: u64 = 100 * 1024 * 1024; if let Some(size) = self.file_size {
51 size > MAX_MEMORY_SIZE
52 } else {
53 true }
55 }
56
57 pub fn read_lines_streaming<F, T>(&mut self, mut processor: F) -> Result<Vec<T>>
59 where
60 F: FnMut(String) -> Result<Option<T>>,
61 {
62 let mut results = Vec::new();
63 let mut line = String::new();
64
65 loop {
66 line.clear();
67 match self.reader.read_line(&mut line)? {
68 0 => break, _ => {
70 let trimmed_line = line.trim_end().to_string();
71 if let Some(result) = processor(trimmed_line)? {
72 results.push(result);
73 }
74 }
75 }
76 }
77
78 Ok(results)
79 }
80
81 pub fn read_lines_batched<F, T>(
83 &mut self,
84 batch_size: usize,
85 mut processor: F,
86 ) -> Result<Vec<T>>
87 where
88 F: FnMut(Vec<String>) -> Result<Vec<T>>,
89 {
90 let mut results = Vec::new();
91 let mut batch = Vec::with_capacity(batch_size);
92 let mut line = String::new();
93
94 loop {
95 line.clear();
96 match self.reader.read_line(&mut line)? {
97 0 => break, _ => {
99 let trimmed_line = line.trim_end().to_string();
100 batch.push(trimmed_line);
101
102 if batch.len() >= batch_size {
103 let mut batch_results = processor(batch)?;
104 results.append(&mut batch_results);
105 batch = Vec::with_capacity(batch_size);
106 }
107 }
108 }
109 }
110
111 if !batch.is_empty() {
113 let mut batch_results = processor(batch)?;
114 results.append(&mut batch_results);
115 }
116
117 Ok(results)
118 }
119
120 pub fn file_size(&self) -> Option<u64> {
122 self.file_size
123 }
124
125 pub fn buffer_size(&self) -> usize {
127 self.buffer_size
128 }
129}
130
/// Rough upper bound, in bytes, on the memory needed to process an input.
///
/// A known file size is multiplied by 2.0 (or only 1.5 above 10 MB,
/// where streaming keeps the working set smaller); with no known size,
/// 32 bytes per expected data point are assumed. A fixed 1 MiB baseline
/// is added on top for general overhead.
pub fn estimate_memory_usage_for_processing(file_size: Option<u64>, data_points: usize) -> usize {
    const BASELINE_OVERHEAD: usize = 1024 * 1024;

    let data_estimate = match file_size {
        Some(size) if size > 10_000_000 => (size as f64 * 1.5) as usize,
        Some(size) => (size as f64 * 2.0) as usize,
        None => data_points * 32,
    };

    data_estimate + BASELINE_OVERHEAD
}
145
/// How an input should be consumed, chosen from its size.
#[derive(Debug, Clone)]
pub enum ProcessingStrategy {
    /// Load the whole input into memory before processing.
    InMemory,
    /// Process line by line without materializing the whole input.
    Streaming,
    /// Stream the input in fixed-size groups of lines.
    BatchedStreaming {
        /// Number of lines handed to the processor per batch.
        batch_size: usize,
    },
}
156
157impl ProcessingStrategy {
158 pub fn select_optimal(file_size: Option<u64>, estimated_data_points: usize) -> Self {
159 const SMALL_THRESHOLD: u64 = 1_000_000; const LARGE_THRESHOLD: u64 = 100_000_000; if let Some(size) = file_size {
163 if size < SMALL_THRESHOLD {
164 ProcessingStrategy::InMemory
165 } else if size < LARGE_THRESHOLD {
166 ProcessingStrategy::Streaming
167 } else {
168 let batch_size = if size > 1_000_000_000 {
170 10000 } else {
172 5000 };
174 ProcessingStrategy::BatchedStreaming { batch_size }
175 }
176 } else {
177 if estimated_data_points < 10000 {
179 ProcessingStrategy::InMemory
180 } else {
181 ProcessingStrategy::Streaming
182 }
183 }
184 }
185}