1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6
7use log::{debug, warn};
8
9use crate::buffer::{BufferManager, PageId};
10use crate::catalog::registry::{global_index_registry, global_table_registry};
11use crate::catalog::INFORMATION_SCHEMA_NAME;
12use crate::config::IndexVacuumConfig;
13use crate::error::QuillSQLResult;
14use crate::recovery::wal::codec::CheckpointPayload;
15use crate::recovery::{Lsn, WalManager, WalWriterHandle};
16use crate::storage::page::{RecordId, TupleMeta};
17use crate::storage::table_heap::{TableHeap, TableIterator};
18use crate::transaction::{TransactionId, TransactionManager, TransactionStatus};
19
/// WAL operations used by the periodic checkpoint worker.
///
/// The worker receives an `Arc<dyn CheckpointWal>`, so any `Send + Sync`
/// implementation can stand in for the concrete WAL manager.
pub trait CheckpointWal: Send + Sync {
    /// Returns the highest LSN handed out so far.
    ///
    /// The checkpoint worker skips its tick entirely when this returns `0`.
    fn max_assigned_lsn(&self) -> Lsn;

    /// Flushes the log up to `target`.
    ///
    /// NOTE(review): presumably returns the LSN that is durable afterwards;
    /// the worker only inspects the error case — confirm against the
    /// implementation.
    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn>;

    /// Appends a checkpoint record built from `payload` and returns an LSN
    /// (presumably that of the new record — confirm).
    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn>;
}
25
/// Buffer-pool operations used by the checkpoint and background-writer workers.
pub trait BufferMaintenance: Send + Sync {
    /// Returns the ids of the pages currently considered dirty.
    fn dirty_page_ids(&self) -> Vec<PageId>;

    /// Returns a snapshot of the dirty page table as `(page, lsn)` pairs.
    ///
    /// NOTE(review): the LSN is presumably the recovery LSN of the page —
    /// confirm against the buffer manager.
    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)>;

    /// Flushes a single page.
    ///
    /// The meaning of the returned `bool` is not visible in this file; the
    /// background writer ignores it.
    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool>;
}
31
/// Read-only view of transaction state used by the checkpoint and MVCC vacuum
/// workers.
pub trait TxnSnapshotOps: Send + Sync {
    /// Returns the ids of the transactions that are currently active; the
    /// checkpoint worker stores this list in the checkpoint payload.
    fn active_transactions(&self) -> Vec<TransactionId>;

    /// Returns the oldest active transaction, or `None` when there is none.
    fn oldest_active_txn(&self) -> Option<TransactionId>;

    /// Returns a hint for the next transaction id; the vacuum worker uses it
    /// as the visibility horizon when no transaction is active.
    fn next_txn_id_hint(&self) -> TransactionId;

