use crate::engine::types::BackgroundJob;
use crate::engine::{Database, DeltaVariant, DurabilityLevel, WosVariant};
use crate::error::DbxResult;
use crate::index::HashIndex;
use crate::sql::optimizer::QueryOptimizer;
use crate::sql::parser::SqlParser;
use crate::storage::StorageBackend; use crate::storage::delta_store::DeltaStore;
use crate::storage::encryption::EncryptionConfig;
use crate::storage::encryption::wos::EncryptedWosBackend;
use crate::storage::memory_wos::InMemoryWosBackend;
use crate::storage::wos::WosBackend;
use crate::transaction::mvcc::manager::TransactionManager; use dashmap::DashMap;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
use tracing::{info, instrument};
/// Spawns a detached worker thread that drains `rx` and services
/// background jobs (WAL syncs and index maintenance).
///
/// The loop ends — and the thread exits — once every sender for the
/// channel has been dropped.
fn spawn_background_worker(
    rx: std::sync::mpsc::Receiver<BackgroundJob>,
    wal: Option<Arc<crate::wal::WriteAheadLog>>,
    enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
    index: Arc<HashIndex>,
) {
    std::thread::spawn(move || {
        // `Receiver::iter` blocks on each `recv` and terminates when the
        // channel is closed, so this is equivalent to the usual
        // `while let Ok(job) = rx.recv()` loop.
        for job in rx.iter() {
            match job {
                // Sync jobs are best-effort: failures are deliberately
                // ignored (fire-and-forget durability hints).
                BackgroundJob::WalSync => {
                    if let Some(log) = wal.as_ref() {
                        let _ = log.sync();
                    }
                }
                BackgroundJob::EncryptedWalSync => {
                    if let Some(log) = enc_wal.as_ref() {
                        let _ = log.sync();
                    }
                }
                // Index maintenance is likewise best-effort in the
                // background path.
                BackgroundJob::IndexUpdate {
                    table,
                    column,
                    key,
                    row_id,
                } => {
                    let _ = index.update_on_insert(&table, &column, &key, row_id);
                }
            }
        }
    });
}
impl Database {
    /// Opens (or creates) an on-disk database at `path` using the default
    /// [`DurabilityLevel::Lazy`] durability.
    ///
    /// This is a thin wrapper over [`Self::open_with_durability`], so the
    /// recovery / metadata-loading / scheduler-startup logic lives in
    /// exactly one place instead of being duplicated here.
    #[instrument(skip(path))]
    pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
        Self::open_with_durability(path, DurabilityLevel::Lazy)
    }

    /// Opens (or creates) an encrypted database at `path`.
    ///
    /// Both the WAL (`wal.enc.log`) and the file WOS backend are encrypted
    /// with the supplied `encryption` config. Outstanding encrypted-WAL
    /// records are replayed into the delta store before the handle is
    /// returned.
    ///
    /// NOTE(review): unlike the plain `open` path, this does not load
    /// persisted metadata (schemas/indexes/triggers/procedures/schedules),
    /// does not flush recovered data to the WOS, and returns a bare `Self`
    /// without starting the schedule executor — confirm this asymmetry is
    /// intentional.
    #[instrument(skip(path, encryption))]
    pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
        info!("Opening encrypted database at {:?}", path);
        let wos_path = path.join("wos");
        std::fs::create_dir_all(&wos_path)?;
        let wal_path = path.join("wal.enc.log");
        let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
            &wal_path,
            encryption.clone(),
        )?);
        let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
        let db_index = Arc::new(HashIndex::new());
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        // Background worker gets the encrypted WAL (plain WAL slot is None).
        spawn_background_worker(
            rx,
            None,
            Some(Arc::clone(&encrypted_wal)),
            Arc::clone(&db_index),
        );
        let db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Encrypted(Arc::clone(&enc_wos))),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: Some(Arc::clone(&encrypted_wal)),
            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
        };
        // Replay any records left in the encrypted WAL into the delta store.
        let records = encrypted_wal.replay()?;
        let mut recovered_count = 0;
        for record in &records {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                    recovered_count += 1;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                    recovered_count += rows.len();
                }
                // Other record kinds carry no data to replay.
                _ => {}
            }
        }
        if recovered_count > 0 {
            info!("Recovered {} encrypted WAL records", recovered_count);
        }
        info!("Encrypted database opened successfully");
        Ok(db)
    }

    /// Creates a purely in-memory database: no WAL, no file WOS, nothing
    /// persisted across process restarts.
    #[instrument]
    pub fn open_in_memory() -> DbxResult<Self> {
        info!("Creating in-memory database");
        let db_index = Arc::new(HashIndex::new());
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        // No WAL of either kind; the worker only services index updates.
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
        Ok(Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: None,
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,
            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
        })
    }

    /// Creates an in-memory database whose memory WOS is encrypted with
    /// `encryption` (backed by a temporary encrypted store). No WAL is used,
    /// so nothing survives a restart.
    pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
        let db_index = Arc::new(HashIndex::new());
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, None, None, Arc::clone(&db_index));
        Ok(Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
                encryption.clone(),
            )?)),
            file_wos: None,
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(HashMap::new())),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: None,
            encrypted_wal: None,
            encryption: RwLock::new(Some(encryption)),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability: DurabilityLevel::Lazy,
            index_registry: RwLock::new(HashMap::new()),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
        })
    }

    /// Opens the database with [`DurabilityLevel::Full`] (safest, slowest).
    pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
        Self::open_with_durability(path, DurabilityLevel::Full)
    }

    /// Opens the database with [`DurabilityLevel::None`] (fastest, no
    /// durability guarantees).
    pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
        Self::open_with_durability(path, DurabilityLevel::None)
    }

    /// Opens (or creates) an on-disk database at `path` with an explicit
    /// [`DurabilityLevel`].
    ///
    /// Startup sequence:
    /// 1. create the WOS directory and open the WAL and WOS backends,
    /// 2. load persisted metadata (schemas, indexes, triggers, procedures,
    ///    schedules) from the WOS,
    /// 3. spawn the background job worker,
    /// 4. replay outstanding WAL records into the delta store and flush them
    ///    to the WOS,
    /// 5. re-register persisted automation objects and start the scheduler.
    pub fn open_with_durability(
        path: impl AsRef<Path>,
        durability: DurabilityLevel,
    ) -> DbxResult<Arc<Self>> {
        info!(
            "Opening database at {:?} with durability {:?}",
            path.as_ref(),
            durability
        );
        let path = path.as_ref();
        let wos_path = path.join("wos");
        std::fs::create_dir_all(&wos_path)?;
        let wal_path = path.join("wal.log");
        let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
        let wos_backend = Arc::new(WosBackend::open(&wos_path)?);
        let db_index = Arc::new(HashIndex::new());
        // Metadata is persisted in the WOS; load everything up front so the
        // registries below can be seeded before any query runs.
        let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
        let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
        let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
        let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
        let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
        info!(
            "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
            loaded_schemas.len(),
            loaded_indexes.len(),
            loaded_triggers.len(),
            loaded_procedures.len(),
            loaded_schedules.len()
        );
        let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
        spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
        let db = Self {
            delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
            memory_wos: WosVariant::InMemory(Arc::new(InMemoryWosBackend::new())),
            file_wos: Some(WosVariant::Plain(Arc::clone(&wos_backend))),
            table_persistence: DashMap::new(),
            schemas: Arc::new(RwLock::new(HashMap::new())),
            tables: RwLock::new(HashMap::new()),
            table_schemas: Arc::new(RwLock::new(loaded_schemas)),
            index: db_index,
            row_counters: Arc::new(DashMap::new()),
            sql_parser: SqlParser::new(),
            sql_optimizer: QueryOptimizer::new(),
            wal: Some(wal),
            encrypted_wal: None,
            encryption: RwLock::new(None),
            tx_manager: Arc::new(TransactionManager::new()),
            columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
            gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
            job_sender: Some(tx),
            durability,
            index_registry: RwLock::new(loaded_indexes),
            automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
            trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
            trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
            procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
            schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
            parallel_engine: Arc::new(
                crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
                    .expect("Failed to create parallel engine"),
            ),
        };
        // Replay any outstanding WAL records into the delta store before the
        // database is handed to callers.
        let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
            match record {
                crate::wal::WalRecord::Insert {
                    table,
                    key,
                    value,
                    ts: _,
                } => {
                    db.delta.insert(table, key, value)?;
                }
                crate::wal::WalRecord::Delete { table, key, ts: _ } => {
                    db.delta.delete(table, key)?;
                }
                crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
                    db.delta.insert_batch(table, rows.clone())?;
                }
                // Other record kinds carry no data to replay.
                _ => {}
            }
            Ok(())
        };
        let recovered_count =
            crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
        if recovered_count > 0 {
            info!("Recovered {} WAL records", recovered_count);
            info!("Flushing recovered WAL data to WOS");
            db.flush()?;
        }
        if !loaded_triggers.is_empty() {
            info!(
                "Auto-registering {} persisted triggers",
                loaded_triggers.len()
            );
            db.trigger_executor
                .write()
                .unwrap()
                .register_all(loaded_triggers);
        }
        if !loaded_procedures.is_empty() {
            info!(
                "Auto-registering {} persisted procedures",
                loaded_procedures.len()
            );
            db.procedure_executor
                .write()
                .unwrap()
                .register_all(loaded_procedures);
        }
        if !loaded_schedules.is_empty() {
            info!(
                "Auto-registering {} persisted schedules",
                loaded_schedules.len()
            );
            let executor = db.schedule_executor.write().unwrap();
            for (_, schedule) in loaded_schedules {
                // Best-effort: a schedule that fails to register should not
                // prevent the database from opening.
                let _ = executor.register(schedule);
            }
        }
        info!(
            "Database opened successfully with durability {:?}",
            durability
        );
        // The scheduler needs a handle back to the database; hand it a weak
        // reference so the background scheduler thread does not keep the
        // database alive on its own.
        let db_arc = Arc::new(db);
        let db_weak = Arc::downgrade(&db_arc);
        db_arc
            .schedule_executor
            .write()
            .unwrap()
            .start_scheduler(db_weak)?;
        Ok(db_arc)
    }
}