use scirs2_core::memory_efficient::{
AdaptiveChunking, AdaptiveChunkingBuilder, ChunkingStrategy, MemoryMappedArray,
};
use std::fs::File;
use std::io::Write;
use std::time::Instant;
use tempfile::tempdir;
/// Entry point: builds three memory-mapped test arrays in a temporary
/// directory, then runs the fixed, adaptive, and parallel chunking
/// benchmarks against them in turn.
#[allow(dead_code)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Memory-Mapped Array Adaptive Chunking Example");
    println!("=============================================\n");

    // The backing files live in a temp dir that is removed when `dir` drops
    // at the end of main, after all benchmarks have finished.
    let dir = tempdir()?;
    let arrays = create_test_arrays(dir.path())?;
    println!("Created test arrays for benchmarking adaptive chunking strategies.\n");

    // arrays[0]: 10M-element 1D; arrays[1]: 5000x2000 2D; arrays[2]: 50M-element 1D.
    let (small, matrix, large) = (&arrays[0], &arrays[1], &arrays[2]);

    println!("1. Testing different chunking strategies on 1D array");
    println!("---------------------------------------------------");
    benchmark_fixed_chunks(small, "Small 1D array (10 million elements)")?;

    println!("\n2. Testing adaptive chunking on 1D array");
    println!("----------------------------------------");
    benchmark_adaptive_chunks(small, "Small 1D array (10 million elements)")?;

    println!("\n3. Testing adaptive chunking on 2D array");
    println!("----------------------------------------");
    benchmark_adaptive_chunks(matrix, "2D matrix (5000x2000 elements)")?;

    println!("\n4. Testing parallel adaptive chunking");
    println!("-------------------------------------");
    benchmark_adaptive_parallel(large, "Large 1D array (50 million elements)")?;

    println!("\nAll benchmarks completed successfully!");
    Ok(())
}
#[allow(dead_code)]
/// Create three memory-mapped `f64` test arrays under `dir_path`:
/// a 10M-element 1D array, a 5000x2000 2D matrix, and a 50M-element 1D array.
///
/// Every file holds the same data pattern — consecutive values 0.0, 1.0, ...
/// in native byte order (for the 2D case, element (r, c) is `r * cols + c`,
/// which is exactly the flattened sequential pattern).
///
/// Returns the arrays in the order [small 1D, 2D matrix, large 1D].
fn create_test_arrays(
    dir_path: &std::path::Path,
) -> Result<Vec<MemoryMappedArray<f64>>, Box<dyn std::error::Error>> {
    /// Write `count` sequential f64 values (0.0, 1.0, ...) to `path`.
    ///
    /// Uses a `BufWriter`: writing each 8-byte value straight to the `File`
    /// would cost one syscall per value (tens of millions here). Unlike the
    /// previous chunked loop, this also handles counts that are not an exact
    /// multiple of a chunk size instead of silently dropping the remainder.
    fn write_sequential_f64(path: &std::path::Path, count: usize) -> std::io::Result<()> {
        let mut writer = std::io::BufWriter::new(File::create(path)?);
        for i in 0..count {
            writer.write_all(&(i as f64).to_ne_bytes())?;
        }
        // BufWriter's Drop swallows flush errors; flush explicitly.
        writer.flush()
    }

    let mut arrays = Vec::with_capacity(3);

    // Small 1D array: 10 million sequential values.
    let file_path = dir_path.join("small_1d.bin");
    println!("Creating small 1D array at: {}", file_path.display());
    let size_1d = 10_000_000;
    write_sequential_f64(&file_path, size_1d)?;
    arrays.push(MemoryMappedArray::<f64>::path(&file_path, &[size_1d])?);

    // Medium 2D array: 5000 x 2000, row-major.
    let file_path = dir_path.join("medium_2d.bin");
    println!("Creating medium 2D array at: {}", file_path.display());
    let rows = 5000;
    let cols = 2000;
    write_sequential_f64(&file_path, rows * cols)?;
    arrays.push(MemoryMappedArray::<f64>::path(&file_path, &[rows, cols])?);

    // Large 1D array: 50 million sequential values.
    let file_path = dir_path.join("large_1d.bin");
    println!("Creating large 1D array at: {}", file_path.display());
    let size_large = 50_000_000;
    write_sequential_f64(&file_path, size_large)?;
    arrays.push(MemoryMappedArray::<f64>::path(&file_path, &[size_large])?);

    Ok(arrays)
}
#[allow(dead_code)]
/// Benchmark a full-array sum for `array` while reporting how many chunks a
/// `ChunkingStrategy::Fixed` split of each candidate size would produce.
///
/// NOTE(review): the timed work maps and sums the whole array in one pass
/// (`as_array`), so the measured time does not actually depend on the chunk
/// size; the "Chunks" column shows the count a fixed split would yield.
/// TODO: route the sum through a chunked-processing API to make the timing
/// per-strategy meaningful.
fn benchmark_fixed_chunks(
    array: &MemoryMappedArray<f64>,
    description: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    println!(
        "\nBenchmarking {} with different fixed chunk sizes:",
        description
    );
    println!("{:-^60}", "");
    println!("{:<20} {:<15} {:<15}", "Chunk Size", "Time (ms)", "Chunks");
    println!("{:-^60}", "");

    let chunk_sizes = [10_000, 100_000, 1_000_000, 10_000_000];
    for &chunk_size in &chunk_sizes {
        // A chunk larger than the array itself is not a meaningful split.
        if chunk_size > array.size {
            println!("{:<20} {:<15} {:<15}", chunk_size, "N/A (too large)", "N/A");
            continue;
        }

        // Number of chunks a Fixed(chunk_size) strategy would split the array into
        // (previously computed but never shown; the old code printed a constant 1).
        let expected_chunks = array.size.div_ceil(chunk_size);

        let start = Instant::now();
        let data = array.as_array::<scirs2_core::ndarray::Ix1>()?;
        let total = data.sum();
        // Float millis: `as_millis()` is an integer, so sub-millisecond runs
        // printed as 0 (the `.2` precision is ignored for integers).
        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
        // Keep the sum alive so the read isn't optimized away in release builds.
        let _ = total;

        println!(
            "{:<20} {:<15.2} {:<15}",
            chunk_size, elapsed_ms, expected_chunks
        );
    }
    Ok(())
}
#[allow(dead_code)]
/// Benchmark `process_chunks_adaptive` on `array` across several target
/// memory budgets, printing the time taken, the number of chunks processed,
/// the resolved chunk size, and the chunker's first decision factors.
///
/// Returns an error if the adaptive chunker resolves to anything other than
/// a fixed-size strategy (previously this was a `panic!`, which is the wrong
/// failure mode for a function that already returns `Result`).
fn benchmark_adaptive_chunks(
    array: &MemoryMappedArray<f64>,
    description: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    println!(
        "\nBenchmarking {} with adaptive chunking strategies:",
        description
    );
    println!("{:-^80}", "");
    println!(
        "{:<30} {:<15} {:<15} {:<15}",
        "Strategy", "Time (ms)", "Chunks", "Chunk Size"
    );
    println!("{:-^80}", "");

    // Target memory budgets handed to the adaptive chunker, smallest first.
    let memory_sizes = [(1, "1 KB"), (64, "64 KB"), (1024, "1 MB"), (10240, "10 MB")];
    for &(kb, label) in &memory_sizes {
        let params = AdaptiveChunkingBuilder::new()
            .with_target_memory(kb * 1024)
            .with_min_chunksize(1000)
            .with_max_chunksize(10_000_000)
            .build();

        // Ask the array which strategy it would pick for these parameters
        // (cloned because `params` is consumed again by the processing call).
        let adaptive_result = array.adaptive_chunking(params.clone())?;
        let chunk_size = match adaptive_result.strategy {
            ChunkingStrategy::Fixed(size) => size,
            _ => return Err("Expected fixed chunking strategy".into()),
        };
        let expected_chunks = array.size.div_ceil(chunk_size);

        let start = Instant::now();
        let result =
            array.process_chunks_adaptive(params, |chunk, _chunk_idx| chunk.iter().sum::<f64>())?;
        // Float millis: integer `as_millis()` truncated sub-millisecond runs to 0.
        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;

        // One partial sum per chunk is expected back from the processor.
        assert_eq!(
            result.len(),
            expected_chunks,
            "Incorrect number of chunks processed"
        );

        println!(
            "{:<30} {:<15.2} {:<15} {:<15}",
            label,
            elapsed_ms,
            result.len(),
            chunk_size
        );

        // Show why the chunker chose this size; only the first two factors
        // are printed to keep the table readable.
        println!(" Decision factors:");
        for factor in adaptive_result.decision_factors.iter().take(2) {
            println!(" - {}", factor);
        }
        if adaptive_result.decision_factors.len() > 2 {
            println!(
                " - ... ({} more factors)",
                adaptive_result.decision_factors.len() - 2
            );
        }
        println!();
    }
    Ok(())
}
#[allow(dead_code)]
/// Benchmark the same element-wise reduction (`sqrt(x*x)` summed) with
/// increasing worker counts and report speedup relative to a sequential
/// baseline run first.
///
/// NOTE(review): the per-worker loop currently runs the identical
/// single-threaded reduction — the parameters built with
/// `optimize_for_parallel`/`with_numworkers` are never passed to any
/// chunked-processing call, so the reported "speedup" reflects only
/// run-to-run noise. TODO: route the work through a parallel
/// process-chunks API so the worker count actually matters.
fn benchmark_adaptive_parallel(
    array: &MemoryMappedArray<f64>,
    description: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    println!(
        "\nBenchmarking {} with parallel adaptive chunking:",
        description
    );
    println!("{:-^80}", "");
    println!(
        "{:<15} {:<15} {:<15} {:<15} {:<15}",
        "Workers", "Time (ms)", "Chunks", "Chunk Size", "Speedup"
    );
    println!("{:-^80}", "");

    // Sequential baseline: one pass over the whole mapped array.
    // (A previously-built `seq_params` value was dead code and is removed.)
    let seq_start = Instant::now();
    let data = array.as_array::<scirs2_core::ndarray::Ix1>()?;
    let seq_result = [data.iter().map(|&x| (x * x).sqrt()).sum::<f64>()];
    // Float millis: integer `as_millis()` truncated sub-millisecond runs to
    // 0, which made the speedup division below produce inf/NaN.
    let baseline_ms = seq_start.elapsed().as_secs_f64() * 1000.0;

    println!(
        "{:<15} {:<15.2} {:<15} {:<15} {:<15.2}",
        "Sequential",
        baseline_ms,
        seq_result.len(),
        "Auto",
        1.0
    );

    let worker_counts = [2, 4, 8, 16];
    for &workers in &worker_counts {
        // Build parallel-optimized parameters; currently unused by the
        // measurement below (see NOTE in the doc comment).
        let _params = AdaptiveChunkingBuilder::new()
            .with_target_memory(1024 * 1024)
            .optimize_for_parallel(true)
            .with_numworkers(workers)
            .build();

        let start = Instant::now();
        let data = array.as_array::<scirs2_core::ndarray::Ix1>()?;
        let result = [data.iter().map(|&x| (x * x).sqrt()).sum::<f64>()];
        let parallel_ms = start.elapsed().as_secs_f64() * 1000.0;

        // Guard the division: a zero-duration measurement must not panic
        // or print a bogus ratio. The previous hard-coded "chunk size" of
        // 1000 was fabricated, so report "Auto" like the baseline row.
        let speedup = if parallel_ms > 0.0 {
            baseline_ms / parallel_ms
        } else {
            f64::INFINITY
        };

        println!(
            "{:<15} {:<15.2} {:<15} {:<15} {:<15.2}",
            workers,
            parallel_ms,
            result.len(),
            "Auto",
            speedup
        );
    }
    Ok(())
}