    /// Returns the recorded status of `txn_id`.
    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus;
}
38
/// One-shot stop control for the WAL writer.
///
/// The `Send + 'static` bounds let an implementor be moved into the boxed
/// stop closure held by a `WorkerHandle`.
pub trait WalFlushControl: Send + 'static {
    /// Consumes the controller and stops the writer, reporting any failure.
    fn stop(self) -> QuillSQLResult<()>;
}
42
/// Identifies which background task a worker performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerKind {
    /// Handle created by `wal_writer_worker`.
    WalWriter,
    /// Thread created by `spawn_checkpoint_worker`.
    Checkpoint,
    /// Thread created by `spawn_bg_writer`.
    BufferPoolWriter,
    /// Thread created by `spawn_mvcc_vacuum_worker`.
    MvccVacuum,
}
51
/// Descriptive, copyable information about a registered worker.
#[derive(Debug, Clone, Copy)]
pub struct WorkerMetadata {
    /// Which task the worker performs.
    pub kind: WorkerKind,
    /// Configured period of the worker; every constructor in this file sets
    /// it to `Some(..)`.
    pub interval: Option<Duration>,
}
57
/// Owns the means to stop a background worker and, optionally, to join its
/// thread.
pub struct WorkerHandle {
    /// Descriptive information returned by `metadata()`.
    metadata: WorkerMetadata,
    /// One-shot stop callback; `None` once `shutdown` has run.
    stop_fn: Option<Box<dyn FnOnce() + Send + 'static>>,
    /// Thread to wait for in `join`; `None` when the worker has no thread of
    /// its own or has already been joined.
    join_handle: Option<JoinHandle<()>>,
}
63
64impl WorkerHandle {
65 pub fn new(
66 metadata: WorkerMetadata,
67 stop_fn: impl FnOnce() + Send + 'static,
68 join_handle: Option<JoinHandle<()>>,
69 ) -> Self {
70 Self {
71 metadata,
72 stop_fn: Some(Box::new(stop_fn)),
73 join_handle,
74 }
75 }
76
77 pub fn metadata(&self) -> WorkerMetadata {
78 self.metadata
79 }
80
81 pub fn shutdown(&mut self) {
82 if let Some(stop) = self.stop_fn.take() {
83 stop();
84 }
85 }
86
87 pub fn join(&mut self) {
88 if let Some(handle) = self.join_handle.take() {
89 if let Err(err) = handle.join() {
90 log::warn!(
91 "Background worker {:?} terminated with panic: {:?}",
92 self.metadata.kind,
93 err
94 );
95 }
96 }
97 }
98}
99
100impl Drop for WorkerHandle {
101 fn drop(&mut self) {
102 self.shutdown();
103 self.join();
104 }
105}
106
/// Lets a `WalWriterHandle` be used as the stop control of the WAL writer
/// worker.
impl WalFlushControl for WalWriterHandle {
    fn stop(self) -> QuillSQLResult<()> {
        // NOTE(review): presumably resolves to the inherent
        // `WalWriterHandle::stop`; without such an inherent method this call
        // would recurse into itself — confirm.
        WalWriterHandle::stop(self)
    }
}
112
/// Registry of background workers that are shut down together.
pub struct BackgroundWorkers {
    /// Registered handles, in registration order.
    workers: Vec<WorkerHandle>,
}
116
117impl Default for BackgroundWorkers {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl BackgroundWorkers {
124 pub fn new() -> Self {
125 Self {
126 workers: Vec::new(),
127 }
128 }
129
130 pub fn register(&mut self, handle: WorkerHandle) {
131 self.workers.push(handle);
132 }
133
134 pub fn register_opt(&mut self, handle: Option<WorkerHandle>) {
135 if let Some(handle) = handle {
136 self.register(handle);
137 }
138 }
139
140 pub fn shutdown_all(&mut self) {
141 for worker in &mut self.workers {
142 worker.shutdown();
143 }
144 for worker in &mut self.workers {
145 worker.join();
146 }
147 }
148
149 pub fn workers(&self) -> &[WorkerHandle] {
150 &self.workers
151 }
152
153 pub fn snapshot(&self) -> Vec<WorkerMetadata> {
154 self.workers
155 .iter()
156 .map(|worker| worker.metadata())
157 .collect()
158 }
159}
160
161impl Drop for BackgroundWorkers {
162 fn drop(&mut self) {
163 self.shutdown_all();
164 }
165}
166
167pub fn wal_writer_worker<H>(handle: H, interval: Duration) -> WorkerHandle
168where
169 H: WalFlushControl,
170{
171 WorkerHandle::new(
172 WorkerMetadata {
173 kind: WorkerKind::WalWriter,
174 interval: Some(interval),
175 },
176 move || {
177 if let Err(err) = handle.stop() {
178 warn!("Failed to stop WAL writer: {}", err);
179 }
180 },
181 None,
182 )
183}
184
185pub fn spawn_checkpoint_worker(
186 wal: Arc<dyn CheckpointWal>,
187 buffer_pool: Arc<dyn BufferMaintenance>,
188 transaction_manager: Arc<dyn TxnSnapshotOps>,
189 interval: Option<Duration>,
190) -> Option<WorkerHandle> {
191 let interval = interval?;
192 if interval.is_zero() {
193 return None;
194 }
195
196 let wal_ref = wal.clone();
197 let buffer = buffer_pool.clone();
198 let txn_mgr = transaction_manager.clone();
199
200 spawn_periodic_worker(
201 "checkpoint-worker",
202 WorkerKind::Checkpoint,
203 interval,
204 move || {
205 let dirty_pages = buffer.dirty_page_ids();
206 let dpt_snapshot = buffer.dirty_page_table_snapshot();
207 let active_txns = txn_mgr.active_transactions();
208 let last_lsn = wal_ref.max_assigned_lsn();
209
210 if last_lsn != 0 {
211 if let Err(e) = wal_ref.flush_until(last_lsn) {
212 warn!("Checkpoint flush failed: {}", e);
213 }
214 let payload = CheckpointPayload {
215 last_lsn,
216 dirty_pages,
217 active_transactions: active_txns,
218 dpt: dpt_snapshot,
219 };
220 if let Err(e) = wal_ref.log_checkpoint(payload) {
221 warn!("Checkpoint write failed: {}", e);
222 }
223 }
224 },
225 )
226}
227
228pub fn spawn_bg_writer(
229 buffer_pool: Arc<dyn BufferMaintenance>,
230 interval: Option<Duration>,
231 vacuum_cfg: IndexVacuumConfig,
232) -> Option<WorkerHandle> {
233 let interval = interval?;
234 if interval.is_zero() {
235 return None;
236 }
237 let bp = buffer_pool.clone();
238 spawn_periodic_worker(
239 "bg-writer",
240 WorkerKind::BufferPoolWriter,
241 interval,
242 move || {
243 let dirty_ids = bp.dirty_page_ids();
244 for page_id in dirty_ids.into_iter().take(16) {
245 let _ = bp.flush_page(page_id);
246 }
247
248 let registry = global_index_registry();
249 for (idx, heap) in registry.iter().take(16) {
250 let pending = idx.take_pending_garbage();
251 if pending >= vacuum_cfg.trigger_threshold {
252 let _ = idx.lazy_cleanup_with(
253 |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
254 Some(vacuum_cfg.batch_limit),
255 );
256 }
257 }
258 },
259 )
260}
261
262pub fn spawn_mvcc_vacuum_worker(
263 transaction_manager: Arc<dyn TxnSnapshotOps>,
264 interval: Option<Duration>,
265 batch_limit: usize,
266) -> Option<WorkerHandle> {
267 let interval = interval?;
268 if interval.is_zero() || batch_limit == 0 {
269 return None;
270 }
271
272 let txn_mgr = transaction_manager.clone();
273 let tables = global_table_registry();
274
275 spawn_periodic_worker("mvcc-vacuum", WorkerKind::MvccVacuum, interval, move || {
276 let safe_xmin = txn_mgr
277 .oldest_active_txn()
278 .unwrap_or_else(|| txn_mgr.next_txn_id_hint());
279 let mut remaining = batch_limit;
280
281 for (table_ref, heap) in tables.iter_tables() {
282 if remaining == 0 {
283 break;
284 }
285 if matches!(table_ref.schema(), Some(schema) if schema == INFORMATION_SCHEMA_NAME) {
286 continue;
287 }
288
289 match vacuum_table_versions(&heap, &txn_mgr, safe_xmin, &mut remaining) {
290 Ok(cleaned) if cleaned > 0 => {
291 debug!(
292 "MVCC vacuum reclaimed {} tuple(s) from {}",
293 cleaned,
294 table_ref.to_log_string()
295 );
296 }
297 Ok(_) => {}
298 Err(err) => {
299 warn!(
300 "MVCC vacuum on {} failed: {}",
301 table_ref.to_log_string(),
302 err
303 );
304 }
305 }
306 }
307 })
308}
309
/// Exposes the WAL manager to the checkpoint worker by delegating each trait
/// method to the `WalManager` function of the same name.
///
/// NOTE(review): the `WalManager::name(self, ..)` paths presumably resolve to
/// inherent methods; without them these calls would recurse — confirm.
impl CheckpointWal for WalManager {
    fn max_assigned_lsn(&self) -> Lsn {
        WalManager::max_assigned_lsn(self)
    }

    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
        WalManager::flush_until(self, target)
    }

    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
        WalManager::log_checkpoint(self, payload)
    }
}
323
/// Exposes the buffer manager to the background workers by delegating each
/// trait method to the `BufferManager` function of the same name.
///
/// NOTE(review): the `BufferManager::name(self, ..)` paths presumably resolve
/// to inherent methods; without them these calls would recurse — confirm.
impl BufferMaintenance for BufferManager {
    fn dirty_page_ids(&self) -> Vec<PageId> {
        BufferManager::dirty_page_ids(self)
    }

    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
        BufferManager::dirty_page_table_snapshot(self)
    }

    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        BufferManager::flush_page(self, page_id)
    }
}
337
/// Exposes the transaction manager to the background workers by delegating
/// each trait method to the `TransactionManager` function of the same name.
///
/// NOTE(review): the `TransactionManager::name(self, ..)` paths presumably
/// resolve to inherent methods; without them these calls would recurse —
/// confirm.
impl TxnSnapshotOps for TransactionManager {
    fn active_transactions(&self) -> Vec<TransactionId> {
        TransactionManager::active_transactions(self)
    }

