use crate::engine::types::BackgroundJob;
use crate::engine::{Database, DbConfig, DeltaVariant, DurabilityLevel, WosVariant};
use crate::error::DbxResult;
use crate::index::HashIndex;
use crate::sql::optimizer::QueryOptimizer;
use crate::sql::parser::SqlParser;
use crate::sql::view::{MaterializedViewRegistry, ViewRegistry};
use crate::storage::StorageBackend;
use crate::storage::delta_store::DeltaStore;
use crate::storage::encryption::EncryptionConfig;
use crate::storage::encryption::wos::EncryptedWosBackend;
use crate::storage::memory_wos::InMemoryWosBackend;
use crate::storage::native_wos::NativeWosBackend;
use crate::transaction::mvcc::manager::TransactionManager;
use dashmap::DashMap;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
use tracing::{info, instrument};
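/// Background worker that drains `BackgroundJob`s from the channel: lazy WAL
/// syncs (plain or encrypted) and asynchronous hash-index updates. The loop
/// ends, and the thread exits, once every `job_sender` clone has been dropped.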
fn spawn_background_worker(
rx: std::sync::mpsc::Receiver<BackgroundJob>,
wal: Option<Arc<crate::wal::WriteAheadLog>>,
enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
index: Arc<HashIndex>,
) {
std::thread::spawn(move || {
while let Ok(job) = rx.recv() {
match job {
BackgroundJob::WalSync => {
if let Some(w) = &wal {
let _ = w.sync();
}
}
BackgroundJob::EncryptedWalSync => {
if let Some(w) = &enc_wal {
let _ = w.sync();
}
}
BackgroundJob::IndexUpdate {
table,
column,
key,
row_id,
} => {
let _ = index.update_on_insert(&table, &column, &key, row_id);
}
}
}
});
}
impl Database {
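/// Opens (or creates) a persistent database rooted at `path`.
///
/// Loads persisted schemas, indexes, triggers, procedures, and schedules from
/// the WOS, replays any un-checkpointed WAL records into the delta store, and
/// starts the schedule and materialized-view refresh background threads.
///
/// Usage sketch (not compiled as a doctest; the SQL text is illustrative and
/// assumes the parser accepts a plain CREATE TABLE):
///
/// ```ignore
/// use std::path::Path;
///
/// let db = Database::open(Path::new("/var/lib/dbx"))?;
/// db.execute_sql("CREATE TABLE users (id INT, name TEXT)")?;
/// db.flush()?;
/// ```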
#[instrument(skip(path))]
pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
info!("Opening database at {:?}", path);
let wos_path = path.join("wos");
std::fs::create_dir_all(&wos_path)?;
let wal_path = path.join("wal.log");
let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
let db_index = Arc::new(HashIndex::new());
let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
info!(
"Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
loaded_schemas.len(),
loaded_indexes.len(),
loaded_triggers.len(),
loaded_procedures.len(),
loaded_schedules.len()
);
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
let db = Self {
delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(loaded_schemas)),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: Some(wal),
encrypted_wal: None,
encryption: RwLock::new(None),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability: DurabilityLevel::Lazy,
index_registry: RwLock::new(loaded_indexes),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine: Arc::new(
crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
.expect("Failed to create parallel engine"),
),
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: DbConfig::default(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
};
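// Replay WAL records written since the last checkpoint into the delta store;
// recovered rows are flushed to the WOS below.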
let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
match record {
crate::wal::WalRecord::Insert {
table,
key,
value,
ts: _,
} => {
db.delta.insert(table, key, value)?;
}
crate::wal::WalRecord::Delete { table, key, ts: _ } => {
db.delta.delete(table, key)?;
}
crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
db.delta.insert_batch(table, rows.clone())?;
}
_ => {}
}
Ok(())
};
let recovered_count =
crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
if recovered_count > 0 {
info!("Recovered {} WAL records", recovered_count);
info!("Flushing recovered WAL data to WOS");
db.flush()?;
}
if !loaded_triggers.is_empty() {
info!(
"Auto-registering {} persisted triggers",
loaded_triggers.len()
);
let mut executor = db.trigger_executor.write().unwrap();
executor.register_all(loaded_triggers);
}
if !loaded_procedures.is_empty() {
info!(
"Auto-registering {} persisted procedures",
loaded_procedures.len()
);
let mut executor = db.procedure_executor.write().unwrap();
executor.register_all(loaded_procedures);
}
if !loaded_schedules.is_empty() {
info!(
"Auto-registering {} persisted schedules",
loaded_schedules.len()
);
let executor = db.schedule_executor.write().unwrap();
for (_, schedule) in loaded_schedules {
let _ = executor.register(schedule);
}
}
info!("Database opened successfully");
let db_arc = Arc::new(db);
let db_weak = Arc::downgrade(&db_arc);
db_arc
.schedule_executor
.write()
.unwrap()
.start_scheduler(db_weak)?;
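// Background refresher: re-runs REFRESH MATERIALIZED VIEW for views marked
// dirty, debounced by the registry's minimum refresh interval. The loop exits
// once the database handle has been dropped (the Weak upgrade fails).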
let mv_reg = Arc::clone(&db_arc.mat_view_registry);
let db_mv_weak = Arc::downgrade(&db_arc);
std::thread::Builder::new()
.name("dbx-mv-refresh".into())
.spawn(move || {
loop {
let dirty = mv_reg.wait_and_take_dirty();
std::thread::sleep(mv_reg.min_refresh_interval());
let mut all_dirty = dirty;
all_dirty.extend(mv_reg.take_dirty());
for name in all_dirty {
if let Some(db) = db_mv_weak.upgrade() {
let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
} else {
return;
}
}
}
})
.ok();
Ok(db_arc)
}
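/// Opens (or creates) a database at `path` whose WAL and WOS are encrypted
/// with the supplied `EncryptionConfig`. Unlike [`Database::open`], this
/// constructor does not reload persisted metadata or start the background
/// scheduler; it only replays the encrypted WAL before returning.
///
/// Usage sketch (not compiled as a doctest; how the `EncryptionConfig` is
/// built depends on the encryption module and is assumed here):
///
/// ```ignore
/// let config: EncryptionConfig = obtain_encryption_config()?; // hypothetical helper
/// let db = Database::open_encrypted(Path::new("/var/lib/dbx-enc"), config)?;
/// ```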
#[instrument(skip(path, encryption))]
pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
info!("Opening encrypted database at {:?}", path);
let wos_path = path.join("wos");
std::fs::create_dir_all(&wos_path)?;
let wal_path = path.join("wal.enc.log");
let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
&wal_path,
encryption.clone(),
)?);
let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
let db_index = Arc::new(HashIndex::new());
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(
rx,
None,
Some(Arc::clone(&encrypted_wal)),
Arc::clone(&db_index),
);
let db = Self {
delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(HashMap::new())),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: None,
encrypted_wal: Some(Arc::clone(&encrypted_wal)),
encryption: RwLock::new(Some(encryption)),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability: DurabilityLevel::Lazy,
index_registry: RwLock::new(HashMap::new()),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine: Arc::new(
crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
.expect("Failed to create parallel engine"),
),
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: DbConfig::default(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
};
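// Replay the encrypted WAL so rows that never reached the WOS are restored
// into the delta store.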
let records = encrypted_wal.replay()?;
let mut recovered_count = 0;
for record in &records {
match record {
crate::wal::WalRecord::Insert {
table,
key,
value,
ts: _,
} => {
db.delta.insert(table, key, value)?;
recovered_count += 1;
}
crate::wal::WalRecord::Delete { table, key, ts: _ } => {
db.delta.delete(table, key)?;
recovered_count += 1;
}
crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
db.delta.insert_batch(table, rows.clone())?;
recovered_count += rows.len();
}
_ => {}
}
}
if recovered_count > 0 {
info!("Recovered {} encrypted WAL records", recovered_count);
}
info!("Encrypted database opened successfully");
Ok(db)
}
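/// Creates a purely in-memory database: no WAL, no on-disk WOS, so nothing
/// survives once the handle is dropped.
///
/// Usage sketch (not compiled as a doctest; the SQL text is illustrative):
///
/// ```ignore
/// let db = Database::open_in_memory()?;
/// db.execute_sql("CREATE TABLE scratch (k INT, v TEXT)")?;
/// ```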
#[instrument]
pub fn open_in_memory() -> DbxResult<Self> {
info!("Creating in-memory database");
let db_index = Arc::new(HashIndex::new());
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(rx, None, None, Arc::clone(&db_index));
Ok(Self {
delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
file_wos: None,
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(HashMap::new())),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: None,
encrypted_wal: None,
encryption: RwLock::new(None),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability: DurabilityLevel::Lazy,
index_registry: RwLock::new(HashMap::new()),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine: Arc::new(
crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
.expect("Failed to create parallel engine"),
),
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: DbConfig::default(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
})
}
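/// In-memory variant whose write-optimized store is encrypted with the
/// supplied `EncryptionConfig`. There is still no WAL, so data is not durable
/// across restarts.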
pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
let db_index = Arc::new(HashIndex::new());
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(rx, None, None, Arc::clone(&db_index));
Ok(Self {
delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
encryption.clone(),
)?)),
file_wos: None,
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(HashMap::new())),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: None,
encrypted_wal: None,
encryption: RwLock::new(Some(encryption)),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability: DurabilityLevel::Lazy,
index_registry: RwLock::new(HashMap::new()),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine: Arc::new(
crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
.expect("Failed to create parallel engine"),
),
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: DbConfig::default(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
})
}
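/// Convenience wrapper that opens the database with the strongest durability
/// setting, `DurabilityLevel::Full`.
///
/// ```ignore
/// let db = Database::open_safe("/var/lib/dbx")?; // sketch; not a doctest
/// ```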
pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
Self::open_with_durability(path, DurabilityLevel::Full)
}
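/// Convenience wrapper that opens the database with durability disabled
/// (`DurabilityLevel::None`); intended for workloads that can tolerate losing
/// recent writes on a crash.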
pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
Self::open_with_durability(path, DurabilityLevel::None)
}
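/// Opens the database at `path` with an explicit [`DurabilityLevel`].
/// Performs the same work as [`Database::open`] (metadata reload, WAL
/// recovery, scheduler and materialized-view refresh threads) but lets the
/// caller choose the durability mode instead of the default `Lazy`.
///
/// Usage sketch (not compiled as a doctest):
///
/// ```ignore
/// let db = Database::open_with_durability("/var/lib/dbx", DurabilityLevel::Full)?;
/// ```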
pub fn open_with_durability(
path: impl AsRef<Path>,
durability: DurabilityLevel,
) -> DbxResult<Arc<Self>> {
info!(
"Opening database at {:?} with durability {:?}",
path.as_ref(),
durability
);
let path = path.as_ref();
let wos_path = path.join("wos");
std::fs::create_dir_all(&wos_path)?;
let wal_path = path.join("wal.log");
let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
let wos_backend = Arc::new(NativeWosBackend::open(&wos_path)?);
let db_index = Arc::new(HashIndex::new());
let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
let db = Self {
delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
file_wos: Some(WosVariant::Native(Arc::clone(&wos_backend))),
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(loaded_schemas)),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: Some(wal),
encrypted_wal: None,
encryption: RwLock::new(None),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability,
index_registry: RwLock::new(loaded_indexes),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine: Arc::new(
crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
.expect("Failed to create parallel engine"),
),
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: DbConfig::default(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
};
let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
match record {
crate::wal::WalRecord::Insert {
table,
key,
value,
ts: _,
} => {
db.delta.insert(table, key, value)?;
}
crate::wal::WalRecord::Delete { table, key, ts: _ } => {
db.delta.delete(table, key)?;
}
crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
db.delta.insert_batch(table, rows.clone())?;
}
_ => {}
}
Ok(())
};
let recovered_count =
crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
if recovered_count > 0 {
info!("Recovered {} WAL records", recovered_count);
db.flush()?;
}
if !loaded_triggers.is_empty() {
db.trigger_executor
.write()
.unwrap()
.register_all(loaded_triggers);
}
if !loaded_procedures.is_empty() {
db.procedure_executor
.write()
.unwrap()
.register_all(loaded_procedures);
}
if !loaded_schedules.is_empty() {
let executor = db.schedule_executor.write().unwrap();
for (_, schedule) in loaded_schedules {
let _ = executor.register(schedule);
}
}
info!(
"Database opened successfully with durability {:?}",
durability
);
let db_arc = Arc::new(db);
let db_weak = Arc::downgrade(&db_arc);
db_arc
.schedule_executor
.write()
.unwrap()
.start_scheduler(db_weak)?;
let mv_reg = Arc::clone(&db_arc.mat_view_registry);
let db_mv_weak = Arc::downgrade(&db_arc);
std::thread::Builder::new()
.name("dbx-mv-refresh".into())
.spawn(move || {
loop {
let dirty = mv_reg.wait_and_take_dirty();
std::thread::sleep(mv_reg.min_refresh_interval());
let mut all_dirty = dirty;
all_dirty.extend(mv_reg.take_dirty());
for name in all_dirty {
if let Some(db) = db_mv_weak.upgrade() {
let _ = db.execute_sql(&format!("REFRESH MATERIALIZED VIEW {}", name));
} else {
return;
}
}
}
})
.ok();
Ok(db_arc)
}
}
impl Database {
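/// Opens the database at `path` with a caller-supplied [`DbConfig`], which
/// selects the WOS dirty-buffer mode and the parallel execution policy.
///
/// Usage sketch (not compiled as a doctest):
///
/// ```ignore
/// let db = Database::open_with_config(Path::new("/var/lib/dbx"), DbConfig::default())?;
/// ```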
pub fn open_with_config(
path: &std::path::Path,
config: DbConfig,
) -> DbxResult<std::sync::Arc<Self>> {
use crate::engine::parallel_engine::{ParallelExecutionEngine, ParallelizationPolicy};
use crate::index::HashIndex;
use crate::sql::optimizer::QueryOptimizer;
use crate::sql::parser::SqlParser;
use crate::storage::native_wos::NativeWosBackend;
use crate::transaction::mvcc::manager::TransactionManager;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::info;
info!("Opening database at {:?} with custom config", path);
let wos_path = path.join("wos");
std::fs::create_dir_all(&wos_path)?;
let wal_path = path.join("wal.log");
let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
let wos_backend = Arc::new(NativeWosBackend::open_with_mode(
&wos_path,
config.dirty_buffer_mode,
)?);
let db_index = Arc::new(HashIndex::new());
let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
let parallel_engine = Arc::new(
ParallelExecutionEngine::new_with_config(ParallelizationPolicy::Auto, config.clone())
.expect("Failed to create parallel engine"),
);
let db = Self {
delta: DeltaVariant::RowBased(Arc::new(crate::storage::delta_store::DeltaStore::new())),
memory_wos: crate::engine::WosVariant::InMemory(Arc::new(
crate::storage::memory_wos::InMemoryWosBackend::new(),
)),
file_wos: Some(crate::engine::WosVariant::Native(Arc::clone(&wos_backend))),
table_persistence: DashMap::new(),
schemas: Arc::new(RwLock::new(HashMap::new())),
tables: RwLock::new(HashMap::new()),
table_schemas: Arc::new(RwLock::new(loaded_schemas)),
index: db_index,
row_counters: Arc::new(DashMap::new()),
sql_parser: SqlParser::new(),
sql_optimizer: QueryOptimizer::new(),
wal: Some(wal),
encrypted_wal: None,
encryption: RwLock::new(None),
tx_manager: Arc::new(TransactionManager::new()),
columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
job_sender: Some(tx),
durability: DurabilityLevel::Lazy,
index_registry: RwLock::new(loaded_indexes),
automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
parallel_engine,
view_registry: ViewRegistry::new(),
mat_view_registry: Arc::new(MaterializedViewRegistry::new()),
partition_maps: Arc::new(RwLock::new(HashMap::new())),
partition_stats: Arc::new(DashMap::new()),
partition_compression: Arc::new(DashMap::new()),
partition_lifecycle: Arc::new(DashMap::new()),
partition_tier_hints: Arc::new(DashMap::new()),
partition_creation_times: Arc::new(DashMap::new()),
lifecycle_stop_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
lifecycle_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
config: config.clone(),
workload_analyzer: Arc::new(RwLock::new(
crate::engine::workload_analyzer::WorkloadAnalyzer::default_window(),
)),
replication_master: None,
sharding_router: Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
scatter_gather: Arc::new(crate::sharding::scatter_gather::ScatterGather::new(
Arc::new(crate::sharding::router::ShardRouter::new_local(4)),
)),
metrics: Arc::new(crate::monitoring::DbxMetrics::new()),
cas_locks: Arc::new(crate::engine::database::RowLockManager::new(10)),
};
let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
match record {
crate::wal::WalRecord::Insert {
table,
key,
value,
ts: _,
} => {
db.delta.insert(table, key, value)?;
}
crate::wal::WalRecord::Delete { table, key, ts: _ } => {
db.delta.delete(table, key)?;
}
crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
db.delta.insert_batch(table, rows.clone())?;
}
_ => {}
}
Ok(())
};
let recovered_count =
crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
if recovered_count > 0 {
info!("Recovered {} WAL records", recovered_count);
db.flush()?;
}
if !loaded_triggers.is_empty() {
db.trigger_executor
.write()
.unwrap()
.register_all(loaded_triggers);
}
if !loaded_procedures.is_empty() {
db.procedure_executor
.write()
.unwrap()
.register_all(loaded_procedures);
}
if !loaded_schedules.is_empty() {
let executor = db.schedule_executor.write().unwrap();
for (_, schedule) in loaded_schedules {
let _ = executor.register(schedule);
}
}
info!(
"Database opened with custom parallelism config (cpu_cap={:.0}%)",
config.parallelism.cpu_cap * 100.0
);
let db_arc = Arc::new(db);
let db_weak = Arc::downgrade(&db_arc);
db_arc
.schedule_executor
.write()
.unwrap()
.start_scheduler(db_weak)?;
Ok(db_arc)
}
}