lawkit_core/common/
streaming_io.rs

1use crate::error::Result;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5
6/// diffxの技術を活用した最適化IO処理
7pub struct OptimizedFileReader {
8    reader: Box<dyn BufRead>,
9    file_size: Option<u64>,
10    buffer_size: usize,
11}
12
13impl OptimizedFileReader {
14    /// ファイルからの最適化読み込み(diffxパターン)
15    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
16        let file = File::open(&path)?;
17        let file_size = file.metadata()?.len();
18        let buffer_size = Self::optimal_buffer_size(file_size);
19
20        Ok(Self {
21            reader: Box::new(BufReader::with_capacity(buffer_size, file)),
22            file_size: Some(file_size),
23            buffer_size,
24        })
25    }
26
27    /// 標準入力からの読み込み
28    pub fn from_stdin() -> Self {
29        let stdin = std::io::stdin();
30        Self {
31            reader: Box::new(stdin.lock()),
32            file_size: None,
33            buffer_size: 64 * 1024, // デフォルト64KB
34        }
35    }
36
37    /// diffxの経験値に基づく最適バッファサイズ
38    fn optimal_buffer_size(file_size: u64) -> usize {
39        match file_size {
40            0..=1_000_000 => 8 * 1024,           // 8KB(小ファイル)
41            1_000_001..=10_000_000 => 32 * 1024, // 32KB(中ファイル)
42            _ => 128 * 1024,                     // 128KB(大ファイル)
43        }
44    }
45
46    /// 大容量ファイルストリーミング判定(diffx 100MB閾値)
47    pub fn should_use_streaming(&self) -> bool {
48        const MAX_MEMORY_SIZE: u64 = 100 * 1024 * 1024; // 100MB
49
50        if let Some(size) = self.file_size {
51            size > MAX_MEMORY_SIZE
52        } else {
53            true // stdin は常にストリーミング
54        }
55    }
56
57    /// ライン単位でのストリーミング読み込み
58    pub fn read_lines_streaming<F, T>(&mut self, mut processor: F) -> Result<Vec<T>>
59    where
60        F: FnMut(String) -> Result<Option<T>>,
61    {
62        let mut results = Vec::new();
63        let mut line = String::new();
64
65        loop {
66            line.clear();
67            match self.reader.read_line(&mut line)? {
68                0 => break, // EOF
69                _ => {
70                    let trimmed_line = line.trim_end().to_string();
71                    if let Some(result) = processor(trimmed_line)? {
72                        results.push(result);
73                    }
74                }
75            }
76        }
77
78        Ok(results)
79    }
80
81    /// バッチ処理(diffxのバッチサイズ最適化)
82    pub fn read_lines_batched<F, T>(
83        &mut self,
84        batch_size: usize,
85        mut processor: F,
86    ) -> Result<Vec<T>>
87    where
88        F: FnMut(Vec<String>) -> Result<Vec<T>>,
89    {
90        let mut results = Vec::new();
91        let mut batch = Vec::with_capacity(batch_size);
92        let mut line = String::new();
93
94        loop {
95            line.clear();
96            match self.reader.read_line(&mut line)? {
97                0 => break, // EOF
98                _ => {
99                    let trimmed_line = line.trim_end().to_string();
100                    batch.push(trimmed_line);
101
102                    if batch.len() >= batch_size {
103                        let mut batch_results = processor(batch)?;
104                        results.append(&mut batch_results);
105                        batch = Vec::with_capacity(batch_size);
106                    }
107                }
108            }
109        }
110
111        // 残りのバッチを処理
112        if !batch.is_empty() {
113            let mut batch_results = processor(batch)?;
114            results.append(&mut batch_results);
115        }
116
117        Ok(results)
118    }
119
120    /// ファイルサイズ情報
121    pub fn file_size(&self) -> Option<u64> {
122        self.file_size
123    }
124
125    /// バッファサイズ情報
126    pub fn buffer_size(&self) -> usize {
127        self.buffer_size
128    }
129}
130
131/// メモリ使用量の推定(diffxパターン)
132pub fn estimate_memory_usage_for_processing(file_size: Option<u64>, data_points: usize) -> usize {
133    const BASELINE_OVERHEAD: usize = 1024 * 1024; // 1MB基本オーバーヘッド
134
135    let file_memory = if let Some(size) = file_size {
136        // diffx知見:入力の1.5x-2x使用
137        let multiplier = if size > 10_000_000 { 1.5 } else { 2.0 };
138        (size as f64 * multiplier) as usize
139    } else {
140        data_points * 32 // 推定32バイト/データポイント
141    };
142
143    file_memory + BASELINE_OVERHEAD
144}
145
146/// 適応的処理戦略選択(diffxの自動最適化パターン)
147#[derive(Debug, Clone)]
148pub enum ProcessingStrategy {
149    InMemory,  // 小データ:メモリ内処理
150    Streaming, // 大データ:ストリーミング処理
151    BatchedStreaming {
152        // 超大データ:バッチ化ストリーミング
153        batch_size: usize,
154    },
155}
156
157impl ProcessingStrategy {
158    pub fn select_optimal(file_size: Option<u64>, estimated_data_points: usize) -> Self {
159        const SMALL_THRESHOLD: u64 = 1_000_000; // 1MB
160        const LARGE_THRESHOLD: u64 = 100_000_000; // 100MB(diffx閾値)
161
162        if let Some(size) = file_size {
163            if size < SMALL_THRESHOLD {
164                ProcessingStrategy::InMemory
165            } else if size < LARGE_THRESHOLD {
166                ProcessingStrategy::Streaming
167            } else {
168                // diffxの大ファイル戦略
169                let batch_size = if size > 1_000_000_000 {
170                    10000 // 1GB超:大バッチ
171                } else {
172                    5000 // 100MB-1GB:中バッチ
173                };
174                ProcessingStrategy::BatchedStreaming { batch_size }
175            }
176        } else {
177            // stdin:データポイント数で判定
178            if estimated_data_points < 10000 {
179                ProcessingStrategy::InMemory
180            } else {
181                ProcessingStrategy::Streaming
182            }
183        }
184    }
185}