quill_sql/background/
mod.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6
7use log::{debug, warn};
8
9use crate::buffer::{BufferManager, PageId};
10use crate::catalog::registry::{global_index_registry, global_table_registry};
11use crate::catalog::INFORMATION_SCHEMA_NAME;
12use crate::config::IndexVacuumConfig;
13use crate::error::QuillSQLResult;
14use crate::recovery::wal::codec::CheckpointPayload;
15use crate::recovery::{Lsn, WalManager, WalWriterHandle};
16use crate::storage::page::{RecordId, TupleMeta};
17use crate::storage::table_heap::{TableHeap, TableIterator};
18use crate::transaction::{TransactionId, TransactionManager, TransactionStatus};
19
/// WAL operations the checkpoint worker depends on.
///
/// `spawn_checkpoint_worker` takes this as a trait object; `WalManager`
/// implements it by delegating to its own methods of the same names.
20pub trait CheckpointWal: Send + Sync {
    /// Returns the LSN stored as `last_lsn` in the checkpoint payload.
    /// The checkpoint worker skips the checkpoint when this is 0.
21    fn max_assigned_lsn(&self) -> Lsn;
    /// Called with the value of `max_assigned_lsn` before the checkpoint record
    /// is logged. Presumably flushes the WAL up to `target` and returns the LSN
    /// reached — TODO confirm against `WalManager::flush_until`.
22    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn>;
    /// Hands `payload` to the WAL as a checkpoint. Presumably returns the LSN
    /// assigned to the checkpoint record — TODO confirm.
23    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn>;
24}
25
/// Buffer-pool operations used by the checkpoint worker and the background writer.
///
/// `BufferManager` implements it by delegating to its own methods of the same names.
26pub trait BufferMaintenance: Send + Sync {
    /// Page ids reported as dirty; the background writer flushes a bounded
    /// number of these per tick and the checkpoint worker records them.
27    fn dirty_page_ids(&self) -> Vec<PageId>;
    /// `(page, lsn)` pairs stored as `dpt` in the checkpoint payload.
    /// NOTE(review): the LSN is presumably the page's recovery LSN — confirm.
28    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)>;
    /// Flushes one page. The meaning of the returned `bool` is not visible in
    /// this file — presumably "a write actually happened"; confirm.
29    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool>;
30}
31
/// Transaction-state queries used by the checkpoint and MVCC vacuum workers.
///
/// `TransactionManager` implements it by delegating to its own methods of the same names.
32pub trait TxnSnapshotOps: Send + Sync {
    /// Transaction ids stored as `active_transactions` in the checkpoint payload.
33    fn active_transactions(&self) -> Vec<TransactionId>;
    /// Used by the vacuum worker as its `safe_xmin` horizon when it is `Some`.
34    fn oldest_active_txn(&self) -> Option<TransactionId>;
    /// Fallback `safe_xmin` used by the vacuum worker when `oldest_active_txn`
    /// returns `None`.
35    fn next_txn_id_hint(&self) -> TransactionId;
    /// Status of `txn_id`; the vacuum helpers use it to decide whether a tuple
    /// version may be reclaimed.
36    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus;
37}
38
/// Something that can be stopped exactly once; `wal_writer_worker` wraps an
/// implementor so it can be shut down alongside the other background workers.
39pub trait WalFlushControl: Send + 'static {
    /// Consumes the handle and stops whatever it controls.
