1use chess::Board;
2use indicatif::{ProgressBar, ProgressStyle};
3use rayon::prelude::*;
4use std::collections::HashSet;
5use std::fs::File;
6use std::path::Path;
7use std::sync::{Arc, Mutex};
8
/// High-throughput loader for bulk chess position data (binary, LZ4-compressed,
/// or line-oriented text). Counters accumulate over load calls and are exposed
/// both directly and via `get_stats()`.
pub struct UltraFastLoader {
    // Positions successfully added to the engine.
    pub loaded_count: usize,
    // Positions skipped because the engine already contained them.
    pub duplicate_count: usize,
    // Positions whose FEN failed to parse (or otherwise errored).
    pub error_count: usize,
    // Chunk size used when splitting large inputs for batch loading.
    batch_size: usize,
    // Reserved for a future bloom-filter dedup path; set but not read yet.
    #[allow(dead_code)]
    use_bloom_filter: bool,
}
19
20impl UltraFastLoader {
21 pub fn new_for_massive_datasets() -> Self {
22 Self {
23 loaded_count: 0,
24 duplicate_count: 0,
25 error_count: 0,
26 batch_size: 50000, use_bloom_filter: true,
28 }
29 }
30
31 pub fn ultra_load_binary<P: AsRef<Path>>(
33 &mut self,
34 path: P,
35 engine: &mut crate::ChessVectorEngine,
36 ) -> Result<(), Box<dyn std::error::Error>> {
37 let path_ref = path.as_ref();
38 println!("Operation complete");
39
40 let file_size = std::fs::metadata(path_ref)?.len();
41 println!("π File size: {:.1} MB", file_size as f64 / 1_000_000.0);
42
43 if file_size > 500_000_000 {
44 println!("β‘ Large file detected - using memory-mapped loading");
46 return self.memory_mapped_load(path_ref, engine);
47 }
48
49 let data = std::fs::read(path_ref)?;
51
52 let decompressed_data = if let Ok(decompressed) = lz4_flex::decompress_size_prepended(&data)
54 {
55 println!(
56 "ποΈ LZ4 decompressed: {} β {} bytes",
57 data.len(),
58 decompressed.len()
59 );
60 decompressed
61 } else {
62 data
63 };
64
65 let positions: Vec<(String, f32)> = match bincode::deserialize(&decompressed_data) {
67 Ok(pos) => pos,
68 Err(e) => {
69 println!("Operation complete");
70 return Err(e.into());
71 }
72 };
73
74 let total_positions = positions.len();
75 println!("π¦ Loaded {} positions from binary", total_positions);
76
77 if total_positions == 0 {
78 return Ok(());
79 }
80
81 if total_positions > 100_000 {
83 self.parallel_batch_load(positions, engine)
84 } else {
85 self.sequential_load(positions, engine)
86 }
87 }
88
89 fn memory_mapped_load<P: AsRef<Path>>(
91 &mut self,
92 path: P,
93 engine: &mut crate::ChessVectorEngine,
94 ) -> Result<(), Box<dyn std::error::Error>> {
95 use memmap2::Mmap;
96
97 let file = File::open(path)?;
98 let mmap = unsafe { Mmap::map(&file)? };
99
100 println!("πΊοΈ Memory-mapped {} bytes", mmap.len());
101
102 const CHUNK_SIZE: usize = 50_000_000; let total_chunks = mmap.len().div_ceil(CHUNK_SIZE);
105
106 println!("π¦ Processing {} chunks of ~50MB each", total_chunks);
107
108 self.stream_parse_memory_mapped(&mmap, engine)
111 }
112
    /// Probe the mapped bytes against each known on-disk format in order:
    /// LZ4-compressed bincode, raw bincode of (FEN, eval) pairs, then
    /// line-oriented UTF-8 text. Errors only if no format matches.
    fn stream_parse_memory_mapped(
        &mut self,
        mmap: &memmap2::Mmap,
        engine: &mut crate::ChessVectorEngine,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Format 1: LZ4 with a prepended size header.
        if let Ok(decompressed) = lz4_flex::decompress_size_prepended(mmap) {
            println!("ποΈ Full file LZ4 decompressed");
            return self.parse_decompressed_data(&decompressed, engine);
        }

        // Format 2: uncompressed bincode, deserialized straight from the mapping.
        if let Ok(positions) = bincode::deserialize::<Vec<(String, f32)>>(mmap) {
            println!("π¦ Direct memory-mapped deserialization");
            return self.parallel_batch_load(positions, engine);
        }

        // Format 3: UTF-8 text (e.g. JSON lines with "fen"/"evaluation").
        if let Ok(text) = std::str::from_utf8(mmap) {
            println!("π Treating as text format");
            return self.parse_text_data(text, engine);
        }

        Err("Unable to parse memory-mapped file in any known format".into())
    }
141
142 fn parse_decompressed_data(
144 &mut self,
145 data: &[u8],
146 engine: &mut crate::ChessVectorEngine,
147 ) -> Result<(), Box<dyn std::error::Error>> {
148 let positions: Vec<(String, f32)> = bincode::deserialize(data)?;
149 self.parallel_batch_load(positions, engine)
150 }
151
    /// Parse line-oriented text (one JSON object per line with "fen" and
    /// "evaluation" fields), drop positions the engine already holds, and add
    /// the survivors to `engine`.
    ///
    /// Parsing runs in parallel over 10k-line chunks via rayon; each worker
    /// accumulates locally and merges into a shared mutex-guarded vector once
    /// per chunk to keep lock contention low. Sets `loaded_count` to the
    /// number of parsed (non-duplicate) positions.
    fn parse_text_data(
        &mut self,
        text: &str,
        engine: &mut crate::ChessVectorEngine,
    ) -> Result<(), Box<dyn std::error::Error>> {
        println!("π Parsing text data...");

        let lines: Vec<&str> = text.lines().collect();
        let total_lines = lines.len();

        if total_lines == 0 {
            return Ok(());
        }

        println!("π Processing {} lines", total_lines);

        let pb = ProgressBar::new(total_lines as u64);
        pb.set_style(
            ProgressStyle::default_bar()
                .template("β‘ Parsing [{elapsed_precise}] [{bar:40.green/blue}] {pos}/{len} ({percent}%) {msg}")?
                .progress_chars("βββ")
        );

        let batch_size = 10000;
        // Snapshot the engine's boards once so parallel workers can do
        // duplicate checks without touching the engine itself.
        let existing_boards: HashSet<Board> = engine.position_boards.iter().cloned().collect();
        let existing_boards = Arc::new(existing_boards);

        // Shared accumulator merged into by every worker chunk.
        let results: Arc<Mutex<Vec<(Board, f32)>>> = Arc::new(Mutex::new(Vec::new()));

        lines
            .par_chunks(batch_size)
            .enumerate()
            .for_each(|(chunk_idx, chunk)| {
                let mut local_results = Vec::new();

                for (line_idx, line) in chunk.iter().enumerate() {
                    if line.trim().is_empty() {
                        continue;
                    }

                    // Lines that are not valid JSON, lack the expected fields,
                    // or carry an unparseable FEN are silently skipped.
                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
                        if let Some((board, eval)) = self.extract_from_json(&json) {
                            if !existing_boards.contains(&board) {
                                local_results.push((board, eval));
                            }
                        }
                    }

                    // Update the progress bar sparsely to limit overhead.
                    if line_idx % 1000 == 0 {
                        pb.set_position((chunk_idx * batch_size + line_idx) as u64);
                    }
                }

                // Merge this chunk's results under the lock in one batch.
                if !local_results.is_empty() {
                    if let Ok(mut results) = results.lock() {
                        results.extend(local_results);
                    }
                }
            });

        pb.finish_with_message("β
Text parsing complete");

        // All rayon workers have finished, so the Arc is uniquely owned here.
        let final_results = Arc::try_unwrap(results).unwrap().into_inner().unwrap();
        self.loaded_count = final_results.len();

        println!("π¦ Parsed {} valid positions", self.loaded_count);

        for (board, eval) in final_results {
            engine.add_position(&board, eval);
        }

        Ok(())
    }
232
233 fn extract_from_json(&self, json: &serde_json::Value) -> Option<(Board, f32)> {
235 if let (Some(fen), Some(eval)) = (
237 json.get("fen").and_then(|v| v.as_str()),
238 json.get("evaluation").and_then(|v| v.as_f64()),
239 ) {
240 if let Ok(board) = fen.parse::<Board>() {
241 return Some((board, eval as f32));
242 }
243 }
244
245 None
246 }
247
248 fn parallel_batch_load(
250 &mut self,
251 positions: Vec<(String, f32)>,
252 engine: &mut crate::ChessVectorEngine,
253 ) -> Result<(), Box<dyn std::error::Error>> {
254 let total_positions = positions.len();
255 println!("π Parallel batch loading {} positions", total_positions);
256
257 let pb = ProgressBar::new(total_positions as u64);
258 pb.set_style(
259 ProgressStyle::default_bar()
260 .template("β‘ Loading [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) {msg}")?
261 .progress_chars("βββ")
262 );
263
264 let existing_boards: HashSet<Board> = engine.position_boards.iter().cloned().collect();
266
267 let chunk_size = self.batch_size;
269 let chunks: Vec<_> = positions.chunks(chunk_size).collect();
270
271 let mut total_loaded = 0;
272 let mut total_duplicates = 0;
273
274 for (chunk_idx, chunk) in chunks.iter().enumerate() {
275 let mut batch_boards = Vec::new();
276 let mut batch_evaluations = Vec::new();
277
278 for (fen, evaluation) in chunk.iter() {
280 match fen.parse::<Board>() {
281 Ok(board) => {
282 if !existing_boards.contains(&board) {
283 batch_boards.push(board);
284 batch_evaluations.push(*evaluation);
285 } else {
286 total_duplicates += 1;
287 }
288 }
289 Err(_) => {
290 self.error_count += 1;
291 }
292 }
293 }
294
295 for (board, eval) in batch_boards.iter().zip(batch_evaluations.iter()) {
297 engine.add_position(board, *eval);
298 total_loaded += 1;
299 }
300
301 pb.set_position(((chunk_idx + 1) * chunk_size).min(total_positions) as u64);
303 pb.set_message(format!(
304 "{} loaded, {} dupes",
305 total_loaded, total_duplicates
306 ));
307 }
308
309 pb.finish_with_message(format!("β
Loaded {} positions", total_loaded));
310
311 self.loaded_count = total_loaded;
312 self.duplicate_count = total_duplicates;
313
314 println!("π Final stats:");
315 println!(" Loaded: {} positions", self.loaded_count);
316 println!("Operation complete");
317 println!("Operation complete");
318
319 Ok(())
320 }
321
322 fn sequential_load(
324 &mut self,
325 positions: Vec<(String, f32)>,
326 engine: &mut crate::ChessVectorEngine,
327 ) -> Result<(), Box<dyn std::error::Error>> {
328 println!("π¦ Sequential loading {} positions", positions.len());
329
330 let existing_boards: HashSet<Board> = engine.position_boards.iter().cloned().collect();
331
332 for (fen, evaluation) in positions {
333 match fen.parse::<Board>() {
334 Ok(board) => {
335 if !existing_boards.contains(&board) {
336 engine.add_position(&board, evaluation);
337 self.loaded_count += 1;
338 } else {
339 self.duplicate_count += 1;
340 }
341 }
342 Err(_) => {
343 self.error_count += 1;
344 }
345 }
346 }
347
348 Ok(())
349 }
350
351 pub fn get_stats(&self) -> LoadingStats {
353 LoadingStats {
354 loaded: self.loaded_count,
355 duplicates: self.duplicate_count,
356 errors: self.error_count,
357 total_processed: self.loaded_count + self.duplicate_count + self.error_count,
358 }
359 }
360}
361
/// Summary counters reported after a load run.
#[derive(Debug, Clone)]
pub struct LoadingStats {
    /// Positions successfully added to the engine.
    pub loaded: usize,
    /// Positions skipped as already present.
    pub duplicates: usize,
    /// Positions that failed to parse or deserialize.
    pub errors: usize,
    /// Total positions examined (loaded + duplicates + errors).
    pub total_processed: usize,
}

impl LoadingStats {
    /// Fraction of processed positions that loaded successfully.
    /// An empty run counts as fully successful (returns 1.0).
    pub fn success_rate(&self) -> f64 {
        match self.total_processed {
            0 => 1.0,
            total => self.loaded as f64 / total as f64,
        }
    }
}
379
#[cfg(test)]
mod tests {
    use super::*;

    // Verifies the defaults chosen by the massive-dataset constructor.
    #[test]
    fn test_ultra_fast_loader_creation() {
        let loader = UltraFastLoader::new_for_massive_datasets();
        assert_eq!(loader.loaded_count, 0);
        assert_eq!(loader.batch_size, 50000);
        assert!(loader.use_bloom_filter);
    }

    // Verifies counter aggregation and the derived success rate.
    #[test]
    fn test_loading_stats() {
        let mut loader = UltraFastLoader::new_for_massive_datasets();
        loader.loaded_count = 8000;
        loader.duplicate_count = 1500;
        loader.error_count = 500;

        let stats = loader.get_stats();
        assert_eq!(stats.loaded, 8000);
        assert_eq!(stats.total_processed, 10000);
        assert_eq!(stats.success_rate(), 0.8);
    }
}
404}