1use crate::background::{self, BackgroundWorkers};
2use log::{debug, warn};
3use serde::Serialize;
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7use tempfile::TempDir;
8
9use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
10use crate::catalog::{load_catalog_data, registry::TableRegistry, TableStatistics};
11use crate::config::{background_config, IndexVacuumConfig, MvccVacuumConfig, WalConfig};
12use crate::error::{QuillSQLError, QuillSQLResult};
13use crate::execution::physical_plan::PhysicalPlan;
14use crate::optimizer::LogicalOptimizer;
15use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
16use crate::plan::PhysicalPlanner;
17use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
18use crate::session::SessionContext;
19use crate::utils::{
20 table_ref::TableReference,
21 util::{pretty_format_logical_plan, pretty_format_physical_plan},
22};
23use crate::{
24 buffer::INVALID_PAGE_ID,
25 catalog::Catalog,
26 execution::ExecutionEngine,
27 plan::{LogicalPlanner, PlannerContext},
28 recovery::wal::{WalHeadDebug, WalSegmentDebug},
29 storage::{
30 disk_manager::DiskManager, disk_scheduler::DiskScheduler, tuple::Tuple,
31 DefaultStorageEngine, StorageEngine,
32 },
33 transaction::{
34 CommandId, IsolationLevel, LockDebugSnapshot, TransactionManager, TxnDebugSnapshot,
35 },
36};
37use sqlparser::ast::TransactionAccessMode;
38
/// Optional overrides for the write-ahead-log configuration.
///
/// Every field defaults to `None`, meaning "keep the value from
/// `WalConfig::default()`". `build_wal_config` copies each `Some` value onto
/// the corresponding `WalConfig` field.
#[derive(Debug, Default, Clone)]
pub struct WalOptions {
    /// WAL directory. When unset, it is derived from the database location:
    /// from the database file path for on-disk databases, or `wal` inside the
    /// temporary directory for temporary databases.
    pub directory: Option<PathBuf>,
    pub segment_size: Option<u64>,
    pub sync_on_flush: Option<bool>,
    pub persist_control_file_on_flush: Option<bool>,
    /// Doubly optional: the outer `None` keeps the default, while `Some(None)`
    /// explicitly sets the configured writer interval to `None` (presumably
    /// disabling the background WAL writer — confirm in `background_config`).
    pub writer_interval_ms: Option<Option<u64>>,
    pub buffer_capacity: Option<usize>,
    pub flush_coalesce_bytes: Option<usize>,
    pub synchronous_commit: Option<bool>,
    /// Same doubly-optional convention as `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Number of WAL segments to retain; an override of 0 is clamped to 1.
    pub retain_segments: Option<usize>,
}
52
/// Options accepted by `Database::new_on_disk_with_options` and
/// `Database::new_temp_with_options`.
#[derive(Debug, Clone, Default)]
pub struct DatabaseOptions {
    /// Overrides layered on top of the default WAL configuration.
    pub wal: WalOptions,
    /// Isolation level used for sessions created by `Database::run`; when
    /// unset the database falls back to `IsolationLevel::ReadUncommitted`.
    pub default_isolation_level: Option<IsolationLevel>,
}
58
/// Where a database's files are placed.
enum DatabaseLocation {
    /// Database file at the given path; the default WAL directory is derived
    /// from this path.
    OnDisk(String),
    /// Files live in a freshly created temporary directory.
    Temporary,
}
63
64fn bootstrap_storage(
65 location: DatabaseLocation,
66 wal_options: &WalOptions,
67) -> QuillSQLResult<(Arc<DiskManager>, WalConfig, Option<TempDir>)> {
68 match location {
69 DatabaseLocation::OnDisk(path) => {
70 let disk_manager = Arc::new(DiskManager::try_new(path.as_str())?);
71 let wal_config = wal_config_for_path(path.as_str(), wal_options);
72 Ok((disk_manager, wal_config, None))
73 }
74 DatabaseLocation::Temporary => {
75 let temp_dir = TempDir::new()?;
76 let temp_path = temp_dir.path().join("test.db");
77 let temp_str = temp_path
78 .to_str()
79 .ok_or_else(|| QuillSQLError::Internal("Invalid temp path".to_string()))?;
80 let disk_manager = Arc::new(DiskManager::try_new(temp_str)?);
81 let wal_config = wal_config_for_temp(temp_dir.path(), wal_options);
82 Ok((disk_manager, wal_config, Some(temp_dir)))
83 }
84 }
85}
86
87pub struct Database {
88 _temp_dir: Option<TempDir>,
89 pub(crate) buffer_pool: Arc<BufferManager>,
90 pub(crate) catalog: Catalog,
91 background_workers: BackgroundWorkers,
92 pub(crate) wal_manager: Arc<WalManager>,
93 pub(crate) transaction_manager: Arc<TransactionManager>,
94 default_isolation: IsolationLevel,
95 storage_engine: Arc<dyn StorageEngine>,
96 _table_registry: Arc<TableRegistry>,
97 debug_trace: Arc<Mutex<Option<DebugTrace>>>,
98}
99
/// Output of planning one SQL statement: the optimized logical plan and the
/// physical plan built from it.
struct PreparedStatement {
    optimized_logical_plan: LogicalPlan,
    physical_plan: PhysicalPlan,
}
104
/// Diagnostic record of the most recently executed statement, as stored by
/// `Database::run_with_session`.
#[derive(Debug, Clone, Serialize)]
pub struct DebugTrace {
    /// Pretty-printed optimized logical plan.
    pub logical_plan: String,
    /// Pretty-printed physical plan.
    pub physical_plan: String,
    /// Number of tuples the statement returned.
    pub rows: usize,
    /// Wall-clock time in milliseconds, measured from the start of
    /// `run_with_session` (so planning time is included).
    pub duration_ms: u128,
    /// Tree form of the optimized logical plan.
    pub logical_tree: DebugPlanNode,
    /// Tree form of the physical plan.
    pub physical_tree: DebugPlanNode,
}
114
/// Buffer-pool frame counters reported by `Database::debug_buffer_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct BufferDebugStats {
    /// Number of frames in the buffer pool's frame snapshot.
    pub capacity: usize,
    /// Frames whose page id is `INVALID_PAGE_ID`.
    pub free_frames: usize,
    /// Frames with a pin count greater than zero.
    pub pinned_frames: usize,
    /// Frames flagged as dirty.
    pub dirty_frames: usize,
    /// Number of entries in the buffer pool's dirty page table snapshot.
    pub dirty_page_table: usize,
}
123
/// WAL segment listing returned by `Database::debug_wal_segments`.
#[derive(Debug, Clone, Serialize)]
pub struct WalSegmentsDebug {
    /// Per-segment details as reported by the WAL manager.
    pub segments: Vec<WalSegmentDebug>,
}
128
/// A plan operator rendered for debugging: a label plus its child operators.
#[derive(Debug, Clone, Serialize)]
pub struct DebugPlanNode {
    /// Operator label (`to_string()` for logical plans, `display_name()` for
    /// physical plans).
    pub op: String,
    /// Child operators, in the order returned by the plan's `inputs()`.
    pub children: Vec<DebugPlanNode>,
}
134
/// Logical and physical plan trees of the last traced statement, as returned
/// by `Database::debug_last_plan`.
#[derive(Debug, Clone, Serialize)]
pub struct DebugPlanSnapshot {
    pub logical: DebugPlanNode,
    pub physical: DebugPlanNode,
}
140
/// One sampled tuple version, produced by `Database::debug_mvcc_versions`.
#[derive(Debug, Clone, Serialize)]
pub struct MvccVersionSample {
    /// Table the tuple belongs to.
    pub table: String,
    /// String form of the tuple's record id.
    pub rid: String,
    /// Transaction id recorded in the tuple metadata as the inserter.
    pub insert_txn: u64,
    /// Transaction id recorded in the tuple metadata as the deleter.
    pub delete_txn: u64,
    /// Whether the tuple is visible to the snapshot taken for the sampling.
    pub visible: bool,
}
149
/// Result of `Database::debug_mvcc_versions`.
#[derive(Debug, Clone, Serialize)]
pub struct MvccVersionsDebug {
    /// Sampled tuple versions (bounded; see `note`).
    pub samples: Vec<MvccVersionSample>,
    /// Human-readable description of the sampling limit.
    pub note: String,
}
155
156impl DebugPlanNode {
157 pub fn from_physical(plan: &PhysicalPlan) -> Self {
158 Self {
159 op: plan.display_name(),
160 children: plan
161 .inputs()
162 .iter()
163 .map(|child| Self::from_physical(child))
164 .collect(),
165 }
166 }
167
168 pub fn from_logical(plan: &LogicalPlan) -> Self {
169 Self {
170 op: plan.to_string(),
171 children: plan
172 .inputs()
173 .iter()
174 .map(|child| Self::from_logical(child))
175 .collect(),
176 }
177 }
178}
179impl Database {
180 pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
181 Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
182 }
183
184 pub fn new_on_disk_with_options(
185 db_path: &str,
186 options: DatabaseOptions,
187 ) -> QuillSQLResult<Self> {
188 Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
189 }
190
191 pub fn new_temp() -> QuillSQLResult<Self> {
192 Self::new_temp_with_options(DatabaseOptions::default())
193 }
194
195 pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
196 Self::new_with_location(DatabaseLocation::Temporary, options)
197 }
198
199 fn new_with_location(
200 location: DatabaseLocation,
201 options: DatabaseOptions,
202 ) -> QuillSQLResult<Self> {
203 let (disk_manager, wal_config, temp_dir) = bootstrap_storage(location, &options.wal)?;
204
205 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
206 let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
207
208 let synchronous_commit = wal_config.synchronous_commit;
209 let (control_file, wal_init) =
210 ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
211 let control_file = Arc::new(control_file);
212 let wal_manager = Arc::new(WalManager::new_with_scheduler(
213 wal_config.clone(),
214 Some(wal_init),
215 Some(control_file.clone()),
216 disk_scheduler.clone(),
217 )?);
218 let transaction_manager = Arc::new(TransactionManager::new(
219 wal_manager.clone(),
220 synchronous_commit,
221 ));
222
223 let worker_cfg = background_config(
224 &wal_config,
225 IndexVacuumConfig::default(),
226 MvccVacuumConfig::default(),
227 );
228 let mut background_workers = BackgroundWorkers::new();
229 if let Some(interval) = worker_cfg.wal_writer_interval {
230 if let Some(handle) = wal_manager.start_background_flush(interval)? {
231 background_workers.register(background::wal_writer_worker(handle, interval));
232 }
233 }
234 buffer_pool.set_wal_manager(wal_manager.clone());
235
236 let table_registry = Arc::new(TableRegistry::new());
237 let catalog = Catalog::new(
238 buffer_pool.clone(),
239 disk_manager.clone(),
240 table_registry.clone(),
241 );
242 let storage_engine: Arc<dyn StorageEngine> = Arc::new(DefaultStorageEngine::default());
243
244 let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
245 .with_buffer_pool(buffer_pool.clone())
246 .replay()?;
247 if recovery_summary.redo_count > 0 {
248 debug!(
249 "Recovery replayed {} record(s) starting at LSN {}",
250 recovery_summary.redo_count, recovery_summary.start_lsn
251 );
252 }
253 if !recovery_summary.loser_transactions.is_empty() {
254 warn!(
255 "{} transaction(s) require undo after recovery: {:?}",
256 recovery_summary.loser_transactions.len(),
257 recovery_summary.loser_transactions
258 );
259 }
260
261 let wal_for_workers: Arc<dyn background::CheckpointWal> = wal_manager.clone();
262 let buffer_for_workers: Arc<dyn background::BufferMaintenance> = buffer_pool.clone();
263 let txn_for_workers: Arc<dyn background::TxnSnapshotOps> = transaction_manager.clone();
264
265 background_workers.register_opt(background::spawn_checkpoint_worker(
266 wal_for_workers.clone(),
267 buffer_for_workers.clone(),
268 txn_for_workers.clone(),
269 worker_cfg.checkpoint_interval,
270 ));
271
272 background_workers.register_opt(background::spawn_bg_writer(
273 buffer_for_workers.clone(),
274 worker_cfg.bg_writer_interval,
275 ));
276
277 let mvcc_interval = if worker_cfg.mvcc_vacuum.interval_ms == 0 {
278 None
279 } else {
280 Some(Duration::from_millis(worker_cfg.mvcc_vacuum.interval_ms))
281 };
282 background_workers.register_opt(background::spawn_mvcc_vacuum_worker(
283 txn_for_workers,
284 mvcc_interval,
285 worker_cfg.mvcc_vacuum.batch_limit,
286 table_registry.clone(),
287 ));
288
289 let mut db = Self {
290 _temp_dir: temp_dir,
291 buffer_pool,
292 catalog,
293 background_workers,
294 wal_manager,
295 transaction_manager,
296 default_isolation: options
297 .default_isolation_level
298 .unwrap_or(IsolationLevel::ReadUncommitted),
299 storage_engine,
300 _table_registry: table_registry,
301 debug_trace: Arc::new(Mutex::new(None)),
302 };
303 load_catalog_data(&mut db)?;
304 Ok(db)
305 }
306
307 pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
308 let mut session = SessionContext::new(self.default_isolation);
309 self.run_with_session(&mut session, sql)
310 }
311
312 pub fn run_with_session(
313 &mut self,
314 session: &mut SessionContext,
315 sql: &str,
316 ) -> QuillSQLResult<Vec<Tuple>> {
317 let start = Instant::now();
318 let PreparedStatement {
319 optimized_logical_plan,
320 physical_plan,
321 } = self.plan_statement(sql)?;
322 let logical_plan_str = pretty_format_logical_plan(&optimized_logical_plan);
323 let physical_plan_str = pretty_format_physical_plan(&physical_plan);
324 let logical_tree = DebugPlanNode::from_logical(&optimized_logical_plan);
325 let physical_tree = DebugPlanNode::from_physical(&physical_plan);
326
327 if let Some(result) = self.execute_transaction_control(session, &optimized_logical_plan)? {
328 return Ok(result);
329 }
330
331 let result = self.execute_physical_plan(session, physical_plan)?;
332
333 let elapsed = start.elapsed().as_millis();
334 let rows = result.len();
335 if let Ok(mut guard) = self.debug_trace.lock() {
336 *guard = Some(DebugTrace {
337 logical_plan: logical_plan_str,
338 physical_plan: physical_plan_str,
339 logical_tree,
340 physical_tree,
341 rows,
342 duration_ms: elapsed,
343 });
344 }
345
346 Ok(result)
347 }
348
349 pub fn default_isolation(&self) -> IsolationLevel {
350 self.default_isolation
351 }
352
353 pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
354 let stmts = crate::sql::parser::parse_sql(sql)?;
356 if stmts.len() != 1 {
357 return Err(QuillSQLError::NotSupport(
358 "only support one sql statement".to_string(),
359 ));
360 }
361 let stmt = &stmts[0];
362 let mut planner = LogicalPlanner {
363 context: PlannerContext {
364 catalog: &self.catalog,
365 },
366 };
367 planner.plan(stmt)
369 }
370
371 pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
372 self.catalog.analyze_table(table_ref)
373 }
374
375 pub fn flush(&self) -> QuillSQLResult<()> {
376 let target = self.wal_manager.max_assigned_lsn();
377 let _ = self.wal_manager.flush_until(target)?;
378 self.wal_manager.persist_control_file()?;
379 self.buffer_pool.flush_all_pages()
380 }
381
382 pub fn debug_last_trace(&self) -> Option<DebugTrace> {
383 self.debug_trace.lock().ok().and_then(|guard| guard.clone())
384 }
385
386 pub fn debug_wal_head(&self) -> WalHeadDebug {
387 self.wal_manager.debug_head()
388 }
389
390 pub fn debug_wal_segments(&self) -> QuillSQLResult<WalSegmentsDebug> {
391 Ok(WalSegmentsDebug {
392 segments: self.wal_manager.debug_segments()?,
393 })
394 }
395
396 pub fn debug_wal_peek(
397 &self,
398 limit: usize,
399 ) -> QuillSQLResult<Vec<crate::recovery::wal::WalPeekDebug>> {
400 self.wal_manager.debug_peek(limit)
401 }
402
403 pub fn debug_lock_snapshot(&self) -> LockDebugSnapshot {
404 self.transaction_manager.lock_manager_arc().debug_snapshot()
405 }
406
407 pub fn debug_buffer_stats(&self) -> BufferDebugStats {
408 let frames = self.buffer_pool.frame_meta_snapshot();
409 let free_frames = frames
410 .iter()
411 .filter(|meta| meta.page_id == INVALID_PAGE_ID)
412 .count();
413 let pinned_frames = frames.iter().filter(|meta| meta.pin_count > 0).count();
414 let dirty_frames = frames.iter().filter(|meta| meta.is_dirty).count();
415 let dirty_page_table = self.buffer_pool.dirty_page_table_snapshot().len();
416 BufferDebugStats {
417 capacity: frames.len(),
418 free_frames,
419 pinned_frames,
420 dirty_frames,
421 dirty_page_table,
422 }
423 }
424
425 pub fn debug_txn_snapshot(&self) -> TxnDebugSnapshot {
426 self.transaction_manager.debug_snapshot()
427 }
428
429 pub fn debug_mvcc_versions(&self) -> QuillSQLResult<MvccVersionsDebug> {
430 let mut samples = Vec::new();
431 let max_samples = 20usize;
432 let snapshot = self
433 .transaction_manager
434 .snapshot(self.transaction_manager.next_txn_id_hint());
435
436 for (table_ref, heap) in self._table_registry.iter_tables() {
437 let mut current = heap.get_first_rid()?;
438 while let Some(rid) = current {
439 if samples.len() >= max_samples {
440 break;
441 }
442 let meta = heap.tuple_meta(rid)?;
443 let visible = snapshot.is_visible(&meta, 0 as CommandId, |tid| {
444 self.transaction_manager.transaction_status(tid)
445 });
446 samples.push(MvccVersionSample {
447 table: table_ref.to_string(),
448 rid: rid.to_string(),
449 insert_txn: meta.insert_txn_id,
450 delete_txn: meta.delete_txn_id,
451 visible,
452 });
453 current = heap.get_next_rid(rid)?;
454 }
455 if samples.len() >= max_samples {
456 break;
457 }
458 }
459
460 Ok(MvccVersionsDebug {
461 samples,
462 note: format!("sampled up to {} tuples", max_samples),
463 })
464 }
465
466 pub fn debug_last_plan(&self) -> Option<DebugPlanSnapshot> {
467 self.debug_trace
468 .lock()
469 .ok()
470 .and_then(|opt| opt.clone())
471 .map(|trace| DebugPlanSnapshot {
472 logical: trace.logical_tree,
473 physical: trace.physical_tree,
474 })
475 }
476
477 pub fn table_statistics(
478 &self,
479 table_ref: &TableReference,
480 ) -> Option<&crate::catalog::TableStatistics> {
481 self.catalog.table_statistics(table_ref)
482 }
483
484 pub fn transaction_manager(&self) -> Arc<TransactionManager> {
485 self.transaction_manager.clone()
486 }
487
488 fn plan_statement(&mut self, sql: &str) -> QuillSQLResult<PreparedStatement> {
489 let logical_plan = self.create_logical_plan(sql)?;
490 debug!(
491 "Logical Plan: \n{}",
492 pretty_format_logical_plan(&logical_plan)
493 );
494
495 let optimized_logical_plan = self.optimize_logical_plan(&logical_plan)?;
496 debug!(
497 "Optimized Logical Plan: \n{}",
498 pretty_format_logical_plan(&optimized_logical_plan)
499 );
500
501 let physical_plan = self.build_physical_plan(&optimized_logical_plan);
502 debug!(
503 "Physical Plan: \n{}",
504 pretty_format_physical_plan(&physical_plan)
505 );
506
507 Ok(PreparedStatement {
508 optimized_logical_plan,
509 physical_plan,
510 })
511 }
512
513 fn optimize_logical_plan(&self, logical_plan: &LogicalPlan) -> QuillSQLResult<LogicalPlan> {
514 LogicalOptimizer::new().optimize(logical_plan)
515 }
516
517 fn build_physical_plan(&self, logical_plan: &LogicalPlan) -> PhysicalPlan {
518 let physical_planner = PhysicalPlanner::new();
519 physical_planner.create_physical_plan(logical_plan.clone())
520 }
521
522 fn execute_transaction_control(
523 &self,
524 session: &mut SessionContext,
525 plan: &LogicalPlan,
526 ) -> QuillSQLResult<Option<Vec<Tuple>>> {
527 match plan {
528 LogicalPlan::BeginTransaction(modes) => {
529 if session.has_active_transaction() {
530 return Err(QuillSQLError::Execution(
531 "transaction already active".to_string(),
532 ));
533 }
534 let txn = self.transaction_manager.begin(
535 modes.unwrap_effective_isolation(session.default_isolation()),
536 modes
537 .access_mode
538 .unwrap_or(TransactionAccessMode::ReadWrite),
539 )?;
540 session.set_active_transaction(txn)?;
541 Ok(Some(vec![]))
542 }
543 LogicalPlan::CommitTransaction => {
544 let txn_ref = session
545 .active_txn_mut()
546 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
547 self.transaction_manager.commit(txn_ref)?;
548 session.clear_active_transaction();
549 Ok(Some(vec![]))
550 }
551 LogicalPlan::RollbackTransaction => {
552 let txn_ref = session
553 .active_txn_mut()
554 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
555 self.transaction_manager.abort(txn_ref)?;
556 session.clear_active_transaction();
557 Ok(Some(vec![]))
558 }
559 LogicalPlan::SetTransaction { scope, modes } => {
560 match scope {
561 TransactionScope::Session => session.apply_session_modes(modes),
562 TransactionScope::Transaction => session.apply_transaction_modes(modes),
563 }
564 Ok(Some(vec![]))
565 }
566 _ => Ok(None),
567 }
568 }
569
570 fn execute_physical_plan(
571 &mut self,
572 session: &mut SessionContext,
573 physical_plan: PhysicalPlan,
574 ) -> QuillSQLResult<Vec<Tuple>> {
575 let needs_cleanup = !session.has_active_transaction();
576 let autocommit = session.autocommit();
577 let result = {
578 let txn = session.ensure_active_transaction(&self.transaction_manager)?;
579 let context = crate::execution::ExecutionContext::new(
580 &mut self.catalog,
581 txn,
582 self.transaction_manager.clone(),
583 self.storage_engine.clone(),
584 );
585 let mut engine = ExecutionEngine { context };
586 engine.execute(Arc::new(physical_plan))?
587 };
588
589 if autocommit && needs_cleanup {
590 if let Some(txn) = session.active_txn_mut() {
591 self.transaction_manager.commit(txn)?;
592 }
593 session.clear_active_transaction();
594 }
595
596 Ok(result)
597 }
598}
599
600impl Drop for Database {
601 fn drop(&mut self) {
602 self.background_workers.shutdown_all();
603 }
604}
605
606fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
607 build_wal_config(wal_directory_from_path(db_path), overrides)
608}
609
610fn wal_directory_from_path(db_path: &str) -> PathBuf {
611 let mut base = PathBuf::from(db_path);
612 base.set_extension("wal");
613 if base.extension().is_none() {
614 PathBuf::from(format!("{}.wal", db_path))
615 } else {
616 base
617 }
618}
619
620fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
621 build_wal_config(temp_root.join("wal"), overrides)
622}
623
624fn build_wal_config(default_directory: PathBuf, overrides: &WalOptions) -> WalConfig {
625 let mut config = WalConfig {
626 directory: overrides.directory.clone().unwrap_or(default_directory),
627 ..WalConfig::default()
628 };
629 if let Some(size) = overrides.segment_size {
630 config.segment_size = size;
631 }
632 if let Some(sync) = overrides.sync_on_flush {
633 config.sync_on_flush = sync;
634 }
635 if let Some(flag) = overrides.persist_control_file_on_flush {
636 config.persist_control_file_on_flush = flag;
637 }
638 if let Some(interval) = overrides.writer_interval_ms {
639 config.writer_interval_ms = interval;
640 }
641 if let Some(capacity) = overrides.buffer_capacity {
642 config.buffer_capacity = capacity;
643 }
644 if let Some(bytes) = overrides.flush_coalesce_bytes {
645 config.flush_coalesce_bytes = bytes;
646 }
647 if let Some(sync_commit) = overrides.synchronous_commit {
648 config.synchronous_commit = sync_commit;
649 }
650 if let Some(interval) = overrides.checkpoint_interval_ms {
651 config.checkpoint_interval_ms = interval;
652 }
653 if let Some(retain) = overrides.retain_segments {
654 config.retain_segments = retain.max(1);
655 }
656 config
657}