// dbx_core/engine/constructors.rs
1//! Database Constructors - factory methods for creating Database instances
2
3use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DbConfig, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::sql::view::{MaterializedViewRegistry, ViewRegistry};
10use crate::storage::StorageBackend; // Add this for trait methods
11use crate::storage::delta_store::DeltaStore;
12use crate::storage::encryption::EncryptionConfig;
13use crate::storage::encryption::wos::EncryptedWosBackend;
14use crate::storage::memory_wos::InMemoryWosBackend;
15use crate::storage::native_wos::NativeWosBackend;
16use crate::transaction::mvcc::manager::TransactionManager; // Fix path
17use dashmap::DashMap;
18use std::collections::HashMap;
19use std::path::Path;
20use std::sync::{Arc, RwLock};
21use tracing::{info, instrument};
22
23/// Spawn a background worker thread that handles WAL sync and index update jobs.
24fn spawn_background_worker(
25    rx: std::sync::mpsc::Receiver<BackgroundJob>,
26    wal: Option<Arc<crate::wal::WriteAheadLog>>,
27    enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
28    index: Arc<HashIndex>,
29) {
30    std::thread::spawn(move || {
31        while let Ok(job) = rx.recv() {
32            match job {
33                BackgroundJob::WalSync => {
34                    if let Some(w) = &wal {
35                        let _ = w.sync();
36                    }
37                }
38                BackgroundJob::EncryptedWalSync => {
39                    if let Some(w) = &enc_wal {
40                        let _ = w.sync();
41                    }
42                }
43                BackgroundJob::IndexUpdate {
44                    table,
45                    column,
46                    key,
47                    row_id,
48                } => {
49                    let _ = index.update_on_insert(&table, &column, &key, row_id);
50                }
51            }
52        }
53    });
54}
55
56impl Database {
    /// Opens or creates a database at the given path.
    ///
    /// Creates a database at the specified path, or opens an existing one.
    /// Persistent storage is provided through the WOS backend, with a
    /// write-ahead log used for crash recovery.
    ///
    /// # Arguments
    ///
    /// * `path` - database directory path
    ///
    /// # Examples
    ///
    /// ```rust
    /// use dbx_core::Database;
    /// use std::path::Path;
    ///
    /// # fn main() -> dbx_core::DbxResult<()> {
    /// let db = Database::open(Path::new("./data"))?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip(path))]
    pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
        info!("Opening database at {:?}", path);
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;
        let wal_path = storage_manager.wal_path();

        // Initialize WAL
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
        let db_index = Arc::new(HashIndex::new());

        // Load persisted metadata (schemas, indexes, triggers, procedures, schedules)
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        info!(
            "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
            loaded_schemas.len(),
            loaded_indexes.len(),
            loaded_triggers.len(),
            loaded_procedures.len(),
            loaded_schedules.len()
        );

        // Background worker: receives WAL-sync and index-update jobs via `tx`
        // (stored below as `job_sender`).
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,

            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // Perform crash recovery: replay Insert/Delete/Batch WAL records into the
        // delta store; other record variants are ignored.
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                _ => {}
            }
            Ok(())
        };

        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            // Flush recovered data to WOS to prevent duplicate inserts
            info!("Flushing recovered WAL data to WOS");
            db.flush()?;
        }

        // Auto-register loaded SQL triggers
        if !loaded_triggers.is_empty() {
            info!(
                "Auto-registering {} persisted triggers",
                loaded_triggers.len()
            );
            let mut executor = db.trigger_executor.write().unwrap();
            executor.register_all(loaded_triggers);
        }

        // Auto-register loaded SQL procedures
        if !loaded_procedures.is_empty() {
            info!(
                "Auto-registering {} persisted procedures",
                loaded_procedures.len()
            );
            let mut executor = db.procedure_executor.write().unwrap();
            executor.register_all(loaded_procedures);
        }

        // Auto-register loaded SQL schedules (registered one-by-one; failures ignored)
        if !loaded_schedules.is_empty() {
            info!(
                "Auto-registering {} persisted schedules",
                loaded_schedules.len()
            );
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                let _ = executor.register(schedule);
            }
        }

        info!("Database opened successfully");

        // Wrap in Arc: the scheduler and MV-refresh thread hold Weak references
        // so they do not keep the Database alive.
        let db_arc = Arc::new(db);

        // Start background scheduler
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        // Start Materialized View event-driven refresh thread.
        // NOTE(review): this thread holds a strong Arc to `mat_view_registry`;
        // if no notify_change() arrives after the Database is dropped, it can
        // block in wait_and_take_dirty() indefinitely — confirm intended.
        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
        let db_mv_weak = Arc::downgrade(&db_arc);
        std::thread::Builder::new()
            .name("dbx-mv-refresh".into())
            .spawn(move || {
                loop {
                    // Condvar-style blocking: wait until a DML notify_change() fires.
                    let dirty = mv_reg.wait_and_take_dirty();
                    // Debounce: wait min_refresh_interval, then collect any
                    // additional views that went dirty in the meantime.
                    std::thread::sleep(mv_reg.min_refresh_interval());
                    let mut all_dirty = dirty;
                    all_dirty.extend(mv_reg.take_dirty());
                    for name in all_dirty {
                        if let Some(db) = db_mv_weak.upgrade() {
                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
                        } else {
                            // Database was dropped — exit the thread.
                            return;
                        }
                    }
                }
            })
            .ok(); // Ignore spawn failure: this feature is optional.

        Ok(db_arc)
    }
