quill_sql/
database.rs

1use crate::background::{self, BackgroundWorkers};
2use log::{debug, warn};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6use tempfile::TempDir;
7
8use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
9use crate::catalog::load_catalog_data;
10use crate::config::{background_config, IndexVacuumConfig, MvccVacuumConfig, WalConfig};
11use crate::error::{QuillSQLError, QuillSQLResult};
12use crate::optimizer::LogicalOptimizer;
13use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
14use crate::plan::PhysicalPlanner;
15use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
16use crate::session::SessionContext;
17use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
18use crate::{
19    catalog::Catalog,
20    execution::ExecutionEngine,
21    plan::{LogicalPlanner, PlannerContext},
22    storage::{
23        disk_manager::DiskManager, disk_scheduler::DiskScheduler, tuple::Tuple,
24        DefaultStorageEngine, StorageEngine,
25    },
26    transaction::{IsolationLevel, TransactionManager},
27};
28use sqlparser::ast::TransactionAccessMode;
29
/// Optional overrides for the write-ahead-log configuration.
///
/// Every field left as `None` keeps the corresponding value from
/// `WalConfig::default()`; a `Some(..)` replaces it when the database is
/// bootstrapped.
#[derive(Clone, Debug, Default)]
pub struct WalOptions {
    /// WAL directory. When `None`, the directory is derived from the database
    /// path (on-disk databases) or placed under the temporary root.
    pub directory: Option<PathBuf>,
    /// Override for `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Override for `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// Override for `WalConfig::writer_interval_ms`. The outer `Option` says
    /// whether to override at all; the inner value is stored as-is
    /// (presumably `None` disables the background writer; confirm in `WalConfig`).
    pub writer_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Override for `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Override for `WalConfig::synchronous_commit`.
    pub synchronous_commit: Option<bool>,
    /// Override for `WalConfig::checkpoint_interval_ms`; same nesting rules as
    /// `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Override for `WalConfig::retain_segments`; values below 1 are raised to 1
    /// when the override is applied.
    pub retain_segments: Option<usize>,
}
42
43#[derive(Debug, Clone, Default)]
44pub struct DatabaseOptions {
45    pub wal: WalOptions,
46    pub default_isolation_level: Option<IsolationLevel>,
47}
48
/// Where the database files live; consumed once by `bootstrap_storage`.
enum DatabaseLocation {
    /// Use the database file at the given path.
    OnDisk(String),
    /// Use a freshly created temporary directory.
    Temporary,
}
53
54fn bootstrap_storage(
55    location: DatabaseLocation,
56    wal_options: &WalOptions,
57) -> QuillSQLResult<(Arc<DiskManager>, WalConfig, Option<TempDir>)> {
58    match location {
59        DatabaseLocation::OnDisk(path) => {
60            let disk_manager = Arc::new(DiskManager::try_new(path.as_str())?);
61            let wal_config = wal_config_for_path(path.as_str(), wal_options);
62            Ok((disk_manager, wal_config, None))
63        }
64        DatabaseLocation::Temporary => {
65            let temp_dir = TempDir::new()?;
66            let temp_path = temp_dir.path().join("test.db");
67            let temp_str = temp_path
68                .to_str()
69                .ok_or_else(|| QuillSQLError::Internal("Invalid temp path".to_string()))?;
70            let disk_manager = Arc::new(DiskManager::try_new(temp_str)?);
71            let wal_config = wal_config_for_temp(temp_dir.path(), wal_options);
72            Ok((disk_manager, wal_config, Some(temp_dir)))
73        }
74    }
75}
76
77pub struct Database {
78    _temp_dir: Option<TempDir>,
79    pub(crate) buffer_pool: Arc<BufferManager>,
80    pub(crate) catalog: Catalog,
81    background_workers: BackgroundWorkers,
82    pub(crate) wal_manager: Arc<WalManager>,
83    pub(crate) transaction_manager: Arc<TransactionManager>,
84    default_isolation: IsolationLevel,
85    storage_engine: Arc<dyn StorageEngine>,
86}
87impl Database {
88    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
89        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
90    }
91
92    pub fn new_on_disk_with_options(
93        db_path: &str,
94        options: DatabaseOptions,
95    ) -> QuillSQLResult<Self> {
96        Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
97    }
98
99    pub fn new_temp() -> QuillSQLResult<Self> {
100        Self::new_temp_with_options(DatabaseOptions::default())
101    }
102
103    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
104        Self::new_with_location(DatabaseLocation::Temporary, options)
105    }
106
107    fn new_with_location(
108        location: DatabaseLocation,
109        options: DatabaseOptions,
110    ) -> QuillSQLResult<Self> {
111        let (disk_manager, wal_config, temp_dir) = bootstrap_storage(location, &options.wal)?;
112
113        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
114        let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
115
116        let synchronous_commit = wal_config.synchronous_commit;
117        let (control_file, wal_init) =
118            ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
119        let control_file = Arc::new(control_file);
120        let wal_manager = Arc::new(WalManager::new_with_scheduler(
121            wal_config.clone(),
122            Some(wal_init),
123            Some(control_file.clone()),
124            disk_scheduler.clone(),
125        )?);
126        let transaction_manager = Arc::new(TransactionManager::new(
127            wal_manager.clone(),
128            synchronous_commit,
129        ));
130
131        let worker_cfg = background_config(
132            &wal_config,
133            IndexVacuumConfig::default(),
134            MvccVacuumConfig::default(),
135        );
136        let mut background_workers = BackgroundWorkers::new();
137        if let Some(interval) = worker_cfg.wal_writer_interval {
138            if let Some(handle) = wal_manager.start_background_flush(interval)? {
139                background_workers.register(background::wal_writer_worker(handle, interval));
140            }
141        }
142        buffer_pool.set_wal_manager(wal_manager.clone());
143
144        let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
145        let storage_engine: Arc<dyn StorageEngine> = Arc::new(DefaultStorageEngine::default());
146
147        let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
148            .with_buffer_pool(buffer_pool.clone())
149            .replay()?;
150        if recovery_summary.redo_count > 0 {
151            debug!(
152                "Recovery replayed {} record(s) starting at LSN {}",
153                recovery_summary.redo_count, recovery_summary.start_lsn
154            );
155        }
156        if !recovery_summary.loser_transactions.is_empty() {
157            warn!(
158                "{} transaction(s) require undo after recovery: {:?}",
159                recovery_summary.loser_transactions.len(),
160                recovery_summary.loser_transactions
161            );
162        }
163
164        let wal_for_workers: Arc<dyn background::CheckpointWal> = wal_manager.clone();
165        let buffer_for_workers: Arc<dyn background::BufferMaintenance> = buffer_pool.clone();
166        let txn_for_workers: Arc<dyn background::TxnSnapshotOps> = transaction_manager.clone();
167
168        background_workers.register_opt(background::spawn_checkpoint_worker(
169            wal_for_workers.clone(),
170            buffer_for_workers.clone(),
171            txn_for_workers.clone(),
172            worker_cfg.checkpoint_interval,
173        ));
174
175        background_workers.register_opt(background::spawn_bg_writer(
176            buffer_for_workers.clone(),
177            worker_cfg.bg_writer_interval,
178            worker_cfg.vacuum,
179        ));
180
181        let mvcc_interval = if worker_cfg.mvcc_vacuum.interval_ms == 0 {
182            None
183        } else {
184            Some(Duration::from_millis(worker_cfg.mvcc_vacuum.interval_ms))
185        };
186        background_workers.register_opt(background::spawn_mvcc_vacuum_worker(
187            txn_for_workers,
188            mvcc_interval,
189            worker_cfg.mvcc_vacuum.batch_limit,
190        ));
191
192        let mut db = Self {
193            _temp_dir: temp_dir,
194            buffer_pool,
195            catalog,
196            background_workers,
197            wal_manager,
198            transaction_manager,
199            default_isolation: options
200                .default_isolation_level
201                .unwrap_or(IsolationLevel::ReadUncommitted),
202            storage_engine,
203        };
204        load_catalog_data(&mut db)?;
205        Ok(db)
206    }
207
208    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
209        let mut session = SessionContext::new(self.default_isolation);
210        self.run_with_session(&mut session, sql)
211    }
212
213    pub fn run_with_session(
214        &mut self,
215        session: &mut SessionContext,
216        sql: &str,
217    ) -> QuillSQLResult<Vec<Tuple>> {
218        let logical_plan = self.create_logical_plan(sql)?;
219        debug!(
220            "Logical Plan: \n{}",
221            pretty_format_logical_plan(&logical_plan)
222        );
223
224        let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
225        debug!(
226            "Optimized Logical Plan: \n{}",
227            pretty_format_logical_plan(&logical_plan)
228        );
229
230        let physical_planner = PhysicalPlanner {
231            catalog: &self.catalog,
232        };
233        let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
234        debug!(
235            "Physical Plan: \n{}",
236            pretty_format_physical_plan(&physical_plan)
237        );
238
239        match optimized_logical_plan {
240            LogicalPlan::BeginTransaction(ref modes) => {
241                if session.has_active_transaction() {
242                    return Err(QuillSQLError::Execution(
243                        "transaction already active".to_string(),
244                    ));
245                }
246                let txn = self.transaction_manager.begin(
247                    modes.unwrap_effective_isolation(session.default_isolation()),
248                    modes
249                        .access_mode
250                        .unwrap_or(TransactionAccessMode::ReadWrite),
251                )?;
252                session.set_active_transaction(txn)?;
253                Ok(vec![])
254            }
255            LogicalPlan::CommitTransaction => {
256                let txn_ref = session
257                    .active_txn_mut()
258                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
259                self.transaction_manager.commit(txn_ref)?;
260                session.clear_active_transaction();
261                Ok(vec![])
262            }
263            LogicalPlan::RollbackTransaction => {
264                let txn_ref = session
265                    .active_txn_mut()
266                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
267                self.transaction_manager.abort(txn_ref)?;
268                session.clear_active_transaction();
269                Ok(vec![])
270            }
271            LogicalPlan::SetTransaction {
272                ref scope,
273                ref modes,
274            } => {
275                match scope {
276                    TransactionScope::Session => session.apply_session_modes(modes),
277                    TransactionScope::Transaction => session.apply_transaction_modes(modes),
278                }
279                Ok(vec![])
280            }
281            _ => {
282                let needs_cleanup = !session.has_active_transaction();
283                let autocommit = session.autocommit();
284
285                let result = {
286                    let txn = session.ensure_active_transaction(&self.transaction_manager)?;
287                    let context = crate::execution::ExecutionContext::new(
288                        &mut self.catalog,
289                        txn,
290                        &self.transaction_manager,
291                        self.storage_engine.clone(),
292                    );
293                    let mut engine = ExecutionEngine { context };
294                    engine.execute(Arc::new(physical_plan))?
295                };
296
297                if autocommit && needs_cleanup {
298                    if let Some(txn) = session.active_txn_mut() {
299                        self.transaction_manager.commit(txn)?;
300                    }
301                    session.clear_active_transaction();
302                }
303
304                Ok(result)
305            }
306        }
307    }
308
309    pub fn default_isolation(&self) -> IsolationLevel {
310        self.default_isolation
311    }
312
313    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
314        // sql -> ast
315        let stmts = crate::sql::parser::parse_sql(sql)?;
316        if stmts.len() != 1 {
317            return Err(QuillSQLError::NotSupport(
318                "only support one sql statement".to_string(),
319            ));
320        }
321        let stmt = &stmts[0];
322        let mut planner = LogicalPlanner {
323            context: PlannerContext {
324                catalog: &self.catalog,
325            },
326        };
327        // ast -> logical plan
328        planner.plan(stmt)
329    }
330
331    pub fn flush(&self) -> QuillSQLResult<()> {
332        let _ = self.wal_manager.flush(None)?;
333        self.buffer_pool.flush_all_pages()
334    }
335
336    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
337        self.transaction_manager.clone()
338    }
339}
340
341impl Drop for Database {
342    fn drop(&mut self) {
343        self.background_workers.shutdown_all();
344    }
345}
346
347fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
348    let mut config = WalConfig {
349        directory: overrides
350            .directory
351            .clone()
352            .unwrap_or_else(|| wal_directory_from_path(db_path)),
353        ..WalConfig::default()
354    };
355    if let Some(size) = overrides.segment_size {
356        config.segment_size = size;
357    }
358    if let Some(sync) = overrides.sync_on_flush {
359        config.sync_on_flush = sync;
360    }
361    if let Some(interval) = overrides.writer_interval_ms {
362        config.writer_interval_ms = interval;
363    }
364    if let Some(capacity) = overrides.buffer_capacity {
365        config.buffer_capacity = capacity;
366    }
367    if let Some(bytes) = overrides.flush_coalesce_bytes {
368        config.flush_coalesce_bytes = bytes;
369    }
370    if let Some(sync_commit) = overrides.synchronous_commit {
371        config.synchronous_commit = sync_commit;
372    }
373    if let Some(interval) = overrides.checkpoint_interval_ms {
374        config.checkpoint_interval_ms = interval;
375    }
376    if let Some(retain) = overrides.retain_segments {
377        config.retain_segments = retain.max(1);
378    }
379    config
380}
381
/// Derives the default WAL directory for the database file at `db_path`.
///
/// The database file's extension is replaced with `.wal` (`data/app.db`
/// becomes `data/app.wal`). Two cases are handled separately:
///
/// * When the path has no file name to attach an extension to (for example
///   the empty string), `.wal` is appended to the raw `db_path` text.
/// * When `db_path` itself already ends in `.wal`, replacing the extension
///   would yield the database file's own path, so a second `.wal` is
///   appended to keep the WAL directory distinct from the database file.
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let db_file = Path::new(db_path);
    let candidate = db_file.with_extension("wal");

    if candidate.extension().is_none() {
        // `with_extension` leaves paths without a file name untouched.
        return PathBuf::from(format!("{}.wal", db_path));
    }

    if candidate.as_path() == db_file {
        // The directory would collide with the database file itself.
        let mut distinct = candidate.into_os_string();
        distinct.push(".wal");
        return PathBuf::from(distinct);
    }

    candidate
}
391
392fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
393    let mut config = WalConfig {
394        directory: overrides
395            .directory
396            .clone()
397            .unwrap_or_else(|| temp_root.join("wal")),
398        ..WalConfig::default()
399    };
400    if let Some(size) = overrides.segment_size {
401        config.segment_size = size;
402    }
403    if let Some(sync) = overrides.sync_on_flush {
404        config.sync_on_flush = sync;
405    }
406    if let Some(interval) = overrides.writer_interval_ms {
407        config.writer_interval_ms = interval;
408    }
409    if let Some(capacity) = overrides.buffer_capacity {
410        config.buffer_capacity = capacity;
411    }
412    if let Some(bytes) = overrides.flush_coalesce_bytes {
413        config.flush_coalesce_bytes = bytes;
414    }
415    if let Some(sync_commit) = overrides.synchronous_commit {
416        config.synchronous_commit = sync_commit;
417    }
418    if let Some(interval) = overrides.checkpoint_interval_ms {
419        config.checkpoint_interval_ms = interval;
420    }
421    if let Some(retain) = overrides.retain_segments {
422        config.retain_segments = retain.max(1);
423    }
424    config
425}