// dbx_core/engine/parallel_engine.rs
//! Parallel Execution Engine — Rayon-based parallel query execution
//!
//! This module provides a parallel execution engine that leverages Rayon's thread pool
//! to execute queries in parallel, improving performance for large workloads.

use crate::error::{DbxError, DbxResult};
use rayon::ThreadPoolBuilder;
use std::sync::Arc;

/// Strategy for choosing how many worker threads the engine uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ParallelizationPolicy {
    /// Pick a thread count automatically from the host's available CPUs (default).
    #[default]
    Auto,
    /// Always use exactly this many threads.
    Fixed(usize),
    /// Start conservatively and let the engine adapt the count to the workload.
    Adaptive,
}

/// Parallelism settings — caps CPU usage and sets the parallelization threshold.
///
/// Pass via `Database::open_with_config()` or as part of `DbConfig`.
///
/// # Examples
/// ```rust
/// use dbx_core::engine::parallel_engine::ParallelismConfig;
/// let config = ParallelismConfig {
///     cpu_cap: 0.5,                // use at most 50% of the CPU cores
///     min_rows_for_parallel: 5000, // parallelize only at 5000+ rows
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ParallelismConfig {
    /// Maximum fraction of CPU cores to use (above 0.0, up to 1.0).
    /// Example: 0.5 → at most 4 threads on an 8-core machine.
    /// Default: 1.0 (all cores allowed, subject to the global 16-thread cap).
    pub cpu_cap: f64,
    /// Minimum row count for parallel execution; below this, run single-threaded.
    /// Default: 1000.
    pub min_rows_for_parallel: usize,
}

impl Default for ParallelismConfig {
    fn default() -> Self {
        ParallelismConfig {
            cpu_cap: 1.0,
            min_rows_for_parallel: 1_000,
        }
    }
}

impl ParallelismConfig {
    /// Conservative preset that keeps the machine responsive (50% CPU, 5000-row threshold).
    pub fn conservative() -> Self {
        ParallelismConfig {
            cpu_cap: 0.5,
            min_rows_for_parallel: 5_000,
        }
    }

    /// Aggressive preset that uses every core and a low parallelization threshold.
    pub fn aggressive() -> Self {
        ParallelismConfig {
            cpu_cap: 1.0,
            min_rows_for_parallel: 500,
        }
    }
}
71
72/// Parallel execution engine using Rayon thread pool
73pub struct ParallelExecutionEngine {
74    thread_pool: Arc<rayon::ThreadPool>,
75    policy: ParallelizationPolicy,
76    /// 전체 설정 (병렬 및 실시간 동기화)
77    pub db_config: DbConfig,
78}

impl ParallelExecutionEngine {
    /// Create a new parallel execution engine with the specified policy
    /// and a default `DbConfig`.
    pub fn new(policy: ParallelizationPolicy) -> DbxResult<Self> {
        Self::new_with_config(policy, DbConfig::default())
    }

    /// Create a new parallel execution engine with policy and parallelism config.
    ///
    /// The thread count is capped according to `config.parallelism.cpu_cap`.
    ///
    /// # Errors
    /// Returns an error if the underlying Rayon thread pool fails to build.
    pub fn new_with_config(policy: ParallelizationPolicy, config: DbConfig) -> DbxResult<Self> {
        // Clamp the cap so a bad value (0.0, negative, > 1.0) cannot zero out the pool.
        let cpu_cap = config.parallelism.cpu_cap.clamp(0.01, 1.0);
        let base_threads = Self::determine_thread_count(policy);
        // ceil() rounds a fractional share up to a whole thread; max(1) guarantees progress.
        let capped_threads = ((base_threads as f64 * cpu_cap).ceil() as usize).max(1);

        let thread_pool = ThreadPoolBuilder::new()
            .num_threads(capped_threads)
            .thread_name(|i| format!("dbx-parallel-{}", i))
            .build()
            .map_err(|e| {
                // NOTE(review): `NotImplemented` reads oddly for a pool-build failure;
                // a dedicated error variant may fit better — TODO confirm.
                DbxError::NotImplemented(format!("Failed to create thread pool: {}", e))
            })?;

        Ok(Self {
            thread_pool: Arc::new(thread_pool),
            policy,
            db_config: config,
        })
    }

    /// Create a new parallel execution engine with automatic thread count
    pub fn new_auto() -> DbxResult<Self> {
        Self::new(ParallelizationPolicy::Auto)
    }

    /// Create a new parallel execution engine with a fixed number of threads.
    ///
    /// # Errors
    /// Returns `DbxError::InvalidArguments` when `num_threads` is 0.
    pub fn new_fixed(num_threads: usize) -> DbxResult<Self> {
        if num_threads == 0 {
            return Err(DbxError::InvalidArguments(
                "Thread count must be greater than 0".to_string(),
            ));
        }
        Self::new(ParallelizationPolicy::Fixed(num_threads))
    }

