// dbx_core/engine/constructors.rs
//! Database Constructors - factory methods for creating Database instances

3use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DbConfig, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::sql::view::{MaterializedViewRegistry, ViewRegistry};
10use crate::storage::StorageBackend; // Add this for trait methods
11use crate::storage::delta_store::DeltaStore;
12use crate::storage::encryption::EncryptionConfig;
13use crate::storage::encryption::wos::EncryptedWosBackend;
14use crate::storage::memory_wos::InMemoryWosBackend;
15use crate::storage::native_wos::NativeWosBackend;
16use crate::transaction::mvcc::manager::TransactionManager; // Fix path
17use dashmap::DashMap;
18use std::collections::HashMap;
19use std::path::Path;
20use std::sync::{Arc, RwLock};
21use tracing::{info, instrument};
22
23/// Spawn a background worker thread that handles WAL sync and index update jobs.
24fn spawn_background_worker(
25    rx: std::sync::mpsc::Receiver<BackgroundJob>,
26    wal: Option<Arc<crate::wal::WriteAheadLog>>,
27    enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
28    index: Arc<HashIndex>,
29) {
30    std::thread::spawn(move || {
31        while let Ok(job) = rx.recv() {
32            match job {
33                BackgroundJob::WalSync => {
34                    if let Some(w) = &wal {
35                        let _ = w.sync();
36                    }
37                }
38                BackgroundJob::EncryptedWalSync => {
39                    if let Some(w) = &enc_wal {
40                        let _ = w.sync();
41                    }
42                }
43                BackgroundJob::IndexUpdate {
44                    table,
45                    column,
46                    key,
47                    row_id,
48                } => {
49                    let _ = index.update_on_insert(&table, &column, &key, row_id);
50                }
51            }
52        }
53    });
54}
55
56impl Database {
    /// Opens or creates a database.
    ///
    /// Creates a database at the specified path, or opens an existing one.
    /// Persistent storage is provided through the WOS (sled) backend.
    ///
    /// # Arguments
    ///
    /// * `path` - database directory path
    ///
    /// # Examples
    ///
    /// ```rust
    /// use dbx_core::Database;
    /// use std::path::Path;
    ///
    /// # fn main() -> dbx_core::DbxResult<()> {
    /// let db = Database::open(Path::new("./data"))?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(path))]
    pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
        info!("Opening database at {:?}", path);
        let wos_path = path.join("wos");
        std::fs::create_dir_all(&wos_path)?;

        // Initialize the write-ahead log used for durability and crash recovery.
        let wal_path = path.join("wal.log");
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
        let db_index = Arc::new(HashIndex::new());

        // Load persisted metadata (schemas, indexes, triggers, procedures, schedules)
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        info!(
            "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
            loaded_schemas.len(),
            loaded_indexes.len(),
            loaded_triggers.len(),
            loaded_procedures.len(),
            loaded_schedules.len()
        );

        // Background worker consumes async WAL-sync and index-update jobs.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        let db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new())),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,

            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
        };

        // Perform crash recovery: replay surviving WAL records into the delta store.
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                // Other record kinds carry no row data to re-apply here.
                _ => {}
            }
            Ok(())
        };

        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            // Flush recovered data to WOS to prevent duplicate inserts
            info!("Flushing recovered WAL data to WOS");
            db.flush()?;
        }

        // Auto-register loaded SQL triggers
        if !loaded_triggers.is_empty() {
            info!(
                "Auto-registering {} persisted triggers",
                loaded_triggers.len()
            );
            let mut executor = db.trigger_executor.write().unwrap();
            executor.register_all(loaded_triggers);
        }

        // Auto-register loaded SQL procedures
        if !loaded_procedures.is_empty() {
            info!(
                "Auto-registering {} persisted procedures",
                loaded_procedures.len()
            );
            let mut executor = db.procedure_executor.write().unwrap();
            executor.register_all(loaded_procedures);
        }

        // Auto-register loaded SQL schedules
        if !loaded_schedules.is_empty() {
            info!(
                "Auto-registering {} persisted schedules",
                loaded_schedules.len()
            );
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                let _ = executor.register(schedule);
            }
        }

        info!("Database opened successfully");

        // Wrap in Arc so background threads can hold Weak references.
        let db_arc = Arc::new(db);

        // Start background scheduler (holds only a Weak ref so Drop works).
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        // Start Materialized View event-driven refresh thread
        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
        let db_mv_weak = Arc::downgrade(&db_arc);
        std::thread::Builder::new()
            .name("dbx-mv-refresh".into())
            .spawn(move || {
                loop {
                    // Block on a condvar until a DML notify_change() call marks a view dirty.
                    let dirty = mv_reg.wait_and_take_dirty();
                    // Debounce: wait min_refresh_interval, then collect any additional dirty views.
                    std::thread::sleep(mv_reg.min_refresh_interval());
                    let mut all_dirty = dirty;
                    all_dirty.extend(mv_reg.take_dirty());
                    for name in all_dirty {
                        if let Some(db) = db_mv_weak.upgrade() {
                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
                        } else {
                            // The database has been dropped — exit the thread.
                            return;
                        }
                    }
                }
            })
            .ok(); // Thread-spawn failure is ignored (this feature is optional).

        Ok(db_arc)
    }
