elasticube_core/
optimization.rs1use datafusion::execution::config::SessionConfig;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use std::sync::Arc;
9
/// Tunable settings for query execution and result caching.
///
/// Values are normally produced by [`Default::default`] and then adjusted
/// through the `with_*` builder methods.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OptimizationConfig {
    /// Partition count forwarded to the DataFusion session configuration.
    pub target_partitions: usize,

    /// Toggle for the query optimizer.
    // NOTE(review): where this flag is consumed is not visible here — confirm.
    pub enable_optimizer: bool,

    /// Toggle for predicate pushdown.
    // NOTE(review): where this flag is consumed is not visible here — confirm.
    pub enable_predicate_pushdown: bool,

    /// Toggle for projection pushdown.
    // NOTE(review): where this flag is consumed is not visible here — confirm.
    pub enable_projection_pushdown: bool,

    /// Toggle for Parquet-level pushdown.
    // NOTE(review): where this flag is consumed is not visible here — confirm.
    pub enable_parquet_pushdown: bool,

    /// Batch size forwarded to the DataFusion session configuration.
    pub batch_size: usize,

    /// Toggle for the query result cache.
    pub enable_query_cache: bool,

    /// Upper bound on the number of cached query entries.
    pub max_cache_entries: usize,

    /// Optional memory limit; `None` means no limit has been configured.
    // NOTE(review): presumably expressed in bytes — confirm against the consumer.
    pub memory_limit: Option<usize>,
}
55
56impl Default for OptimizationConfig {
57 fn default() -> Self {
58 Self {
59 target_partitions: num_cpus::get(),
60 enable_optimizer: true,
61 enable_predicate_pushdown: true,
62 enable_projection_pushdown: true,
63 enable_parquet_pushdown: true,
64 batch_size: 8192,
65 enable_query_cache: true,
66 max_cache_entries: 100,
67 memory_limit: None,
68 }
69 }
70}
71
72impl OptimizationConfig {
73 pub fn new() -> Self {
75 Self::default()
76 }
77
78 pub fn with_target_partitions(mut self, partitions: usize) -> Self {
80 self.target_partitions = partitions;
81 self
82 }
83
84 pub fn with_batch_size(mut self, size: usize) -> Self {
86 self.batch_size = size;
87 self
88 }
89
90 pub fn with_predicate_pushdown(mut self, enabled: bool) -> Self {
92 self.enable_predicate_pushdown = enabled;
93 self
94 }
95
96 pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
98 self.enable_projection_pushdown = enabled;
99 self
100 }
101
102 pub fn with_parquet_pushdown(mut self, enabled: bool) -> Self {
104 self.enable_parquet_pushdown = enabled;
105 self
106 }
107
108 pub fn with_memory_limit(mut self, limit: usize) -> Self {
110 self.memory_limit = Some(limit);
111 self
112 }
113
114 pub fn with_query_cache(mut self, enabled: bool) -> Self {
116 self.enable_query_cache = enabled;
117 self
118 }
119
120 pub fn with_max_cache_entries(mut self, max: usize) -> Self {
122 self.max_cache_entries = max;
123 self
124 }
125
126 pub fn to_session_config(&self) -> SessionConfig {
128 let config = SessionConfig::new()
129 .with_target_partitions(self.target_partitions)
130 .with_batch_size(self.batch_size);
131
132 config
137 }
138
139 pub fn to_runtime_env(&self) -> Arc<RuntimeEnv> {
141 Arc::new(RuntimeEnv::default())
145 }
146}
147
148#[derive(Debug, Clone)]
150pub struct CubeStatistics {
151 pub row_count: usize,
153
154 pub partition_count: usize,
156
157 pub avg_rows_per_partition: usize,
159
160 pub memory_bytes: usize,
162
163 pub column_stats: Vec<ColumnStatistics>,
165}
166
167impl CubeStatistics {
168 pub fn from_batches(batches: &[arrow::record_batch::RecordBatch]) -> Self {
170 let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
171 let partition_count = batches.len();
172 let avg_rows_per_partition = if partition_count > 0 {
173 row_count / partition_count
174 } else {
175 0
176 };
177
178 let memory_bytes: usize = batches
180 .iter()
181 .map(|b| b.get_array_memory_size())
182 .sum();
183
184 let column_stats = if let Some(first_batch) = batches.first() {
186 let schema = first_batch.schema();
187 (0..schema.fields().len())
188 .map(|col_idx| ColumnStatistics::from_batches(batches, col_idx))
189 .collect()
190 } else {
191 Vec::new()
192 };
193
194 Self {
195 row_count,
196 partition_count,
197 avg_rows_per_partition,
198 memory_bytes,
199 column_stats,
200 }
201 }
202
203 pub fn summary(&self) -> String {
205 format!(
206 "Rows: {}, Partitions: {}, Memory: {:.2} MB",
207 self.row_count,
208 self.partition_count,
209 self.memory_bytes as f64 / 1_048_576.0
210 )
211 }
212}
213
/// Null statistics for a single column across a set of record batches.
#[derive(Clone, Debug)]
pub struct ColumnStatistics {
    /// Position of the column within the schema.
    pub column_index: usize,

