quill_sql/
database.rs

1use log::{debug, warn};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7use tempfile::TempDir;
8
9use crate::buffer::BUFFER_POOL_SIZE;
10use crate::catalog::load_catalog_data;
11use crate::catalog::registry::global_index_registry;
12use crate::config::{IndexVacuumConfig, WalConfig};
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::optimizer::LogicalOptimizer;
15use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
16use crate::plan::PhysicalPlanner;
17use crate::recovery::wal_record::CheckpointPayload;
18use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
19use crate::session::SessionContext;
20use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
21use crate::{
22    buffer::BufferPoolManager,
23    catalog::Catalog,
24    execution::{ExecutionContext, ExecutionEngine},
25    plan::{LogicalPlanner, PlannerContext},
26    storage::disk_manager::DiskManager,
27    storage::disk_scheduler::DiskScheduler,
28    storage::tuple::Tuple,
29    transaction::{IsolationLevel, TransactionManager},
30};
31use sqlparser::ast::TransactionAccessMode;
32
/// Optional overrides applied on top of the default WAL configuration.
///
/// Every field starts out as `None`, which means "keep the value from
/// `WalConfig::default()`". The two doubly-optional interval fields
/// distinguish three cases:
/// * `None` keeps the default,
/// * `Some(None)` clears the setting (no interval),
/// * `Some(Some(ms))` sets an explicit interval in milliseconds.
#[derive(Clone, Debug, Default)]
pub struct WalOptions {
    /// Directory holding the WAL; when unset a location is derived from the
    /// database path (or the temp directory for temporary databases).
    pub directory: Option<PathBuf>,
    /// Override for `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Override for `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// Override for the background WAL writer interval; `Some(None)` means no
    /// background flush thread is started.
    pub writer_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Override for `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Override for `WalConfig::synchronous_commit`, which is also handed to
    /// the transaction manager.
    pub synchronous_commit: Option<bool>,
    /// Override for the checkpoint interval; `Some(None)` (or `Some(Some(0))`)
    /// means no checkpoint worker is spawned.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::retain_segments`; values below 1 are raised to 1.
    pub retain_segments: Option<usize>,
}
45
46#[derive(Debug, Clone, Default)]
47pub struct DatabaseOptions {
48    pub wal: WalOptions,
49    pub default_isolation_level: Option<IsolationLevel>,
50}
51
52pub struct Database {
53    pub(crate) buffer_pool: Arc<BufferPoolManager>,
54    pub(crate) catalog: Catalog,
55    pub(crate) wal_manager: Arc<WalManager>,
56    pub(crate) transaction_manager: Arc<TransactionManager>,
57    default_isolation: IsolationLevel,
58    temp_dir: Option<TempDir>,
59    checkpoint_stop: Arc<AtomicBool>,
60    checkpoint_handle: Option<thread::JoinHandle<()>>,
61    bg_writer_stop: Arc<AtomicBool>,
62    bg_writer_handle: Option<thread::JoinHandle<()>>,
63}
64impl Database {
65    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
66        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
67    }
68
69    pub fn new_on_disk_with_options(
70        db_path: &str,
71        options: DatabaseOptions,
72    ) -> QuillSQLResult<Self> {
73        let disk_manager = Arc::new(DiskManager::try_new(db_path)?);
74        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
75        let buffer_pool = Arc::new(BufferPoolManager::new(
76            BUFFER_POOL_SIZE,
77            disk_scheduler.clone(),
78        ));
79
80        let wal_config = wal_config_for_path(db_path, &options.wal);
81        let synchronous_commit = wal_config.synchronous_commit;
82        let (control_file, wal_init) =
83            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
84        let control_file = Arc::new(control_file);
85        let wal_manager = Arc::new(WalManager::new(
86            wal_config.clone(),
87            disk_scheduler.clone(),
88            Some(wal_init),
89            Some(control_file.clone()),
90        )?);
91        let transaction_manager = Arc::new(TransactionManager::new(
92            wal_manager.clone(),
93            synchronous_commit,
94        ));
95        if let Some(interval) = wal_config.writer_interval_ms {
96            wal_manager.start_background_flush(Duration::from_millis(interval))?;
97        }
98        buffer_pool.set_wal_manager(wal_manager.clone());
99
100        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
101
102        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
103            .with_buffer_pool(buffer_pool.clone())
104            .replay()?;
105        if recovery_summary.redo_count > 0 {
106            debug!(
107                "Recovery replayed {} record(s) starting at LSN {}",
108                recovery_summary.redo_count, recovery_summary.start_lsn
109            );
110        }
111        if !recovery_summary.loser_transactions.is_empty() {
112            warn!(
113                "{} transaction(s) require undo after recovery: {:?}",
114                recovery_summary.loser_transactions.len(),
115                recovery_summary.loser_transactions
116            );
117        }
118
119        let (checkpoint_stop, checkpoint_handle) = spawn_checkpoint_worker(
120            wal_manager.clone(),
121            buffer_pool.clone(),
122            transaction_manager.clone(),
123            wal_config.checkpoint_interval_ms,
124        );
125
126        let (bg_writer_stop, bg_writer_handle) = spawn_bg_writer(
127            buffer_pool.clone(),
128            std::env::var("QUILL_BG_WRITER_INTERVAL_MS")
129                .ok()
130                .and_then(|v| v.parse::<u64>().ok()),
131            IndexVacuumConfig::default(),
132        );
133
134        let mut db = Self {
135            buffer_pool,
136            catalog,
137            wal_manager,
138            transaction_manager,
139            default_isolation: options
140                .default_isolation_level
141                .unwrap_or(IsolationLevel::ReadUncommitted),
142            temp_dir: None,
143            checkpoint_stop,
144            checkpoint_handle,
145            bg_writer_stop,
146            bg_writer_handle,
147        };
148        load_catalog_data(&mut db)?;
149        Ok(db)
150    }
151
152    pub fn new_temp() -> QuillSQLResult<Self> {
153        Self::new_temp_with_options(DatabaseOptions::default())
154    }
155
156    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
157        let temp_dir = TempDir::new()?;
158        let temp_path = temp_dir.path().join("test.db");
159        let disk_manager =
160            Arc::new(DiskManager::try_new(temp_path.to_str().ok_or(
161                QuillSQLError::Internal("Invalid temp path".to_string()),
162            )?)?);
163        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
164        let buffer_pool = Arc::new(BufferPoolManager::new(
165            BUFFER_POOL_SIZE,
166            disk_scheduler.clone(),
167        ));
168
169        let wal_config = wal_config_for_temp(temp_dir.path(), &options.wal);
170        let synchronous_commit = wal_config.synchronous_commit;
171        let (control_file, wal_init) =
172            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
173        let control_file = Arc::new(control_file);
174        let wal_manager = Arc::new(WalManager::new(
175            wal_config.clone(),
176            disk_scheduler.clone(),
177            Some(wal_init),
178            Some(control_file.clone()),
179        )?);
180        let transaction_manager = Arc::new(TransactionManager::new(
181            wal_manager.clone(),
182            synchronous_commit,
183        ));
184        if let Some(interval) = wal_config.writer_interval_ms {
185            wal_manager.start_background_flush(Duration::from_millis(interval))?;
186        }
187        buffer_pool.set_wal_manager(wal_manager.clone());
188
189        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
190
191        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
192            .with_buffer_pool(buffer_pool.clone())
193            .replay()?;
194        if recovery_summary.redo_count > 0 {
195            debug!(
196                "Recovery replayed {} record(s) starting at LSN {}",
197                recovery_summary.redo_count, recovery_summary.start_lsn
198            );
199        }
200        if !recovery_summary.loser_transactions.is_empty() {
201            warn!(
202                "{} transaction(s) require undo after recovery: {:?}",
203                recovery_summary.loser_transactions.len(),
204                recovery_summary.loser_transactions
205            );
206        }
207
208        let (checkpoint_stop, checkpoint_handle) = spawn_checkpoint_worker(
209            wal_manager.clone(),
210            buffer_pool.clone(),
211            transaction_manager.clone(),
212            wal_config.checkpoint_interval_ms,
213        );
214
215        let (bg_writer_stop, bg_writer_handle) = spawn_bg_writer(
216            buffer_pool.clone(),
217            std::env::var("QUILL_BG_WRITER_INTERVAL_MS")
218                .ok()
219                .and_then(|v| v.parse::<u64>().ok()),
220            IndexVacuumConfig::default(),
221        );
222
223        let mut db = Self {
224            buffer_pool,
225            catalog,
226            wal_manager,
227            transaction_manager,
228            default_isolation: options
229                .default_isolation_level
230                .unwrap_or(IsolationLevel::ReadUncommitted),
231            temp_dir: Some(temp_dir),
232            checkpoint_stop,
233            checkpoint_handle,
234            bg_writer_stop,
235            bg_writer_handle,
236        };
237        load_catalog_data(&mut db)?;
238        Ok(db)
239    }
240
241    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
242        let mut session = SessionContext::new(self.default_isolation);
243        self.run_with_session(&mut session, sql)
244    }
245
246    pub fn run_with_session(
247        &mut self,
248        session: &mut SessionContext,
249        sql: &str,
250    ) -> QuillSQLResult<Vec<Tuple>> {
251        let logical_plan = self.create_logical_plan(sql)?;
252        debug!(
253            "Logical Plan: \n{}",
254            pretty_format_logical_plan(&logical_plan)
255        );
256
257        let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
258        debug!(
259            "Optimized Logical Plan: \n{}",
260            pretty_format_logical_plan(&logical_plan)
261        );
262
263        let physical_planner = PhysicalPlanner {
264            catalog: &self.catalog,
265        };
266        let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
267        debug!(
268            "Physical Plan: \n{}",
269            pretty_format_physical_plan(&physical_plan)
270        );
271
272        let execution_result = match optimized_logical_plan.clone() {
273            LogicalPlan::BeginTransaction(modes) => {
274                if session.has_active_transaction() {
275                    return Err(QuillSQLError::Execution(
276                        "transaction already active".to_string(),
277                    ));
278                }
279                let txn = self.transaction_manager.begin(
280                    modes.unwrap_effective_isolation(session.default_isolation()),
281                    modes
282                        .access_mode
283                        .unwrap_or(TransactionAccessMode::ReadWrite),
284                )?;
285                session.set_active_transaction(txn)?;
286                Ok(vec![])
287            }
288            LogicalPlan::CommitTransaction => {
289                let txn_ref = session
290                    .active_txn_mut()
291                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
292                self.transaction_manager.commit(txn_ref)?;
293                session.take_active_transaction();
294                Ok(vec![])
295            }
296            LogicalPlan::RollbackTransaction => {
297                let txn_ref = session
298                    .active_txn_mut()
299                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
300                self.transaction_manager.abort(txn_ref)?;
301                session.take_active_transaction();
302                Ok(vec![])
303            }
304            LogicalPlan::SetTransaction { scope, modes } => {
305                match scope {
306                    TransactionScope::Session => {
307                        session.apply_session_modes(&modes);
308                    }
309                    TransactionScope::Transaction => {
310                        session.apply_transaction_modes(&modes);
311                    }
312                }
313                Ok(vec![])
314            }
315            plan => {
316                let txn_existed = session.has_active_transaction();
317                if !txn_existed {
318                    let new_txn = self.transaction_manager.begin(
319                        session.default_isolation(),
320                        TransactionAccessMode::ReadWrite,
321                    )?;
322                    session.set_active_transaction(new_txn)?;
323                }
324
325                let execution_result = {
326                    let txn_ref = session
327                        .active_txn_mut()
328                        .expect("session must hold an active transaction");
329                    let execution_ctx = ExecutionContext::new(
330                        &mut self.catalog,
331                        txn_ref,
332                        &self.transaction_manager,
333                    );
334                    let mut execution_engine = ExecutionEngine {
335                        context: execution_ctx,
336                    };
337                    execution_engine.execute(Arc::new(physical_plan))
338                };
339
340                match execution_result {
341                    Ok(tuples) => {
342                        if !txn_existed && session.autocommit() {
343                            {
344                                let txn_ref = session
345                                    .active_txn_mut()
346                                    .expect("transaction should still be active");
347                                self.transaction_manager.commit(txn_ref)?;
348                            }
349                            session.take_active_transaction();
350                        }
351                        Ok(tuples)
352                    }
353                    Err(e) => {
354                        if txn_existed {
355                            if let Some(txn_ref) = session.active_txn_mut() {
356                                txn_ref.mark_tainted();
357                            }
358                        } else {
359                            if let Some(txn_ref) = session.active_txn_mut() {
360                                let _ = self.transaction_manager.abort(txn_ref);
361                            }
362                            session.take_active_transaction();
363                        }
364                        Err(e)
365                    }
366                }
367            }
368        };
369
370        execution_result
371    }
372
373    pub fn default_isolation(&self) -> IsolationLevel {
374        self.default_isolation
375    }
376
377    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
378        // sql -> ast
379        let stmts = crate::sql::parser::parse_sql(sql)?;
380        if stmts.len() != 1 {
381            return Err(QuillSQLError::NotSupport(
382                "only support one sql statement".to_string(),
383            ));
384        }
385        let stmt = &stmts[0];
386        let mut planner = LogicalPlanner {
387            context: PlannerContext {
388                catalog: &self.catalog,
389            },
390        };
391        // ast -> logical plan
392        planner.plan(stmt)
393    }
394
395    pub fn flush(&self) -> QuillSQLResult<()> {
396        let _ = self.wal_manager.flush(None)?;
397        self.buffer_pool.flush_all_pages()
398    }
399
400    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
401        self.transaction_manager.clone()
402    }
403}
404
405impl Drop for Database {
406    fn drop(&mut self) {
407        self.checkpoint_stop.store(true, Ordering::Relaxed);
408        if let Some(handle) = self.checkpoint_handle.take() {
409            if let Err(join_err) = handle.join() {
410                warn!("Checkpoint worker terminated with panic: {:?}", join_err);
411            }
412        }
413        self.bg_writer_stop.store(true, Ordering::Relaxed);
414        if let Some(handle) = self.bg_writer_handle.take() {
415            if let Err(join_err) = handle.join() {
416                warn!("BG writer terminated with panic: {:?}", join_err);
417            }
418        }
419    }
420}
421
422fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
423    let mut config = WalConfig::default();
424    config.directory = overrides
425        .directory
426        .clone()
427        .unwrap_or_else(|| wal_directory_from_path(db_path));
428    if let Some(size) = overrides.segment_size {
429        config.segment_size = size;
430    }
431    if let Some(sync) = overrides.sync_on_flush {
432        config.sync_on_flush = sync;
433    }
434    if let Some(interval) = overrides.writer_interval_ms.clone() {
435        config.writer_interval_ms = interval;
436    }
437    if let Some(capacity) = overrides.buffer_capacity {
438        config.buffer_capacity = capacity;
439    }
440    if let Some(bytes) = overrides.flush_coalesce_bytes {
441        config.flush_coalesce_bytes = bytes;
442    }
443    if let Some(sync_commit) = overrides.synchronous_commit {
444        config.synchronous_commit = sync_commit;
445    }
446    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
447        config.checkpoint_interval_ms = interval;
448    }
449    if let Some(retain) = overrides.retain_segments {
450        config.retain_segments = retain.max(1);
451    }
452    config
453}
454
455fn wal_directory_from_path(db_path: &str) -> PathBuf {
456    let mut base = PathBuf::from(db_path);
457    base.set_extension("wal");
458    if base.extension().is_none() {
459        PathBuf::from(format!("{}.wal", db_path))
460    } else {
461        base
462    }
463}
464
465fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
466    let mut config = WalConfig::default();
467    config.directory = overrides
468        .directory
469        .clone()
470        .unwrap_or_else(|| temp_root.join("wal"));
471    if let Some(size) = overrides.segment_size {
472        config.segment_size = size;
473    }
474    if let Some(sync) = overrides.sync_on_flush {
475        config.sync_on_flush = sync;
476    }
477    if let Some(interval) = overrides.writer_interval_ms.clone() {
478        config.writer_interval_ms = interval;
479    }
480    if let Some(capacity) = overrides.buffer_capacity {
481        config.buffer_capacity = capacity;
482    }
483    if let Some(bytes) = overrides.flush_coalesce_bytes {
484        config.flush_coalesce_bytes = bytes;
485    }
486    if let Some(sync_commit) = overrides.synchronous_commit {
487        config.synchronous_commit = sync_commit;
488    }
489    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
490        config.checkpoint_interval_ms = interval;
491    }
492    if let Some(retain) = overrides.retain_segments {
493        config.retain_segments = retain.max(1);
494    }
495    config
496}
497
498fn spawn_checkpoint_worker(
499    wal_manager: Arc<WalManager>,
500    buffer_pool: Arc<BufferPoolManager>,
501    transaction_manager: Arc<TransactionManager>,
502    interval_ms: Option<u64>,
503) -> (Arc<AtomicBool>, Option<thread::JoinHandle<()>>) {
504    let stop = Arc::new(AtomicBool::new(false));
505    let Some(ms) = interval_ms else {
506        return (stop, None);
507    };
508    if ms == 0 {
509        return (stop, None);
510    }
511    let stop_flag = stop.clone();
512    let wal = wal_manager.clone();
513    let bp = buffer_pool.clone();
514    let txn_mgr = transaction_manager.clone();
515    let interval = Duration::from_millis(ms);
516    let handle = thread::Builder::new()
517        .name("checkpoint-worker".into())
518        .spawn(move || {
519            while !stop_flag.load(Ordering::Relaxed) {
520                let dirty_pages = bp.dirty_page_ids();
521                let dpt_snapshot = bp.dirty_page_table_snapshot();
522                let active_txns = txn_mgr.active_transactions();
523                let last_lsn = wal.max_assigned_lsn();
524
525                if last_lsn != 0 {
526                    if let Err(e) = wal.flush_until(last_lsn) {
527                        warn!("Checkpoint flush failed: {}", e);
528                    }
529                    let payload = CheckpointPayload {
530                        last_lsn,
531                        dirty_pages,
532                        active_transactions: active_txns,
533                        dpt: dpt_snapshot,
534                    };
535                    if let Err(e) = wal.log_checkpoint(payload) {
536                        warn!("Checkpoint write failed: {}", e);
537                    }
538                }
539
540                if stop_flag.load(Ordering::Relaxed) {
541                    break;
542                }
543                thread::sleep(interval);
544            }
545        })
546        .map(Some)
547        .unwrap_or_else(|err| {
548            warn!("Failed to spawn checkpoint worker: {}", err);
549            None
550        });
551
552    (stop, handle)
553}
554
555fn spawn_bg_writer(
556    buffer_pool: Arc<BufferPoolManager>,
557    interval_ms: Option<u64>,
558    vacuum_cfg: IndexVacuumConfig,
559) -> (Arc<AtomicBool>, Option<thread::JoinHandle<()>>) {
560    let stop = Arc::new(AtomicBool::new(false));
561    let Some(ms) = interval_ms else {
562        return (stop, None);
563    };
564    if ms == 0 {
565        return (stop, None);
566    }
567    let stop_flag = stop.clone();
568    let bp = buffer_pool.clone();
569    let interval = Duration::from_millis(ms);
570    let handle = thread::Builder::new()
571        .name("bg-writer".into())
572        .spawn(move || {
573            while !stop_flag.load(Ordering::Relaxed) {
574                // Flush a small batch of dirty pages per cycle
575                let dirty_ids = bp.dirty_page_ids();
576                for page_id in dirty_ids.into_iter().take(16) {
577                    let _ = bp.flush_page(page_id);
578                }
579
580                // Opportunistic index lazy cleanup based on pending_garbage counters.
581                // Global registry is read-only and cheap to iterate.
582                let registry = global_index_registry();
583                for (idx, heap) in registry.iter().take(16) {
584                    let pending = idx.take_pending_garbage();
585                    if pending >= vacuum_cfg.trigger_threshold {
586                        // conservative predicate using heap meta only (no txn watermarks here):
587                        // delete-marked tuples are always globally invisible for readers.
588                        let _ = idx.lazy_cleanup_with(
589                            |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
590                            Some(vacuum_cfg.batch_limit),
591                        );
592                    }
593                }
594
595                if stop_flag.load(Ordering::Relaxed) {
596                    break;
597                }
598                thread::sleep(interval);
599            }
600        })
601        .map(Some)
602        .unwrap_or_else(|err| {
603            warn!("Failed to spawn bg writer: {}", err);
604            None
605        });
606    (stop, handle)
607}