reddb_server/storage/wal/group_commit.rs
//! Cooperative group-commit coordinator for the WAL.
//!
//! Mirrors PostgreSQL's `XLogFlush` waiter logic
//! (`src/backend/access/transam/xlog.c`). The single-writer commit
//! path used to call `wal.sync()` once per commit, so N concurrent
//! writers paid N independent fsyncs (~N × 100 µs on SSD). Group
//! commit collapses those into **one** fsync that covers every byte
//! appended up to the highest commit LSN in the batch.
//!
//! # Algorithm
//!
//! 1. A writer appends its records under the WAL lock and captures
//!    the resulting `commit_lsn = wal.current_lsn()` after its
//!    `Commit` record.
//! 2. The writer releases the WAL lock and calls
//!    [`GroupCommit::commit_at_least`] with `(commit_lsn, &wal)`
//!    (see the sketch after this list).
//! 3. Inside `commit_at_least`:
//!    - **Fast path:** if `flushed_lsn >= commit_lsn`, the write is
//!      already durable from a piggyback on a previous fsync.
//!      Return immediately.
//!    - Otherwise take the coordinator state lock. Re-check the
//!      flushed LSN (another leader may have raced).
//!    - If a leader is already mid-flush, wait on the condvar until
//!      that flush finishes. Re-check `flushed_lsn`; if the batch now
//!      covers `commit_lsn`, return; otherwise race to become the next
//!      leader for the remaining tail.
//!    - If no leader is in progress, become the leader: mark
//!      `in_progress = true`, drop the state lock, briefly take the
//!      WAL lock to drain the buffer into the kernel and capture the
//!      drained LSN plus a cloned file handle, release the WAL lock,
//!      fsync the cloned handle with no locks held, retake the WAL
//!      lock to mark the drained LSN durable, publish the new
//!      `flushed_lsn`, clear `in_progress`, and notify all waiters.
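//!
//! A minimal sketch of that commit path (steps 1–2 above), mirroring
//! the tests at the bottom of this file; it assumes `wal: Mutex<WalWriter>`,
//! `gc: GroupCommit`, and `tx_id` are in scope and elides error handling:
//!
//! ```ignore
//! // Step 1: append under the WAL lock and capture the commit LSN.
//! let commit_lsn = {
//!     let mut w = wal.lock().unwrap();
//!     w.append(&WalRecord::Begin { tx_id })?;
//!     w.append(&WalRecord::Commit { tx_id })?;
//!     w.current_lsn()
//! };
//! // Step 2: release the lock, then piggyback on (or lead) one group fsync.
//! gc.commit_at_least(commit_lsn, &wal)?;
//! assert!(gc.flushed_lsn() >= commit_lsn);
//! ```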
//!
//! # Why this works
//!
//! Between the first writer's `append` and the leader's drain + fsync,
//! other writers can grab the WAL lock and append more records.
//! When the leader finally syncs, it flushes **everything** that had
//! been drained to the kernel by then — not just its own records.
//! Writers whose LSN is now covered return immediately; writers that
//! appended after the leader captured `target_lsn` wake up, see they
//! still need more durability, and one of them becomes the next leader.
//!
//! So `commit_at_least` produces one fsync per *batch* of concurrent
//! writers, not per writer. With 8 concurrent committers and ~100 µs
//! per fsync, the old per-commit path paid ~8 × 100 µs ≈ 800 µs of
//! fsync time per batch of 8 commits; group commit pays ~100 µs for
//! the same batch, an 8× reduction in fsync cost per commit.
//!
//! # Correctness
//!
//! - `flushed_lsn` is monotonic: only the leader writes it, and
//!   only after a successful `sync()`.
//! - The state lock + condvar guarantee that exactly one leader is
//!   ever in flight, so we never have two parallel fsyncs racing.
//! - Waiters re-check `flushed_lsn` under the state lock before
//!   sleeping, so we never miss a wake-up.
//! - The leader holds the WAL lock only to drain the buffer and to
//!   publish the durable LSN; the expensive fsync runs on a cloned
//!   file handle with no locks held, keeping every critical section
//!   as short as possible.

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

use super::writer::WalWriter;

/// Coordinator state guarded by `GroupCommit::state`.
struct GroupCommitState {
    /// True while a leader is mid-flush (draining the WAL buffer or
    /// waiting on the fsync).
    in_progress: bool,
}

/// Cooperative WAL flush coordinator.
///
/// Owned by the [`super::transaction::TransactionManager`]; cheap to
/// share across writers via `Arc`.
pub struct GroupCommit {
    /// Highest LSN known to be durable. Atomic so the fast path can
    /// read it without taking the state lock.
    flushed_lsn: AtomicU64,
    /// Coordination state.
    state: Mutex<GroupCommitState>,
    /// Wakes waiters when `flushed_lsn` advances.
    cond: Condvar,
}

struct LeadershipGuard<'a> {
    group_commit: &'a GroupCommit,
}

impl GroupCommit {
    /// Create a new coordinator initialised with the WAL's current
    /// durable position. Pass `wal.durable_lsn()` from a freshly
    /// opened `WalWriter`.
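    ///
    /// A hedged construction sketch (assumes the `WalWriter::open` and
    /// `durable_lsn` API exercised by the tests at the bottom of this file):
    ///
    /// ```ignore
    /// let wal = Mutex::new(WalWriter::open(&path)?);
    /// let gc = GroupCommit::new(wal.lock().unwrap().durable_lsn());
    /// ```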
    pub fn new(initial_durable_lsn: u64) -> Self {
        Self {
            flushed_lsn: AtomicU64::new(initial_durable_lsn),
            state: Mutex::new(GroupCommitState { in_progress: false }),
            cond: Condvar::new(),
        }
    }

    /// Highest LSN that is known durable on disk. Cheap atomic read,
    /// no lock taken. Used by tests and by the diagnostics surface.
    pub fn flushed_lsn(&self) -> u64 {
        self.flushed_lsn.load(Ordering::Acquire)
    }

    /// Block the caller until the WAL is durable up to at least
    /// `target`. If another writer is already mid-flush, piggyback
    /// on it; otherwise become the leader and do the fsync ourselves.
    ///
    /// `wal` is the same `Mutex<WalWriter>` the transaction manager
    /// holds. The leader takes that lock only briefly, to drain the
    /// buffer and later to publish the new durable LSN; the fsync
    /// itself runs with no locks held.
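    ///
    /// # Example
    ///
    /// A sketch of the intended call pattern, mirroring the tests at the
    /// bottom of this file (assumes the `WalWriter` `append` /
    /// `current_lsn` API and a `tx_id` in scope; error handling elided):
    ///
    /// ```ignore
    /// let target = {
    ///     let mut w = wal.lock().unwrap();
    ///     w.append(&WalRecord::Commit { tx_id })?;
    ///     w.current_lsn()
    /// };
    /// gc.commit_at_least(target, &wal)?; // leads or joins one group fsync
    /// gc.commit_at_least(target, &wal)?; // now a fast-path no-op
    /// assert!(gc.flushed_lsn() >= target);
    /// ```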
    pub fn commit_at_least(&self, target: u64, wal: &Mutex<WalWriter>) -> io::Result<()> {
        // ── Fast path: already flushed past us. ─────────────────────
        if self.flushed_lsn.load(Ordering::Acquire) >= target {
            return Ok(());
        }

        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());

        loop {
            // Re-check under the lock: another leader may have raced
            // and already flushed past our target between the load
            // above and the lock acquisition.
            if self.flushed_lsn.load(Ordering::Acquire) >= target {
                return Ok(());
            }

            if state.in_progress {
                // Another leader is mid-flush. Wait until they wake us,
                // then re-check whether our target is covered. If not,
                // loop and try to become the next leader instead of
                // going back to sleep forever.
                state = self
                    .cond
                    .wait_while(state, |state| {
                        state.in_progress && self.flushed_lsn.load(Ordering::Acquire) < target
                    })
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            // ── We are the leader. ──────────────────────────────────
            state.in_progress = true;
            break;
        }

        // Drop the state lock so waiters can re-check `flushed_lsn`
        // without bouncing on this mutex.
        drop(state);
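        // RAII leadership guard: if `drain_for_group_sync()` or
        // `sync_all()` below bails out via `?`, the guard's Drop still
        // clears `in_progress` and notifies every waiter, so an I/O
        // error never strands sleepers on the condvar.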
        let _leader = LeadershipGuard { group_commit: self };

        // Phase 1 — take the WAL lock BRIEFLY to drain the BufWriter
        // into the kernel and capture both the LSN and a cloned
        // file handle for the fsync. The whole point of this dance
        // is that we DO NOT hold the WAL lock during the expensive
        // sync_all() call below.
        let (target_lsn, sync_handle) = {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.drain_for_group_sync()?
        };

        // Phase 2 — the lock is released. Other writers can now
        // append into the BufWriter while we wait on fsync. Their
        // bytes will either be picked up by THIS sync_all() (if
        // they reach the kernel before the syscall returns) or
        // by the NEXT leader.
        //
        // sync_all() on the cloned handle flushes the same kernel
        // inode the BufWriter writes to, so coverage is correct.
        sync_handle.sync_all()?;

        // Phase 3 — take the WAL lock briefly to publish the new
        // durable LSN. Other writers may have appended in the
        // meantime; any of their bytes that reached the kernel are
        // NOW also durable, but we conservatively only mark
        // `target_lsn` (the LSN we drained) as durable. A later
        // writer whose LSN exceeds `target_lsn` therefore still goes
        // through the coordinator and drives the next flush, which
        // is cheap because its bytes are already in the kernel.
        {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.mark_durable(target_lsn);
        }

        // Publish the new flushed LSN to atomic readers, then
        // let the leadership guard wake every waiter.
        self.flushed_lsn.store(target_lsn, Ordering::Release);

        Ok(())
    }

    fn release_leadership(&self) {
        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
        state.in_progress = false;
        drop(state);
        self.cond.notify_all();
    }
}

impl std::fmt::Debug for GroupCommit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GroupCommit")
            .field("flushed_lsn", &self.flushed_lsn.load(Ordering::Acquire))
            .finish()
    }
}

impl Drop for LeadershipGuard<'_> {
    fn drop(&mut self) {
        self.group_commit.release_leadership();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "rb_group_commit_{}_{}_{}.wal",
            name,
            std::process::id(),
            nanos
        ));
        let _ = std::fs::remove_file(&path);
        (FileGuard { path: path.clone() }, path)
    }

    #[test]
    fn fast_path_when_already_flushed() {
        let (_g, path) = temp_wal("fast_path");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);
        // Target equal to the initial flushed_lsn → no fsync.
        gc.commit_at_least(initial, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), initial);
    }

    #[test]
    fn single_writer_advances_flushed_lsn() {
        let (_g, path) = temp_wal("single_writer");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // Append a record, capture its LSN, then commit.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        assert!(target > initial);

        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }

    #[test]
    fn flushed_lsn_is_monotonic() {
        let (_g, path) = temp_wal("monotonic");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // First commit.
        let lo = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(lo, &wal).unwrap();
        let after_lo = gc.flushed_lsn();

        // Second commit advances further.
        let hi = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(hi, &wal).unwrap();
        let after_hi = gc.flushed_lsn();

        assert!(after_hi >= after_lo);
        // Calling commit_at_least with `lo` after `hi` is a no-op.
        gc.commit_at_least(lo, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), after_hi);
    }

    #[test]
    fn concurrent_writers_coalesce_through_one_coordinator() {
        // Two threads each commit a few records. Both must succeed
        // and `flushed_lsn` must reflect every byte they wrote.
        // We can't directly count fsyncs at this layer (the WAL
        // doesn't expose a sync counter), but the absence of
        // deadlock and the correct final LSN are the contract.
        let (_g, path) = temp_wal("two_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..2u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..10u64 {
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 100 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 100 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        let final_durable = wal.lock().unwrap().durable_lsn();
        assert!(gc.flushed_lsn() >= final_durable);
        // 20 commits worth of 13-byte Begin + 13-byte Commit
        // records = 520 bytes minimum on top of the 8-byte header.
        assert!(final_durable >= 8 + 520);
    }

    #[test]
    fn waiter_retries_after_wakeup_if_previous_flush_was_too_small() {
        let (_g, path) = temp_wal("waiter_retry");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };

        {
            let mut state = gc.state.lock().unwrap();
            state.in_progress = true;
        }

        let (done_tx, done_rx) = mpsc::channel();
        let wal_c = Arc::clone(&wal);
        let gc_c = Arc::clone(&gc);
        let waiter = thread::spawn(move || {
            let result = gc_c.commit_at_least(target, &wal_c);
            let _ = done_tx.send(result);
        });

        thread::sleep(Duration::from_millis(50));

        {
            let mut state = gc.state.lock().unwrap_or_else(|p| p.into_inner());
            state.in_progress = false;
        }
        gc.cond.notify_all();

        done_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("waiter should retry as the next leader")
            .unwrap();
        waiter.join().unwrap();

        assert!(gc.flushed_lsn() >= target);
        assert!(wal.lock().unwrap().durable_lsn() >= target);
    }

    #[test]
    fn high_concurrency_eight_writers_no_deadlock() {
        // 8 threads × 50 commits each. Stress the coordinator:
        // expected to complete without deadlock and with the WAL
        // fully durable up to the largest committed LSN.
        let (_g, path) = temp_wal("eight_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..8u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..50u64 {
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        let current = wal.lock().unwrap().current_lsn();
        let durable = wal.lock().unwrap().durable_lsn();
        assert_eq!(durable, current, "every appended byte must be durable");
        assert!(gc.flushed_lsn() >= current);
    }

    #[test]
    fn writers_recover_from_poisoned_state() {
        // If a previous panic poisoned the state mutex, subsequent
        // writers must still be able to commit (we recover via
        // `unwrap_or_else(into_inner)`).
        let (_g, path) = temp_wal("poison_recovery");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        // Poison the state mutex by panicking inside it.
        let gc_c = Arc::clone(&gc);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _state = gc_c.state.lock().unwrap();
            panic!("intentional poison");
        }));

        // The mutex is now poisoned — but commit_at_least must
        // still work because we recover from poisoning.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }
}
473}