noxu_txn/group_commit.rs
1//! Group commit interface and implementations.
2//!
//! Defines the [`GroupCommit`] trait together with [`GroupCommitMaster`] and
//! [`GroupCommitReplica`], which provide group commit for transactional log
//! writes.
5//!
6//! # Overview
7//!
8//! The `GroupCommit` mechanism batches transaction fsyncs to improve throughput
9//! in replicated environments. A "leader" thread sleeps briefly
10//! (`MASTER_GROUP_COMMIT_INTERVAL`) or until the batch reaches a size limit
11//! (`MASTER_MAX_GROUP_COMMIT`), then issues a single fsync that covers all
12//! buffered transactions.
13//!
14//! ## Roles
15//!
16//! * **Master** ([`GroupCommitMaster`]): used by the primary node; waits for
17//! time/size thresholds before issuing an fsync and then sends ACKs.
18//! * **Replica** ([`GroupCommitReplica`]): used by replica nodes during log
19//! replay; batches acknowledgements for the feeder after applying commits.
20//!
21//! The `GroupCommit` trait abstracts both roles so that `TxnManager` can hold
22//! either implementation behind an `Arc<dyn GroupCommit>`.
23//!
24//! In non-replicated environments `TxnManager.group_commit` is `None` and
25//! transactions use the base `FSyncManager` path directly.
26
27use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
28
/// Batch-size threshold used by [`GroupCommitMaster::default`]: the number of
/// commits counted before a caller is told to fsync.
///
/// Value: 20 transactions.
pub const DEFAULT_MAX_GROUP_COMMIT: usize = 20;

/// Batching time window, in milliseconds, used by the `Default`
/// implementations of [`GroupCommitMaster`] and [`GroupCommitReplica`].
///
/// Value: 20 ms.
pub const DEFAULT_GROUP_COMMIT_INTERVAL_MS: u64 = 20;
39
/// Shared group-commit abstraction used by [`crate::TxnManager`].
///
/// The `Send + Sync` bound lets a single instance be shared between threads,
/// for example behind an `Arc<dyn GroupCommit>`.
pub trait GroupCommit: Send + Sync {
    /// Returns `true` if group commit is currently enabled.
    fn is_enabled(&self) -> bool;

    /// Called by each committing transaction to register its commit with the
    /// group-commit policy.
    ///
    /// * `commit_vlsn` — VLSN assigned to this commit entry.
    ///
    /// # Return value
    ///
    /// The convention below is the one followed by the implementations in
    /// this module:
    ///
    /// * `true` — the commit was absorbed into the current batch and the
    ///   caller may skip its own fsync.
    /// * `false` — the caller must make the commit durable itself. For
    ///   [`GroupCommitMaster`] this is returned when the batch-size threshold
    ///   is reached and whenever group commit is disabled.
    fn buffer_commit(&self, commit_vlsn: i64) -> bool;

