quill_sql/background/
mod.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6
7use log::{debug, warn};
8
9use crate::buffer::{BufferManager, PageId};
10use crate::catalog::registry::TableRegistry;
11use crate::catalog::INFORMATION_SCHEMA_NAME;
12use crate::error::QuillSQLResult;
13use crate::recovery::wal::codec::CheckpointPayload;
14use crate::recovery::{Lsn, WalManager, WalWriterHandle};
15use crate::storage::page::{RecordId, TupleMeta};
16use crate::storage::table_heap::{TableHeap, TableIterator};
17use crate::transaction::{TransactionId, TransactionManager, TransactionStatus};
18
19pub trait CheckpointWal: Send + Sync {
20    fn max_assigned_lsn(&self) -> Lsn;
21    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn>;
22    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn>;
23}
24
25pub trait BufferMaintenance: Send + Sync {
26    fn dirty_page_ids(&self) -> Vec<PageId>;
27    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)>;
28    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool>;
29}
30
31pub trait TxnSnapshotOps: Send + Sync {
32    fn active_transactions(&self) -> Vec<TransactionId>;
33    fn oldest_active_txn(&self) -> Option<TransactionId>;
34    fn next_txn_id_hint(&self) -> TransactionId;
35    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus;
36}
37
38pub trait WalFlushControl: Send + 'static {
39    fn stop(self) -> QuillSQLResult<()>;
40}
41
/// Identifies which kind of background worker a [`WorkerHandle`] controls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerKind {
    /// Worker registered through `wal_writer_worker`.
    WalWriter,
    /// Worker started by `spawn_checkpoint_worker`.
    Checkpoint,
    /// Worker started by `spawn_bg_writer`.
    BufferPoolWriter,
    /// Worker started by `spawn_mvcc_vacuum_worker`.
    MvccVacuum,
}
50
51#[derive(Debug, Clone, Copy)]
52pub struct WorkerMetadata {
53    pub kind: WorkerKind,
54    pub interval: Option<Duration>,
55}
56
57pub struct WorkerHandle {
58    metadata: WorkerMetadata,
59    stop_fn: Option<Box<dyn FnOnce() + Send + 'static>>,
60    join_handle: Option<JoinHandle<()>>,
61}
62
63impl WorkerHandle {
64    pub fn new(
65        metadata: WorkerMetadata,
66        stop_fn: impl FnOnce() + Send + 'static,
67        join_handle: Option<JoinHandle<()>>,
68    ) -> Self {
69        Self {
70            metadata,
71            stop_fn: Some(Box::new(stop_fn)),
72            join_handle,
73        }
74    }
75
76    pub fn metadata(&self) -> WorkerMetadata {
77        self.metadata
78    }
79
80    pub fn shutdown(&mut self) {
81        if let Some(stop) = self.stop_fn.take() {
82            stop();
83        }
84    }
85
86    pub fn join(&mut self) {
87        if let Some(handle) = self.join_handle.take() {
88            if let Err(err) = handle.join() {
89                log::warn!(
90                    "Background worker {:?} terminated with panic: {:?}",
91                    self.metadata.kind,
92                    err
93                );
94            }
95        }
96    }
97}
98
99impl Drop for WorkerHandle {
100    fn drop(&mut self) {
101        self.shutdown();
102        self.join();
103    }
104}
105
106impl WalFlushControl for WalWriterHandle {
107    fn stop(self) -> QuillSQLResult<()> {
108        WalWriterHandle::stop(self)
109    }
110}
111
112pub struct BackgroundWorkers {
113    workers: Vec<WorkerHandle>,
114}
115
116impl Default for BackgroundWorkers {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122impl BackgroundWorkers {
123    pub fn new() -> Self {
124        Self {
125            workers: Vec::new(),
126        }
127    }
128
129    pub fn register(&mut self, handle: WorkerHandle) {
130        self.workers.push(handle);
131    }
132
133    pub fn register_opt(&mut self, handle: Option<WorkerHandle>) {
134        if let Some(handle) = handle {
135            self.register(handle);
136        }
137    }
138
139    pub fn shutdown_all(&mut self) {
140        for worker in &mut self.workers {
141            worker.shutdown();
142        }
143        for worker in &mut self.workers {
144            worker.join();
145        }
146    }
147
148    pub fn workers(&self) -> &[WorkerHandle] {
149        &self.workers
150    }
151
152    pub fn snapshot(&self) -> Vec<WorkerMetadata> {
153        self.workers
154            .iter()
155            .map(|worker| worker.metadata())
156            .collect()
157    }
158}
159
160impl Drop for BackgroundWorkers {
161    fn drop(&mut self) {
162        self.shutdown_all();
163    }
164}
165
166pub fn wal_writer_worker<H>(handle: H, interval: Duration) -> WorkerHandle
167where
168    H: WalFlushControl,
169{
170    WorkerHandle::new(
171        WorkerMetadata {
172            kind: WorkerKind::WalWriter,
173            interval: Some(interval),
174        },
175        move || {
176            if let Err(err) = handle.stop() {
177                warn!("Failed to stop WAL writer: {}", err);
178            }
179        },
180        None,
181    )
182}
183
184pub fn spawn_checkpoint_worker(
185    wal: Arc<dyn CheckpointWal>,
186    buffer_pool: Arc<dyn BufferMaintenance>,
187    transaction_manager: Arc<dyn TxnSnapshotOps>,
188    interval: Option<Duration>,
189) -> Option<WorkerHandle> {
190    let interval = interval?;
191    if interval.is_zero() {
192        return None;
193    }
194
195    let wal_ref = wal.clone();
196    let buffer = buffer_pool.clone();
197    let txn_mgr = transaction_manager.clone();
198
199    spawn_periodic_worker(
200        "checkpoint-worker",
201        WorkerKind::Checkpoint,
202        interval,
203        move || {
204            let dirty_pages = buffer.dirty_page_ids();
205            let dpt_snapshot = buffer.dirty_page_table_snapshot();
206            let active_txns = txn_mgr.active_transactions();
207            let last_lsn = wal_ref.max_assigned_lsn();
208
209            if last_lsn != 0 {
210                if let Err(e) = wal_ref.flush_until(last_lsn) {
211                    warn!("Checkpoint flush failed: {}", e);
212                }
213                let payload = CheckpointPayload {
214                    last_lsn,
215                    dirty_pages,
216                    active_transactions: active_txns,
217                    dpt: dpt_snapshot,
218                };
219                if let Err(e) = wal_ref.log_checkpoint(payload) {
220                    warn!("Checkpoint write failed: {}", e);
221                }
222            }
223        },
224    )
225}
226
227pub fn spawn_bg_writer(
228    buffer_pool: Arc<dyn BufferMaintenance>,
229    interval: Option<Duration>,
230) -> Option<WorkerHandle> {
231    let interval = interval?;
232    if interval.is_zero() {
233        return None;
234    }
235    let bp = buffer_pool.clone();
236    spawn_periodic_worker(
237        "bg-writer",
238        WorkerKind::BufferPoolWriter,
239        interval,
240        move || {
241            let dirty_ids = bp.dirty_page_ids();
242            for page_id in dirty_ids.into_iter().take(16) {
243                let _ = bp.flush_page(page_id);
244            }
245        },
246    )
247}
248
249pub fn spawn_mvcc_vacuum_worker(
250    transaction_manager: Arc<dyn TxnSnapshotOps>,
251    interval: Option<Duration>,
252    batch_limit: usize,
253    table_registry: Arc<TableRegistry>,
254) -> Option<WorkerHandle> {
255    let interval = interval?;
256    if interval.is_zero() || batch_limit == 0 {
257        return None;
258    }
259
260    let txn_mgr = transaction_manager.clone();
261    let tables = table_registry.clone();
262
263    spawn_periodic_worker("mvcc-vacuum", WorkerKind::MvccVacuum, interval, move || {
264        let safe_xmin = txn_mgr
265            .oldest_active_txn()
266            .unwrap_or_else(|| txn_mgr.next_txn_id_hint());
267        let mut remaining = batch_limit;
268
269        for (table_ref, heap) in tables.iter_tables() {
270            if remaining == 0 {
271                break;
272            }
273            if matches!(table_ref.schema(), Some(schema) if schema == INFORMATION_SCHEMA_NAME) {
274                continue;
275            }
276
277            match vacuum_table_versions(&heap, &txn_mgr, safe_xmin, &mut remaining) {
278                Ok(cleaned) if cleaned > 0 => {
279                    debug!(
280                        "MVCC vacuum reclaimed {} tuple(s) from {}",
281                        cleaned,
282                        table_ref.to_log_string()
283                    );
284                }
285                Ok(_) => {}
286                Err(err) => {
287                    warn!(
288                        "MVCC vacuum on {} failed: {}",
289                        table_ref.to_log_string(),
290                        err
291                    );
292                }
293            }
294        }
295    })
296}
297
298impl CheckpointWal for WalManager {
299    fn max_assigned_lsn(&self) -> Lsn {
300        WalManager::max_assigned_lsn(self)
301    }
302
303    fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
304        WalManager::flush_until(self, target)
305    }
306
307    fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
308        WalManager::log_checkpoint(self, payload)
309    }
310}
311
312impl BufferMaintenance for BufferManager {
313    fn dirty_page_ids(&self) -> Vec<PageId> {
314        BufferManager::dirty_page_ids(self)
315    }
316
317    fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
318        BufferManager::dirty_page_table_snapshot(self)
319    }
320
321    fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
322        BufferManager::flush_page(self, page_id)
323    }
324}
325
326impl TxnSnapshotOps for TransactionManager {
327    fn active_transactions(&self) -> Vec<TransactionId> {
328        TransactionManager::active_transactions(self)
329    }
330
331    fn oldest_active_txn(&self) -> Option<TransactionId> {
332        TransactionManager::oldest_active_txn(self)
333    }
334
335    fn next_txn_id_hint(&self) -> TransactionId {
336        TransactionManager::next_txn_id_hint(self)
337    }
338
339    fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
340        TransactionManager::transaction_status(self, txn_id)
341    }
342}
343
344fn spawn_periodic_worker<F>(
345    name: &str,
346    kind: WorkerKind,
347    interval: Duration,
348    mut tick: F,
349) -> Option<WorkerHandle>
350where
351    F: FnMut() + Send + 'static,
352{
353    let stop_flag = Arc::new(AtomicBool::new(false));
354    let thread_flag = Arc::clone(&stop_flag);
355
356    match thread::Builder::new().name(name.into()).spawn(move || {
357        while !thread_flag.load(Ordering::Relaxed) {
358            tick();
359            if thread_flag.load(Ordering::Relaxed) {
360                break;
361            }
362            thread::sleep(interval);
363        }
364    }) {
365        Ok(join_handle) => {
366            let stop_handle = Arc::clone(&stop_flag);
367            Some(WorkerHandle::new(
368                WorkerMetadata {
369                    kind,
370                    interval: Some(interval),
371                },
372                move || {
373                    stop_handle.store(true, Ordering::Release);
374                },
375                Some(join_handle),
376            ))
377        }
378        Err(err) => {
379            warn!("Failed to spawn {}: {}", name, err);
380            None
381        }
382    }
383}
384
385fn vacuum_table_versions(
386    table: &Arc<TableHeap>,
387    txn_mgr: &Arc<dyn TxnSnapshotOps>,
388    safe_xmin: TransactionId,
389    remaining: &mut usize,
390) -> crate::error::QuillSQLResult<usize> {
391    if *remaining == 0 {
392        return Ok(0);
393    }
394
395    let mut cleaned = 0usize;
396    let mut iter = TableIterator::new(table.clone(), ..);
397    while *remaining > 0 {
398        match iter.next()? {
399            Some((rid, meta, _tuple)) => {
400                let removed = if meta.is_deleted {
401                    try_reclaim_deleted(table, txn_mgr, safe_xmin, rid, &meta)?
402                } else {
403                    try_reclaim_aborted(table, txn_mgr, safe_xmin, rid, &meta)?
404                };
405                if removed {
406                    cleaned += 1;
407                    *remaining -= 1;
408                }
409            }
410            None => break,
411        }
412    }
413    Ok(cleaned)
414}
415
416fn try_reclaim_deleted(
417    table: &Arc<TableHeap>,
418    txn_mgr: &Arc<dyn TxnSnapshotOps>,
419    safe_xmin: TransactionId,
420    rid: RecordId,
421    meta: &TupleMeta,
422) -> crate::error::QuillSQLResult<bool> {
423    if meta.delete_txn_id == 0 {
424        return Ok(false);
425    }
426    let status = txn_mgr.transaction_status(meta.delete_txn_id);
427    let removable = match status {
428        TransactionStatus::Committed => meta.delete_txn_id < safe_xmin,
429        TransactionStatus::Aborted => true,
430        TransactionStatus::InProgress | TransactionStatus::Unknown => false,
431    };
432    if !removable {
433        return Ok(false);
434    }
435
436    let delete_txn = meta.delete_txn_id;
437    let delete_cid = meta.delete_cid;
438    let insert_txn = meta.insert_txn_id;
439    table.vacuum_slot_if(rid, |current| {
440        current.is_deleted
441            && current.delete_txn_id == delete_txn
442            && current.delete_cid == delete_cid
443            && current.insert_txn_id == insert_txn
444    })
445}
446
447fn try_reclaim_aborted(
448    table: &Arc<TableHeap>,
449    txn_mgr: &Arc<dyn TxnSnapshotOps>,
450    safe_xmin: TransactionId,
451    rid: RecordId,
452    meta: &TupleMeta,
453) -> crate::error::QuillSQLResult<bool> {
454    if meta.insert_txn_id == 0 {
455        return Ok(false);
456    }
457    let status = txn_mgr.transaction_status(meta.insert_txn_id);
458    if status != TransactionStatus::Aborted {
459        return Ok(false);
460    }
461    if meta.insert_txn_id >= safe_xmin {
462        return Ok(false);
463    }
464
465    let insert_txn = meta.insert_txn_id;
466    table.vacuum_slot_if(rid, |current| {
467        !current.is_deleted && current.insert_txn_id == insert_txn
468    })
469}