use crate::models::DatasetType;
use polars::prelude::ParquetCompression;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::debug;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParquetOptimizationConfig {
pub partition_by_station: bool,
pub target_row_group_size: usize,
pub compression_algorithm: CompressionAlgorithm,
pub enable_statistics: bool,
pub parallel_write_threads: usize,
pub dictionary_page_size: usize,
pub data_page_size: usize,
pub row_group_strategy: RowGroupStrategy,
pub memory_limit_mb: usize,
pub external_sort_enabled: bool,
pub optimize_for_stations: bool,
pub min_row_group_size: usize,
pub max_accumulation_memory_mb: usize,
pub merge_station_files: bool,
pub cleanup_station_files: bool,
}
/// How `ParquetOptimizationConfig::calculate_optimal_row_group_size` picks a row-group
/// size when `target_row_group_size` is 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RowGroupStrategy {
    /// Always 500 000 rows.
    Fixed,
    /// Derived from core count and memory budget, clamped to 100 000..=2 000 000 rows.
    Adaptive,
    /// Derived from average rows per station, capped by the `Adaptive` size and clamped to
    /// 250 000..=2 000 000 rows; falls back to `Adaptive` when the station count is 0.
    StationAware,
}
/// Parquet compression codec, independent of the polars type it is converted to by
/// `CompressionAlgorithm::to_polars_compression`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// Maps to `ParquetCompression::Snappy`.
    Snappy,
    /// Maps to `ParquetCompression::Zstd(None)`, i.e. no explicit level.
    Zstd,
    /// Maps to `ParquetCompression::Lz4Raw`.
    Lz4,
    /// Maps to `ParquetCompression::Uncompressed`.
    Uncompressed,
}
impl Default for ParquetOptimizationConfig {
    /// Baseline Parquet settings.
    ///
    /// - Row groups are auto-sized: `target_row_group_size` is 0 and the strategy is `Adaptive`.
    /// - Compression is Snappy, with 1 MiB dictionary and data pages.
    /// - There is no explicit memory limit (`memory_limit_mb` is 0).
    /// - Station partitioning and station-file merging are on; station-file cleanup is off.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            // Row-group sizing: 0 means "compute a size" rather than a fixed value.
            target_row_group_size: 0,
            row_group_strategy: RowGroupStrategy::Adaptive,
            min_row_group_size: 250_000,
            // Codec and page layout.
            compression_algorithm: CompressionAlgorithm::Snappy,
            enable_statistics: true,
            dictionary_page_size: MIB,
            data_page_size: MIB,
            // Memory and parallelism; a 0 memory limit falls back to detected system memory.
            memory_limit_mb: 0,
            max_accumulation_memory_mb: 2048,
            parallel_write_threads: 4,
            external_sort_enabled: true,
            // Station handling.
            partition_by_station: true,
            optimize_for_stations: true,
            merge_station_files: true,
            cleanup_station_files: false,
        }
    }
}
impl CompressionAlgorithm {
    /// Converts this codec choice into the `ParquetCompression` value polars' writer takes.
    ///
    /// `Zstd` is passed without an explicit level (`None`), so presumably polars applies its
    /// own default level. `Lz4` maps to polars' `Lz4Raw` codec.
    pub fn to_polars_compression(&self) -> ParquetCompression {
        match *self {
            Self::Uncompressed => ParquetCompression::Uncompressed,
            Self::Lz4 => ParquetCompression::Lz4Raw,
            Self::Zstd => ParquetCompression::Zstd(None),
            Self::Snappy => ParquetCompression::Snappy,
        }
    }
}
/// Snapshot of host resources; used here to size Parquet row groups.
///
/// Fields are public, so a profile can also be built by hand (e.g. in tests) instead of
/// via `SystemProfile::detect`.
#[derive(Debug, Clone)]
pub struct SystemProfile {
    /// Logical CPU count (`num_cpus::get` in `detect`).
    pub cpu_cores: usize,
    /// Total system memory in MiB (computed in `detect` as sysinfo's total divided by
    /// 1024² — assumes sysinfo reports bytes; confirm against the pinned version).
    pub memory_mb: usize,
    /// Physical core count (`num_cpus::get_physical` in `detect`). NOTE(review): despite the
    /// name, this presumably also counts efficiency cores on hybrid CPUs.
    pub performance_cores: usize,
}
impl SystemProfile {
    /// Probes the current machine for CPU and memory resources.
    ///
    /// - `cpu_cores` comes from `num_cpus::get` (logical CPUs).
    /// - `performance_cores` comes from `num_cpus::get_physical` (physical cores).
    /// - `memory_mb` is sysinfo's total memory divided by 1024². This assumes the installed
    ///   sysinfo reports bytes; confirm against the pinned version.
    pub fn detect() -> Self {
        use sysinfo::System;

        let mut sys = System::new();
        // Only memory figures are needed, so refresh just those.
        sys.refresh_memory();
        let total_bytes = sys.total_memory();

        Self {
            cpu_cores: num_cpus::get(),
            memory_mb: (total_bytes / (1024 * 1024)) as usize,
            performance_cores: num_cpus::get_physical(),
        }
    }
}
impl ParquetOptimizationConfig {
    /// Chooses how many rows to put in each Parquet row group.
    ///
    /// A positive `target_row_group_size` always wins and is returned unchanged. Otherwise
    /// the size comes from `row_group_strategy`:
    /// - `Fixed`: a constant 500 000 rows.
    /// - `Adaptive`: derived from core count and memory budget, in 100 000..=2 000 000.
    /// - `StationAware`: derived from rows per station, in 250 000..=2 000 000. It falls back
    ///   to `Adaptive` when `station_count` is 0.
    ///
    /// `total_rows` and `station_count` describe the dataset being written; only the
    /// `StationAware` strategy consults them.
    pub fn calculate_optimal_row_group_size(
        &self,
        total_rows: usize,
        station_count: usize,
        system_profile: &SystemProfile,
    ) -> usize {
        // An explicit, positive target bypasses all heuristics.
        if self.target_row_group_size > 0 {
            return self.target_row_group_size;
        }
        let optimal_size = match self.row_group_strategy {
            RowGroupStrategy::Fixed => 500_000,
            RowGroupStrategy::Adaptive => {
                self.calculate_adaptive_row_group_size(total_rows, system_profile)
            }
            RowGroupStrategy::StationAware => self.calculate_station_aware_row_group_size(
                total_rows,
                station_count,
                system_profile,
            ),
        };
        debug!(
            "Row group optimization: {} rows (strategy: {:?}, {} stations, {} cores, {}MB memory)",
            optimal_size,
            self.row_group_strategy,
            station_count,
            system_profile.performance_cores,
            system_profile.memory_mb
        );
        optimal_size
    }

