1use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DbConfig, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::sql::view::{MaterializedViewRegistry, ViewRegistry};
10use crate::storage::StorageBackend; use crate::storage::delta_store::DeltaStore;
12use crate::storage::encryption::EncryptionConfig;
13use crate::storage::encryption::wos::EncryptedWosBackend;
14use crate::storage::memory_wos::InMemoryWosBackend;
15use crate::storage::native_wos::NativeWosBackend;
16use crate::transaction::mvcc::manager::TransactionManager; use dashmap::DashMap;
18use std::collections::HashMap;
19use std::path::Path;
20use std::sync::{Arc, RwLock};
21use tracing::{info, instrument};
22
23fn spawn_background_worker(
25 rx: std::sync::mpsc::Receiver<BackgroundJob>,
26 wal: Option<Arc<crate::wal::WriteAheadLog>>,
27 enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
28 index: Arc<HashIndex>,
29) {
30 std::thread::spawn(move || {
31 while let Ok(job) = rx.recv() {
32 match job {
33 BackgroundJob::WalSync => {
34 if let Some(w) = &wal {
35 let _ = w.sync();
36 }
37 }
38 BackgroundJob::EncryptedWalSync => {
39 if let Some(w) = &enc_wal {
40 let _ = w.sync();
41 }
42 }
43 BackgroundJob::IndexUpdate {
44 table,
45 column,
46 key,
47 row_id,
48 } => {
49 let _ = index.update_on_insert(&table, &column, &key, row_id);
50 }
51 }
52 }
53 });
54}
55
56impl Database {
57 #[instrument(skip(path))]
78 pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
79 info!("Opening database at {:?}", path);
80 let wos_path = path.join("wos");
81 std::fs::create_dir_all(&wos_path)?;
82
83 let wal_path = path.join("wal.log");
85 let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
86
87 let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
88 let db_index = Arc::new(HashIndex::new());
89
90 let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
92 let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
93 let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
94 let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
95 let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
96
97 info!(
98 "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
99 loaded_schemas.len(),
100 loaded_indexes.len(),
101 loaded_triggers.len(),
102 loaded_procedures.len(),
103 loaded_schedules.len()
104 );
105
106 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
107 spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
108
109 let db = Self {
110 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
111 memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
112 file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
113 table_persistence: DashMap::new(),
114 schemas: Arc::new(RwLock::new(HashMap::new())),
115 tables: RwLock::new(HashMap::new()),
116 table_schemas: Arc::new(RwLock::new(loaded_schemas)),
117 index: db_index,
118 row_counters: Arc::new(DashMap::new()),
119 sql_parser: SqlParser::new(),
120 sql_optimizer: QueryOptimizer::new(),
121 wal: Some(wal),
122 encrypted_wal: None,
123
124 encryption: RwLock::new(None),
125 tx_manager: Arc::new(TransactionManager::new()),
126 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
127 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
128 job_sender: Some(tx),
129 durability: DurabilityLevel::Lazy,
130 index_registry: RwLock::new(loaded_indexes),
131 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
132 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
133 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
134 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
135 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
136 parallel_engine: Arc::new(
137 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
138 .expect("Failed to create parallel engine"),
139 ),
140 view_registry: ViewRegistry::new(),
141 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
142 partition_maps: Arc::new(RwLock::new(HashMap::new())),
143 partition_stats: Arc::new(DashMap::new()),
144 partition_compression: Arc::new(DashMap::new()),
145 partition_lifecycle: Arc::new(DashMap::new()),
146 partition_tier_hints: Arc::new(DashMap::new()),
147 partition_creation_times: Arc::new(DashMap::new()),
148 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
149 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
150 config: DbConfig::default(),
151 workload_analyzer: Arc::new(RwLock::new(
152 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
153 )),
154 replication_master: None,
155 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
156 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
157 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
158 )),
159 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
160 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
161 };
162
163 let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
165 match record {
166 crate::wal::WalRecord::Insert {
167 table,
168 key,
169 value,
170 ts: _,
171 } => {
172 db.delta.insert(table, key, value)?;
173 }
174 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
175 db.delta.delete(table, key)?;
176 }
177 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
178 db.delta.insert_batch(table, rows.clone())?;
179 }
180 _ => {}
181 }
182 Ok(())
183 };
184
185 let recovered_count =
186 crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
187 if recovered_count > 0 {
188 info!("Recovered {} WAL records", recovered_count);
189 info!("Flushing recovered WAL data to WOS");
191 db.flush()?;
192 }
193
194 if !loaded_triggers.is_empty() {
196 info!(
197 "Auto-registering {} persisted triggers",
198 loaded_triggers.len()
199 );
200 let mut executor = db.trigger_executor.write().unwrap();
201 executor.register_all(loaded_triggers);
202 }
203
204 if !loaded_procedures.is_empty() {
206 info!(
207 "Auto-registering {} persisted procedures",
208 loaded_procedures.len()
209 );
210 let mut executor = db.procedure_executor.write().unwrap();
211 executor.register_all(loaded_procedures);
212 }
213
214 if !loaded_schedules.is_empty() {
216 info!(
217 "Auto-registering {} persisted schedules",
218 loaded_schedules.len()
219 );
220 let executor = db.schedule_executor.write().unwrap();
221 for (_, schedule) in loaded_schedules {
222 let _ = executor.register(schedule);
223 }
224 }
225
226 info!("Database opened successfully");
227
228 let db_arc = Arc::new(db);
230
231 let db_weak = Arc::downgrade(&db_arc);
233 db_arc
234 .schedule_executor
235 .write()
236 .unwrap()
237 .start_scheduler(db_weak)?;
238
239 let mv_reg = Arc::clone(&db_arc.mat_view_registry);
241 let db_mv_weak = Arc::downgrade(&db_arc);
242 std::thread::Builder::new()
243 .name("dbx-mv-refresh".into())
244 .spawn(move || {
245 loop {
246 let dirty = mv_reg.wait_and_take_dirty();
248 std::thread::sleep(mv_reg.min_refresh_interval());
250 let mut all_dirty = dirty;
251 all_dirty.extend(mv_reg.take_dirty());
252 for name in all_dirty {
253 if let Some(db) = db_mv_weak.upgrade() {
254 let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
255 } else {
256 return;
258 }
259 }
260 }
261 })
262 .ok(); Ok(db_arc)
265 }
266
267 #[instrument(skip(path, encryption))]
288 pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
289 info!("Opening encrypted database at {:?}", path);
290 let wos_path = path.join("wos");
291 std::fs::create_dir_all(&wos_path)?;
292
293 let wal_path = path.join("wal.enc.log");
295 let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
296 &wal_path,
297 encryption.clone(),
298 )?);
299
300 let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
302 let db_index = Arc::new(HashIndex::new());
303
304 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
305 spawn_background_worker(
306 rx,
307 None,
308 Some(Arc::clone(&encrypted_wal)),
309 Arc::clone(&db_index),
310 );
311
312 let db = Self {
313 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
314 memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
315 file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
316 table_persistence: DashMap::new(),
317 schemas: Arc::new(RwLock::new(HashMap::new())),
318 tables: RwLock::new(HashMap::new()),
319 table_schemas: Arc::new(RwLock::new(HashMap::new())),
320 index: db_index,
321 row_counters: Arc::new(DashMap::new()),
322 sql_parser: SqlParser::new(),
323 sql_optimizer: QueryOptimizer::new(),
324 wal: None,
325 encrypted_wal: Some(Arc::clone(&encrypted_wal)),
326
327 encryption: RwLock::new(Some(encryption)),
328 tx_manager: Arc::new(TransactionManager::new()),
329 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
330 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
331 job_sender: Some(tx),
332 durability: DurabilityLevel::Lazy,
333 index_registry: RwLock::new(HashMap::new()),
334 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
335 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
336 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
337 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
338 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
339 parallel_engine: Arc::new(
340 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
341 .expect("Failed to create parallel engine"),
342 ),
343 view_registry: ViewRegistry::new(),
344 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
345 partition_maps: Arc::new(RwLock::new(HashMap::new())),
346 partition_stats: Arc::new(DashMap::new()),
347 partition_compression: Arc::new(DashMap::new()),
348 partition_lifecycle: Arc::new(DashMap::new()),
349 partition_tier_hints: Arc::new(DashMap::new()),
350 partition_creation_times: Arc::new(DashMap::new()),
351 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
352 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
353 config: DbConfig::default(),
354 workload_analyzer: Arc::new(RwLock::new(
355 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
356 )),
357 replication_master: None,
358 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
359 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
360 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
361 )),
362 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
363 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
364 };
365 let records = encrypted_wal.replay()?;
366 let mut recovered_count = 0;
367 for record in &records {
368 match record {
369 crate::wal::WalRecord::Insert {
370 table,
371 key,
372 value,
373 ts: _,
374 } => {
375 db.delta.insert(table, key, value)?;
376 recovered_count += 1;
377 }
378 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
379 db.delta.delete(table, key)?;
380 recovered_count += 1;
381 }
382 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
383 db.delta.insert_batch(table, rows.clone())?;
384 recovered_count += rows.len();
385 }
386 _ => {}
387 }
388 }
389 if recovered_count > 0 {
390 info!("Recovered {} encrypted WAL records", recovered_count);
391 }
392
393 info!("Encrypted database opened successfully");
394 Ok(db)
395 }
396
397 #[instrument]
413 pub fn open_in_memory() -> DbxResult<Self> {
414 info!("Creating in-memory database");
415 let db_index = Arc::new(HashIndex::new());
416 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
417 spawn_background_worker(rx, None, None, Arc::clone(&db_index));
418
419 Ok(Self {
420 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
421 memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
422 file_wos: None,
423 table_persistence: DashMap::new(),
424 schemas: Arc::new(RwLock::new(HashMap::new())),
425 tables: RwLock::new(HashMap::new()),
426 table_schemas: Arc::new(RwLock::new(HashMap::new())),
427 index: db_index,
428 row_counters: Arc::new(DashMap::new()),
429 sql_parser: SqlParser::new(),
430 sql_optimizer: QueryOptimizer::new(),
431 wal: None,
432 encrypted_wal: None,
433
434 encryption: RwLock::new(None),
435 tx_manager: Arc::new(TransactionManager::new()),
436 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
437 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
438 job_sender: Some(tx),
439 durability: DurabilityLevel::Lazy,
440 index_registry: RwLock::new(HashMap::new()),
441 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
442 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
443 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
444 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
445 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
446 parallel_engine: Arc::new(
447 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
448 .expect("Failed to create parallel engine"),
449 ),
450 view_registry: ViewRegistry::new(),
451 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
452 partition_maps: Arc::new(RwLock::new(HashMap::new())),
453 partition_stats: Arc::new(DashMap::new()),
454 partition_compression: Arc::new(DashMap::new()),
455 partition_lifecycle: Arc::new(DashMap::new()),
456 partition_tier_hints: Arc::new(DashMap::new()),
457 partition_creation_times: Arc::new(DashMap::new()),
458 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
459 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
460 config: DbConfig::default(),
461 workload_analyzer: Arc::new(RwLock::new(
462 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
463 )),
464 replication_master: None,
465 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
466 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
467 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
468 )),
469 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
470 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
471 })
472 }
473
474 pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
494 let db_index = Arc::new(HashIndex::new());
495 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
496 spawn_background_worker(rx, None, None, Arc::clone(&db_index));
497
498 Ok(Self {
499 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
500 memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
501 encryption.clone(),
502 )?)),
503 file_wos: None,
504 table_persistence: DashMap::new(),
505 schemas: Arc::new(RwLock::new(HashMap::new())),
506 tables: RwLock::new(HashMap::new()),
507 table_schemas: Arc::new(RwLock::new(HashMap::new())),
508 index: db_index,
509 row_counters: Arc::new(DashMap::new()),
510 sql_parser: SqlParser::new(),
511 sql_optimizer: QueryOptimizer::new(),
512 wal: None,
513 encrypted_wal: None,
514
515 encryption: RwLock::new(Some(encryption)),
516 tx_manager: Arc::new(TransactionManager::new()),
517 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
518 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
519 job_sender: Some(tx),
520 durability: DurabilityLevel::Lazy,
521 index_registry: RwLock::new(HashMap::new()),
522 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
523 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
524 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
525 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
526 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
527 parallel_engine: Arc::new(
528 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
529 .expect("Failed to create parallel engine"),
530 ),
531 view_registry: ViewRegistry::new(),
532 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
533 partition_maps: Arc::new(RwLock::new(HashMap::new())),
534 partition_stats: Arc::new(DashMap::new()),
535 partition_compression: Arc::new(DashMap::new()),
536 partition_lifecycle: Arc::new(DashMap::new()),
537 partition_tier_hints: Arc::new(DashMap::new()),
538 partition_creation_times: Arc::new(DashMap::new()),
539 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
540 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
541 config: DbConfig::default(),
542 workload_analyzer: Arc::new(RwLock::new(
543 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
544 )),
545 replication_master: None,
546 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
547 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
548 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
549 )),
550 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
551 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
552 })
553 }
554
555 pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
565 Self::open_with_durability(path, DurabilityLevel::Full)
566 }
567
568 pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
579 Self::open_with_durability(path, DurabilityLevel::None)
580 }
581
582 pub fn open_with_durability(
589 path: impl AsRef<Path>,
590 durability: DurabilityLevel,
591 ) -> DbxResult<Arc<Self>> {
592 info!(
593 "Opening database at {:?} with durability {:?}",
594 path.as_ref(),
595 durability
596 );
597 let path = path.as_ref();
598 let wos_path = path.join("wos");
599 std::fs::create_dir_all(&wos_path)?;
600
601 let wal_path = path.join("wal.log");
603 let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
604
605 let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
606 let db_index = Arc::new(HashIndex::new());
607
608 let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
610 let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
611 let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
612 let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
613 let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
614
615 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
616 spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
617
618 let db = Self {
619 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
620 memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
621 file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
622 table_persistence: DashMap::new(),
623 schemas: Arc::new(RwLock::new(HashMap::new())),
624 tables: RwLock::new(HashMap::new()),
625 table_schemas: Arc::new(RwLock::new(loaded_schemas)),
626 index: db_index,
627 row_counters: Arc::new(DashMap::new()),
628 sql_parser: SqlParser::new(),
629 sql_optimizer: QueryOptimizer::new(),
630 wal: Some(wal),
631 encrypted_wal: None,
632 encryption: RwLock::new(None),
633 tx_manager: Arc::new(TransactionManager::new()),
634 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
635 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
636 job_sender: Some(tx),
637 durability, index_registry: RwLock::new(loaded_indexes),
639 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
640 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
641 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
642 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
643 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
644 parallel_engine: Arc::new(
645 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
646 .expect("Failed to create parallel engine"),
647 ),
648 view_registry: ViewRegistry::new(),
649 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
650 partition_maps: Arc::new(RwLock::new(HashMap::new())),
651 partition_stats: Arc::new(DashMap::new()),
652 partition_compression: Arc::new(DashMap::new()),
653 partition_lifecycle: Arc::new(DashMap::new()),
654 partition_tier_hints: Arc::new(DashMap::new()),
655 partition_creation_times: Arc::new(DashMap::new()),
656 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
657 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
658 config: DbConfig::default(),
659 workload_analyzer: Arc::new(RwLock::new(
660 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
661 )),
662 replication_master: None,
663 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
664 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
665 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
666 )),
667 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
668 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
669 };
670
671 let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
673 match record {
674 crate::wal::WalRecord::Insert {
675 table,
676 key,
677 value,
678 ts: _,
679 } => {
680 db.delta.insert(table, key, value)?;
681 }
682 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
683 db.delta.delete(table, key)?;
684 }
685 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
686 db.delta.insert_batch(table, rows.clone())?;
687 }
688 _ => {}
689 }
690 Ok(())
691 };
692 let recovered_count =
693 crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
694 if recovered_count > 0 {
695 info!("Recovered {} WAL records", recovered_count);
696 db.flush()?;
697 }
698
699 if !loaded_triggers.is_empty() {
701 db.trigger_executor
702 .write()
703 .unwrap()
704 .register_all(loaded_triggers);
705 }
706 if !loaded_procedures.is_empty() {
707 db.procedure_executor
708 .write()
709 .unwrap()
710 .register_all(loaded_procedures);
711 }
712 if !loaded_schedules.is_empty() {
713 let executor = db.schedule_executor.write().unwrap();
714 for (_, schedule) in loaded_schedules {
715 let _ = executor.register(schedule);
716 }
717 }
718
719 info!(
720 "Database opened successfully with durability {:?}",
721 durability
722 );
723
724 let db_arc = Arc::new(db);
725 let db_weak = Arc::downgrade(&db_arc);
726 db_arc
727 .schedule_executor
728 .write()
729 .unwrap()
730 .start_scheduler(db_weak)?;
731
732 let mv_reg = Arc::clone(&db_arc.mat_view_registry);
734 let db_mv_weak = Arc::downgrade(&db_arc);
735 std::thread::Builder::new()
736 .name("dbx-mv-refresh".into())
737 .spawn(move || {
738 loop {
739 let dirty = mv_reg.wait_and_take_dirty();
740 std::thread::sleep(mv_reg.min_refresh_interval());
741 let mut all_dirty = dirty;
742 all_dirty.extend(mv_reg.take_dirty());
743 for name in all_dirty {
744 if let Some(db) = db_mv_weak.upgrade() {
745 let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
746 } else {
747 return;
748 }
749 }
750 }
751 })
752 .ok();
753
754 Ok(db_arc)
755 }
756}
757
758impl Database {
759 pub fn open_with_config(
779 path: &std::path::Path,
780 config: DbConfig,
781 ) -> DbxResult<std::sync::Arc<Self>> {
782 use crate::engine::parallel_engine::{ParallelExecutionEngine, ParallelizationPolicy};
783 use crate::index::HashIndex;
784 use crate::sql::optimizer::QueryOptimizer;
785 use crate::sql::parser::SqlParser;
786 use crate::storage::native_wos::NativeWosBackend;
787 use crate::transaction::mvcc::manager::TransactionManager;
788 use dashmap::DashMap;
789 use std::collections::HashMap;
790 use std::sync::{Arc, RwLock};
791 use tracing::info;
792
793 info!("Opening database at {:?} with custom config", path);
794 let wos_path = path.join("wos");
795 std::fs::create_dir_all(&wos_path)?;
796
797 let wal_path = path.join("wal.log");
798 let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
799
800 let wos_backend = Arc::new(NativeWosBackend::open_with_mode(
801 &wos_path,
802 config.dirty_buffer_mode,
803 )?);
804 let db_index = Arc::new(HashIndex::new());
805
806 let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
807 let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
808 let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
809 let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
810 let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
811
812 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
813 spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
814
815 let parallel_engine = Arc::new(
817 ParallelExecutionEngine::new_with_config(ParallelizationPolicy::Auto, config.clone())
818 .expect("Failed to create parallel engine"),
819 );
820
821 let db = Self {
822 delta: DeltaVariant::RowBased(Arc::new(crate::storage::delta_store::DeltaStore::new())),
823 memory_wos: crate::engine::WosVariant::InMemory(Arc::new(
824 crate::storage::memory_wos::InMemoryWosBackend::new(),
825 )),
826 file_wos: Some(crate::engine::WosVariant::Native(Arc::clone(&wos_backend))),
827 table_persistence: DashMap::new(),
828 schemas: Arc::new(RwLock::new(HashMap::new())),
829 tables: RwLock::new(HashMap::new()),
830 table_schemas: Arc::new(RwLock::new(loaded_schemas)),
831 index: db_index,
832 row_counters: Arc::new(DashMap::new()),
833 sql_parser: SqlParser::new(),
834 sql_optimizer: QueryOptimizer::new(),
835 wal: Some(wal),
836 encrypted_wal: None,
837 encryption: RwLock::new(None),
838 tx_manager: Arc::new(TransactionManager::new()),
839 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
840 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
841 job_sender: Some(tx),
842 durability: DurabilityLevel::Lazy,
843 index_registry: RwLock::new(loaded_indexes),
844 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
845 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
846 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
847 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
848 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
849 parallel_engine,
850 view_registry: ViewRegistry::new(),
851 mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
852 partition_maps: Arc::new(RwLock::new(HashMap::new())),
853 partition_stats: Arc::new(DashMap::new()),
854 partition_compression: Arc::new(DashMap::new()),
855 partition_lifecycle: Arc::new(DashMap::new()),
856 partition_tier_hints: Arc::new(DashMap::new()),
857 partition_creation_times: Arc::new(DashMap::new()),
858 lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
859 lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
860 config: config.clone(),
861 workload_analyzer: Arc::new(RwLock::new(
862 crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
863 )),
864 replication_master: None,
865 sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
866 scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
867 Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
868 )),
869 metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
870 cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
871 };
872
873 let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
875 match record {
876 crate::wal::WalRecord::Insert {
877 table,
878 key,
879 value,
880 ts: _,
881 } => {
882 db.delta.insert(table, key, value)?;
883 }
884 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
885 db.delta.delete(table, key)?;
886 }
887 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
888 db.delta.insert_batch(table, rows.clone())?;
889 }
890 _ => {}
891 }
892 Ok(())
893 };
894 let recovered_count =
895 crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
896 if recovered_count > 0 {
897 info!("Recovered {} WAL records", recovered_count);
898 db.flush()?;
899 }
900
901 if !loaded_triggers.is_empty() {
903 db.trigger_executor
904 .write()
905 .unwrap()
906 .register_all(loaded_triggers);
907 }
908 if !loaded_procedures.is_empty() {
909 db.procedure_executor
910 .write()
911 .unwrap()
912 .register_all(loaded_procedures);
913 }
914 if !loaded_schedules.is_empty() {
915 let executor = db.schedule_executor.write().unwrap();
916 for (_, schedule) in loaded_schedules {
917 let _ = executor.register(schedule);
918 }
919 }
920
921 info!(
922 "Database opened with custom parallelism config (cpu_cap={:.0}%)",
923 config.parallelism.cpu_cap * 100.0
924 );
925
926 let db_arc = Arc::new(db);
927 let db_weak = Arc::downgrade(&db_arc);
928 db_arc
929 .schedule_executor
930 .write()
931 .unwrap()
932 .start_scheduler(db_weak)?;
933
934 Ok(db_arc)
935 }
936}