1use crate::engine::types::BackgroundJob;
4use crate::engine::{Database, DeltaVariant, DurabilityLevel, WosVariant};
5use crate::error::DbxResult;
6use crate::index::HashIndex;
7use crate::sql::optimizer::QueryOptimizer;
8use crate::sql::parser::SqlParser;
9use crate::storage::StorageBackend; use crate::storage::delta_store::DeltaStore;
11use crate::storage::encryption::EncryptionConfig;
12use crate::storage::encryption::wos::EncryptedWosBackend;
13use crate::storage::wos::WosBackend;
14use crate::transaction::mvcc::manager::TransactionManager; use dashmap::DashMap;
16use std::collections::HashMap;
17use std::path::Path;
18use std::sync::{Arc, RwLock};
19use tracing::{info, instrument};
20
21fn spawn_background_worker(
23 rx: std::sync::mpsc::Receiver<BackgroundJob>,
24 wal: Option<Arc<crate::wal::WriteAheadLog>>,
25 enc_wal: Option<Arc<crate::wal::encrypted_wal::EncryptedWal>>,
26 index: Arc<HashIndex>,
27) {
28 std::thread::spawn(move || {
29 while let Ok(job) = rx.recv() {
30 match job {
31 BackgroundJob::WalSync => {
32 if let Some(w) = &wal {
33 let _ = w.sync();
34 }
35 }
36 BackgroundJob::EncryptedWalSync => {
37 if let Some(w) = &enc_wal {
38 let _ = w.sync();
39 }
40 }
41 BackgroundJob::IndexUpdate {
42 table,
43 column,
44 key,
45 row_id,
46 } => {
47 let _ = index.update_on_insert(&table, &column, &key, row_id);
48 }
49 }
50 }
51 });
52}
53
54impl Database {
55 #[instrument(skip(path))]
76 pub fn open(path: &Path) -> DbxResult<Arc<Self>> {
77 info!("Opening database at {:?}", path);
78 let wos_path = path.join("wos");
79 std::fs::create_dir_all(&wos_path)?;
80
81 let wal_path = path.join("wal.log");
83 let wal = Arc::new(crate::wal::WriteAheadLog::open(&wal_path)?);
84
85 let wos_backend = Arc::new(WosBackend::open(&wos_path)?);
86 let db_index = Arc::new(HashIndex::new());
87
88 let loaded_schemas = crate::engine::metadata::load_all_schemas(&wos_backend)?;
90 let loaded_indexes = crate::engine::metadata::load_all_indexes(&wos_backend)?;
91 let loaded_triggers = crate::engine::metadata::load_all_triggers(&wos_backend)?;
92 let loaded_procedures = crate::engine::metadata::load_all_procedures(&wos_backend)?;
93 let loaded_schedules = crate::engine::metadata::load_all_schedules(&wos_backend)?;
94
95 info!(
96 "Loaded {} schemas, {} indexes, {} triggers, {} procedures, and {} schedules from persistent storage",
97 loaded_schemas.len(),
98 loaded_indexes.len(),
99 loaded_triggers.len(),
100 loaded_procedures.len(),
101 loaded_schedules.len()
102 );
103
104 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
105 spawn_background_worker(rx, Some(wal.clone()), None, Arc::clone(&db_index));
106
107 let db = Self {
108 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
109 wos: WosVariant::Plain(Arc::clone(&wos_backend)),
110 schemas: Arc::new(RwLock::new(HashMap::new())),
111 tables: RwLock::new(HashMap::new()),
112 table_schemas: Arc::new(RwLock::new(loaded_schemas)),
113 index: db_index,
114 row_counters: Arc::new(DashMap::new()),
115 sql_parser: SqlParser::new(),
116 sql_optimizer: QueryOptimizer::new(),
117 wal: Some(wal),
118 encrypted_wal: None,
119
120 encryption: RwLock::new(None),
121 tx_manager: Arc::new(TransactionManager::new()),
122 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
123 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
124 job_sender: Some(tx),
125 durability: DurabilityLevel::Lazy,
126 index_registry: RwLock::new(loaded_indexes),
127 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
128 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
129 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
130 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
131 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
132 parallel_engine: Arc::new(
133 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
134 .expect("Failed to create parallel engine"),
135 ),
136 };
137
138 let apply_fn = |record: &crate::wal::WalRecord| -> DbxResult<()> {
140 match record {
141 crate::wal::WalRecord::Insert {
142 table,
143 key,
144 value,
145 ts: _,
146 } => {
147 db.delta.insert(table, key, value)?;
148 }
149 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
150 db.delta.delete(table, key)?;
151 }
152 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
153 db.delta.insert_batch(table, rows.clone())?;
154 }
155 _ => {}
156 }
157 Ok(())
158 };
159
160 let recovered_count =
161 crate::wal::checkpoint::CheckpointManager::recover(&wal_path, apply_fn)?;
162 if recovered_count > 0 {
163 info!("Recovered {} WAL records", recovered_count);
164 info!("Flushing recovered WAL data to WOS");
166 db.flush()?;
167 }
168
169 if !loaded_triggers.is_empty() {
171 info!(
172 "Auto-registering {} persisted triggers",
173 loaded_triggers.len()
174 );
175 let mut executor = db.trigger_executor.write().unwrap();
176 executor.register_all(loaded_triggers);
177 }
178
179 if !loaded_procedures.is_empty() {
181 info!(
182 "Auto-registering {} persisted procedures",
183 loaded_procedures.len()
184 );
185 let mut executor = db.procedure_executor.write().unwrap();
186 executor.register_all(loaded_procedures);
187 }
188
189 if !loaded_schedules.is_empty() {
191 info!(
192 "Auto-registering {} persisted schedules",
193 loaded_schedules.len()
194 );
195 let executor = db.schedule_executor.write().unwrap();
196 for (_, schedule) in loaded_schedules {
197 let _ = executor.register(schedule);
198 }
199 }
200
201 info!("Database opened successfully");
202
203 let db_arc = Arc::new(db);
205
206 let db_weak = Arc::downgrade(&db_arc);
208 db_arc
209 .schedule_executor
210 .write()
211 .unwrap()
212 .start_scheduler(db_weak)?;
213
214 Ok(db_arc)
215 }
216
217 #[instrument(skip(path, encryption))]
238 pub fn open_encrypted(path: &Path, encryption: EncryptionConfig) -> DbxResult<Self> {
239 info!("Opening encrypted database at {:?}", path);
240 let wos_path = path.join("wos");
241 std::fs::create_dir_all(&wos_path)?;
242
243 let wal_path = path.join("wal.enc.log");
245 let encrypted_wal = Arc::new(crate::wal::encrypted_wal::EncryptedWal::open(
246 &wal_path,
247 encryption.clone(),
248 )?);
249
250 let enc_wos = Arc::new(EncryptedWosBackend::open(&wos_path, encryption.clone())?);
252 let db_index = Arc::new(HashIndex::new());
253
254 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
255 spawn_background_worker(
256 rx,
257 None,
258 Some(Arc::clone(&encrypted_wal)),
259 Arc::clone(&db_index),
260 );
261
262 let db = Self {
263 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
264 wos: WosVariant::Encrypted(Arc::clone(&enc_wos)),
265 schemas: Arc::new(RwLock::new(HashMap::new())),
266 tables: RwLock::new(HashMap::new()),
267 table_schemas: Arc::new(RwLock::new(HashMap::new())),
268 index: db_index,
269 row_counters: Arc::new(DashMap::new()),
270 sql_parser: SqlParser::new(),
271 sql_optimizer: QueryOptimizer::new(),
272 wal: None,
273 encrypted_wal: Some(Arc::clone(&encrypted_wal)),
274
275 encryption: RwLock::new(Some(encryption)),
276 tx_manager: Arc::new(TransactionManager::new()),
277 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
278 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
279 job_sender: Some(tx),
280 durability: DurabilityLevel::Lazy,
281 index_registry: RwLock::new(HashMap::new()),
282 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
283 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
284 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
285 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
286 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
287 parallel_engine: Arc::new(
288 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
289 .expect("Failed to create parallel engine"),
290 ),
291 };
292
293 let records = encrypted_wal.replay()?;
295 let mut recovered_count = 0;
296 for record in &records {
297 match record {
298 crate::wal::WalRecord::Insert {
299 table,
300 key,
301 value,
302 ts: _,
303 } => {
304 db.delta.insert(table, key, value)?;
305 recovered_count += 1;
306 }
307 crate::wal::WalRecord::Delete { table, key, ts: _ } => {
308 db.delta.delete(table, key)?;
309 recovered_count += 1;
310 }
311 crate::wal::WalRecord::Batch { table, rows, ts: _ } => {
312 db.delta.insert_batch(table, rows.clone())?;
313 recovered_count += rows.len();
314 }
315 _ => {}
316 }
317 }
318 if recovered_count > 0 {
319 info!("Recovered {} encrypted WAL records", recovered_count);
320 }
321
322 info!("Encrypted database opened successfully");
323 Ok(db)
324 }
325
326 #[instrument]
342 pub fn open_in_memory() -> DbxResult<Self> {
343 info!("Creating in-memory database");
344 let db_index = Arc::new(HashIndex::new());
345 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
346 spawn_background_worker(rx, None, None, Arc::clone(&db_index));
347
348 Ok(Self {
349 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
350 wos: WosVariant::InMemory(Arc::new(
351 crate::storage::memory_wos::InMemoryWosBackend::new(),
352 )),
353 schemas: Arc::new(RwLock::new(HashMap::new())),
354 tables: RwLock::new(HashMap::new()),
355 table_schemas: Arc::new(RwLock::new(HashMap::new())),
356 index: db_index,
357 row_counters: Arc::new(DashMap::new()),
358 sql_parser: SqlParser::new(),
359 sql_optimizer: QueryOptimizer::new(),
360 wal: None,
361 encrypted_wal: None,
362
363 encryption: RwLock::new(None),
364 tx_manager: Arc::new(TransactionManager::new()),
365 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
366 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
367 job_sender: Some(tx),
368 durability: DurabilityLevel::Lazy,
369 index_registry: RwLock::new(HashMap::new()),
370 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
371 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
372 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
373 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
374 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
375 parallel_engine: Arc::new(
376 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
377 .expect("Failed to create parallel engine"),
378 ),
379 })
380 }
381
382 pub fn open_in_memory_encrypted(encryption: EncryptionConfig) -> DbxResult<Self> {
402 let db_index = Arc::new(HashIndex::new());
403 let (tx, rx) = std::sync::mpsc::channel::<BackgroundJob>();
404 spawn_background_worker(rx, None, None, Arc::clone(&db_index));
405
406 Ok(Self {
407 delta: DeltaVariant::RowBased(Arc::new(DeltaStore::new())),
408 wos: WosVariant::Encrypted(Arc::new(EncryptedWosBackend::open_temporary(
409 encryption.clone(),
410 )?)),
411 schemas: Arc::new(RwLock::new(HashMap::new())),
412 tables: RwLock::new(HashMap::new()),
413 table_schemas: Arc::new(RwLock::new(HashMap::new())),
414 index: db_index,
415 row_counters: Arc::new(DashMap::new()),
416 sql_parser: SqlParser::new(),
417 sql_optimizer: QueryOptimizer::new(),
418 wal: None,
419 encrypted_wal: None,
420
421 encryption: RwLock::new(Some(encryption)),
422 tx_manager: Arc::new(TransactionManager::new()),
423 columnar_cache: Arc::new(crate::storage::columnar_cache::ColumnarCache::new()),
424 gpu_manager: crate::storage::gpu::GpuManager::try_new().map(Arc::new),
425 job_sender: Some(tx),
426 durability: DurabilityLevel::Lazy,
427 index_registry: RwLock::new(HashMap::new()),
428 automation_engine: Arc::new(crate::automation::ExecutionEngine::new()),
429 trigger_registry: crate::engine::automation_api::TriggerRegistry::new(),
430 trigger_executor: Arc::new(RwLock::new(crate::automation::TriggerExecutor::new())),
431 procedure_executor: Arc::new(RwLock::new(crate::automation::ProcedureExecutor::new())),
432 schedule_executor: Arc::new(RwLock::new(crate::automation::ScheduleExecutor::new())),
433 parallel_engine: Arc::new(
434 crate::engine::parallel_engine::ParallelExecutionEngine::new_auto()
435 .expect("Failed to create parallel engine"),
436 ),
437 })
438 }
439
440 pub fn open_safe(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
450 let db = Self::open(path.as_ref())?;
451 Arc::get_mut(&mut db.clone()).unwrap().durability = DurabilityLevel::Full;
452 Ok(db)
453 }
454
455 pub fn open_fast(path: impl AsRef<Path>) -> DbxResult<Arc<Self>> {
466 let db = Self::open(path.as_ref())?;
467 Arc::get_mut(&mut db.clone()).unwrap().durability = DurabilityLevel::None;
468 Ok(db)
469 }
470
471 pub fn open_with_durability(
478 path: impl AsRef<Path>,
479 durability: DurabilityLevel,
480 ) -> DbxResult<Arc<Self>> {
481 let db = Self::open(path.as_ref())?;
482 Arc::get_mut(&mut db.clone()).unwrap().durability = durability;
483 Ok(db)
484 }
485}