quill_sql/
database.rs

1use crate::background::{self, BackgroundWorkers};
2use log::{debug, warn};
3use serde::Serialize;
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7use tempfile::TempDir;
8
9use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
10use crate::catalog::{load_catalog_data, registry::TableRegistry, TableStatistics};
11use crate::config::{background_config, IndexVacuumConfig, MvccVacuumConfig, WalConfig};
12use crate::error::{QuillSQLError, QuillSQLResult};
13use crate::execution::physical_plan::PhysicalPlan;
14use crate::optimizer::LogicalOptimizer;
15use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
16use crate::plan::PhysicalPlanner;
17use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
18use crate::session::SessionContext;
19use crate::utils::{
20    table_ref::TableReference,
21    util::{pretty_format_logical_plan, pretty_format_physical_plan},
22};
23use crate::{
24    buffer::INVALID_PAGE_ID,
25    catalog::Catalog,
26    execution::ExecutionEngine,
27    plan::{LogicalPlanner, PlannerContext},
28    recovery::wal::{WalHeadDebug, WalSegmentDebug},
29    storage::{
30        disk_manager::DiskManager, disk_scheduler::DiskScheduler, tuple::Tuple,
31        DefaultStorageEngine, StorageEngine,
32    },
33    transaction::{
34        CommandId, IsolationLevel, LockDebugSnapshot, TransactionManager, TxnDebugSnapshot,
35    },
36};
37use sqlparser::ast::TransactionAccessMode;
38
/// Optional overrides for the write-ahead-log configuration.
///
/// A field left as `None` keeps the matching `WalConfig` default; a `Some`
/// value replaces it when the final configuration is assembled by
/// `build_wal_config`.
#[derive(Debug, Default, Clone)]
pub struct WalOptions {
    /// WAL directory. When `None`, a directory derived from the database
    /// location is used instead.
    pub directory: Option<PathBuf>,
    /// Override for `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Override for `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// Override for `WalConfig::persist_control_file_on_flush`.
    pub persist_control_file_on_flush: Option<bool>,
    /// Override for `WalConfig::writer_interval_ms`. The outer `Option` says
    /// whether to override at all; the inner value (which may itself be
    /// `None`) is what gets stored in the configuration.
    pub writer_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Override for `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Override for `WalConfig::synchronous_commit`.
    pub synchronous_commit: Option<bool>,
    /// Override for `WalConfig::checkpoint_interval_ms`; same outer/inner
    /// convention as `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::retain_segments`; values below 1 are raised
    /// to 1 when applied.
    pub retain_segments: Option<usize>,
}
52
53#[derive(Debug, Clone, Default)]
54pub struct DatabaseOptions {
55    pub wal: WalOptions,
56    pub default_isolation_level: Option<IsolationLevel>,
57}
58
/// Where the database files live.
enum DatabaseLocation {
    /// A database file at the given path.
    OnDisk(String),
    /// A database created inside a freshly made temporary directory.
    Temporary,
}
63
64fn bootstrap_storage(
65    location: DatabaseLocation,
66    wal_options: &WalOptions,
67) -> QuillSQLResult<(Arc<DiskManager>, WalConfig, Option<TempDir>)> {
68    match location {
69        DatabaseLocation::OnDisk(path) => {
70            let disk_manager = Arc::new(DiskManager::try_new(path.as_str())?);
71            let wal_config = wal_config_for_path(path.as_str(), wal_options);
72            Ok((disk_manager, wal_config, None))
73        }
74        DatabaseLocation::Temporary => {
75            let temp_dir = TempDir::new()?;
76            let temp_path = temp_dir.path().join("test.db");
77            let temp_str = temp_path
78                .to_str()
79                .ok_or_else(|| QuillSQLError::Internal("Invalid temp path".to_string()))?;
80            let disk_manager = Arc::new(DiskManager::try_new(temp_str)?);
81            let wal_config = wal_config_for_temp(temp_dir.path(), wal_options);
82            Ok((disk_manager, wal_config, Some(temp_dir)))
83        }
84    }
85}
86
87pub struct Database {
88    _temp_dir: Option<TempDir>,
89    pub(crate) buffer_pool: Arc<BufferManager>,
90    pub(crate) catalog: Catalog,
91    background_workers: BackgroundWorkers,
92    pub(crate) wal_manager: Arc<WalManager>,
93    pub(crate) transaction_manager: Arc<TransactionManager>,
94    default_isolation: IsolationLevel,
95    storage_engine: Arc<dyn StorageEngine>,
96    _table_registry: Arc<TableRegistry>,
97    debug_trace: Arc<Mutex<Option<DebugTrace>>>,
98}
99
100struct PreparedStatement {
101    optimized_logical_plan: LogicalPlan,
102    physical_plan: PhysicalPlan,
103}
104
105#[derive(Debug, Clone, Serialize)]
106pub struct DebugTrace {
107    pub logical_plan: String,
108    pub physical_plan: String,
109    pub rows: usize,
110    pub duration_ms: u128,
111    pub logical_tree: DebugPlanNode,
112    pub physical_tree: DebugPlanNode,
113}
114
115#[derive(Debug, Clone, Serialize)]
116pub struct BufferDebugStats {
117    pub capacity: usize,
118    pub free_frames: usize,
119    pub pinned_frames: usize,
120    pub dirty_frames: usize,
121    pub dirty_page_table: usize,
122}
123
124#[derive(Debug, Clone, Serialize)]
125pub struct WalSegmentsDebug {
126    pub segments: Vec<WalSegmentDebug>,
127}
128
129#[derive(Debug, Clone, Serialize)]
130pub struct DebugPlanNode {
131    pub op: String,
132    pub children: Vec<DebugPlanNode>,
133}
134
135#[derive(Debug, Clone, Serialize)]
136pub struct DebugPlanSnapshot {
137    pub logical: DebugPlanNode,
138    pub physical: DebugPlanNode,
139}
140
141#[derive(Debug, Clone, Serialize)]
142pub struct MvccVersionSample {
143    pub table: String,
144    pub rid: String,
145    pub insert_txn: u64,
146    pub delete_txn: u64,
147    pub visible: bool,
148}
149
150#[derive(Debug, Clone, Serialize)]
151pub struct MvccVersionsDebug {
152    pub samples: Vec<MvccVersionSample>,
153    pub note: String,
154}
155
156impl DebugPlanNode {
157    pub fn from_physical(plan: &PhysicalPlan) -> Self {
158        Self {
159            op: plan.display_name(),
160            children: plan
161                .inputs()
162                .iter()
163                .map(|child| Self::from_physical(child))
164                .collect(),
165        }
166    }
167
168    pub fn from_logical(plan: &LogicalPlan) -> Self {
169        Self {
170            op: plan.to_string(),
171            children: plan
172                .inputs()
173                .iter()
174                .map(|child| Self::from_logical(child))
175                .collect(),
176        }
177    }
178}
179impl Database {
180    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
181        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
182    }
183
184    pub fn new_on_disk_with_options(
185        db_path: &str,
186        options: DatabaseOptions,
187    ) -> QuillSQLResult<Self> {
188        Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
189    }
190
191    pub fn new_temp() -> QuillSQLResult<Self> {
192        Self::new_temp_with_options(DatabaseOptions::default())
193    }
194
195    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
196        Self::new_with_location(DatabaseLocation::Temporary, options)
197    }
198
199    fn new_with_location(
200        location: DatabaseLocation,
201        options: DatabaseOptions,
202    ) -> QuillSQLResult<Self> {
203        let (disk_manager, wal_config, temp_dir) = bootstrap_storage(location, &options.wal)?;
204
205        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
206        let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
207
208        let synchronous_commit = wal_config.synchronous_commit;
209        let (control_file, wal_init) =
210            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
211        let control_file = Arc::new(control_file);
212        let wal_manager = Arc::new(WalManager::new_with_scheduler(
213            wal_config.clone(),
214            Some(wal_init),
215            Some(control_file.clone()),
216            disk_scheduler.clone(),
217        )?);
218        let transaction_manager = Arc::new(TransactionManager::new(
219            wal_manager.clone(),
220            synchronous_commit,
221        ));
222
223        let worker_cfg = background_config(
224            &wal_config,
225            IndexVacuumConfig::default(),
226            MvccVacuumConfig::default(),
227        );
228        let mut background_workers = BackgroundWorkers::new();
229        if let Some(interval) = worker_cfg.wal_writer_interval {
230            if let Some(handle) = wal_manager.start_background_flush(interval)? {
231                background_workers.register(background::wal_writer_worker(handle, interval));
232            }
233        }
234        buffer_pool.set_wal_manager(wal_manager.clone());
235
236        let table_registry = Arc::new(TableRegistry::new());
237        let catalog = Catalog::new(
238            buffer_pool.clone(),
239            disk_manager.clone(),
240            table_registry.clone(),
241        );
242        let storage_engine: Arc<dyn StorageEngine> = Arc::new(DefaultStorageEngine::default());
243
244        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
245            .with_buffer_pool(buffer_pool.clone())
246            .replay()?;
247        if recovery_summary.redo_count > 0 {
248            debug!(
249                "Recovery replayed {} record(s) starting at LSN {}",
250                recovery_summary.redo_count, recovery_summary.start_lsn
251            );
252        }
253        if !recovery_summary.loser_transactions.is_empty() {
254            warn!(
255                "{} transaction(s) require undo after recovery: {:?}",
256                recovery_summary.loser_transactions.len(),
257                recovery_summary.loser_transactions
258            );
259        }
260
261        let wal_for_workers: Arc<dyn background::CheckpointWal> = wal_manager.clone();
262        let buffer_for_workers: Arc<dyn background::BufferMaintenance> = buffer_pool.clone();
263        let txn_for_workers: Arc<dyn background::TxnSnapshotOps> = transaction_manager.clone();
264
265        background_workers.register_opt(background::spawn_checkpoint_worker(
266            wal_for_workers.clone(),
267            buffer_for_workers.clone(),
268            txn_for_workers.clone(),
269            worker_cfg.checkpoint_interval,
270        ));
271
272        background_workers.register_opt(background::spawn_bg_writer(
273            buffer_for_workers.clone(),
274            worker_cfg.bg_writer_interval,
275        ));
276
277        let mvcc_interval = if worker_cfg.mvcc_vacuum.interval_ms == 0 {
278            None
279        } else {
280            Some(Duration::from_millis(worker_cfg.mvcc_vacuum.interval_ms))
281        };
282        background_workers.register_opt(background::spawn_mvcc_vacuum_worker(
283            txn_for_workers,
284            mvcc_interval,
285            worker_cfg.mvcc_vacuum.batch_limit,
286            table_registry.clone(),
287        ));
288
289        let mut db = Self {
290            _temp_dir: temp_dir,
291            buffer_pool,
292            catalog,
293            background_workers,
294            wal_manager,
295            transaction_manager,
296            default_isolation: options
297                .default_isolation_level
298                .unwrap_or(IsolationLevel::ReadUncommitted),
299            storage_engine,
300            _table_registry: table_registry,
301            debug_trace: Arc::new(Mutex::new(None)),
302        };
303        load_catalog_data(&mut db)?;
304        Ok(db)
305    }
306
307    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
308        let mut session = SessionContext::new(self.default_isolation);
309        self.run_with_session(&mut session, sql)
310    }
311
312    pub fn run_with_session(
313        &mut self,
314        session: &mut SessionContext,
315        sql: &str,
316    ) -> QuillSQLResult<Vec<Tuple>> {
317        let start = Instant::now();
318        let PreparedStatement {
319            optimized_logical_plan,
320            physical_plan,
321        } = self.plan_statement(sql)?;
322        let logical_plan_str = pretty_format_logical_plan(&optimized_logical_plan);
323        let physical_plan_str = pretty_format_physical_plan(&physical_plan);
324        let logical_tree = DebugPlanNode::from_logical(&optimized_logical_plan);
325        let physical_tree = DebugPlanNode::from_physical(&physical_plan);
326
327        if let Some(result) = self.execute_transaction_control(session, &optimized_logical_plan)? {
328            return Ok(result);
329        }
330
331        let result = self.execute_physical_plan(session, physical_plan)?;
332
333        let elapsed = start.elapsed().as_millis();
334        let rows = result.len();
335        if let Ok(mut guard) = self.debug_trace.lock() {
336            *guard = Some(DebugTrace {
337                logical_plan: logical_plan_str,
338                physical_plan: physical_plan_str,
339                logical_tree,
340                physical_tree,
341                rows,
342                duration_ms: elapsed,
343            });
344        }
345
346        Ok(result)
347    }
348
349    pub fn default_isolation(&self) -> IsolationLevel {
350        self.default_isolation
351    }
352
353    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
354        // sql -> ast
355        let stmts = crate::sql::parser::parse_sql(sql)?;
356        if stmts.len() != 1 {
357            return Err(QuillSQLError::NotSupport(
358                "only support one sql statement".to_string(),
359            ));
360        }
361        let stmt = &stmts[0];
362        let mut planner = LogicalPlanner {
363            context: PlannerContext {
364                catalog: &self.catalog,
365            },
366        };
367        // ast -> logical plan
368        planner.plan(stmt)
369    }
370
371    pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
372        self.catalog.analyze_table(table_ref)
373    }
374
375    pub fn flush(&self) -> QuillSQLResult<()> {
376        let target = self.wal_manager.max_assigned_lsn();
377        let _ = self.wal_manager.flush_until(target)?;
378        self.wal_manager.persist_control_file()?;
379        self.buffer_pool.flush_all_pages()
380    }
381
382    pub fn debug_last_trace(&self) -> Option<DebugTrace> {
383        self.debug_trace.lock().ok().and_then(|guard| guard.clone())
384    }
385
386    pub fn debug_wal_head(&self) -> WalHeadDebug {
387        self.wal_manager.debug_head()
388    }
389
390    pub fn debug_wal_segments(&self) -> QuillSQLResult<WalSegmentsDebug> {
391        Ok(WalSegmentsDebug {
392            segments: self.wal_manager.debug_segments()?,
393        })
394    }
395
396    pub fn debug_wal_peek(
397        &self,
398        limit: usize,
399    ) -> QuillSQLResult<Vec<crate::recovery::wal::WalPeekDebug>> {
400        self.wal_manager.debug_peek(limit)
401    }
402
403    pub fn debug_lock_snapshot(&self) -> LockDebugSnapshot {
404        self.transaction_manager.lock_manager_arc().debug_snapshot()
405    }
406
407    pub fn debug_buffer_stats(&self) -> BufferDebugStats {
408        let frames = self.buffer_pool.frame_meta_snapshot();
409        let free_frames = frames
410            .iter()
411            .filter(|meta| meta.page_id == INVALID_PAGE_ID)
412            .count();
413        let pinned_frames = frames.iter().filter(|meta| meta.pin_count > 0).count();
414        let dirty_frames = frames.iter().filter(|meta| meta.is_dirty).count();
415        let dirty_page_table = self.buffer_pool.dirty_page_table_snapshot().len();
416        BufferDebugStats {
417            capacity: frames.len(),
418            free_frames,
419            pinned_frames,
420            dirty_frames,
421            dirty_page_table,
422        }
423    }
424
425    pub fn debug_txn_snapshot(&self) -> TxnDebugSnapshot {
426        self.transaction_manager.debug_snapshot()
427    }
428
429    pub fn debug_mvcc_versions(&self) -> QuillSQLResult<MvccVersionsDebug> {
430        let mut samples = Vec::new();
431        let max_samples = 20usize;
432        let snapshot = self
433            .transaction_manager
434            .snapshot(self.transaction_manager.next_txn_id_hint());
435
436        for (table_ref, heap) in self._table_registry.iter_tables() {
437            let mut current = heap.get_first_rid()?;
438            while let Some(rid) = current {
439                if samples.len() >= max_samples {
440                    break;
441                }
442                let meta = heap.tuple_meta(rid)?;
443                let visible = snapshot.is_visible(&meta, 0 as CommandId, |tid| {
444                    self.transaction_manager.transaction_status(tid)
445                });
446                samples.push(MvccVersionSample {
447                    table: table_ref.to_string(),
448                    rid: rid.to_string(),
449                    insert_txn: meta.insert_txn_id,
450                    delete_txn: meta.delete_txn_id,
451                    visible,
452                });
453                current = heap.get_next_rid(rid)?;
454            }
455            if samples.len() >= max_samples {
456                break;
457            }
458        }
459
460        Ok(MvccVersionsDebug {
461            samples,
462            note: format!("sampled up to {} tuples", max_samples),
463        })
464    }
465
466    pub fn debug_last_plan(&self) -> Option<DebugPlanSnapshot> {
467        self.debug_trace
468            .lock()
469            .ok()
470            .and_then(|opt| opt.clone())
471            .map(|trace| DebugPlanSnapshot {
472                logical: trace.logical_tree,
473                physical: trace.physical_tree,
474            })
475    }
476
477    pub fn table_statistics(
478        &self,
479        table_ref: &TableReference,
480    ) -> Option<&crate::catalog::TableStatistics> {
481        self.catalog.table_statistics(table_ref)
482    }
483
484    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
485        self.transaction_manager.clone()
486    }
487
488    fn plan_statement(&mut self, sql: &str) -> QuillSQLResult<PreparedStatement> {
489        let logical_plan = self.create_logical_plan(sql)?;
490        debug!(
491            "Logical Plan: \n{}",
492            pretty_format_logical_plan(&logical_plan)
493        );
494
495        let optimized_logical_plan = self.optimize_logical_plan(&logical_plan)?;
496        debug!(
497            "Optimized Logical Plan: \n{}",
498            pretty_format_logical_plan(&optimized_logical_plan)
499        );
500
501        let physical_plan = self.build_physical_plan(&optimized_logical_plan);
502        debug!(
503            "Physical Plan: \n{}",
504            pretty_format_physical_plan(&physical_plan)
505        );
506
507        Ok(PreparedStatement {
508            optimized_logical_plan,
509            physical_plan,
510        })
511    }
512
513    fn optimize_logical_plan(&self, logical_plan: &LogicalPlan) -> QuillSQLResult<LogicalPlan> {
514        LogicalOptimizer::new().optimize(logical_plan)
515    }
516
517    fn build_physical_plan(&self, logical_plan: &LogicalPlan) -> PhysicalPlan {
518        let physical_planner = PhysicalPlanner::new();
519        physical_planner.create_physical_plan(logical_plan.clone())
520    }
521
522    fn execute_transaction_control(
523        &self,
524        session: &mut SessionContext,
525        plan: &LogicalPlan,
526    ) -> QuillSQLResult<Option<Vec<Tuple>>> {
527        match plan {
528            LogicalPlan::BeginTransaction(modes) => {
529                if session.has_active_transaction() {
530                    return Err(QuillSQLError::Execution(
531                        "transaction already active".to_string(),
532                    ));
533                }
534                let txn = self.transaction_manager.begin(
535                    modes.unwrap_effective_isolation(session.default_isolation()),
536                    modes
537                        .access_mode
538                        .unwrap_or(TransactionAccessMode::ReadWrite),
539                )?;
540                session.set_active_transaction(txn)?;
541                Ok(Some(vec![]))
542            }
543            LogicalPlan::CommitTransaction => {
544                let txn_ref = session
545                    .active_txn_mut()
546                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
547                self.transaction_manager.commit(txn_ref)?;
548                session.clear_active_transaction();
549                Ok(Some(vec![]))
550            }
551            LogicalPlan::RollbackTransaction => {
552                let txn_ref = session
553                    .active_txn_mut()
554                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
555                self.transaction_manager.abort(txn_ref)?;
556                session.clear_active_transaction();
557                Ok(Some(vec![]))
558            }
559            LogicalPlan::SetTransaction { scope, modes } => {
560                match scope {
561                    TransactionScope::Session => session.apply_session_modes(modes),
562                    TransactionScope::Transaction => session.apply_transaction_modes(modes),
563                }
564                Ok(Some(vec![]))
565            }
566            _ => Ok(None),
567        }
568    }
569
570    fn execute_physical_plan(
571        &mut self,
572        session: &mut SessionContext,
573        physical_plan: PhysicalPlan,
574    ) -> QuillSQLResult<Vec<Tuple>> {
575        let needs_cleanup = !session.has_active_transaction();
576        let autocommit = session.autocommit();
577        let result = {
578            let txn = session.ensure_active_transaction(&self.transaction_manager)?;
579            let context = crate::execution::ExecutionContext::new(
580                &mut self.catalog,
581                txn,
582                self.transaction_manager.clone(),
583                self.storage_engine.clone(),
584            );
585            let mut engine = ExecutionEngine { context };
586            engine.execute(Arc::new(physical_plan))?
587        };
588
589        if autocommit && needs_cleanup {
590            if let Some(txn) = session.active_txn_mut() {
591                self.transaction_manager.commit(txn)?;
592            }
593            session.clear_active_transaction();
594        }
595
596        Ok(result)
597    }
598}
599
600impl Drop for Database {
601    fn drop(&mut self) {
602        self.background_workers.shutdown_all();
603    }
604}
605
606fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
607    build_wal_config(wal_directory_from_path(db_path), overrides)
608}
609
/// Derives the default WAL directory for the database file at `db_path`.
///
/// The file extension is replaced with `wal` (`data.db` -> `data.wal`,
/// `data` -> `data.wal`). Two cases fall back to appending `.wal` instead:
///
/// - the path has no file name (for example `""` or `".."`), so there is no
///   extension to replace;
/// - the path already ends in `.wal`, where replacing the extension would
///   return the database path itself and make the WAL directory collide with
///   the database file (`data.wal` -> `data.wal.wal`).
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let mut candidate = PathBuf::from(db_path);
    // `set_extension` returns false when the path has no file name.
    if !candidate.set_extension("wal") {
        return PathBuf::from(format!("{}.wal", db_path));
    }
    if candidate.as_path() == Path::new(db_path) {
        let mut widened = candidate.into_os_string();
        widened.push(".wal");
        return PathBuf::from(widened);
    }
    candidate
}
619
620fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
621    build_wal_config(temp_root.join("wal"), overrides)
622}
623
624fn build_wal_config(default_directory: PathBuf, overrides: &WalOptions) -> WalConfig {
625    let mut config = WalConfig {
626        directory: overrides.directory.clone().unwrap_or(default_directory),
627        ..WalConfig::default()
628    };
629    if let Some(size) = overrides.segment_size {
630        config.segment_size = size;
631    }
632    if let Some(sync) = overrides.sync_on_flush {
633        config.sync_on_flush = sync;
634    }
635    if let Some(flag) = overrides.persist_control_file_on_flush {
636        config.persist_control_file_on_flush = flag;
637    }
638    if let Some(interval) = overrides.writer_interval_ms {
639        config.writer_interval_ms = interval;
640    }
641    if let Some(capacity) = overrides.buffer_capacity {
642        config.buffer_capacity = capacity;
643    }
644    if let Some(bytes) = overrides.flush_coalesce_bytes {
645        config.flush_coalesce_bytes = bytes;
646    }
647    if let Some(sync_commit) = overrides.synchronous_commit {
648        config.synchronous_commit = sync_commit;
649    }
650    if let Some(interval) = overrides.checkpoint_interval_ms {
651        config.checkpoint_interval_ms = interval;
652    }
653    if let Some(retain) = overrides.retain_segments {
654        config.retain_segments = retain.max(1);
655    }
656    config
657}