Skip to main content

noxu_txn/
group_commit.rs

1//! Group commit interface and implementations.
2//!
//! Defines the [`GroupCommit`] trait and its master/replica implementations
//! (cf. `Txn.GroupCommitMaster`), which provide group commit for transactional
//! log writes.
5//!
6//! # Overview
7//!
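//! The `GroupCommit` mechanism batches transaction fsyncs to improve throughput
//! in replicated environments. A batch is closed either when a time window
//! (`MASTER_GROUP_COMMIT_INTERVAL`) elapses or when it reaches a size limit
//! (`MASTER_MAX_GROUP_COMMIT`); a single fsync then covers all buffered
//! transactions. The size limit is enforced by the types in this module, while
//! the time window and the leader/waiter coalescing (a "leader" thread sleeping
//! briefly before fsyncing on behalf of the others) are handled by `FSyncManager`.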
13//!
14//! ## Roles
15//!
//! * **Master** ([`GroupCommitMaster`]): used by the primary node; enforces the
//!   batch-size threshold by telling the caller when an fsync is due (the time
//!   window is handled by `FSyncManager`), after which the batch can be ACKed.
18//! * **Replica** ([`GroupCommitReplica`]): used by replica nodes during log
19//!   replay; batches acknowledgements for the feeder after applying commits.
20//!
21//! The `GroupCommit` trait abstracts both roles so that `TxnManager` can hold
22//! either implementation behind an `Arc<dyn GroupCommit>`.
23//!
24//! In non-replicated environments `TxnManager.group_commit` is `None` and
25//! transactions use the base `FSyncManager` path directly.
26
27use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
28
/// Default batch-size limit: how many commits may be absorbed into one group
/// before an fsync is forced.
pub const DEFAULT_MAX_GROUP_COMMIT: usize = 20;

/// Default batching window, in milliseconds, after which buffered commits are
/// forced to disk.
pub const DEFAULT_GROUP_COMMIT_INTERVAL_MS: u64 = 20;
39
/// Shared group-commit abstraction used by [`crate::TxnManager`].
///
/// Implementations are held behind an `Arc<dyn GroupCommit>`, so the trait is
/// object-safe and requires `Send + Sync`.
pub trait GroupCommit: Send + Sync {
    /// Returns `true` if group commit is currently enabled.
    ///
    /// After [`shutdown`](GroupCommit::shutdown) this should return `false`.
    fn is_enabled(&self) -> bool;

    /// Called by each committing transaction to register its commit with the
    /// current group.
    ///
    /// * `commit_vlsn` — VLSN assigned to this commit entry.
    ///
    /// # Returns
    ///
    /// * `true` — the commit was absorbed into the current group; the caller
    ///   must **not** issue its own fsync.
    /// * `false` — the caller must make the commit durable itself (e.g. via
    ///   `LogManager::flush_sync()`). An implementation should return `false`
    ///   when its batch threshold is reached and whenever it is disabled or
    ///   shut down, so that durability is never silently skipped.
    fn buffer_commit(&self, commit_vlsn: i64) -> bool;

    /// Shuts down the group-commit machinery; afterwards
    /// [`is_enabled`](GroupCommit::is_enabled) should return `false`.
    fn shutdown(&self);
}
66
67// ── GroupCommitMaster ─────────────────────────────────────────────────────────
68
/// Group-commit implementation for the **Master** role.
///
/// `GroupCommitMaster` enforces only the *batch-size* policy: it counts
/// commits and tells every `max_count`-th caller to fsync. It owns no thread
/// and no queue of its own.
///
/// ## Threshold semantics
///
/// `buffer_commit()` returns `false` (caller must fsync) every `max_count`
/// calls — the count-based threshold. The caller (`Txn::commit_with_durability`)
/// treats a `false` return as a signal to call `LogManager::flush_sync()`,
/// which then handles the time-based coalescing via `FSyncManager`. This
/// separates concerns: `GroupCommit` enforces the batch-size policy;
/// `FSyncManager` enforces the time window and leader/waiter coalescing.
///
/// A `max_count` of `0` disables group commit, in which case every commit
/// must be fsynced by the caller.
#[derive(Debug)]
pub struct GroupCommitMaster {
    /// Whether group commit is currently active (cleared by `shutdown`).
    enabled: AtomicBool,
    /// Maximum transactions per batch before forcing an fsync.
    max_count: usize,
    /// Time window for batching in milliseconds (passed on to `FSyncManager`;
    /// not used by the count threshold itself).
    interval_ms: u64,
    /// Running count of buffered commits since the last threshold flush.
    pending_count: AtomicUsize,
    /// Number of times the count threshold has fired (observable in tests).
    flush_count: AtomicUsize,
}
98
99impl GroupCommitMaster {
100    /// Creates a new `GroupCommitMaster`.
101    ///
102    /// # Arguments
103    ///
104    /// * `max_count` — maximum batch size.
105    /// * `interval_ms` — batch window in milliseconds
106    ///   `MASTER_GROUP_COMMIT_INTERVAL`).
107    pub fn new(max_count: usize, interval_ms: u64) -> Self {
108        GroupCommitMaster {
109            enabled: AtomicBool::new(max_count > 0),
110            max_count,
111            interval_ms,
112            pending_count: AtomicUsize::new(0),
113            flush_count: AtomicUsize::new(0),
114        }
115    }
116
117    /// Returns the batch window in milliseconds.
118    pub fn interval_ms(&self) -> u64 {
119        self.interval_ms
120    }
121
122    /// Returns the number of times the count threshold has fired.
123    ///
124    /// Used in tests to verify durability threshold enforcement.
125    pub fn flush_count(&self) -> usize {
126        self.flush_count.load(Ordering::Relaxed)
127    }
128}
129
130impl Default for GroupCommitMaster {
131    fn default() -> Self {
132        Self::new(DEFAULT_MAX_GROUP_COMMIT, DEFAULT_GROUP_COMMIT_INTERVAL_MS)
133    }
134}
135
136impl GroupCommit for GroupCommitMaster {
137    fn is_enabled(&self) -> bool {
138        self.enabled.load(Ordering::Relaxed)
139    }
140
141    /// Buffer a commit and enforce the count-based threshold.
142    ///
143    /// Returns `false` (caller must fsync) on every `max_count`th call.
144    /// Returns `true` (commit is buffered, skip fsync) otherwise.
145    ///
146    /// Count-threshold path.
147    /// The time-window threshold is handled by `FSyncManager` when the
148    /// caller proceeds to `LogManager::flush_sync()` on a `false` return.
149    fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
150        if !self.enabled.load(Ordering::Relaxed) {
151            return false; // Disabled: caller must always fsync.
152        }
153        // Increment and check threshold.  fetch_add returns the value BEFORE
154        // the increment, so we compare against max_count - 1.
155        let prev = self.pending_count.fetch_add(1, Ordering::AcqRel);
156        if prev + 1 >= self.max_count {
157            // Threshold reached: reset counter and signal caller to fsync.
158            self.pending_count.store(0, Ordering::Release);
159            self.flush_count.fetch_add(1, Ordering::Relaxed);
160            return false; // Caller must call flush_sync().
161        }
162        true // Buffered: caller skips fsync.
163    }
164
165    fn shutdown(&self) {
166        self.enabled.store(false, Ordering::Relaxed);
167    }
168}
169
170// ── GroupCommitReplica ────────────────────────────────────────────────────────
171
/// Group-commit implementation for the **Replica** role.
///
/// Intended to batch acknowledgements during log replay, sending an ACK to the
/// feeder once a batch of committed transactions has been applied and durably
/// written.
///
/// NOTE(review): the batching machinery is not implemented yet — this type
/// currently holds only its configuration and an enabled flag.
#[derive(Debug)]
pub struct GroupCommitReplica {
    /// Whether group commit is currently active (cleared by `shutdown`).
    enabled: AtomicBool,
    /// Batch window in milliseconds before a pending group of ACKs is sent.
    interval_ms: u64,
}
182
183impl GroupCommitReplica {
184    /// Creates a new `GroupCommitReplica`.
185    pub fn new(interval_ms: u64) -> Self {
186        GroupCommitReplica { enabled: AtomicBool::new(true), interval_ms }
187    }
188}
189
190impl Default for GroupCommitReplica {
191    fn default() -> Self {
192        Self::new(DEFAULT_GROUP_COMMIT_INTERVAL_MS)
193    }
194}
195
196impl GroupCommit for GroupCommitReplica {
197    fn is_enabled(&self) -> bool {
198        self.enabled.load(Ordering::Relaxed)
199    }
200
201    fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
202        // On the replica, each committed entry from the feeder is queued.
203        // After the batch window elapses (or the batch fills), an ACK is sent
204        // back. The actual durability is ensured by the fsync that precedes the
205        // ACK.
206        //
207        //   1. Add VLSN to the pending ACK queue.
208        //   2. If a leader exists, piggyback; otherwise become leader and wait
209        //      groupCommitIntervalMs before ACKing the batch.
210        true
211    }
212
213    fn shutdown(&self) {
214        self.enabled.store(false, Ordering::Relaxed);
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_master_default_enabled() {
        assert!(GroupCommitMaster::default().is_enabled());
    }

    #[test]
    fn test_master_disabled_when_max_zero() {
        let disabled = GroupCommitMaster::new(0, 20);
        assert!(!disabled.is_enabled());
    }

    #[test]
    fn test_master_buffer_commit_first_is_buffered() {
        // A fresh batch of size 3 has not reached its threshold after one commit.
        let master = GroupCommitMaster::new(3, 20);
        assert!(master.buffer_commit(1), "first commit should be buffered");
        assert_eq!(master.flush_count(), 0);
    }

    #[test]
    fn test_master_threshold_fires_at_max_count() {
        // max_count = 3: the first two commits are absorbed, the third forces an fsync.
        let master = GroupCommitMaster::new(3, 20);
        for vlsn in 1..=2 {
            assert!(master.buffer_commit(vlsn), "commit {} should be buffered", vlsn);
        }
        assert!(!master.buffer_commit(3), "commit 3 must trigger flush (threshold)");
        assert_eq!(master.flush_count(), 1, "exactly one flush should have fired");
    }

    #[test]
    fn test_master_threshold_resets_after_flush() {
        // Once the threshold fires the counter restarts, so the cycle repeats.
        let master = GroupCommitMaster::new(3, 20);
        for batch_start in [1, 4] {
            assert!(master.buffer_commit(batch_start));
            assert!(master.buffer_commit(batch_start + 1));
            assert!(!master.buffer_commit(batch_start + 2)); // threshold flush
        }
        assert_eq!(master.flush_count(), 2);
    }

    #[test]
    fn test_master_disabled_always_flushes() {
        // max_count = 0 disables group commit, so every commit needs its own fsync.
        let disabled = GroupCommitMaster::new(0, 20);
        assert!(!disabled.is_enabled());
        assert!(!disabled.buffer_commit(1), "disabled GC must return false (always flush)");
        assert!(!disabled.buffer_commit(2));
    }

    #[test]
    fn test_master_shutdown() {
        let master = GroupCommitMaster::default();
        master.shutdown();
        assert!(!master.is_enabled());
        // A shut-down group commit must always ask the caller to fsync.
        assert!(!master.buffer_commit(99), "post-shutdown must return false");
    }

    #[test]
    fn test_master_interval_ms_accessible() {
        assert_eq!(GroupCommitMaster::new(10, 50).interval_ms(), 50);
    }

    #[test]
    fn test_replica_default_enabled() {
        assert!(GroupCommitReplica::default().is_enabled());
    }

    #[test]
    fn test_replica_buffer_commit() {
        let replica = GroupCommitReplica::default();
        assert!(replica.buffer_commit(10));
    }
}