use datafusion::execution::config::SessionConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use std::sync::Arc;
/// Tunable knobs for query execution and optimization.
///
/// Values are built with the `with_*` builder methods and converted into
/// DataFusion objects via `to_session_config` / `to_runtime_env`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OptimizationConfig {
    /// Partition count forwarded to the DataFusion `SessionConfig`.
    pub target_partitions: usize,

    /// Master switch for the optimizer.
    /// NOTE(review): not consulted by the conversion methods in this file —
    /// presumably read elsewhere; confirm.
    pub enable_optimizer: bool,

    /// Whether predicate pushdown is requested.
    /// NOTE(review): not forwarded by `to_session_config`; confirm consumer.
    pub enable_predicate_pushdown: bool,

    /// Whether projection pushdown is requested.
    /// NOTE(review): not forwarded by `to_session_config`; confirm consumer.
    pub enable_projection_pushdown: bool,

    /// Whether Parquet-level pushdown is requested.
    /// NOTE(review): not forwarded by `to_session_config`; confirm consumer.
    pub enable_parquet_pushdown: bool,

    /// Batch size forwarded to the DataFusion `SessionConfig`.
    pub batch_size: usize,

    /// Whether query caching is requested.
    /// NOTE(review): presumably read by a cache defined elsewhere; confirm.
    pub enable_query_cache: bool,

    /// Upper bound on cache entries.
    /// NOTE(review): presumably read by a cache defined elsewhere; confirm.
    pub max_cache_entries: usize,

    /// Optional memory cap; `None` means no cap was configured.
    /// NOTE(review): unit is presumably bytes, and `to_runtime_env` does not
    /// read this field — confirm both.
    pub memory_limit: Option<usize>,
}
impl Default for OptimizationConfig {
fn default() -> Self {
Self {
target_partitions: num_cpus::get(),
enable_optimizer: true,
enable_predicate_pushdown: true,
enable_projection_pushdown: true,
enable_parquet_pushdown: true,
batch_size: 8192,
enable_query_cache: true,
max_cache_entries: 100,
memory_limit: None,
}
}
}
impl OptimizationConfig {
    /// Creates a configuration populated with the `Default` values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with `target_partitions` replaced.
    pub fn with_target_partitions(self, partitions: usize) -> Self {
        Self {
            target_partitions: partitions,
            ..self
        }
    }

    /// Returns the configuration with `batch_size` replaced.
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Returns the configuration with the predicate-pushdown flag replaced.
    pub fn with_predicate_pushdown(self, enabled: bool) -> Self {
        Self {
            enable_predicate_pushdown: enabled,
            ..self
        }
    }

    /// Returns the configuration with the projection-pushdown flag replaced.
    pub fn with_projection_pushdown(self, enabled: bool) -> Self {
        Self {
            enable_projection_pushdown: enabled,
            ..self
        }
    }

    /// Returns the configuration with the Parquet-pushdown flag replaced.
    pub fn with_parquet_pushdown(self, enabled: bool) -> Self {
        Self {
            enable_parquet_pushdown: enabled,
            ..self
        }
    }

    /// Returns the configuration with `memory_limit` set to `Some(limit)`.
    pub fn with_memory_limit(self, limit: usize) -> Self {
        Self {
            memory_limit: Some(limit),
            ..self
        }
    }

    /// Returns the configuration with the query-cache flag replaced.
    pub fn with_query_cache(self, enabled: bool) -> Self {
        Self {
            enable_query_cache: enabled,
            ..self
        }
    }

    /// Returns the configuration with `max_cache_entries` replaced.
    pub fn with_max_cache_entries(self, max: usize) -> Self {
        Self {
            max_cache_entries: max,
            ..self
        }
    }

    /// Produces a DataFusion `SessionConfig` from this configuration.
    ///
    /// Only `target_partitions` and `batch_size` are forwarded.
    /// NOTE(review): the pushdown, optimizer and cache flags are not applied
    /// here — confirm whether they are wired up somewhere else.
    pub fn to_session_config(&self) -> SessionConfig {
        SessionConfig::new()
            .with_target_partitions(self.target_partitions)
            .with_batch_size(self.batch_size)
    }

    /// Produces a shared, default-constructed DataFusion `RuntimeEnv`.
    ///
    /// NOTE(review): `memory_limit` is not consulted here, so a configured
    /// limit has no effect on the returned runtime — confirm this is intended.
    pub fn to_runtime_env(&self) -> Arc<RuntimeEnv> {
        let runtime = RuntimeEnv::default();
        Arc::new(runtime)
    }
}
/// Aggregate statistics computed over a slice of Arrow record batches.
///
/// Each input batch is counted as one partition.
#[derive(Clone, Debug)]
pub struct CubeStatistics {
    /// Total number of rows summed over all batches.
    pub row_count: usize,

    /// Number of batches the statistics were computed from.
    pub partition_count: usize,