    /// Get the current parallelization policy
    pub fn policy(&self) -> ParallelizationPolicy {
        self.policy
    }

    /// Get the number of threads in the thread pool
    pub fn thread_count(&self) -> usize {
        self.thread_pool.current_num_threads()
    }

    /// Get a reference to the thread pool
    pub fn thread_pool(&self) -> &rayon::ThreadPool {
        &self.thread_pool
    }

    /// Get the parallelism config
    pub fn config(&self) -> &ParallelismConfig {
        &self.db_config.parallelism
    }

    /// Get the full database config
    pub fn db_config(&self) -> &DbConfig {
        &self.db_config
    }

    /// Check if parallelization is beneficial given a row count,
    /// respecting `min_rows_for_parallel` from the config.
    /// Also requires more than one pool thread, otherwise parallelism cannot help.
    pub fn should_parallelize_rows(&self, row_count: usize) -> bool {
        row_count >= self.db_config.parallelism.min_rows_for_parallel && self.thread_count() > 1
    }

    /// Execute a closure in the thread pool.
    ///
    /// Blocks the calling thread until `f` returns; `f` runs inside this
    /// engine's pool, so nested Rayon parallel iterators use its threads.
    pub fn execute<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        self.thread_pool.install(f)
    }

    /// Determine the optimal thread count based on the policy
    fn determine_thread_count(policy: ParallelizationPolicy) -> usize {
        match policy {
            ParallelizationPolicy::Auto => {
                // Use number of logical CPUs, but cap at 16 to avoid overhead
                let num_cpus = num_cpus::get();
                num_cpus.min(16)
            }
            ParallelizationPolicy::Fixed(n) => n,
            ParallelizationPolicy::Adaptive => {
                // For adaptive, start with half of available CPUs
                // This can be adjusted dynamically later
                let num_cpus = num_cpus::get();
                (num_cpus / 2).max(1)
            }
        }
    }

    /// Auto-tune the thread count based on workload size
    ///
    /// Returns the recommended number of parallel tasks for the given workload size
    pub fn auto_tune(&self, workload_size: usize) -> usize {
        self.auto_tune_weighted(workload_size, 1.0)
    }

    /// Auto-tune with complexity weight factor
    ///
    /// Higher complexity = fewer items per thread needed to justify parallelism
    pub fn auto_tune_weighted(&self, workload_size: usize, avg_complexity: f64) -> usize {
        let thread_count = self.thread_count();

        match self.policy {
            ParallelizationPolicy::Auto | ParallelizationPolicy::Adaptive => {
                // Base threshold adjusted by complexity
                // Simple queries (complexity ~1.0): need 1000 items per thread
                // Complex queries (complexity ~10.0): need 100 items per thread
                let base_threshold: f64 = 1000.0;
                // .max(0.1) guards against divide-by-zero; .max(1.0) keeps the
                // threshold at least one item after the usize cast.
                let adjusted_threshold =
                    (base_threshold / avg_complexity.max(0.1)).max(1.0) as usize;

                if workload_size < adjusted_threshold {
                    1
                } else {
                    let optimal = (workload_size / adjusted_threshold).min(thread_count);
                    optimal.max(1)
                }
            }
            // Fixed policy always commits the whole pool, regardless of workload size.
            ParallelizationPolicy::Fixed(_) => thread_count,
        }
    }

    /// Estimate SQL query complexity based on heuristics
    ///
    /// Returns a complexity score (1.0 = simple SELECT, higher = more complex).
    /// Purely textual: keyword occurrences are counted on the uppercased SQL,
    /// so keywords appearing inside string literals are (over)counted too.
    pub fn estimate_query_complexity(sql: &str) -> f64 {
        let sql_upper = sql.to_uppercase();
        let mut score = 1.0;

        // JOIN adds complexity
        let join_count = sql_upper.matches("JOIN").count();
        score += join_count as f64 * 2.0;

        // Subqueries: every SELECT beyond the first is treated as one subquery
        let subquery_depth = sql_upper.matches("SELECT").count().saturating_sub(1);
        score += subquery_depth as f64 * 3.0;

        // CTE (WITH)
        if sql_upper.contains("WITH ") {
            score += 4.0;
        }

        // UNION
        let union_count = sql_upper.matches("UNION").count();
        score += union_count as f64 * 2.5;

        // Aggregate functions
        for func in ["COUNT(", "SUM(", "AVG(", "MAX(", "MIN("] {
            score += sql_upper.matches(func).count() as f64 * 0.5;
        }

        // Window functions (only the zero/one-space forms of "OVER (" are detected)
        if sql_upper.contains("OVER(") || sql_upper.contains("OVER (") {
            score += 3.0;
        }

        // ORDER BY, GROUP BY
        if sql_upper.contains("ORDER BY") {
            score += 0.5;
        }
        if sql_upper.contains("GROUP BY") {
            score += 1.0;
        }
        if sql_upper.contains("HAVING") {
            score += 1.0;
        }

        // Query length as proxy for complexity (capped at +5.0)
        score += (sql.len() as f64 / 200.0).min(5.0);

        score
    }

    /// Check if parallelization is beneficial for the given workload size
    pub fn should_parallelize(&self, workload_size: usize) -> bool {
        self.auto_tune(workload_size) > 1
    }
}
271
272impl Default for ParallelExecutionEngine {
273    fn default() -> Self {
274        Self::new_auto().expect("Failed to create default parallel execution engine")
275    }
276}
277
278use crate::replication::transport::ReplicationConfig;
279use crate::storage::realtime_sync::RealtimeSyncConfig;
280
281/// WOS `dirty` 버퍼에 사용할 자료구조 선택.
282///
283/// | 모드 | 특징 |
284/// |------|------|
285/// | `BTreeMap` (기본) | 키 정렬 유지 → 범위 쿼리(scan) 효율적 |
286/// | `DashMap` | 샤드 락 → 다중 스레드 동시 접근 효율적 |
287///
288/// `dirty`는 재시작 시 항상 빈 상태로 초기화되므로 두 모드 간 자유롭게 전환 가능.
289#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
290pub enum DirtyBufferMode {
291    /// 기본값: BTreeMap — 범위 쿼리 최적화, 정렬 유지
292    #[default]
293    BTreeMap,
294    /// DashMap — 동시성 최적화 (단순 get/insert 위주 워크로드)
295    DashMap,
296}
297
298/// 데이터베이스 전체 설정
299///
300/// `Database::open_with_config()`에 전달하여 동작을 제어합니다.
301///
302/// # 예시
303/// ```rust
304/// use dbx_core::engine::parallel_engine::{DbConfig, ParallelismConfig, DirtyBufferMode};
305/// let config = DbConfig {
306///     parallelism: ParallelismConfig::conservative(),
307///     dirty_buffer_mode: DirtyBufferMode::DashMap,
308///     ..Default::default()
309/// };
310/// ```
311#[derive(Debug, Clone, Default)]
312pub struct DbConfig {
313    /// 병렬 처리 설정
314    pub parallelism: ParallelismConfig,
315    /// HTAP 실시간 동기화 설정 (기본은 Threshold(10,000))
316    pub sync: RealtimeSyncConfig,
317    /// 레플리케이션 / Transport 설정 (기본은 인메모리)
318    pub replication: ReplicationConfig,
319    /// dirty 버퍼 자료구조 선택 (기본: BTreeMap)
320    pub dirty_buffer_mode: DirtyBufferMode,
321}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_auto() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert_eq!(engine.policy(), ParallelizationPolicy::Auto);
        assert!(engine.thread_count() > 0);
    }

    #[test]
    fn test_new_fixed() {
        let engine = ParallelExecutionEngine::new_fixed(4).unwrap();
        assert_eq!(engine.policy(), ParallelizationPolicy::Fixed(4));
        assert_eq!(engine.thread_count(), 4);
    }

    #[test]
    fn test_new_fixed_zero_threads() {
        // Zero threads is rejected up front.
        assert!(ParallelExecutionEngine::new_fixed(0).is_err());
    }

    #[test]
    fn test_execute() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert_eq!(engine.execute(|| 42), 42);
    }

    #[test]
    fn test_auto_tune_small_workload() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        // Too small a workload must stay single-threaded regardless of core count.
        assert_eq!(engine.auto_tune(500), 1);
    }

    #[test]
    fn test_auto_tune_large_workload() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        let parallelism = engine.auto_tune(100_000);
        // Bug fix: on a single-core host the Auto pool has one thread and
        // auto_tune can never exceed it, so only require > 1 when more exist.
        if engine.thread_count() > 1 {
            assert!(parallelism > 1);
        } else {
            assert_eq!(parallelism, 1);
        }
    }

    #[test]
    fn test_should_parallelize() {
        let engine = ParallelExecutionEngine::new_auto().unwrap();
        assert!(!engine.should_parallelize(500)); // always too small
        // Bug fix: a large workload parallelizes exactly when more than one
        // pool thread exists (previously failed on single-core machines).
        assert_eq!(engine.should_parallelize(100_000), engine.thread_count() > 1);
    }

    #[test]
    fn test_fixed_policy_always_uses_all_threads() {
        let engine = ParallelExecutionEngine::new_fixed(8).unwrap();
        // Fixed policy ignores workload size and reports the full thread count.
        assert_eq!(engine.auto_tune(100), 8);
    }
}