40    fn stop(self) -> QuillSQLResult<()>;
41}
42
43/// High-level categories of background workers maintained by the database.
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum WorkerKind {
    /// Handle produced by `wal_writer_worker`.
46    WalWriter,
    /// Periodic worker started by `spawn_checkpoint_worker`.
47    Checkpoint,
    /// Periodic worker started by `spawn_bg_writer`.
48    BufferPoolWriter,
    /// Periodic worker started by `spawn_mvcc_vacuum_worker`.
49    MvccVacuum,
50}
51
/// Descriptive, copyable information about a registered background worker.
52#[derive(Debug, Clone, Copy)]
53pub struct WorkerMetadata {
    /// Which category of worker this is.
54    pub kind: WorkerKind,
    /// Interval the worker was configured with; every constructor in this
    /// file sets it to `Some(..)`.
55    pub interval: Option<Duration>,
56}
57
/// Owns the means to stop one background worker and, optionally, to wait for
/// its thread to finish.
58pub struct WorkerHandle {
59    metadata: WorkerMetadata,
    // Taken by `shutdown`, so the stop closure runs at most once.
60    stop_fn: Option<Box<dyn FnOnce() + Send + 'static>>,
    // `None` when there is no thread to wait for (`wal_writer_worker` passes
    // `None`); taken by `join`.
61    join_handle: Option<JoinHandle<()>>,
62}
63
64impl WorkerHandle {
65    pub fn new(
66        metadata: WorkerMetadata,
67        stop_fn: impl FnOnce() + Send + 'static,
68        join_handle: Option<JoinHandle<()>>,
69    ) -> Self {
70        Self {
71            metadata,
72            stop_fn: Some(Box::new(stop_fn)),
73            join_handle,
74        }
75    }
76
77    pub fn metadata(&self) -> WorkerMetadata {
78        self.metadata
79    }
80
81    pub fn shutdown(&mut self) {
82        if let Some(stop) = self.stop_fn.take() {
83            stop();
84        }
85    }
86
87    pub fn join(&mut self) {
88        if let Some(handle) = self.join_handle.take() {
89            if let Err(err) = handle.join() {
90                log::warn!(
91                    "Background worker {:?} terminated with panic: {:?}",
92                    self.metadata.kind,
93                    err
94                );
95            }
96        }
97    }
98}
99
/// Dropping a handle asks the worker to stop and then waits for its thread.
100impl Drop for WorkerHandle {
101    fn drop(&mut self) {
        // Stop must be requested before joining. Both calls do nothing if
        // they were already made, since each takes its `Option` field.
102        self.shutdown();
103        self.join();
104    }
105}
106
/// Allows the WAL writer handle to be wrapped by `wal_writer_worker`.
107impl WalFlushControl for WalWriterHandle {
108    fn stop(self) -> QuillSQLResult<()> {
        // NOTE(review): this path-qualified call presumably resolves to an
        // inherent `WalWriterHandle::stop`; without one it would call itself.
109        WalWriterHandle::stop(self)
110    }
111}
112
/// Collection of registered background workers that are shut down together
/// (explicitly via `shutdown_all`, or when the collection is dropped).
113pub struct BackgroundWorkers {
    // Kept in registration order.
114    workers: Vec<WorkerHandle>,
115}
116
/// The default collection is the empty one produced by `BackgroundWorkers::new`.
117impl Default for BackgroundWorkers {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl BackgroundWorkers {
124    pub fn new() -> Self {
125        Self {
126            workers: Vec::new(),
127        }
128    }
129
130    pub fn register(&mut self, handle: WorkerHandle) {
131        self.workers.push(handle);
132    }
133
134    pub fn register_opt(&mut self, handle: Option<WorkerHandle>) {
135        if let Some(handle) = handle {
136            self.register(handle);
137        }
138    }
139
140    pub fn shutdown_all(&mut self) {
141        for worker in &mut self.workers {
142            worker.shutdown();
143        }
144        for worker in &mut self.workers {
145            worker.join();
146        }
147    }
148
149    pub fn workers(&self) -> &[WorkerHandle] {
150        &self.workers
151    }
152
153    pub fn snapshot(&self) -> Vec<WorkerMetadata> {
154        self.workers
155            .iter()
156            .map(|worker| worker.metadata())
157            .collect()
158    }
159}
160
/// Dropping the collection stops and joins every registered worker.
161impl Drop for BackgroundWorkers {
162    fn drop(&mut self) {
163        self.shutdown_all();
164    }
165}
166
167pub fn wal_writer_worker<H>(handle: H, interval: Duration) -> WorkerHandle
168where
169    H: WalFlushControl,
170{
171    WorkerHandle::new(
172        WorkerMetadata {
173            kind: WorkerKind::WalWriter,
174            interval: Some(interval),
175        },
176        move || {
177            if let Err(err) = handle.stop() {
178                warn!("Failed to stop WAL writer: {}", err);
179            }
180        },
181        None,
182    )
183}
184
185pub fn spawn_checkpoint_worker(
186    wal: Arc<dyn CheckpointWal>,
187    buffer_pool: Arc<dyn BufferMaintenance>,
188    transaction_manager: Arc<dyn TxnSnapshotOps>,
189    interval: Option<Duration>,
190) -> Option<WorkerHandle> {
191    let interval = interval?;
192    if interval.is_zero() {
193        return None;
194    }
195
196    let wal_ref = wal.clone();
197    let buffer = buffer_pool.clone();
198    let txn_mgr = transaction_manager.clone();
199
200    spawn_periodic_worker(
201        "checkpoint-worker",
202        WorkerKind::Checkpoint,
203        interval,
204        move || {
205            let dirty_pages = buffer.dirty_page_ids();
206            let dpt_snapshot = buffer.dirty_page_table_snapshot();
207            let active_txns = txn_mgr.active_transactions();
208            let last_lsn = wal_ref.max_assigned_lsn();
209
210            if last_lsn != 0 {
211                if let Err(e) = wal_ref.flush_until(last_lsn) {
212                    warn!("Checkpoint flush failed: {}", e);
213                }
214                let payload = CheckpointPayload {
215                    last_lsn,
216                    dirty_pages,
217                    active_transactions: active_txns,
218                    dpt: dpt_snapshot,
219                };
220                if let Err(e) = wal_ref.log_checkpoint(payload) {
221                    warn!("Checkpoint write failed: {}", e);
222                }
223            }
224        },
225    )
226}
227
228pub fn spawn_bg_writer(
229    buffer_pool: Arc<dyn BufferMaintenance>,
230    interval: Option<Duration>,
231    vacuum_cfg: IndexVacuumConfig,
232) -> Option<WorkerHandle> {
233    let interval = interval?;
234    if interval.is_zero() {
235        return None;
236    }
237    let bp = buffer_pool.clone();
238    spawn_periodic_worker(
239        "bg-writer",
240        WorkerKind::BufferPoolWriter,
241        interval,
242        move || {
243            let dirty_ids = bp.dirty_page_ids();
244            for page_id in dirty_ids.into_iter().take(16) {
245                let _ = bp.flush_page(page_id);
246            }
247
248            let registry = global_index_registry();
249            for (idx, heap) in registry.iter().take(16) {
250                let pending = idx.take_pending_garbage();
251                if pending >= vacuum_cfg.trigger_threshold {
252                    let _ = idx.lazy_cleanup_with(
253                        |rid| heap.tuple_meta(*rid).map(|m| m.is_deleted).unwrap_or(false),
254                        Some(vacuum_cfg.batch_limit),
255                    );
256                }
257            }
258        },
259    )
260}
261
262pub fn spawn_mvcc_vacuum_worker(
263    transaction_manager: Arc<dyn TxnSnapshotOps>,
264    interval: Option<Duration>,
265    batch_limit: usize,
266) -> Option<WorkerHandle> {
267    let interval = interval?;
268    if interval.is_zero() || batch_limit == 0 {
269        return None;
270    }
271
272    let txn_mgr = transaction_manager.clone();
273    let tables = global_table_registry();
274
275    spawn_periodic_worker("mvcc-vacuum", WorkerKind::MvccVacuum, interval, move || {
276        let safe_xmin = txn_mgr
277            .oldest_active_txn()
278            .unwrap_or_else(|| txn_mgr.next_txn_id_hint());
279        let mut remaining = batch_limit;
280
281        for (table_ref, heap) in tables.iter_tables() {
282            if remaining == 0 {
283                break;
284            }
285            if matches!(table_ref.schema(), Some(schema) if schema == INFORMATION_SCHEMA_NAME) {
286                continue;
287            }
288
289            match vacuum_table_versions(&heap, &txn_mgr, safe_xmin, &mut remaining) {
290                Ok(cleaned) if cleaned > 0 => {
291                    debug!(
292                        "MVCC vacuum reclaimed {} tuple(s) from {}",
293                        cleaned,
294                        table_ref.to_log_string()
295                    );
296                }
297                Ok(_) => {}
298                Err(err) => {
299                    warn!(
300                        "MVCC vacuum on {} failed: {}",
301                        table_ref.to_log_string(),
302                        err
303                    );
304                }
305            }
306        }
307    })
308}
309
/// Production implementation of `CheckpointWal`; every method forwards to the
/// `WalManager` method of the same name.
// NOTE(review): the path-qualified calls presumably resolve to inherent
// `WalManager` methods; without them each would call itself — confirm.
310impl CheckpointWal for WalManager {
311    fn max_assigned_lsn(&self) -> Lsn {
312        WalManager::max_assigned_lsn(self)
313    }
314
315    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
316        WalManager::flush_until(self, target)
317    }
318
319    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
320        WalManager::log_checkpoint(self, payload)
321    }
322}
323
/// Production implementation of `BufferMaintenance`; every method forwards to
/// the `BufferManager` method of the same name.
// NOTE(review): the path-qualified calls presumably resolve to inherent
// `BufferManager` methods; without them each would call itself — confirm.
324impl BufferMaintenance for BufferManager {
325    fn dirty_page_ids(&self) -> Vec<PageId> {
326        BufferManager::dirty_page_ids(self)
327    }
328
329    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
330        BufferManager::dirty_page_table_snapshot(self)
331    }
332
333    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
334        BufferManager::flush_page(self, page_id)
335    }
336}
337
/// Production implementation of `TxnSnapshotOps`; every method forwards to
/// the `TransactionManager` method of the same name.
// NOTE(review): the path-qualified calls presumably resolve to inherent
// `TransactionManager` methods; without them each would call itself — confirm.
338impl TxnSnapshotOps for TransactionManager {
339    fn active_transactions(&self) -> Vec<TransactionId> {
340        TransactionManager::active_transactions(self)
341    }
342
343    fn oldest_active_txn(&self) -> Option<TransactionId> {
344        TransactionManager::oldest_active_txn(self)
345    }
346
347    fn next_txn_id_hint(&self) -> TransactionId {
348        TransactionManager::next_txn_id_hint(self)
349    }
350
351    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
352        TransactionManager::transaction_status(self, txn_id)
353    }
354}
355
356fn spawn_periodic_worker<F>(
357    name: &str,
358    kind: WorkerKind,
359    interval: Duration,
360    mut tick: F,
361) -> Option<WorkerHandle>
362where
363    F: FnMut() + Send + 'static,
364{
365    let stop_flag = Arc::new(AtomicBool::new(false));
366    let thread_flag = Arc::clone(&stop_flag);
367
368    match thread::Builder::new().name(name.into()).spawn(move || {
369        while !thread_flag.load(Ordering::Relaxed) {
370            tick();
371            if thread_flag.load(Ordering::Relaxed) {
372                break;
373            }
374            thread::sleep(interval);
375        }
376    }) {
377        Ok(join_handle) => {
378            let stop_handle = Arc::clone(&stop_flag);
379            Some(WorkerHandle::new(
380                WorkerMetadata {
381                    kind,
382                    interval: Some(interval),
383                },
384                move || {
385                    stop_handle.store(true, Ordering::Release);
386                },
387                Some(join_handle),
388            ))
389        }
390        Err(err) => {
391            warn!("Failed to spawn {}: {}", name, err);
392            None
393        }
394    }
395}
396
397fn vacuum_table_versions(
398    table: &Arc<TableHeap>,
399    txn_mgr: &Arc<dyn TxnSnapshotOps>,
400    safe_xmin: TransactionId,
401    remaining: &mut usize,
402) -> crate::error::QuillSQLResult<usize> {
403    if *remaining == 0 {
404        return Ok(0);
405    }
406
407    let mut cleaned = 0usize;
408    let mut iter = TableIterator::new(table.clone(), ..);
409    while *remaining > 0 {
410        match iter.next()? {
411            Some((rid, meta, _tuple)) => {
412                let removed = if meta.is_deleted {
413                    try_reclaim_deleted(table, txn_mgr, safe_xmin, rid, &meta)?
414                } else {
415                    try_reclaim_aborted(table, txn_mgr, safe_xmin, rid, &meta)?
416                };
417                if removed {
418                    cleaned += 1;
419                    *remaining -= 1;
420                }
421            }
422            None => break,
423        }
424    }
425    Ok(cleaned)
426}
427
428fn try_reclaim_deleted(
429    table: &Arc<TableHeap>,
430    txn_mgr: &Arc<dyn TxnSnapshotOps>,
431    safe_xmin: TransactionId,
432    rid: RecordId,
433    meta: &TupleMeta,
434) -> crate::error::QuillSQLResult<bool> {
435    if meta.delete_txn_id == 0 {
436        return Ok(false);
437    }
438    let status = txn_mgr.transaction_status(meta.delete_txn_id);
439    let removable = match status {
440        TransactionStatus::Committed => meta.delete_txn_id < safe_xmin,
441        TransactionStatus::Aborted => true,
442        TransactionStatus::InProgress | TransactionStatus::Unknown => false,
443    };
444    if !removable {
445        return Ok(false);
446    }
447
448    let delete_txn = meta.delete_txn_id;
449    let delete_cid = meta.delete_cid;
450    let insert_txn = meta.insert_txn_id;
451    table.vacuum_slot_if(rid, |current| {
452        current.is_deleted
453            && current.delete_txn_id == delete_txn
454            && current.delete_cid == delete_cid
455            && current.insert_txn_id == insert_txn
456    })
457}
458
459fn try_reclaim_aborted(
460    table: &Arc<TableHeap>,
461    txn_mgr: &Arc<dyn TxnSnapshotOps>,
462    safe_xmin: TransactionId,
463    rid: RecordId,
464    meta: &TupleMeta,
465) -> crate::error::QuillSQLResult<bool> {
466    if meta.insert_txn_id == 0 {
467        return Ok(false);
468    }
469    let status = txn_mgr.transaction_status(meta.insert_txn_id);
470    if status != TransactionStatus::Aborted {
471        return Ok(false);
472    }
473    if meta.insert_txn_id >= safe_xmin {
474        return Ok(false);
475    }
476
477    let insert_txn = meta.insert_txn_id;
478    table.vacuum_slot_if(rid, |current| {
479        !current.is_deleted && current.insert_txn_id == insert_txn
480    })
481}