ecad-processor 2.0.1

High-performance multi-metric weather data processor for European Climate Assessment & Dataset (ECA&D) archives with Parquet output
Documentation
use chrono::NaiveDate;
/// V3 Integration Benchmarks
///
/// This benchmark suite compares V2 baseline implementations with
/// V3 optimized implementations to demonstrate actual performance improvements.
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use ecad_processor::models::{StationMetadata, TemperatureRecord};
use ecad_processor::processors::DataMerger;
use ecad_processor::utils::coordinates::{dms_to_decimal, dms_to_decimal_cached};
use std::collections::HashMap;

/// Generate synthetic station metadata and temperature records for benchmarking.
///
/// Creates `stations` stations with ids `1..=stations`. For each station, `days`
/// consecutive days starting at 2023-01-01 are generated, and every station-day
/// yields three records distinguished by `souid`: 1 (min), 2 (max) and 3 (avg).
/// The returned record vector therefore holds `stations * days * 3` entries,
/// ordered by station, then day, then `souid`.
fn create_test_data(
    stations: usize,
    days: usize,
) -> (Vec<StationMetadata>, Vec<TemperatureRecord>) {
    let mut station_metadata = Vec::with_capacity(stations);
    // The final size is known up front (three records per station-day), so
    // reserve once instead of growing the vector repeatedly.
    let mut temperature_records = Vec::with_capacity(stations * days * 3);

    // Loop-invariant: every station's series starts on the same date.
    let base_date = NaiveDate::from_ymd_opt(2023, 1, 1).expect("2023-01-01 is a valid date");

    for station_id in 1..=stations {
        let staid = station_id as u32;

        station_metadata.push(StationMetadata {
            staid,
            name: format!("Test Station {}", station_id),
            country: "UK".to_string(),
            latitude: 51.0 + (station_id as f64) * 0.01,
            longitude: -1.0 - (station_id as f64) * 0.01,
            elevation: Some(100 + (station_id as i32) * 10),
        });

        for day in 0..days {
            let date = base_date + chrono::Duration::days(day as i64);
            let base_temp = 15.0 + (day as f32) * 0.1 + (station_id as f32) * 0.5;

            // All three records of a station-day share everything except the
            // source id and the temperature value.
            let record = |souid, temperature| TemperatureRecord {
                staid,
                souid,
                date,
                temperature,
                quality_flag: 0,
            };

            // Min temperature
            temperature_records.push(record(1, base_temp - 5.0));
            // Max temperature
            temperature_records.push(record(2, base_temp + 5.0));
            // Avg temperature
            temperature_records.push(record(3, base_temp));
        }
    }

    (station_metadata, temperature_records)
}

/// Benchmark temperature validation: V2 vs V3 SIMD
///
/// Both variants count how many of 1000 synthetic temperatures (starting at
/// -25.0 and rising in 0.1 steps) are considered valid. The V2 baseline checks
/// each value individually against the range [-50, 50] plus finiteness; the V3
/// variant calls `TemperatureRecord::validate_batch` and is only compiled when
/// the `simd` feature is enabled.
fn benchmark_temperature_validation_comparison(c: &mut Criterion) {
    let temperatures: Vec<f32> = (0..1000).map(|i| (i as f32 * 0.1) - 25.0).collect();

    let mut group = c.benchmark_group("temperature_validation");
    group.throughput(Throughput::Elements(temperatures.len() as u64));

    // V2 baseline - individual validation
    group.bench_function("v2_individual", |b| {
        b.iter(|| {
            let mut valid_count = 0;
            // Pass the input through `black_box` so the optimizer must treat it
            // as opaque and cannot hoist or pre-compute the pure loop below.
            for &temp in black_box(&temperatures) {
                if (-50.0..=50.0).contains(&temp) && temp.is_finite() {
                    valid_count += 1;
                }
            }
            black_box(valid_count)
        })
    });

    // V3 optimized - SIMD batch validation
    #[cfg(feature = "simd")]
    group.bench_function("v3_simd_batch", |b| {
        b.iter(|| {
            // Same opaque-input treatment as the baseline so both variants are
            // measured under identical conditions.
            let results = TemperatureRecord::validate_batch(black_box(&temperatures));
            black_box(results.iter().filter(|&&x| x).count())
        })
    });

    group.finish();
}

