Skip to main content

nodedb_wal/
group_commit.rs

1//! Group commit coordinator.
2//!
3//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
4//!
5//! ## How it works
6//!
7//! 1. Multiple threads submit `PendingWrite` to the commit queue.
8//! 2. One thread becomes the "commit leader" (acquires the commit lock).
9//! 3. The leader drains all pending writes, appends them to the WAL writer,
10//!    issues a single `fsync`, and advances `durable_lsn`.
11//! 4. Non-leader threads discover their write was already committed when the
12//!    pending queue is empty after acquiring the commit lock. They verify the
13//!    commit succeeded via `last_commit_failed` before returning `durable: true`.
14//!
15//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
16//!
17//! ## Safety invariants
18//!
19//! - `durable_lsn` is updated atomically **only after** a successful fsync.
20//! - If fsync fails, `last_commit_failed` is set so non-leader threads whose
21//!   writes were in the failed batch propagate the error instead of falsely
22//!   reporting `durable: true`.
23//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
24//!   be on disk, and the caller must handle the error (retry, abort, etc.).
25
26use std::sync::Mutex;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use crate::error::{Result, WalError};
30use crate::writer::WalWriter;
31
32/// A pending write request waiting to be committed.
33#[derive(Debug)]
34pub struct PendingWrite {
35    /// Record type discriminant.
36    pub record_type: u16,
37    /// Tenant ID.
38    pub tenant_id: u32,
39    /// Virtual shard ID.
40    pub vshard_id: u16,
41    /// Payload bytes.
42    pub payload: Vec<u8>,
43}
44
45/// Result delivered to each waiter after group commit completes.
46#[derive(Debug, Clone, Copy)]
47pub struct CommitResult {
48    /// The LSN assigned to this record.
49    pub lsn: u64,
50    /// Whether the fsync succeeded (record is durable).
51    pub durable: bool,
52}
53
54/// Thread-safe group commit coordinator.
55///
56/// Multiple threads call `submit()` which blocks until the record is durable.
57/// Internally, one thread becomes the commit leader and batches all pending
58/// writes into a single WAL flush.
59///
60/// ## Safety invariants
61///
62/// - A non-leader thread only returns `durable: true` if it confirms its
63///   write was drained from the pending queue **and** the leader's fsync
64///   succeeded (checked via `last_commit_failed`).
65/// - If the leader's fsync fails, all non-leader threads whose writes were
66///   in the failed batch receive an error.
67/// - `durable_lsn` is updated **only after** a successful fsync.
68pub struct GroupCommitter {
69    /// Pending writes queue.
70    pending: Mutex<Vec<PendingWrite>>,
71
72    /// The durable LSN — all records with LSN <= this value are on disk.
73    durable_lsn: AtomicU64,
74
75    /// Set to `true` if the most recent commit batch failed (fsync error).
76    /// Non-leader threads whose writes were part of the failed batch check
77    /// this flag and propagate the error instead of returning `durable: true`.
78    /// Cleared at the start of each successful commit.
79    last_commit_failed: AtomicBool,
80
81    /// Serializes commit operations (drain + write + fsync).
82    /// Separate from `pending` so enqueue doesn't block on I/O.
83    commit_lock: Mutex<()>,
84}
85
86impl GroupCommitter {
87    /// Create a new group committer.
88    pub fn new() -> Self {
89        Self {
90            pending: Mutex::new(Vec::with_capacity(1024)),
91            durable_lsn: AtomicU64::new(0),
92            last_commit_failed: AtomicBool::new(false),
93            commit_lock: Mutex::new(()),
94        }
95    }
96
97    /// Submit a write and block until it's durable.
98    ///
99    /// Returns the assigned LSN once the batch containing this write has been
100    /// fsynced to disk. If fsync fails, the error is propagated to all threads
101    /// whose writes were in the failed batch.
102    pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
103        // Enqueue the write.
104        {
105            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
106                context: "pending queue",
107            })?;
108            pending.push(write);
109        }
110
111        // Try to become the commit leader. If another thread holds the commit
112        // lock, we block here until it finishes — then we check if our write
113        // was already committed by that leader.
114        let _commit_guard = self
115            .commit_lock
116            .lock()
117            .map_err(|_| WalError::LockPoisoned {
118                context: "commit lock",
119            })?;
120
121        // Drain pending writes. If the previous leader already committed our
122        // write, the pending queue will be empty (our write was drained by
123        // that leader). If the queue is non-empty, we are the new leader.
124        let batch: Vec<PendingWrite> = {
125            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
126                context: "pending queue (drain)",
127            })?;
128            std::mem::take(&mut *pending)
129        };
130
131        if batch.is_empty() {
132            // Previous leader drained our write. Check if that commit
133            // succeeded or failed. This is the critical fix: without this
134            // check, a non-leader would return durable:true even if the
135            // leader's fsync failed.
136            if self.last_commit_failed.load(Ordering::Acquire) {
137                return Err(WalError::Io(std::io::Error::other(
138                    "WAL fsync failed in previous group commit batch",
139                )));
140            }
141            let lsn = self.durable_lsn.load(Ordering::Acquire);
142            return Ok(CommitResult { lsn, durable: true });
143        }
144
145        // We are the leader — append and fsync.
146        let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
147            context: "WAL writer",
148        })?;
149        let mut last_lsn = 0;
150
151        for w in &batch {
152            last_lsn = wal.append(w.record_type, w.tenant_id, w.vshard_id, &w.payload)?;
153        }
154
155        let sync_result = wal.sync();
156        drop(wal);
157
158        match sync_result {
159            Ok(()) => {
160                // Order matters: clear error flag before advancing durable_lsn.
161                // Non-leaders check error flag AFTER seeing empty batch, so
162                // the flag must be cleared before they can observe the new LSN.
163                self.last_commit_failed.store(false, Ordering::Release);
164                self.durable_lsn.store(last_lsn, Ordering::Release);
165                Ok(CommitResult {
166                    lsn: last_lsn,
167                    durable: true,
168                })
169            }
170            Err(e) => {
171                // Fsync failed. Mark the error so non-leader threads whose
172                // writes were in this batch (drained from pending) know not
173                // to report success. Do NOT update durable_lsn.
174                self.last_commit_failed.store(true, Ordering::Release);
175                Err(e)
176            }
177        }
178    }
179
180    /// Current durable LSN (all records <= this are on disk).
181    pub fn durable_lsn(&self) -> u64 {
182        self.durable_lsn.load(Ordering::Acquire)
183    }
184}
185
186impl Default for GroupCommitter {
187    fn default() -> Self {
188        Self::new()
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Builds a `Put` write for tenant 1 / vshard 0 carrying `payload`.
    fn put(payload: &[u8]) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u16,
            tenant_id: 1,
            vshard_id: 0,
            payload: payload.to_vec(),
        }
    }

    #[test]
    fn single_thread_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&path).unwrap());
        let gc = GroupCommitter::new();

        let result = gc.submit(&writer, put(b"hello")).unwrap();

        assert!(result.durable);
        assert_eq!(result.lsn, 1);
        assert_eq!(gc.durable_lsn(), 1);

        // Verify the record is readable.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"hello");
    }

    #[test]
    fn sequential_submits_get_increasing_lsns() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&path).unwrap());
        let gc = GroupCommitter::new();

        let mut previous = 0;
        for i in 0..3 {
            let result = gc
                .submit(&writer, put(format!("seq-{i}").as_bytes()))
                .unwrap();
            assert!(result.durable);
            // Each single-threaded submit leads its own batch of one.
            assert!(result.lsn > previous);
            assert_eq!(gc.durable_lsn(), result.lsn);
            previous = result.lsn;
        }
    }

    #[test]
    fn concurrent_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let writer = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&path).unwrap(),
        ));
        let gc = Arc::new(GroupCommitter::new());

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let w = Arc::clone(&writer);
                let g = Arc::clone(&gc);
                thread::spawn(move || {
                    let payload = format!("record-{i}");
                    let result = g.submit(&w, put(payload.as_bytes())).unwrap();
                    assert!(result.durable);
                    result.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();

        // All should have gotten durable results.
        assert!(lsns.iter().all(|l| *l > 0));
        // The highest LSN handed out is the one the last batch made durable.
        assert_eq!(lsns.iter().copied().max(), Some(gc.durable_lsn()));

        // Every payload must be in the WAL exactly once.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 10);

        let mut written: Vec<Vec<u8>> = records.iter().map(|r| r.payload.to_vec()).collect();
        let mut expected: Vec<Vec<u8>> = (0..10)
            .map(|i| format!("record-{i}").into_bytes())
            .collect();
        written.sort();
        expected.sort();
        assert_eq!(written, expected);
    }
}