1use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DbConfig, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::sql::view::{MaterializedViewRegistry, ViewRegistry};
10use crate::storage::StorageBackend; use crate::storage::delta_store::DeltaStore;
12use crate::storage::encryption::EncryptionConfig;
13use crate::storage::encryption::wos::EncryptedWosBackend;
14use crate::storage::memory_wos::InMemoryWosBackend;
15use crate::storage::native_wos::NativeWosBackend;
16use crate::transaction::mvcc::manager::TransactionManager; use dashmap::DashMap;
18use std::collections::HashMap;
19use std::path::Path;
20use std::sync::{Arc, RwLock};
21use tracing::{info, instrument};
22
23fn spawn_background_worker(
25 rx: std::sync::mpsc::Receiver<BackgroundJob>,
26 wal: Option<Arc<crate::wal::WriteAheadLog>>,
27 enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
28 index: Arc<HashIndex>,
29) {
30 std::thread::spawn(move || {
31 while let Ok(job) = rx.recv() {
32 match job {
33 BackgroundJob::WalSync => {
34 if let Some(w) = &wal {
35 let _ = w.sync();
36 }
37 }
38 BackgroundJob::EncryptedWalSync => {
39 if let Some(w) = &enc_wal {
40 let _ = w.sync();
41 }
42 }
43 BackgroundJob::IndexUpdate {
44 table,
45 column,
46 key,
47 row_id,
48 } => {
49 let _ = index.update_on_insert(&table, &column, &key, row_id);
50 }
51 }
52 }
53 });
54}
55
// Constructors / open paths for `Database`. Each variant assembles the same
// large `Self` literal with a different storage / WAL / durability setup.
impl Database {
    /// Opens (or creates) a persistent database at `path`.
    ///
    /// Steps visible here: open the WAL and native WOS backend, load persisted
    /// metadata (schemas, indexes, triggers, procedures, schedules), spawn the
    /// background job worker, build the `Database`, register the tier-pruning
    /// optimizer rule, replay outstanding WAL records into the delta store,
    /// re-register persisted automation objects, then start the schedule
    /// executor and the materialized-view refresh thread.
    ///
    /// Returns the database wrapped in an `Arc` because the scheduler and the
    /// MV-refresh thread hold `Weak` back-references to it.
    #[instrument(skip(path))]
    pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
        info!("Opening database at {:?}", path);
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;
        let wal_path = storage_manager.wal_path();

        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
        let db_index = Arc::new(HashIndex::new());

        // Load all persisted metadata from the WOS before constructing `Self`.
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        info!(
            "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
            loaded_schemas.len(),
            loaded_indexes.len(),
            loaded_triggers.len(),
            loaded_procedures.len(),
            loaded_schedules.len()
        );

