nodedb_wal/group_commit.rs
1//! Group commit coordinator.
2//!
3//! Batches concurrent WAL write requests into a single fsync for maximum NVMe IOPS.
4//!
5//! ## How it works
6//!
7//! 1. Multiple threads submit `PendingWrite` to the commit queue.
8//! 2. One thread becomes the "commit leader" (acquires the commit lock).
9//! 3. The leader drains all pending writes, appends them to the WAL writer,
10//! issues a single `fsync`, and advances `durable_lsn`.
11//! 4. Non-leader threads discover their write was already committed when the
12//! pending queue is empty after acquiring the commit lock. They verify the
13//! commit succeeded via `last_commit_failed` before returning `durable: true`.
14//!
15//! This converts N fsyncs into 1 fsync, which is critical for NVMe performance.
16//!
17//! ## Safety invariants
18//!
19//! - `durable_lsn` is updated atomically **only after** a successful fsync.
20//! - If fsync fails, `last_commit_failed` is set so non-leader threads whose
21//! writes were in the failed batch propagate the error instead of falsely
22//! reporting `durable: true`.
23//! - Fsync failure is treated as **fatal for the batch** — the data may or may not
24//! be on disk, and the caller must handle the error (retry, abort, etc.).
25
26use std::sync::Mutex;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29use crate::error::{Result, WalError};
30use crate::writer::WalWriter;
31
32/// A pending write request waiting to be committed.
33#[derive(Debug)]
34pub struct PendingWrite {
35 /// Record type discriminant.
36 pub record_type: u16,
37 /// Tenant ID.
38 pub tenant_id: u32,
39 /// Virtual shard ID.
40 pub vshard_id: u16,
41 /// Payload bytes.
42 pub payload: Vec<u8>,
43}
44
45/// Result delivered to each waiter after group commit completes.
46#[derive(Debug, Clone, Copy)]
47pub struct CommitResult {
48 /// The LSN assigned to this record.
49 pub lsn: u64,
50 /// Whether the fsync succeeded (record is durable).
51 pub durable: bool,
52}
53
54/// Thread-safe group commit coordinator.
55///
56/// Multiple threads call `submit()` which blocks until the record is durable.
57/// Internally, one thread becomes the commit leader and batches all pending
58/// writes into a single WAL flush.
59///
60/// ## Safety invariants
61///
62/// - A non-leader thread only returns `durable: true` if it confirms its
63/// write was drained from the pending queue **and** the leader's fsync
64/// succeeded (checked via `last_commit_failed`).
65/// - If the leader's fsync fails, all non-leader threads whose writes were
66/// in the failed batch receive an error.
67/// - `durable_lsn` is updated **only after** a successful fsync.
68pub struct GroupCommitter {
69 /// Pending writes queue.
70 pending: Mutex<Vec<PendingWrite>>,
71
72 /// The durable LSN — all records with LSN <= this value are on disk.
73 durable_lsn: AtomicU64,
74
75 /// Set to `true` if the most recent commit batch failed (fsync error).
76 /// Non-leader threads whose writes were part of the failed batch check
77 /// this flag and propagate the error instead of returning `durable: true`.
78 /// Cleared at the start of each successful commit.
79 last_commit_failed: AtomicBool,
80
81 /// Serializes commit operations (drain + write + fsync).
82 /// Separate from `pending` so enqueue doesn't block on I/O.
83 commit_lock: Mutex<()>,
84}
85
86impl GroupCommitter {
87 /// Create a new group committer.
88 pub fn new() -> Self {
89 Self {
90 pending: Mutex::new(Vec::with_capacity(1024)),
91 durable_lsn: AtomicU64::new(0),
92 last_commit_failed: AtomicBool::new(false),
93 commit_lock: Mutex::new(()),
94 }
95 }
96
97 /// Submit a write and block until it's durable.
98 ///
99 /// Returns the assigned LSN once the batch containing this write has been
100 /// fsynced to disk. If fsync fails, the error is propagated to all threads
101 /// whose writes were in the failed batch.
102 pub fn submit(&self, writer: &Mutex<WalWriter>, write: PendingWrite) -> Result<CommitResult> {
103 // Enqueue the write.
104 {
105 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
106 context: "pending queue",
107 })?;
108 pending.push(write);
109 }
110
111 // Try to become the commit leader. If another thread holds the commit
112 // lock, we block here until it finishes — then we check if our write
113 // was already committed by that leader.
114 let _commit_guard = self
115 .commit_lock
116 .lock()
117 .map_err(|_| WalError::LockPoisoned {
118 context: "commit lock",
119 })?;
120
121 // Drain pending writes. If the previous leader already committed our
122 // write, the pending queue will be empty (our write was drained by
123 // that leader). If the queue is non-empty, we are the new leader.
124 let batch: Vec<PendingWrite> = {
125 let mut pending = self.pending.lock().map_err(|_| WalError::LockPoisoned {
126 context: "pending queue (drain)",
127 })?;
128 std::mem::take(&mut *pending)
129 };
130
131 if batch.is_empty() {
132 // Previous leader drained our write. Check if that commit
133 // succeeded or failed. This is the critical fix: without this
134 // check, a non-leader would return durable:true even if the
135 // leader's fsync failed.
136 if self.last_commit_failed.load(Ordering::Acquire) {
137 return Err(WalError::Io(std::io::Error::other(
138 "WAL fsync failed in previous group commit batch",
139 )));
140 }
141 let lsn = self.durable_lsn.load(Ordering::Acquire);
142 return Ok(CommitResult { lsn, durable: true });
143 }
144
145 // We are the leader — append and fsync.
146 let mut wal = writer.lock().map_err(|_| WalError::LockPoisoned {
147 context: "WAL writer",
148 })?;
149 let mut last_lsn = 0;
150
151 for w in &batch {
152 last_lsn = wal.append(w.record_type, w.tenant_id, w.vshard_id, &w.payload)?;
153 }
154
155 let sync_result = wal.sync();
156 drop(wal);
157
158 match sync_result {
159 Ok(()) => {
160 // Order matters: clear error flag before advancing durable_lsn.
161 // Non-leaders check error flag AFTER seeing empty batch, so
162 // the flag must be cleared before they can observe the new LSN.
163 self.last_commit_failed.store(false, Ordering::Release);
164 self.durable_lsn.store(last_lsn, Ordering::Release);
165 Ok(CommitResult {
166 lsn: last_lsn,
167 durable: true,
168 })
169 }
170 Err(e) => {
171 // Fsync failed. Mark the error so non-leader threads whose
172 // writes were in this batch (drained from pending) know not
173 // to report success. Do NOT update durable_lsn.
174 self.last_commit_failed.store(true, Ordering::Release);
175 Err(e)
176 }
177 }
178 }
179
180 /// Current durable LSN (all records <= this are on disk).
181 pub fn durable_lsn(&self) -> u64 {
182 self.durable_lsn.load(Ordering::Acquire)
183 }
184}
185
186impl Default for GroupCommitter {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::WalReader;
    use crate::record::RecordType;
    use std::sync::Arc;
    use std::thread;

    /// Builds a `Put` write for tenant 1, vshard 0 carrying `payload`.
    fn put_write(payload: Vec<u8>) -> PendingWrite {
        PendingWrite {
            record_type: RecordType::Put as u16,
            tenant_id: 1,
            vshard_id: 0,
            payload,
        }
    }

    #[test]
    fn single_thread_group_commit() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let wal = Mutex::new(WalWriter::open_without_direct_io(&wal_path).unwrap());
        let committer = GroupCommitter::new();

        let committed = committer
            .submit(&wal, put_write(b"hello".to_vec()))
            .unwrap();
        assert!(committed.durable);
        assert_eq!(committed.lsn, 1);
        assert_eq!(committer.durable_lsn(), 1);

        // The committed record must be readable back from the WAL file.
        let wal_reader = WalReader::open(&wal_path).unwrap();
        let on_disk: Vec<_> = wal_reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(on_disk.len(), 1);
        assert_eq!(on_disk[0].payload, b"hello");
    }

    #[test]
    fn concurrent_group_commit() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let wal = Arc::new(Mutex::new(
            WalWriter::open_without_direct_io(&wal_path).unwrap(),
        ));
        let committer = Arc::new(GroupCommitter::new());

        // Spawn all ten submitters before joining any of them.
        let workers: Vec<thread::JoinHandle<u64>> = (0..10)
            .map(|i| {
                let wal = Arc::clone(&wal);
                let committer = Arc::clone(&committer);
                thread::spawn(move || {
                    let payload = format!("record-{i}").into_bytes();
                    let committed = committer.submit(&wal, put_write(payload)).unwrap();
                    assert!(committed.durable);
                    committed.lsn
                })
            })
            .collect();

        let lsns: Vec<u64> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();

        // Every submitter must have been handed a real (non-zero) LSN.
        assert!(lsns.iter().all(|&lsn| lsn > 0));

        // All ten records must have reached the WAL.
        let wal_reader = WalReader::open(&wal_path).unwrap();
        let on_disk: Vec<_> = wal_reader
            .records()
            .collect::<crate::error::Result<_>>()
            .unwrap();
        assert_eq!(on_disk.len(), 10);
    }
}