    /// `row_count / partition_count` using integer division; `0` when there
    /// are no batches.
    pub avg_rows_per_partition: usize,

    /// Sum of each batch's reported array memory size, in bytes.
    pub memory_bytes: usize,

    /// One entry per field of the first batch's schema; empty when there are
    /// no batches.
    pub column_stats: Vec<ColumnStatistics>,
}
impl CubeStatistics {
pub fn from_batches(batches: &[arrow::record_batch::RecordBatch]) -> Self {
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let partition_count = batches.len();
let avg_rows_per_partition = if partition_count > 0 {
row_count / partition_count
} else {
0
};
let memory_bytes: usize = batches
.iter()
.map(|b| b.get_array_memory_size())
.sum();
let column_stats = if let Some(first_batch) = batches.first() {
let schema = first_batch.schema();
(0..schema.fields().len())
.map(|col_idx| ColumnStatistics::from_batches(batches, col_idx))
.collect()
} else {
Vec::new()
};
Self {
row_count,
partition_count,
avg_rows_per_partition,
memory_bytes,
column_stats,
}
}
pub fn summary(&self) -> String {
format!(
"Rows: {}, Partitions: {}, Memory: {:.2} MB",
self.row_count,
self.partition_count,
self.memory_bytes as f64 / 1_048_576.0
)
}
}
/// Null-value statistics for a single column, aggregated across batches.
#[derive(Clone, Debug)]
pub struct ColumnStatistics {
    /// Zero-based position of the column in the schema.
    pub column_index: usize,

    /// Column name as found in the first batch's schema.
    pub column_name: String,

    /// Total number of null values summed over all batches.
    pub null_count: usize,

    /// Nulls as a percentage of all rows (0.0–100.0); `0.0` when there are
    /// no rows.
    pub null_percentage: f64,

    /// Number of distinct values, when known. The constructor in this file
    /// never computes it and always stores `None`.
    pub distinct_count: Option<usize>,
}
impl ColumnStatistics {
    /// Aggregates null statistics for the column at `col_idx` across `batches`.
    ///
    /// The column name is read from the first batch's schema. When `batches`
    /// is empty there is no schema to consult, so the name is left empty and
    /// every counter is zero; the function does not panic in that case.
    ///
    /// `distinct_count` is not computed here and is always `None`.
    ///
    /// # Panics
    ///
    /// Panics if `col_idx` is out of range for the first batch's schema or
    /// for any batch's columns (propagated from the Arrow accessors).
    fn from_batches(batches: &[arrow::record_batch::RecordBatch], col_idx: usize) -> Self {
        // An empty slice has no schema; fall back to an empty name rather
        // than unwrapping a `None`.
        let column_name = batches
            .first()
            .map(|batch| batch.schema().field(col_idx).name().clone())
            .unwrap_or_default();

        // Single pass accumulating (nulls, rows) for the requested column.
        let (null_count, total_rows) =
            batches
                .iter()
                .fold((0usize, 0usize), |(nulls, rows), batch| {
                    let array = batch.column(col_idx);
                    (nulls + array.null_count(), rows + array.len())
                });

        // Guard the division so a zero-row input reports 0% instead of NaN.
        let null_percentage = if total_rows > 0 {
            (null_count as f64 / total_rows as f64) * 100.0
        } else {
            0.0
        };

        Self {
            column_index: col_idx,
            column_name,
            null_count,
            null_percentage,
            distinct_count: None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables the optimizer and both pushdowns
    /// and uses 8192-row batches.
    #[test]
    fn test_optimization_config_default() {
        let OptimizationConfig {
            enable_optimizer,
            enable_predicate_pushdown,
            enable_projection_pushdown,
            batch_size,
            ..
        } = OptimizationConfig::default();

        assert!(enable_optimizer);
        assert!(enable_predicate_pushdown);
        assert!(enable_projection_pushdown);
        assert_eq!(batch_size, 8192);
    }

    /// Each builder method overrides exactly the field it is named after.
    #[test]
    fn test_optimization_config_builder() {
        let mut built = OptimizationConfig::new();
        built = built.with_target_partitions(8);
        built = built.with_batch_size(4096);
        built = built.with_predicate_pushdown(false);
        built = built.with_memory_limit(1_000_000_000);

        assert_eq!(built.target_partitions, 8);
        assert_eq!(built.batch_size, 4096);
        assert!(!built.enable_predicate_pushdown);
        assert_eq!(built.memory_limit, Some(1_000_000_000));
    }

    /// Partition count and batch size are carried into the `SessionConfig`.
    #[test]
    fn test_session_config_creation() {
        const PARTITIONS: usize = 4;
        const BATCH_SIZE: usize = 1024;

        let session_config = OptimizationConfig::new()
            .with_target_partitions(PARTITIONS)
            .with_batch_size(BATCH_SIZE)
            .to_session_config();

        assert_eq!(session_config.target_partitions(), PARTITIONS);
        assert_eq!(session_config.batch_size(), BATCH_SIZE);
    }
}