    /// Name of the column's schema field.
    pub column_name: String,

    /// Number of null values counted across all batches.
    pub null_count: usize,

    /// Nulls as a percentage of all rows; 0.0 when there are no rows.
    pub null_percentage: f64,

    /// Number of distinct values when known; `ColumnStatistics::from_batches`
    /// always leaves this as `None`.
    pub distinct_count: Option<usize>,
}
233
234impl ColumnStatistics {
235 fn from_batches(batches: &[arrow::record_batch::RecordBatch], col_idx: usize) -> Self {
237 let schema = batches.first().map(|b| b.schema()).unwrap();
238 let column_name = schema.field(col_idx).name().clone();
239
240 let mut total_nulls = 0;
241 let mut total_rows = 0;
242
243 for batch in batches {
244 let array = batch.column(col_idx);
245 total_nulls += array.null_count();
246 total_rows += array.len();
247 }
248
249 let null_percentage = if total_rows > 0 {
250 (total_nulls as f64 / total_rows as f64) * 100.0
251 } else {
252 0.0
253 };
254
255 Self {
256 column_index: col_idx,
257 column_name,
258 null_count: total_nulls,
259 null_percentage,
260 distinct_count: None, }
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default value is pinned, not just a subset of the toggles.
    #[test]
    fn test_optimization_config_default() {
        let config = OptimizationConfig::default();
        assert_eq!(config.target_partitions, num_cpus::get());
        assert!(config.enable_optimizer);
        assert!(config.enable_predicate_pushdown);
        assert!(config.enable_projection_pushdown);
        assert!(config.enable_parquet_pushdown);
        assert_eq!(config.batch_size, 8192);
        assert!(config.enable_query_cache);
        assert_eq!(config.max_cache_entries, 100);
        assert_eq!(config.memory_limit, None);
    }

    /// `new` is documented as a shorthand for `default`.
    #[test]
    fn test_new_matches_default() {
        assert_eq!(OptimizationConfig::new(), OptimizationConfig::default());
    }

    /// Each builder method updates exactly the field it is named after.
    #[test]
    fn test_optimization_config_builder() {
        let config = OptimizationConfig::new()
            .with_target_partitions(8)
            .with_batch_size(4096)
            .with_predicate_pushdown(false)
            .with_projection_pushdown(false)
            .with_parquet_pushdown(false)
            .with_query_cache(false)
            .with_max_cache_entries(7)
            .with_memory_limit(1_000_000_000);

        assert_eq!(config.target_partitions, 8);
        assert_eq!(config.batch_size, 4096);
        assert!(!config.enable_predicate_pushdown);
        assert!(!config.enable_projection_pushdown);
        assert!(!config.enable_parquet_pushdown);
        assert!(!config.enable_query_cache);
        assert_eq!(config.max_cache_entries, 7);
        assert_eq!(config.memory_limit, Some(1_000_000_000));
        // The optimizer flag has no builder and must keep its default.
        assert!(config.enable_optimizer);
    }

    #[test]
    fn test_session_config_creation() {
        let config = OptimizationConfig::new()
            .with_target_partitions(4)
            .with_batch_size(1024);

        let session_config = config.to_session_config();
        assert_eq!(session_config.target_partitions(), 4);
        assert_eq!(session_config.batch_size(), 1024);
    }

    /// An empty batch slice must not divide by zero or touch a schema.
    #[test]
    fn test_cube_statistics_from_empty_batches() {
        let stats = CubeStatistics::from_batches(&[]);
        assert_eq!(stats.row_count, 0);
        assert_eq!(stats.partition_count, 0);
        assert_eq!(stats.avg_rows_per_partition, 0);
        assert_eq!(stats.memory_bytes, 0);
        assert!(stats.column_stats.is_empty());
        assert_eq!(stats.summary(), "Rows: 0, Partitions: 0, Memory: 0.00 MB");
    }

    /// The summary reports memory as bytes / 1_048_576 with two decimals.
    #[test]
    fn test_cube_statistics_summary_format() {
        let stats = CubeStatistics {
            row_count: 10,
            partition_count: 2,
            avg_rows_per_partition: 5,
            memory_bytes: 1_048_576,
            column_stats: Vec::new(),
        };
        assert_eq!(stats.summary(), "Rows: 10, Partitions: 2, Memory: 1.00 MB");
    }
}