// reddb_server/storage/cache/bgwriter.rs
//! Background writer task — Post-MVP credibility item.
//!
//! Decouples dirty-page eviction from query execution. A
//! background thread scans the SIEVE buffer pool on a
//! cadence and writes dirty pages to disk via the pager,
//! freeing them for fast eviction by the next query that needs
//! a buffer.
//!
//! Mirrors PG's `bgwriter.c`:
//!
//! - **bgwriter_delay** — sleep between rounds (default 200 ms)
//! - **bgwriter_lru_maxpages** — max pages to flush per round
//! - **bgwriter_lru_multiplier** — adaptive scaling based on
//!   recent allocation rate
//!
//! ## Why
//!
//! Without a bgwriter, every query that needs a fresh buffer
//! pays the eviction cost: pick a dirty page, write it to
//! disk via the pager (which involves DWB + WAL flush
//! checkpoint), then return the cleaned slot. That write is
//! ~1-5 ms per page on spinning disk, ~100-300 µs on SSD.
//!
//! With a bgwriter, the writes happen on a separate task
//! during quiet moments. Query-side eviction finds clean
//! pages most of the time and just takes them.
//!
//! ## Wiring
//!
//! Phase post-MVP wiring spawns the bgwriter via `spawn` (a
//! dedicated `std::thread`, intentionally tokio-free) during
//! `Database::open`, parameterized by the config knobs above.
//! The writer holds the pager via `Weak` so it doesn't prevent
//! shutdown — once the strong references are gone, `upgrade()`
//! fails and the flusher reports nothing left to do; dropping
//! the returned handle stops the loop.

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Tunables for the background writer, mirroring PostgreSQL's
/// `bgwriter_*` GUCs. Build one with [`Default`] and override
/// individual fields as needed.
#[derive(Debug, Clone, Copy)]
pub struct BgWriterConfig {
    /// Sleep between scan rounds.
    pub delay: Duration,
    /// Maximum dirty pages to flush per round. Soft limit —
    /// the writer stops at the first round that hits it.
    pub max_pages_per_round: usize,
    /// LRU adaptive multiplier. The writer estimates how many
    /// fresh buffers the next round will need from the recent
    /// allocation rate, then flushes `multiplier × estimate`.
    /// Higher values flush more aggressively; lower values
    /// save I/O at the cost of query-side stall risk.
    pub lru_multiplier: f64,
    /// Soft cap on the dirty-page fraction. When the buffer
    /// pool's dirty fraction exceeds this, the writer scans
    /// every round regardless of `delay`.
    pub max_dirty_fraction: f64,
}

impl Default for BgWriterConfig {
    /// PG-flavoured defaults: 200 ms delay, 100-page budget,
    /// 2.0 multiplier, 50 % dirty cap.
    fn default() -> Self {
        BgWriterConfig {
            delay: Duration::from_millis(200),
            max_pages_per_round: 100,
            lru_multiplier: 2.0,
            max_dirty_fraction: 0.5,
        }
    }
}
73
/// Monotonic counters the background writer publishes for
/// monitoring / EXPLAIN ANALYZE-style introspection. All
/// fields use relaxed atomics; take a point-in-time view with
/// [`BgWriterStats::snapshot`].
#[derive(Debug, Default)]
pub struct BgWriterStats {
    /// Total scan rounds executed since startup.
    pub rounds: AtomicU64,
    /// Total pages flushed since startup.
    pub pages_flushed: AtomicU64,
    /// Total times a round ended early because it hit
    /// `max_pages_per_round`.
    pub max_round_hit: AtomicU64,
    /// Most recently observed dirty fraction, stored ×1000 so
    /// it fits an integer atomic.
    pub last_dirty_fraction_milli: AtomicU64,
}

impl BgWriterStats {
    /// Allocate a zeroed, shareable stats block.
    pub fn new() -> Arc<Self> {
        Arc::new(BgWriterStats::default())
    }

    /// Copy the counters into a plain struct for display.
    /// Loads are independent relaxed reads, so the snapshot is
    /// not guaranteed to be mutually consistent.
    pub fn snapshot(&self) -> BgWriterStatsSnapshot {
        let milli = self.last_dirty_fraction_milli.load(Ordering::Relaxed);
        BgWriterStatsSnapshot {
            rounds: self.rounds.load(Ordering::Relaxed),
            pages_flushed: self.pages_flushed.load(Ordering::Relaxed),
            max_round_hit: self.max_round_hit.load(Ordering::Relaxed),
            last_dirty_fraction: milli as f64 / 1000.0,
        }
    }
}