273
    /// Opens or creates an encrypted database.
    ///
    /// Creates an encrypted database at the specified path, or opens an
    /// existing encrypted one. Both the WAL and the WOS are encrypted.
    ///
    /// NOTE(review): unlike [`Database::open`], this constructor does not load
    /// persisted metadata (schemas/indexes/triggers/procedures/schedules),
    /// returns a bare `Self` instead of `Arc<Self>`, and does not start the
    /// scheduler or MV-refresh threads — confirm these differences are
    /// intentional.
    ///
    /// # Arguments
    ///
    /// * `path` - database directory path
    /// * `encryption` - encryption configuration (password- or raw-key-based)
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use dbx_core::Database;
    /// use dbx_core::storage::encryption::EncryptionConfig;
    /// use std::path::Path;
    ///
    /// let enc = EncryptionConfig::from_password("my-secret-password");
    /// let db = Database::open_encrypted(Path::new("./data"), enc).unwrap();
    /// ```
    #[instrument(skip(path, encryption))]
    pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
        info!("Opening encrypted database at {:?}", path);
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;

        // Initialize encrypted WAL
        let wal_path = storage_manager.encrypted_wal_path();
        let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
            &wal_path,
            encryption.clone(),
        )?);

        // Initialize encrypted WOS
        let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
        let db_index = Arc::new(HashIndex::new());

        // Background worker syncs the encrypted WAL (no plaintext WAL here).
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(
            rx,
            None,
            Some(Arc::clone(&encrypted_wal)),
            Arc::clone(&db_index),
        );

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: Some(Arc::clone(&encrypted_wal)),

            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // Crash recovery: replay the entire encrypted WAL into the delta store.
        // Batch records count each contained row toward recovered_count.
        let records = encrypted_wal.replay()?;
        let mut recovered_count = 0;
        for record in &records {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                    recovered_count += rows.len();
                }
                _ => {}
            }
        }
        if recovered_count > 0 {
            info!("Recovered {} encrypted WAL records", recovered_count);
        }

        info!("Encrypted database opened successfully");
        Ok(db)
    }