266
    /// Opens or creates an encrypted database.
    ///
    /// Creates an encrypted database at the specified path, or opens an
    /// existing encrypted one. Both the WAL and the WOS are encrypted.
    ///
    /// # Arguments
    ///
    /// * `path` - database directory path
    /// * `encryption` - encryption settings (password- or raw-key-based)
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use dbx_core::Database;
    /// use dbx_core::storage::encryption::EncryptionConfig;
    /// use std::path::Path;
    ///
    /// let enc = EncryptionConfig::from_password("my-secret-password");
    /// let db = Database::open_encrypted(Path::new("./data"), enc).unwrap();
    /// ```
    #[instrument(skip(path, encryption))]
    pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
        info!("Opening encrypted database at {:?}", path);
        let wos_path = path.join("wos");
        std::fs::create_dir_all(&wos_path)?;

        // Initialize encrypted WAL
        let wal_path = path.join("wal.enc.log");
        let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
            &wal_path,
            encryption.clone(),
        )?);

        // Initialize encrypted WOS
        let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
        let db_index = Arc::new(HashIndex::new());

        // Background worker syncs the encrypted WAL and applies index updates.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(
            rx,
            None,
            Some(Arc::clone(&encrypted_wal)),
            Arc::clone(&db_index),
        );

        // NOTE(review): unlike `open`, this constructor does not load persisted
        // metadata (schemas/indexes/triggers/procedures/schedules), does not
        // flush recovered WAL data to WOS, and returns Self rather than
        // Arc<Self> (no scheduler / MV-refresh threads) — confirm intentional.
        let db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: Some(Arc::clone(&encrypted_wal)),

            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
        };
        // Crash recovery: replay the encrypted WAL into the delta store.
        let records = encrypted_wal.replay()?;
        let mut recovered_count = 0;
        for record in &records {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                    // A batch record counts one recovery per contained row.
                    recovered_count += rows.len();
                }
                // Other record kinds carry no row data to re-apply here.
                _ => {}
            }
        }
        if recovered_count > 0 {
            info!("Recovered {} encrypted WAL records", recovered_count);
        }

        info!("Encrypted database opened successfully");
        Ok(db)
    }
396
    /// Creates an in-memory database.
    ///
    /// Intended for tests and temporary data storage; nothing is persisted.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use dbx_core::Database;
    ///
    /// # fn main() -> dbx_core::DbxResult<()> {
    /// let db = Database::open_in_memory()?;
    /// db.insert("cache", b"key1", b"value1")?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument]
    pub fn open_in_memory() -> DbxResult<Self> {
        info!("Creating in-memory database");
        let db_index = Arc::new(HashIndex::new());
        // No WAL in memory mode: the worker only services index-update jobs.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));

        Ok(Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            // No file-backed WOS: data lives only in process memory.
            file_wos: None,
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,

            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
        })
    }