    /// Sizes row groups from hardware limits alone; the row count is ignored.
    ///
    /// Takes the smallest of three limits, then clamps the result to 100 000..=2 000 000:
    /// - about 256 MiB worth of rows, at an assumed 100 bytes per row;
    /// - 100 000 rows per core in `performance_cores`;
    /// - one eighth of the memory budget, at 100 bytes per row. The budget is
    ///   `memory_limit_mb`, or total system memory when that is 0.
    fn calculate_adaptive_row_group_size(
        &self,
        _total_rows: usize,
        system_profile: &SystemProfile,
    ) -> usize {
        let target_mb: usize = 256;
        let estimated_bytes_per_row: usize = 100;
        let target_rows_from_size = (target_mb * 1024 * 1024) / estimated_bytes_per_row;
        // Saturate: `performance_cores` is a public field and may hold an arbitrary value.
        let target_rows_from_cores = system_profile.performance_cores.saturating_mul(100_000);
        let memory_limit_mb = if self.memory_limit_mb > 0 {
            self.memory_limit_mb
        } else {
            system_profile.memory_mb
        };
        // Saturate rather than overflow. A huge configured limit (e.g. usize::MAX used as
        // "unlimited") would otherwise panic in debug builds and wrap in release builds.
        let max_rows_from_memory =
            (memory_limit_mb / 8).saturating_mul(1024 * 1024) / estimated_bytes_per_row;
        let optimal_size = target_rows_from_size.min(target_rows_from_cores);
        let final_size = optimal_size.min(max_rows_from_memory);
        final_size.clamp(100_000, 2_000_000)
    }

