nodedb_wal/group_commit.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Group commit coordinator.
4//!
5//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
6//!
7//! ## How it works
8//!
9//! 1. Multiple threads submit `PendingWrite` to the commit queue.
10//! 2. One thread becomes the "commit leader" (acquires the commit lock).
11//! 3. The leader drains all pending writes, appends them to the WAL writer,
12//! issues a single `fsync`, and advances `durable_lsn`.
13//! 4. Non-leader threads discover their write was already committed when the
14//! pending queue is empty after acquiring the commit lock. They verify the
15//! commit succeeded via `last_commit_failed` before returning `durable: true`.
16//!
17//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
18//!
19//! ## Safety invariants
20//!
21//! - `durable_lsn` is updated atomically **only after** a successful fsync.
22//! - If fsync fails, `last_commit_failed` is set so non-leader threads whose
23//! writes were in the failed batch propagate the error instead of falsely
24//! reporting `durable: true`.
25//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
26//! be on disk, and the caller must handle the error (retry, abort, etc.).
27
28use std::sync::Mutex;
29use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
30
31use crate::error::{Result, WalError};
32use crate::writer::WalWriter;
33
/// One WAL record queued for the next group commit.
///
/// The fields are handed to `WalWriter::append` unchanged by whichever thread
/// ends up committing the batch that contains this write.
#[derive(Debug)]
pub struct PendingWrite {
    /// Discriminant identifying the kind of record.
    pub record_type: u32,
    /// ID of the tenant the record belongs to.
    pub tenant_id: u64,
    /// ID of the virtual shard the record belongs to.
    pub vshard_id: u32,
    /// Raw record payload.
    pub payload: Vec<u8>,
}
46
/// Outcome handed back to a caller of `GroupCommitter::submit`.
#[derive(Debug, Clone, Copy)]
pub struct CommitResult {
    /// LSN reported for the commit.
    ///
    /// `GroupCommitter::submit` fills this with the last LSN of the batch when
    /// the caller acted as commit leader, and with the committer's current
    /// durable LSN otherwise, so it is an upper bound on the record's own LSN
    /// rather than necessarily the exact value.
    pub lsn: u64,
    /// `true` when the fsync covering the record succeeded.
    pub durable: bool,
}
55
56/// Thread-safe group commit coordinator.
57///
58/// Multiple threads call `submit()` which blocks until the record is durable.
59/// Internally, one thread becomes the commit leader and batches all pending
60/// writes into a single WAL flush.
61///
62/// ## Safety invariants
63///
64/// - A non-leader thread only returns `durable: true` if it confirms its
65/// write was drained from the pending queue **and** the leader's fsync
66/// succeeded (checked via `last_commit_failed`).
67/// - If the leader's fsync fails, all non-leader threads whose writes were
68/// in the failed batch receive an error.
69/// - `durable_lsn` is updated **only after** a successful fsync.
70pub struct GroupCommitter {
71 /// Pending writes queue.
72 pending: Mutex<Vec<PendingWrite>>,
73
74 /// The durable LSN — all records with LSN <= this value are on disk.
75 durable_lsn: AtomicU64,
76
77 /// Set to `true` if the most recent commit batch failed (fsync error).
78 /// Non-leader threads whose writes were part of the failed batch check
79 /// this flag and propagate the error instead of returning `durable: true`.
80 /// Cleared at the start of each successful commit.
81 last_commit_failed: AtomicBool,
82
83 /// Serializes commit operations (drain + write + fsync).
84 /// Separate from `pending` so enqueue doesn't block on I/O.
85 commit_lock: Mutex<()>,
86}
87
88impl GroupCommitter {
89 /// Create a new group committer.
90 pub fn new() -> Self {
91 Self {
92 pending: Mutex::new(Vec::with_capacity(1024)),
93 durable_lsn: AtomicU64::new(0),
94 last_commit_failed: AtomicBool::new(false),
95 commit_lock: Mutex::new(()),
96 }
97 }
98
99 /// Submit a write and block until it's durable.
100 ///
101 /// Returns the assigned LSN once the batch containing this write has been
102 /// fsynced to disk. If fsync fails, the error is propagated to all threads
103 /// whose writes were in the failed batch.
104 pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
105 // Enqueue the write.
106 {
107 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
108 context: "pending queue",
109 })?;
110 pending.push(write);
111 }
112
113 // Try to become the commit leader. If another thread holds the commit
114 // lock, we block here until it finishes — then we check if our write
115 // was already committed by that leader.
116 let _commit_guard = self
117 .commit_lock
118 .lock()
119 .map_err(|_| WalError::LockPoisoned {
120 context: "commit lock",
121 })?;
122
123 // Drain pending writes. If the previous leader already committed our
124 // write, the pending queue will be empty (our write was drained by
125 // that leader). If the queue is non-empty, we are the new leader.
126 let batch: Vec<PendingWrite> = {
127 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
128 context: "pending queue (drain)",
129 })?;
130 std::mem::take(&mut *pending)
131 };
132
133 if batch.is_empty() {
134 // Previous leader drained our write. Check if that commit
135 // succeeded or failed. This is the critical fix: without this
136 // check, a non-leader would return durable:true even if the
137 // leader's fsync failed.
138 if self.last_commit_failed.load(Ordering::Acquire) {
139 return Err(WalError::Io(std::io::Error::other(
140 "WAL fsync failed in previous group commit batch",
141 )));
142 }
143 let lsn = self.durable_lsn.load(Ordering::Acquire);
144 return Ok(CommitResult { lsn, durable: true });
145 }
146
147 // We are the leader — append and fsync.
148 let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
149 context: "WAL writer",
150 })?;
151 let mut last_lsn = 0;
152
153 for w in &batch {
154 last_lsn = wal.append(w.record_type, w.tenant_id, w.vshard_id, &w.payload)?;
155 }
156
157 let sync_result = wal.sync();
158 drop(wal);
159
160 match sync_result {
161 Ok(()) => {
162 // Order matters: clear error flag before advancing durable_lsn.
163 // Non-leaders check error flag AFTER seeing empty batch, so
164 // the flag must be cleared before they can observe the new LSN.
165 self.last_commit_failed.store(false, Ordering::Release);
166 self.durable_lsn.store(last_lsn, Ordering::Release);
167 Ok(CommitResult {
168 lsn: last_lsn,
169 durable: true,
170 })
171 }
172 Err(e) => {
173 // Fsync failed. Mark the error so non-leader threads whose
174 // writes were in this batch (drained from pending) know not
175 // to report success. Do NOT update durable_lsn.
176 self.last_commit_failed.store(true, Ordering::Release);
177 Err(e)
178 }
179 }
180 }
181
182 /// Current durable LSN (all records <= this are on disk).
183 pub fn durable_lsn(&self) -> u64 {
184 self.durable_lsn.load(Ordering::Acquire)
185 }
186}
187
188impl Default for GroupCommitter {
189 fn default() -> Self {
190 Self::new()
191 }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Build a `Put` write for tenant 1 / vshard 0 carrying `payload`.
    fn put_write(payload: Vec<u8>) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u32,
            tenant_id: 1,
            vshard_id: 0,
            payload,
        }
    }

    /// A lone submitter becomes the leader and its record lands in the WAL.
    #[test]
    fn single_thread_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writer = Mutex::new(WalWriter::open_without_direct_io(&wal_path).unwrap());
        let committer = GroupCommitter::new();

        let outcome = committer
            .submit(&writer, put_write(b"hello".to_vec()))
            .unwrap();

        assert!(outcome.durable);
        assert_eq!(outcome.lsn, 1);
        assert_eq!(committer.durable_lsn(), 1);

        // The record must be readable back from the file.
        let reader = WalReader::open(&wal_path).unwrap();
        let records = reader
            .records()
            .collect::<crate::error::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"hello");
    }

    /// Ten concurrent submitters all get a durable result and all ten
    /// records end up in the WAL.
    #[test]
    fn concurrent_group_commit() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writer = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&wal_path).unwrap(),
        ));
        let committer = Arc::new(GroupCommitter::new());

        // Collecting eagerly spawns every thread before any join happens.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let writer = Arc::clone(&writer);
                let committer = Arc::clone(&committer);
                thread::spawn(move || {
                    let outcome = committer
                        .submit(&writer, put_write(format!("record-{i}").into_bytes()))
                        .unwrap();
                    assert!(outcome.durable);
                    outcome.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect();

        // Every submitter must have been handed a non-zero LSN.
        assert!(lsns.iter().all(|&lsn| lsn > 0));

        // All 10 records should be in the WAL.
        let reader = WalReader::open(&wal_path).unwrap();
        let records = reader
            .records()
            .collect::<crate::error::Result<Vec<_>>>()
            .unwrap();
        assert_eq!(records.len(), 10);
    }
}