use log::{debug, warn};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;

use crate::buffer::BUFFER_POOL_SIZE;
use crate::catalog::load_catalog_data;
use crate::catalog::registry::global_index_registry;
use crate::config::{IndexVacuumConfig, WalConfig};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::optimizer::LogicalOptimizer;
use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
use crate::plan::PhysicalPlanner;
use crate::recovery::wal_record::CheckpointPayload;
use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
use crate::session::SessionContext;
use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
use crate::{
    buffer::BufferPoolManager,
    catalog::Catalog,
    execution::{ExecutionContext, ExecutionEngine},
    plan::{LogicalPlanner, PlannerContext},
    storage::disk_manager::DiskManager,
    storage::disk_scheduler::DiskScheduler,
    storage::tuple::Tuple,
    transaction::{IsolationLevel, TransactionManager},
};
use sqlparser::ast::TransactionAccessMode;

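/// Per-database overrides for the WAL subsystem. Every field is optional:
/// `None` keeps the corresponding `WalConfig` default. The doubly nested
/// options (`writer_interval_ms`, `checkpoint_interval_ms`) let a caller
/// explicitly disable the related background worker by overriding the
/// interval with an inner `None`.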
#[derive(Debug, Default, Clone)]
pub struct WalOptions {
    pub directory: Option<PathBuf>,
    pub segment_size: Option<u64>,
    pub sync_on_flush: Option<bool>,
    pub writer_interval_ms: Option<Option<u64>>,
    pub buffer_capacity: Option<usize>,
    pub flush_coalesce_bytes: Option<usize>,
    pub synchronous_commit: Option<bool>,
    pub checkpoint_interval_ms: Option<Option<u64>>,
    pub retain_segments: Option<usize>,
}

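/// Construction-time options for [`Database`]: WAL overrides plus the
/// isolation level used when a session does not request one explicitly.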
#[derive(Debug, Clone, Default)]
pub struct DatabaseOptions {
    pub wal: WalOptions,
    pub default_isolation_level: Option<IsolationLevel>,
}

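/// Top-level database handle owning the buffer pool, catalog, WAL manager,
/// and transaction manager, plus the checkpoint and background-writer
/// threads. Dropping the handle stops and joins both workers.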
pub struct Database {
    pub(crate) buffer_pool: Arc<BufferPoolManager>,
    pub(crate) catalog: Catalog,
    pub(crate) wal_manager: Arc<WalManager>,
    pub(crate) transaction_manager: Arc<TransactionManager>,
    default_isolation: IsolationLevel,
    temp_dir: Option<TempDir>,
    checkpoint_stop: Arc<AtomicBool>,
    checkpoint_handle: Option<thread::JoinHandle<()>>,
    bg_writer_stop: Arc<AtomicBool>,
    bg_writer_handle: Option<thread::JoinHandle<()>>,
}

impl Database {
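    /// Opens (or creates) a database backed by the file at `db_path`, using
    /// default [`DatabaseOptions`].
    ///
    /// A minimal usage sketch (statement text and error handling are
    /// illustrative only):
    ///
    /// ```ignore
    /// let mut db = Database::new_on_disk("quill.db")?;
    /// db.run("CREATE TABLE t (id INT)")?;
    /// let rows = db.run("SELECT * FROM t")?;
    /// ```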
    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
    }

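    /// Opens (or creates) a database at `db_path` with explicit options.
    ///
    /// Startup sequence: build the disk manager, disk scheduler, and buffer
    /// pool; initialize the WAL and control file; replay the log through
    /// [`RecoveryManager`]; spawn the checkpoint and background-writer
    /// threads; finally load the catalog.
    ///
    /// A sketch of overriding a couple of WAL knobs (the values shown are
    /// illustrative, not recommendations):
    ///
    /// ```ignore
    /// let options = DatabaseOptions {
    ///     wal: WalOptions {
    ///         synchronous_commit: Some(false),
    ///         checkpoint_interval_ms: Some(Some(5_000)),
    ///         ..Default::default()
    ///     },
    ///     default_isolation_level: Some(IsolationLevel::ReadUncommitted),
    /// };
    /// let mut db = Database::new_on_disk_with_options("quill.db", options)?;
    /// ```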
    pub fn new_on_disk_with_options(
        db_path: &str,
        options: DatabaseOptions,
    ) -> QuillSQLResult<Self> {
        let disk_manager = Arc::new(DiskManager::try_new(db_path)?);
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
        let buffer_pool = Arc::new(BufferPoolManager::new(
            BUFFER_POOL_SIZE,
            disk_scheduler.clone(),
        ));

        let wal_config = wal_config_for_path(db_path, &options.wal);
        let synchronous_commit = wal_config.synchronous_commit;
        let (control_file, wal_init) =
            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
        let control_file = Arc::new(control_file);
        let wal_manager = Arc::new(WalManager::new(
            wal_config.clone(),
            disk_scheduler.clone(),
            Some(wal_init),
            Some(control_file.clone()),
        )?);
        let transaction_manager = Arc::new(TransactionManager::new(
            wal_manager.clone(),
            synchronous_commit,
        ));
        if let Some(interval) = wal_config.writer_interval_ms {
            wal_manager.start_background_flush(Duration::from_millis(interval))?;
        }
        buffer_pool.set_wal_manager(wal_manager.clone());

        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());

        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
            .with_buffer_pool(buffer_pool.clone())
            .replay()?;
        if recovery_summary.redo_count > 0 {
            debug!(
                "Recovery replayed {} record(s) starting at LSN {}",
                recovery_summary.redo_count, recovery_summary.start_lsn
            );
        }
        if !recovery_summary.loser_transactions.is_empty() {
            warn!(
                "{} transaction(s) require undo after recovery: {:?}",
                recovery_summary.loser_transactions.len(),
                recovery_summary.loser_transactions
            );
        }

        let (checkpoint_stop, checkpoint_handle) = spawn_checkpoint_worker(
            wal_manager.clone(),
            buffer_pool.clone(),
            transaction_manager.clone(),
            wal_config.checkpoint_interval_ms,
        );

        let (bg_writer_stop, bg_writer_handle) = spawn_bg_writer(
            buffer_pool.clone(),
            std::env::var("QUILL_BG_WRITER_INTERVAL_MS")
                .ok()
                .and_then(|v| v.parse::<u64>().ok()),
            IndexVacuumConfig::default(),
        );

        let mut db = Self {
            buffer_pool,
            catalog,
            wal_manager,
            transaction_manager,
            default_isolation: options
                .default_isolation_level
                .unwrap_or(IsolationLevel::ReadUncommitted),
            temp_dir: None,
            checkpoint_stop,
            checkpoint_handle,
            bg_writer_stop,
            bg_writer_handle,
        };
        load_catalog_data(&mut db)?;
        Ok(db)
    }

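    /// Creates a throwaway database inside a fresh [`TempDir`]; the backing
    /// files are removed when the handle is dropped. Intended for tests.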
    pub fn new_temp() -> QuillSQLResult<Self> {
        Self::new_temp_with_options(DatabaseOptions::default())
    }

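    /// Same as [`Database::new_temp`], but with explicit [`DatabaseOptions`].
    /// The startup sequence mirrors `new_on_disk_with_options`, except that
    /// the data file and WAL directory live under the temporary directory.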
    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
        let temp_dir = TempDir::new()?;
        let temp_path = temp_dir.path().join("test.db");
        let disk_manager =
            Arc::new(DiskManager::try_new(temp_path.to_str().ok_or(
                QuillSQLError::Internal("Invalid temp path".to_string()),
            )?)?);
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
        let buffer_pool = Arc::new(BufferPoolManager::new(
            BUFFER_POOL_SIZE,
            disk_scheduler.clone(),
        ));

        let wal_config = wal_config_for_temp(temp_dir.path(), &options.wal);
        let synchronous_commit = wal_config.synchronous_commit;
        let (control_file, wal_init) =
            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
        let control_file = Arc::new(control_file);
        let wal_manager = Arc::new(WalManager::new(
            wal_config.clone(),
            disk_scheduler.clone(),
            Some(wal_init),
            Some(control_file.clone()),
        )?);
        let transaction_manager = Arc::new(TransactionManager::new(
            wal_manager.clone(),
            synchronous_commit,
        ));
        if let Some(interval) = wal_config.writer_interval_ms {
            wal_manager.start_background_flush(Duration::from_millis(interval))?;
        }
        buffer_pool.set_wal_manager(wal_manager.clone());

        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());

        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
            .with_buffer_pool(buffer_pool.clone())
            .replay()?;
        if recovery_summary.redo_count > 0 {
            debug!(
                "Recovery replayed {} record(s) starting at LSN {}",
                recovery_summary.redo_count, recovery_summary.start_lsn
            );
        }
        if !recovery_summary.loser_transactions.is_empty() {
            warn!(
                "{} transaction(s) require undo after recovery: {:?}",
                recovery_summary.loser_transactions.len(),
                recovery_summary.loser_transactions
            );
        }

        let (checkpoint_stop, checkpoint_handle) = spawn_checkpoint_worker(
            wal_manager.clone(),
            buffer_pool.clone(),
            transaction_manager.clone(),
            wal_config.checkpoint_interval_ms,
        );

        let (bg_writer_stop, bg_writer_handle) = spawn_bg_writer(
            buffer_pool.clone(),
            std::env::var("QUILL_BG_WRITER_INTERVAL_MS")
                .ok()
                .and_then(|v| v.parse::<u64>().ok()),
            IndexVacuumConfig::default(),
        );

        let mut db = Self {
            buffer_pool,
            catalog,
            wal_manager,
            transaction_manager,
            default_isolation: options
                .default_isolation_level
                .unwrap_or(IsolationLevel::ReadUncommitted),
            temp_dir: Some(temp_dir),
            checkpoint_stop,
            checkpoint_handle,
            bg_writer_stop,
            bg_writer_handle,
        };
        load_catalog_data(&mut db)?;
        Ok(db)
    }

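    /// Executes a single SQL statement in a fresh, autocommitting session
    /// that uses the database's default isolation level, returning the
    /// result tuples.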
    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
        let mut session = SessionContext::new(self.default_isolation);
        self.run_with_session(&mut session, sql)
    }

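    /// Executes a single SQL statement within `session`.
    ///
    /// Pipeline: parse into a logical plan, run the [`LogicalOptimizer`],
    /// lower to a physical plan, then execute. `BEGIN` / `COMMIT` /
    /// `ROLLBACK` / `SET TRANSACTION` are handled here directly against the
    /// session and transaction manager; every other statement runs inside
    /// the session's active transaction, starting an implicit one (and
    /// committing it on success) when none is active.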
    pub fn run_with_session(
        &mut self,
        session: &mut SessionContext,
        sql: &str,
    ) -> QuillSQLResult<Vec<Tuple>> {
        let logical_plan = self.create_logical_plan(sql)?;
        debug!(
            "Logical Plan: \n{}",
            pretty_format_logical_plan(&logical_plan)
        );

        let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
        debug!(
            "Optimized Logical Plan: \n{}",
            pretty_format_logical_plan(&optimized_logical_plan)
        );

        let physical_planner = PhysicalPlanner {
            catalog: &self.catalog,
        };
        let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
        debug!(
            "Physical Plan: \n{}",
            pretty_format_physical_plan(&physical_plan)
        );

        let execution_result = match optimized_logical_plan.clone() {
            LogicalPlan::BeginTransaction(modes) => {
                if session.has_active_transaction() {
                    return Err(QuillSQLError::Execution(
                        "transaction already active".to_string(),
                    ));
                }
                let txn = self.transaction_manager.begin(
                    modes.unwrap_effective_isolation(session.default_isolation()),
                    modes
                        .access_mode
                        .unwrap_or(TransactionAccessMode::ReadWrite),
                )?;
                session.set_active_transaction(txn)?;
                Ok(vec![])
            }
            LogicalPlan::CommitTransaction => {
                let txn_ref = session
                    .active_txn_mut()
                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
                self.transaction_manager.commit(txn_ref)?;
                session.take_active_transaction();
                Ok(vec![])
            }
            LogicalPlan::RollbackTransaction => {
                let txn_ref = session
                    .active_txn_mut()
                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
                self.transaction_manager.abort(txn_ref)?;
                session.take_active_transaction();
                Ok(vec![])
            }
            LogicalPlan::SetTransaction { scope, modes } => {
                match scope {
                    TransactionScope::Session => {
                        session.apply_session_modes(&modes);
                    }
                    TransactionScope::Transaction => {
                        session.apply_transaction_modes(&modes);
                    }
                }
                Ok(vec![])
            }
            plan => {
                let txn_existed = session.has_active_transaction();
                if !txn_existed {
                    let new_txn = self.transaction_manager.begin(
                        session.default_isolation(),
                        TransactionAccessMode::ReadWrite,
                    )?;
                    session.set_active_transaction(new_txn)?;
                }

                let execution_result = {
                    let txn_ref = session
                        .active_txn_mut()
                        .expect("session must hold an active transaction");
                    let execution_ctx = ExecutionContext::new(
                        &mut self.catalog,
                        txn_ref,
                        &self.transaction_manager,
                    );
                    let mut execution_engine = ExecutionEngine {
                        context: execution_ctx,
                    };
                    execution_engine.execute(Arc::new(physical_plan))
                };

                match execution_result {
                    Ok(tuples) => {
                        if !txn_existed && session.autocommit() {
                            {
                                let txn_ref = session
                                    .active_txn_mut()
                                    .expect("transaction should still be active");
                                self.transaction_manager.commit(txn_ref)?;
                            }
                            session.take_active_transaction();
                        }
                        Ok(tuples)
                    }
                    Err(e) => {
                        if txn_existed {
                            if let Some(txn_ref) = session.active_txn_mut() {
                                txn_ref.mark_tainted();
                            }
                        } else {
                            if let Some(txn_ref) = session.active_txn_mut() {
                                let _ = self.transaction_manager.abort(txn_ref);
                            }
                            session.take_active_transaction();
                        }
                        Err(e)
                    }
                }
            }
        };

        execution_result
    }

    pub fn default_isolation(&self) -> IsolationLevel {
        self.default_isolation
    }

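    /// Parses `sql` and builds the (unoptimized) logical plan. Exactly one
    /// statement is accepted; multi-statement input returns
    /// `QuillSQLError::NotSupport`.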
    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
        let stmts = crate::sql::parser::parse_sql(sql)?;
        if stmts.len() != 1 {
            return Err(QuillSQLError::NotSupport(
                "only support one sql statement".to_string(),
            ));
        }
        let stmt = &stmts[0];
        let mut planner = LogicalPlanner {
            context: PlannerContext {
                catalog: &self.catalog,
            },
        };
        planner.plan(stmt)
    }

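    /// Forces durability of buffered state: flushes outstanding WAL records
    /// and then writes every dirty page in the buffer pool back to disk.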
    pub fn flush(&self) -> QuillSQLResult<()> {
        let _ = self.wal_manager.flush(None)?;
        self.buffer_pool.flush_all_pages()
    }

    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
        self.transaction_manager.clone()
    }
}

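// On drop, signal both background workers to stop and join their threads so
// no checkpoint or background flush races the database shutdown.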
impl Drop for Database {
    fn drop(&mut self) {
        self.checkpoint_stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.checkpoint_handle.take() {
            if let Err(join_err) = handle.join() {
                warn!("Checkpoint worker terminated with panic: {:?}", join_err);
            }
        }
        self.bg_writer_stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.bg_writer_handle.take() {
            if let Err(join_err) = handle.join() {
                warn!("BG writer terminated with panic: {:?}", join_err);
            }
        }
    }
}

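// Builds the WAL configuration for an on-disk database: start from
// `WalConfig::default()`, derive the WAL directory from the data file path,
// then apply any caller overrides field by field.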
fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
    let mut config = WalConfig::default();
    config.directory = overrides
        .directory
        .clone()
        .unwrap_or_else(|| wal_directory_from_path(db_path));
    if let Some(size) = overrides.segment_size {
        config.segment_size = size;
    }
    if let Some(sync) = overrides.sync_on_flush {
        config.sync_on_flush = sync;
    }
    if let Some(interval) = overrides.writer_interval_ms.clone() {
        config.writer_interval_ms = interval;
    }
    if let Some(capacity) = overrides.buffer_capacity {
        config.buffer_capacity = capacity;
    }
    if let Some(bytes) = overrides.flush_coalesce_bytes {
        config.flush_coalesce_bytes = bytes;
    }
    if let Some(sync_commit) = overrides.synchronous_commit {
        config.synchronous_commit = sync_commit;
    }
    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
        config.checkpoint_interval_ms = interval;
    }
    if let Some(retain) = overrides.retain_segments {
        config.retain_segments = retain.max(1);
    }
    config
}

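// Derives the WAL directory from the data file path by swapping its
// extension for `.wal` (e.g. `data/quill.db` -> `data/quill.wal`); when the
// path has no file-name component to re-extension, `.wal` is appended to the
// original string instead.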
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let mut base = PathBuf::from(db_path);
    base.set_extension("wal");
    if base.extension().is_none() {
        PathBuf::from(format!("{}.wal", db_path))
    } else {
        base
    }
}

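// Same override merging as `wal_config_for_path`, but for temporary
// databases the WAL directory defaults to `<temp_root>/wal`.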
fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
    let mut config = WalConfig::default();
    config.directory = overrides
        .directory
        .clone()
        .unwrap_or_else(|| temp_root.join("wal"));
    if let Some(size) = overrides.segment_size {
        config.segment_size = size;
    }
    if let Some(sync) = overrides.sync_on_flush {
        config.sync_on_flush = sync;
    }
    if let Some(interval) = overrides.writer_interval_ms.clone() {
        config.writer_interval_ms = interval;
    }
    if let Some(capacity) = overrides.buffer_capacity {
        config.buffer_capacity = capacity;
    }
    if let Some(bytes) = overrides.flush_coalesce_bytes {
        config.flush_coalesce_bytes = bytes;
    }
    if let Some(sync_commit) = overrides.synchronous_commit {
        config.synchronous_commit = sync_commit;
    }
    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
        config.checkpoint_interval_ms = interval;
    }
    if let Some(retain) = overrides.retain_segments {
        config.retain_segments = retain.max(1);
    }
    config
}

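// Spawns the periodic checkpoint thread. Each tick it snapshots the dirty
// page set, the dirty page table, and the active transactions, flushes the
// WAL up to the last assigned LSN, and then appends a checkpoint record.
// Returns the stop flag plus the join handle; no thread is spawned when the
// interval is absent or zero.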
fn spawn_checkpoint_worker(
    wal_manager: Arc<WalManager>,
    buffer_pool: Arc<BufferPoolManager>,
    transaction_manager: Arc<TransactionManager>,
    interval_ms: Option<u64>,
) -> (Arc<AtomicBool>, Option<thread::JoinHandle<()>>) {
    let stop = Arc::new(AtomicBool::new(false));
    let Some(ms) = interval_ms else {
        return (stop, None);
    };
    if ms == 0 {
        return (stop, None);
    }
    let stop_flag = stop.clone();
    let wal = wal_manager.clone();
    let bp = buffer_pool.clone();
    let txn_mgr = transaction_manager.clone();
    let interval = Duration::from_millis(ms);
    let handle = thread::Builder::new()
        .name("checkpoint-worker".into())
        .spawn(move || {
            while !stop_flag.load(Ordering::Relaxed) {
                let dirty_pages = bp.dirty_page_ids();
                let dpt_snapshot = bp.dirty_page_table_snapshot();
                let active_txns = txn_mgr.active_transactions();
                let last_lsn = wal.max_assigned_lsn();

                if last_lsn != 0 {
                    if let Err(e) = wal.flush_until(last_lsn) {
                        warn!("Checkpoint flush failed: {}", e);
                    }
                    let payload = CheckpointPayload {
                        last_lsn,
                        dirty_pages,
                        active_transactions: active_txns,
                        dpt: dpt_snapshot,
                    };
                    if let Err(e) = wal.log_checkpoint(payload) {
                        warn!("Checkpoint write failed: {}", e);
                    }
                }

                if stop_flag.load(Ordering::Relaxed) {
                    break;
                }
                thread::sleep(interval);
            }
        })
        .map(Some)
        .unwrap_or_else(|err| {
            warn!("Failed to spawn checkpoint worker: {}", err);
            None
        });

    (stop, handle)
}

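// Spawns the background writer thread. Each tick it flushes up to 16 dirty
// pages back to disk and, for each registered index, runs a bounded lazy
// cleanup pass once the index's pending garbage crosses the configured
// vacuum threshold. As above, a missing or zero interval disables the worker.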
fn spawn_bg_writer(
    buffer_pool: Arc<BufferPoolManager>,
    interval_ms: Option<u64>,
    vacuum_cfg: IndexVacuumConfig,
) -> (Arc<AtomicBool>, Option<thread::JoinHandle<()>>) {
    let stop = Arc::new(AtomicBool::new(false));
    let Some(ms) = interval_ms else {
        return (stop, None);
    };
    if ms == 0 {
        return (stop, None);
    }
    let stop_flag = stop.clone();
    let bp = buffer_pool.clone();
    let interval = Duration::from_millis(ms);
    let handle = thread::Builder::new()
        .name("bg-writer".into())
        .spawn(move || {
            while !stop_flag.load(Ordering::Relaxed) {
                let dirty_ids = bp.dirty_page_ids();
                for page_id in dirty_ids.into_iter().take(16) {
                    let _ = bp.flush_page(page_id);
                }

                let registry = global_index_registry();
                for (idx, heap) in registry.iter().take(16) {
                    let pending = idx.take_pending_garbage();
                    if pending >= vacuum_cfg.trigger_threshold {
                        let _ = idx.lazy_cleanup_with(
                            |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
                            Some(vacuum_cfg.batch_limit),
                        );
                    }
                }

                if stop_flag.load(Ordering::Relaxed) {
                    break;
                }
                thread::sleep(interval);
            }
        })
        .map(Some)
        .unwrap_or_else(|err| {
            warn!("Failed to spawn bg writer: {}", err);
            None
        });
    (stop, handle)
}