1use crate::background::{BackgroundWorkers, WorkerHandle, WorkerKind, WorkerMetadata};
2use log::{debug, warn};
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tempfile::TempDir;
9
10use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
11use crate::catalog::load_catalog_data;
12use crate::catalog::registry::global_index_registry;
13use crate::config::{background_config, IndexVacuumConfig, WalConfig};
14use crate::error::{QuillSQLError, QuillSQLResult};
15use crate::optimizer::LogicalOptimizer;
16use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
17use crate::plan::PhysicalPlanner;
18use crate::recovery::wal::codec::CheckpointPayload;
19use crate::recovery::{ControlFileManager, RecoveryManager, WalManager, WalWriterHandle};
20use crate::session::SessionContext;
21use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
22use crate::{
23 catalog::Catalog,
24 execution::ExecutionEngine,
25 plan::{LogicalPlanner, PlannerContext},
26 storage::disk_manager::DiskManager,
27 storage::disk_scheduler::DiskScheduler,
28 storage::tuple::Tuple,
29 transaction::{IsolationLevel, TransactionManager},
30};
31use sqlparser::ast::TransactionAccessMode;
32
/// Optional overrides for the write-ahead-log configuration.
///
/// Every field defaults to `None`, which means "keep whatever
/// `WalConfig::default()` provides". Only fields set to `Some(..)` replace the
/// corresponding `WalConfig` field when a database is opened.
///
/// `PartialEq`/`Eq` are derived so option sets can be compared directly
/// (all fields are plain standard-library types).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct WalOptions {
    /// WAL directory. When `None`, the location is derived from the database
    /// path (on-disk databases) or placed under the temporary directory.
    pub directory: Option<PathBuf>,
    /// Replacement for `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Replacement for `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// Replacement for `WalConfig::writer_interval_ms`. The outer `Option`
    /// selects "override or not"; the inner value is stored as-is
    /// (presumably an inner `None` disables the background writer — confirm
    /// against `background_config`).
    pub writer_interval_ms: Option<Option<u64>>,
    /// Replacement for `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Replacement for `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Replacement for `WalConfig::synchronous_commit`; this value is also
    /// handed to the transaction manager when the database is opened.
    pub synchronous_commit: Option<bool>,
    /// Replacement for `WalConfig::checkpoint_interval_ms`. Same nesting
    /// convention as `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Replacement for `WalConfig::retain_segments`; values below 1 are
    /// raised to 1 when the override is applied.
    pub retain_segments: Option<usize>,
}
45
46#[derive(Debug, Clone, Default)]
47pub struct DatabaseOptions {
48 pub wal: WalOptions,
49 pub default_isolation_level: Option<IsolationLevel>,
50}
51
52pub struct Database {
53 pub(crate) buffer_pool: Arc<BufferManager>,
54 pub(crate) catalog: Catalog,
55 background_workers: BackgroundWorkers,
56 pub(crate) wal_manager: Arc<WalManager>,
57 pub(crate) transaction_manager: Arc<TransactionManager>,
58 default_isolation: IsolationLevel,
59 temp_dir: Option<TempDir>,
60}
61impl Database {
62 pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
63 Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
64 }
65
66 pub fn new_on_disk_with_options(
67 db_path: &str,
68 options: DatabaseOptions,
69 ) -> QuillSQLResult<Self> {
70 let disk_manager = Arc::new(DiskManager::try_new(db_path)?);
71 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
72 let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
73
74 let wal_config = wal_config_for_path(db_path, &options.wal);
75 let synchronous_commit = wal_config.synchronous_commit;
76 let (control_file, wal_init) =
77 ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
78 let control_file = Arc::new(control_file);
79 let wal_manager = Arc::new(WalManager::new_with_scheduler(
80 wal_config.clone(),
81 Some(wal_init),
82 Some(control_file.clone()),
83 disk_scheduler.clone(),
84 )?);
85 let transaction_manager = Arc::new(TransactionManager::new(
86 wal_manager.clone(),
87 synchronous_commit,
88 ));
89
90 let worker_cfg = background_config(&wal_config, IndexVacuumConfig::default());
91 let mut background_workers = BackgroundWorkers::new();
92 if let Some(interval) = worker_cfg.wal_writer_interval {
93 if let Some(handle) = wal_manager.start_background_flush(interval)? {
94 background_workers.register(wal_writer_worker(handle, interval));
95 }
96 }
97 buffer_pool.set_wal_manager(wal_manager.clone());
98
99 let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
100
101 let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
102 .with_buffer_pool(buffer_pool.clone())
103 .replay()?;
104 if recovery_summary.redo_count > 0 {
105 debug!(
106 "Recovery replayed {} record(s) starting at LSN {}",
107 recovery_summary.redo_count, recovery_summary.start_lsn
108 );
109 }
110 if !recovery_summary.loser_transactions.is_empty() {
111 warn!(
112 "{} transaction(s) require undo after recovery: {:?}",
113 recovery_summary.loser_transactions.len(),
114 recovery_summary.loser_transactions
115 );
116 }
117
118 background_workers.register_opt(spawn_checkpoint_worker(
119 wal_manager.clone(),
120 buffer_pool.clone(),
121 transaction_manager.clone(),
122 worker_cfg.checkpoint_interval,
123 ));
124
125 background_workers.register_opt(spawn_bg_writer(
126 buffer_pool.clone(),
127 worker_cfg.bg_writer_interval,
128 worker_cfg.vacuum,
129 ));
130
131 let mut db = Self {
132 buffer_pool,
133 catalog,
134 background_workers,
135 wal_manager,
136 transaction_manager,
137 default_isolation: options
138 .default_isolation_level
139 .unwrap_or(IsolationLevel::ReadUncommitted),
140 temp_dir: None,
141 };
142 load_catalog_data(&mut db)?;
143 Ok(db)
144 }
145
146 pub fn new_temp() -> QuillSQLResult<Self> {
147 Self::new_temp_with_options(DatabaseOptions::default())
148 }
149
150 pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
151 let temp_dir = TempDir::new()?;
152 let temp_path = temp_dir.path().join("test.db");
153 let disk_manager =
154 Arc::new(DiskManager::try_new(temp_path.to_str().ok_or(
155 QuillSQLError::Internal("Invalid temp path".to_string()),
156 )?)?);
157 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
158 let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
159
160 let wal_config = wal_config_for_temp(temp_dir.path(), &options.wal);
161 let synchronous_commit = wal_config.synchronous_commit;
162 let (control_file, wal_init) =
163 ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
164 let control_file = Arc::new(control_file);
165 let wal_manager = Arc::new(WalManager::new_with_scheduler(
166 wal_config.clone(),
167 Some(wal_init),
168 Some(control_file.clone()),
169 disk_scheduler.clone(),
170 )?);
171 let transaction_manager = Arc::new(TransactionManager::new(
172 wal_manager.clone(),
173 synchronous_commit,
174 ));
175
176 let worker_cfg = background_config(&wal_config, IndexVacuumConfig::default());
177 let mut background_workers = BackgroundWorkers::new();
178 if let Some(interval) = worker_cfg.wal_writer_interval {
179 if let Some(handle) = wal_manager.start_background_flush(interval)? {
180 background_workers.register(wal_writer_worker(handle, interval));
181 }
182 }
183 buffer_pool.set_wal_manager(wal_manager.clone());
184
185 let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
186
187 let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
188 .with_buffer_pool(buffer_pool.clone())
189 .replay()?;
190 if recovery_summary.redo_count > 0 {
191 debug!(
192 "Recovery replayed {} record(s) starting at LSN {}",
193 recovery_summary.redo_count, recovery_summary.start_lsn
194 );
195 }
196 if !recovery_summary.loser_transactions.is_empty() {
197 warn!(
198 "{} transaction(s) require undo after recovery: {:?}",
199 recovery_summary.loser_transactions.len(),
200 recovery_summary.loser_transactions
201 );
202 }
203
204 background_workers.register_opt(spawn_checkpoint_worker(
205 wal_manager.clone(),
206 buffer_pool.clone(),
207 transaction_manager.clone(),
208 worker_cfg.checkpoint_interval,
209 ));
210
211 background_workers.register_opt(spawn_bg_writer(
212 buffer_pool.clone(),
213 worker_cfg.bg_writer_interval,
214 worker_cfg.vacuum,
215 ));
216
217 let mut db = Self {
218 buffer_pool,
219 catalog,
220 background_workers,
221 wal_manager,
222 transaction_manager,
223 default_isolation: options
224 .default_isolation_level
225 .unwrap_or(IsolationLevel::ReadUncommitted),
226 temp_dir: Some(temp_dir),
227 };
228 load_catalog_data(&mut db)?;
229 Ok(db)
230 }
231
232 pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
233 let mut session = SessionContext::new(self.default_isolation);
234 self.run_with_session(&mut session, sql)
235 }
236
237 pub fn run_with_session(
238 &mut self,
239 session: &mut SessionContext,
240 sql: &str,
241 ) -> QuillSQLResult<Vec<Tuple>> {
242 let logical_plan = self.create_logical_plan(sql)?;
243 debug!(
244 "Logical Plan: \n{}",
245 pretty_format_logical_plan(&logical_plan)
246 );
247
248 let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
249 debug!(
250 "Optimized Logical Plan: \n{}",
251 pretty_format_logical_plan(&logical_plan)
252 );
253
254 let physical_planner = PhysicalPlanner {
255 catalog: &self.catalog,
256 };
257 let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
258 debug!(
259 "Physical Plan: \n{}",
260 pretty_format_physical_plan(&physical_plan)
261 );
262
263 match optimized_logical_plan {
264 LogicalPlan::BeginTransaction(ref modes) => {
265 if session.has_active_transaction() {
266 return Err(QuillSQLError::Execution(
267 "transaction already active".to_string(),
268 ));
269 }
270 let txn = self.transaction_manager.begin(
271 modes.unwrap_effective_isolation(session.default_isolation()),
272 modes
273 .access_mode
274 .unwrap_or(TransactionAccessMode::ReadWrite),
275 )?;
276 session.set_active_transaction(txn)?;
277 Ok(vec![])
278 }
279 LogicalPlan::CommitTransaction => {
280 let txn_ref = session
281 .active_txn_mut()
282 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
283 self.transaction_manager.commit(txn_ref)?;
284 session.clear_active_transaction();
285 Ok(vec![])
286 }
287 LogicalPlan::RollbackTransaction => {
288 let txn_ref = session
289 .active_txn_mut()
290 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
291 self.transaction_manager.abort(txn_ref)?;
292 session.clear_active_transaction();
293 Ok(vec![])
294 }
295 LogicalPlan::SetTransaction {
296 ref scope,
297 ref modes,
298 } => {
299 match scope {
300 TransactionScope::Session => session.apply_session_modes(modes),
301 TransactionScope::Transaction => session.apply_transaction_modes(modes),
302 }
303 Ok(vec![])
304 }
305 _ => {
306 let needs_cleanup = !session.has_active_transaction();
307 let autocommit = session.autocommit();
308
309 let result = {
310 let txn = session.ensure_active_transaction(&self.transaction_manager)?;
311 let context = crate::execution::ExecutionContext::new(
312 &mut self.catalog,
313 txn,
314 &self.transaction_manager,
315 );
316 let mut engine = ExecutionEngine { context };
317 engine.execute(Arc::new(physical_plan))?
318 };
319
320 if autocommit && needs_cleanup {
321 if let Some(txn) = session.active_txn_mut() {
322 self.transaction_manager.commit(txn)?;
323 }
324 session.clear_active_transaction();
325 }
326
327 Ok(result)
328 }
329 }
330 }
331
332 pub fn default_isolation(&self) -> IsolationLevel {
333 self.default_isolation
334 }
335
336 pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
337 let stmts = crate::sql::parser::parse_sql(sql)?;
339 if stmts.len() != 1 {
340 return Err(QuillSQLError::NotSupport(
341 "only support one sql statement".to_string(),
342 ));
343 }
344 let stmt = &stmts[0];
345 let mut planner = LogicalPlanner {
346 context: PlannerContext {
347 catalog: &self.catalog,
348 },
349 };
350 planner.plan(stmt)
352 }
353
354 pub fn flush(&self) -> QuillSQLResult<()> {
355 let _ = self.wal_manager.flush(None)?;
356 self.buffer_pool.flush_all_pages()
357 }
358
359 pub fn transaction_manager(&self) -> Arc<TransactionManager> {
360 self.transaction_manager.clone()
361 }
362}
363
364impl Drop for Database {
365 fn drop(&mut self) {
366 self.background_workers.shutdown_all();
367 }
368}
369
370fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
371 let mut config = WalConfig::default();
372 config.directory = overrides
373 .directory
374 .clone()
375 .unwrap_or_else(|| wal_directory_from_path(db_path));
376 if let Some(size) = overrides.segment_size {
377 config.segment_size = size;
378 }
379 if let Some(sync) = overrides.sync_on_flush {
380 config.sync_on_flush = sync;
381 }
382 if let Some(interval) = overrides.writer_interval_ms.clone() {
383 config.writer_interval_ms = interval;
384 }
385 if let Some(capacity) = overrides.buffer_capacity {
386 config.buffer_capacity = capacity;
387 }
388 if let Some(bytes) = overrides.flush_coalesce_bytes {
389 config.flush_coalesce_bytes = bytes;
390 }
391 if let Some(sync_commit) = overrides.synchronous_commit {
392 config.synchronous_commit = sync_commit;
393 }
394 if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
395 config.checkpoint_interval_ms = interval;
396 }
397 if let Some(retain) = overrides.retain_segments {
398 config.retain_segments = retain.max(1);
399 }
400 config
401}
402
/// Derives the default WAL directory for the database file at `db_path`.
///
/// The file extension is replaced with `wal` (`data/test.db` becomes
/// `data/test.wal`, `test` becomes `test.wal`). Two special cases:
///
/// * If the path has no file name (for example the empty string), the
///   extension cannot be set and `.wal` is appended to the raw string instead.
/// * If the database file itself already ends in `.wal`, replacing the
///   extension would yield the database file's own path; a second `.wal`
///   is appended so the WAL directory never collides with the data file.
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let mut derived = PathBuf::from(db_path);

    // `set_extension` returns false and leaves the path untouched when there
    // is no file name to attach an extension to.
    if !derived.set_extension("wal") || derived.extension().is_none() {
        return PathBuf::from(format!("{}.wal", db_path));
    }

    if derived.as_path() == Path::new(db_path) {
        let mut suffixed = derived.into_os_string();
        suffixed.push(".wal");
        return PathBuf::from(suffixed);
    }

    derived
}
412
413fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
414 let mut config = WalConfig::default();
415 config.directory = overrides
416 .directory
417 .clone()
418 .unwrap_or_else(|| temp_root.join("wal"));
419 if let Some(size) = overrides.segment_size {
420 config.segment_size = size;
421 }
422 if let Some(sync) = overrides.sync_on_flush {
423 config.sync_on_flush = sync;
424 }
425 if let Some(interval) = overrides.writer_interval_ms.clone() {
426 config.writer_interval_ms = interval;
427 }
428 if let Some(capacity) = overrides.buffer_capacity {
429 config.buffer_capacity = capacity;
430 }
431 if let Some(bytes) = overrides.flush_coalesce_bytes {
432 config.flush_coalesce_bytes = bytes;
433 }
434 if let Some(sync_commit) = overrides.synchronous_commit {
435 config.synchronous_commit = sync_commit;
436 }
437 if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
438 config.checkpoint_interval_ms = interval;
439 }
440 if let Some(retain) = overrides.retain_segments {
441 config.retain_segments = retain.max(1);
442 }
443 config
444}
445
446fn wal_writer_worker(handle: WalWriterHandle, interval: Duration) -> WorkerHandle {
447 WorkerHandle::new(
448 WorkerMetadata {
449 kind: WorkerKind::WalWriter,
450 interval: Some(interval),
451 },
452 move || {
453 if let Err(err) = handle.stop() {
454 warn!("Failed to stop WAL writer: {}", err);
455 }
456 },
457 None,
458 )
459}
460
461fn spawn_checkpoint_worker(
462 wal_manager: Arc<WalManager>,
463 buffer_pool: Arc<BufferManager>,
464 transaction_manager: Arc<TransactionManager>,
465 interval: Option<Duration>,
466) -> Option<WorkerHandle> {
467 let Some(interval) = interval else {
468 return None;
469 };
470 if interval.is_zero() {
471 return None;
472 }
473 let wal = wal_manager.clone();
474 let bp = buffer_pool.clone();
475 let txn_mgr = transaction_manager.clone();
476
477 spawn_periodic_worker(
478 "checkpoint-worker",
479 WorkerKind::Checkpoint,
480 interval,
481 move || {
482 let dirty_pages = bp.dirty_page_ids();
483 let dpt_snapshot = bp.dirty_page_table_snapshot();
484 let active_txns = txn_mgr.active_transactions();
485 let last_lsn = wal.max_assigned_lsn();
486
487 if last_lsn != 0 {
488 if let Err(e) = wal.flush_until(last_lsn) {
489 warn!("Checkpoint flush failed: {}", e);
490 }
491 let payload = CheckpointPayload {
492 last_lsn,
493 dirty_pages,
494 active_transactions: active_txns,
495 dpt: dpt_snapshot,
496 };
497 if let Err(e) = wal.log_checkpoint(payload) {
498 warn!("Checkpoint write failed: {}", e);
499 }
500 }
501 },
502 )
503}
504
505fn spawn_bg_writer(
506 buffer_pool: Arc<BufferManager>,
507 interval: Option<Duration>,
508 vacuum_cfg: IndexVacuumConfig,
509) -> Option<WorkerHandle> {
510 let Some(interval) = interval else {
511 return None;
512 };
513 if interval.is_zero() {
514 return None;
515 }
516 let bp = buffer_pool.clone();
517 spawn_periodic_worker(
518 "bg-writer",
519 WorkerKind::BufferPoolWriter,
520 interval,
521 move || {
522 let dirty_ids = bp.dirty_page_ids();
523 for page_id in dirty_ids.into_iter().take(16) {
524 let _ = bp.flush_page(page_id);
525 }
526
527 let registry = global_index_registry();
528 for (idx, heap) in registry.iter().take(16) {
529 let pending = idx.take_pending_garbage();
530 if pending >= vacuum_cfg.trigger_threshold {
531 let _ = idx.lazy_cleanup_with(
532 |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
533 Some(vacuum_cfg.batch_limit),
534 );
535 }
536 }
537 },
538 )
539}
540
541fn spawn_periodic_worker<F>(
542 name: &str,
543 kind: WorkerKind,
544 interval: Duration,
545 mut tick: F,
546) -> Option<WorkerHandle>
547where
548 F: FnMut() + Send + 'static,
549{
550 let stop_flag = Arc::new(AtomicBool::new(false));
551 let thread_flag = Arc::clone(&stop_flag);
552
553 match thread::Builder::new().name(name.into()).spawn(move || {
554 while !thread_flag.load(Ordering::Relaxed) {
555 tick();
556 if thread_flag.load(Ordering::Relaxed) {
557 break;
558 }
559 thread::sleep(interval);
560 }
561 }) {
562 Ok(join_handle) => {
563 let stop_handle = Arc::clone(&stop_flag);
564 Some(WorkerHandle::new(
565 WorkerMetadata {
566 kind,
567 interval: Some(interval),
568 },
569 move || {
570 stop_handle.store(true, Ordering::Release);
571 },
572 Some(join_handle),
573 ))
574 }
575 Err(err) => {
576 warn!("Failed to spawn {}: {}", name, err);
577 None
578 }
579 }
580}