    /// Shuts down group commit.
    ///
    /// The implementations in this module clear their "enabled" flag, so
    /// [`GroupCommit::is_enabled`] returns `false` afterwards.
    fn shutdown(&self);
}
66
67// ── GroupCommitMaster ─────────────────────────────────────────────────────────
68
/// Group-commit implementation for the **Master** role.
///
/// This type holds only counters and configuration: it does not own a queue,
/// a background thread, or any file handle, and it never performs an fsync
/// itself. Its job is to enforce the batch-size policy.
///
/// ## Threshold semantics
///
/// `buffer_commit()` returns `false` (caller must fsync) on every
/// `max_count`-th call, and `true` (commit counted, caller skips its fsync)
/// otherwise.
///
/// NOTE(review): the original notes state that the caller
/// (`Txn::commit_with_durability`) reacts to `false` by calling
/// `LogManager::flush_sync()`, and that `FSyncManager` then applies the
/// time-window and leader/waiter coalescing. Neither is visible in this
/// file — confirm against the caller.
#[derive(Debug)]
pub struct GroupCommitMaster {
    /// Whether group commit is currently active.
    enabled: AtomicBool,
    /// Number of commits per batch; the `max_count`-th commit triggers the
    /// "caller must fsync" signal.
    max_count: usize,
    /// Configured batch window in milliseconds. Only stored and reported by
    /// this type; it is not used for any timing here.
    interval_ms: u64,
    /// Commits counted since the last time the threshold fired.
    pending_count: AtomicUsize,
    /// Number of times the count threshold has fired (observable in tests).
    flush_count: AtomicUsize,
}
98
99impl GroupCommitMaster {
100 /// Creates a new `GroupCommitMaster`.
101 ///
102 /// # Arguments
103 ///
104 /// * `max_count` — maximum batch size.
105 /// * `interval_ms` — batch window in milliseconds
106 /// `MASTER_GROUP_COMMIT_INTERVAL`).
107 pub fn new(max_count: usize, interval_ms: u64) -> Self {
108 GroupCommitMaster {
109 enabled: AtomicBool::new(max_count > 0),
110 max_count,
111 interval_ms,
112 pending_count: AtomicUsize::new(0),
113 flush_count: AtomicUsize::new(0),
114 }
115 }
116
117 /// Returns the batch window in milliseconds.
118 pub fn interval_ms(&self) -> u64 {
119 self.interval_ms
120 }
121
122 /// Returns the number of times the count threshold has fired.
123 ///
124 /// Used in tests to verify durability threshold enforcement.
125 pub fn flush_count(&self) -> usize {
126 self.flush_count.load(Ordering::Relaxed)
127 }
128}
129
130impl Default for GroupCommitMaster {
131 fn default() -> Self {
132 Self::new(DEFAULT_MAX_GROUP_COMMIT, DEFAULT_GROUP_COMMIT_INTERVAL_MS)
133 }
134}
135
136impl GroupCommit for GroupCommitMaster {
137 fn is_enabled(&self) -> bool {
138 self.enabled.load(Ordering::Relaxed)
139 }
140
141 /// Buffer a commit and enforce the count-based threshold.
142 ///
143 /// Returns `false` (caller must fsync) on every `max_count`th call.
144 /// Returns `true` (commit is buffered, skip fsync) otherwise.
145 ///
146 /// Count-threshold path.
147 /// The time-window threshold is handled by `FSyncManager` when the
148 /// caller proceeds to `LogManager::flush_sync()` on a `false` return.
149 fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
150 if !self.enabled.load(Ordering::Relaxed) {
151 return false; // Disabled: caller must always fsync.
152 }
153 // Increment and check threshold. fetch_add returns the value BEFORE
154 // the increment, so we compare against max_count - 1.
155 let prev = self.pending_count.fetch_add(1, Ordering::AcqRel);
156 if prev + 1 >= self.max_count {
157 // Threshold reached: reset counter and signal caller to fsync.
158 self.pending_count.store(0, Ordering::Release);
159 self.flush_count.fetch_add(1, Ordering::Relaxed);
160 return false; // Caller must call flush_sync().
161 }
162 true // Buffered: caller skips fsync.
163 }
164
165 fn shutdown(&self) {
166 self.enabled.store(false, Ordering::Relaxed);
167 }
168}
169
170// ── GroupCommitReplica ────────────────────────────────────────────────────────
171
/// Group-commit implementation for the **Replica** role.
///
/// Intended to batch acknowledgements during log replay, sending an ACK to
/// the feeder once a batch of committed transactions has been applied and
/// durably written.
///
/// In this file the type only stores an enabled flag and the configured
/// batch window; no ACK queue or batching logic is implemented here.
#[derive(Debug)]
pub struct GroupCommitReplica {
    /// Whether group commit is currently active; cleared by `shutdown()`.
    enabled: AtomicBool,
    /// Configured batch window in milliseconds. Stored by the constructor
    /// but not read anywhere in this file.
    interval_ms: u64,
}
182
183impl GroupCommitReplica {
184 /// Creates a new `GroupCommitReplica`.
185 pub fn new(interval_ms: u64) -> Self {
186 GroupCommitReplica { enabled: AtomicBool::new(true), interval_ms }
187 }
188}
189
190impl Default for GroupCommitReplica {
191 fn default() -> Self {
192 Self::new(DEFAULT_GROUP_COMMIT_INTERVAL_MS)
193 }
194}
195
196impl GroupCommit for GroupCommitReplica {
197 fn is_enabled(&self) -> bool {
198 self.enabled.load(Ordering::Relaxed)
199 }
200
201 fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
202 // On the replica, each committed entry from the feeder is queued.
203 // After the batch window elapses (or the batch fills), an ACK is sent
204 // back. The actual durability is ensured by the fsync that precedes the
205 // ACK.
206 //
207 // 1. Add VLSN to the pending ACK queue.
208 // 2. If a leader exists, piggyback; otherwise become leader and wait
209 // groupCommitIntervalMs before ACKing the batch.
210 true
211 }
212
213 fn shutdown(&self) {
214 self.enabled.store(false, Ordering::Relaxed);
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// Calls `buffer_commit` once per VLSN in `vlsns` and returns the results
    /// in call order.
    fn commit_all(gc: &GroupCommitMaster, vlsns: std::ops::RangeInclusive<i64>) -> Vec<bool> {
        vlsns.map(|vlsn| gc.buffer_commit(vlsn)).collect()
    }

    #[test]
    fn test_master_default_enabled() {
        assert!(GroupCommitMaster::default().is_enabled());
    }

    #[test]
    fn test_master_disabled_when_max_zero() {
        let disabled = GroupCommitMaster::new(0, 20);
        assert!(!disabled.is_enabled());
    }

    #[test]
    fn test_master_buffer_commit_first_is_buffered() {
        // A fresh batch has not reached the threshold, so commit 1 is buffered.
        let master = GroupCommitMaster::new(3, 20);
        let buffered = master.buffer_commit(1);
        assert!(buffered, "first commit should be buffered");
        assert_eq!(master.flush_count(), 0);
    }

    #[test]
    fn test_master_threshold_fires_at_max_count() {
        // max_count = 3: the first two commits are buffered, the third flushes.
        let master = GroupCommitMaster::new(3, 20);
        for vlsn in 1..=2 {
            assert!(
                master.buffer_commit(vlsn),
                "commit {} should be buffered",
                vlsn
            );
        }
        let third_buffered = master.buffer_commit(3);
        assert!(!third_buffered, "commit 3 must trigger flush (threshold)");
        assert_eq!(
            master.flush_count(),
            1,
            "exactly one flush should have fired"
        );
    }

    #[test]
    fn test_master_threshold_resets_after_flush() {
        // Once the threshold fires the counter starts over, batch after batch.
        let master = GroupCommitMaster::new(3, 20);
        assert_eq!(commit_all(&master, 1..=3), [true, true, false]); // flush #1
        assert_eq!(commit_all(&master, 4..=6), [true, true, false]); // flush #2
        assert_eq!(master.flush_count(), 2);
    }

    #[test]
    fn test_master_disabled_always_flushes() {
        // max_count = 0 disables group commit: every commit needs its own fsync.
        let disabled = GroupCommitMaster::new(0, 20);
        assert!(!disabled.is_enabled());
        let first_buffered = disabled.buffer_commit(1);
        assert!(
            !first_buffered,
            "disabled GC must return false (always flush)"
        );
        assert!(!disabled.buffer_commit(2));
    }

    #[test]
    fn test_master_shutdown() {
        let master = GroupCommitMaster::default();
        master.shutdown();
        assert!(!master.is_enabled());
        // A shut-down instance behaves like a disabled one: always flush.
        let buffered = master.buffer_commit(99);
        assert!(!buffered, "post-shutdown must return false");
    }

    #[test]
    fn test_master_interval_ms_accessible() {
        assert_eq!(GroupCommitMaster::new(10, 50).interval_ms(), 50);
    }

    #[test]
    fn test_replica_default_enabled() {
        assert!(GroupCommitReplica::default().is_enabled());
    }

    #[test]
    fn test_replica_buffer_commit() {
        let replica = GroupCommitReplica::default();
        assert!(replica.buffer_commit(10));
    }
}