473
    /// Creates an encrypted in-memory database.
    ///
    /// Intended for tests and temporary data storage; values are encrypted
    /// while held in memory.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use dbx_core::Database;
    /// use dbx_core::storage::encryption::EncryptionConfig;
    ///
    /// # fn main() -> dbx_core::DbxResult<()> {
    /// let enc = EncryptionConfig::from_password("secret");
    /// let db = Database::open_in_memory_encrypted(enc)?;
    /// db.insert("users", b"user:1", b"Alice")?;
    /// let val = db.get("users", b"user:1")?;
    /// assert_eq!(val, Some(b"Alice".to_vec()));
    /// # Ok(())
    /// # }
    /// ```
    pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
        let db_index = Arc::new(HashIndex::new());
        // No WAL in memory mode: the worker only services index-update jobs.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));

        Ok(Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            // Temporary encrypted WOS backend — encrypts values in memory.
            memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
                encryption.clone(),
            )?)),
            file_wos: None,
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,

            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
        })
    }
554
555    /// 최대 안전성 설정으로 데이터베이스를 엽니다 (Full durability).
556    ///
557    /// 금융, 의료 등 데이터 손실이 절대 허용되지 않는 경우 사용합니다.
558    /// 모든 쓰기 작업마다 fsync를 수행하여 최대 안전성을 보장하지만,
559    /// 성능은 기본 설정(Lazy)보다 느립니다.
560    ///
561    /// # 인자
562    ///
563    /// * `path` - 데이터베이스 파일 경로
564    pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
565        Self::open_with_durability(path, DurabilityLevel::Full)
566    }
567
568    /// 최고 성능 설정으로 데이터베이스를 엽니다 (No durability).
569    ///
570    /// WAL을 사용하지 않아 최고 성능을 제공하지만,
571    /// 크래시 시 데이터 손실 가능성이 있습니다.
572    /// 캐시, 임시 데이터, 벤치마크 등에 적합합니다.
573    ///
574    /// # 인자
575    ///
576    /// * `path` - 데이터베이스 파일 경로
577    ///
578    pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
579        Self::open_with_durability(path, DurabilityLevel::None)
580    }
581
582    /// 지정된 durability 설정으로 데이터베이스를 엽니다.
583    ///
584    /// # 인자
585    ///
586    /// * `path` - 데이터베이스 파일 경로
587    /// * `durability` - 내구성 수준
588    pub fn open_with_durability(
589        path: impl AsRef<Path>,
590        durability: DurabilityLevel,
591    ) -> DbxResult<Arc<Self>> {
592        info!(
593            "Opening database at {:?} with durability {:?}",
594            path.as_ref(),
595            durability
596        );
597        let path = path.as_ref();
598        let wos_path = path.join("wos");
599        std::fs::create_dir_all(&wos_path)?;
600
601        // Initialize WAL
602        let wal_path = path.join("wal.log");
603        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
604
605        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
606        let db_index = Arc::new(HashIndex::new());
607
608        // Load persisted metadata
609        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
610        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
611        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
612        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
613        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
614
615        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
616        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
617
618        let db = Self {
619            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
620            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
621            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
622            table_persistence: DashMap::new(),
623            schemas: Arc::new(RwLock::new(HashMap::new())),
624            tables: RwLock::new(HashMap::new()),
625            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
626            index: db_index,
627            row_counters: Arc::new(DashMap::new()),
628            sql_parser: SqlParser::new(),
629            sql_optimizer: QueryOptimizer::new(),
630            wal: Some(wal),
631            encrypted_wal: None,
632            encryption: RwLock::new(None),
633            tx_manager: Arc::new(TransactionManager::new()),
634            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
635            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
636            job_sender: Some(tx),
637            durability, // ← set BEFORE Arc wrapping
638            index_registry: RwLock::new(loaded_indexes),
639            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
640            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
641            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
642            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
643            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
644            parallel_engine: Arc::new(
645                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
646                    .expect("Failed to create parallel engine"),
647            ),
648            view_registry: ViewRegistry::new(),
649            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
650            partition_maps: Arc::new(RwLock::new(HashMap::new())),
651            partition_stats: Arc::new(DashMap::new()),
652            partition_compression: Arc::new(DashMap::new()),
653            partition_lifecycle: Arc::new(DashMap::new()),
654            partition_tier_hints: Arc::new(DashMap::new()),
655            partition_creation_times: Arc::new(DashMap::new()),
656            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
657            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
658            config: DbConfig::default(),
659            workload_analyzer: Arc::new(RwLock::new(
660                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
661            )),
662            replication_master: None,
663            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
664            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
665                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
666            )),
667            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
668            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
669        };
670
671        // Crash recovery
672        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
673            match record {
674                crate::wal::WalRecord::Insert {
675                    table,
676                    key,
677                    value,
678                    ts: _,
679                } => {
680                    db.delta.insert(table, key, value)?;
681                }
682                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
683                    db.delta.delete(table, key)?;
684                }
685                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
686                    db.delta.insert_batch(table, rows.clone())?;
687                }
688                _ => {}
689            }
690            Ok(())
691        };
692        let recovered_count =
693            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
694        if recovered_count > 0 {
695            info!("Recovered {} WAL records", recovered_count);
696            db.flush()?;
697        }
698
699        // Auto-register triggers, procedures, schedules
700        if !loaded_triggers.is_empty() {
701            db.trigger_executor
702                .write()
703                .unwrap()
704                .register_all(loaded_triggers);
705        }
706        if !loaded_procedures.is_empty() {
707            db.procedure_executor
708                .write()
709                .unwrap()
710                .register_all(loaded_procedures);
711        }
712        if !loaded_schedules.is_empty() {
713            let executor = db.schedule_executor.write().unwrap();
714            for (_, schedule) in loaded_schedules {
715                let _ = executor.register(schedule);
716            }
717        }
718
719        info!(
720            "Database opened successfully with durability {:?}",
721            durability
722        );
723
724        let db_arc = Arc::new(db);
725        let db_weak = Arc::downgrade(&db_arc);
726        db_arc
727            .schedule_executor
728            .write()
729            .unwrap()
730            .start_scheduler(db_weak)?;
731
732        // Materialized View 이벤트 기반 갱신 백그라운드 스레드
733        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
734        let db_mv_weak = Arc::downgrade(&db_arc);
735        std::thread::Builder::new()
736            .name("dbx-mv-refresh".into())
737            .spawn(move || {
738                loop {
739                    let dirty = mv_reg.wait_and_take_dirty();
740                    std::thread::sleep(mv_reg.min_refresh_interval());
741                    let mut all_dirty = dirty;
742                    all_dirty.extend(mv_reg.take_dirty());
743                    for name in all_dirty {
744                        if let Some(db) = db_mv_weak.upgrade() {
745                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
746                        } else {
747                            return;
748                        }
749                    }
750                }
751            })
752            .ok();
753
754        Ok(db_arc)
755    }
756}
757
758impl Database {
759    /// `DbConfig`를 사용하여 데이터베이스를 엽니다.
760    ///
761    /// `config.parallelism.cpu_cap`으로 CPU 사용량을 제어할 수 있습니다.
762    ///
763    /// # 예시
764    ///
765    /// ```rust,no_run
766    /// use dbx_core::Database;
767    /// use dbx_core::engine::parallel_engine::{DbConfig, ParallelismConfig};
768    /// use std::path::Path;
769    ///
770    /// let db = Database::open_with_config(
771    ///     Path::new("./data"),
772    ///     DbConfig {
773    ///         parallelism: ParallelismConfig::conservative(), // CPU 50%만 사용
774    ///         ..Default::default()
775    ///     },
776    /// ).unwrap();
777    /// ```
778    pub fn open_with_config(
779        path: &std::path::Path,
780        config: DbConfig,
781    ) -> DbxResult<std::sync::Arc<Self>> {
782        use crate::engine::parallel_engine::{ParallelExecutionEngine, ParallelizationPolicy};
783        use crate::index::HashIndex;
784        use crate::sql::optimizer::QueryOptimizer;
785        use crate::sql::parser::SqlParser;
786        use crate::storage::native_wos::NativeWosBackend;
787        use crate::transaction::mvcc::manager::TransactionManager;
788        use dashmap::DashMap;
789        use std::collections::HashMap;
790        use std::sync::{Arc, RwLock};
791        use tracing::info;
792
793        info!("Opening database at {:?} with custom config", path);
794        let wos_path = path.join("wos");
795        std::fs::create_dir_all(&wos_path)?;
796
797        let wal_path = path.join("wal.log");
798        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
799
800        let wos_backend = Arc::new(NativeWosBackend::open_with_mode(
801            &wos_path,
802            config.dirty_buffer_mode,
803        )?);
804        let db_index = Arc::new(HashIndex::new());
805
806        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
807        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
808        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
809        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
810        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
811
812        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
813        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
814
815        // 병렬 엔진을 config로 생성
816        let parallel_engine = Arc::new(
817            ParallelExecutionEngine::new_with_config(ParallelizationPolicy::Auto, config.clone())
818                .expect("Failed to create parallel engine"),
819        );
820
821        let db = Self {
822            delta: DeltaVariant::RowBased(Arc::new(crate::storage::delta_store::DeltaStore::new())),
823            memory_wos: crate::engine::WosVariant::InMemory(Arc::new(
824                crate::storage::memory_wos::InMemoryWosBackend::new(),
825            )),
826            file_wos: Some(crate::engine::WosVariant::Native(Arc::clone(&wos_backend))),
827            table_persistence: DashMap::new(),
828            schemas: Arc::new(RwLock::new(HashMap::new())),
829            tables: RwLock::new(HashMap::new()),
830            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
831            index: db_index,
832            row_counters: Arc::new(DashMap::new()),
833            sql_parser: SqlParser::new(),
834            sql_optimizer: QueryOptimizer::new(),
835            wal: Some(wal),
836            encrypted_wal: None,
837            encryption: RwLock::new(None),
838            tx_manager: Arc::new(TransactionManager::new()),
839            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
840            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
841            job_sender: Some(tx),
842            durability: DurabilityLevel::Lazy,
843            index_registry: RwLock::new(loaded_indexes),
844            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
845            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
846            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
847            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
848            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
849            parallel_engine,
850            view_registry: ViewRegistry::new(),
851            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
852            partition_maps: Arc::new(RwLock::new(HashMap::new())),
853            partition_stats: Arc::new(DashMap::new()),
854            partition_compression: Arc::new(DashMap::new()),
855            partition_lifecycle: Arc::new(DashMap::new()),
856            partition_tier_hints: Arc::new(DashMap::new()),
857            partition_creation_times: Arc::new(DashMap::new()),
858            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
859            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
860            config: config.clone(),
861            workload_analyzer: Arc::new(RwLock::new(
862                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
863            )),
864            replication_master: None,
865            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
866            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
867                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
868            )),
869            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
870            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
871        };
872
873        // Crash recovery
874        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
875            match record {
876                crate::wal::WalRecord::Insert {
877                    table,
878                    key,
879                    value,
880                    ts: _,
881                } => {
882                    db.delta.insert(table, key, value)?;
883                }
884                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
885                    db.delta.delete(table, key)?;
886                }
887                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
888                    db.delta.insert_batch(table, rows.clone())?;
889                }
890                _ => {}
891            }
892            Ok(())
893        };
894        let recovered_count =
895            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
896        if recovered_count > 0 {
897            info!("Recovered {} WAL records", recovered_count);
898            db.flush()?;
899        }
900
901        // Triggers, procedures, schedules
902        if !loaded_triggers.is_empty() {
903            db.trigger_executor
904                .write()
905                .unwrap()
906                .register_all(loaded_triggers);
907        }
908        if !loaded_procedures.is_empty() {
909            db.procedure_executor
910                .write()
911                .unwrap()
912                .register_all(loaded_procedures);
913        }
914        if !loaded_schedules.is_empty() {
915            let executor = db.schedule_executor.write().unwrap();
916            for (_, schedule) in loaded_schedules {
917                let _ = executor.register(schedule);
918            }
919        }
920
921        info!(
922            "Database opened with custom parallelism config (cpu_cap={:.0}%)",
923            config.parallelism.cpu_cap * 100.0
924        );
925
926        let db_arc = Arc::new(db);
927        let db_weak = Arc::downgrade(&db_arc);
928        db_arc
929            .schedule_executor
930            .write()
931            .unwrap()
932            .start_scheduler(db_weak)?;
933
934        Ok(db_arc)
935    }
936}