        // Background worker services WAL syncs and asynchronous index updates;
        // `tx` is stored in `job_sender` below so the engine can enqueue jobs.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            // Persisted table schemas loaded above are installed directly.
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,

            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            // `try_new().map(Arc::new)` leaves this None-like when no GPU is available.
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            // NOTE(review): this creates a second, separate ShardRouter rather
            // than sharing `sharding_router` — confirm whether intentional.
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        // The tier-pruning rule needs the metadata registry owned by this instance.
        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // WAL replay: re-apply records that were logged but not yet persisted
        // to the WOS, writing them into the in-memory delta store.
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                // Other record kinds require no replay action here.
                _ => {}
            }
            Ok(())
        };

        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            info!("Flushing recovered WAL data to WOS");
            db.flush()?;
        }

        // Re-register persisted automation objects with their executors.
        if !loaded_triggers.is_empty() {
            info!(
                "Auto-registering {} persisted triggers",
                loaded_triggers.len()
            );
            let mut executor = db.trigger_executor.write().unwrap();
            executor.register_all(loaded_triggers);
        }

        if !loaded_procedures.is_empty() {
            info!(
                "Auto-registering {} persisted procedures",
                loaded_procedures.len()
            );
            let mut executor = db.procedure_executor.write().unwrap();
            executor.register_all(loaded_procedures);
        }

        if !loaded_schedules.is_empty() {
            info!(
                "Auto-registering {} persisted schedules",
                loaded_schedules.len()
            );
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                // Individual registration failures are tolerated.
                let _ = executor.register(schedule);
            }
        }

        info!("Database opened successfully");

        let db_arc = Arc::new(db);

        // The scheduler holds a Weak back-reference to avoid an Arc cycle.
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        // Materialized-view refresh thread: block until some view is marked
        // dirty, debounce by the registry's minimum refresh interval (picking
        // up views dirtied meanwhile), then refresh each one via SQL. The
        // thread exits once the database has been dropped (Weak upgrade fails).
        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
        let db_mv_weak = Arc::downgrade(&db_arc);
        std::thread::Builder::new()
            .name("dbx-mv-refresh".into())
            .spawn(move || {
                loop {
                    let dirty = mv_reg.wait_and_take_dirty();
                    std::thread::sleep(mv_reg.min_refresh_interval());
                    let mut all_dirty = dirty;
                    all_dirty.extend(mv_reg.take_dirty());
                    for name in all_dirty {
                        if let Some(db) = db_mv_weak.upgrade() {
                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
                        } else {
                            return;
                        }
                    }
                }
            })
            // NOTE(review): spawn failure is silently ignored — presumably MV
            // refresh is best-effort; confirm this is intentional.
            .ok(); Ok(db_arc)
    }

    /// Opens (or creates) an encrypted database at `path`, using an encrypted
    /// WAL and an encrypted WOS backend.
    ///
    /// Unlike [`open`], this returns a plain `Self` (no `Arc`), loads no
    /// persisted metadata (schemas/indexes start empty), and starts neither
    /// the schedule executor nor the MV-refresh thread.
    /// NOTE(review): confirm those differences from `open` are intentional
    /// rather than an omission.
    #[instrument(skip(path, encryption))]
    pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
        info!("Opening encrypted database at {:?}", path);
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;

        let wal_path = storage_manager.encrypted_wal_path();
        let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
            &wal_path,
            encryption.clone(),
        )?);

        let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
        let db_index = Arc::new(HashIndex::new());

        // Background worker gets the encrypted WAL; the plaintext WAL slot is None.
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(
            rx,
            None,
            Some(Arc::clone(&encrypted_wal)),
            Arc::clone(&db_index),
        );

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: Some(Arc::clone(&encrypted_wal)),

            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // Replay the encrypted WAL eagerly (full materialization via `replay()`
        // instead of the streaming CheckpointManager used by `open`).
        let records = encrypted_wal.replay()?;
        let mut recovered_count = 0;
        for record in &records {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                    recovered_count += rows.len();
                }
                _ => {}
            }
        }
        if recovered_count > 0 {
            info!("Recovered {} encrypted WAL records", recovered_count);
        }

        info!("Encrypted database opened successfully");
        Ok(db)
    }

    /// Creates a purely in-memory database: no file WOS, no WAL, no metadata
    /// loading. Durability is nominally `Lazy` but nothing is persisted.
    ///
    /// NOTE(review): the `TempDir` guard from `tempfile::tempdir()` is dropped
    /// at the end of the `StoragePathManager::new(...)` statement, which
    /// deletes the directory immediately — the stored path then points at a
    /// removed directory. `file_wos` is `None`, so the path may never be used,
    /// but confirm nothing else dereferences `storage_manager` paths.
    #[instrument]
    pub fn open_in_memory() -> DbxResult<Self> {
        info!("Creating in-memory database");
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(
            tempfile::tempdir()
                .map_err(|e| crate::error::DbxError::Storage(e.to_string()))?
                .path(),
        ));
        let db_index = Arc::new(HashIndex::new());
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        // No WAL of either kind: the worker only services index updates here.
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: None,
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,

            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        Ok(db)
    }

    /// Creates an in-memory database whose WOS tier is encrypted (temporary
    /// encrypted backend); no WAL of either kind is used.
    ///
    /// NOTE(review): same `TempDir`-dropped-immediately caveat as
    /// `open_in_memory` — confirm the `storage_manager` path is never used.
    pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
        info!("Creating encrypted in-memory database");
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(
            tempfile::tempdir()
                .map_err(|e| crate::error::DbxError::Storage(e.to_string()))?
                .path(),
        ));
        let db_index = Arc::new(HashIndex::new());
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            // In-memory tier itself is encrypted via a temporary backend.
            memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
                encryption.clone(),
            )?)),
            file_wos: None,
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,

            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        Ok(db)
    }

    /// Convenience wrapper: open with full (synchronous) durability.
    pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
        Self::open_with_durability(path, DurabilityLevel::Full)
    }

    /// Convenience wrapper: open with durability disabled (fastest, unsafe on crash).
    pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
        Self::open_with_durability(path, DurabilityLevel::None)
    }

    /// Opens a persistent database at `path` with an explicit durability level.
    ///
    /// Mirrors [`open`] (WAL + native WOS, metadata load, WAL replay,
    /// automation re-registration, scheduler + MV-refresh threads) but stores
    /// the caller-supplied `durability` instead of `Lazy`.
    ///
    /// NOTE(review): unlike the other constructors, no `TierPruningRule` is
    /// registered on `sql_optimizer` here (and `db` is not `mut`) — confirm
    /// whether that omission is intentional.
    pub fn open_with_durability(
        path: impl AsRef<Path>,
        durability: DurabilityLevel,
    ) -> DbxResult<Arc<Self>> {
        info!(
            "Opening database at {:?} with durability {:?}",
            path.as_ref(),
            durability
        );
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;

        let wal_path = storage_manager.wal_path();
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
        let db_index = Arc::new(HashIndex::new());

        // Load all persisted metadata from the WOS before constructing `Self`.
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        let db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,
            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            // Caller-selected durability level (shorthand field init).
            durability, index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: DbConfig::default(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        // WAL replay into the delta store (same shape as in `open`).
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                _ => {}
            }
            Ok(())
        };
        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            db.flush()?;
        }

        // Re-register persisted automation objects with their executors.
        if !loaded_triggers.is_empty() {
            db.trigger_executor
                .write()
                .unwrap()
                .register_all(loaded_triggers);
        }
        if !loaded_procedures.is_empty() {
            db.procedure_executor
                .write()
                .unwrap()
                .register_all(loaded_procedures);
        }
        if !loaded_schedules.is_empty() {
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                let _ = executor.register(schedule);
            }
        }

        info!(
            "Database opened successfully with durability {:?}",
            durability
        );

        // Scheduler holds a Weak back-reference to avoid an Arc cycle.
        let db_arc = Arc::new(db);
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        // Materialized-view refresh thread; see `open` for the same pattern.
        let mv_reg = Arc::clone(&db_arc.mat_view_registry);
        let db_mv_weak = Arc::downgrade(&db_arc);
        std::thread::Builder::new()
            .name("dbx-mv-refresh".into())
            .spawn(move || {
                loop {
                    let dirty = mv_reg.wait_and_take_dirty();
                    std::thread::sleep(mv_reg.min_refresh_interval());
                    let mut all_dirty = dirty;
                    all_dirty.extend(mv_reg.take_dirty());
                    for name in all_dirty {
                        if let Some(db) = db_mv_weak.upgrade() {
                            let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
                        } else {
                            return;
                        }
                    }
                }
            })
            // NOTE(review): spawn failure is silently ignored — presumably MV
            // refresh is best-effort; confirm this is intentional.
            .ok();

        Ok(db_arc)
    }
}
802
impl Database {
    /// Opens a persistent database at `path` with a caller-supplied [`DbConfig`].
    ///
    /// Differences from [`Database::open`] visible here: the WOS backend is
    /// opened via `open_with_mode(config.dirty_buffer_mode)`, the parallel
    /// engine is built with `new_with_config`, and `config` is stored on the
    /// instance.
    ///
    /// NOTE(review): `durability` is hard-coded to `Lazy` (config is not
    /// consulted), and no materialized-view refresh thread is started, unlike
    /// `open` / `open_with_durability` — confirm both are intentional.
    pub fn open_with_config(
        path: &std::path::Path,
        config: DbConfig,
    ) -> DbxResult<std::sync::Arc<Self>> {
        // Local re-imports shadow the file-level ones; kept for self-containment.
        use crate::engine::parallel_engine::{ParallelExecutionEngine, ParallelizationPolicy};
        use crate::index::HashIndex;
        use crate::sql::optimizer::QueryOptimizer;
        use crate::sql::parser::SqlParser;
        use crate::storage::native_wos::NativeWosBackend;
        use crate::transaction::mvcc::manager::TransactionManager;
        use dashmap::DashMap;
        use std::collections::HashMap;
        use std::sync::{Arc, RwLock};
        use tracing::info;

        info!("Opening database at {:?} with custom config", path);
        let storage_manager = Arc::new(crate::storage::manager::StoragePathManager::new(path));
        let wos_path = storage_manager.wos_dir()?;

        let wal_path = storage_manager.wal_path();
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);