/// Benchmark data merging: V2 vs V3 optimized
///
/// Generates 20 stations x 50 days of synthetic records, groups them per station
/// into min/max/avg series (by `souid` 1/2/3), and then measures merging every
/// station's series with `DataMerger::merge_station_data` (V2 baseline) and,
/// when the `lockfree` feature is enabled, `merge_station_data_optimized` (V3).
/// Throughput is reported in input temperature records per iteration.
fn benchmark_data_merger_comparison(c: &mut Criterion) {
    let (stations, temp_records) = create_test_data(20, 50);
    // Capture the count now: the records are moved into the per-type maps below.
    let record_count = temp_records.len() as u64;

    let station_map: HashMap<u32, StationMetadata> =
        stations.into_iter().map(|s| (s.staid, s)).collect();

    // Group temperatures by station and type
    let mut min_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();
    let mut max_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();
    let mut avg_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();

    // Consume the vector so each record is moved into its bucket, not cloned.
    for record in temp_records {
        let bucket = match record.souid {
            1 => &mut min_temps,
            2 => &mut max_temps,
            3 => &mut avg_temps,
            _ => continue,
        };
        bucket.entry(record.staid).or_default().push(record);
    }

    let mut group = c.benchmark_group("data_merger");
    group.throughput(Throughput::Elements(record_count));

    // V2 baseline merger
    group.bench_function("v2_baseline", |b| {
        b.iter(|| {
            let merger = DataMerger::new();
            let mut results = Vec::new();

            for (station_id, min_vec) in &min_temps {
                if let Some(station) = station_map.get(station_id) {
                    // The merger takes its inputs by value, so each iteration
                    // works on fresh copies of the grouped series.
                    let max_vec = max_temps.get(station_id).cloned().unwrap_or_default();
                    let avg_vec = avg_temps.get(station_id).cloned().unwrap_or_default();

                    // Failed merges are skipped; only successful output is counted.
                    if let Ok(consolidated) =
                        merger.merge_station_data(station, min_vec.clone(), max_vec, avg_vec)
                    {
                        results.extend(consolidated);
                    }
                }
            }

            black_box(results.len())
        })
    });

    // V3 optimized merger
    #[cfg(feature = "lockfree")]
    group.bench_function("v3_optimized", |b| {
        b.iter(|| {
            let merger = DataMerger::new();
            let mut results = Vec::new();

            for (station_id, min_vec) in &min_temps {
                if let Some(station) = station_map.get(station_id) {
                    // Same per-iteration copies as the baseline, so the two
                    // variants differ only in the merge routine being called.
                    let max_vec = max_temps.get(station_id).cloned().unwrap_or_default();
                    let avg_vec = avg_temps.get(station_id).cloned().unwrap_or_default();

                    if let Ok(consolidated) = merger.merge_station_data_optimized(
                        station,
                        min_vec.clone(),
                        max_vec,
                        avg_vec,
                    ) {
                        results.extend(consolidated);
                    }
                }
            }

            black_box(results.len())
        })
    });

    group.finish();
}

/// Benchmark coordinate conversion: V2 vs V3 cached
///
/// Converts a fixed list of `DD:MM:SS` strings with `dms_to_decimal` (V2
/// baseline) and, when the `caching` feature is enabled, with
/// `dms_to_decimal_cached` (V3). The last three inputs repeat earlier entries so
/// the cached variant sees repeated keys. Throughput is reported in coordinates
/// per iteration.
fn benchmark_coordinate_conversion_comparison(c: &mut Criterion) {
    // A fixed-size array is enough here; no heap allocation is needed.
    let coordinates = [
        "51:30:15", "52:12:30", "50:45:22", "53:18:45", "49:55:30", "51:25:10", "52:35:40",
        "50:15:55", "53:45:20", "49:30:35", "51:30:15", "52:12:30",
        "50:45:22", // Repeat some for cache hits
    ];

    let mut group = c.benchmark_group("coordinate_conversion");
    group.throughput(Throughput::Elements(coordinates.len() as u64));

    // V2 baseline - no caching
    group.bench_function("v2_no_cache", |b| {
        b.iter(|| {
            let mut results = Vec::new();
            for &dms in &coordinates {
                // The inputs are string literals; `black_box` keeps the
                // optimizer from specializing the conversion on known values.
                if let Ok(decimal) = dms_to_decimal(black_box(dms)) {
                    results.push(decimal);
                }
            }
            black_box(results.len())
        })
    });

    // V3 cached conversion
    #[cfg(feature = "caching")]
    group.bench_function("v3_cached", |b| {
        b.iter(|| {
            let mut results = Vec::new();
            for &dms in &coordinates {
                // Same opaque-input treatment as the baseline for a fair comparison.
                if let Ok(decimal) = dms_to_decimal_cached(black_box(dms)) {
                    results.push(decimal);
                }
            }
            black_box(results.len())
        })
    });

    group.finish();
}