    /// Sizes row groups so that each group holds roughly `station_count / cores` stations'
    /// worth of rows.
    ///
    /// The result is capped by the adaptive size and clamped to 250 000..=2 000 000. When
    /// `station_count` is 0 this is just the adaptive size.
    fn calculate_station_aware_row_group_size(
        &self,
        total_rows: usize,
        station_count: usize,
        system_profile: &SystemProfile,
    ) -> usize {
        if station_count == 0 {
            return self.calculate_adaptive_row_group_size(total_rows, system_profile);
        }
        // `SystemProfile` can be built by hand with `performance_cores == 0`; treat that as a
        // single core instead of panicking on division by zero.
        let cores = system_profile.performance_cores.max(1);
        let avg_rows_per_station = total_rows / station_count;
        let target_stations_per_group = (station_count / cores).max(1);
        // Cannot overflow: the product is at most
        // (total_rows / station_count) * station_count <= total_rows.
        let station_based_size = avg_rows_per_station * target_stations_per_group;
        let adaptive_size = self.calculate_adaptive_row_group_size(total_rows, system_profile);
        station_based_size
            .min(adaptive_size)
            .clamp(250_000, 2_000_000)
    }
}
/// Top-level pipeline configuration.
///
/// `Default` supplies the baseline values noted on each field. The `with_*` /
/// `without_*` methods are by-value builders that override individual fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MidasConfig {
    /// Worker count. Default 4; see `with_workers`.
    pub workers: usize,
    /// Default 20; see `with_sample_size`. Presumably the number of files sampled during
    /// discovery — confirm at the call site.
    pub sample_size: usize,
    /// Default `true`; cleared by `without_column_elimination`. Presumably controls whether
    /// each dataset's `excluded_columns` are dropped.
    pub enable_column_elimination: bool,
    /// Default 8192.
    pub chunk_size: usize,
    /// Default 80; see `with_batch_size`.
    pub batch_size: usize,
    /// Default `false`; set by `with_force_reprocess`.
    pub force_reprocess: bool,
    /// Default `false`; set by `with_discovery_only`.
    pub discovery_only: bool,
    /// Default `false`.
    pub skip_schema_validation: bool,
    /// Default `false`; set by `with_gpu`.
    pub enable_gpu: bool,
    /// Default 100 000. `with_minimal_memory` lowers it to 8192, and
    /// `with_streaming_chunk_size` sets it directly.
    pub streaming_chunk_size: usize,
    /// Default 8; see `with_max_concurrent_files`.
    pub max_concurrent_files: usize,
    /// Parquet output tuning; see `with_parquet_optimization`.
    pub parquet_optimization: ParquetOptimizationConfig,
    /// Per-dataset overrides, read via `get_dataset_config`. The default has entries for
    /// `Rain`, `Temperature`, `Wind` and `Radiation`.
    pub dataset_configs: HashMap<DatasetType, DatasetSpecificConfig>,
}
/// Strategy for combining data from files of one dataset whose schemas may differ.
///
/// NOTE(review): variant semantics are not defined here. Presumably `Standard` requires
/// identical columns, `Diagonal` fills missing columns with nulls (cf. polars vertical vs.
/// diagonal concat), and `Adaptive` picks between them. Confirm at the call site.
/// `Adaptive` is the `Default`, which serde also uses when a `DatasetSpecificConfig`
/// omits `union_strategy`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum UnionStrategy {
    Standard,
    Diagonal,
    #[default]
    Adaptive,
}
/// Serde fallback for `DatasetSpecificConfig::max_schema_variance` when that field is
/// absent from a serialized config.
fn default_max_schema_variance() -> usize {
    const DEFAULT_MAX_SCHEMA_VARIANCE: usize = 3;
    DEFAULT_MAX_SCHEMA_VARIANCE
}
/// Per-dataset overrides, stored keyed by `DatasetType` in `MidasConfig::dataset_configs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatasetSpecificConfig {
    /// Column names excluded for this dataset (e.g. the rain default lists `prcp_amt_j`).
    pub excluded_columns: Vec<String>,
    /// File-name patterns; the defaults look like globs (e.g. `*rain*.csv`) — presumably
    /// matched as globs, confirm at the discovery code.
    pub file_patterns: Vec<String>,
    /// Schema tolerance settings for this dataset.
    pub schema_validation: SchemaValidation,
    /// Optional when deserializing; falls back to `UnionStrategy::default()` (`Adaptive`).
    #[serde(default)]
    pub union_strategy: UnionStrategy,
    /// Optional when deserializing; falls back to `default_max_schema_variance()` (3).
    #[serde(default = "default_max_schema_variance")]
    pub max_schema_variance: usize,
}
/// Schema-tolerance settings for one dataset.
///
/// Every built-in default uses `strict: false` and `allow_column_variations: true`, with
/// `max_column_diff` between 2 and 4. NOTE(review): how these flags interact is decided by
/// the validation code, not here — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaValidation {
    /// Presumably rejects any schema deviation when `true`.
    pub strict: bool,
    /// Presumably permits files whose column sets differ from one another.
    pub allow_column_variations: bool,
    /// Presumably the maximum number of differing columns tolerated between files.
    pub max_column_diff: usize,
}
impl Default for MidasConfig {
fn default() -> Self {
let mut dataset_configs = HashMap::new();
dataset_configs.insert(
DatasetType::Rain,
DatasetSpecificConfig {
excluded_columns: vec![
"prcp_amt_j".to_string(),
"meto_stmp_time".to_string(),
"midas_stmp_etime".to_string(),
],
file_patterns: vec!["*rain*.csv".to_string()],
schema_validation: SchemaValidation {
strict: false,
allow_column_variations: true,
max_column_diff: 2,
},
union_strategy: UnionStrategy::Diagonal, max_schema_variance: 3,
},
);
dataset_configs.insert(
DatasetType::Temperature,
DatasetSpecificConfig {
excluded_columns: vec![
"min_grss_temp".to_string(),
"min_conc_temp".to_string(),
"min_grss_temp_q".to_string(),
"min_conc_temp_q".to_string(),
"min_grss_temp_j".to_string(),
"min_conc_temp_j".to_string(),
],
file_patterns: vec!["*temperature*.csv".to_string()],
schema_validation: SchemaValidation {
strict: false,
allow_column_variations: true,
max_column_diff: 3,
},
union_strategy: UnionStrategy::Adaptive, max_schema_variance: 3,
},
);
dataset_configs.insert(
DatasetType::Wind,
DatasetSpecificConfig {
excluded_columns: vec![
"mean_wind_dir".to_string(),
"max_gust_dir".to_string(),
"max_gust_ctime".to_string(),
],
file_patterns: vec!["*wind*.csv".to_string()],
schema_validation: SchemaValidation {
strict: false,
allow_column_variations: true,
max_column_diff: 4,
},
union_strategy: UnionStrategy::Adaptive, max_schema_variance: 4,
},
);
dataset_configs.insert(
DatasetType::Radiation,
DatasetSpecificConfig {
excluded_columns: vec![],
file_patterns: vec!["*radiation*.csv".to_string()],
schema_validation: SchemaValidation {
strict: false,
allow_column_variations: true,
max_column_diff: 2,
},
union_strategy: UnionStrategy::Standard, max_schema_variance: 2,
},
);
Self {
workers: 4,
sample_size: 20,
enable_column_elimination: true,
chunk_size: 8192,
batch_size: 80, force_reprocess: false,
discovery_only: false,
skip_schema_validation: false,
enable_gpu: false,
streaming_chunk_size: 100000, max_concurrent_files: 8, parquet_optimization: ParquetOptimizationConfig::default(),
dataset_configs,
}
}
}
impl MidasConfig {
    /// Returns this config with `workers` replaced.
    pub fn with_workers(self, workers: usize) -> Self {
        Self { workers, ..self }
    }

