Skip to main content

quill_sql/
database.rs

1use log::debug;
2use serde::Serialize;
3use sqlparser::ast::TransactionAccessMode;
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::{Arc, Mutex};
7use std::time::Instant;
8use tempfile::TempDir;
9
10use crate::catalog::{load_catalog_data, Catalog, TableStatistics};
11use crate::error::{QuillSQLError, QuillSQLResult};
12use crate::execution::physical_plan::PhysicalPlan;
13use crate::execution::ExecutionEngine;
14use crate::optimizer::LogicalOptimizer;
15use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
16use crate::plan::{LogicalPlanner, PhysicalPlanner, PlannerContext};
17use crate::session::SessionContext;
18use crate::storage::engine::TableHandle;
19use crate::storage::holt::{HoltStore, HoltTableHandle};
20use crate::storage::tuple::Tuple;
21use crate::storage::HoltStorage;
22use crate::transaction::{
23    CommandId, IsolationLevel, LockDebugSnapshot, TransactionManager, TransactionStatus,
24    TxnDebugSnapshot,
25};
26use crate::utils::table_ref::TableReference;
27use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
28
/// Options supplied when opening a [`Database`].
#[derive(Debug, Clone, Default)]
pub struct DatabaseOptions {
    /// Settings for the Holt storage backend (see [`HoltOptions`]).
    pub holt: HoltOptions,
    /// Isolation level given to sessions created by `Database::run`.
    /// When `None`, the database falls back to `IsolationLevel::ReadUncommitted`.
    pub default_isolation_level: Option<IsolationLevel>,
}
34
/// Overrides for where the Holt store keeps its files.
#[derive(Debug, Default, Clone)]
pub struct HoltOptions {
    /// Explicit Holt directory. When set it takes precedence over the directory
    /// derived from the database location, and no temporary directory is created.
    pub directory: Option<PathBuf>,
}
39
/// Where a database keeps its Holt directory when no explicit override is given.
#[derive(Clone)]
enum DatabaseLocation {
    /// Use the given path as the Holt directory.
    OnDisk(String),
    /// Use a `holt` sub-directory inside a freshly created temporary directory.
    Temporary,
}
45
/// Top-level handle tying together the catalog, transaction manager and Holt
/// storage, and the entry point for running SQL statements.
pub struct Database {
    /// Catalog used for planning and execution.
    pub(crate) catalog: Catalog,
    /// Shared transaction manager used to begin, commit and abort transactions.
    pub(crate) transaction_manager: Arc<TransactionManager>,
    /// Shared Holt store; also used to persist transaction statuses.
    pub(crate) holt_store: Arc<HoltStore>,
    /// Isolation level handed to sessions created by `run`.
    default_isolation: IsolationLevel,
    /// Storage layer handed to the execution context.
    storage: Arc<HoltStorage>,
    /// Trace of the most recently executed (non transaction-control) statement.
    debug_trace: Arc<Mutex<Option<DebugTrace>>>,
    // Must drop after every Holt owner so temporary databases can checkpoint
    // before their directory is removed.
    _temp_dir: Option<TempDir>,
}
57
/// Result of planning a single SQL statement: the optimized logical plan and
/// the physical plan built from it.
struct PreparedStatement {
    /// Logical plan after the logical optimizer has run.
    optimized_logical_plan: LogicalPlan,
    /// Physical plan created from `optimized_logical_plan`.
    physical_plan: PhysicalPlan,
}
62
/// Serializable record of the last statement executed through
/// `Database::run_with_session`.
#[derive(Debug, Clone, Serialize)]
pub struct DebugTrace {
    /// Pretty-printed optimized logical plan.
    pub logical_plan: String,
    /// Pretty-printed physical plan.
    pub physical_plan: String,
    /// Number of tuples the statement returned.
    pub rows: usize,
    /// Elapsed milliseconds from the start of `run_with_session` (planning
    /// included) until the trace was recorded.
    pub duration_ms: u128,
    /// Tree form of the optimized logical plan.
    pub logical_tree: DebugPlanNode,
    /// Tree form of the physical plan.
    pub physical_tree: DebugPlanNode,
}
72
/// One node of a plan tree rendered for debugging.
#[derive(Debug, Clone, Serialize)]
pub struct DebugPlanNode {
    /// Textual description of the operator at this node.
    pub op: String,
    /// Nodes built from this operator's inputs, in input order.
    pub children: Vec<DebugPlanNode>,
}
78
/// Logical and physical plan trees of the last traced statement.
#[derive(Debug, Clone, Serialize)]
pub struct DebugPlanSnapshot {
    /// Tree of the optimized logical plan.
    pub logical: DebugPlanNode,
    /// Tree of the physical plan.
    pub physical: DebugPlanNode,
}
84
/// One tuple version sampled by `Database::debug_mvcc_versions`.
#[derive(Debug, Clone, Serialize)]
pub struct MvccVersionSample {
    /// Display form of the table reference the tuple belongs to.
    pub table: String,
    /// Display form of the tuple's record id.
    pub rid: String,
    /// `insert_txn_id` taken from the tuple metadata.
    pub insert_txn: u64,
    /// `delete_txn_id` taken from the tuple metadata.
    pub delete_txn: u64,
    /// Whether the version was visible to the snapshot taken for the sample.
    pub visible: bool,
}
93
/// Result of `Database::debug_mvcc_versions`.
#[derive(Debug, Clone, Serialize)]
pub struct MvccVersionsDebug {
    /// Sampled tuple versions (bounded by the sampler's cap).
    pub samples: Vec<MvccVersionSample>,
    /// Human-readable remark describing how the samples were collected.
    pub note: String,
}
99
100impl DebugPlanNode {
101    pub fn from_physical(plan: &PhysicalPlan) -> Self {
102        Self {
103            op: plan.display_name(),
104            children: plan
105                .inputs()
106                .iter()
107                .map(|child| Self::from_physical(child))
108                .collect(),
109        }
110    }
111
112    pub fn from_logical(plan: &LogicalPlan) -> Self {
113        Self {
114            op: plan.to_string(),
115            children: plan
116                .inputs()
117                .iter()
118                .map(|child| Self::from_logical(child))
119                .collect(),
120        }
121    }
122}
123
124impl Database {
125    pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
126        Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
127    }
128
129    pub fn new_on_disk_with_options(
130        db_path: &str,
131        options: DatabaseOptions,
132    ) -> QuillSQLResult<Self> {
133        Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
134    }
135
136    pub fn new_temp() -> QuillSQLResult<Self> {
137        Self::new_temp_with_options(DatabaseOptions::default())
138    }
139
140    pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
141        Self::new_with_location(DatabaseLocation::Temporary, options)
142    }
143
144    fn new_with_location(
145        location: DatabaseLocation,
146        options: DatabaseOptions,
147    ) -> QuillSQLResult<Self> {
148        let (holt_dir, temp_dir) = holt_directory_for_location(&location, &options.holt)?;
149        let holt_store = Arc::new(HoltStore::open(holt_dir)?);
150        let transaction_manager = Arc::new(TransactionManager::new());
151        seed_holt_transaction_statuses(&holt_store, &transaction_manager)?;
152
153        let catalog = Catalog::new(holt_store.clone());
154        let storage = Arc::new(HoltStorage::new(holt_store.clone()));
155
156        let mut db = Self {
157            catalog,
158            transaction_manager,
159            holt_store,
160            default_isolation: options
161                .default_isolation_level
162                .unwrap_or(IsolationLevel::ReadUncommitted),
163            storage,
164            debug_trace: Arc::new(Mutex::new(None)),
165            _temp_dir: temp_dir,
166        };
167        load_catalog_data(&mut db)?;
168        Ok(db)
169    }
170
171    pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
172        let mut session = SessionContext::new(self.default_isolation);
173        self.run_with_session(&mut session, sql)
174    }
175
176    pub fn run_with_session(
177        &mut self,
178        session: &mut SessionContext,
179        sql: &str,
180    ) -> QuillSQLResult<Vec<Tuple>> {
181        let start = Instant::now();
182        let PreparedStatement {
183            optimized_logical_plan,
184            physical_plan,
185        } = self.plan_statement(sql)?;
186        let logical_plan_str = pretty_format_logical_plan(&optimized_logical_plan);
187        let physical_plan_str = pretty_format_physical_plan(&physical_plan);
188        let logical_tree = DebugPlanNode::from_logical(&optimized_logical_plan);
189        let physical_tree = DebugPlanNode::from_physical(&physical_plan);
190
191        if let Some(result) = self.execute_transaction_control(session, &optimized_logical_plan)? {
192            return Ok(result);
193        }
194
195        let result = self.execute_physical_plan(session, physical_plan)?;
196
197        if let Ok(mut guard) = self.debug_trace.lock() {
198            *guard = Some(DebugTrace {
199                logical_plan: logical_plan_str,
200                physical_plan: physical_plan_str,
201                logical_tree,
202                physical_tree,
203                rows: result.len(),
204                duration_ms: start.elapsed().as_millis(),
205            });
206        }
207
208        Ok(result)
209    }
210
    /// Returns the isolation level used for sessions created by `run`.
    pub fn default_isolation(&self) -> IsolationLevel {
        self.default_isolation
    }
