use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub const DEFAULT_MAX_GROUP_COMMIT: usize = 20;
pub const DEFAULT_GROUP_COMMIT_INTERVAL_MS: u64 = 20;
pub trait GroupCommit: Send + Sync {
fn is_enabled(&self) -> bool;
fn buffer_commit(&self, commit_vlsn: i64) -> bool;
fn shutdown(&self);
}
pub struct GroupCommitMaster {
enabled: AtomicBool,
max_count: usize,
interval_ms: u64,
pending_count: AtomicUsize,
flush_count: AtomicUsize,
}
impl GroupCommitMaster {
pub fn new(max_count: usize, interval_ms: u64) -> Self {
GroupCommitMaster {
enabled: AtomicBool::new(max_count > 0),
max_count,
interval_ms,
pending_count: AtomicUsize::new(0),
flush_count: AtomicUsize::new(0),
}
}
pub fn interval_ms(&self) -> u64 {
self.interval_ms
}
pub fn flush_count(&self) -> usize {
self.flush_count.load(Ordering::Relaxed)
}
}
impl Default for GroupCommitMaster {
fn default() -> Self {
Self::new(DEFAULT_MAX_GROUP_COMMIT, DEFAULT_GROUP_COMMIT_INTERVAL_MS)
}
}
impl GroupCommit for GroupCommitMaster {
fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
if !self.enabled.load(Ordering::Relaxed) {
return false; }
let prev = self.pending_count.fetch_add(1, Ordering::AcqRel);
if prev + 1 >= self.max_count {
self.pending_count.store(0, Ordering::Release);
self.flush_count.fetch_add(1, Ordering::Relaxed);
return false; }
true }
fn shutdown(&self) {
self.enabled.store(false, Ordering::Relaxed);
}
}
pub struct GroupCommitReplica {
enabled: AtomicBool,
interval_ms: u64,
}
impl GroupCommitReplica {
pub fn new(interval_ms: u64) -> Self {
GroupCommitReplica { enabled: AtomicBool::new(true), interval_ms }
}
}
impl Default for GroupCommitReplica {
fn default() -> Self {
Self::new(DEFAULT_GROUP_COMMIT_INTERVAL_MS)
}
}
impl GroupCommit for GroupCommitReplica {
fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
fn buffer_commit(&self, _commit_vlsn: i64) -> bool {
true
}
fn shutdown(&self) {
self.enabled.store(false, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_master_default_enabled() {
let gc = GroupCommitMaster::default();
assert!(gc.is_enabled());
}
#[test]
fn test_master_disabled_when_max_zero() {
let gc = GroupCommitMaster::new(0, 20);
assert!(!gc.is_enabled());
}
#[test]
fn test_master_buffer_commit_first_is_buffered() {
let gc = GroupCommitMaster::new(3, 20);
assert!(gc.buffer_commit(1), "first commit should be buffered");
assert_eq!(gc.flush_count(), 0);
}
#[test]
fn test_master_threshold_fires_at_max_count() {
let gc = GroupCommitMaster::new(3, 20);
assert!(gc.buffer_commit(1), "commit 1 should be buffered");
assert!(gc.buffer_commit(2), "commit 2 should be buffered");
assert!(
!gc.buffer_commit(3),
"commit 3 must trigger flush (threshold)"
);
assert_eq!(gc.flush_count(), 1, "exactly one flush should have fired");
}
#[test]
fn test_master_threshold_resets_after_flush() {
let gc = GroupCommitMaster::new(3, 20);
assert!(gc.buffer_commit(1));
assert!(gc.buffer_commit(2));
assert!(!gc.buffer_commit(3)); assert!(gc.buffer_commit(4));
assert!(gc.buffer_commit(5));
assert!(!gc.buffer_commit(6)); assert_eq!(gc.flush_count(), 2);
}
#[test]
fn test_master_disabled_always_flushes() {
let gc = GroupCommitMaster::new(0, 20);
assert!(!gc.is_enabled());
assert!(
!gc.buffer_commit(1),
"disabled GC must return false (always flush)"
);
assert!(!gc.buffer_commit(2));
}
#[test]
fn test_master_shutdown() {
let gc = GroupCommitMaster::default();
gc.shutdown();
assert!(!gc.is_enabled());
assert!(!gc.buffer_commit(99), "post-shutdown must return false");
}
#[test]
fn test_master_interval_ms_accessible() {
let gc = GroupCommitMaster::new(10, 50);
assert_eq!(gc.interval_ms(), 50);
}
#[test]
fn test_replica_default_enabled() {
let gc = GroupCommitReplica::default();
assert!(gc.is_enabled());
}
#[test]
fn test_replica_buffer_commit() {
let gc = GroupCommitReplica::default();
assert!(gc.buffer_commit(10));
}
}