//! Large-scale testing utilities for generating and processing datasets that
//! do not fit comfortably in memory (chunked I/O, memory-mapped access, and
//! out-of-core reductions).

use crate::error::{CoreError, CoreResult, ErrorContext};
use crate::testing::{TestConfig, TestResult};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tempfile::{NamedTempFile, TempDir};

#[cfg(feature = "random")]
use rand::Rng;

#[cfg(feature = "memory_efficient")]
use crate::memory_efficient::MemoryMappedArray;

/// Configuration for large-scale (out-of-core) tests.
#[derive(Debug, Clone)]
pub struct LargeScaleTestConfig {
    /// Maximum dataset size to generate, in bytes.
    pub max_dataset_size: usize,
    /// Memory limit for processing, in bytes.
    pub memory_limit: usize,
    /// Directory for temporary files (a fresh temp dir is created when `None`).
    pub temp_dir: Option<PathBuf>,
    /// Whether generated files should be removed after the test.
    pub cleanup_files: bool,
    /// Chunk size used for streaming I/O, in bytes.
    pub chunk_size: usize,
    /// Number of worker threads to use.
    pub worker_count: usize,
    /// Whether to print progress to stdout.
    pub progress_reporting: bool,
}

impl Default for LargeScaleTestConfig {
    fn default() -> Self {
        Self {
            max_dataset_size: 1024 * 1024 * 1024, // 1 GiB
            memory_limit: 256 * 1024 * 1024,      // 256 MiB
            temp_dir: None,
            cleanup_files: true,
            chunk_size: 1024 * 1024, // 1 MiB
            worker_count: std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4),
            progress_reporting: false,
        }
    }
}

impl LargeScaleTestConfig {
    /// Create a configuration with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the maximum dataset size in bytes.
    pub fn with_max_dataset_size(mut self, size: usize) -> Self {
        self.max_dataset_size = size;
        self
    }

    /// Set the memory limit in bytes.
    pub fn with_memory_limit(mut self, limit: usize) -> Self {
        self.memory_limit = limit;
        self
    }

    /// Use an existing directory for temporary files.
    pub fn with_temp_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
        self.temp_dir = Some(dir.as_ref().to_path_buf());
        self
    }

    /// Control whether generated files are cleaned up after the test.
    pub fn with_cleanup(mut self, cleanup: bool) -> Self {
        self.cleanup_files = cleanup;
        self
    }

    /// Set the chunk size in bytes used for streaming I/O.
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Set the number of worker threads.
    pub fn with_worker_count(mut self, count: usize) -> Self {
        self.worker_count = count;
        self
    }

    /// Enable or disable progress reporting on stdout.
    pub fn with_progress_reporting(mut self, enabled: bool) -> Self {
        self.progress_reporting = enabled;
        self
    }
}

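// Usage sketch (illustrative values, not crate defaults): configurations are
// assembled with the chained `with_*` setters above.
//
//     let config = LargeScaleTestConfig::new()
//         .with_max_dataset_size(64 * 1024 * 1024) // 64 MiB
//         .with_chunk_size(4 * 1024 * 1024) // 4 MiB
//         .with_progress_reporting(true);
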
/// Result of a single large-scale test run.
#[derive(Debug, Clone)]
pub struct LargeScaleTestResult {
    /// Name of the test.
    pub test_name: String,
    /// Size of the processed dataset in bytes.
    pub dataset_size: usize,
    /// Peak memory usage observed, in bytes.
    pub peak_memory: usize,
    /// Throughput in bytes per second.
    pub throughput: f64,
    /// Wall-clock duration of the test.
    pub duration: Duration,
    /// Number of chunks processed.
    pub chunks_processed: usize,
    /// Whether the test succeeded.
    pub success: bool,
    /// Error message, if the test failed.
    pub error: Option<String>,
    /// Additional named metrics collected during the run.
    pub metrics: std::collections::HashMap<String, f64>,
}

impl LargeScaleTestResult {
    /// Create an empty result for the named test.
    pub fn new(test_name: String) -> Self {
        Self {
            test_name,
            dataset_size: 0,
            peak_memory: 0,
            throughput: 0.0,
            duration: Duration::from_secs(0),
            chunks_processed: 0,
            success: false,
            error: None,
            metrics: std::collections::HashMap::new(),
        }
    }

    pub fn with_success(mut self, success: bool) -> Self {
        self.success = success;
        self
    }

    pub fn with_dataset_size(mut self, size: usize) -> Self {
        self.dataset_size = size;
        self
    }

    pub fn with_peak_memory(mut self, memory: usize) -> Self {
        self.peak_memory = memory;
        self
    }

    pub fn with_throughput(mut self, throughput: f64) -> Self {
        self.throughput = throughput;
        self
    }

    pub fn with_duration(mut self, duration: Duration) -> Self {
        self.duration = duration;
        self
    }

    pub fn with_chunks_processed(mut self, chunks: usize) -> Self {
        self.chunks_processed = chunks;
        self
    }

    /// Record an error message and mark the result as failed.
    pub fn with_error(mut self, error: String) -> Self {
        self.error = Some(error);
        self.success = false;
        self
    }

    pub fn with_metric(mut self, name: String, value: f64) -> Self {
        self.metrics.insert(name, value);
        self
    }
}

/// Generates large synthetic datasets on disk for out-of-core testing.
pub struct LargeDatasetGenerator {
    config: LargeScaleTestConfig,
    temp_dir: Option<TempDir>,
}

impl LargeDatasetGenerator {
    /// Create a generator; a temporary directory is created unless the
    /// configuration specifies one.
    pub fn new(config: LargeScaleTestConfig) -> CoreResult<Self> {
        let temp_dir = if config.temp_dir.is_none() {
            Some(TempDir::new().map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to create temp directory: {}",
                    e
                )))
            })?)
        } else {
            None
        };

        Ok(Self { config, temp_dir })
    }

    /// Generate a dataset of ascending `f64` values (`0.0, 1.0, 2.0, ...`) of
    /// approximately `size` bytes, written in chunks.
    pub fn generate_numeric_dataset(&self, size: usize) -> CoreResult<PathBuf> {
        let temp_path = self.get_temp_path("numeric_dataset.bin")?;

        let start_time = Instant::now();
        if self.config.progress_reporting {
            println!("Generating {} MB numeric dataset...", size / (1024 * 1024));
        }

        let mut file = fs::File::create(&temp_path).map_err(|e| {
            CoreError::IoError(ErrorContext::new(format!(
                "Failed to create dataset file: {}",
                e
            )))
        })?;

        use std::io::Write;
        let chunk_size = self.config.chunk_size.min(size);
        let mut bytes_written = 0;

        while bytes_written < size {
            let remaining = size - bytes_written;
            let current_chunk_size = chunk_size.min(remaining);
            let elements_in_chunk = current_chunk_size / std::mem::size_of::<f64>();

            // Fill the chunk with a continuing ramp of values so the expected
            // sum can be verified independently.
            let chunk_data: Vec<f64> = (0..elements_in_chunk)
                .map(|i| (bytes_written / std::mem::size_of::<f64>() + i) as f64)
                .collect();

            // Reinterpret the f64 buffer as a native-endian byte view for writing.
            let bytes = unsafe {
                std::slice::from_raw_parts(
                    chunk_data.as_ptr() as *const u8,
                    chunk_data.len() * std::mem::size_of::<f64>(),
                )
            };
            file.write_all(bytes).map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!("Failed to write chunk: {}", e)))
            })?;

            bytes_written += current_chunk_size;

            if self.config.progress_reporting && bytes_written % (10 * 1024 * 1024) == 0 {
                let progress = (bytes_written * 100) / size;
                println!("Progress: {}%", progress);
            }
        }

        if self.config.progress_reporting {
            println!("Dataset generation completed in {:?}", start_time.elapsed());
        }

        Ok(temp_path)
    }

    /// Generate a sparse dataset of approximately `size` bytes where roughly
    /// `density` (0.0..=1.0) of the `f64` elements are non-zero.
    pub fn generate_sparse_dataset(&self, size: usize, density: f64) -> CoreResult<PathBuf> {
        let temp_path = self.get_temp_path("sparse_dataset.bin")?;

        if self.config.progress_reporting {
            println!(
                "Generating {} MB sparse dataset (density: {:.2})...",
                size / (1024 * 1024),
                density
            );
        }

        let mut file = fs::File::create(&temp_path).map_err(|e| {
            CoreError::IoError(ErrorContext::new(format!(
                "Failed to create sparse dataset file: {}",
                e
            )))
        })?;

        use std::io::Write;
        let chunk_size = self.config.chunk_size.min(size);
        let mut bytes_written = 0;

        #[cfg(feature = "random")]
        let mut rng = rand::rng();

        while bytes_written < size {
            let remaining = size - bytes_written;
            let current_chunk_size = chunk_size.min(remaining);
            let elements_in_chunk = current_chunk_size / std::mem::size_of::<f64>();

            let chunk_data: Vec<f64> = (0..elements_in_chunk)
                .map(|_i| {
                    #[cfg(feature = "random")]
                    {
                        if rng.random_range(0.0..=1.0) < density {
                            rng.random_range(-1000.0..=1000.0)
                        } else {
                            0.0
                        }
                    }
                    #[cfg(not(feature = "random"))]
                    {
                        // Deterministic fallback: place a non-zero value every
                        // ~1/density elements (at least every element).
                        let period = ((1.0 / density) as usize).max(1);
                        if (bytes_written / std::mem::size_of::<f64>() + _i) % period == 0 {
                            1.0
                        } else {
                            0.0
                        }
                    }
                })
                .collect();

            // Reinterpret the f64 buffer as a native-endian byte view for writing.
            let bytes = unsafe {
                std::slice::from_raw_parts(
                    chunk_data.as_ptr() as *const u8,
                    chunk_data.len() * std::mem::size_of::<f64>(),
                )
            };
            file.write_all(bytes).map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to write sparse chunk: {}",
                    e
                )))
            })?;

            bytes_written += current_chunk_size;
        }

        Ok(temp_path)
    }

    /// Resolve the on-disk path for a generated file, preferring the
    /// configured directory, then the generator's own temp directory.
    fn get_temp_path(&self, filename: &str) -> CoreResult<PathBuf> {
        if let Some(ref temp_dir_path) = self.config.temp_dir {
            Ok(temp_dir_path.join(filename))
        } else if let Some(ref temp_dir) = self.temp_dir {
            Ok(temp_dir.path().join(filename))
        } else {
            // Fallback: reserve a unique path via a named temp file; the file
            // itself is recreated by the caller at this path.
            let temp_file = NamedTempFile::new().map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to create temp file: {}",
                    e
                )))
            })?;
            Ok(temp_file.into_temp_path().to_path_buf())
        }
    }
}

/// Runs chunked, memory-mapped, and out-of-core processing scenarios over
/// datasets produced by [`LargeDatasetGenerator`].
pub struct LargeScaleProcessor {
    config: LargeScaleTestConfig,
}

impl LargeScaleProcessor {
    /// Create a processor with the given configuration.
    pub fn new(config: LargeScaleTestConfig) -> Self {
        Self { config }
    }

    /// Stream the dataset from disk in fixed-size chunks and fold the
    /// per-chunk results of `processor` into a single accumulated value.
    pub fn test_chunked_processing<F>(
        &self,
        dataset_path: &Path,
        processor: F,
    ) -> CoreResult<LargeScaleTestResult>
    where
        F: Fn(&[f64]) -> CoreResult<f64>,
    {
        let start_time = Instant::now();
        let mut result = LargeScaleTestResult::new("chunked_processing".to_string());

        let file_size = fs::metadata(dataset_path)
            .map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to get file metadata: {}",
                    e
                )))
            })?
            .len() as usize;

        if self.config.progress_reporting {
            println!(
                "Processing {} MB dataset in chunks...",
                file_size / (1024 * 1024)
            );
        }

        use std::io::Read;
        let mut file = fs::File::open(dataset_path).map_err(|e| {
            CoreError::IoError(ErrorContext::new(format!(
                "Failed to open dataset file: {}",
                e
            )))
        })?;

        let mut bytes_processed = 0;
        let mut chunks_processed = 0;
        let mut accumulator = 0.0;
        let chunk_size = self.config.chunk_size;

        while bytes_processed < file_size {
            let remaining = file_size - bytes_processed;
            let current_chunk_size = chunk_size.min(remaining);
            let elements_in_chunk = current_chunk_size / std::mem::size_of::<f64>();

            let mut buffer = vec![0u8; current_chunk_size];
            file.read_exact(&mut buffer).map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!("Failed to read chunk: {}", e)))
            })?;

            // Decode whole native-endian f64 elements from the raw bytes.
            let chunk_data: Vec<f64> = buffer[..elements_in_chunk * std::mem::size_of::<f64>()]
                .chunks_exact(std::mem::size_of::<f64>())
                .map(|b| f64::from_ne_bytes(b.try_into().expect("8-byte chunk")))
                .collect();

            let chunk_result = processor(&chunk_data)?;
            accumulator += chunk_result;

            bytes_processed += current_chunk_size;
            chunks_processed += 1;

            if self.config.progress_reporting && chunks_processed % 100 == 0 {
                let progress = (bytes_processed * 100) / file_size;
                println!("Processing progress: {}%", progress);
            }
        }

        let duration = start_time.elapsed();
        let throughput = file_size as f64 / duration.as_secs_f64();

        result = result
            .with_success(true)
            .with_dataset_size(file_size)
            .with_duration(duration)
            .with_chunks_processed(chunks_processed)
            .with_throughput(throughput)
            .with_metric("accumulator_result".to_string(), accumulator);

        if self.config.progress_reporting {
            println!(
                "Processing completed: {} chunks, {:.2} MB/s throughput",
                chunks_processed,
                throughput / (1024.0 * 1024.0)
            );
        }

        Ok(result)
    }

    /// Process the dataset through a memory-mapped view, reading it in
    /// chunk-sized slices and folding the per-chunk results of `processor`.
    #[cfg(feature = "memory_efficient")]
    pub fn test_memory_mapped_processing<F>(
        &self,
        dataset_path: &Path,
        processor: F,
    ) -> CoreResult<LargeScaleTestResult>
    where
        F: Fn(&[f64]) -> CoreResult<f64>,
    {
        let start_time = Instant::now();
        let mut result = LargeScaleTestResult::new("memory_mapped_processing".to_string());

        let file_size = fs::metadata(dataset_path)
            .map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to get file metadata: {}",
                    e
                )))
            })?
            .len() as usize;

        let num_elements = file_size / std::mem::size_of::<f64>();

        if self.config.progress_reporting {
            println!("Memory-mapping {} MB dataset...", file_size / (1024 * 1024));
        }

        let mmap_array =
            MemoryMappedArray::<f64>::path(dataset_path, &[num_elements]).map_err(|e| {
                CoreError::IoError(ErrorContext::new(format!(
                    "Failed to create memory map: {:?}",
                    e
                )))
            })?;

        let chunk_size = self.config.chunk_size / std::mem::size_of::<f64>();
        let mut chunks_processed = 0;
        let mut accumulator = 0.0;

        for chunk_start in (0..num_elements).step_by(chunk_size) {
            let chunk_end = (chunk_start + chunk_size).min(num_elements);

            let chunk_data = {
                let array = mmap_array.as_array::<crate::ndarray::Ix1>().map_err(|e| {
                    CoreError::ComputationError(ErrorContext::new(format!(
                        "Failed to access memory-mapped array: {:?}",
                        e
                    )))
                })?;

                let slice = array.slice(crate::s![chunk_start..chunk_end]);
                slice.to_vec()
            };

            let chunk_result = processor(&chunk_data)?;
            accumulator += chunk_result;
            chunks_processed += 1;

            if self.config.progress_reporting && chunks_processed % 100 == 0 {
                let progress = (chunk_start * 100) / num_elements;
                println!("Memory-mapped processing progress: {}%", progress);
            }
        }

        let duration = start_time.elapsed();
        let throughput = file_size as f64 / duration.as_secs_f64();

        result = result
            .with_success(true)
            .with_dataset_size(file_size)
            .with_duration(duration)
            .with_chunks_processed(chunks_processed)
            .with_throughput(throughput)
            .with_metric("accumulator_result".to_string(), accumulator);

        if self.config.progress_reporting {
            println!(
                "Memory-mapped processing completed: {} chunks, {:.2} MB/s throughput",
                chunks_processed,
                throughput / (1024.0 * 1024.0)
            );
        }

        Ok(result)
    }

    /// Run a chunked sum over the dataset and verify the result against an
    /// independent sequential pass over the same file.
    pub fn test_out_of_core_reduction(
        &self,
        dataset_path: &Path,
    ) -> CoreResult<LargeScaleTestResult> {
        let mut result = LargeScaleTestResult::new("out_of_core_reduction".to_string());

        let processor_result =
            self.test_chunked_processing(dataset_path, |chunk| Ok(chunk.iter().sum::<f64>()))?;

        let verification_result = self.verify_reduction_result(dataset_path)?;

        let success =
            (processor_result.metrics["accumulator_result"] - verification_result).abs() < 1e-6;

        result = result
            .with_success(success)
            .with_dataset_size(processor_result.dataset_size)
            .with_duration(processor_result.duration)
            .with_chunks_processed(processor_result.chunks_processed)
            .with_throughput(processor_result.throughput)
            .with_metric(
                "computed_sum".to_string(),
                processor_result.metrics["accumulator_result"],
            )
            .with_metric("verified_sum".to_string(), verification_result);

        if !success {
            result = result.with_error(format!(
                "Reduction verification failed: computed={}, verified={}",
                processor_result.metrics["accumulator_result"], verification_result
            ));
        }

        Ok(result)
    }

    /// Independently sum the dataset with small sequential reads, for
    /// verifying chunked reduction results.
    fn verify_reduction_result(&self, dataset_path: &Path) -> CoreResult<f64> {
        let mut file = fs::File::open(dataset_path).map_err(|e| {
            CoreError::IoError(ErrorContext::new(format!(
                "Failed to open dataset for verification: {}",
                e
            )))
        })?;

        use std::io::Read;
        let verification_chunk_size = 1024;
        let mut buffer = vec![0u8; verification_chunk_size];
        let mut sum = 0.0;

        loop {
            match file.read(&mut buffer) {
                Ok(0) => break,
                Ok(bytes_read) => {
                    // Decode whole f64 elements; any trailing partial element
                    // (not expected when reading a regular file) is ignored.
                    sum += buffer[..bytes_read]
                        .chunks_exact(std::mem::size_of::<f64>())
                        .map(|b| f64::from_ne_bytes(b.try_into().expect("8-byte chunk")))
                        .sum::<f64>();
                }
                Err(e) => {
                    return Err(CoreError::IoError(ErrorContext::new(format!(
                        "Verification read failed: {}",
                        e
                    ))))
                }
            }
        }

        Ok(sum)
    }
}

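// Usage sketch (illustrative sizes, error handling elided): generate a dataset,
// then stream it through a per-chunk reducer. Any `Fn(&[f64]) -> CoreResult<f64>`
// closure can be supplied; its per-chunk results are summed into the
// `accumulator_result` metric.
//
//     let config = LargeScaleTestConfig::default()
//         .with_max_dataset_size(8 * 1024 * 1024)
//         .with_chunk_size(1024 * 1024);
//     let generator = LargeDatasetGenerator::new(config.clone())?;
//     let processor = LargeScaleProcessor::new(config);
//     let path = generator.generate_numeric_dataset(8 * 1024 * 1024)?;
//     let result = processor.test_chunked_processing(&path, |chunk| {
//         Ok(chunk.iter().sum::<f64>())
//     })?;
//     assert!(result.success);
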
/// Helpers for assembling a large-scale test suite.
pub struct LargeScaleTestUtils;

impl LargeScaleTestUtils {
    /// Build a test suite covering the large-scale processing scenarios:
    /// chunked processing, sparse datasets, out-of-core reduction, and
    /// (when the `memory_efficient` feature is enabled) memory mapping.
    pub fn create_large_scale_test_suite(
        name: &str,
        config: TestConfig,
    ) -> crate::testing::TestSuite {
        let mut suite = crate::testing::TestSuite::new(name, config);

        let large_config = LargeScaleTestConfig::default()
            .with_max_dataset_size(10 * 1024 * 1024) // 10 MiB
            .with_chunk_size(1024 * 1024) // 1 MiB
            .with_progress_reporting(false);

        let large_config_1 = large_config.clone();
        suite.add_test("chunked_dataset_processing", move |_runner| {
            let generator = LargeDatasetGenerator::new(large_config_1.clone())?;
            let processor = LargeScaleProcessor::new(large_config_1.clone());

            let dataset_path =
                generator.generate_numeric_dataset(large_config_1.max_dataset_size)?;

            let result = processor.test_chunked_processing(&dataset_path, |chunk| {
                Ok(chunk.iter().sum::<f64>() / chunk.len() as f64)
            })?;

            if !result.success {
                return Ok(TestResult::failure(
                    result.duration,
                    result.chunks_processed,
                    result
                        .error
                        .unwrap_or_else(|| "Chunked processing failed".to_string()),
                ));
            }

            Ok(TestResult::success(
                std::time::Duration::from_secs(1),
                result.chunks_processed,
            ))
        });

        let large_config_2 = large_config.clone();
        suite.add_test("sparse_dataset_processing", move |_runner| {
            let generator = LargeDatasetGenerator::new(large_config_2.clone())?;
            let processor = LargeScaleProcessor::new(large_config_2.clone());

            let dataset_path =
                generator.generate_sparse_dataset(large_config_2.max_dataset_size, 0.1)?;

            let result = processor.test_chunked_processing(&dataset_path, |chunk| {
                Ok(chunk.iter().filter(|&&x| x != 0.0).count() as f64)
            })?;

            if !result.success {
                return Ok(TestResult::failure(
                    result.duration,
                    result.chunks_processed,
                    result
                        .error
                        .unwrap_or_else(|| "Sparse processing failed".to_string()),
                ));
            }

            Ok(TestResult::success(
                std::time::Duration::from_secs(1),
                result.chunks_processed,
            ))
        });

        let large_config_3 = large_config.clone();
        suite.add_test("out_of_core_reduction", move |_runner| {
            let generator = LargeDatasetGenerator::new(large_config_3.clone())?;
            let processor = LargeScaleProcessor::new(large_config_3.clone());

            let dataset_path =
                generator.generate_numeric_dataset(large_config_3.max_dataset_size)?;

            let result = processor.test_out_of_core_reduction(&dataset_path)?;

            if !result.success {
                return Ok(TestResult::failure(
                    result.duration,
                    result.chunks_processed,
                    result
                        .error
                        .unwrap_or_else(|| "Out-of-core reduction failed".to_string()),
                ));
            }

            Ok(TestResult::success(
                std::time::Duration::from_secs(1),
                result.chunks_processed,
            ))
        });

        #[cfg(feature = "memory_efficient")]
        {
            let large_config_4 = large_config.clone();
            suite.add_test("memory_mapped_processing", move |_runner| {
                let generator = LargeDatasetGenerator::new(large_config_4.clone())?;
                let processor = LargeScaleProcessor::new(large_config_4.clone());

                let dataset_path =
                    generator.generate_numeric_dataset(large_config_4.max_dataset_size)?;

                let result = processor.test_chunked_processing(&dataset_path, |chunk| {
                    let mean = chunk.iter().sum::<f64>() / chunk.len() as f64;
                    let variance =
                        chunk.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / chunk.len() as f64;
                    Ok(variance)
                })?;

                if !result.success {
                    return Ok(TestResult::failure(
                        result.duration,
                        result.chunks_processed,
                        result
                            .error
                            .unwrap_or_else(|| "Memory-mapped processing failed".to_string()),
                    ));
                }

                Ok(TestResult::success(
                    result.duration,
                    result.chunks_processed,
                ))
            });
        }

        suite
    }
}

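// Usage sketch (illustrative): `test_config` stands in for whatever
// `crate::testing::TestConfig` value the caller already has; executing the
// returned suite is left to the crate's testing runner.
//
//     let suite = LargeScaleTestUtils::create_large_scale_test_suite(
//         "large_scale_validation",
//         test_config,
//     );
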
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_large_scale_config() {
        let config = LargeScaleTestConfig::new()
            .with_max_dataset_size(512 * 1024 * 1024)
            .with_memory_limit(128 * 1024 * 1024)
            .with_chunk_size(2 * 1024 * 1024)
            .with_worker_count(8)
            .with_progress_reporting(true);

        assert_eq!(config.max_dataset_size, 512 * 1024 * 1024);
        assert_eq!(config.memory_limit, 128 * 1024 * 1024);
        assert_eq!(config.chunk_size, 2 * 1024 * 1024);
        assert_eq!(config.worker_count, 8);
        assert!(config.progress_reporting);
    }

    #[test]
    fn test_dataset_generator() {
        let config = LargeScaleTestConfig::default().with_max_dataset_size(1024);
        let generator = LargeDatasetGenerator::new(config).expect("Operation failed");

        let dataset_path = generator
            .generate_numeric_dataset(1024)
            .expect("Operation failed");

        assert!(dataset_path.exists());

        let metadata = fs::metadata(&dataset_path).expect("Operation failed");
        assert_eq!(metadata.len() as usize, 1024);
    }

    #[test]
    fn test_sparse_dataset_generator() {
        let config = LargeScaleTestConfig::default();
        let generator = LargeDatasetGenerator::new(config).expect("Operation failed");

        let dataset_path = generator
            .generate_sparse_dataset(1024, 0.5)
            .expect("Operation failed");
        assert!(dataset_path.exists());

        let metadata = fs::metadata(&dataset_path).expect("Operation failed");
        assert_eq!(metadata.len() as usize, 1024);
    }

    #[test]
    fn test_chunked_processing() {
        let config = LargeScaleTestConfig::default().with_chunk_size(256);

        let generator = LargeDatasetGenerator::new(config.clone()).expect("Operation failed");
        let processor = LargeScaleProcessor::new(config);

        let dataset_path = generator
            .generate_numeric_dataset(1024)
            .expect("Operation failed");

        let result = processor
            .test_chunked_processing(&dataset_path, |chunk| Ok(chunk.len() as f64))
            .expect("Operation failed");

        assert!(result.success);
        assert_eq!(result.dataset_size, 1024);
        assert!(result.chunks_processed > 0);
        assert!(result.throughput > 0.0);
    }
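
    // Added sketch: exercises the out-of-core reduction path end to end on a
    // tiny (1 KiB) dataset, mirroring the tests above. It asserts only the
    // internally verified success flag and the recorded metric keys, so it
    // makes no assumption about the exact generated values.
    #[test]
    fn test_out_of_core_reduction_small() {
        let config = LargeScaleTestConfig::default().with_chunk_size(256);

        let generator = LargeDatasetGenerator::new(config.clone()).expect("Operation failed");
        let processor = LargeScaleProcessor::new(config);

        let dataset_path = generator
            .generate_numeric_dataset(1024)
            .expect("Operation failed");

        let result = processor
            .test_out_of_core_reduction(&dataset_path)
            .expect("Operation failed");

        assert!(result.success);
        assert!(result.metrics.contains_key("computed_sum"));
        assert!(result.metrics.contains_key("verified_sum"));
    }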
}