Skip to main content

nodedb_wal/
group_commit.rs

// SPDX-License-Identifier: BUSL-1.1

//! Group commit coordinator.
//!
//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
//!
//! ## How it works
//!
//! 1. Multiple threads submit `PendingWrite` to the commit queue.
//! 2. One thread becomes the "commit leader" (acquires the commit lock).
//! 3. The leader drains all pending writes, appends them to the WAL writer,
//!    issues a single `fsync`, and advances `durable_lsn`.
//! 4. Non-leader threads discover their write was already committed when the
//!    pending queue is empty after acquiring the commit lock. They verify the
//!    commit succeeded via `last_commit_failed` before returning `durable: true`.
//!
//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
//!
//! ## Safety invariants
//!
//! - `durable_lsn` is updated atomically **only after** a successful fsync.
//! - If fsync fails, `last_commit_failed` is set so non-leader threads whose
//!   writes were in the failed batch propagate the error instead of falsely
//!   reporting `durable: true`.
//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
//!   be on disk, and the caller must handle the error (retry, abort, etc.).
27
28use std::sync::Mutex;
29use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
30
31use crate::error::{Result, WalError};
32use crate::writer::WalWriter;
33
/// A pending write request waiting to be committed.
///
/// Instances are queued by `GroupCommitter::submit`; the commit leader hands
/// each field, unchanged, to the WAL writer's `append` call.
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers can keep a copy for a
/// retry after a failed commit and compare writes in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingWrite {
    /// Record type discriminant.
    pub record_type: u32,
    /// Tenant ID.
    pub tenant_id: u64,
    /// Virtual shard ID.
    pub vshard_id: u32,
    /// Payload bytes.
    pub payload: Vec<u8>,
}
46
/// Result delivered to each waiter after group commit completes.
///
/// `PartialEq`/`Eq` are derived so results can be compared directly
/// (e.g. with `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitResult {
    /// The LSN reported for this record.
    ///
    /// For the commit leader this is the LSN of the last record in the batch
    /// it flushed. For a non-leader it is the value of `durable_lsn` read at
    /// return time, which is not necessarily the LSN of the caller's own record.
    pub lsn: u64,
    /// Whether the fsync succeeded (record is durable).
    pub durable: bool,
}
55
56/// Thread-safe group commit coordinator.
57///
58/// Multiple threads call `submit()` which blocks until the record is durable.
59/// Internally, one thread becomes the commit leader and batches all pending
60/// writes into a single WAL flush.
61///
62/// ## Safety invariants
63///
64/// - A non-leader thread only returns `durable: true` if it confirms its
65///   write was drained from the pending queue **and** the leader's fsync
66///   succeeded (checked via `last_commit_failed`).
67/// - If the leader's fsync fails, all non-leader threads whose writes were
68///   in the failed batch receive an error.
69/// - `durable_lsn` is updated **only after** a successful fsync.
70pub struct GroupCommitter {
71    /// Pending writes queue.
72    pending: Mutex<Vec<PendingWrite>>,
73
74    /// The durable LSN — all records with LSN <= this value are on disk.
75    durable_lsn: AtomicU64,
76
77    /// Set to `true` if the most recent commit batch failed (fsync error).
78    /// Non-leader threads whose writes were part of the failed batch check
79    /// this flag and propagate the error instead of returning `durable: true`.
80    /// Cleared at the start of each successful commit.
81    last_commit_failed: AtomicBool,
82
83    /// Serializes commit operations (drain + write + fsync).
84    /// Separate from `pending` so enqueue doesn't block on I/O.
85    commit_lock: Mutex<()>,
86}
87
88impl GroupCommitter {
89    /// Create a new group committer.
90    pub fn new() -> Self {
91        Self {
92            pending: Mutex::new(Vec::with_capacity(1024)),
93            durable_lsn: AtomicU64::new(0),
94            last_commit_failed: AtomicBool::new(false),
95            commit_lock: Mutex::new(()),
96        }
97    }
98
99    /// Submit a write and block until it's durable.
100    ///
101    /// Returns the assigned LSN once the batch containing this write has been
102    /// fsynced to disk. If fsync fails, the error is propagated to all threads
103    /// whose writes were in the failed batch.
104    pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
105        // Enqueue the write.
106        {
107            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
108                context: "pending queue",
109            })?;
110            pending.push(write);
111        }
112
113        // Try to become the commit leader. If another thread holds the commit
114        // lock, we block here until it finishes — then we check if our write
115        // was already committed by that leader.
116        let _commit_guard = self
117            .commit_lock
118            .lock()
119            .map_err(|_| WalError::LockPoisoned {
120                context: "commit lock",
121            })?;
122
123        // Drain pending writes. If the previous leader already committed our
124        // write, the pending queue will be empty (our write was drained by
125        // that leader). If the queue is non-empty, we are the new leader.
126        let batch: Vec<PendingWrite> = {
127            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
128                context: "pending queue (drain)",
129            })?;
130            std::mem::take(&mut *pending)
131        };
132
133        if batch.is_empty() {
134            // Previous leader drained our write. Check if that commit
135            // succeeded or failed. This is the critical fix: without this
136            // check, a non-leader would return durable:true even if the
137            // leader's fsync failed.
138            if self.last_commit_failed.load(Ordering::Acquire) {
139                return Err(WalError::Io(std::io::Error::other(
140                    "WAL fsync failed in previous group commit batch",
141                )));
142            }
143            let lsn = self.durable_lsn.load(Ordering::Acquire);
144            return Ok(CommitResult { lsn, durable: true });
145        }
146
147        // We are the leader — append and fsync.
148        let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
149            context: "WAL writer",
150        })?;
151        let mut last_lsn = 0;
152
153        for w in &batch {
154            last_lsn = wal.append(w.record_type, w.tenant_id, w.vshard_id, &w.payload)?;
155        }
156
157        let sync_result = wal.sync();
158        drop(wal);
159
160        match sync_result {
161            Ok(()) => {
162                // Order matters: clear error flag before advancing durable_lsn.
163                // Non-leaders check error flag AFTER seeing empty batch, so
164                // the flag must be cleared before they can observe the new LSN.
165                self.last_commit_failed.store(false, Ordering::Release);
166                self.durable_lsn.store(last_lsn, Ordering::Release);
167                Ok(CommitResult {
168                    lsn: last_lsn,
169                    durable: true,
170                })
171            }
172            Err(e) => {
173                // Fsync failed. Mark the error so non-leader threads whose
174                // writes were in this batch (drained from pending) know not
175                // to report success. Do NOT update durable_lsn.
176                self.last_commit_failed.store(true, Ordering::Release);
177                Err(e)
178            }
179        }
180    }
181
182    /// Current durable LSN (all records <= this are on disk).
183    pub fn durable_lsn(&self) -> u64 {
184        self.durable_lsn.load(Ordering::Acquire)
185    }
186}
187
188impl Default for GroupCommitter {
189    fn default() -> Self {
190        Self::new()
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Build a `Put` write for tenant 1 / vshard 0 carrying `payload`.
    fn put_write(payload: Vec<u8>) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u32,
            tenant_id: 1,
            vshard_id: 0,
            payload,
        }
    }

    /// A lone submitter is always the leader: it gets LSN 1 and the record
    /// can be read back from the WAL file.
    #[test]
    fn single_thread_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&path).unwrap());
        let gc = GroupCommitter::new();

        let result = gc.submit(&writer, put_write(b"hello".to_vec())).unwrap();

        assert!(result.durable);
        assert_eq!(result.lsn, 1);
        assert_eq!(gc.durable_lsn(), 1);

        // Verify the record is readable.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"hello");
    }

    /// Ten threads submit concurrently; every one must get a durable result
    /// and all ten records must end up in the WAL.
    #[test]
    fn concurrent_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let writer = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&path).unwrap(),
        ));
        let gc = Arc::new(GroupCommitter::new());

        // Collect eagerly so that all threads are spawned before any join.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let w = Arc::clone(&writer);
                let g = Arc::clone(&gc);
                thread::spawn(move || {
                    let payload = format!("record-{i}");
                    let result = g.submit(&w, put_write(payload.into_bytes())).unwrap();
                    assert!(result.durable);
                    result.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();

        // All should have gotten durable results.
        assert!(lsns.iter().all(|&lsn| lsn > 0));

        // All 10 records should be in the WAL.
        let reader = WalReader::open(&path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 10);
    }
}