#![allow(
clippy::cast_precision_loss,
clippy::case_sensitive_file_extension_comparisons
)]
use rayon::prelude::*;
use std::path::Path;
use std::time::Instant;
fn main() {
println!("\n{}", "=".repeat(70));
println!("MRRC Concurrent Reading with Rayon");
println!("{}\n", "=".repeat(70));
let records = load_records_from_test_data();
if records.is_empty() {
println!("No test records found. Skipping example.");
println!("To run this example, place .mrc files in tests/data/fixtures/");
return;
}
println!("Loaded {} records", records.len());
println!();
demo_sequential(&records);
demo_parallel_iteration(&records);
demo_parallel_processing(&records);
demo_parallel_grouping(&records);
}
fn load_records_from_test_data() -> Vec<String> {
let test_dir = Path::new("tests/data/fixtures");
if !test_dir.exists() {
return Vec::new();
}
let mut records = Vec::new();
if let Ok(entries) = std::fs::read_dir(test_dir) {
for entry in entries.flatten() {
if let Some(filename) = entry.file_name().to_str() {
if filename.ends_with(".mrc") {
for i in 0..10 {
records.push(format!("record-{filename}-{i}"));
}
}
}
}
}
records
}
fn demo_sequential(records: &[String]) {
println!("{}", "=".repeat(70));
println!("1. SEQUENTIAL PROCESSING (Baseline)");
println!("{}\n", "=".repeat(70));
let start = Instant::now();
let results: Vec<usize> = records
.iter()
.map(|record| process_record(record))
.collect();
let elapsed = start.elapsed();
let total: usize = results.iter().sum();
println!("Time: {elapsed:?}");
println!("Total work: {total} units");
println!(
"Throughput: {:.0} units/ms",
total as f64 / elapsed.as_secs_f64() / 1000.0
);
println!();
}
fn demo_parallel_iteration(records: &[String]) {
println!("{}", "=".repeat(70));
println!("2. PARALLEL ITERATION WITH RAYON");
println!("{}\n", "=".repeat(70));
let start = Instant::now();
let results: Vec<usize> = records
.par_iter()
.map(|record| process_record(record))
.collect();
let elapsed = start.elapsed();
let total: usize = results.iter().sum();
println!("Time: {elapsed:?}");
println!("Total work: {total} units");
println!(
"Throughput: {:.0} units/ms",
total as f64 / elapsed.as_secs_f64() / 1000.0
);
println!();
println!("RAYON WORK-STEALING:");
println!(" - Rayon divides work into chunks");
println!(" - Threads steal tasks from each other");
println!(" - Load balancing is automatic");
println!(" - No synchronization overhead");
println!();
}
fn demo_parallel_processing(records: &[String]) {
println!("{}", "=".repeat(70));
println!("3. PARALLEL PROCESSING WITH FILTERS");
println!("{}\n", "=".repeat(70));
let start = Instant::now();
let matching_count: usize = records
.par_iter()
.filter(|record| record.contains('1')) .map(|record| process_record(record))
.sum();
let elapsed = start.elapsed();
println!("Processed {matching_count} matching records");
println!("Time: {elapsed:?}");
println!();
println!("PATTERN: filter → map → reduce");
println!(" 1. filter: Select records matching criteria");
println!(" 2. map: Process each record (CPU-intensive work)");
println!(" 3. reduce: Combine results (sum, collect, etc.)");
println!();
}
fn demo_parallel_grouping(records: &[String]) {
use std::collections::BTreeMap;
println!("{}", "=".repeat(70));
println!("4. PARALLEL GROUPING AND AGGREGATION");
println!("{}\n", "=".repeat(70));
let start = Instant::now();
let group_counts: BTreeMap<char, usize> = records
.par_iter()
.fold(BTreeMap::new, |mut acc, record| {
let first_char = record.chars().next().unwrap_or(' ');
*acc.entry(first_char).or_insert(0) += 1;
acc
})
.reduce(BTreeMap::new, |mut acc, other| {
for (key, count) in other {
*acc.entry(key).or_insert(0) += count;
}
acc
});
let elapsed = start.elapsed();
println!("Grouped results:");
for (key, count) in group_counts.iter().take(5) {
println!(" '{key}': {count} records");
}
println!("Time: {elapsed:?}");
println!();
println!("FOLD-REDUCE PATTERN:");
println!(" - Each thread maintains local accumulator");
println!(" - fold: Process records into local result");
println!(" - reduce: Merge thread results");
println!(" - No shared state = no contention");
println!();
}
fn process_record(record: &str) -> usize {
let mut sum = 0;
for ch in record.chars() {
sum += ch as usize;
}
sum % 100 }
#[allow(dead_code)]
fn show_rayon_config() {
println!("Rayon Configuration:");
println!(" Threads: {}", rayon::current_num_threads());
println!();
}
#[allow(dead_code)]
fn parallel_chunks_example(records: &[String]) {
println!("\nExample: Processing records in chunks");
let chunk_size = 100;
let results: Vec<usize> = records
.par_chunks(chunk_size)
.flat_map(|chunk| chunk.par_iter().map(|record| process_record(record)))
.collect();
println!(
"Processed {} records in chunks of {}",
results.len(),
chunk_size
);
}
#[allow(dead_code)]
fn conditional_processing_example(records: &[String]) {
println!("\nExample: Conditional parallel processing");
let filtered_results: Vec<(String, usize)> = records
.par_iter()
.filter_map(|record| {
if record.len() > 10 {
Some((record.clone(), process_record(record)))
} else {
None
}
})
.collect();
println!("Processed {} matching records", filtered_results.len());
}
#[allow(dead_code)]
fn parallel_with_results_example(records: &[String]) {
println!("\nExample: Parallel processing with error handling");
let results: Vec<Result<usize, String>> = records
.par_iter()
.map(|record| {
if record.is_empty() {
Err("Empty record".to_string())
} else {
Ok(process_record(record))
}
})
.collect();
let success_count = results.iter().filter(|r| r.is_ok()).count();
println!("Successful: {}/{}", success_count, results.len());
}