nodedb_wal/group_commit.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Group commit coordinator.
4//!
5//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
6//!
7//! ## How it works
8//!
9//! 1. Multiple threads submit `PendingWrite` to the commit queue.
10//! 2. One thread becomes the "commit leader" (acquires the commit lock).
11//! 3. The leader drains all pending writes, appends them to the WAL writer,
12//! issues a single `fsync`, and advances `durable_lsn`.
//! 4. Non-leader threads discover their write was already committed when the
//!    pending queue is empty after acquiring the commit lock. They verify that
//!    the commit actually succeeded before returning `durable: true`.
16//!
17//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
18//!
19//! ## Safety invariants
20//!
21//! - `durable_lsn` is updated atomically **only after** a successful fsync.
//! - If fsync fails, the failure is recorded so that non-leader threads whose
//!   writes were in the failed batch propagate the error instead of falsely
//!   reporting `durable: true`.
25//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
26//! be on disk, and the caller must handle the error (retry, abort, etc.).
27
28use std::sync::Mutex;
29use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
30
31use crate::error::{Result, WalError};
32use crate::writer::WalWriter;
33
/// A single WAL write request queued for the next group commit batch.
///
/// Instances are pushed onto the committer's pending queue and drained in
/// bulk by whichever thread becomes the commit leader.
#[derive(Debug)]
pub struct PendingWrite {
    /// Record type discriminant.
    pub record_type: u32,
    /// Tenant ID.
    pub tenant_id: u64,
    /// Virtual shard ID.
    pub vshard_id: u32,
    /// Database identifier (raw u64). Zero maps to the default database.
    pub database_id: u64,
    /// Payload bytes, appended to the WAL verbatim.
    pub payload: Vec<u8>,
}
48
/// Outcome handed back to each waiter once its group commit batch finishes.
#[derive(Debug, Clone, Copy)]
pub struct CommitResult {
    /// The LSN assigned to this record.
    pub lsn: u64,
    /// Whether the fsync succeeded (record is durable).
    pub durable: bool,
}
57
58/// Thread-safe group commit coordinator.
59///
60/// Multiple threads call `submit()` which blocks until the record is durable.
61/// Internally, one thread becomes the commit leader and batches all pending
62/// writes into a single WAL flush.
63///
64/// ## Safety invariants
65///
66/// - A non-leader thread only returns `durable: true` if it confirms its
67/// write was drained from the pending queue **and** the leader's fsync
68/// succeeded (checked via `last_commit_failed`).
69/// - If the leader's fsync fails, all non-leader threads whose writes were
70/// in the failed batch receive an error.
71/// - `durable_lsn` is updated **only after** a successful fsync.
72pub struct GroupCommitter {
73 /// Pending writes queue.
74 pending: Mutex<Vec<PendingWrite>>,
75
76 /// The durable LSN — all records with LSN <= this value are on disk.
77 durable_lsn: AtomicU64,
78
79 /// Set to `true` if the most recent commit batch failed (fsync error).
80 /// Non-leader threads whose writes were part of the failed batch check
81 /// this flag and propagate the error instead of returning `durable: true`.
82 /// Cleared at the start of each successful commit.
83 last_commit_failed: AtomicBool,
84
85 /// Serializes commit operations (drain + write + fsync).
86 /// Separate from `pending` so enqueue doesn't block on I/O.
87 commit_lock: Mutex<()>,
88}
89
90impl GroupCommitter {
91 /// Create a new group committer.
92 pub fn new() -> Self {
93 Self {
94 pending: Mutex::new(Vec::with_capacity(1024)),
95 durable_lsn: AtomicU64::new(0),
96 last_commit_failed: AtomicBool::new(false),
97 commit_lock: Mutex::new(()),
98 }
99 }
100
101 /// Submit a write and block until it's durable.
102 ///
103 /// Returns the assigned LSN once the batch containing this write has been
104 /// fsynced to disk. If fsync fails, the error is propagated to all threads
105 /// whose writes were in the failed batch.
106 pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
107 // Enqueue the write.
108 {
109 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
110 context: "pending queue",
111 })?;
112 pending.push(write);
113 }
114
115 // Try to become the commit leader. If another thread holds the commit
116 // lock, we block here until it finishes — then we check if our write
117 // was already committed by that leader.
118 let _commit_guard = self
119 .commit_lock
120 .lock()
121 .map_err(|_| WalError::LockPoisoned {
122 context: "commit lock",
123 })?;
124
125 // Drain pending writes. If the previous leader already committed our
126 // write, the pending queue will be empty (our write was drained by
127 // that leader). If the queue is non-empty, we are the new leader.
128 let batch: Vec<PendingWrite> = {
129 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
130 context: "pending queue (drain)",
131 })?;
132 std::mem::take(&mut *pending)
133 };
134
135 if batch.is_empty() {
136 // Previous leader drained our write. Check if that commit
137 // succeeded or failed. This is the critical fix: without this
138 // check, a non-leader would return durable:true even if the
139 // leader's fsync failed.
140 if self.last_commit_failed.load(Ordering::Acquire) {
141 return Err(WalError::Io(std::io::Error::other(
142 "WAL fsync failed in previous group commit batch",
143 )));
144 }
145 let lsn = self.durable_lsn.load(Ordering::Acquire);
146 return Ok(CommitResult { lsn, durable: true });
147 }
148
149 // We are the leader — append and fsync.
150 let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
151 context: "WAL writer",
152 })?;
153 let mut last_lsn = 0;
154
155 for w in &batch {
156 last_lsn = wal.append(
157 w.record_type,
158 w.tenant_id,
159 w.vshard_id,
160 w.database_id,
161 &w.payload,
162 )?;
163 }
164
165 let sync_result = wal.sync();
166 drop(wal);
167
168 match sync_result {
169 Ok(()) => {
170 // Order matters: clear error flag before advancing durable_lsn.
171 // Non-leaders check error flag AFTER seeing empty batch, so
172 // the flag must be cleared before they can observe the new LSN.
173 self.last_commit_failed.store(false, Ordering::Release);
174 self.durable_lsn.store(last_lsn, Ordering::Release);
175 Ok(CommitResult {
176 lsn: last_lsn,
177 durable: true,
178 })
179 }
180 Err(e) => {
181 // Fsync failed. Mark the error so non-leader threads whose
182 // writes were in this batch (drained from pending) know not
183 // to report success. Do NOT update durable_lsn.
184 self.last_commit_failed.store(true, Ordering::Release);
185 Err(e)
186 }
187 }
188 }
189
190 /// Current durable LSN (all records <= this are on disk).
191 pub fn durable_lsn(&self) -> u64 {
192 self.durable_lsn.load(Ordering::Acquire)
193 }
194}
195
196impl Default for GroupCommitter {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Build a `Put` pending write for tenant 1 / vshard 0 / default database
    /// carrying the given payload.
    fn put(payload: Vec<u8>) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u32,
            tenant_id: 1,
            vshard_id: 0,
            database_id: 0,
            payload,
        }
    }

    #[test]
    fn single_thread_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&wal_path).unwrap());
        let committer = GroupCommitter::new();

        let res = committer.submit(&writer, put(b"hello".to_vec())).unwrap();

        assert!(res.durable);
        assert_eq!(res.lsn, 1);
        assert_eq!(committer.durable_lsn(), 1);

        // The record must be readable back from the WAL file.
        let reader = WalReader::open(&wal_path).unwrap();
        let read_back: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].payload, b"hello");
    }

    #[test]
    fn concurrent_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writer = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&wal_path).unwrap(),
        ));
        let committer = Arc::new(GroupCommitter::new());

        // Ten submitters race; group commit must make every one durable.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let w = Arc::clone(&writer);
                let g = Arc::clone(&committer);
                thread::spawn(move || {
                    let res = g
                        .submit(&w, put(format!("record-{i}").into_bytes()))
                        .unwrap();
                    assert!(res.durable);
                    res.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();

        // All should have gotten durable results.
        assert!(lsns.iter().all(|&l| l > 0));

        // All 10 records should be in the WAL.
        let reader = WalReader::open(&wal_path).unwrap();
        let records: Vec<_> = reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(records.len(), 10);
    }
}