214
215    pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
216        let stmts = crate::sql::parser::parse_sql(sql)?;
217        if stmts.len() != 1 {
218            return Err(QuillSQLError::NotSupport(
219                "only support one sql statement".to_string(),
220            ));
221        }
222        let stmt = &stmts[0];
223        let mut planner = LogicalPlanner {
224            context: PlannerContext {
225                catalog: &self.catalog,
226            },
227        };
228        planner.plan(stmt)
229    }
230
    /// Delegates to the catalog's `analyze_table` for `table_ref` and returns
    /// the statistics it produces.
    pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
        self.catalog.analyze_table(table_ref)
    }
234
235    pub fn flush(&self) -> QuillSQLResult<()> {
236        self.holt_store
237            .db()
238            .checkpoint()
239            .map_err(crate::storage::holt::map_holt_err)
240    }
241
    /// Test-only helper: resolves the storage binding for `table_ref` through
    /// the storage layer and catalog.
    #[cfg(test)]
    pub(crate) fn table_binding(
        &self,
        table_ref: &TableReference,
    ) -> QuillSQLResult<crate::storage::TableBinding> {
        self.storage.table(&self.catalog, table_ref)
    }
249
250    pub fn debug_last_trace(&self) -> Option<DebugTrace> {
251        self.debug_trace.lock().ok().and_then(|guard| guard.clone())
252    }
253
254    pub fn debug_lock_snapshot(&self) -> LockDebugSnapshot {
255        self.transaction_manager.lock_manager_arc().debug_snapshot()
256    }
257
    /// Returns the transaction manager's debug snapshot.
    pub fn debug_txn_snapshot(&self) -> TxnDebugSnapshot {
        self.transaction_manager.debug_snapshot()
    }