/// Benchmark scalability comparison
///
/// Repeats the merger comparison for 10, 50, 100 and 250 stations with 30 days
/// of data each. For every size it measures `DataMerger::merge_station_data`
/// (V2 baseline) and, when the `lockfree` feature is enabled,
/// `merge_station_data_optimized` (V3). Throughput is reported in input
/// temperature records per iteration.
fn benchmark_scalability_comparison(c: &mut Criterion) {
    // Days of data generated per station at every scale step.
    const DAYS_PER_STATION: usize = 30;

    let mut group = c.benchmark_group("scalability");

    for &station_count in &[10, 50, 100, 250] {
        let (stations, temp_records) = create_test_data(station_count, DAYS_PER_STATION);

        // Derive throughput from the generated data (stations * days * temp_types)
        // so it cannot drift from the generator's parameters.
        group.throughput(Throughput::Elements(temp_records.len() as u64));

        let station_map: HashMap<u32, StationMetadata> =
            stations.into_iter().map(|s| (s.staid, s)).collect();

        // Group data
        let mut min_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();
        let mut max_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();
        let mut avg_temps: HashMap<u32, Vec<TemperatureRecord>> = HashMap::new();

        // Consume the vector so each record is moved into its bucket, not cloned.
        for record in temp_records {
            let bucket = match record.souid {
                1 => &mut min_temps,
                2 => &mut max_temps,
                3 => &mut avg_temps,
                _ => continue,
            };
            bucket.entry(record.staid).or_default().push(record);
        }

        // V2 baseline
        group.bench_with_input(
            BenchmarkId::new("v2_baseline", station_count),
            &station_count,
            |b, _| {
                b.iter(|| {
                    let merger = DataMerger::new();
                    let mut results = Vec::new();

                    for (station_id, min_vec) in &min_temps {
                        if let Some(station) = station_map.get(station_id) {
                            // The merger takes its inputs by value, so each
                            // iteration works on fresh copies of the series.
                            let max_vec = max_temps.get(station_id).cloned().unwrap_or_default();
                            let avg_vec = avg_temps.get(station_id).cloned().unwrap_or_default();

                            // Failed merges are skipped; only successful output is counted.
                            if let Ok(consolidated) = merger.merge_station_data(
                                station,
                                min_vec.clone(),
                                max_vec,
                                avg_vec,
                            ) {
                                results.extend(consolidated);
                            }
                        }
                    }

                    black_box(results.len())
                })
            },
        );

        // V3 optimized
        #[cfg(feature = "lockfree")]
        group.bench_with_input(
            BenchmarkId::new("v3_optimized", station_count),
            &station_count,
            |b, _| {
                b.iter(|| {
                    let merger = DataMerger::new();
                    let mut results = Vec::new();

                    for (station_id, min_vec) in &min_temps {
                        if let Some(station) = station_map.get(station_id) {
                            // Same per-iteration copies as the baseline, so the
                            // variants differ only in the merge routine called.
                            let max_vec = max_temps.get(station_id).cloned().unwrap_or_default();
                            let avg_vec = avg_temps.get(station_id).cloned().unwrap_or_default();

                            if let Ok(consolidated) = merger.merge_station_data_optimized(
                                station,
                                min_vec.clone(),
                                max_vec,
                                avg_vec,
                            ) {
                                results.extend(consolidated);
                            }
                        }
                    }

                    black_box(results.len())
                })
            },
        );
    }

    group.finish();
}

// Register the four V2-vs-V3 comparison benchmarks defined in this file under a
// single Criterion group named `v3_comparisons`.
criterion_group!(
    v3_comparisons,
    benchmark_temperature_validation_comparison,
    benchmark_data_merger_comparison,
    benchmark_coordinate_conversion_comparison,
    benchmark_scalability_comparison
);

// Generate the benchmark binary's entry point, which runs the `v3_comparisons` group.
criterion_main!(v3_comparisons);