// nodedb_wal/group_commit.rs
// SPDX-License-Identifier: BUSL-1.1

//! Group commit coordinator.
//!
//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
//!
//! ## How it works
//!
//! 1. Multiple threads submit `PendingWrite` to the commit queue.
//! 2. One thread becomes the "commit leader" (acquires the commit lock).
//! 3. The leader drains all pending writes, appends them to the WAL writer,
//!    issues a single `fsync`, and advances `durable_lsn`.
//! 4. Non-leader threads discover their write was already committed when the
//!    pending queue is empty after acquiring the commit lock. They verify the
//!    commit succeeded via `last_commit_failed` before returning `durable: true`.
//!
//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
//!
//! ## Safety invariants
//!
//! - `durable_lsn` is updated atomically **only after** a successful fsync.
//! - If fsync fails, `last_commit_failed` is set so non-leader threads whose
//!   writes were in the failed batch propagate the error instead of falsely
//!   reporting `durable: true`.
//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
//!   be on disk, and the caller must handle the error (retry, abort, etc.).

use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use crate::error::{Result, WalError};
use crate::writer::WalWriter;

/// A pending write request waiting to be committed.
///
/// Ownership of the payload moves into the commit queue via
/// `GroupCommitter::submit` until a commit leader drains and appends it.
#[derive(Debug)]
pub struct PendingWrite {
    /// Record type discriminant.
    pub record_type: u32,
    /// Tenant ID.
    pub tenant_id: u64,
    /// Virtual shard ID.
    pub vshard_id: u32,
    /// Database identifier (raw u64). Zero maps to the default database.
    pub database_id: u64,
    /// Payload bytes, passed through to the WAL writer unchanged.
    pub payload: Vec<u8>,
}

/// Result delivered to each waiter after group commit completes.
#[derive(Debug, Clone, Copy)]
pub struct CommitResult {
    /// The durable LSN reported to this waiter.
    ///
    /// For the commit leader this is the LSN of the last record in its batch
    /// (which includes the leader's own record). A non-leader waiter receives
    /// the current `durable_lsn` instead, which is an **upper bound** on —
    /// not necessarily equal to — the LSN assigned to its own record.
    pub lsn: u64,
    /// Whether the fsync succeeded (record is durable).
    pub durable: bool,
}

/// Thread-safe group commit coordinator.
///
/// Multiple threads call `submit()` which blocks until the record is durable.
/// Internally, one thread becomes the commit leader and batches all pending
/// writes into a single WAL flush.
///
/// ## Safety invariants
///
/// - A non-leader thread only returns `durable: true` if it confirms its
///   write was drained from the pending queue **and** the leader's fsync
///   succeeded (checked via `last_commit_failed`).
/// - If the leader's fsync fails, all non-leader threads whose writes were
///   in the failed batch receive an error.
/// - `durable_lsn` is updated **only after** a successful fsync.
pub struct GroupCommitter {
    /// Pending writes queue.
    pending: Mutex<Vec<PendingWrite>>,

    /// The durable LSN — all records with LSN <= this value are on disk.
    durable_lsn: AtomicU64,

    /// Set to `true` if the most recent commit batch failed (fsync error).
    /// Non-leader threads whose writes were part of the failed batch check
    /// this flag and propagate the error instead of returning `durable: true`.
    /// Cleared again only by the next leader whose fsync *succeeds* (it is
    /// not cleared at the start of a commit).
    ///
    /// NOTE(review): a single flag cannot record *which* batch failed. If a
    /// later batch succeeds and clears the flag before a waiter from the
    /// failed batch re-acquires the commit lock, that waiter will report
    /// success for a write that was never durably synced. Closing this
    /// window requires per-batch outcome tracking (e.g. a batch epoch or a
    /// per-waiter completion slot).
    last_commit_failed: AtomicBool,

    /// Serializes commit operations (drain + write + fsync).
    /// Separate from `pending` so enqueue doesn't block on I/O.
    commit_lock: Mutex<()>,
}