261
262    pub fn debug_mvcc_versions(&self) -> QuillSQLResult<MvccVersionsDebug> {
263        let snapshot = self
264            .transaction_manager
265            .snapshot(self.transaction_manager.next_txn_id_hint());
266        let mut samples = Vec::new();
267        let max_samples = 20usize;
268        for (schema_name, schema) in &self.catalog.schemas {
269            if schema_name == crate::catalog::INFORMATION_SCHEMA_NAME {
270                continue;
271            }
272            for (table_name, table) in &schema.tables {
273                let table_ref = TableReference::Full {
274                    catalog: crate::catalog::DEFAULT_CATALOG_NAME.to_string(),
275                    schema: schema_name.clone(),
276                    table: table_name.clone(),
277                };
278                let handle = HoltTableHandle::new(
279                    table_ref.clone(),
280                    table.schema.clone(),
281                    table.table_id,
282                    self.holt_store.clone(),
283                );
284                let mut stream = handle.full_scan()?;
285                while let Some((rid, meta, _tuple)) = stream.next()? {
286                    if samples.len() >= max_samples {
287                        break;
288                    }
289                    let visible = snapshot.is_visible(&meta, 0 as CommandId, |tid| {
290                        self.transaction_manager.transaction_status(tid)
291                    });
292                    samples.push(MvccVersionSample {
293                        table: table_ref.to_string(),
294                        rid: rid.to_string(),
295                        insert_txn: meta.insert_txn_id,
296                        delete_txn: meta.delete_txn_id,
297                        visible,
298                    });
299                }
300                if samples.len() >= max_samples {
301                    break;
302                }
303            }
304            if samples.len() >= max_samples {
305                break;
306            }
307        }
308
309        Ok(MvccVersionsDebug {
310            samples,
311            note: format!("sampled up to {} tuples from Holt tables", max_samples),
312        })
313    }
314
315    pub fn debug_last_plan(&self) -> Option<DebugPlanSnapshot> {
316        self.debug_trace
317            .lock()
318            .ok()
319            .and_then(|opt| opt.clone())
320            .map(|trace| DebugPlanSnapshot {
321                logical: trace.logical_tree,
322                physical: trace.physical_tree,
323            })
324    }
325
    /// Returns the catalog's statistics entry for `table_ref`, if the catalog
    /// has one.
    pub fn table_statistics(
        &self,
        table_ref: &TableReference,
    ) -> Option<&crate::catalog::TableStatistics> {
        self.catalog.table_statistics(table_ref)
    }
