Skip to main content

dbx_core/engine/
constructors.rs

1//! Database Constructors — factory methods for creating Database instances
2
3use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::storage::StorageBackend; // Add this for trait methods
10use crate::storage::delta_store::DeltaStore;
11use crate::storage::encryption::EncryptionConfig;
12use crate::storage::encryption::wos::EncryptedWosBackend;
13use crate::storage::wos::WosBackend;
14use crate::transaction::mvcc::manager::TransactionManager; // Fix path
15use dashmap::DashMap;
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19use tracing::{info, instrument};
20
21/// Spawn a background worker thread that handles WAL sync and index update jobs.
22fn spawn_background_worker(
23    rx: std::sync::mpsc::Receiver<BackgroundJob>,
24    wal: Option<Arc<crate::wal::WriteAheadLog>>,
25    enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
26    index: Arc<HashIndex>,
27) {
28    std::thread::spawn(move || {
29        while let Ok(job) = rx.recv() {
30            match job {
31                BackgroundJob::WalSync => {
32                    if let Some(w) = &wal {
33                        let _ = w.sync();
34                    }
35                }
36                BackgroundJob::EncryptedWalSync => {
37                    if let Some(w) = &enc_wal {
38                        let _ = w.sync();
39                    }
40                }
41                BackgroundJob::IndexUpdate {
42                    table,
43                    column,
44                    key,
45                    row_id,
46                } => {
47                    let _ = index.update_on_insert(&table, &column, &key, row_id);
48                }
49            }
50        }
51    });
52}
53
54impl Database {
55    /// 데이터베이스를 열거나 생성합니다.
56    ///
57    /// 지정된 경로에 데이터베이스를 생성하거나 기존 데이터베이스를 엽니다.
58    /// WOS (sled)를 통해 영구 저장소를 제공합니다.
59    ///
60    /// # 인자
61    ///
62    /// * `path` - 데이터베이스 디렉토리 경로
63    ///
64    /// # 예제
65    ///
66    /// ```rust
67    /// use dbx_core::Database;
68    /// use std::path::Path;
69    ///
70    /// # fn main() -> dbx_core::DbxResult<()> {
71    /// let db = Database::open(Path::new("./data"))?;
72    /// # Ok(())
73    /// # }
74    /// ```
75    #[instrument(skip(path))]
76    pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
77        info!("Opening database at {:?}", path);
78        let wos_path = path.join("wos");
79        std::fs::create_dir_all(&wos_path)?;
80
81        // Initialize WAL
82        let wal_path = path.join("wal.log");
83        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
84
85        let wos_backend = Arc::new(WosBackend::open(&wos_path)?);
86        let db_index = Arc::new(HashIndex::new());
87
88        // Load persisted metadata (schemas, indexes, and triggers)
89        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
90        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
91        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
92        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
93        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
94
95        info!(
96            "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
97            loaded_schemas.len(),
98            loaded_indexes.len(),
99            loaded_triggers.len(),
100            loaded_procedures.len(),
101            loaded_schedules.len()
102        );
103
104        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
105        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
106
107        let db = Self {
108            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
109            wos: WosVariant::Plain(Arc::clone(&wos_backend)),
110            schemas: Arc::new(RwLock::new(HashMap::new())),
111            tables: RwLock::new(HashMap::new()),
112            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
113            index: db_index,
114            row_counters: Arc::new(DashMap::new()),
115            sql_parser: SqlParser::new(),
116            sql_optimizer: QueryOptimizer::new(),
117            wal: Some(wal),
118            encrypted_wal: None,
119
120            encryption: RwLock::new(None),
121            tx_manager: Arc::new(TransactionManager::new()),
122            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
123            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
124            job_sender: Some(tx),
125            durability: DurabilityLevel::Lazy,
126            index_registry: RwLock::new(loaded_indexes),
127            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
128            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
129            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
130            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
131            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
132            parallel_engine: Arc::new(
133                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
134                    .expect("Failed to create parallel engine"),
135            ),
136        };
137
138        // Perform crash recovery
139        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
140            match record {
141                crate::wal::WalRecord::Insert {
142                    table,
143                    key,
144                    value,
145                    ts: _,
146                } => {
147                    db.delta.insert(table, key, value)?;
148                }
149                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
150                    db.delta.delete(table, key)?;
151                }
152                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
153                    db.delta.insert_batch(table, rows.clone())?;
154                }
155                _ => {}
156            }
157            Ok(())
158        };
159
160        let recovered_count =
161            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
162        if recovered_count > 0 {
163            info!("Recovered {} WAL records", recovered_count);
164            // Flush recovered data to WOS to prevent duplicate inserts
165            info!("Flushing recovered WAL data to WOS");
166            db.flush()?;
167        }
168
169        // Auto-register loaded SQL triggers
170        if !loaded_triggers.is_empty() {
171            info!(
172                "Auto-registering {} persisted triggers",
173                loaded_triggers.len()
174            );
175            let mut executor = db.trigger_executor.write().unwrap();
176            executor.register_all(loaded_triggers);
177        }
178
179        // Auto-register loaded SQL procedures
180        if !loaded_procedures.is_empty() {
181            info!(
182                "Auto-registering {} persisted procedures",
183                loaded_procedures.len()
184            );
185            let mut executor = db.procedure_executor.write().unwrap();
186            executor.register_all(loaded_procedures);
187        }
188
189        // Auto-register loaded SQL schedules
190        if !loaded_schedules.is_empty() {
191            info!(
192                "Auto-registering {} persisted schedules",
193                loaded_schedules.len()
194            );
195            let executor = db.schedule_executor.write().unwrap();
196            for (_, schedule) in loaded_schedules {
197                let _ = executor.register(schedule);
198            }
199        }
200
201        info!("Database opened successfully");
202
203        // Wrap in Arc
204        let db_arc = Arc::new(db);
205
206        // Start background scheduler
207        let db_weak = Arc::downgrade(&db_arc);
208        db_arc
209            .schedule_executor
210            .write()
211            .unwrap()
212            .start_scheduler(db_weak)?;
213
214        Ok(db_arc)
215    }
216
217    /// 암호화된 데이터베이스를 열거나 생성합니다.
218    ///
219    /// 지정된 경로에 암호화된 데이터베이스를 생성하거나 기존 암호화 DB를 엽니다.
220    /// WAL과 WOS 모두 암호화됩니다.
221    ///
222    /// # 인자
223    ///
224    /// * `path` - 데이터베이스 디렉토리 경로
225    /// * `encryption` - 암호화 설정 (패스워드 또는 raw key 기반)
226    ///
227    /// # 예제
228    ///
229    /// ```rust,no_run
230    /// use dbx_core::Database;
231    /// use dbx_core::storage::encryption::EncryptionConfig;
232    /// use std::path::Path;
233    ///
234    /// let enc = EncryptionConfig::from_password("my-secret-password");
235    /// let db = Database::open_encrypted(Path::new("./data"), enc).unwrap();
236    /// ```
237    #[instrument(skip(path, encryption))]
238    pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
239        info!("Opening encrypted database at {:?}", path);
240        let wos_path = path.join("wos");
241        std::fs::create_dir_all(&wos_path)?;
242
243        // Initialize encrypted WAL
244        let wal_path = path.join("wal.enc.log");
245        let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
246            &wal_path,
247            encryption.clone(),
248        )?);
249
250        // Initialize encrypted WOS
251        let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
252        let db_index = Arc::new(HashIndex::new());
253
254        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
255        spawn_background_worker(
256            rx,
257            None,
258            Some(Arc::clone(&encrypted_wal)),
259            Arc::clone(&db_index),
260        );
261
262        let db = Self {
263            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
264            wos: WosVariant::Encrypted(Arc::clone(&enc_wos)),
265            schemas: Arc::new(RwLock::new(HashMap::new())),
266            tables: RwLock::new(HashMap::new()),
267            table_schemas: Arc::new(RwLock::new(HashMap::new())),
268            index: db_index,
269            row_counters: Arc::new(DashMap::new()),
270            sql_parser: SqlParser::new(),
271            sql_optimizer: QueryOptimizer::new(),
272            wal: None,
273            encrypted_wal: Some(Arc::clone(&encrypted_wal)),
274
275            encryption: RwLock::new(Some(encryption)),
276            tx_manager: Arc::new(TransactionManager::new()),
277            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
278            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
279            job_sender: Some(tx),
280            durability: DurabilityLevel::Lazy,
281            index_registry: RwLock::new(HashMap::new()),
282            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
283            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
284            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
285            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
286            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
287            parallel_engine: Arc::new(
288                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
289                    .expect("Failed to create parallel engine"),
290            ),
291        };
292
293        // Perform crash recovery from encrypted WAL
294        let records = encrypted_wal.replay()?;
295        let mut recovered_count = 0;
296        for record in &records {
297            match record {
298                crate::wal::WalRecord::Insert {
299                    table,
300                    key,
301                    value,
302                    ts: _,
303                } => {
304                    db.delta.insert(table, key, value)?;
305                    recovered_count += 1;
306                }
307                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
308                    db.delta.delete(table, key)?;
309                    recovered_count += 1;
310                }
311                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
312                    db.delta.insert_batch(table, rows.clone())?;
313                    recovered_count += rows.len();
314                }
315                _ => {}
316            }
317        }
318        if recovered_count > 0 {
319            info!("Recovered {} encrypted WAL records", recovered_count);
320        }
321
322        info!("Encrypted database opened successfully");
323        Ok(db)
324    }
325
326    /// 인메모리 데이터베이스를 생성합니다.
327    ///
328    /// 테스트 및 임시 데이터 저장용으로 사용됩니다. 영구 저장되지 않습니다.
329    ///
330    /// # 예제
331    ///
332    /// ```rust
333    /// use dbx_core::Database;
334    ///
335    /// # fn main() -> dbx_core::DbxResult<()> {
336    /// let db = Database::open_in_memory()?;
337    /// db.insert("cache", b"key1", b"value1")?;
338    /// # Ok(())
339    /// # }
340    /// ```
341    #[instrument]
342    pub fn open_in_memory() -> DbxResult<Self> {
343        info!("Creating in-memory database");
344        let db_index = Arc::new(HashIndex::new());
345        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
346        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
347
348        Ok(Self {
349            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
350            wos: WosVariant::InMemory(Arc::new(
351                crate::storage::memory_wos::InMemoryWosBackend::new(),
352            )),
353            schemas: Arc::new(RwLock::new(HashMap::new())),
354            tables: RwLock::new(HashMap::new()),
355            table_schemas: Arc::new(RwLock::new(HashMap::new())),
356            index: db_index,
357            row_counters: Arc::new(DashMap::new()),
358            sql_parser: SqlParser::new(),
359            sql_optimizer: QueryOptimizer::new(),
360            wal: None,
361            encrypted_wal: None,
362
363            encryption: RwLock::new(None),
364            tx_manager: Arc::new(TransactionManager::new()),
365            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
366            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
367            job_sender: Some(tx),
368            durability: DurabilityLevel::Lazy,
369            index_registry: RwLock::new(HashMap::new()),
370            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
371            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
372            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
373            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
374            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
375            parallel_engine: Arc::new(
376                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
377                    .expect("Failed to create parallel engine"),
378            ),
379        })
380    }
381
382    /// 암호화된 인메모리 데이터베이스를 생성합니다.
383    ///
384    /// 테스트 및 임시 데이터 저장용으로, 메모리 상에서 value가 암호화됩니다.
385    ///
386    /// # 예제
387    ///
388    /// ```rust
389    /// use dbx_core::Database;
390    /// use dbx_core::storage::encryption::EncryptionConfig;
391    ///
392    /// # fn main() -> dbx_core::DbxResult<()> {
393    /// let enc = EncryptionConfig::from_password("secret");
394    /// let db = Database::open_in_memory_encrypted(enc)?;
395    /// db.insert("users", b"user:1", b"Alice")?;
396    /// let val = db.get("users", b"user:1")?;
397    /// assert_eq!(val, Some(b"Alice".to_vec()));
398    /// # Ok(())
399    /// # }
400    /// ```
401    pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
402        let db_index = Arc::new(HashIndex::new());
403        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
404        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
405
406        Ok(Self {
407            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
408            wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
409                encryption.clone(),
410            )?)),
411            schemas: Arc::new(RwLock::new(HashMap::new())),
412            tables: RwLock::new(HashMap::new()),
413            table_schemas: Arc::new(RwLock::new(HashMap::new())),
414            index: db_index,
415            row_counters: Arc::new(DashMap::new()),
416            sql_parser: SqlParser::new(),
417            sql_optimizer: QueryOptimizer::new(),
418            wal: None,
419            encrypted_wal: None,
420
421            encryption: RwLock::new(Some(encryption)),
422            tx_manager: Arc::new(TransactionManager::new()),
423            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
424            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
425            job_sender: Some(tx),
426            durability: DurabilityLevel::Lazy,
427            index_registry: RwLock::new(HashMap::new()),
428            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
429            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
430            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
431            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
432            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
433            parallel_engine: Arc::new(
434                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
435                    .expect("Failed to create parallel engine"),
436            ),
437        })
438    }
439
440    /// 최대 안전성 설정으로 데이터베이스를 엽니다 (Full durability).
441    ///
442    /// 금융, 의료 등 데이터 손실이 절대 허용되지 않는 경우 사용합니다.
443    /// 모든 쓰기 작업마다 fsync를 수행하여 최대 안전성을 보장하지만,
444    /// 성능은 기본 설정(Lazy)보다 느립니다.
445    ///
446    /// # 인자
447    ///
448    /// * `path` - 데이터베이스 파일 경로
449    pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
450        let db = Self::open(path.as_ref())?;
451        Arc::get_mut(&mut db.clone()).unwrap().durability = DurabilityLevel::Full;
452        Ok(db)
453    }
454
455    /// 최고 성능 설정으로 데이터베이스를 엽니다 (No durability).
456    ///
457    /// WAL을 사용하지 않아 최고 성능을 제공하지만,
458    /// 크래시 시 데이터 손실 가능성이 있습니다.
459    /// 캐시, 임시 데이터, 벤치마크 등에 적합합니다.
460    ///
461    /// # 인자
462    ///
463    /// * `path` - 데이터베이스 파일 경로
464    ///
465    pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
466        let db = Self::open(path.as_ref())?;
467        Arc::get_mut(&mut db.clone()).unwrap().durability = DurabilityLevel::None;
468        Ok(db)
469    }
470
471    /// 지정된 durability 설정으로 데이터베이스를 엽니다.
472    ///
473    /// # 인자
474    ///
475    /// * `path` - 데이터베이스 파일 경로
476    /// * `durability` - 내구성 수준
477    pub fn open_with_durability(
478        path: impl AsRef<Path>,
479        durability: DurabilityLevel,
480    ) -> DbxResult<Arc<Self>> {
481        let db = Self::open(path.as_ref())?;
482        Arc::get_mut(&mut db.clone()).unwrap().durability = durability;
483        Ok(db)
484    }
485}