    fn oldest_active_txn(&self) -> Option<TransactionId> {
        TransactionManager::oldest_active_txn(self)
    }

    fn next_txn_id_hint(&self) -> TransactionId {
        TransactionManager::next_txn_id_hint(self)
    }

    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
        TransactionManager::transaction_status(self, txn_id)
    }
}
355
356fn spawn_periodic_worker<F>(
357 name: &str,
358 kind: WorkerKind,
359 interval: Duration,
360 mut tick: F,
361) -> Option<WorkerHandle>
362where
363 F: FnMut() + Send + 'static,
364{
365 let stop_flag = Arc::new(AtomicBool::new(false));
366 let thread_flag = Arc::clone(&stop_flag);
367
368 match thread::Builder::new().name(name.into()).spawn(move || {
369 while !thread_flag.load(Ordering::Relaxed) {
370 tick();
371 if thread_flag.load(Ordering::Relaxed) {
372 break;
373 }
374 thread::sleep(interval);
375 }
376 }) {
377 Ok(join_handle) => {
378 let stop_handle = Arc::clone(&stop_flag);
379 Some(WorkerHandle::new(
380 WorkerMetadata {
381 kind,
382 interval: Some(interval),
383 },
384 move || {
385 stop_handle.store(true, Ordering::Release);
386 },
387 Some(join_handle),
388 ))
389 }
390 Err(err) => {
391 warn!("Failed to spawn {}: {}", name, err);
392 None
393 }
394 }
395}
396
397fn vacuum_table_versions(
398 table: &Arc<TableHeap>,
399 txn_mgr: &Arc<dyn TxnSnapshotOps>,
400 safe_xmin: TransactionId,
401 remaining: &mut usize,
402) -> crate::error::QuillSQLResult<usize> {
403 if *remaining == 0 {
404 return Ok(0);
405 }
406
407 let mut cleaned = 0usize;
408 let mut iter = TableIterator::new(table.clone(), ..);
409 while *remaining > 0 {
410 match iter.next()? {
411 Some((rid, meta, _tuple)) => {
412 let removed = if meta.is_deleted {
413 try_reclaim_deleted(table, txn_mgr, safe_xmin, rid, &meta)?
414 } else {
415 try_reclaim_aborted(table, txn_mgr, safe_xmin, rid, &meta)?
416 };
417 if removed {
418 cleaned += 1;
419 *remaining -= 1;
420 }
421 }
422 None => break,
423 }
424 }
425 Ok(cleaned)
426}
427
428fn try_reclaim_deleted(
429 table: &Arc<TableHeap>,
430 txn_mgr: &Arc<dyn TxnSnapshotOps>,
431 safe_xmin: TransactionId,
432 rid: RecordId,
433 meta: &TupleMeta,
434) -> crate::error::QuillSQLResult<bool> {
435 if meta.delete_txn_id == 0 {
436 return Ok(false);
437 }
438 let status = txn_mgr.transaction_status(meta.delete_txn_id);
439 let removable = match status {
440 TransactionStatus::Committed => meta.delete_txn_id < safe_xmin,
441 TransactionStatus::Aborted => true,
442 TransactionStatus::InProgress | TransactionStatus::Unknown => false,
443 };
444 if !removable {
445 return Ok(false);
446 }
447
448 let delete_txn = meta.delete_txn_id;
449 let delete_cid = meta.delete_cid;
450 let insert_txn = meta.insert_txn_id;
451 table.vacuum_slot_if(rid, |current| {
452 current.is_deleted
453 && current.delete_txn_id == delete_txn
454 && current.delete_cid == delete_cid
455 && current.insert_txn_id == insert_txn
456 })
457}
458
459fn try_reclaim_aborted(
460 table: &Arc<TableHeap>,
461 txn_mgr: &Arc<dyn TxnSnapshotOps>,
462 safe_xmin: TransactionId,
463 rid: RecordId,
464 meta: &TupleMeta,
465) -> crate::error::QuillSQLResult<bool> {
466 if meta.insert_txn_id == 0 {
467 return Ok(false);
468 }
469 let status = txn_mgr.transaction_status(meta.insert_txn_id);
470 if status != TransactionStatus::Aborted {
471 return Ok(false);
472 }
473 if meta.insert_txn_id >= safe_xmin {
474 return Ok(false);
475 }
476
477 let insert_txn = meta.insert_txn_id;
478 table.vacuum_slot_if(rid, |current| {
479 !current.is_deleted && current.insert_txn_id == insert_txn
480 })
481}