        // Config-driven dirty-buffer mode for the native WOS backend.
        let wos_backend = Arc::new(NativeWosBackend::open_with_mode(
            &wos_path,
            config.dirty_buffer_mode,
        )?);
        let db_index = Arc::new(HashIndex::new());

        // Load all persisted metadata from the WOS before constructing `Self`.
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;

        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));

        // Parallel engine built from the caller's config (Auto policy).
        let parallel_engine = Arc::new(
            ParallelExecutionEngine::new_with_config(ParallelizationPolicy::Auto, config.clone())
                .expect("Failed to create parallel engine"),
        );

        let mut db = Self {
            delta: DeltaVariant::RowBased(Arc::new(crate::storage::delta_store::DeltaStore::new())),
            memory_wos: crate::engine::WosVariant::InMemory(Arc::new(
                crate::storage::memory_wos::InMemoryWosBackend::new(),
            )),
            file_wos: Some(crate::engine::WosVariant::Native(Arc::clone(&wos_backend))),
            storage_manager: Arc::clone(&storage_manager),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,
            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine,
            view_registry: ViewRegistry::new(),
            mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
            partition_maps: Arc::new(RwLock::new(HashMap::new())),
            partition_stats: Arc::new(DashMap::new()),
            partition_compression: Arc::new(DashMap::new()),
            partition_lifecycle: Arc::new(DashMap::new()),
            partition_tier_hints: Arc::new(DashMap::new()),
            partition_creation_times: Arc::new(DashMap::new()),
            lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            config: config.clone(),
            workload_analyzer: Arc::new(RwLock::new(
                crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
            )),
            replication_master: None,
            sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
                Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
            )),
            metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
            cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
            metadata_registry: Arc::new(crate::storage::metadata::MetadataRegistry::new()),
        };

        // The tier-pruning rule needs the metadata registry owned by this instance.
        db.sql_optimizer
            .register_rule(Box::new(crate::sql::optimizer::TierPruningRule::new(
                Arc::clone(&db.metadata_registry),
            )));

        // WAL replay into the delta store (same shape as in `open`).
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                _ => {}
            }
            Ok(())
        };
        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            db.flush()?;
        }

        // Re-register persisted automation objects with their executors.
        if !loaded_triggers.is_empty() {
            db.trigger_executor
                .write()
                .unwrap()
                .register_all(loaded_triggers);
        }
        if !loaded_procedures.is_empty() {
            db.procedure_executor
                .write()
                .unwrap()
                .register_all(loaded_procedures);
        }
        if !loaded_schedules.is_empty() {
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                let _ = executor.register(schedule);
            }
        }

        info!(
            "Database opened with custom parallelism config (cpu_cap={:.0}%)",
            config.parallelism.cpu_cap * 100.0
        );

        // Scheduler holds a Weak back-reference to avoid an Arc cycle.
        let db_arc = Arc::new(db);
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;

        Ok(db_arc)
    }
}