1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6
7use log::{debug, warn};
8
9use crate::buffer::{BufferManager, PageId};
10use crate::catalog::registry::TableRegistry;
11use crate::catalog::INFORMATION_SCHEMA_NAME;
12use crate::error::QuillSQLResult;
13use crate::recovery::wal::codec::CheckpointPayload;
14use crate::recovery::{Lsn, WalManager, WalWriterHandle};
15use crate::storage::page::{RecordId, TupleMeta};
16use crate::storage::table_heap::{TableHeap, TableIterator};
17use crate::transaction::{TransactionId, TransactionManager, TransactionStatus};
18
/// Write-ahead-log operations required by the checkpoint worker
/// (`spawn_checkpoint_worker`).
///
/// Implemented in this file for `WalManager` by delegating to its inherent
/// methods of the same names.
pub trait CheckpointWal: Send + Sync {
    /// Returns an LSN that the checkpoint worker uses as its flush target.
    /// The worker treats `0` as "nothing to checkpoint".
    /// Presumably the highest LSN assigned so far — confirm against `WalManager`.
    fn max_assigned_lsn(&self) -> Lsn;
    /// Flushes the log up to `target`, returning an LSN on success.
    /// NOTE(review): the meaning of the returned LSN is not visible here.
    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn>;
    /// Writes a checkpoint record carrying `payload`, returning an LSN on success
    /// (presumably that of the new record — confirm against `WalManager`).
    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn>;
}
24
/// Buffer-pool operations used by the checkpoint worker and the background
/// page writer.
///
/// Implemented in this file for `BufferManager` by delegating to its inherent
/// methods of the same names.
pub trait BufferMaintenance: Send + Sync {
    /// Ids of the pages currently considered dirty.
    fn dirty_page_ids(&self) -> Vec<PageId>;
    /// Snapshot of the dirty page table as `(page, lsn)` pairs.
    /// Presumably the LSN is the page's recovery LSN — confirm against `BufferManager`.
    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)>;
    /// Flushes one page.
    /// NOTE(review): the meaning of the `bool` result is not visible in this file.
    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool>;
}
30
/// Read-only transaction-manager queries used by the checkpoint and MVCC
/// vacuum workers.
///
/// Implemented in this file for `TransactionManager` by delegating to its
/// inherent methods of the same names.
pub trait TxnSnapshotOps: Send + Sync {
    /// Ids of the transactions that are currently active; recorded in the
    /// checkpoint payload.
    fn active_transactions(&self) -> Vec<TransactionId>;
    /// Oldest active transaction, or `None` when there is none. The vacuum
    /// worker uses it as the reclamation horizon.
    fn oldest_active_txn(&self) -> Option<TransactionId>;
    /// Fallback horizon used by the vacuum worker when no transaction is active.
    /// Presumably the next id that will be handed out — confirm against
    /// `TransactionManager`.
    fn next_txn_id_hint(&self) -> TransactionId;
    /// Status of `txn_id`; the vacuum helpers branch on `Committed`, `Aborted`,
    /// `InProgress` and `Unknown`.
    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus;
}
37
/// Handle to a WAL flushing facility that can be stopped exactly once.
///
/// `wal_writer_worker` wraps any implementor in a `WorkerHandle`. The
/// `Send + 'static` bounds let the handle be moved into that worker's stop callback.
pub trait WalFlushControl: Send + 'static {
    /// Consumes the handle and stops the underlying writer.
    fn stop(self) -> QuillSQLResult<()>;
}
41
/// Identifies which background task a `WorkerHandle` controls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerKind {
    /// Handle produced by `wal_writer_worker`.
    WalWriter,
    /// Periodic worker started by `spawn_checkpoint_worker`.
    Checkpoint,
    /// Periodic worker started by `spawn_bg_writer`.
    BufferPoolWriter,
    /// Periodic worker started by `spawn_mvcc_vacuum_worker`.
    MvccVacuum,
}
50
/// Descriptive, copyable information about a registered background worker.
#[derive(Debug, Clone, Copy)]
pub struct WorkerMetadata {
    /// Which background task this is.
    pub kind: WorkerKind,
    /// Interval associated with the worker. Every constructor in this file
    /// stores `Some(..)`; `None` is representable but unused here.
    pub interval: Option<Duration>,
}
56
/// Owns the means to stop a background worker and, optionally, to wait for
/// its thread to finish. Dropping the handle performs both steps.
pub struct WorkerHandle {
    /// Descriptive information returned by `metadata()`.
    metadata: WorkerMetadata,
    /// One-shot stop callback; `shutdown` takes it, so it runs at most once.
    stop_fn: Option<Box<dyn FnOnce() + Send + 'static>>,
    /// Thread to join after stopping; `None` when there is nothing to join
    /// (e.g. the handle built by `wal_writer_worker`).
    join_handle: Option<JoinHandle<()>>,
}
62
63impl WorkerHandle {
64 pub fn new(
65 metadata: WorkerMetadata,
66 stop_fn: impl FnOnce() + Send + 'static,
67 join_handle: Option<JoinHandle<()>>,
68 ) -> Self {
69 Self {
70 metadata,
71 stop_fn: Some(Box::new(stop_fn)),
72 join_handle,
73 }
74 }
75
76 pub fn metadata(&self) -> WorkerMetadata {
77 self.metadata
78 }
79
80 pub fn shutdown(&mut self) {
81 if let Some(stop) = self.stop_fn.take() {
82 stop();
83 }
84 }
85
86 pub fn join(&mut self) {
87 if let Some(handle) = self.join_handle.take() {
88 if let Err(err) = handle.join() {
89 log::warn!(
90 "Background worker {:?} terminated with panic: {:?}",
91 self.metadata.kind,
92 err
93 );
94 }
95 }
96 }
97}
98
/// Dropping a handle stops its worker and then waits for it to finish.
impl Drop for WorkerHandle {
    fn drop(&mut self) {
        // Order matters: signal the worker first, then block until its thread exits.
        // Both calls are no-ops if they were already performed explicitly.
        self.shutdown();
        self.join();
    }
}
105
106impl WalFlushControl for WalWriterHandle {
107 fn stop(self) -> QuillSQLResult<()> {
108 WalWriterHandle::stop(self)
109 }
110}
111
/// Registry of background workers; all of them are shut down and joined when
/// the registry is dropped.
pub struct BackgroundWorkers {
    /// Registered handles, in registration order.
    workers: Vec<WorkerHandle>,
}
115
116impl Default for BackgroundWorkers {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122impl BackgroundWorkers {
123 pub fn new() -> Self {
124 Self {
125 workers: Vec::new(),
126 }
127 }
128
129 pub fn register(&mut self, handle: WorkerHandle) {
130 self.workers.push(handle);
131 }
132
133 pub fn register_opt(&mut self, handle: Option<WorkerHandle>) {
134 if let Some(handle) = handle {
135 self.register(handle);
136 }
137 }
138
139 pub fn shutdown_all(&mut self) {
140 for worker in &mut self.workers {
141 worker.shutdown();
142 }
143 for worker in &mut self.workers {
144 worker.join();
145 }
146 }
147
148 pub fn workers(&self) -> &[WorkerHandle] {
149 &self.workers
150 }
151
152 pub fn snapshot(&self) -> Vec<WorkerMetadata> {
153 self.workers
154 .iter()
155 .map(|worker| worker.metadata())
156 .collect()
157 }
158}
159
/// Dropping the registry stops and joins every registered worker.
impl Drop for BackgroundWorkers {
    fn drop(&mut self) {
        // The handles' own `Drop` runs afterwards, but shutdown and join are
        // already done by then and act as no-ops.
        self.shutdown_all();
    }
}
165
166pub fn wal_writer_worker<H>(handle: H, interval: Duration) -> WorkerHandle
167where
168 H: WalFlushControl,
169{
170 WorkerHandle::new(
171 WorkerMetadata {
172 kind: WorkerKind::WalWriter,
173 interval: Some(interval),
174 },
175 move || {
176 if let Err(err) = handle.stop() {
177 warn!("Failed to stop WAL writer: {}", err);
178 }
179 },
180 None,
181 )
182}
183
184pub fn spawn_checkpoint_worker(
185 wal: Arc<dyn CheckpointWal>,
186 buffer_pool: Arc<dyn BufferMaintenance>,
187 transaction_manager: Arc<dyn TxnSnapshotOps>,
188 interval: Option<Duration>,
189) -> Option<WorkerHandle> {
190 let interval = interval?;
191 if interval.is_zero() {
192 return None;
193 }
194
195 let wal_ref = wal.clone();
196 let buffer = buffer_pool.clone();
197 let txn_mgr = transaction_manager.clone();
198
199 spawn_periodic_worker(
200 "checkpoint-worker",
201 WorkerKind::Checkpoint,
202 interval,
203 move || {
204 let dirty_pages = buffer.dirty_page_ids();
205 let dpt_snapshot = buffer.dirty_page_table_snapshot();
206 let active_txns = txn_mgr.active_transactions();
207 let last_lsn = wal_ref.max_assigned_lsn();
208
209 if last_lsn != 0 {
210 if let Err(e) = wal_ref.flush_until(last_lsn) {
211 warn!("Checkpoint flush failed: {}", e);
212 }
213 let payload = CheckpointPayload {
214 last_lsn,
215 dirty_pages,
216 active_transactions: active_txns,
217 dpt: dpt_snapshot,
218 };
219 if let Err(e) = wal_ref.log_checkpoint(payload) {
220 warn!("Checkpoint write failed: {}", e);
221 }
222 }
223 },
224 )
225}
226
227pub fn spawn_bg_writer(
228 buffer_pool: Arc<dyn BufferMaintenance>,
229 interval: Option<Duration>,
230) -> Option<WorkerHandle> {
231 let interval = interval?;
232 if interval.is_zero() {
233 return None;
234 }
235 let bp = buffer_pool.clone();
236 spawn_periodic_worker(
237 "bg-writer",
238 WorkerKind::BufferPoolWriter,
239 interval,
240 move || {
241 let dirty_ids = bp.dirty_page_ids();
242 for page_id in dirty_ids.into_iter().take(16) {
243 let _ = bp.flush_page(page_id);
244 }
245 },
246 )
247}
248
249pub fn spawn_mvcc_vacuum_worker(
250 transaction_manager: Arc<dyn TxnSnapshotOps>,
251 interval: Option<Duration>,
252 batch_limit: usize,
253 table_registry: Arc<TableRegistry>,
254) -> Option<WorkerHandle> {
255 let interval = interval?;
256 if interval.is_zero() || batch_limit == 0 {
257 return None;
258 }
259
260 let txn_mgr = transaction_manager.clone();
261 let tables = table_registry.clone();
262
263 spawn_periodic_worker("mvcc-vacuum", WorkerKind::MvccVacuum, interval, move || {
264 let safe_xmin = txn_mgr
265 .oldest_active_txn()
266 .unwrap_or_else(|| txn_mgr.next_txn_id_hint());
267 let mut remaining = batch_limit;
268
269 for (table_ref, heap) in tables.iter_tables() {
270 if remaining == 0 {
271 break;
272 }
273 if matches!(table_ref.schema(), Some(schema) if schema == INFORMATION_SCHEMA_NAME) {
274 continue;
275 }
276
277 match vacuum_table_versions(&heap, &txn_mgr, safe_xmin, &mut remaining) {
278 Ok(cleaned) if cleaned > 0 => {
279 debug!(
280 "MVCC vacuum reclaimed {} tuple(s) from {}",
281 cleaned,
282 table_ref.to_log_string()
283 );
284 }
285 Ok(_) => {}
286 Err(err) => {
287 warn!(
288 "MVCC vacuum on {} failed: {}",
289 table_ref.to_log_string(),
290 err
291 );
292 }
293 }
294 }
295 })
296}
297
298impl CheckpointWal for WalManager {
299 fn max_assigned_lsn(&self) -> Lsn {
300 WalManager::max_assigned_lsn(self)
301 }
302
303 fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
304 WalManager::flush_until(self, target)
305 }
306
307 fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
308 WalManager::log_checkpoint(self, payload)
309 }
310}
311
312impl BufferMaintenance for BufferManager {
313 fn dirty_page_ids(&self) -> Vec<PageId> {
314 BufferManager::dirty_page_ids(self)
315 }
316
317 fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
318 BufferManager::dirty_page_table_snapshot(self)
319 }
320
321 fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
322 BufferManager::flush_page(self, page_id)
323 }
324}
325
326impl TxnSnapshotOps for TransactionManager {
327 fn active_transactions(&self) -> Vec<TransactionId> {
328 TransactionManager::active_transactions(self)
329 }
330
331 fn oldest_active_txn(&self) -> Option<TransactionId> {
332 TransactionManager::oldest_active_txn(self)
333 }
334
335 fn next_txn_id_hint(&self) -> TransactionId {
336 TransactionManager::next_txn_id_hint(self)
337 }
338
339 fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
340 TransactionManager::transaction_status(self, txn_id)
341 }
342}
343
344fn spawn_periodic_worker<F>(
345 name: &str,
346 kind: WorkerKind,
347 interval: Duration,
348 mut tick: F,
349) -> Option<WorkerHandle>
350where
351 F: FnMut() + Send + 'static,
352{
353 let stop_flag = Arc::new(AtomicBool::new(false));
354 let thread_flag = Arc::clone(&stop_flag);
355
356 match thread::Builder::new().name(name.into()).spawn(move || {
357 while !thread_flag.load(Ordering::Relaxed) {
358 tick();
359 if thread_flag.load(Ordering::Relaxed) {
360 break;
361 }
362 thread::sleep(interval);
363 }
364 }) {
365 Ok(join_handle) => {
366 let stop_handle = Arc::clone(&stop_flag);
367 Some(WorkerHandle::new(
368 WorkerMetadata {
369 kind,
370 interval: Some(interval),
371 },
372 move || {
373 stop_handle.store(true, Ordering::Release);
374 },
375 Some(join_handle),
376 ))
377 }
378 Err(err) => {
379 warn!("Failed to spawn {}: {}", name, err);
380 None
381 }
382 }
383}
384
385fn vacuum_table_versions(
386 table: &Arc<TableHeap>,
387 txn_mgr: &Arc<dyn TxnSnapshotOps>,
388 safe_xmin: TransactionId,
389 remaining: &mut usize,
390) -> crate::error::QuillSQLResult<usize> {
391 if *remaining == 0 {
392 return Ok(0);
393 }
394
395 let mut cleaned = 0usize;
396 let mut iter = TableIterator::new(table.clone(), ..);
397 while *remaining > 0 {
398 match iter.next()? {
399 Some((rid, meta, _tuple)) => {
400 let removed = if meta.is_deleted {
401 try_reclaim_deleted(table, txn_mgr, safe_xmin, rid, &meta)?
402 } else {
403 try_reclaim_aborted(table, txn_mgr, safe_xmin, rid, &meta)?
404 };
405 if removed {
406 cleaned += 1;
407 *remaining -= 1;
408 }
409 }
410 None => break,
411 }
412 }
413 Ok(cleaned)
414}
415
416fn try_reclaim_deleted(
417 table: &Arc<TableHeap>,
418 txn_mgr: &Arc<dyn TxnSnapshotOps>,
419 safe_xmin: TransactionId,
420 rid: RecordId,
421 meta: &TupleMeta,
422) -> crate::error::QuillSQLResult<bool> {
423 if meta.delete_txn_id == 0 {
424 return Ok(false);
425 }
426 let status = txn_mgr.transaction_status(meta.delete_txn_id);
427 let removable = match status {
428 TransactionStatus::Committed => meta.delete_txn_id < safe_xmin,
429 TransactionStatus::Aborted => true,
430 TransactionStatus::InProgress | TransactionStatus::Unknown => false,
431 };
432 if !removable {
433 return Ok(false);
434 }
435
436 let delete_txn = meta.delete_txn_id;
437 let delete_cid = meta.delete_cid;
438 let insert_txn = meta.insert_txn_id;
439 table.vacuum_slot_if(rid, |current| {
440 current.is_deleted
441 && current.delete_txn_id == delete_txn
442 && current.delete_cid == delete_cid
443 && current.insert_txn_id == insert_txn
444 })
445}
446
447fn try_reclaim_aborted(
448 table: &Arc<TableHeap>,
449 txn_mgr: &Arc<dyn TxnSnapshotOps>,
450 safe_xmin: TransactionId,
451 rid: RecordId,
452 meta: &TupleMeta,
453) -> crate::error::QuillSQLResult<bool> {
454 if meta.insert_txn_id == 0 {
455 return Ok(false);
456 }
457 let status = txn_mgr.transaction_status(meta.insert_txn_id);
458 if status != TransactionStatus::Aborted {
459 return Ok(false);
460 }
461 if meta.insert_txn_id >= safe_xmin {
462 return Ok(false);
463 }
464
465 let insert_txn = meta.insert_txn_id;
466 table.vacuum_slot_if(rid, |current| {
467 !current.is_deleted && current.insert_txn_id == insert_txn
468 })
469}