    /// Returns this config with `sample_size` replaced.
    pub fn with_sample_size(self, sample_size: usize) -> Self {
        Self { sample_size, ..self }
    }

    /// Returns this config with `batch_size` replaced.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }

    /// Returns this config with `enable_column_elimination` turned off.
    pub fn without_column_elimination(self) -> Self {
        Self {
            enable_column_elimination: false,
            ..self
        }
    }

    /// Returns this config with `force_reprocess` turned on.
    pub fn with_force_reprocess(self) -> Self {
        Self {
            force_reprocess: true,
            ..self
        }
    }

    /// Returns this config with `discovery_only` turned on.
    pub fn with_discovery_only(self) -> Self {
        Self {
            discovery_only: true,
            ..self
        }
    }

    /// Returns this config with `enable_gpu` turned on.
    pub fn with_gpu(self) -> Self {
        Self {
            enable_gpu: true,
            ..self
        }
    }

    /// Returns this config with `streaming_chunk_size` lowered to 8192; nothing else changes.
    pub fn with_minimal_memory(self) -> Self {
        Self {
            streaming_chunk_size: 8192,
            ..self
        }
    }

    /// Returns this config with `streaming_chunk_size` set to `chunk_size`.
    pub fn with_streaming_chunk_size(self, chunk_size: usize) -> Self {
        Self {
            streaming_chunk_size: chunk_size,
            ..self
        }
    }

    /// Returns this config with `max_concurrent_files` set to `max_files`.
    pub fn with_max_concurrent_files(self, max_files: usize) -> Self {
        Self {
            max_concurrent_files: max_files,
            ..self
        }
    }

    /// Returns this config with the whole Parquet optimization section replaced.
    pub fn with_parquet_optimization(self, config: ParquetOptimizationConfig) -> Self {
        Self {
            parquet_optimization: config,
            ..self
        }
    }

    /// Looks up the per-dataset overrides; `None` when `dataset_type` has no entry.
    pub fn get_dataset_config(&self, dataset_type: &DatasetType) -> Option<&DatasetSpecificConfig> {
        let configs = &self.dataset_configs;
        configs.get(dataset_type)
    }
}