reddb_server/storage/wal/group_commit.rs

//! Cooperative group-commit coordinator for the WAL.
//!
//! Mirrors PostgreSQL's `XLogFlush` waiter logic
//! (`src/backend/access/transam/xlog.c`). The single-writer commit
//! path used to call `wal.sync()` once per commit, so N concurrent
//! writers paid N independent fsyncs (~N × 100 µs on SSD). Group
//! commit collapses those into **one** fsync that covers every byte
//! appended up to the slowest writer's LSN.
//!
//! # Algorithm
//!
//! 1. A writer appends its records under the WAL lock and captures
//!    the resulting `commit_lsn = wal.current_lsn()` after its
//!    `Commit` record.
//! 2. The writer releases the WAL lock and calls
//!    [`GroupCommit::commit_at_least`] with `commit_lsn` and a
//!    reference to the WAL.
//! 3. Inside `commit_at_least`:
//!    - **Fast path:** if `flushed_lsn >= commit_lsn`, the write is
//!      already durable from a piggyback on a previous fsync.
//!      Return immediately.
//!    - Otherwise take the coordinator state lock. Re-check the
//!      flushed LSN (another leader may have raced).
//!    - If a leader is already mid-flush, wait on the condvar until
//!      that flush finishes. Re-check `flushed_lsn`; if the batch now
//!      covers `commit_lsn`, return, otherwise race to become the next
//!      leader for the remaining tail.
//!    - If no leader is in progress, become the leader: mark
//!      `in_progress = true` and drop the state lock. Then take the
//!      WAL lock just long enough to drain the buffered bytes into
//!      the kernel and capture the drained LSN plus a cloned file
//!      handle, release the WAL lock, fsync the cloned handle,
//!      retake the WAL lock to mark the drained LSN durable, publish
//!      the new `flushed_lsn`, clear `in_progress`, and notify all
//!      waiters (summarised below).
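//!
//! The flow, summarised (an informal sketch of the steps above, not new
//! behaviour):
//!
//! ```text
//! commit_at_least(lsn):
//!     flushed_lsn >= lsn     -> return                        (fast path)
//!     a leader is mid-flush  -> wait on condvar, re-check, maybe lead next
//!     no leader in flight    -> lead: drain, fsync, publish, notify_all
//! ```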
//!
//! # Why this works
//!
//! Between the first writer's `append` and the leader's drain, other
//! writers can grab the WAL lock and append more records. When the
//! leader drains and fsyncs, it makes durable **everything** appended
//! up to the drain point, not just its own records. Writers whose LSN
//! is now covered return immediately; writers that appended after the
//! leader captured `target_lsn` wake up, see they still need more
//! durability, and one of them becomes the next leader.
//!
//! So `commit_at_least` produces one fsync per *batch* of concurrent
//! writers, not per writer. With 8 concurrent committers and a
//! ~100 µs fsync, each writer stops queuing behind eight serialized
//! fsyncs and instead shares a single one, an 8× throughput win per
//! writer (see the figures below).
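//!
//! The figures, spelled out (same illustrative ~100 µs fsync as above):
//!
//! ```text
//! without group commit: 8 writers × ~100 µs fsync, serialized
//!                       → ~800 µs per commit ≈ 1 250 commits/s per writer
//! with group commit:    one shared ~100 µs fsync per batch
//!                       → ~100 µs per commit ≈ 10 000 commits/s per writer
//! ```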
//!
//! # Correctness
//!
//! - `flushed_lsn` is monotonic: only the leader writes it, and
//!   only after a successful `sync()`.
//! - The state lock + condvar guarantee that exactly one leader is
//!   ever in flight, so we never have two parallel fsyncs racing.
//! - Waiters re-check `flushed_lsn` under the state lock before
//!   sleeping, so we never miss a wake-up.
//! - While holding leadership the leader does only the flush work
//!   (drain, fsync, publish the durable LSN) and nothing else, to
//!   keep the critical section as short as possible.
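//!
//! # Example
//!
//! A minimal sketch of the writer-side commit path, assuming the caller
//! already shares a `Mutex<WalWriter>` and a [`GroupCommit`] (this mirrors
//! how the tests below drive the coordinator; it is not a full transaction
//! path):
//!
//! ```rust,ignore
//! fn commit_one(tx_id: u64, wal: &Mutex<WalWriter>, gc: &GroupCommit) -> std::io::Result<()> {
//!     // 1. Append under the WAL lock and capture the commit LSN.
//!     let commit_lsn = {
//!         let mut w = wal.lock().unwrap();
//!         w.append(&WalRecord::Begin { tx_id })?;
//!         w.append(&WalRecord::Commit { tx_id })?;
//!         w.current_lsn()
//!     };
//!     // 2. Release the WAL lock, then wait until that LSN is durable.
//!     //    Concurrent committers piggyback on a single fsync here.
//!     gc.commit_at_least(commit_lsn, wal)
//! }
//! ```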

use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

use super::writer::WalWriter;
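
// `WalWriter` operations this module leans on (implemented in writer.rs;
// the one-line descriptions below are inferred from how each is used here):
//   - `durable_lsn()`           the writer's durable watermark, advanced by `mark_durable`
//   - `current_lsn()`           LSN just past the last appended byte
//   - `append(&WalRecord)`      write one record into the WAL's buffer
//   - `drain_for_group_sync()`  flush buffered bytes to the kernel; returns the drained
//                               LSN and a cloned file handle to fsync
//   - `mark_durable(lsn)`       record a newly durable LSN on the writer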

/// Coordinator state guarded by `GroupCommit::state`.
struct GroupCommitState {
    /// True while a leader is mid-flush (draining the WAL and fsyncing).
    in_progress: bool,
}

/// Cooperative WAL flush coordinator.
///
/// Owned by the [`super::transaction::TransactionManager`]; cheap to
/// share across writers via `Arc`.
pub struct GroupCommit {
    /// Highest LSN known to be durable. Atomic so the fast path can
    /// read it without taking the state lock.
    flushed_lsn: AtomicU64,
    /// Coordination state.
    state: Mutex<GroupCommitState>,
    /// Wakes waiters when `flushed_lsn` advances.
    cond: Condvar,
}

struct LeadershipGuard<'a> {
    group_commit: &'a GroupCommit,
}

impl GroupCommit {
    /// Create a new coordinator initialised with the WAL's current
    /// durable position. Pass `wal.durable_lsn()` from a freshly
    /// opened `WalWriter`.
    pub fn new(initial_durable_lsn: u64) -> Self {
        Self {
            flushed_lsn: AtomicU64::new(initial_durable_lsn),
            state: Mutex::new(GroupCommitState { in_progress: false }),
            cond: Condvar::new(),
        }
    }

    /// Highest LSN that is known durable on disk. Cheap atomic read,
    /// no lock taken. Used by tests and by the diagnostics surface.
    pub fn flushed_lsn(&self) -> u64 {
        self.flushed_lsn.load(Ordering::Acquire)
    }

    /// Block the caller until the WAL is durable up to at least
    /// `target`. If another writer is already mid-flush, piggyback
    /// on it; otherwise become the leader and do the fsync ourselves.
    ///
    /// `wal` is the same `Mutex<WalWriter>` the transaction manager
    /// holds. The leader takes that lock only briefly, and never
    /// across the fsync itself.
    pub fn commit_at_least(&self, target: u64, wal: &Mutex<WalWriter>) -> io::Result<()> {
        // ── Fast path: already flushed past us. ─────────────────────
        if self.flushed_lsn.load(Ordering::Acquire) >= target {
            return Ok(());
        }

        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());

        loop {
            // Re-check under the lock: another leader may have raced
            // and already flushed past our target between the load
            // above and the lock acquisition.
            if self.flushed_lsn.load(Ordering::Acquire) >= target {
                return Ok(());
            }

            if state.in_progress {
                // Another leader is mid-flush. Wait until they wake us,
                // then re-check whether our target is covered. If not,
                // loop and try to become the next leader instead of
                // going back to sleep forever.
                state = self
                    .cond
                    .wait_while(state, |state| {
                        state.in_progress && self.flushed_lsn.load(Ordering::Acquire) < target
                    })
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            // ── We are the leader. ──────────────────────────────────
            state.in_progress = true;
            break;
        }

        // Drop the state lock so waiters can re-check `flushed_lsn`
        // without bouncing on this mutex.
        drop(state);
        let _leader = LeadershipGuard { group_commit: self };

        // Phase 1 — take the WAL lock BRIEFLY to drain the BufWriter
        // into the kernel and capture both the LSN and a cloned
        // file handle for the fsync. The whole point of this dance
        // is that we DO NOT hold the WAL lock during the expensive
        // sync_all() call below.
        let (target_lsn, sync_handle) = {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.drain_for_group_sync()?
        };

        // Phase 2 — the lock is released. Other writers can now
        // append into the BufWriter while we wait on fsync. Their
        // bytes will either be picked up by THIS sync_all() (if
        // they reach the kernel before the syscall returns) or
        // by the NEXT leader.
        //
        // sync_all() on the cloned handle flushes the same kernel
        // inode the BufWriter writes to, so coverage is correct.
        sync_handle.sync_all()?;

        // Phase 3 — take the WAL lock briefly to publish the new
        // durable LSN. Other writers may have appended in the
        // meantime; some of those bytes may even have been caught by
        // the fsync above, but we conservatively mark only `target_lsn`
        // (the LSN we drained) as durable. A later writer whose LSN
        // lies above `target_lsn` therefore does not hit the fast
        // path; it waits for, or leads, the next flush cycle, which
        // then covers its bytes.
        {
            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
            wal_guard.mark_durable(target_lsn);
        }

        // Publish the new flushed LSN to atomic readers, then
        // let the leadership guard wake every waiter.
        self.flushed_lsn.store(target_lsn, Ordering::Release);

        Ok(())
    }

    fn release_leadership(&self) {
        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
        state.in_progress = false;
        drop(state);
        self.cond.notify_all();
    }
}

impl std::fmt::Debug for GroupCommit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GroupCommit")
            .field("flushed_lsn", &self.flushed_lsn.load(Ordering::Acquire))
            .finish()
    }
}

impl Drop for LeadershipGuard<'_> {
    fn drop(&mut self) {
        self.group_commit.release_leadership();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = std::env::temp_dir().join(format!(
            "rb_group_commit_{}_{}_{}.wal",
            name,
            std::process::id(),
            nanos
        ));
        let _ = std::fs::remove_file(&path);
        (FileGuard { path: path.clone() }, path)
    }

    #[test]
    fn fast_path_when_already_flushed() {
        let (_g, path) = temp_wal("fast_path");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);
        // Target equal to the initial flushed_lsn → no fsync.
        gc.commit_at_least(initial, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), initial);
    }

    #[test]
    fn single_writer_advances_flushed_lsn() {
        let (_g, path) = temp_wal("single_writer");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // Append a record, capture its LSN, then commit.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        assert!(target > initial);

        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }

    #[test]
    fn flushed_lsn_is_monotonic() {
        let (_g, path) = temp_wal("monotonic");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // First commit.
        let lo = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(lo, &wal).unwrap();
        let after_lo = gc.flushed_lsn();

        // Second commit advances further.
        let hi = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(hi, &wal).unwrap();
        let after_hi = gc.flushed_lsn();

        assert!(after_hi >= after_lo);
        // Calling commit_at_least with `lo` after `hi` is a no-op.
        gc.commit_at_least(lo, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), after_hi);
    }

    #[test]
    fn concurrent_writers_coalesce_through_one_coordinator() {
        // Two threads each commit a few records. Both must succeed
        // and `flushed_lsn` must reflect every byte they wrote.
        // We can't directly count fsyncs at this layer (the WAL
        // doesn't expose a sync counter), but the absence of
        // deadlock and the correct final LSN are the contract.
        let (_g, path) = temp_wal("two_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..2u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..10u64 {
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 100 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 100 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        let final_durable = wal.lock().unwrap().durable_lsn();
        assert!(gc.flushed_lsn() >= final_durable);
        // 20 commits worth of 13-byte Begin + 13-byte Commit
        // records = 520 bytes minimum on top of the 8-byte header.
        assert!(final_durable >= 8 + 520);
    }

    #[test]
    fn waiter_retries_after_wakeup_if_previous_flush_was_too_small() {
        let (_g, path) = temp_wal("waiter_retry");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };

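        // Fake a leader in flight whose flush will not cover `target`,
        // so the spawned waiter must wake up and take leadership itself
        // once we clear `in_progress` below.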
        {
            let mut state = gc.state.lock().unwrap();
            state.in_progress = true;
        }

        let (done_tx, done_rx) = mpsc::channel();
        let wal_c = Arc::clone(&wal);
        let gc_c = Arc::clone(&gc);
        let waiter = thread::spawn(move || {
            let result = gc_c.commit_at_least(target, &wal_c);
            let _ = done_tx.send(result);
        });

        thread::sleep(Duration::from_millis(50));

        {
            let mut state = gc.state.lock().unwrap_or_else(|p| p.into_inner());
            state.in_progress = false;
        }
        gc.cond.notify_all();

        done_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("waiter should retry as the next leader")
            .unwrap();
        waiter.join().unwrap();

        assert!(gc.flushed_lsn() >= target);
        assert!(wal.lock().unwrap().durable_lsn() >= target);
    }

    #[test]
    fn high_concurrency_eight_writers_no_deadlock() {
        // 8 threads × 50 commits each. Stress the coordinator:
        // expected to complete without deadlock and with the WAL
        // fully durable up to the largest committed LSN.
        let (_g, path) = temp_wal("eight_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let mut handles = Vec::new();
        for tx in 0..8u64 {
            let wal_c = Arc::clone(&wal);
            let gc_c = Arc::clone(&gc);
            handles.push(thread::spawn(move || -> io::Result<()> {
                for i in 0..50u64 {
                    let target = {
                        let mut w = wal_c.lock().unwrap();
                        w.append(&WalRecord::Begin {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.append(&WalRecord::Commit {
                            tx_id: tx * 1000 + i,
                        })?;
                        w.current_lsn()
                    };
                    gc_c.commit_at_least(target, &wal_c)?;
                }
                Ok(())
            }));
        }

        for h in handles {
            h.join().unwrap().unwrap();
        }

        let current = wal.lock().unwrap().current_lsn();
        let durable = wal.lock().unwrap().durable_lsn();
        assert_eq!(durable, current, "every appended byte must be durable");
        assert!(gc.flushed_lsn() >= current);
    }

    #[test]
    fn writers_recover_from_poisoned_state() {
        // If a previous panic poisoned the state mutex, subsequent
        // writers must still be able to commit (we recover via
        // `unwrap_or_else(into_inner)`).
        let (_g, path) = temp_wal("poison_recovery");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        // Poison the state mutex by panicking inside it.
        let gc_c = Arc::clone(&gc);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _state = gc_c.state.lock().unwrap();
            panic!("intentional poison");
        }));

        // The mutex is now poisoned — but commit_at_least must
        // still work because we recover from poisoning.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }
}
473}