332
333    pub fn transaction_manager(&self) -> Arc<TransactionManager> {
334        self.transaction_manager.clone()
335    }
336
337    fn plan_statement(&mut self, sql: &str) -> QuillSQLResult<PreparedStatement> {
338        let logical_plan = self.create_logical_plan(sql)?;
339        debug!(
340            "Logical Plan: \n{}",
341            pretty_format_logical_plan(&logical_plan)
342        );
343
344        let optimized_logical_plan = self.optimize_logical_plan(&logical_plan)?;
345        debug!(
346            "Optimized Logical Plan: \n{}",
347            pretty_format_logical_plan(&optimized_logical_plan)
348        );
349
350        let physical_plan = self.build_physical_plan(&optimized_logical_plan);
351        debug!(
352            "Physical Plan: \n{}",
353            pretty_format_physical_plan(&physical_plan)
354        );
355
356        Ok(PreparedStatement {
357            optimized_logical_plan,
358            physical_plan,
359        })
360    }
361
    /// Runs a freshly constructed `LogicalOptimizer` over `logical_plan` and
    /// returns the optimized plan.
    fn optimize_logical_plan(&self, logical_plan: &LogicalPlan) -> QuillSQLResult<LogicalPlan> {
        LogicalOptimizer::new().optimize(logical_plan)
    }