411
412    /// 인메모리 데이터베이스를 생성합니다.
413    ///
414    /// 테스트 및 임시 데이터 저장용으로 사용됩니다. 영구 저장되지 않습니다.
415    ///
416    /// # 예제
417    ///
418    /// ```rust
419    /// use dbx_core::Database;
420    ///
421    /// # fn main() -> dbx_core::DbxResult<()> {
422    /// let db = Database::open_in_memory()?;
423    /// db.insert("cache", b"key1", b"value1")?;
424    /// # Ok(())
425    /// # }
426    /// ```
427    #[instrument]
428    pub fn open_in_memory() -> DbxResult<Self> {
429        info!("Creating in-memory database");
430        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(
431            tempfile::tempdir()
432                .map_err(|e| crate::error::DbxError::Storage(e.to_string()))?
433                .path(),
434        ));
435        let db_index = Arc::new(HashIndex::new());
436        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
437        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
438
439        let mut db = Self {
440            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
441            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
442            file_wos: None,
443            storage_manager: Arc::clone(&storage_manager),
444            table_persistence: DashMap::new(),
445            schemas: Arc::new(RwLock::new(HashMap::new())),
446            tables: RwLock::new(HashMap::new()),
447            table_schemas: Arc::new(RwLock::new(HashMap::new())),
448            index: db_index,
449            row_counters: Arc::new(DashMap::new()),
450            sql_parser: SqlParser::new(),
451            sql_optimizer: QueryOptimizer::new(),
452            wal: None,
453            encrypted_wal: None,
454
455            encryption: RwLock::new(None),
456            tx_manager: Arc::new(TransactionManager::new()),
457            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
458            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
459            job_sender: Some(tx),
460            durability: DurabilityLevel::Lazy,
461            index_registry: RwLock::new(HashMap::new()),
462            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
463            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
464            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
465            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
466            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
467            parallel_engine: Arc::new(
468                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
469                    .expect("Failed to create parallel engine"),
470            ),
471            view_registry: ViewRegistry::new(),
472            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
473            partition_maps: Arc::new(RwLock::new(HashMap::new())),
474            partition_stats: Arc::new(DashMap::new()),
475            partition_compression: Arc::new(DashMap::new()),
476            partition_lifecycle: Arc::new(DashMap::new()),
477            partition_tier_hints: Arc::new(DashMap::new()),
478            partition_creation_times: Arc::new(DashMap::new()),
479            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
480            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
481            config: DbConfig::default(),
482            workload_analyzer: Arc::new(RwLock::new(
483                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
484            )),
485            replication_master: None,
486            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
487            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
488                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
489            )),
490            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
491            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
492            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
493        };
494
495        db.sql_optimizer
496            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
497                Arc::clone(&db.metadata_registry),
498            )));
499
500        Ok(db)
501    }
502
503    /// 암호화된 인메모리 데이터베이스를 생성합니다.
504    ///
505    /// 테스트 및 임시 데이터 저장용으로, 메모리 상에서 value가 암호화됩니다.
506    ///
507    /// # 예제
508    ///
509    /// ```rust
510    /// use dbx_core::Database;
511    /// use dbx_core::storage::encryption::EncryptionConfig;
512    ///
513    /// # fn main() -> dbx_core::DbxResult<()> {
514    /// let enc = EncryptionConfig::from_password("secret");
515    /// let db = Database::open_in_memory_encrypted(enc)?;
516    /// db.insert("users", b"user:1", b"Alice")?;
517    /// let val = db.get("users", b"user:1")?;
518    /// assert_eq!(val, Some(b"Alice".to_vec()));
519    /// # Ok(())
520    /// # }
521    /// ```
522    pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
523        info!("Creating encrypted in-memory database");
524        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(
525            tempfile::tempdir()
526                .map_err(|e| crate::error::DbxError::Storage(e.to_string()))?
527                .path(),
528        ));
529        let db_index = Arc::new(HashIndex::new());
530        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
531        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
532
533        let mut db = Self {
534            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
535            memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
536                encryption.clone(),
537            )?)),
538            file_wos: None,
539            storage_manager: Arc::clone(&storage_manager),
540            table_persistence: DashMap::new(),
541            schemas: Arc::new(RwLock::new(HashMap::new())),
542            tables: RwLock::new(HashMap::new()),
543            table_schemas: Arc::new(RwLock::new(HashMap::new())),
544            index: db_index,
545            row_counters: Arc::new(DashMap::new()),
546            sql_parser: SqlParser::new(),
547            sql_optimizer: QueryOptimizer::new(),
548            wal: None,
549            encrypted_wal: None,
550
551            encryption: RwLock::new(Some(encryption)),
552            tx_manager: Arc::new(TransactionManager::new()),
553            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
554            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
555            job_sender: Some(tx),
556            durability: DurabilityLevel::Lazy,
557            index_registry: RwLock::new(HashMap::new()),
558            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
559            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
560            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
561            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
562            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
563            parallel_engine: Arc::new(
564                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
565                    .expect("Failed to create parallel engine"),
566            ),
567            view_registry: ViewRegistry::new(),
568            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
569            partition_maps: Arc::new(RwLock::new(HashMap::new())),
570            partition_stats: Arc::new(DashMap::new()),
571            partition_compression: Arc::new(DashMap::new()),
572            partition_lifecycle: Arc::new(DashMap::new()),
573            partition_tier_hints: Arc::new(DashMap::new()),
574            partition_creation_times: Arc::new(DashMap::new()),
575            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
576            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
577            config: DbConfig::default(),
578            workload_analyzer: Arc::new(RwLock::new(
579                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
580            )),
581            replication_master: None,
582            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
583            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
584                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
585            )),
586            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
587            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
588            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
589        };
590
591        db.sql_optimizer
592            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
593                Arc::clone(&db.metadata_registry),
594            )));
595
596        Ok(db)
597    }
598
599    /// 최대 안전성 설정으로 데이터베이스를 엽니다 (Full durability).
600    ///
601    /// 금융, 의료 등 데이터 손실이 절대 허용되지 않는 경우 사용합니다.
602    /// 모든 쓰기 작업마다 fsync를 수행하여 최대 안전성을 보장하지만,
603    /// 성능은 기본 설정(Lazy)보다 느립니다.
604    ///
605    /// # 인자
606    ///
607    /// * `path` - 데이터베이스 파일 경로
608    pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
609        Self::open_with_durability(path, DurabilityLevel::Full)
610    }
611
612    /// 최고 성능 설정으로 데이터베이스를 엽니다 (No durability).
613    ///
614    /// WAL을 사용하지 않아 최고 성능을 제공하지만,
615    /// 크래시 시 데이터 손실 가능성이 있습니다.
616    /// 캐시, 임시 데이터, 벤치마크 등에 적합합니다.
617    ///
618    /// # 인자
619    ///
620    /// * `path` - 데이터베이스 파일 경로
621    ///
622    pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
623        Self::open_with_durability(path, DurabilityLevel::None)
624    }
625
626    /// 지정된 durability 설정으로 데이터베이스를 엽니다.
627    ///
628    /// # 인자
629    ///
630    /// * `path` - 데이터베이스 파일 경로
631    /// * `durability` - 내구성 수준
632    pub fn open_with_durability(
633        path: impl AsRef<Path>,
634        durability: DurabilityLevel,
635    ) -> DbxResult<Arc<Self>> {
636        info!(
637            "Opening database at {:?} with durability {:?}",
638            path.as_ref(),
639            durability
640        );
641        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
642        let wos_path = storage_manager.wos_dir()?;
643
644        // Initialize WAL
645        let wal_path = storage_manager.wal_path();
646        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
647
648        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
649        let db_index = Arc::new(HashIndex::new());
650
651        // Load persisted metadata
652        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
653        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
654        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
655        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
656        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
657
658        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
659        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
660
661        let db = Self {
662            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
663            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
664            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
665            storage_manager: Arc::clone(&storage_manager),
666            table_persistence: DashMap::new(),
667            schemas: Arc::new(RwLock::new(HashMap::new())),
668            tables: RwLock::new(HashMap::new()),
669            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
670            index: db_index,
671            row_counters: Arc::new(DashMap::new()),
672            sql_parser: SqlParser::new(),
673            sql_optimizer: QueryOptimizer::new(),
674            wal: Some(wal),
675            encrypted_wal: None,
676            encryption: RwLock::new(None),
677            tx_manager: Arc::new(TransactionManager::new()),
678            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
679            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
680            job_sender: Some(tx),
681            durability, // ← set BEFORE Arc wrapping
682            index_registry: RwLock::new(loaded_indexes),
683            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
684            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
685            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
686            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
687            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
688            parallel_engine: Arc::new(
689                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
690                    .expect("Failed to create parallel engine"),
691            ),
692            view_registry: ViewRegistry::new(),
693            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
694            partition_maps: Arc::new(RwLock::new(HashMap::new())),
695            partition_stats: Arc::new(DashMap::new()),
696            partition_compression: Arc::new(DashMap::new()),
697            partition_lifecycle: Arc::new(DashMap::new()),
698            partition_tier_hints: Arc::new(DashMap::new()),
699            partition_creation_times: Arc::new(DashMap::new()),
700            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
701            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
702            config: DbConfig::default(),
703            workload_analyzer: Arc::new(RwLock::new(
704                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
705            )),
706            replication_master: None,
707            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
708            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
709                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
710            )),
711            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
712            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
713            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
714        };
715
716        // Crash recovery
717        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
718            match record {
719                crate::wal::WalRecord::Insert {
720                    table,
721                    key,
722                    value,
723                    ts: _,
724                } => {
725                    db.delta.insert(table, key, value)?;
726                }
727                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
728                    db.delta.delete(table, key)?;
729                }
730                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
731                    db.delta.insert_batch(table, rows.clone())?;
732                }
733                _ => {}
734            }
735            Ok(())
736        };
737        let recovered_count =
738            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
739        if recovered_count > 0 {
740            info!("Recovered {} WAL records", recovered_count);
741            db.flush()?;
742        }
743
744        // Auto-register triggers, procedures, schedules
745        if !loaded_triggers.is_empty() {
746            db.trigger_executor
747                .write()
748                .unwrap()
749                .register_all(loaded_triggers);
750        }
751        if !loaded_procedures.is_empty() {
752            db.procedure_executor
753                .write()
754                .unwrap()
755                .register_all(loaded_procedures);
756        }
757        if !loaded_schedules.is_empty() {
758            let executor = db.schedule_executor.write().unwrap();
759            for (_, schedule) in loaded_schedules {
760                let _ = executor.register(schedule);
761            }
762        }
763
764        info!(
765            "Database opened successfully with durability {:?}",
766            durability
767        );
768
769        let db_arc = Arc::new(db);
770        let db_weak = Arc::downgrade(&db_arc);
771        db_arc
772            .schedule_executor
773            .write()
774            .unwrap()
775            .start_scheduler(db_weak)?;
776
777        // Materialized View 이벤트 기반 갱신 백그라운드 스레드
778        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
779        let db_mv_weak = Arc::downgrade(&db_arc);
780        std::thread::Builder::new()
781            .name("dbx-mv-refresh".into())
782            .spawn(move || {
783                loop {
784                    let dirty = mv_reg.wait_and_take_dirty();
785                    std::thread::sleep(mv_reg.min_refresh_interval());
786                    let mut all_dirty = dirty;
787                    all_dirty.extend(mv_reg.take_dirty());
788                    for name in all_dirty {
789                        if let Some(db) = db_mv_weak.upgrade() {
790                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
791                        } else {
792                            return;
793                        }
794                    }
795                }
796            })
797            .ok();
798
799        Ok(db_arc)
800    }
801}
802
impl Database {
    /// Opens a database at `path` using an explicit [`DbConfig`].
    ///
    /// `config.parallelism.cpu_cap` controls how much CPU the parallel
    /// execution engine is allowed to use, and `config.dirty_buffer_mode`
    /// is forwarded to the native WOS backend.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use dbx_core::Database;
    /// use dbx_core::engine::parallel_engine::{DbConfig, ParallelismConfig};
    /// use std::path::Path;
    ///
    /// let db = Database::open_with_config(
    ///     Path::new("./data"),
    ///     DbConfig {
    ///         parallelism: ParallelismConfig::conservative(), // use only ~50% of CPUs
    ///         ..Default::default()
    ///     },
    /// ).unwrap();
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the storage directories or the write-ahead log
    /// cannot be opened, if persisted metadata (schemas, indexes, triggers,
    /// procedures, schedules) fails to load, if WAL crash recovery or the
    /// post-recovery flush fails, or if the schedule scheduler cannot be
    /// started.
    pub fn open_with_config(
        path: &std::path::Path,
        config: DbConfig,
    ) -> DbxResult<std::sync::Arc<Self>> {
        // These shadow the file-level imports; kept local so the function is
        // self-contained.
        use crate::engine::parallel_engine::{ParallelExecutionEngine, ParallelizationPolicy};
        use crate::index::HashIndex;
        use crate::sql::optimizer::QueryOptimizer;
        use crate::sql::parser::SqlParser;
        use crate::storage::native_wos::NativeWosBackend;
        use crate::transaction::mvcc::manager::TransactionManager;
        use dashmap::DashMap;
        use std::collections::HashMap;
        use std::sync::{Arc, RwLock};
        use tracing::info;

        info!("Opening database at {:?} with custom config", path);
        // Resolve the on-disk layout, then open the WAL before any state that
        // might need recovery is constructed.
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;

        let wal_path = storage_manager.wal_path();
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        // The native (file-backed) WOS honors the configured dirty-buffer mode.
        let wos_backend = Arc::new(NativeWosBackend::open_with_mode(
            &wos_path,
            config.dirty_buffer_mode,
        )?);
        let db_index = Arc::new(HashIndex::new());

        // Load all persisted metadata up front so registration below is
        // all-or-nothing with respect to open().
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        // Background worker consumes WAL-sync and index-update jobs; the
        // sender half is stored on the Database as `job_sender`.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        // Build the parallel engine from the caller-supplied config
        // (auto policy, capped by config.parallelism).
        let parallel_engine = Arc::new(
            ParallelExecutionEngine::new_with_config(ParallelizationPolicy::Auto, config.clone())
                .expect("Failed to create parallel engine"),
        );

        // `mut` is needed so a TierPruningRule can be registered on the
        // optimizer after construction (it needs `db.metadata_registry`).
        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(crate::storage::delta_store::DeltaStore::new())),
            memory_wos: crate::engine::WosVariant::InMemory(Arc::new(
                crate::storage::memory_wos::InMemoryWosBackend::new(),
            )),
            file_wos: Some(crate::engine::WosVariant::Native(Arc::clone(&wos_backend))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,
            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            // NOTE(review): durability is hardcoded to Lazy on this path,
            // while the other open path threads a `durability` value through —
            // confirm whether DbConfig should carry a durability level.
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine,
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: config.clone(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            // NOTE(review): a *second* local ShardRouter is built here instead
            // of sharing `sharding_router` via Arc::clone — verify this is
            // intentional (the two routers are independent instances).
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        // Tier pruning needs the metadata registry, so the rule is registered
        // after the struct exists.
        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // Crash recovery: replay surviving WAL records into the delta store.
        // Unknown record variants are deliberately ignored.
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                _ => {}
            }
            Ok(())
        };
        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            // Persist the recovered records immediately so they survive a
            // second crash without another replay.
            db.flush()?;
        }

        // Re-register persisted triggers, procedures, and schedules.
        if !loaded_triggers.is_empty() {
            db.trigger_executor
                .write()
                .unwrap()
                .register_all(loaded_triggers);
        }
        if !loaded_procedures.is_empty() {
            db.procedure_executor
                .write()
                .unwrap()
                .register_all(loaded_procedures);
        }
        if !loaded_schedules.is_empty() {
            // Individual schedule registration failures are ignored so one bad
            // persisted schedule cannot block open().
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                let _ = executor.register(schedule);
            }
        }

        info!(
            "Database opened with custom parallelism config (cpu_cap={:.0}%)",
            config.parallelism.cpu_cap * 100.0
        );

        // Hand the scheduler a Weak reference so scheduled jobs do not keep
        // the Database alive after the last strong Arc is dropped.
        // NOTE(review): unlike the other open path, no materialized-view
        // refresh thread is spawned here — confirm whether that's intentional.
        let db_arc = Arc::new(db);
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        Ok(db_arc)
    }
}