1use crate::background::{self, BackgroundWorkers};
2use log::{debug, warn};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6use tempfile::TempDir;
7
8use crate::buffer::{BufferManager, BUFFER_POOL_SIZE};
9use crate::catalog::load_catalog_data;
10use crate::config::{background_config, IndexVacuumConfig, MvccVacuumConfig, WalConfig};
11use crate::error::{QuillSQLError, QuillSQLResult};
12use crate::optimizer::LogicalOptimizer;
13use crate::plan::logical_plan::{LogicalPlan, TransactionScope};
14use crate::plan::PhysicalPlanner;
15use crate::recovery::{ControlFileManager, RecoveryManager, WalManager};
16use crate::session::SessionContext;
17use crate::utils::util::{pretty_format_logical_plan, pretty_format_physical_plan};
18use crate::{
19 catalog::Catalog,
20 execution::ExecutionEngine,
21 plan::{LogicalPlanner, PlannerContext},
22 storage::{
23 disk_manager::DiskManager, disk_scheduler::DiskScheduler, tuple::Tuple,
24 DefaultStorageEngine, StorageEngine,
25 },
26 transaction::{IsolationLevel, TransactionManager},
27};
28use sqlparser::ast::TransactionAccessMode;
29
/// Optional per-database overrides for the write-ahead-log configuration.
///
/// Every field is an `Option`: `None` leaves the corresponding `WalConfig`
/// default untouched, `Some(v)` replaces it when the config is built.
#[derive(Clone, Debug, Default)]
pub struct WalOptions {
    /// WAL directory; when `None` one is derived from the database location.
    pub directory: Option<PathBuf>,
    /// Overrides `WalConfig::segment_size`.
    pub segment_size: Option<u64>,
    /// Overrides `WalConfig::sync_on_flush`.
    pub sync_on_flush: Option<bool>,
    /// Overrides `WalConfig::writer_interval_ms`. The outer `Option` is
    /// "override or not"; the inner value is stored as-is (an inner `None`
    /// presumably disables the background writer — confirm in `config`).
    pub writer_interval_ms: Option<Option<u64>>,
    /// Overrides `WalConfig::buffer_capacity`.
    pub buffer_capacity: Option<usize>,
    /// Overrides `WalConfig::flush_coalesce_bytes`.
    pub flush_coalesce_bytes: Option<usize>,
    /// Overrides `WalConfig::synchronous_commit`.
    pub synchronous_commit: Option<bool>,
    /// Overrides `WalConfig::checkpoint_interval_ms`; same outer/inner
    /// `Option` convention as `writer_interval_ms`.
    pub checkpoint_interval_ms: Option<Option<u64>>,
    /// Overrides `WalConfig::retain_segments`; values below 1 are raised to 1
    /// when the override is applied.
    pub retain_segments: Option<usize>,
}
42
43#[derive(Debug, Clone, Default)]
44pub struct DatabaseOptions {
45 pub wal: WalOptions,
46 pub default_isolation_level: Option<IsolationLevel>,
47}
48
/// Where the database files live.
enum DatabaseLocation {
    /// A database file at the given path.
    OnDisk(String),
    /// A database created inside a freshly made temporary directory.
    Temporary,
}
53
54fn bootstrap_storage(
55 location: DatabaseLocation,
56 wal_options: &WalOptions,
57) -> QuillSQLResult<(Arc<DiskManager>, WalConfig, Option<TempDir>)> {
58 match location {
59 DatabaseLocation::OnDisk(path) => {
60 let disk_manager = Arc::new(DiskManager::try_new(path.as_str())?);
61 let wal_config = wal_config_for_path(path.as_str(), wal_options);
62 Ok((disk_manager, wal_config, None))
63 }
64 DatabaseLocation::Temporary => {
65 let temp_dir = TempDir::new()?;
66 let temp_path = temp_dir.path().join("test.db");
67 let temp_str = temp_path
68 .to_str()
69 .ok_or_else(|| QuillSQLError::Internal("Invalid temp path".to_string()))?;
70 let disk_manager = Arc::new(DiskManager::try_new(temp_str)?);
71 let wal_config = wal_config_for_temp(temp_dir.path(), wal_options);
72 Ok((disk_manager, wal_config, Some(temp_dir)))
73 }
74 }
75}
76
77pub struct Database {
78 _temp_dir: Option<TempDir>,
79 pub(crate) buffer_pool: Arc<BufferManager>,
80 pub(crate) catalog: Catalog,
81 background_workers: BackgroundWorkers,
82 pub(crate) wal_manager: Arc<WalManager>,
83 pub(crate) transaction_manager: Arc<TransactionManager>,
84 default_isolation: IsolationLevel,
85 storage_engine: Arc<dyn StorageEngine>,
86}
87impl Database {
88 pub fn new_on_disk(db_path: &str) -> QuillSQLResult<Self> {
89 Self::new_on_disk_with_options(db_path, DatabaseOptions::default())
90 }
91
92 pub fn new_on_disk_with_options(
93 db_path: &str,
94 options: DatabaseOptions,
95 ) -> QuillSQLResult<Self> {
96 Self::new_with_location(DatabaseLocation::OnDisk(db_path.to_string()), options)
97 }
98
99 pub fn new_temp() -> QuillSQLResult<Self> {
100 Self::new_temp_with_options(DatabaseOptions::default())
101 }
102
103 pub fn new_temp_with_options(options: DatabaseOptions) -> QuillSQLResult<Self> {
104 Self::new_with_location(DatabaseLocation::Temporary, options)
105 }
106
107 fn new_with_location(
108 location: DatabaseLocation,
109 options: DatabaseOptions,
110 ) -> QuillSQLResult<Self> {
111 let (disk_manager, wal_config, temp_dir) = bootstrap_storage(location, &options.wal)?;
112
113 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager.clone()));
114 let buffer_pool = Arc::new(BufferManager::new(BUFFER_POOL_SIZE, disk_scheduler.clone()));
115
116 let synchronous_commit = wal_config.synchronous_commit;
117 let (control_file, wal_init) =
118 ControlFileManager::load_or_init(&wal_config.directory, wal_config.segment_size)?;
119 let control_file = Arc::new(control_file);
120 let wal_manager = Arc::new(WalManager::new_with_scheduler(
121 wal_config.clone(),
122 Some(wal_init),
123 Some(control_file.clone()),
124 disk_scheduler.clone(),
125 )?);
126 let transaction_manager = Arc::new(TransactionManager::new(
127 wal_manager.clone(),
128 synchronous_commit,
129 ));
130
131 let worker_cfg = background_config(
132 &wal_config,
133 IndexVacuumConfig::default(),
134 MvccVacuumConfig::default(),
135 );
136 let mut background_workers = BackgroundWorkers::new();
137 if let Some(interval) = worker_cfg.wal_writer_interval {
138 if let Some(handle) = wal_manager.start_background_flush(interval)? {
139 background_workers.register(background::wal_writer_worker(handle, interval));
140 }
141 }
142 buffer_pool.set_wal_manager(wal_manager.clone());
143
144 let catalog = Catalog::new(buffer_pool.clone(), disk_manager.clone());
145 let storage_engine: Arc<dyn StorageEngine> = Arc::new(DefaultStorageEngine::default());
146
147 let recovery_summary = RecoveryManager::new(wal_manager.clone(), disk_scheduler.clone())
148 .with_buffer_pool(buffer_pool.clone())
149 .replay()?;
150 if recovery_summary.redo_count > 0 {
151 debug!(
152 "Recovery replayed {} record(s) starting at LSN {}",
153 recovery_summary.redo_count, recovery_summary.start_lsn
154 );
155 }
156 if !recovery_summary.loser_transactions.is_empty() {
157 warn!(
158 "{} transaction(s) require undo after recovery: {:?}",
159 recovery_summary.loser_transactions.len(),
160 recovery_summary.loser_transactions
161 );
162 }
163
164 let wal_for_workers: Arc<dyn background::CheckpointWal> = wal_manager.clone();
165 let buffer_for_workers: Arc<dyn background::BufferMaintenance> = buffer_pool.clone();
166 let txn_for_workers: Arc<dyn background::TxnSnapshotOps> = transaction_manager.clone();
167
168 background_workers.register_opt(background::spawn_checkpoint_worker(
169 wal_for_workers.clone(),
170 buffer_for_workers.clone(),
171 txn_for_workers.clone(),
172 worker_cfg.checkpoint_interval,
173 ));
174
175 background_workers.register_opt(background::spawn_bg_writer(
176 buffer_for_workers.clone(),
177 worker_cfg.bg_writer_interval,
178 worker_cfg.vacuum,
179 ));
180
181 let mvcc_interval = if worker_cfg.mvcc_vacuum.interval_ms == 0 {
182 None
183 } else {
184 Some(Duration::from_millis(worker_cfg.mvcc_vacuum.interval_ms))
185 };
186 background_workers.register_opt(background::spawn_mvcc_vacuum_worker(
187 txn_for_workers,
188 mvcc_interval,
189 worker_cfg.mvcc_vacuum.batch_limit,
190 ));
191
192 let mut db = Self {
193 _temp_dir: temp_dir,
194 buffer_pool,
195 catalog,
196 background_workers,
197 wal_manager,
198 transaction_manager,
199 default_isolation: options
200 .default_isolation_level
201 .unwrap_or(IsolationLevel::ReadUncommitted),
202 storage_engine,
203 };
204 load_catalog_data(&mut db)?;
205 Ok(db)
206 }
207
208 pub fn run(&mut self, sql: &str) -> QuillSQLResult<Vec<Tuple>> {
209 let mut session = SessionContext::new(self.default_isolation);
210 self.run_with_session(&mut session, sql)
211 }
212
213 pub fn run_with_session(
214 &mut self,
215 session: &mut SessionContext,
216 sql: &str,
217 ) -> QuillSQLResult<Vec<Tuple>> {
218 let logical_plan = self.create_logical_plan(sql)?;
219 debug!(
220 "Logical Plan: \n{}",
221 pretty_format_logical_plan(&logical_plan)
222 );
223
224 let optimized_logical_plan = LogicalOptimizer::new().optimize(&logical_plan)?;
225 debug!(
226 "Optimized Logical Plan: \n{}",
227 pretty_format_logical_plan(&logical_plan)
228 );
229
230 let physical_planner = PhysicalPlanner {
231 catalog: &self.catalog,
232 };
233 let physical_plan = physical_planner.create_physical_plan(optimized_logical_plan.clone());
234 debug!(
235 "Physical Plan: \n{}",
236 pretty_format_physical_plan(&physical_plan)
237 );
238
239 match optimized_logical_plan {
240 LogicalPlan::BeginTransaction(ref modes) => {
241 if session.has_active_transaction() {
242 return Err(QuillSQLError::Execution(
243 "transaction already active".to_string(),
244 ));
245 }
246 let txn = self.transaction_manager.begin(
247 modes.unwrap_effective_isolation(session.default_isolation()),
248 modes
249 .access_mode
250 .unwrap_or(TransactionAccessMode::ReadWrite),
251 )?;
252 session.set_active_transaction(txn)?;
253 Ok(vec![])
254 }
255 LogicalPlan::CommitTransaction => {
256 let txn_ref = session
257 .active_txn_mut()
258 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
259 self.transaction_manager.commit(txn_ref)?;
260 session.clear_active_transaction();
261 Ok(vec![])
262 }
263 LogicalPlan::RollbackTransaction => {
264 let txn_ref = session
265 .active_txn_mut()
266 .ok_or_else(|| QuillSQLError::Execution("no active transaction".to_string()))?;
267 self.transaction_manager.abort(txn_ref)?;
268 session.clear_active_transaction();
269 Ok(vec![])
270 }
271 LogicalPlan::SetTransaction {
272 ref scope,
273 ref modes,
274 } => {
275 match scope {
276 TransactionScope::Session => session.apply_session_modes(modes),
277 TransactionScope::Transaction => session.apply_transaction_modes(modes),
278 }
279 Ok(vec![])
280 }
281 _ => {
282 let needs_cleanup = !session.has_active_transaction();
283 let autocommit = session.autocommit();
284
285 let result = {
286 let txn = session.ensure_active_transaction(&self.transaction_manager)?;
287 let context = crate::execution::ExecutionContext::new(
288 &mut self.catalog,
289 txn,
290 &self.transaction_manager,
291 self.storage_engine.clone(),
292 );
293 let mut engine = ExecutionEngine { context };
294 engine.execute(Arc::new(physical_plan))?
295 };
296
297 if autocommit && needs_cleanup {
298 if let Some(txn) = session.active_txn_mut() {
299 self.transaction_manager.commit(txn)?;
300 }
301 session.clear_active_transaction();
302 }
303
304 Ok(result)
305 }
306 }
307 }
308
309 pub fn default_isolation(&self) -> IsolationLevel {
310 self.default_isolation
311 }
312
313 pub fn create_logical_plan(&mut self, sql: &str) -> QuillSQLResult<LogicalPlan> {
314 let stmts = crate::sql::parser::parse_sql(sql)?;
316 if stmts.len() != 1 {
317 return Err(QuillSQLError::NotSupport(
318 "only support one sql statement".to_string(),
319 ));
320 }
321 let stmt = &stmts[0];
322 let mut planner = LogicalPlanner {
323 context: PlannerContext {
324 catalog: &self.catalog,
325 },
326 };
327 planner.plan(stmt)
329 }
330
331 pub fn flush(&self) -> QuillSQLResult<()> {
332 let _ = self.wal_manager.flush(None)?;
333 self.buffer_pool.flush_all_pages()
334 }
335
336 pub fn transaction_manager(&self) -> Arc<TransactionManager> {
337 self.transaction_manager.clone()
338 }
339}
340
341impl Drop for Database {
342 fn drop(&mut self) {
343 self.background_workers.shutdown_all();
344 }
345}
346
347fn wal_config_for_path(db_path: &str, overrides: &WalOptions) -> WalConfig {
348 let mut config = WalConfig {
349 directory: overrides
350 .directory
351 .clone()
352 .unwrap_or_else(|| wal_directory_from_path(db_path)),
353 ..WalConfig::default()
354 };
355 if let Some(size) = overrides.segment_size {
356 config.segment_size = size;
357 }
358 if let Some(sync) = overrides.sync_on_flush {
359 config.sync_on_flush = sync;
360 }
361 if let Some(interval) = overrides.writer_interval_ms {
362 config.writer_interval_ms = interval;
363 }
364 if let Some(capacity) = overrides.buffer_capacity {
365 config.buffer_capacity = capacity;
366 }
367 if let Some(bytes) = overrides.flush_coalesce_bytes {
368 config.flush_coalesce_bytes = bytes;
369 }
370 if let Some(sync_commit) = overrides.synchronous_commit {
371 config.synchronous_commit = sync_commit;
372 }
373 if let Some(interval) = overrides.checkpoint_interval_ms {
374 config.checkpoint_interval_ms = interval;
375 }
376 if let Some(retain) = overrides.retain_segments {
377 config.retain_segments = retain.max(1);
378 }
379 config
380}
381
/// Derive the default WAL directory for the database file at `db_path`.
///
/// * Normally the file extension is replaced with `wal`
///   (`data.db` -> `data.wal`, `data` -> `data.wal`).
/// * If `db_path` has no file name (`""`, `"/"`, `".."`), `.wal` is appended
///   to the raw string.
/// * If replacing the extension would give back the database path itself
///   (the database file is already named `*.wal`), `.wal` is appended so the
///   WAL directory never collides with the database file.
fn wal_directory_from_path(db_path: &str) -> PathBuf {
    let db = Path::new(db_path);
    let mut candidate = db.to_path_buf();

    // `set_extension` returns false only when the path has no file name.
    if !candidate.set_extension("wal") {
        return PathBuf::from(format!("{}.wal", db_path));
    }

    if candidate.as_path() == db {
        // e.g. "data.wal" -> "data.wal.wal"
        let mut disambiguated = candidate.into_os_string();
        disambiguated.push(".wal");
        return PathBuf::from(disambiguated);
    }

    candidate
}
391
392fn wal_config_for_temp(temp_root: &Path, overrides: &WalOptions) -> WalConfig {
393 let mut config = WalConfig {
394 directory: overrides
395 .directory
396 .clone()
397 .unwrap_or_else(|| temp_root.join("wal")),
398 ..WalConfig::default()
399 };
400 if let Some(size) = overrides.segment_size {
401 config.segment_size = size;
402 }
403 if let Some(sync) = overrides.sync_on_flush {
404 config.sync_on_flush = sync;
405 }
406 if let Some(interval) = overrides.writer_interval_ms {
407 config.writer_interval_ms = interval;
408 }
409 if let Some(capacity) = overrides.buffer_capacity {
410 config.buffer_capacity = capacity;
411 }
412 if let Some(bytes) = overrides.flush_coalesce_bytes {
413 config.flush_coalesce_bytes = bytes;
414 }
415 if let Some(sync_commit) = overrides.synchronous_commit {
416 config.synchronous_commit = sync_commit;
417 }
418 if let Some(interval) = overrides.checkpoint_interval_ms {
419 config.checkpoint_interval_ms = interval;
420 }
421 if let Some(retain) = overrides.retain_segments {
422 config.retain_segments = retain.max(1);
423 }
424 config
425}