dbx_core/engine/
parallel_engine.rs1use crate::error::{DbxError, DbxResult};
7use rayon::ThreadPoolBuilder;
8use std::sync::Arc;
9
/// Strategy for choosing how many worker threads the engine's pool gets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelizationPolicy {
    /// Use the detected CPU count, capped at 16 (see `determine_thread_count`).
    #[default]
    Auto,
    /// Use exactly this many threads, regardless of CPU count.
    Fixed(usize),
    /// Use half the detected CPUs (at least 1), leaving headroom for other work.
    Adaptive,
}
21
/// Tuning knobs for parallel query execution.
#[derive(Debug, Clone)]
pub struct ParallelismConfig {
    /// Fraction of the policy's base thread count the pool may actually use;
    /// clamped to [0.01, 1.0] when the pool is built.
    pub cpu_cap: f64,
    /// Row count below which row-level work stays single-threaded.
    pub min_rows_for_parallel: usize,
}
44
45impl Default for ParallelismConfig {
46 fn default() -> Self {
47 Self {
48 cpu_cap: 1.0,
49 min_rows_for_parallel: 1_000,
50 }
51 }
52}
53
54impl ParallelismConfig {
55 pub fn conservative() -> Self {
57 Self {
58 cpu_cap: 0.5,
59 min_rows_for_parallel: 5_000,
60 }
61 }
62
63 pub fn aggressive() -> Self {
65 Self {
66 cpu_cap: 1.0,
67 min_rows_for_parallel: 500,
68 }
69 }
70}
71
/// Executes work on a rayon thread pool sized from a `ParallelizationPolicy`
/// scaled by `DbConfig::parallelism.cpu_cap`.
pub struct ParallelExecutionEngine {
    // Shared ownership so the pool can outlive borrows handed out elsewhere.
    thread_pool: Arc<rayon::ThreadPool>,
    // Policy the pool was built with; thread count is fixed after construction.
    policy: ParallelizationPolicy,
    // Full database configuration; only `parallelism` is read by this engine.
    pub db_config: DbConfig,
}
79
80impl ParallelExecutionEngine {
81 pub fn new(policy: ParallelizationPolicy) -> DbxResult<Self> {
83 Self::new_with_config(policy, DbConfig::default())
84 }
85
86 pub fn new_with_config(policy: ParallelizationPolicy, config: DbConfig) -> DbxResult<Self> {
90 let cpu_cap = config.parallelism.cpu_cap.clamp(0.01, 1.0);
91 let base_threads = Self::determine_thread_count(policy);
92 let capped_threads = ((base_threads as f64 * cpu_cap).ceil() as usize).max(1);
93
94 let thread_pool = ThreadPoolBuilder::new()
95 .num_threads(capped_threads)
96 .thread_name(|i| format!("dbx-parallel-{}", i))
97 .build()
98 .map_err(|e| {
99 DbxError::NotImplemented(format!("Failed to create thread pool: {}", e))
100 })?;
101
102 Ok(Self {
103 thread_pool: Arc::new(thread_pool),
104 policy,
105 db_config: config,
106 })
107 }
108
109 pub fn new_auto() -> DbxResult<Self> {
111 Self::new(ParallelizationPolicy::Auto)
112 }
113
114 pub fn new_fixed(num_threads: usize) -> DbxResult<Self> {
116 if num_threads == 0 {
117 return Err(DbxError::InvalidArguments(
118 "Thread count must be greater than 0".to_string(),
119 ));
120 }
121 Self::new(ParallelizationPolicy::Fixed(num_threads))
122 }
123
124 pub fn policy(&self) -> ParallelizationPolicy {
126 self.policy
127 }
128
129 pub fn thread_count(&self) -> usize {
131 self.thread_pool.current_num_threads()
132 }
133
134 pub fn thread_pool(&self) -> &rayon::ThreadPool {
136 &self.thread_pool
137 }
138
139 pub fn config(&self) -> &ParallelismConfig {
141 &self.db_config.parallelism
142 }
143
144 pub fn db_config(&self) -> &DbConfig {
146 &self.db_config
147 }
148
149 pub fn should_parallelize_rows(&self, row_count: usize) -> bool {
152 row_count >= self.db_config.parallelism.min_rows_for_parallel && self.thread_count() > 1
153 }
154
155 pub fn execute<F, R>(&self, f: F) -> R
157 where
158 F: FnOnce() -> R + Send,
159 R: Send,
160 {
161 self.thread_pool.install(f)
162 }
163
164 fn determine_thread_count(policy: ParallelizationPolicy) -> usize {
166 match policy {
167 ParallelizationPolicy::Auto => {
168 let num_cpus = num_cpus::get();
170 num_cpus.min(16)
171 }
172 ParallelizationPolicy::Fixed(n) => n,
173 ParallelizationPolicy::Adaptive => {
174 let num_cpus = num_cpus::get();
177 (num_cpus / 2).max(1)
178 }
179 }
180 }
181
182 pub fn auto_tune(&self, workload_size: usize) -> usize {
186 self.auto_tune_weighted(workload_size, 1.0)
187 }
188
189 pub fn auto_tune_weighted(&self, workload_size: usize, avg_complexity: f64) -> usize {
193 let thread_count = self.thread_count();
194
195 match self.policy {
196 ParallelizationPolicy::Auto | ParallelizationPolicy::Adaptive => {
197 let base_threshold: f64 = 1000.0;
201 let adjusted_threshold =
202 (base_threshold / avg_complexity.max(0.1)).max(1.0) as usize;
203
204 if workload_size < adjusted_threshold {
205 1
206 } else {
207 let optimal = (workload_size / adjusted_threshold).min(thread_count);
208 optimal.max(1)
209 }
210 }
211 ParallelizationPolicy::Fixed(_) => thread_count,
212 }
213 }
214
215 pub fn estimate_query_complexity(sql: &str) -> f64 {
219 let sql_upper = sql.to_uppercase();
220 let mut score = 1.0;
221
222 let join_count = sql_upper.matches("JOIN").count();
224 score += join_count as f64 * 2.0;
225
226 let subquery_depth = sql_upper.matches("SELECT").count().saturating_sub(1);
228 score += subquery_depth as f64 * 3.0;
229
230 if sql_upper.contains("WITH ") {
232 score += 4.0;
233 }
234
235 let union_count = sql_upper.matches("UNION").count();
237 score += union_count as f64 * 2.5;
238
239 for func in ["COUNT(", "SUM(", "AVG(", "MAX(", "MIN("] {
241 score += sql_upper.matches(func).count() as f64 * 0.5;
242 }
243
244 if sql_upper.contains("OVER(") || sql_upper.contains("OVER (") {
246 score += 3.0;
247 }
248
249 if sql_upper.contains("ORDER BY") {
251 score += 0.5;
252 }
253 if sql_upper.contains("GROUP BY") {
254 score += 1.0;
255 }
256 if sql_upper.contains("HAVING") {
257 score += 1.0;
258 }
259
260 score += (sql.len() as f64 / 200.0).min(5.0);
262
263 score
264 }
265
266 pub fn should_parallelize(&self, workload_size: usize) -> bool {
268 self.auto_tune(workload_size) > 1
269 }
270}
271
272impl Default for ParallelExecutionEngine {
273 fn default() -> Self {
274 Self::new_auto().expect("Failed to create default parallel execution engine")
275 }
276}
277
278use crate::replication::transport::ReplicationConfig;
279use crate::storage::realtime_sync::RealtimeSyncConfig;
280
/// Backing container for the dirty buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DirtyBufferMode {
    /// Ordered map (the default); iterates entries in key order.
    #[default]
    BTreeMap,
    /// Concurrent hash map; presumably intended for multi-threaded write
    /// paths — TODO confirm against the buffer implementation.
    DashMap,
}
297
/// Top-level database configuration aggregating per-subsystem settings.
#[derive(Debug, Clone, Default)]
pub struct DbConfig {
    /// Thread-pool sizing and parallelization thresholds.
    pub parallelism: ParallelismConfig,
    /// Real-time storage sync settings (see `storage::realtime_sync`).
    pub sync: RealtimeSyncConfig,
    /// Replication transport settings (see `replication::transport`).
    pub replication: ReplicationConfig,
    /// Which container backs the dirty buffer.
    pub dirty_buffer_mode: DirtyBufferMode,
}
322
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_auto() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert_eq!(engine.policy(), ParallelizationPolicy::Auto);
        assert!(engine.thread_count() > 0);
    }

    #[test]
    fn test_new_fixed() {
        let engine = ParallelExecutionEngine::new_fixed(4).unwrap();
        assert_eq!(engine.policy(), ParallelizationPolicy::Fixed(4));
        assert_eq!(engine.thread_count(), 4);
    }

    #[test]
    fn test_new_fixed_zero_threads() {
        assert!(ParallelExecutionEngine::new_fixed(0).is_err());
    }

    #[test]
    fn test_execute() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert_eq!(engine.execute(|| 42), 42);
    }

    #[test]
    fn test_auto_tune_small_workload() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert_eq!(engine.auto_tune(500), 1);
    }

    #[test]
    fn test_auto_tune_large_workload() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        let parallelism = engine.auto_tune(100_000);
        // auto_tune is always clamped to [1, thread_count()].
        assert!(parallelism >= 1);
        assert!(parallelism <= engine.thread_count());
        // Scaling past one thread is only possible when the pool actually has
        // more than one; the unconditional `> 1` assert failed on single-core
        // machines.
        if engine.thread_count() > 1 {
            assert!(parallelism > 1);
        }
    }

    #[test]
    fn test_should_parallelize() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert!(!engine.should_parallelize(500));
        // On a single-core machine auto_tune caps at 1, so only expect
        // parallelization when the pool has multiple threads.
        if engine.thread_count() > 1 {
            assert!(engine.should_parallelize(100_000));
        }
    }

    #[test]
    fn test_fixed_policy_always_uses_all_threads() {
        // Fixed policy commits the whole pool even for tiny workloads.
        let engine = ParallelExecutionEngine::new_fixed(8).unwrap();
        assert_eq!(engine.auto_tune(100), 8);
    }
}