reddb_server/storage/wal/
checkpointer_task.rs1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
/// Tunables that decide when the background checkpointer fires.
///
/// The loop started by [`spawn`] runs a checkpoint as soon as either
/// `timeout` or `max_wal_bytes` is reached.
#[derive(Clone, Copy, Debug)]
pub struct CheckpointConfig {
    /// Longest time allowed between two checkpoints: once this much time has
    /// elapsed since the previous one, a checkpoint is triggered.
    pub timeout: Duration,
    /// WAL growth, in bytes since the last checkpoint, at which a checkpoint
    /// is triggered regardless of how much time has elapsed.
    pub max_wal_bytes: u64,
    /// Completion-target ratio.
    ///
    /// NOTE(review): the checkpointer loop in this file never reads this
    /// field; presumably a driver uses it to pace page flushing -- confirm
    /// against callers.
    pub min_completion_target_ratio: f64,
}
57
58impl Default for CheckpointConfig {
59 fn default() -> Self {
60 Self {
61 timeout: Duration::from_secs(300),
62 max_wal_bytes: 1024 * 1024 * 1024,
63 min_completion_target_ratio: 0.9,
64 }
65 }
66}
67
/// Counters describing what the background checkpointer has done so far.
///
/// Every field is an atomic, so the struct can be updated through a shared
/// reference; [`spawn`] shares it between the checkpointer thread and the
/// returned handle via an `Arc`.
#[derive(Default, Debug)]
pub struct CheckpointStats {
    /// Number of checkpoints that have finished.
    pub checkpoints_completed: AtomicU64,
    /// Running sum of the `pages_flushed` reported by every checkpoint.
    pub pages_flushed_total: AtomicU64,
    /// Running sum of the `wal_truncated_bytes` reported by every checkpoint.
    pub wal_truncated_bytes: AtomicU64,
    /// `new_redo_lsn` reported by the most recent checkpoint (0 until the
    /// first checkpoint has been recorded).
    pub last_checkpoint_lsn: AtomicU64,
    /// Wall-clock duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
}
77
78impl CheckpointStats {
79 pub fn new() -> Arc<Self> {
80 Arc::new(Self::default())
81 }
82
83 pub fn snapshot(&self) -> CheckpointStatsSnapshot {
84 CheckpointStatsSnapshot {
85 checkpoints_completed: self.checkpoints_completed.load(Ordering::Relaxed),
86 pages_flushed_total: self.pages_flushed_total.load(Ordering::Relaxed),
87 wal_truncated_bytes: self.wal_truncated_bytes.load(Ordering::Relaxed),
88 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::Relaxed),
89 last_checkpoint_duration_ms: self.last_checkpoint_duration_ms.load(Ordering::Relaxed),
90 }
91 }
92}
93
/// Plain-value copy of the checkpointer counters at one point in time.
///
/// Produced by `CheckpointStats::snapshot`. Being ordinary `u64`s, a snapshot
/// can be copied, compared with `==` and zero-initialised with `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CheckpointStatsSnapshot {
    /// Number of checkpoints that had finished.
    pub checkpoints_completed: u64,
    /// Total pages flushed across all checkpoints.
    pub pages_flushed_total: u64,
    /// Total WAL bytes truncated across all checkpoints.
    pub wal_truncated_bytes: u64,
    /// Redo LSN reported by the most recent checkpoint.
    pub last_checkpoint_lsn: u64,
    /// Duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: u64,
}
102
103pub trait CheckpointDriver: Send + Sync {
107 fn current_wal_bytes(&self) -> u64;
110
111 fn last_checkpoint_wal_bytes(&self) -> u64;
113
114 fn run_checkpoint(&self) -> CheckpointResult;
121}
122
/// Outcome of a single checkpoint, as reported by
/// `CheckpointDriver::run_checkpoint`.
///
/// Plain data: it can be copied, compared with `==`, and `Default` yields an
/// all-zero result.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CheckpointResult {
    /// Pages written out by this checkpoint; added to the running
    /// `pages_flushed_total` statistic.
    pub pages_flushed: u64,
    /// Redo LSN established by this checkpoint; recorded as the
    /// `last_checkpoint_lsn` statistic.
    pub new_redo_lsn: u64,
    /// WAL bytes reclaimed by this checkpoint; added to the running
    /// `wal_truncated_bytes` statistic.
    pub wal_truncated_bytes: u64,
}
130
131pub struct CheckpointerHandle {
133 stop: Arc<AtomicBool>,
134 pub stats: Arc<CheckpointStats>,
135}
136
137impl CheckpointerHandle {
138 pub fn stop(&self) {
139 self.stop.store(true, Ordering::Release);
140 }
141}
142
143impl Drop for CheckpointerHandle {
144 fn drop(&mut self) {
145 self.stop();
146 }
147}
148
149pub fn spawn(driver: Arc<dyn CheckpointDriver>, config: CheckpointConfig) -> CheckpointerHandle {
153 let stop = Arc::new(AtomicBool::new(false));
154 let stats = CheckpointStats::new();
155 let stop_clone = Arc::clone(&stop);
156 let stats_clone = Arc::clone(&stats);
157
158 std::thread::spawn(move || {
159 let mut last_checkpoint_at = Instant::now();
160 loop {
161 if stop_clone.load(Ordering::Acquire) {
162 break;
163 }
164
165 let elapsed = last_checkpoint_at.elapsed();
166 let wal_grown = driver.current_wal_bytes() - driver.last_checkpoint_wal_bytes();
167
168 let trigger = elapsed >= config.timeout || wal_grown >= config.max_wal_bytes;
169 if trigger {
170 let start = Instant::now();
171 let result = driver.run_checkpoint();
172 let duration = start.elapsed();
173
174 stats_clone
175 .checkpoints_completed
176 .fetch_add(1, Ordering::Relaxed);
177 stats_clone
178 .pages_flushed_total
179 .fetch_add(result.pages_flushed, Ordering::Relaxed);
180 stats_clone
181 .wal_truncated_bytes
182 .fetch_add(result.wal_truncated_bytes, Ordering::Relaxed);
183 stats_clone
184 .last_checkpoint_lsn
185 .store(result.new_redo_lsn, Ordering::Relaxed);
186 stats_clone
187 .last_checkpoint_duration_ms
188 .store(duration.as_millis() as u64, Ordering::Relaxed);
189
190 last_checkpoint_at = Instant::now();
191 }
192
193 std::thread::sleep(Duration::from_secs(1));
195 }
196 });
197
198 CheckpointerHandle { stop, stats }
199}