/// Plain-value view of [`BgWriterStats`] with the dirty
/// fraction converted back to a float in [0, 1].
#[derive(Debug, Clone, Copy)]
pub struct BgWriterStatsSnapshot {
    pub rounds: u64,
    pub pages_flushed: u64,
    pub max_round_hit: u64,
    pub last_dirty_fraction: f64,
}
113
/// Trait the buffer pool must implement so the writer can
/// scan and flush. Decoupled from the concrete `PageCache`
/// type so this module doesn't pull the SIEVE implementation
/// into its dep graph and so tests can plug a mock pool.
///
/// Implementations must be `Send + Sync`: the writer calls
/// these methods from its own background thread.
pub trait DirtyPageFlusher: Send + Sync {
    /// Estimated dirty fraction of the buffer pool, in [0, 1].
    /// An estimate is fine — it only steers the flush budget.
    fn dirty_fraction(&self) -> f64;
    /// Walk up to `max` dirty pages and flush them via the
    /// pager. Returns the number actually flushed (may be 0,
    /// e.g. when nothing is dirty or the pool is gone).
    fn flush_some(&self, max: usize) -> usize;
}
125
126/// Production `DirtyPageFlusher` wrapping the engine's `Pager`.
127/// Holds the pager via `Weak` so dropping the database doesn't
128/// keep the file alive via the background thread; the writer
129/// exits at the next round when the upgrade fails.
130pub struct PagerDirtyFlusher {
131    pager: std::sync::Weak<crate::storage::engine::pager::Pager>,
132}
133
134impl PagerDirtyFlusher {
135    pub fn new(pager: std::sync::Weak<crate::storage::engine::pager::Pager>) -> Self {
136        Self { pager }
137    }
138}
139
140impl DirtyPageFlusher for PagerDirtyFlusher {
141    fn dirty_fraction(&self) -> f64 {
142        match self.pager.upgrade() {
143            Some(p) => p.dirty_fraction(),
144            None => 0.0,
145        }
146    }
147
148    fn flush_some(&self, max: usize) -> usize {
149        let Some(p) = self.pager.upgrade() else {
150            return 0;
151        };
152        match p.flush_some_dirty(max) {
153            Ok(n) => n,
154            Err(err) => {
155                tracing::warn!(error = ?err, "bgwriter flush_some_dirty failed");
156                0
157            }
158        }
159    }
160}
161
/// Shutdown handle returned by `spawn`. Drop the handle (or
/// call `stop()` explicitly) to signal the task to exit at
/// the start of its next round.
pub struct BgWriterHandle {
    // Shared flag polled by the writer thread each round.
    stop: Arc<AtomicBool>,
    /// Live counters; read via `BgWriterStats::snapshot`.
    pub stats: Arc<BgWriterStats>,
}

impl BgWriterHandle {
    /// Request shutdown. The Release store pairs with the
    /// Acquire load at the top of the writer loop.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
    }
}

impl Drop for BgWriterHandle {
    /// The writer thread is detached, so this stop-on-drop is
    /// the only shutdown path; losing the handle never leaks a
    /// busy loop.
    fn drop(&mut self) {
        self.stop();
    }
}
181
182/// Spawn the background writer as a std::thread that polls
183/// the supplied flusher on a cadence. Returns a handle the
184/// caller drops to stop the loop.
185///
186/// The implementation is intentionally tokio-free so it works
187/// in the embedded-library use case where the user hasn't
188/// brought up a runtime.
189pub fn spawn(flusher: Arc<dyn DirtyPageFlusher>, config: BgWriterConfig) -> BgWriterHandle {
190    let stop = Arc::new(AtomicBool::new(false));
191    let stats = BgWriterStats::new();
192    let stop_clone = Arc::clone(&stop);
193    let stats_clone = Arc::clone(&stats);
194
195    std::thread::spawn(move || {
196        loop {
197            if stop_clone.load(Ordering::Acquire) {
198                break;
199            }
200            // Estimate: based on allocation rate proxy
201            // (lru_multiplier × max_pages_per_round / 4) plus
202            // a forced sweep when dirty fraction is high.
203            let dirty = flusher.dirty_fraction();
204            stats_clone
205                .last_dirty_fraction_milli
206                .store((dirty * 1000.0) as u64, Ordering::Relaxed);
207
208            let target_pages = if dirty > config.max_dirty_fraction {
209                // Aggressive: flush the full budget every round.
210                config.max_pages_per_round
211            } else {
212                // Adaptive: scale by lru_multiplier against a
213                // baseline of max_pages / 4. Clamped to budget.
214                ((config.max_pages_per_round as f64 / 4.0) * config.lru_multiplier) as usize
215            };
216            let target_pages = target_pages.min(config.max_pages_per_round);
217
218            let flushed = flusher.flush_some(target_pages);
219            stats_clone
220                .pages_flushed
221                .fetch_add(flushed as u64, Ordering::Relaxed);
222            stats_clone.rounds.fetch_add(1, Ordering::Relaxed);
223            if flushed >= config.max_pages_per_round {
224                stats_clone.max_round_hit.fetch_add(1, Ordering::Relaxed);
225            }
226
227            std::thread::sleep(config.delay);
228        }
229    });
230
231    BgWriterHandle { stop, stats }
232}