elasticube_core/optimization.rs

//! Performance optimization features for ElastiCube
//!
//! Provides configuration for query optimization, storage optimization,
//! and caching to improve analytical query performance.
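//!
//! # Example
//!
//! ```ignore
//! // Sketch only (marked `ignore`): the crate/module path below is an
//! // assumption and depends on how this module is re-exported.
//! use elasticube_core::optimization::OptimizationConfig;
//!
//! let config = OptimizationConfig::new()
//!     .with_target_partitions(8)
//!     .with_batch_size(4096)
//!     .with_memory_limit(1_000_000_000);
//! let session_config = config.to_session_config();
//! ```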

use datafusion::execution::config::SessionConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

/// Configuration for query optimization
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OptimizationConfig {
    /// Target number of partitions for parallel query execution
    /// Higher values enable more parallelism but increase overhead
    /// Default: number of CPU cores
    pub target_partitions: usize,

    /// Enable/disable query optimization
    /// Default: true
    pub enable_optimizer: bool,

    /// Enable/disable predicate pushdown optimization
    /// Pushes filters as early as possible in the query plan
    /// Default: true
    pub enable_predicate_pushdown: bool,

    /// Enable/disable projection pushdown optimization
    /// Only reads columns that are actually needed
    /// Default: true
    pub enable_projection_pushdown: bool,

    /// Enable/disable filter pushdown to Parquet readers
    /// Uses Parquet row group statistics to skip reading unnecessary data
    /// Default: true
    pub enable_parquet_pushdown: bool,

    /// Batch size for query execution
    /// Larger batches improve throughput but use more memory
    /// Default: 8192
    pub batch_size: usize,

    /// Enable query result caching
    /// Default: true
    pub enable_query_cache: bool,

    /// Maximum number of cached query results
    /// Default: 100
    pub max_cache_entries: usize,

    /// Memory limit for query execution (in bytes)
    /// None means unlimited
    /// Default: None
    pub memory_limit: Option<usize>,
}

impl Default for OptimizationConfig {
    fn default() -> Self {
        Self {
            target_partitions: num_cpus::get(),
            enable_optimizer: true,
            enable_predicate_pushdown: true,
            enable_projection_pushdown: true,
            enable_parquet_pushdown: true,
            batch_size: 8192,
            enable_query_cache: true,
            max_cache_entries: 100,
            memory_limit: None,
        }
    }
}

impl OptimizationConfig {
    /// Create a new optimization configuration with default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Set target partitions for parallel execution
    pub fn with_target_partitions(mut self, partitions: usize) -> Self {
        self.target_partitions = partitions;
        self
    }

    /// Set batch size for query execution
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Enable or disable predicate pushdown
    pub fn with_predicate_pushdown(mut self, enabled: bool) -> Self {
        self.enable_predicate_pushdown = enabled;
        self
    }

    /// Enable or disable projection pushdown
    pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
        self.enable_projection_pushdown = enabled;
        self
    }

    /// Enable or disable Parquet pushdown optimizations
    pub fn with_parquet_pushdown(mut self, enabled: bool) -> Self {
        self.enable_parquet_pushdown = enabled;
        self
    }

    /// Set memory limit for query execution
    pub fn with_memory_limit(mut self, limit: usize) -> Self {
        self.memory_limit = Some(limit);
        self
    }

    /// Enable or disable query result caching
    pub fn with_query_cache(mut self, enabled: bool) -> Self {
        self.enable_query_cache = enabled;
        self
    }

    /// Set maximum number of cached query results
    pub fn with_max_cache_entries(mut self, max: usize) -> Self {
        self.max_cache_entries = max;
        self
    }

    /// Create a DataFusion SessionConfig from this optimization config
    pub fn to_session_config(&self) -> SessionConfig {
        let config = SessionConfig::new()
            .with_target_partitions(self.target_partitions)
            .with_batch_size(self.batch_size);

        // Note: DataFusion 50+ enables the optimizer rules (including predicate
        // and projection pushdown) by default, so no extra setup is required
        // here. Individual rules can still be tuned through SessionConfig
        // options if needed, as sketched below.
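        //
        // A possible wiring for the Parquet filter pushdown flag, shown as a
        // sketch only (the config key name is an assumption and should be
        // verified against the DataFusion version in use):
        //
        // let config = config.set_bool(
        //     "datafusion.execution.parquet.pushdown_filters",
        //     self.enable_parquet_pushdown,
        // );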

        config
    }

    /// Create a DataFusion RuntimeEnv from this optimization config
    pub fn to_runtime_env(&self) -> Arc<RuntimeEnv> {
        // DataFusion 50+ constructs runtime environments via RuntimeEnv::default()
        // or a builder, and memory limits are configured differently in newer
        // versions. For now we return the default RuntimeEnv, which means the
        // configured `memory_limit` is not yet applied; a possible wiring is
        // sketched below.
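        //
        // Sketch only: `RuntimeEnvBuilder` and the `with_memory_limit(bytes,
        // fraction)` signature are assumptions to check against the DataFusion
        // version in use (the import would also be needed):
        //
        // use datafusion::execution::runtime_env::RuntimeEnvBuilder;
        // if let Some(limit) = self.memory_limit {
        //     if let Ok(env) = RuntimeEnvBuilder::new()
        //         .with_memory_limit(limit, 1.0)
        //         .build()
        //     {
        //         return Arc::new(env);
        //     }
        // }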
        Arc::new(RuntimeEnv::default())
    }
}

/// Statistics for a cube's data
#[derive(Debug, Clone)]
pub struct CubeStatistics {
    /// Total number of rows
    pub row_count: usize,

    /// Number of RecordBatch partitions
    pub partition_count: usize,

    /// Average rows per partition
    pub avg_rows_per_partition: usize,

    /// Total memory usage (estimated)
    pub memory_bytes: usize,

    /// Per-column statistics
    pub column_stats: Vec<ColumnStatistics>,
}

impl CubeStatistics {
    /// Create statistics from cube data
    pub fn from_batches(batches: &[arrow::record_batch::RecordBatch]) -> Self {
        let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
        let partition_count = batches.len();
        let avg_rows_per_partition = if partition_count > 0 {
            row_count / partition_count
        } else {
            0
        };

        // Estimate memory usage
        let memory_bytes: usize = batches
            .iter()
            .map(|b| b.get_array_memory_size())
            .sum();

        // Collect column statistics
        let column_stats = if let Some(first_batch) = batches.first() {
            let schema = first_batch.schema();
            (0..schema.fields().len())
                .map(|col_idx| ColumnStatistics::from_batches(batches, col_idx))
                .collect()
        } else {
            Vec::new()
        };

        Self {
            row_count,
            partition_count,
            avg_rows_per_partition,
            memory_bytes,
            column_stats,
        }
    }

    /// Get a human-readable summary
    pub fn summary(&self) -> String {
        format!(
            "Rows: {}, Partitions: {}, Memory: {:.2} MB",
            self.row_count,
            self.partition_count,
            self.memory_bytes as f64 / 1_048_576.0
        )
    }
}

/// Statistics for a single column
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
    /// Column index
    pub column_index: usize,

    /// Column name
    pub column_name: String,

    /// Number of null values
    pub null_count: usize,

    /// Null percentage
    pub null_percentage: f64,

    /// Estimated distinct values (cardinality)
    /// None if not computed
    pub distinct_count: Option<usize>,
}

impl ColumnStatistics {
    /// Compute statistics for a column across all batches
    fn from_batches(batches: &[arrow::record_batch::RecordBatch], col_idx: usize) -> Self {
        // The caller (CubeStatistics::from_batches) guarantees at least one batch
        let schema = batches
            .first()
            .map(|b| b.schema())
            .expect("at least one batch is required to compute column statistics");
        let column_name = schema.field(col_idx).name().clone();

        let mut total_nulls = 0;
        let mut total_rows = 0;

        for batch in batches {
            let array = batch.column(col_idx);
            total_nulls += array.null_count();
            total_rows += array.len();
        }

        let null_percentage = if total_rows > 0 {
            (total_nulls as f64 / total_rows as f64) * 100.0
        } else {
            0.0
        };

        Self {
            column_index: col_idx,
            column_name,
            null_count: total_nulls,
            null_percentage,
            distinct_count: None, // Computing distinct count is expensive, skip for now
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_optimization_config_default() {
        let config = OptimizationConfig::default();
        assert!(config.enable_optimizer);
        assert!(config.enable_predicate_pushdown);
        assert!(config.enable_projection_pushdown);
        assert_eq!(config.batch_size, 8192);
    }

    #[test]
    fn test_optimization_config_builder() {
        let config = OptimizationConfig::new()
            .with_target_partitions(8)
            .with_batch_size(4096)
            .with_predicate_pushdown(false)
            .with_memory_limit(1_000_000_000);

        assert_eq!(config.target_partitions, 8);
        assert_eq!(config.batch_size, 4096);
        assert!(!config.enable_predicate_pushdown);
        assert_eq!(config.memory_limit, Some(1_000_000_000));
    }

    #[test]
    fn test_session_config_creation() {
        let config = OptimizationConfig::new()
            .with_target_partitions(4)
            .with_batch_size(1024);

        let session_config = config.to_session_config();
        assert_eq!(session_config.target_partitions(), 4);
        assert_eq!(session_config.batch_size(), 1024);
    }
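
    // A small sketch exercising CubeStatistics and ColumnStatistics on an
    // in-memory batch; the constructors below assume the standard arrow-rs
    // API exposed by the `arrow` dependency used elsewhere in this module.
    #[test]
    fn test_cube_statistics_from_batches() {
        use arrow::array::{ArrayRef, Int64Array, StringArray};
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])) as ArrayRef,
            ],
        )
        .expect("batch construction should succeed");

        let stats = CubeStatistics::from_batches(&[batch]);
        assert_eq!(stats.row_count, 3);
        assert_eq!(stats.partition_count, 1);
        assert_eq!(stats.column_stats.len(), 2);
        assert_eq!(stats.column_stats[1].column_name, "name");
        assert_eq!(stats.column_stats[1].null_count, 1);
    }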
}