1use log::debug;
2use serde::Serialize;
3use sqlparser::ast::TransactionAccessMode;
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::{Arc, Mutex};
7use std::time::Instant;
8use tempfile::TempDir;
9
10use crate::catalog::{load_catalog_data, Catalog, TableStatistics};
11use crate::error::{QuillSQLError, QuillSQLResult};
12use crate::execution::physical_plan::PhysicalPlan;
13use crate::execution::ExecutionEngine;
14use crate::optimizer::LogicalOptimizer;
15use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
16use crate::plan::{LogicalPlanner, PhysicalPlanner, PlannerContext};
17use crate::session::SessionContext;
18use crate::storage::engine::TableHandle;
19use crate::storage::holt::{HoltStore, HoltTableHandle};
20use crate::storage::tuple::Tuple;
21use crate::storage::HoltStorage;
22use crate::transaction::{
23 CommandId, IsolationLevel, LockDebugSnapshot, TransactionManager, TransactionStatus,
24 TxnDebugSnapshot,
25};
26use crate::utils::table_ref::TableReference;
27use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
28
29#[derive(Debug, Clone, Default)]
30pub struct DatabaseOptions {
31 pub holt: HoltOptions,
32 pub default_isolation_level: Option<IsolationLevel>,
33}
34
35#[derive(Debug, Default, Clone)]
36pub struct HoltOptions {
37 pub directory: Option<PathBuf>,
38}
39
40#[derive(Clone)]
41enum DatabaseLocation {
42 OnDisk(String),
43 Temporary,
44}
45
46pub struct Database {
47 pub(crate) catalog: Catalog,
48 pub(crate) transaction_manager: Arc<TransactionManager>,
49 pub(crate) holt_store: Arc<HoltStore>,
50 default_isolation: IsolationLevel,
51 storage: Arc<HoltStorage>,
52 debug_trace: Arc<Mutex<Option<DebugTrace>>>,
53 _temp_dir: Option<TempDir>,
56}
57
58struct PreparedStatement {
59 optimized_logical_plan: LogicalPlan,
60 physical_plan: PhysicalPlan,
61}
62
63#[derive(Debug, Clone, Serialize)]
64pub struct DebugTrace {
65 pub logical_plan: String,
66 pub physical_plan: String,
67 pub rows: usize,
68 pub duration_ms: u128,
69 pub logical_tree: DebugPlanNode,
70 pub physical_tree: DebugPlanNode,
71}
72
73#[derive(Debug, Clone, Serialize)]
74pub struct DebugPlanNode {
75 pub op: String,
76 pub children: Vec<DebugPlanNode>,
77}
78
79#[derive(Debug, Clone, Serialize)]
80pub struct DebugPlanSnapshot {
81 pub logical: DebugPlanNode,
82 pub physical: DebugPlanNode,
83}
84
85#[derive(Debug, Clone, Serialize)]
86pub struct MvccVersionSample {
87 pub table: String,
88 pub rid: String,
89 pub insert_txn: u64,
90 pub delete_txn: u64,
91 pub visible: bool,
92}
93
94#[derive(Debug, Clone, Serialize)]
95pub struct MvccVersionsDebug {
96 pub samples: Vec<MvccVersionSample>,
97 pub note: String,
98}
99
100impl DebugPlanNode {
101 pub fn from_physical(plan: &PhysicalPlan) -> Self {
102 Self {
103 op: plan.display_name(),
104 children: plan
105 .inputs()
106 .iter()
107 .map(|child| Self::from_physical(child))
108 .collect(),
109 }
110 }
111
112 pub fn from_logical(plan: &LogicalPlan) -> Self {
113 Self {
114 op: plan.to_string(),
115 children: plan
116 .inputs()
117 .iter()
118 .map(|child| Self::from_logical(child))
119 .collect(),
120 }
121 }
122}
123
124impl Database {
125 pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
126 Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
127 }
128
129 pub fn new_on_disk_with_options(
130 db_path: &str,
131 options: DatabaseOptions,
132 ) -> QuillSQLResult<Self> {
133 Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
134 }
135
136 pub fn new_temp() -> QuillSQLResult<Self> {
137 Self::new_temp_with_options(DatabaseOptions::default())
138 }
139
140 pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
141 Self::new_with_location(DatabaseLocation::Temporary, options)
142 }
143
144 fn new_with_location(
145 location: DatabaseLocation,
146 options: DatabaseOptions,
147 ) -> QuillSQLResult<Self> {
148 let (holt_dir, temp_dir) = holt_directory_for_location(&location, &options.holt)?;
149 let holt_store = Arc::new(HoltStore::open(holt_dir)?);
150 let transaction_manager = Arc::new(TransactionManager::new());
151 seed_holt_transaction_statuses(&holt_store, &transaction_manager)?;
152
153 let catalog = Catalog::new(holt_store.clone());
154 let storage = Arc::new(HoltStorage::new(holt_store.clone()));
155
156 let mut db = Self {
157 catalog,
158 transaction_manager,
159 holt_store,
160 default_isolation: options
161 .default_isolation_level
162 .unwrap_or(IsolationLevel::ReadUncommitted),
163 storage,
164 debug_trace: Arc::new(Mutex::new(None)),
165 _temp_dir: temp_dir,
166 };
167 load_catalog_data(&mut db)?;
168 Ok(db)
169 }
170
171 pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
172 let mut session = SessionContext::new(self.default_isolation);
173 self.run_with_session(&mut session, sql)
174 }
175
176 pub fn run_with_session(
177 &mut self,
178 session: &mut SessionContext,
179 sql: &str,
180 ) -> QuillSQLResult<Vec<Tuple>> {
181 let start = Instant::now();
182 let PreparedStatement {
183 optimized_logical_plan,
184 physical_plan,
185 } = self.plan_statement(sql)?;
186 let logical_plan_str = pretty_format_logical_plan(&optimized_logical_plan);
187 let physical_plan_str = pretty_format_physical_plan(&physical_plan);
188 let logical_tree = DebugPlanNode::from_logical(&optimized_logical_plan);
189 let physical_tree = DebugPlanNode::from_physical(&physical_plan);
190
191 if let Some(result) = self.execute_transaction_control(session, &optimized_logical_plan)? {
192 return Ok(result);
193 }
194
195 let result = self.execute_physical_plan(session, physical_plan)?;
196
197 if let Ok(mut guard) = self.debug_trace.lock() {
198 *guard = Some(DebugTrace {
199 logical_plan: logical_plan_str,
200 physical_plan: physical_plan_str,
201 logical_tree,
202 physical_tree,
203 rows: result.len(),
204 duration_ms: start.elapsed().as_millis(),
205 });
206 }
207
208 Ok(result)
209 }
210
211 pub fn default_isolation(&self) -> IsolationLevel {
212 self.default_isolation
213 }
214
215 pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
216 let stmts = crate::sql::parser::parse_sql(sql)?;
217 if stmts.len() != 1 {
218 return Err(QuillSQLError::NotSupport(
219 "only support one sql statement".to_string(),
220 ));
221 }
222 let stmt = &stmts[0];
223 let mut planner = LogicalPlanner {
224 context: PlannerContext {
225 catalog: &self.catalog,
226 },
227 };
228 planner.plan(stmt)
229 }
230
231 pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
232 self.catalog.analyze_table(table_ref)
233 }
234
235 pub fn flush(&self) -> QuillSQLResult<()> {
236 self.holt_store
237 .db()
238 .checkpoint()
239 .map_err(crate::storage::holt::map_holt_err)
240 }
241
242 #[cfg(test)]
243 pub(crate) fn table_binding(
244 &self,
245 table_ref: &TableReference,
246 ) -> QuillSQLResult<crate::storage::TableBinding> {
247 self.storage.table(&self.catalog, table_ref)
248 }
249
250 pub fn debug_last_trace(&self) -> Option<DebugTrace> {
251 self.debug_trace.lock().ok().and_then(|guard| guard.clone())
252 }
253
254 pub fn debug_lock_snapshot(&self) -> LockDebugSnapshot {
255 self.transaction_manager.lock_manager_arc().debug_snapshot()
256 }
257
258 pub fn debug_txn_snapshot(&self) -> TxnDebugSnapshot {
259 self.transaction_manager.debug_snapshot()
260 }
261
262 pub fn debug_mvcc_versions(&self) -> QuillSQLResult<MvccVersionsDebug> {
263 let snapshot = self
264 .transaction_manager
265 .snapshot(self.transaction_manager.next_txn_id_hint());
266 let mut samples = Vec::new();
267 let max_samples = 20usize;
268 for (schema_name, schema) in &self.catalog.schemas {
269 if schema_name == crate::catalog::INFORMATION_SCHEMA_NAME {
270 continue;
271 }
272 for (table_name, table) in &schema.tables {
273 let table_ref = TableReference::Full {
274 catalog: crate::catalog::DEFAULT_CATALOG_NAME.to_string(),
275 schema: schema_name.clone(),
276 table: table_name.clone(),
277 };
278 let handle = HoltTableHandle::new(
279 table_ref.clone(),
280 table.schema.clone(),
281 table.table_id,
282 self.holt_store.clone(),
283 );
284 let mut stream = handle.full_scan()?;
285 while let Some((rid, meta, _tuple)) = stream.next()? {
286 if samples.len() >= max_samples {
287 break;
288 }
289 let visible = snapshot.is_visible(&meta, 0 as CommandId, |tid| {
290 self.transaction_manager.transaction_status(tid)
291 });
292 samples.push(MvccVersionSample {
293 table: table_ref.to_string(),
294 rid: rid.to_string(),
295 insert_txn: meta.insert_txn_id,
296 delete_txn: meta.delete_txn_id,
297 visible,
298 });
299 }
300 if samples.len() >= max_samples {
301 break;
302 }
303 }
304 if samples.len() >= max_samples {
305 break;
306 }
307 }
308
309 Ok(MvccVersionsDebug {
310 samples,
311 note: format!("sampled up to {} tuples from Holt tables", max_samples),
312 })
313 }
314
315 pub fn debug_last_plan(&self) -> Option<DebugPlanSnapshot> {
316 self.debug_trace
317 .lock()
318 .ok()
319 .and_then(|opt| opt.clone())
320 .map(|trace| DebugPlanSnapshot {
321 logical: trace.logical_tree,
322 physical: trace.physical_tree,
323 })
324 }
325
326 pub fn table_statistics(
327 &self,
328 table_ref: &TableReference,
329 ) -> Option<&crate::catalog::TableStatistics> {
330 self.catalog.table_statistics(table_ref)
331 }
332
333 pub fn transaction_manager(&self) -> Arc<TransactionManager> {
334 self.transaction_manager.clone()
335 }
336
337 fn plan_statement(&mut self, sql: &str) -> QuillSQLResult<PreparedStatement> {
338 let logical_plan = self.create_logical_plan(sql)?;
339 debug!(
340 "Logical Plan: \n{}",
341 pretty_format_logical_plan(&logical_plan)
342 );
343
344 let optimized_logical_plan = self.optimize_logical_plan(&logical_plan)?;
345 debug!(
346 "Optimized Logical Plan: \n{}",
347 pretty_format_logical_plan(&optimized_logical_plan)
348 );
349
350 let physical_plan = self.build_physical_plan(&optimized_logical_plan);
351 debug!(
352 "Physical Plan: \n{}",
353 pretty_format_physical_plan(&physical_plan)
354 );
355
356 Ok(PreparedStatement {
357 optimized_logical_plan,
358 physical_plan,
359 })
360 }
361
362 fn optimize_logical_plan(&self, logical_plan: &LogicalPlan) -> QuillSQLResult<LogicalPlan> {
363 LogicalOptimizer::new().optimize(logical_plan)
364 }
365
366 fn build_physical_plan(&self, logical_plan: &LogicalPlan) -> PhysicalPlan {
367 let physical_planner = PhysicalPlanner::with_catalog(&self.catalog);
368 physical_planner.create_physical_plan(logical_plan.clone())
369 }
370
371 fn execute_transaction_control(
372 &self,
373 session: &mut SessionContext,
374 plan: &LogicalPlan,
375 ) -> QuillSQLResult<Option<Vec<Tuple>>> {
376 match plan {
377 LogicalPlan::BeginTransaction(modes) => {
378 if session.has_active_transaction() {
379 return Err(QuillSQLError::Execution(
380 "transaction already active".to_string(),
381 ));
382 }
383 let txn = self.transaction_manager.begin(
384 modes.unwrap_effective_isolation(session.default_isolation()),
385 modes
386 .access_mode
387 .unwrap_or(TransactionAccessMode::ReadWrite),
388 )?;
389 session.set_active_transaction(txn)?;
390 Ok(Some(vec![]))
391 }
392 LogicalPlan::CommitTransaction => {
393 let txn_ref = session
394 .active_txn_mut()
395 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
396 let txn_id = txn_ref.id();
397 self.transaction_manager.commit(txn_ref)?;
398 self.holt_store
399 .put_txn_status(txn_id, TransactionStatus::Committed)?;
400 session.clear_active_transaction();
401 Ok(Some(vec![]))
402 }
403 LogicalPlan::RollbackTransaction => {
404 let txn_ref = session
405 .active_txn_mut()
406 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
407 let txn_id = txn_ref.id();
408 self.transaction_manager.abort(txn_ref)?;
409 self.holt_store
410 .put_txn_status(txn_id, TransactionStatus::Aborted)?;
411 session.clear_active_transaction();
412 Ok(Some(vec![]))
413 }
414 LogicalPlan::SetTransaction { scope, modes } => {
415 match scope {
416 TransactionScope::Session => session.apply_session_modes(modes),
417 TransactionScope::Transaction => session.apply_transaction_modes(modes),
418 }
419 Ok(Some(vec![]))
420 }
421 _ => Ok(None),
422 }
423 }
424
425 fn execute_physical_plan(
426 &mut self,
427 session: &mut SessionContext,
428 physical_plan: PhysicalPlan,
429 ) -> QuillSQLResult<Vec<Tuple>> {
430 let needs_cleanup = !session.has_active_transaction();
431 let autocommit = session.autocommit();
432 let result = {
433 let txn = session.ensure_active_transaction(&self.transaction_manager)?;
434 let context = crate::execution::ExecutionContext::new(
435 &mut self.catalog,
436 txn,
437 self.transaction_manager.clone(),
438 self.storage.clone(),
439 );
440 let mut engine = ExecutionEngine { context };
441 engine.execute(Rc::new(physical_plan))?
442 };
443
444 if autocommit && needs_cleanup {
445 if let Some(txn) = session.active_txn_mut() {
446 let txn_id = txn.id();
447 self.transaction_manager.commit(txn)?;
448 self.holt_store
449 .put_txn_status(txn_id, TransactionStatus::Committed)?;
450 }
451 session.clear_active_transaction();
452 }
453
454 Ok(result)
455 }
456}
457
458fn holt_directory_for_location(
459 location: &DatabaseLocation,
460 overrides: &HoltOptions,
461) -> QuillSQLResult<(PathBuf, Option<TempDir>)> {
462 if let Some(directory) = &overrides.directory {
463 return Ok((directory.clone(), None));
464 }
465 match location {
466 DatabaseLocation::OnDisk(path) => Ok((PathBuf::from(path), None)),
467 DatabaseLocation::Temporary => {
468 let temp_dir = TempDir::new()?;
469 let holt_dir = temp_dir.path().join("holt");
470 Ok((holt_dir, Some(temp_dir)))
471 }
472 }
473}
474
475fn seed_holt_transaction_statuses(
476 holt_store: &HoltStore,
477 transaction_manager: &TransactionManager,
478) -> QuillSQLResult<()> {
479 let mut next_txn_id = 1;
480 for (txn_id, status) in holt_store.recover_txn_statuses()? {
481 transaction_manager.record_recovered_status(txn_id, status);
482 next_txn_id = next_txn_id.max(txn_id.saturating_add(1));
483 }
484 transaction_manager.ensure_next_txn_id_at_least(next_txn_id);
485 Ok(())
486}