Skip to main content

reddb_server/storage/wal/
group_commit.rs

1//! Cooperative group-commit coordinator for the WAL.
2//!
3//! Mirrors PostgreSQL's `XLogFlush` waiter logic
4//! (`src/backend/access/transam/xlog.c`). The single-writer commit
5//! path used to call `wal.sync()` once per commit, so N concurrent
6//! writers paid N independent fsyncs (~N × 100 µs on SSD). Group
7//! commit collapses those into **one** fsync that covers every byte
8//! appended up to the slowest writer's LSN.
9//!
10//! # Algorithm
11//!
12//! 1. A writer appends its records under the WAL lock and captures
13//!    the resulting `commit_lsn = wal.current_lsn()` after its
14//!    `Commit` record.
15//! 2. The writer releases the WAL lock and calls
16//!    [`GroupCommit::commit_at_least(commit_lsn, &wal)`].
17//! 3. Inside `commit_at_least`:
18//!    - **Fast path:** if `flushed_lsn >= commit_lsn`, the write is
19//!      already durable from a piggyback on a previous fsync.
20//!      Return immediately.
21//!    - Otherwise take the coordinator state lock. Re-check the
22//!      flushed LSN (another leader may have raced).
23//!    - If a leader is already mid-flush, wait on the condvar until
24//!      that flush finishes. Re-check `flushed_lsn`; if the batch now
25//!      covers `commit_lsn`, return, otherwise race to become the next
26//!      leader for the remaining tail.
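//!    - If no leader is in progress, become the leader: mark
//!      `in_progress = true` and drop the state lock. Take the WAL
//!      lock only long enough to call `drain_for_group_sync()`, run
//!      the returned sync plan with the WAL lock released, then
//!      re-take the WAL lock briefly to `mark_durable`. Publish the
//!      new `flushed_lsn`, take the state lock again, clear
//!      `in_progress`, and notify all waiters.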
32//!
33//! # Why this works
34//!
35//! Between the first writer's `append` and the leader's `wal.sync()`,
36//! other writers can grab the WAL lock and append more records.
37//! When the leader finally calls `sync()`, it flushes **everything**
38//! that has been appended so far — not just its own records. Writers
39//! whose LSN is now covered return immediately; writers that appended
40//! after the leader captured `target_lsn` wake up, see they still need
41//! more durability, and one of them becomes the next leader.
42//!
//! So `commit_at_least` produces one fsync per *batch* of concurrent
//! writers, not per writer. On a workload with 8 concurrent
//! committers, each commit's wait drops from ~8 × 100 µs = 800 µs
//! (≈ 1 250 commits/s per writer) to ~1 × 100 µs (≈ 10 000
//! commits/s per writer), an 8× win.
47//!
48//! # Correctness
49//!
50//! - `flushed_lsn` is monotonic: only the leader writes it, and
51//!   only after a successful `sync()`.
52//! - The state lock + condvar guarantee that exactly one leader is
53//!   ever in flight, so we never have two parallel fsyncs racing.
54//! - Waiters re-check `flushed_lsn` under the state lock before
55//!   sleeping, so we never miss a wake-up.
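//! - While holding leadership the leader does only what the flush
//!   needs — drain the writer's buffer, run the sync, record the
//!   durable LSN — and it does not hold the WAL lock across the sync
//!   itself, to keep the critical sections as short as possible.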
59
60use std::io;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::sync::{Condvar, Mutex};
63
64use super::writer::WalWriter;
65
/// Mutable bookkeeping protected by the `GroupCommit::state` mutex.
struct GroupCommitState {
    /// Set while some writer holds flush leadership: from the moment it
    /// wins the election in `commit_at_least` until `release_leadership`
    /// clears it again.
    in_progress: bool,
}
71
72/// Cooperative WAL flush coordinator.
73///
74/// Owned by the [`super::transaction::TransactionManager`]; cheap to
75/// share across writers via `Arc`.
76pub struct GroupCommit {
77    /// Highest LSN known to be durable. Atomic so the fast path can
78    /// read it without taking the state lock.
79    flushed_lsn: AtomicU64,
80    /// Coordination state.
81    state: Mutex<GroupCommitState>,
82    /// Wakes waiters when `flushed_lsn` advances.
83    cond: Condvar,
84}
85
86struct LeadershipGuard<'a> {
87    group_commit: &'a GroupCommit,
88}
89
90impl GroupCommit {
91    /// Create a new coordinator initialised with the WAL's current
92    /// durable position. Pass `wal.durable_lsn()` from a freshly
93    /// opened `WalWriter`.
94    pub fn new(initial_durable_lsn: u64) -> Self {
95        Self {
96            flushed_lsn: AtomicU64::new(initial_durable_lsn),
97            state: Mutex::new(GroupCommitState { in_progress: false }),
98            cond: Condvar::new(),
99        }
100    }
101
102    /// Highest LSN that is known durable on disk. Cheap atomic read,
103    /// no lock taken. Used by tests and by the diagnostics surface.
104    pub fn flushed_lsn(&self) -> u64 {
105        self.flushed_lsn.load(Ordering::Acquire)
106    }
107
108    /// Block the caller until the WAL is durable up to at least
109    /// `target`. If another writer is already mid-flush, piggyback
110    /// on it; otherwise become the leader and do the fsync ourselves.
111    ///
112    /// `wal` is the same `Mutex<WalWriter>` the transaction manager
113    /// holds. The leader briefly takes that lock to call `sync()`.
114    pub fn commit_at_least(&self, target: u64, wal: &Mutex<WalWriter>) -> io::Result<()> {
115        // ── Fast path: already flushed past us. ─────────────────────
116        if self.flushed_lsn.load(Ordering::Acquire) >= target {
117            return Ok(());
118        }
119
120        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
121
122        loop {
123            // Re-check under the lock: another leader may have raced
124            // and already flushed past our target between the load
125            // above and the lock acquisition.
126            if self.flushed_lsn.load(Ordering::Acquire) >= target {
127                return Ok(());
128            }
129
130            if state.in_progress {
131                // Another leader is mid-flush. Wait until they wake us,
132                // then re-check whether our target is covered. If not,
133                // loop and try to become the next leader instead of
134                // going back to sleep forever.
135                state = self
136                    .cond
137                    .wait_while(state, |state| {
138                        state.in_progress && self.flushed_lsn.load(Ordering::Acquire) < target
139                    })
140                    .unwrap_or_else(|p| p.into_inner());
141                continue;
142            }
143
144            // ── We are the leader. ──────────────────────────────────
145            state.in_progress = true;
146            break;
147        }
148
149        // Drop the state lock so waiters can re-check `flushed_lsn`
150        // without bouncing on this mutex.
151        drop(state);
152        let _leader = LeadershipGuard { group_commit: self };
153
154        // Phase 1 — take the WAL lock BRIEFLY to drain the BufWriter
155        // into the kernel and capture both the LSN and a cloned
156        // file handle for the size-aware sync. The whole point of this dance
157        // is that we DO NOT hold the WAL lock during the expensive
158        // disk sync below.
159        let sync = {
160            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
161            wal_guard.drain_for_group_sync()?
162        };
163
164        // Phase 2 — the lock is released. Other writers can now
165        // append into the BufWriter while we wait on fsync. Their
166        // bytes will either be picked up by THIS sync (if
167        // they reach the kernel before the syscall returns) or
168        // by the NEXT leader.
169        //
170        // The sync plan uses sync_data() when the WAL remains inside
171        // already-synced preallocation, and sync_all() when file/allocation
172        // metadata must be forced.
173        sync.sync()?;
174
175        // Phase 3 — take the WAL lock briefly to publish the new
176        // durable LSN. Other writers may have appended in the
177        // meantime; their bytes that reached the kernel are NOW
178        // also durable, but we conservatively only mark `target_lsn`
179        // (the LSN we drained) as durable. Any later writer's
180        // commit_at_least call will quickly become a no-op via the
181        // fast path if the kernel already had their bytes when we
182        // fsync'd.
183        {
184            let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
185            wal_guard.mark_durable(&sync);
186        }
187
188        // Publish the new flushed LSN to atomic readers, then
189        // let the leadership guard wake every waiter.
190        self.flushed_lsn.store(sync.target_lsn(), Ordering::Release);
191
192        Ok(())
193    }
194
195    fn release_leadership(&self) {
196        let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
197        state.in_progress = false;
198        drop(state);
199        self.cond.notify_all();
200    }
201}
202
203impl std::fmt::Debug for GroupCommit {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        f.debug_struct("GroupCommit")
206            .field("flushed_lsn", &self.flushed_lsn.load(Ordering::Acquire))
207            .finish()
208    }
209}
210
211impl Drop for LeadershipGuard<'_> {
212    fn drop(&mut self) {
213        self.group_commit.release_leadership();
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Deletes the temporary WAL file when the owning test finishes.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: ignore the error if the file is already gone.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Builds a per-test WAL path under the system temp dir (made unique
    /// by process id and a nanosecond timestamp), removes any stale file
    /// at that path, and returns a guard that deletes it again on drop.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let path = reddb_file::layout::group_commit_temp_wal_path(
            &std::env::temp_dir(),
            name,
            std::process::id(),
            nanos,
        );
        let _ = std::fs::remove_file(&path);
        (FileGuard { path: path.clone() }, path)
    }

    /// Appends a `Begin`/`Commit` pair for `tx_id` under a single WAL lock
    /// and returns the LSN observed right after the `Commit` record.
    fn append_transaction(wal: &Mutex<WalWriter>, tx_id: u64) -> io::Result<u64> {
        let mut w = wal.lock().unwrap();
        w.append(&WalRecord::Begin { tx_id })?;
        w.append(&WalRecord::Commit { tx_id })?;
        Ok(w.current_lsn())
    }

    /// Spawns `writers` threads that each append and group-commit
    /// `commits_per_writer` transactions, then joins them all. Writer `k`
    /// uses transaction ids starting at `k * tx_id_stride`. Panics if any
    /// thread panics or returns an error.
    fn run_committers(
        wal: &Arc<Mutex<WalWriter>>,
        gc: &Arc<GroupCommit>,
        writers: u64,
        commits_per_writer: u64,
        tx_id_stride: u64,
    ) {
        // Collect first so every thread is running before the first join.
        let handles: Vec<_> = (0..writers)
            .map(|writer| {
                let wal = Arc::clone(wal);
                let gc = Arc::clone(gc);
                thread::spawn(move || -> io::Result<()> {
                    for i in 0..commits_per_writer {
                        let target = append_transaction(&wal, writer * tx_id_stride + i)?;
                        gc.commit_at_least(target, &wal)?;
                    }
                    Ok(())
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap().unwrap();
        }
    }

    #[test]
    fn fast_path_when_already_flushed() {
        let (_g, path) = temp_wal("fast_path");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);
        // Target equal to the initial flushed_lsn → no fsync.
        gc.commit_at_least(initial, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), initial);
    }

    #[test]
    fn single_writer_advances_flushed_lsn() {
        let (_g, path) = temp_wal("single_writer");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // Append one transaction, capture its LSN, then commit.
        let target = append_transaction(&wal, 1).unwrap();
        assert!(target > initial);

        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }

    #[test]
    fn flushed_lsn_is_monotonic() {
        let (_g, path) = temp_wal("monotonic");
        let wal = Mutex::new(WalWriter::open(&path).unwrap());
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = GroupCommit::new(initial);

        // First commit covers only the Begin record.
        let lo = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(lo, &wal).unwrap();
        let after_lo = gc.flushed_lsn();

        // Second commit advances further.
        let hi = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Commit { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(hi, &wal).unwrap();
        let after_hi = gc.flushed_lsn();

        assert!(after_hi >= after_lo);
        // Calling commit_at_least with `lo` after `hi` is a no-op.
        gc.commit_at_least(lo, &wal).unwrap();
        assert_eq!(gc.flushed_lsn(), after_hi);
    }

    #[test]
    fn concurrent_writers_coalesce_through_one_coordinator() {
        // Two threads each commit ten transactions. Both must succeed
        // and `flushed_lsn` must reflect every byte they wrote.
        // We can't directly count fsyncs at this layer (the WAL
        // doesn't expose a sync counter), but the absence of
        // deadlock and the correct final LSN are the contract.
        let (_g, path) = temp_wal("two_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        run_committers(&wal, &gc, 2, 10, 100);

        let final_durable = wal.lock().unwrap().durable_lsn();
        assert!(gc.flushed_lsn() >= final_durable);
        // Lower bound only: 20 transactions × 2 records on top of the
        // 8-byte header. The constant 520 works out to 13 bytes per
        // record.
        // NOTE(review): the previous comment spoke of 21-byte records,
        // which would give 840 — confirm the record size against the
        // WAL encoding and tighten the bound if appropriate.
        assert!(final_durable >= 8 + 520);
    }

    #[test]
    fn waiter_retries_after_wakeup_if_previous_flush_was_too_small() {
        let (_g, path) = temp_wal("waiter_retry");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        let target = append_transaction(&wal, 1).unwrap();

        // Pretend some other leader is mid-flush so the waiter has to park.
        {
            let mut state = gc.state.lock().unwrap();
            state.in_progress = true;
        }

        let (done_tx, done_rx) = mpsc::channel();
        let wal_c = Arc::clone(&wal);
        let gc_c = Arc::clone(&gc);
        let waiter = thread::spawn(move || {
            let result = gc_c.commit_at_least(target, &wal_c);
            let _ = done_tx.send(result);
        });

        // Give the waiter time to reach the condvar.
        thread::sleep(Duration::from_millis(50));

        // End the fake flush WITHOUT advancing `flushed_lsn`: the waiter
        // wakes up still uncovered and must take over as leader.
        {
            let mut state = gc.state.lock().unwrap_or_else(|p| p.into_inner());
            state.in_progress = false;
        }
        gc.cond.notify_all();

        done_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("waiter should retry as the next leader")
            .unwrap();
        waiter.join().unwrap();

        assert!(gc.flushed_lsn() >= target);
        assert!(wal.lock().unwrap().durable_lsn() >= target);
    }

    #[test]
    fn high_concurrency_eight_writers_no_deadlock() {
        // 8 threads × 50 commits each. Stress the coordinator:
        // expected to complete without deadlock and with the WAL
        // fully durable up to the largest committed LSN.
        let (_g, path) = temp_wal("eight_writers");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        run_committers(&wal, &gc, 8, 50, 1000);

        let current = wal.lock().unwrap().current_lsn();
        let durable = wal.lock().unwrap().durable_lsn();
        assert_eq!(durable, current, "every appended byte must be durable");
        assert!(gc.flushed_lsn() >= current);
    }

    #[test]
    fn writers_recover_from_poisoned_state() {
        // If a previous panic poisoned the state mutex, subsequent
        // writers must still be able to commit (we recover via
        // `unwrap_or_else(into_inner)`).
        let (_g, path) = temp_wal("poison_recovery");
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        let gc = Arc::new(GroupCommit::new(initial));

        // Poison the state mutex by panicking while holding it.
        let gc_c = Arc::clone(&gc);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _state = gc_c.state.lock().unwrap();
            panic!("intentional poison");
        }));

        // The mutex is now poisoned — but commit_at_least must
        // still work because we recover from poisoning.
        let target = {
            let mut w = wal.lock().unwrap();
            w.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            w.current_lsn()
        };
        gc.commit_at_least(target, &wal).unwrap();
        assert!(gc.flushed_lsn() >= target);
    }
}
474}