Skip to main content

reddb_server/storage/wal/
checkpointer_task.rs

//! Checkpointer task (post-MVP roadmap item).
2//!
3//! Pairs with `cache/bgwriter.rs` to give reddb the
4//! Postgres-style three-tier I/O setup:
5//!
6//! 1. **bgwriter** — flushes dirty pages on a cadence so
7//!    eviction is fast.
8//! 2. **checkpointer** — periodically writes a checkpoint
9//!    record to the WAL, fsyncs every dirty page, and
10//!    advances the recovery floor (the LSN below which the
11//!    WAL can be truncated).
12//! 3. **walwriter** — flushes the WAL itself (already in
13//!    `wal/group_commit.rs`).
14//!
15//! Mirrors PG's `checkpointer.c`. Each checkpoint:
16//!
17//! 1. Records start LSN.
18//! 2. Walks every dirty page, calling pager flush.
19//! 3. Records end LSN.
20//! 4. Writes a `Checkpoint` WAL record with both LSNs.
21//! 5. Truncates the WAL up to the previous checkpoint's
22//!    redo pointer.
23//!
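//! Crash recovery starts from the most recent checkpoint
//! record's redo pointer instead of replaying the entire
//! WAL. Because a checkpoint is triggered once
//! `CheckpointConfig::timeout` (default 5 min) has elapsed
//! or `CheckpointConfig::max_wal_bytes` (default 1 GiB) of
//! WAL has been written, the WAL left to replay — and hence
//! recovery wall time — is bounded too.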
28//!
29//! ## Why
30//!
31//! Without checkpoints, the WAL grows unbounded and crash
32//! recovery has to replay everything since database creation.
33//! With checkpoints every N minutes / M MB of WAL written,
34//! recovery is bounded to "WAL since last checkpoint".
35//!
36//! ## Wiring
37//!
//! Post-MVP, the plan is for `Database::open` to spawn this
//! task alongside the bgwriter and keep the returned
//! `CheckpointerHandle`; dropping that handle (and so the
//! database) signals the thread to stop.
41
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
/// Checkpoint cadence configuration.
///
/// The checkpointer runs a checkpoint as soon as *either* limit is reached:
/// `timeout` has elapsed since the previous checkpoint, or `max_wal_bytes` of
/// WAL have been written since it.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointConfig {
    /// Maximum time between checkpoints. Default: 5 minutes (PG's
    /// `checkpoint_timeout` default).
    pub timeout: Duration,
    /// Maximum WAL bytes written between checkpoints. Default: 1 GiB (PG's
    /// `max_wal_size` default).
    pub max_wal_bytes: u64,
    /// Intended throttle: if a checkpoint completes ahead of this target, the
    /// checkpointer should sleep before the next round to avoid I/O storms.
    /// Default: 0.9.
    ///
    /// This is a unitless ratio, not a duration — presumably a fraction of
    /// `timeout`, in the spirit of PG's `checkpoint_completion_target`
    /// (TODO confirm). NOTE: the checkpointer loop in this module (`spawn`)
    /// does not currently read this field.
    pub min_completion_target_ratio: f64,
}

impl Default for CheckpointConfig {
    /// PG-like defaults: 5-minute timeout, 1 GiB WAL budget, 0.9 ratio.
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(5 * 60),
            // 1 GiB.
            max_wal_bytes: 1 << 30,
            min_completion_target_ratio: 0.9,
        }
    }
}
67
/// Diagnostic counters published by the checkpointer thread.
///
/// Each field is an independent atomic accessed with `Relaxed` ordering; use
/// [`CheckpointStats::snapshot`] to copy them into plain integers.
#[derive(Debug, Default)]
pub struct CheckpointStats {
    /// Number of checkpoints that have run to completion.
    pub checkpoints_completed: AtomicU64,
    /// Running total of pages flushed across all checkpoints.
    pub pages_flushed_total: AtomicU64,
    /// Running total of WAL bytes reported as truncated.
    pub wal_truncated_bytes: AtomicU64,
    /// Redo LSN reported by the most recent checkpoint.
    pub last_checkpoint_lsn: AtomicU64,
    /// Duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
}

impl CheckpointStats {
    /// Builds an all-zero counter set already wrapped in an `Arc`, ready to be
    /// shared between the checkpointer thread and observers.
    pub fn new() -> Arc<Self> {
        let zeroed: Self = Default::default();
        Arc::new(zeroed)
    }

    /// Copies every counter into a plain-value [`CheckpointStatsSnapshot`].
    ///
    /// Counters are read one at a time, so the snapshot is not guaranteed to
    /// be consistent across fields if a checkpoint finishes mid-read.
    pub fn snapshot(&self) -> CheckpointStatsSnapshot {
        let read = |counter: &AtomicU64| -> u64 { counter.load(Ordering::Relaxed) };

        CheckpointStatsSnapshot {
            checkpoints_completed: read(&self.checkpoints_completed),
            pages_flushed_total: read(&self.pages_flushed_total),
            wal_truncated_bytes: read(&self.wal_truncated_bytes),
            last_checkpoint_lsn: read(&self.last_checkpoint_lsn),
            last_checkpoint_duration_ms: read(&self.last_checkpoint_duration_ms),
        }
    }
}

/// Point-in-time, plain-integer copy of [`CheckpointStats`].
#[derive(Debug, Clone, Copy)]
pub struct CheckpointStatsSnapshot {
    pub checkpoints_completed: u64,
    pub pages_flushed_total: u64,
    pub wal_truncated_bytes: u64,
    pub last_checkpoint_lsn: u64,
    pub last_checkpoint_duration_ms: u64,
}
102
/// Interface the database implements so the checkpointer can drive a
/// checkpoint without depending on the full `Database` struct.
pub trait CheckpointDriver: Send + Sync {
    /// Current WAL byte position.
    ///
    /// The checkpointer subtracts `last_checkpoint_wal_bytes()` from this to
    /// decide whether enough WAL has been written to force a checkpoint, so
    /// implementations should keep it `>=` that value.
    fn current_wal_bytes(&self) -> u64;

    /// WAL byte position recorded by the last completed checkpoint.
    fn last_checkpoint_wal_bytes(&self) -> u64;

    /// Runs one full checkpoint:
    ///
    /// 1. Walk every dirty page and flush it via the pager.
    /// 2. Write a `Checkpoint` WAL record.
    /// 3. Truncate the WAL up to the previous checkpoint's redo pointer.
    ///
    /// Returns a [`CheckpointResult`] carrying the number of pages flushed,
    /// the new redo LSN, and the number of WAL bytes truncated. The signature
    /// has no error channel, so failures cannot be reported back to the
    /// checkpointer through this call.
    fn run_checkpoint(&self) -> CheckpointResult;
}

/// Outcome of a single checkpoint pass, as returned by
/// [`CheckpointDriver::run_checkpoint`].
#[derive(Debug, Clone, Copy)]
pub struct CheckpointResult {
    /// Number of dirty pages flushed during this checkpoint.
    pub pages_flushed: u64,
    /// Redo LSN established by this checkpoint.
    pub new_redo_lsn: u64,
    /// Number of WAL bytes truncated by this checkpoint.
    pub wal_truncated_bytes: u64,
}
130
131/// Shutdown handle returned by `spawn`.
132pub struct CheckpointerHandle {
133    stop: Arc<AtomicBool>,
134    pub stats: Arc<CheckpointStats>,
135}
136
137impl CheckpointerHandle {
138    pub fn stop(&self) {
139        self.stop.store(true, Ordering::Release);
140    }
141}
142
143impl Drop for CheckpointerHandle {
144    fn drop(&mut self) {
145        self.stop();
146    }
147}
148
149/// Spawn the checkpointer thread. Wakes on a 1-second cadence
150/// to check both the timeout and the WAL byte threshold; runs
151/// a checkpoint as soon as either trips.
152pub fn spawn(driver: Arc<dyn CheckpointDriver>, config: CheckpointConfig) -> CheckpointerHandle {
153    let stop = Arc::new(AtomicBool::new(false));
154    let stats = CheckpointStats::new();
155    let stop_clone = Arc::clone(&stop);
156    let stats_clone = Arc::clone(&stats);
157
158    std::thread::spawn(move || {
159        let mut last_checkpoint_at = Instant::now();
160        loop {
161            if stop_clone.load(Ordering::Acquire) {
162                break;
163            }
164
165            let elapsed = last_checkpoint_at.elapsed();
166            let wal_grown = driver.current_wal_bytes() - driver.last_checkpoint_wal_bytes();
167
168            let trigger = elapsed >= config.timeout || wal_grown >= config.max_wal_bytes;
169            if trigger {
170                let start = Instant::now();
171                let result = driver.run_checkpoint();
172                let duration = start.elapsed();
173
174                stats_clone
175                    .checkpoints_completed
176                    .fetch_add(1, Ordering::Relaxed);
177                stats_clone
178                    .pages_flushed_total
179                    .fetch_add(result.pages_flushed, Ordering::Relaxed);
180                stats_clone
181                    .wal_truncated_bytes
182                    .fetch_add(result.wal_truncated_bytes, Ordering::Relaxed);
183                stats_clone
184                    .last_checkpoint_lsn
185                    .store(result.new_redo_lsn, Ordering::Relaxed);
186                stats_clone
187                    .last_checkpoint_duration_ms
188                    .store(duration.as_millis() as u64, Ordering::Relaxed);
189
190                last_checkpoint_at = Instant::now();
191            }
192
193            // Wake up every second to check.
194            std::thread::sleep(Duration::from_secs(1));
195        }
196    });
197
198    CheckpointerHandle { stop, stats }
199}