reddb_server/storage/cache/bgwriter.rs
//! Background writer task — Post-MVP credibility item.
//!
//! Decouples dirty-page eviction from query execution. A
//! background thread scans the SIEVE buffer pool on a
//! cadence and writes dirty pages to disk via the pager,
//! freeing them for fast eviction by the next query that needs
//! a buffer.
//!
//! Mirrors PG's `bgwriter.c`:
//!
//! - **bgwriter_delay** — sleep between rounds (default 200 ms)
//! - **bgwriter_lru_maxpages** — max pages to flush per round
//! - **bgwriter_lru_multiplier** — adaptive scaling based on
//!   recent allocation rate
//!
//! ## Why
//!
//! Without a bgwriter, every query that needs a fresh buffer
//! pays the eviction cost: pick a dirty page, write it to
//! disk via the pager (which involves DWB + WAL flush
//! checkpoint), then return the cleaned slot. That write is
//! ~1-5 ms per page on spinning disk, ~100-300 µs on SSD.
//!
//! With a bgwriter, the writes happen on a separate thread
//! during quiet moments. Query-side eviction finds clean
//! pages most of the time and just takes them.
//!
//! ## Wiring
//!
//! Post-MVP wiring spawns the bgwriter via [`spawn`] during
//! `Database::open`, parameterized by the config knobs above.
//! The production flusher holds the pager via `Weak` so the
//! writer can't keep the database alive; dropping the returned
//! `BgWriterHandle` signals the thread to exit at the start of
//! its next round.
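//!
//! A sketch of that wiring (hedged: `Database::open` and the
//! engine's `pager` handle are assumptions about surrounding
//! code, not APIs defined in this module):
//!
//! ```rust,ignore
//! // Inside Database::open, once the pager Arc exists:
//! let flusher = Arc::new(PagerDirtyFlusher::new(Arc::downgrade(&pager)));
//! let bgwriter = spawn(flusher, BgWriterConfig::default());
//! // Keep `bgwriter` on the Database; dropping it stops the thread.
//! ```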

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Configuration for the background writer task. All fields
/// have PG-equivalent defaults; the fields are public, so
/// production code can override any of them with struct-update
/// syntax on top of `Default::default()`.
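///
/// A minimal override sketch (the values are illustrative, not
/// recommendations):
///
/// ```rust,ignore
/// let cfg = BgWriterConfig {
///     delay: Duration::from_millis(50), // scan 4x as often as default
///     ..BgWriterConfig::default()
/// };
/// ```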
#[derive(Debug, Clone, Copy)]
pub struct BgWriterConfig {
    /// Sleep between scan rounds.
    pub delay: Duration,
    /// Per-round flush budget: the writer never flushes more
    /// than this in a single round, and rounds that exhaust it
    /// are counted in `BgWriterStats::max_round_hit`.
    pub max_pages_per_round: usize,
    /// LRU adaptive multiplier. The writer estimates how
    /// many fresh buffers will be needed in the next round
    /// based on recent allocation rate, then flushes
    /// `multiplier × estimate`. Higher values flush more
    /// aggressively; lower values save I/O at the cost of
    /// query-side stall risk.
    pub lru_multiplier: f64,
    /// Soft cap on the dirty-page fraction. When the buffer
    /// pool's dirty fraction exceeds this, the writer skips
    /// the adaptive estimate and flushes the full
    /// `max_pages_per_round` budget every round.
    pub max_dirty_fraction: f64,
}

impl Default for BgWriterConfig {
    fn default() -> Self {
        Self {
            delay: Duration::from_millis(200),
            max_pages_per_round: 100,
            lru_multiplier: 2.0,
            max_dirty_fraction: 0.5,
        }
    }
}

/// Diagnostic counters published by the background writer for
/// monitoring / EXPLAIN ANALYZE-style introspection.
#[derive(Debug, Default)]
pub struct BgWriterStats {
    /// Total scan rounds executed since startup.
    pub rounds: AtomicU64,
    /// Total pages flushed since startup.
    pub pages_flushed: AtomicU64,
    /// Total rounds in which the writer exhausted its full
    /// `max_pages_per_round` budget.
    pub max_round_hit: AtomicU64,
    /// Last reported dirty fraction (×1000 to keep it integer).
    pub last_dirty_fraction_milli: AtomicU64,
}

impl BgWriterStats {
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Snapshot the counters as a plain struct for display.
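    ///
    /// A usage sketch:
    ///
    /// ```rust,ignore
    /// let snap = stats.snapshot();
    /// println!("{} pages over {} rounds", snap.pages_flushed, snap.rounds);
    /// ```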
    pub fn snapshot(&self) -> BgWriterStatsSnapshot {
        BgWriterStatsSnapshot {
            rounds: self.rounds.load(Ordering::Relaxed),
            pages_flushed: self.pages_flushed.load(Ordering::Relaxed),
            max_round_hit: self.max_round_hit.load(Ordering::Relaxed),
            last_dirty_fraction: self.last_dirty_fraction_milli.load(Ordering::Relaxed) as f64
                / 1000.0,
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub struct BgWriterStatsSnapshot {
    pub rounds: u64,
    pub pages_flushed: u64,
    pub max_round_hit: u64,
    pub last_dirty_fraction: f64,
}

/// Trait the buffer pool must implement so the writer can
/// scan and flush. Decoupled from the concrete `PageCache`
/// type so this module doesn't pull the SIEVE implementation
/// into its dep graph and so tests can plug a mock pool.
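///
/// A minimal implementor sketch (a fixed-fraction stub, purely
/// illustrative):
///
/// ```rust,ignore
/// struct StubPool;
/// impl DirtyPageFlusher for StubPool {
///     fn dirty_fraction(&self) -> f64 { 0.25 }
///     fn flush_some(&self, max: usize) -> usize { max.min(8) }
/// }
/// ```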
pub trait DirtyPageFlusher: Send + Sync {
    /// Estimated dirty fraction of the buffer pool, in [0, 1].
    fn dirty_fraction(&self) -> f64;
    /// Walk up to `max` dirty pages and flush them via the
    /// pager. Returns the number actually flushed.
    fn flush_some(&self, max: usize) -> usize;
}

/// Production `DirtyPageFlusher` wrapping the engine's `Pager`.
/// Holds the pager via `Weak` so dropping the database doesn't
/// keep the file alive via the background thread; once the
/// upgrade fails, the flusher reports a clean pool and flushes
/// nothing, and the writer idles until its handle is dropped.
pub struct PagerDirtyFlusher {
    pager: std::sync::Weak<crate::storage::engine::pager::Pager>,
}

impl PagerDirtyFlusher {
    pub fn new(pager: std::sync::Weak<crate::storage::engine::pager::Pager>) -> Self {
        Self { pager }
    }
}

impl DirtyPageFlusher for PagerDirtyFlusher {
    fn dirty_fraction(&self) -> f64 {
        match self.pager.upgrade() {
            Some(p) => p.dirty_fraction(),
            None => 0.0,
        }
    }

    fn flush_some(&self, max: usize) -> usize {
        let Some(p) = self.pager.upgrade() else {
            return 0;
        };
        match p.flush_some_dirty(max) {
            Ok(n) => n,
            Err(err) => {
                tracing::warn!(error = ?err, "bgwriter flush_some_dirty failed");
                0
            }
        }
    }
}

/// Shutdown handle returned by `spawn`. Drop the handle (or
/// call `stop()` explicitly) to signal the writer thread to
/// exit at the start of its next round.
pub struct BgWriterHandle {
    stop: Arc<AtomicBool>,
    pub stats: Arc<BgWriterStats>,
}

impl BgWriterHandle {
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
    }
}

impl Drop for BgWriterHandle {
    fn drop(&mut self) {
        self.stop();
    }
}

/// Spawn the background writer as a `std::thread` that polls
/// the supplied flusher on a cadence. Returns a handle the
/// caller drops to stop the loop.
///
/// The implementation is intentionally tokio-free so it works
/// in the embedded-library use case where the user hasn't
/// brought up a runtime.
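///
/// A usage sketch (the flusher is any `DirtyPageFlusher`
/// already wrapped in an `Arc`):
///
/// ```rust,ignore
/// let writer = spawn(flusher, BgWriterConfig::default());
/// // ... serve queries ...
/// let snap = writer.stats.snapshot();
/// tracing::info!(rounds = snap.rounds, flushed = snap.pages_flushed, "bgwriter");
/// drop(writer); // thread exits at the start of its next round
/// ```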
pub fn spawn(flusher: Arc<dyn DirtyPageFlusher>, config: BgWriterConfig) -> BgWriterHandle {
    let stop = Arc::new(AtomicBool::new(false));
    let stats = BgWriterStats::new();
    let stop_clone = Arc::clone(&stop);
    let stats_clone = Arc::clone(&stats);

    std::thread::spawn(move || {
        loop {
            if stop_clone.load(Ordering::Acquire) {
                break;
            }
            // Publish the current dirty fraction, then pick this
            // round's flush target: the full budget when the pool
            // is past `max_dirty_fraction`, otherwise an adaptive
            // target of `lru_multiplier × (max_pages_per_round / 4)`
            // as a proxy for the recent allocation rate.
            let dirty = flusher.dirty_fraction();
            stats_clone
                .last_dirty_fraction_milli
                .store((dirty * 1000.0) as u64, Ordering::Relaxed);

            let target_pages = if dirty > config.max_dirty_fraction {
                // Aggressive: flush the full budget every round.
                config.max_pages_per_round
            } else {
                // Adaptive: scale by lru_multiplier against a
                // baseline of max_pages / 4. Clamped to budget.
                ((config.max_pages_per_round as f64 / 4.0) * config.lru_multiplier) as usize
            };
            let target_pages = target_pages.min(config.max_pages_per_round);

            let flushed = flusher.flush_some(target_pages);
            stats_clone
                .pages_flushed
                .fetch_add(flushed as u64, Ordering::Relaxed);
            stats_clone.rounds.fetch_add(1, Ordering::Relaxed);
            if flushed >= config.max_pages_per_round {
                stats_clone.max_round_hit.fetch_add(1, Ordering::Relaxed);
            }

            std::thread::sleep(config.delay);
        }
    });

    BgWriterHandle { stop, stats }
}