365
366    fn build_physical_plan(&self, logical_plan: &LogicalPlan) -> PhysicalPlan {
367        let physical_planner = PhysicalPlanner::with_catalog(&self.catalog);
368        physical_planner.create_physical_plan(logical_plan.clone())
369    }
370
371    fn execute_transaction_control(
372        &self,
373        session: &mut SessionContext,
374        plan: &LogicalPlan,
375    ) -> QuillSQLResult<Option<Vec<Tuple>>> {
376        match plan {
377            LogicalPlan::BeginTransaction(modes) => {
378                if session.has_active_transaction() {
379                    return Err(QuillSQLError::Execution(
380                        "transaction already active".to_string(),
381                    ));
382                }
383                let txn = self.transaction_manager.begin(
384                    modes.unwrap_effective_isolation(session.default_isolation()),
385                    modes
386                        .access_mode
387                        .unwrap_or(TransactionAccessMode::ReadWrite),
388                )?;
389                session.set_active_transaction(txn)?;
390                Ok(Some(vec![]))
391            }
392            LogicalPlan::CommitTransaction => {
393                let txn_ref = session
394                    .active_txn_mut()
395                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
396                let txn_id = txn_ref.id();
397                self.transaction_manager.commit(txn_ref)?;
398                self.holt_store
399                    .put_txn_status(txn_id, TransactionStatus::Committed)?;
400                session.clear_active_transaction();
401                Ok(Some(vec![]))
402            }
403            LogicalPlan::RollbackTransaction => {
404                let txn_ref = session
405                    .active_txn_mut()
406                    .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
407                let txn_id = txn_ref.id();
408                self.transaction_manager.abort(txn_ref)?;
409                self.holt_store
410                    .put_txn_status(txn_id, TransactionStatus::Aborted)?;
411                session.clear_active_transaction();
412                Ok(Some(vec![]))
413            }
414            LogicalPlan::SetTransaction { scope, modes } => {
415                match scope {
416                    TransactionScope::Session => session.apply_session_modes(modes),
417                    TransactionScope::Transaction => session.apply_transaction_modes(modes),
418                }
419                Ok(Some(vec![]))
420            }
421            _ => Ok(None),
422        }
423    }
424
425    fn execute_physical_plan(
426        &mut self,
427        session: &mut SessionContext,
428        physical_plan: PhysicalPlan,
429    ) -> QuillSQLResult<Vec<Tuple>> {
430        let needs_cleanup = !session.has_active_transaction();
431        let autocommit = session.autocommit();
432        let result = {
433            let txn = session.ensure_active_transaction(&self.transaction_manager)?;
434            let context = crate::execution::ExecutionContext::new(
435                &mut self.catalog,
436                txn,
437                self.transaction_manager.clone(),
438                self.storage.clone(),
439            );
440            let mut engine = ExecutionEngine { context };
441            engine.execute(Rc::new(physical_plan))?
442        };
443
444        if autocommit && needs_cleanup {
445            if let Some(txn) = session.active_txn_mut() {
446                let txn_id = txn.id();
447                self.transaction_manager.commit(txn)?;
448                self.holt_store
449                    .put_txn_status(txn_id, TransactionStatus::Committed)?;
450            }
451            session.clear_active_transaction();
452        }
453
454        Ok(result)
455    }
456}
457
458fn holt_directory_for_location(
459    location: &DatabaseLocation,
460    overrides: &HoltOptions,
461) -> QuillSQLResult<(PathBuf, Option<TempDir>)> {
462    if let Some(directory) = &overrides.directory {
463        return Ok((directory.clone(), None));
464    }
465    match location {
466        DatabaseLocation::OnDisk(path) => Ok((PathBuf::from(path), None)),
467        DatabaseLocation::Temporary => {
468            let temp_dir = TempDir::new()?;
469            let holt_dir = temp_dir.path().join("holt");
470            Ok((holt_dir, Some(temp_dir)))
471        }
472    }
473}
474
475fn seed_holt_transaction_statuses(
476    holt_store: &HoltStore,
477    transaction_manager: &TransactionManager,
478) -> QuillSQLResult<()> {
479    let mut next_txn_id = 1;
480    for (txn_id, status) in holt_store.recover_txn_statuses()? {
481        transaction_manager.record_recovered_status(txn_id, status);
482        next_txn_id = next_txn_id.max(txn_id.saturating_add(1));
483    }
484    transaction_manager.ensure_next_txn_id_at_least(next_txn_id);
485    Ok(())
486}