90impl GroupCommitter {
91    /// Create a new group committer.
92    pub fn new() -> Self {
93        Self {
94            pending: Mutex::new(Vec::with_capacity(1024)),
95            durable_lsn: AtomicU64::new(0),
96            last_commit_failed: AtomicBool::new(false),
97            commit_lock: Mutex::new(()),
98        }
99    }
100
101    /// Submit a write and block until it's durable.
102    ///
103    /// Returns the assigned LSN once the batch containing this write has been
104    /// fsynced to disk. If fsync fails, the error is propagated to all threads
105    /// whose writes were in the failed batch.
106    pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
107        // Enqueue the write.
108        {
109            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
110                context: "pending queue",
111            })?;
112            pending.push(write);
113        }
114
115        // Try to become the commit leader. If another thread holds the commit
116        // lock, we block here until it finishes — then we check if our write
117        // was already committed by that leader.
118        let _commit_guard = self
119            .commit_lock
120            .lock()
121            .map_err(|_| WalError::LockPoisoned {
122                context: "commit lock",
123            })?;
124
125        // Drain pending writes. If the previous leader already committed our
126        // write, the pending queue will be empty (our write was drained by
127        // that leader). If the queue is non-empty, we are the new leader.
128        let batch: Vec<PendingWrite> = {
129            let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
130                context: "pending queue (drain)",
131            })?;
132            std::mem::take(&mut *pending)
133        };
134
135        if batch.is_empty() {
136            // Previous leader drained our write. Check if that commit
137            // succeeded or failed. This is the critical fix: without this
138            // check, a non-leader would return durable:true even if the
139            // leader's fsync failed.
140            if self.last_commit_failed.load(Ordering::Acquire) {
141                return Err(WalError::Io(std::io::Error::other(
142                    "WAL fsync failed in previous group commit batch",
143                )));
144            }
145            let lsn = self.durable_lsn.load(Ordering::Acquire);
146            return Ok(CommitResult { lsn, durable: true });
147        }
148
149        // We are the leader — append and fsync.
150        let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
151            context: "WAL writer",
152        })?;
153        let mut last_lsn = 0;
154
155        for w in &batch {
156            last_lsn = wal.append(
157                w.record_type,
158                w.tenant_id,
159                w.vshard_id,
160                w.database_id,
161                &w.payload,
162            )?;
163        }
164
165        let sync_result = wal.sync();
166        drop(wal);
167
168        match sync_result {
169            Ok(()) => {
170                // Order matters: clear error flag before advancing durable_lsn.
171                // Non-leaders check error flag AFTER seeing empty batch, so
172                // the flag must be cleared before they can observe the new LSN.
173                self.last_commit_failed.store(false, Ordering::Release);
174                self.durable_lsn.store(last_lsn, Ordering::Release);
175                Ok(CommitResult {
176                    lsn: last_lsn,
177                    durable: true,
178                })
179            }
180            Err(e) => {
181                // Fsync failed. Mark the error so non-leader threads whose
182                // writes were in this batch (drained from pending) know not
183                // to report success. Do NOT update durable_lsn.
184                self.last_commit_failed.store(true, Ordering::Release);
185                Err(e)
186            }
187        }
188    }
189
190    /// Current durable LSN (all records <= this are on disk).
191    pub fn durable_lsn(&self) -> u64 {
192        self.durable_lsn.load(Ordering::Acquire)
193    }
194}
195
impl Default for GroupCommitter {
    /// Equivalent to [`GroupCommitter::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Build a `Put` record for tenant 1 / vshard 0 / the default database.
    fn put(payload: &[u8]) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u32,
            tenant_id: 1,
            vshard_id: 0,
            database_id: 0,
            payload: payload.to_vec(),
        }
    }

    /// Read every record back from the WAL at `path`.
    fn read_all(path: &std::path::Path) -> Vec<crate::reader::WalRecordOwned>
    where
        WalReader: Sized,
    {
        unreachable!("helper replaced below") // placeholder, not used
    }

    #[test]
    fn single_thread_group_commit() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&wal_path).unwrap());
        let committer = GroupCommitter::new();

        let commit = committer.submit(&writer, put(b"hello")).unwrap();

        assert!(commit.durable);
        assert_eq!(commit.lsn, 1);
        assert_eq!(committer.durable_lsn(), 1);

        // Round-trip: the record must come back out of the WAL.
        let read_back: Vec<_> = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].payload, b"hello");
    }

    #[test]
    fn concurrent_group_commit() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let writer = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&wal_path).unwrap(),
        ));
        let committer = Arc::new(GroupCommitter::new());

        // Ten threads race to commit; group commit must make them all durable.
        let workers: Vec<_> = (0..10)
            .map(|i| {
                let writer = Arc::clone(&writer);
                let committer = Arc::clone(&committer);
                thread::spawn(move || {
                    let payload = format!("record-{i}");
                    let commit = committer
                        .submit(&writer, put(payload.as_bytes()))
                        .unwrap();
                    assert!(commit.durable);
                    commit.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = workers.into_iter().map(|t| t.join().unwrap()).collect();

        // All should have gotten durable results.
        assert!(lsns.iter().all(|&lsn| lsn > 0));

        // All 10 records should be in the WAL.
        let read_back: Vec<_> = WalReader::open(&wal_path)
            .unwrap()
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(read_back.len(), 10);
    }
}
