reddb_server/storage/wal/group_commit.rs
1//! Cooperative group-commit coordinator for the WAL.
2//!
3//! Mirrors PostgreSQL's `XLogFlush` waiter logic
4//! (`src/backend/access/transam/xlog.c`). The single-writer commit
5//! path used to call `wal.sync()` once per commit, so N concurrent
6//! writers paid N independent fsyncs (~N × 100 µs on SSD). Group
7//! commit collapses those into **one** fsync that covers every byte
8//! appended up to the slowest writer's LSN.
9//!
10//! # Algorithm
11//!
12//! 1. A writer appends its records under the WAL lock and captures
13//! the resulting `commit_lsn = wal.current_lsn()` after its
14//! `Commit` record.
15//! 2. The writer releases the WAL lock and calls
16//! [`GroupCommit::commit_at_least(commit_lsn, &wal)`].
17//! 3. Inside `commit_at_least`:
18//! - **Fast path:** if `flushed_lsn >= commit_lsn`, the write is
19//! already durable from a piggyback on a previous fsync.
20//! Return immediately.
21//! - Otherwise take the coordinator state lock. Re-check the
22//! flushed LSN (another leader may have raced).
23//! - If a leader is already mid-flush, wait on the condvar until
24//! that flush finishes. Re-check `flushed_lsn`; if the batch now
25//! covers `commit_lsn`, return, otherwise race to become the next
26//! leader for the remaining tail.
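//! - If no leader is in progress, become the leader: mark
//!   `in_progress = true`, drop the state lock, take the WAL lock
//!   just long enough to call `drain_for_group_sync()`, release it,
//!   run the resulting sync **without** holding the WAL lock,
//!   re-take the WAL lock to `mark_durable`, publish the new
//!   `flushed_lsn`, then take the state lock again, clear
//!   `in_progress`, and notify all waiters.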
32//!
33//! # Why this works
34//!
35//! Between the first writer's `append` and the leader's `wal.sync()`,
36//! other writers can grab the WAL lock and append more records.
37//! When the leader finally calls `sync()`, it flushes **everything**
38//! that has been appended so far — not just its own records. Writers
39//! whose LSN is now covered return immediately; writers that appended
40//! after the leader captured `target_lsn` wake up, see they still need
41//! more durability, and one of them becomes the next leader.
42//!
//! So `commit_at_least` produces one fsync per *batch* of concurrent
//! writers, not per writer. On a workload with 8 concurrent
//! committers, each writer no longer queues behind ~8 serial fsyncs
//! (~8 × 100 µs per commit ≈ 1 250 commits/s per writer) and instead
//! shares a single one (~1 × 100 µs per commit ≈ 10 000 commits/s
//! per writer), an 8× win.
47//!
48//! # Correctness
49//!
50//! - `flushed_lsn` is monotonic: only the leader writes it, and
51//! only after a successful `sync()`.
52//! - The state lock + condvar guarantee that exactly one leader is
53//! ever in flight, so we never have two parallel fsyncs racing.
54//! - Waiters re-check `flushed_lsn` under the state lock before
55//! sleeping, so we never miss a wake-up.
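//! - While holding leadership the leader only drains the WAL buffer,
//!   runs the sync, and marks the result durable — no extra work —
//!   and the WAL lock is released for the duration of the sync itself,
//!   to keep the critical sections as short as possible.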
59
60use std::io;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::sync::{Condvar, Mutex};
63
64use super::writer::WalWriter;
65
/// Coordinator state guarded by `GroupCommit::state`.
///
/// Only ever read or written while the `GroupCommit::state` mutex is
/// held. Derives `Debug` so the state can be included in diagnostics.
#[derive(Debug)]
struct GroupCommitState {
    /// True while some caller of `commit_at_least` holds leadership,
    /// i.e. between claiming the flush and releasing it again.
    in_progress: bool,
}
71
72/// Cooperative WAL flush coordinator.
73///
74/// Owned by the [`super::transaction::TransactionManager`]; cheap to
75/// share across writers via `Arc`.
76pub struct GroupCommit {
77 /// Highest LSN known to be durable. Atomic so the fast path can
78 /// read it without taking the state lock.
79 flushed_lsn: AtomicU64,
80 /// Coordination state.
81 state: Mutex<GroupCommitState>,
82 /// Wakes waiters when `flushed_lsn` advances.
83 cond: Condvar,
84}
85
86struct LeadershipGuard<'a> {
87 group_commit: &'a GroupCommit,
88}
89
90impl GroupCommit {
91 /// Create a new coordinator initialised with the WAL's current
92 /// durable position. Pass `wal.durable_lsn()` from a freshly
93 /// opened `WalWriter`.
94 pub fn new(initial_durable_lsn: u64) -> Self {
95 Self {
96 flushed_lsn: AtomicU64::new(initial_durable_lsn),
97 state: Mutex::new(GroupCommitState { in_progress: false }),
98 cond: Condvar::new(),
99 }
100 }
101
102 /// Highest LSN that is known durable on disk. Cheap atomic read,
103 /// no lock taken. Used by tests and by the diagnostics surface.
104 pub fn flushed_lsn(&self) -> u64 {
105 self.flushed_lsn.load(Ordering::Acquire)
106 }
107
108 /// Block the caller until the WAL is durable up to at least
109 /// `target`. If another writer is already mid-flush, piggyback
110 /// on it; otherwise become the leader and do the fsync ourselves.
111 ///
112 /// `wal` is the same `Mutex<WalWriter>` the transaction manager
113 /// holds. The leader briefly takes that lock to call `sync()`.
114 pub fn commit_at_least(&self, target: u64, wal: &Mutex<WalWriter>) -> io::Result<()> {
115 // ── Fast path: already flushed past us. ─────────────────────
116 if self.flushed_lsn.load(Ordering::Acquire) >= target {
117 return Ok(());
118 }
119
120 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
121
122 loop {
123 // Re-check under the lock: another leader may have raced
124 // and already flushed past our target between the load
125 // above and the lock acquisition.
126 if self.flushed_lsn.load(Ordering::Acquire) >= target {
127 return Ok(());
128 }
129
130 if state.in_progress {
131 // Another leader is mid-flush. Wait until they wake us,
132 // then re-check whether our target is covered. If not,
133 // loop and try to become the next leader instead of
134 // going back to sleep forever.
135 state = self
136 .cond
137 .wait_while(state, |state| {
138 state.in_progress && self.flushed_lsn.load(Ordering::Acquire) < target
139 })
140 .unwrap_or_else(|p| p.into_inner());
141 continue;
142 }
143
144 // ── We are the leader. ──────────────────────────────────
145 state.in_progress = true;
146 break;
147 }
148
149 // Drop the state lock so waiters can re-check `flushed_lsn`
150 // without bouncing on this mutex.
151 drop(state);
152 let _leader = LeadershipGuard { group_commit: self };
153
154 // Phase 1 — take the WAL lock BRIEFLY to drain the BufWriter
155 // into the kernel and capture both the LSN and a cloned
156 // file handle for the size-aware sync. The whole point of this dance
157 // is that we DO NOT hold the WAL lock during the expensive
158 // disk sync below.
159 let sync = {
160 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
161 wal_guard.drain_for_group_sync()?
162 };
163
164 // Phase 2 — the lock is released. Other writers can now
165 // append into the BufWriter while we wait on fsync. Their
166 // bytes will either be picked up by THIS sync (if
167 // they reach the kernel before the syscall returns) or
168 // by the NEXT leader.
169 //
170 // The sync plan uses sync_data() when the WAL remains inside
171 // already-synced preallocation, and sync_all() when file/allocation
172 // metadata must be forced.
173 sync.sync()?;
174
175 // Phase 3 — take the WAL lock briefly to publish the new
176 // durable LSN. Other writers may have appended in the
177 // meantime; their bytes that reached the kernel are NOW
178 // also durable, but we conservatively only mark `target_lsn`
179 // (the LSN we drained) as durable. Any later writer's
180 // commit_at_least call will quickly become a no-op via the
181 // fast path if the kernel already had their bytes when we
182 // fsync'd.
183 {
184 let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
185 wal_guard.mark_durable(&sync);
186 }
187
188 // Publish the new flushed LSN to atomic readers, then
189 // let the leadership guard wake every waiter.
190 self.flushed_lsn.store(sync.target_lsn(), Ordering::Release);
191
192 Ok(())
193 }
194
195 fn release_leadership(&self) {
196 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
197 state.in_progress = false;
198 drop(state);
199 self.cond.notify_all();
200 }
201}
202
203impl std::fmt::Debug for GroupCommit {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("GroupCommit")
206 .field("flushed_lsn", &self.flushed_lsn.load(Ordering::Acquire))
207 .finish()
208 }
209}
210
211impl Drop for LeadershipGuard<'_> {
212 fn drop(&mut self) {
213 self.group_commit.release_leadership();
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::wal::record::WalRecord;
    use crate::storage::wal::writer::WalWriter;
    use std::path::PathBuf;
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Deletes the temporary WAL file when dropped; failures are ignored.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Build a WAL path under the system temp dir from `name`, the
    /// process id and the current time in nanoseconds, remove any
    /// leftover file at that path, and return a cleanup guard with it.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let path = reddb_file::layout::group_commit_temp_wal_path(
            &std::env::temp_dir(),
            name,
            std::process::id(),
            since_epoch.as_nanos(),
        );
        let _ = std::fs::remove_file(&path);
        let guard = FileGuard { path: path.clone() };
        (guard, path)
    }

    /// Per-test setup: an open WAL, a coordinator seeded with the WAL's
    /// durable LSN, and the guard that removes the file afterwards.
    struct Fixture {
        wal: Arc<Mutex<WalWriter>>,
        gc: Arc<GroupCommit>,
        initial: u64,
        // Declared last so the WAL is dropped before the file is removed.
        _file: FileGuard,
    }

    /// Open a fresh WAL for the test called `name` and wrap it in a
    /// `Fixture`.
    fn fixture(name: &str) -> Fixture {
        let (file, path) = temp_wal(name);
        let wal = Arc::new(Mutex::new(WalWriter::open(&path).unwrap()));
        let initial = wal.lock().unwrap().durable_lsn();
        Fixture {
            gc: Arc::new(GroupCommit::new(initial)),
            wal,
            initial,
            _file: file,
        }
    }

    /// Append `records` under a single WAL lock acquisition and return
    /// the LSN reported right after the last append.
    fn append_all(wal: &Mutex<WalWriter>, records: &[WalRecord]) -> u64 {
        let mut writer = wal.lock().unwrap();
        for record in records {
            writer.append(record).unwrap();
        }
        writer.current_lsn()
    }

    /// Spawn `writers` threads that each perform `commits` Begin/Commit
    /// appends followed by `commit_at_least`, then join them all.
    /// Transaction ids are `writer * stride + i`.
    fn run_committers(fx: &Fixture, writers: u64, commits: u64, stride: u64) {
        let workers: Vec<_> = (0..writers)
            .map(|writer| {
                let wal = Arc::clone(&fx.wal);
                let gc = Arc::clone(&fx.gc);
                thread::spawn(move || -> io::Result<()> {
                    for i in 0..commits {
                        let tx_id = writer * stride + i;
                        let target = {
                            let mut w = wal.lock().unwrap();
                            w.append(&WalRecord::Begin { tx_id })?;
                            w.append(&WalRecord::Commit { tx_id })?;
                            w.current_lsn()
                        };
                        gc.commit_at_least(target, &wal)?;
                    }
                    Ok(())
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap().unwrap();
        }
    }

    #[test]
    fn fast_path_when_already_flushed() {
        let fx = fixture("fast_path");
        // The target equals the seeded watermark, so the call must
        // return without moving it.
        fx.gc.commit_at_least(fx.initial, &fx.wal).unwrap();
        assert_eq!(fx.gc.flushed_lsn(), fx.initial);
    }

    #[test]
    fn single_writer_advances_flushed_lsn() {
        let fx = fixture("single_writer");

        // One transaction: append, remember the LSN, then commit.
        let target = append_all(
            &fx.wal,
            &[
                WalRecord::Begin { tx_id: 1 },
                WalRecord::Commit { tx_id: 1 },
            ],
        );
        assert!(target > fx.initial);

        fx.gc.commit_at_least(target, &fx.wal).unwrap();
        assert!(fx.gc.flushed_lsn() >= target);
    }

    #[test]
    fn flushed_lsn_is_monotonic() {
        let fx = fixture("monotonic");

        // Commit up to a lower LSN first.
        let lo = append_all(&fx.wal, &[WalRecord::Begin { tx_id: 1 }]);
        fx.gc.commit_at_least(lo, &fx.wal).unwrap();
        let after_lo = fx.gc.flushed_lsn();

        // Then up to a higher one.
        let hi = append_all(&fx.wal, &[WalRecord::Commit { tx_id: 1 }]);
        fx.gc.commit_at_least(hi, &fx.wal).unwrap();
        let after_hi = fx.gc.flushed_lsn();

        assert!(after_hi >= after_lo);
        // Asking for `lo` again must not move the watermark.
        fx.gc.commit_at_least(lo, &fx.wal).unwrap();
        assert_eq!(fx.gc.flushed_lsn(), after_hi);
    }

    #[test]
    fn concurrent_writers_coalesce_through_one_coordinator() {
        // Two threads, ten commits each. The contract checked here is
        // that nobody deadlocks and that the final watermark covers
        // what the WAL reports as durable; fsyncs are not counted.
        let fx = fixture("two_writers");
        run_committers(&fx, 2, 10, 100);

        let final_durable = fx.wal.lock().unwrap().durable_lsn();
        assert!(fx.gc.flushed_lsn() >= final_durable);
        // Loose lower bound: 20 Begin/Commit pairs must have advanced
        // the durable LSN by at least 520 past the 8-byte header.
        assert!(final_durable >= 8 + 520);
    }

    #[test]
    fn waiter_retries_after_wakeup_if_previous_flush_was_too_small() {
        let fx = fixture("waiter_retry");

        let target = append_all(
            &fx.wal,
            &[
                WalRecord::Begin { tx_id: 1 },
                WalRecord::Commit { tx_id: 1 },
            ],
        );

        // Pretend a leader is mid-flush so the waiter has to sleep.
        {
            let mut state = fx.gc.state.lock().unwrap();
            state.in_progress = true;
        }

        let (done_tx, done_rx) = mpsc::channel();
        let wal_c = Arc::clone(&fx.wal);
        let gc_c = Arc::clone(&fx.gc);
        let waiter = thread::spawn(move || {
            let outcome = gc_c.commit_at_least(target, &wal_c);
            let _ = done_tx.send(outcome);
        });

        thread::sleep(Duration::from_millis(50));

        // The fake leader "finishes" without advancing the watermark.
        {
            let mut state = fx.gc.state.lock().unwrap_or_else(|p| p.into_inner());
            state.in_progress = false;
        }
        fx.gc.cond.notify_all();

        done_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("waiter should retry as the next leader")
            .unwrap();
        waiter.join().unwrap();

        assert!(fx.gc.flushed_lsn() >= target);
        assert!(fx.wal.lock().unwrap().durable_lsn() >= target);
    }

    #[test]
    fn high_concurrency_eight_writers_no_deadlock() {
        // 8 threads × 50 commits each: must finish without deadlock
        // and leave the WAL durable up to its current LSN.
        let fx = fixture("eight_writers");
        run_committers(&fx, 8, 50, 1000);

        let current = fx.wal.lock().unwrap().current_lsn();
        let durable = fx.wal.lock().unwrap().durable_lsn();
        assert_eq!(durable, current, "every appended byte must be durable");
        assert!(fx.gc.flushed_lsn() >= current);
    }

    #[test]
    fn writers_recover_from_poisoned_state() {
        // A panic while holding the state mutex poisons it. Later
        // commits must still succeed, because every lock site recovers
        // with `unwrap_or_else(into_inner)`.
        let fx = fixture("poison_recovery");

        // Poison the state mutex by panicking with the guard alive.
        let gc_c = Arc::clone(&fx.gc);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _state = gc_c.state.lock().unwrap();
            panic!("intentional poison");
        }));

        // The coordinator must still accept and complete a commit.
        let target = append_all(&fx.wal, &[WalRecord::Begin { tx_id: 1 }]);
        fx.gc.commit_at_least(target, &fx.wal).unwrap();
        assert!(fx.gc.flushed_lsn() >= target);
    }
}