quill_sql/
database.rs

1use crate::background::{BackgroundWorkers, WorkerHandle, WorkerKind, WorkerMetadata};
2use log::{debug, warn};
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tempfile::TempDir;
9
10use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
11use crate::catalog::load_catalog_data;
12use crate::catalog::registry::global_index_registry;
13use crate::config::{background_config, IndexVacuumConfig, WalConfig};
14use crate::error::{QuillSQLError, QuillSQLResult};
15use crate::optimizer::LogicalOptimizer;
16use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
17use crate::plan::PhysicalPlanner;
18use crate::recovery::wal::codec::CheckpointPayload;
19use crate::recovery::{ControlFileManager, RecoveryManager, WalManager, WalWriterHandle};
20use crate::session::SessionContext;
21use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
22use crate::{
23    catalog::Catalog,
24    execution::ExecutionEngine,
25    plan::{LogicalPlanner, PlannerContext},
26    storage::disk_manager::DiskManager,
27    storage::disk_scheduler::DiskScheduler,
28    storage::tuple::Tuple,
29    transaction::{IsolationLevel, TransactionManager},
30};
31use sqlparser::ast::TransactionAccessMode;
32
/// Per-database overrides for [`WalConfig`].
///
/// Every field is optional: `None` keeps the value from `WalConfig::default()`, while
/// `Some(v)` replaces it when the effective WAL configuration is built.
#[derive(Clone, Debug, Default)]
pub struct WalOptions {
    /// WAL directory. When unset, on-disk databases derive it from the data-file path and
    /// temporary databases use a `wal` subdirectory of their temp root.
    pub directory: Option<PathBuf>,
    /// Overrides `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Overrides `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// WAL writer interval in milliseconds. The outer `Option` chooses whether to override;
    /// the inner value is copied into the config as-is, so `Some(None)` clears the setting.
    pub writer_interval_ms: Option<Option<u64>>,
    /// Overrides `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Overrides `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Overrides `WalConfig::synchronous_commit`.
    pub synchronous_commit: Option<bool>,
    /// Checkpoint interval in milliseconds, with the same two-level meaning as
    /// `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Number of WAL segments to retain; a value of 0 is raised to 1.
    pub retain_segments: Option<usize>,
}
45
46#[derive(Debug, Clone, Default)]
47pub struct DatabaseOptions {
48    pub wal: WalOptions,
49    pub default_isolation_level: Option<IsolationLevel>,
50}
51
/// An embedded database instance: storage, WAL, transaction manager, catalog and the
/// background workers that maintain them.
///
/// Created via `new_on_disk*` / `new_temp*`; SQL is executed with [`Database::run`] or
/// [`Database::run_with_session`].
pub struct Database {
    pub(crate) buffer_pool: Arc<BufferManager>,
    pub(crate) catalog: Catalog,
    // Stopped explicitly by `Drop for Database` before the fields are dropped.
    background_workers: BackgroundWorkers,
    pub(crate) wal_manager: Arc<WalManager>,
    pub(crate) transaction_manager: Arc<TransactionManager>,
    // Isolation level given to the throwaway session created by `run`.
    default_isolation: IsolationLevel,
    // Only `Some` for `new_temp*` databases. Struct fields are dropped in declaration order,
    // so keeping this field last means the temporary directory is removed only after every
    // other component has been dropped; keep it last when adding fields.
    temp_dir: Option<TempDir>,
}
61impl Database {
62    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
63        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
64    }
65
66    pub fn new_on_disk_with_options(
67        db_path: &str,
68        options: DatabaseOptions,
69    ) -> QuillSQLResult<Self> {
70        let disk_manager = Arc::new(DiskManager::try_new(db_path)?);
71        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
72        let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
73
74        let wal_config = wal_config_for_path(db_path, &options.wal);
75        let synchronous_commit = wal_config.synchronous_commit;
76        let (control_file, wal_init) =
77            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
78        let control_file = Arc::new(control_file);
79        let wal_manager = Arc::new(WalManager::new_with_scheduler(
80            wal_config.clone(),
81            Some(wal_init),
82            Some(control_file.clone()),
83            disk_scheduler.clone(),
84        )?);
85        let transaction_manager = Arc::new(TransactionManager::new(
86            wal_manager.clone(),
87            synchronous_commit,
88        ));
89
90        let worker_cfg = background_config(&wal_config, IndexVacuumConfig::default());
91        let mut background_workers = BackgroundWorkers::new();
92        if let Some(interval) = worker_cfg.wal_writer_interval {
93            if let Some(handle) = wal_manager.start_background_flush(interval)? {
94                background_workers.register(wal_writer_worker(handle, interval));
95            }
96        }
97        buffer_pool.set_wal_manager(wal_manager.clone());
98
99        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
100
101        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
102            .with_buffer_pool(buffer_pool.clone())
103            .replay()?;
104        if recovery_summary.redo_count > 0 {
105            debug!(
106                "Recovery replayed {} record(s) starting at LSN {}",
107                recovery_summary.redo_count, recovery_summary.start_lsn
108            );
109        }
110        if !recovery_summary.loser_transactions.is_empty() {
111            warn!(
112                "{} transaction(s) require undo after recovery: {:?}",
113                recovery_summary.loser_transactions.len(),
114                recovery_summary.loser_transactions
115            );
116        }
117
118        background_workers.register_opt(spawn_checkpoint_worker(
119            wal_manager.clone(),
120            buffer_pool.clone(),
121            transaction_manager.clone(),
122            worker_cfg.checkpoint_interval,
123        ));
124
125        background_workers.register_opt(spawn_bg_writer(
126            buffer_pool.clone(),
127            worker_cfg.bg_writer_interval,
128            worker_cfg.vacuum,
129        ));
130
131        let mut db = Self {
132            buffer_pool,
133            catalog,
134            background_workers,
135            wal_manager,
136            transaction_manager,
137            default_isolation: options
138                .default_isolation_level
139                .unwrap_or(IsolationLevel::ReadUncommitted),
140            temp_dir: None,
141        };
142        load_catalog_data(&mut db)?;
143        Ok(db)
144    }
145
146    pub fn new_temp() -> QuillSQLResult<Self> {
147        Self::new_temp_with_options(DatabaseOptions::default())
148    }
149
150    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
151        let temp_dir = TempDir::new()?;
152        let temp_path = temp_dir.path().join("test.db");
153        let disk_manager =
154            Arc::new(DiskManager::try_new(temp_path.to_str().ok_or(
155                QuillSQLError::Internal("Invalid temp path".to_string()),
156            )?)?);
157        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
158        let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
159
160        let wal_config = wal_config_for_temp(temp_dir.path(), &options.wal);
161        let synchronous_commit = wal_config.synchronous_commit;
162        let (control_file, wal_init) =
163            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
164        let control_file = Arc::new(control_file);
165        let wal_manager = Arc::new(WalManager::new_with_scheduler(
166            wal_config.clone(),
167            Some(wal_init),
168            Some(control_file.clone()),
169            disk_scheduler.clone(),
170        )?);
171        let transaction_manager = Arc::new(TransactionManager::new(
172            wal_manager.clone(),
173            synchronous_commit,
174        ));
175
176        let worker_cfg = background_config(&wal_config, IndexVacuumConfig::default());
177        let mut background_workers = BackgroundWorkers::new();
178        if let Some(interval) = worker_cfg.wal_writer_interval {
179            if let Some(handle) = wal_manager.start_background_flush(interval)? {
180                background_workers.register(wal_writer_worker(handle, interval));
181            }
182        }
183        buffer_pool.set_wal_manager(wal_manager.clone());
184
185        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
186
187        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
188            .with_buffer_pool(buffer_pool.clone())
189            .replay()?;
190        if recovery_summary.redo_count > 0 {
191            debug!(
192                "Recovery replayed {} record(s) starting at LSN {}",
193                recovery_summary.redo_count, recovery_summary.start_lsn
194            );
195        }
196        if !recovery_summary.loser_transactions.is_empty() {
197            warn!(
198                "{} transaction(s) require undo after recovery: {:?}",
199                recovery_summary.loser_transactions.len(),
200                recovery_summary.loser_transactions
201            );
202        }
203
204        background_workers.register_opt(spawn_checkpoint_worker(
205            wal_manager.clone(),
206            buffer_pool.clone(),
207            transaction_manager.clone(),
208            worker_cfg.checkpoint_interval,
209        ));
210
211        background_workers.register_opt(spawn_bg_writer(
212            buffer_pool.clone(),
213            worker_cfg.bg_writer_interval,
214            worker_cfg.vacuum,
215        ));
216
217        let mut db = Self {
218            buffer_pool,
219            catalog,
220            background_workers,
221            wal_manager,
222            transaction_manager,
223            default_isolation: options
224                .default_isolation_level
225                .unwrap_or(IsolationLevel::ReadUncommitted),
226            temp_dir: Some(temp_dir),
227        };
228        load_catalog_data(&mut db)?;
229        Ok(db)
230    }
231
232    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
233        let mut session = SessionContext::new(self.default_isolation);
234        self.run_with_session(&mut session, sql)
235    }
236
237    pub fn run_with_session(
238        &mut self,
239        session: &mut SessionContext,
240        sql: &str,
241    ) -> QuillSQLResult<Vec<Tuple>> {
242        let logical_plan = self.create_logical_plan(sql)?;
243        debug!(
244            "Logical Plan: \n{}",
245            pretty_format_logical_plan(&logical_plan)
246        );
247
248        let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
249        debug!(
250            "Optimized Logical Plan: \n{}",
251            pretty_format_logical_plan(&logical_plan)
252        );
253
254        let physical_planner = PhysicalPlanner {
255            catalog: &self.catalog,
256        };
257        let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
258        debug!(
259            "Physical Plan: \n{}",
260            pretty_format_physical_plan(&physical_plan)
261        );
262
263        match optimized_logical_plan {
264            LogicalPlan::BeginTransaction(ref modes) => {
265                if session.has_active_transaction() {
266                    return Err(QuillSQLError::Execution(
267                        "transaction already active".to_string(),
268                    ));
269                }
270                let txn = self.transaction_manager.begin(
271                    modes.unwrap_effective_isolation(session.default_isolation()),
272                    modes
273                        .access_mode
274                        .unwrap_or(TransactionAccessMode::ReadWrite),
275                )?;
276                session.set_active_transaction(txn)?;
277                Ok(vec![])
278            }
279            LogicalPlan::CommitTransaction => {
280                let txn_ref = session
281                    .active_txn_mut()
282                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
283                self.transaction_manager.commit(txn_ref)?;
284                session.clear_active_transaction();
285                Ok(vec![])
286            }
287            LogicalPlan::RollbackTransaction => {
288                let txn_ref = session
289                    .active_txn_mut()
290                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
291                self.transaction_manager.abort(txn_ref)?;
292                session.clear_active_transaction();
293                Ok(vec![])
294            }
295            LogicalPlan::SetTransaction {
296                ref scope,
297                ref modes,
298            } => {
299                match scope {
300                    TransactionScope::Session => session.apply_session_modes(modes),
301                    TransactionScope::Transaction => session.apply_transaction_modes(modes),
302                }
303                Ok(vec![])
304            }
305            _ => {
306                let needs_cleanup = !session.has_active_transaction();
307                let autocommit = session.autocommit();
308
309                let result = {
310                    let txn = session.ensure_active_transaction(&self.transaction_manager)?;
311                    let context = crate::execution::ExecutionContext::new(
312                        &mut self.catalog,
313                        txn,
314                        &self.transaction_manager,
315                    );
316                    let mut engine = ExecutionEngine { context };
317                    engine.execute(Arc::new(physical_plan))?
318                };
319
320                if autocommit && needs_cleanup {
321                    if let Some(txn) = session.active_txn_mut() {
322                        self.transaction_manager.commit(txn)?;
323                    }
324                    session.clear_active_transaction();
325                }
326
327                Ok(result)
328            }
329        }
330    }
331
332    pub fn default_isolation(&self) -> IsolationLevel {
333        self.default_isolation
334    }
335
336    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
337        // sql -> ast
338        let stmts = crate::sql::parser::parse_sql(sql)?;
339        if stmts.len() != 1 {
340            return Err(QuillSQLError::NotSupport(
341                "only support one sql statement".to_string(),
342            ));
343        }
344        let stmt = &stmts[0];
345        let mut planner = LogicalPlanner {
346            context: PlannerContext {
347                catalog: &self.catalog,
348            },
349        };
350        // ast -> logical plan
351        planner.plan(stmt)
352    }
353
354    pub fn flush(&self) -> QuillSQLResult<()> {
355        let _ = self.wal_manager.flush(None)?;
356        self.buffer_pool.flush_all_pages()
357    }
358
359    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
360        self.transaction_manager.clone()
361    }
362}
363
364impl Drop for Database {
365    fn drop(&mut self) {
366        self.background_workers.shutdown_all();
367    }
368}
369
370fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
371    let mut config = WalConfig::default();
372    config.directory = overrides
373        .directory
374        .clone()
375        .unwrap_or_else(|| wal_directory_from_path(db_path));
376    if let Some(size) = overrides.segment_size {
377        config.segment_size = size;
378    }
379    if let Some(sync) = overrides.sync_on_flush {
380        config.sync_on_flush = sync;
381    }
382    if let Some(interval) = overrides.writer_interval_ms.clone() {
383        config.writer_interval_ms = interval;
384    }
385    if let Some(capacity) = overrides.buffer_capacity {
386        config.buffer_capacity = capacity;
387    }
388    if let Some(bytes) = overrides.flush_coalesce_bytes {
389        config.flush_coalesce_bytes = bytes;
390    }
391    if let Some(sync_commit) = overrides.synchronous_commit {
392        config.synchronous_commit = sync_commit;
393    }
394    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
395        config.checkpoint_interval_ms = interval;
396    }
397    if let Some(retain) = overrides.retain_segments {
398        config.retain_segments = retain.max(1);
399    }
400    config
401}
402
/// Derives the default WAL directory for the data file at `db_path`.
///
/// Normally the data file's extension is replaced with `wal` (`data.db` -> `data.wal`). When
/// that is impossible (the path has no file name, e.g. `""`) or would name the data file
/// itself (it already has a `.wal` extension, compared case-insensitively), `.wal` is
/// appended instead (`data.wal` -> `data.wal.wal`).
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let db_file = Path::new(db_path);
    let already_wal = matches!(
        db_file.extension().and_then(|ext| ext.to_str()),
        Some(ext) if ext.eq_ignore_ascii_case("wal")
    );
    let mut wal_dir = db_file.to_path_buf();
    // `set_extension` returns false (and leaves the path untouched) when there is no file name.
    if !already_wal && wal_dir.set_extension("wal") {
        wal_dir
    } else {
        PathBuf::from(format!("{}.wal", db_path))
    }
}
412
413fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
414    let mut config = WalConfig::default();
415    config.directory = overrides
416        .directory
417        .clone()
418        .unwrap_or_else(|| temp_root.join("wal"));
419    if let Some(size) = overrides.segment_size {
420        config.segment_size = size;
421    }
422    if let Some(sync) = overrides.sync_on_flush {
423        config.sync_on_flush = sync;
424    }
425    if let Some(interval) = overrides.writer_interval_ms.clone() {
426        config.writer_interval_ms = interval;
427    }
428    if let Some(capacity) = overrides.buffer_capacity {
429        config.buffer_capacity = capacity;
430    }
431    if let Some(bytes) = overrides.flush_coalesce_bytes {
432        config.flush_coalesce_bytes = bytes;
433    }
434    if let Some(sync_commit) = overrides.synchronous_commit {
435        config.synchronous_commit = sync_commit;
436    }
437    if let Some(interval) = overrides.checkpoint_interval_ms.clone() {
438        config.checkpoint_interval_ms = interval;
439    }
440    if let Some(retain) = overrides.retain_segments {
441        config.retain_segments = retain.max(1);
442    }
443    config
444}
445
446fn wal_writer_worker(handle: WalWriterHandle, interval: Duration) -> WorkerHandle {
447    WorkerHandle::new(
448        WorkerMetadata {
449            kind: WorkerKind::WalWriter,
450            interval: Some(interval),
451        },
452        move || {
453            if let Err(err) = handle.stop() {
454                warn!("Failed to stop WAL writer: {}", err);
455            }
456        },
457        None,
458    )
459}
460
461fn spawn_checkpoint_worker(
462    wal_manager: Arc<WalManager>,
463    buffer_pool: Arc<BufferManager>,
464    transaction_manager: Arc<TransactionManager>,
465    interval: Option<Duration>,
466) -> Option<WorkerHandle> {
467    let Some(interval) = interval else {
468        return None;
469    };
470    if interval.is_zero() {
471        return None;
472    }
473    let wal = wal_manager.clone();
474    let bp = buffer_pool.clone();
475    let txn_mgr = transaction_manager.clone();
476
477    spawn_periodic_worker(
478        "checkpoint-worker",
479        WorkerKind::Checkpoint,
480        interval,
481        move || {
482            let dirty_pages = bp.dirty_page_ids();
483            let dpt_snapshot = bp.dirty_page_table_snapshot();
484            let active_txns = txn_mgr.active_transactions();
485            let last_lsn = wal.max_assigned_lsn();
486
487            if last_lsn != 0 {
488                if let Err(e) = wal.flush_until(last_lsn) {
489                    warn!("Checkpoint flush failed: {}", e);
490                }
491                let payload = CheckpointPayload {
492                    last_lsn,
493                    dirty_pages,
494                    active_transactions: active_txns,
495                    dpt: dpt_snapshot,
496                };
497                if let Err(e) = wal.log_checkpoint(payload) {
498                    warn!("Checkpoint write failed: {}", e);
499                }
500            }
501        },
502    )
503}
504
505fn spawn_bg_writer(
506    buffer_pool: Arc<BufferManager>,
507    interval: Option<Duration>,
508    vacuum_cfg: IndexVacuumConfig,
509) -> Option<WorkerHandle> {
510    let Some(interval) = interval else {
511        return None;
512    };
513    if interval.is_zero() {
514        return None;
515    }
516    let bp = buffer_pool.clone();
517    spawn_periodic_worker(
518        "bg-writer",
519        WorkerKind::BufferPoolWriter,
520        interval,
521        move || {
522            let dirty_ids = bp.dirty_page_ids();
523            for page_id in dirty_ids.into_iter().take(16) {
524                let _ = bp.flush_page(page_id);
525            }
526
527            let registry = global_index_registry();
528            for (idx, heap) in registry.iter().take(16) {
529                let pending = idx.take_pending_garbage();
530                if pending >= vacuum_cfg.trigger_threshold {
531                    let _ = idx.lazy_cleanup_with(
532                        |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
533                        Some(vacuum_cfg.batch_limit),
534                    );
535                }
536            }
537        },
538    )
539}
540
541fn spawn_periodic_worker<F>(
542    name: &str,
543    kind: WorkerKind,
544    interval: Duration,
545    mut tick: F,
546) -> Option<WorkerHandle>
547where
548    F: FnMut() + Send + 'static,
549{
550    let stop_flag = Arc::new(AtomicBool::new(false));
551    let thread_flag = Arc::clone(&stop_flag);
552
553    match thread::Builder::new().name(name.into()).spawn(move || {
554        while !thread_flag.load(Ordering::Relaxed) {
555            tick();
556            if thread_flag.load(Ordering::Relaxed) {
557                break;
558            }
559            thread::sleep(interval);
560        }
561    }) {
562        Ok(join_handle) => {
563            let stop_handle = Arc::clone(&stop_flag);
564            Some(WorkerHandle::new(
565                WorkerMetadata {
566                    kind,
567                    interval: Some(interval),
568                },
569                move || {
570                    stop_handle.store(true, Ordering::Release);
571                },
572                Some(join_handle),
573            ))
574        }
575        Err(err) => {
576            warn!("Failed to spawn {}: {}", name, err);
577            None
578        }
579    }
580}