use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_channel::{bounded, Receiver, Sender};
use parking_lot::RwLock;
use super::error::PersistentARTrieError;
use super::wal::{AsyncWalWriter, Lsn, WalRecord};
/// Result alias for this module: every fallible operation here reports a
/// [`PersistentARTrieError`].
pub type Result<T> = std::result::Result<T, PersistentARTrieError>;
/// Tuning knobs for the group-commit coordinator.
///
/// Start from [`GroupCommitConfig::default`] or one of the presets
/// ([`low_latency`](Self::low_latency), [`high_throughput`](Self::high_throughput),
/// [`nvme_optimized`](Self::nvme_optimized)) and override individual fields.
#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Number of queued records that forces a flush. The submit queue is
    /// sized to four times this value.
    pub max_batch_size: usize,
    /// Longest time, in microseconds, a non-empty batch may wait before it
    /// is flushed. Also the starting delay of the adaptive controller.
    pub max_batch_delay_us: u64,
    /// NOTE(review): not read anywhere in this file; presumably the minimum
    /// number of concurrent writers worth batching for — confirm.
    pub min_batch_siblings: usize,
    /// When `true`, a background thread is spawned to drain the submit queue.
    pub dedicated_commit_thread: bool,
    /// When `true`, the batching delay is tuned at runtime from observed latencies.
    pub adaptive_batching: bool,
    /// p99 commit latency, in microseconds, that adaptive batching aims for.
    pub adaptive_latency_target_us: u64,
    /// NOTE(review): not read anywhere in this file; presumably enables
    /// overlapping batch accumulation with an in-flight sync — confirm.
    pub pipelined_sync: bool,
}

impl Default for GroupCommitConfig {
    /// Balanced settings: batches of up to 100 records, 10 ms delay,
    /// adaptive batching with a 10 ms latency target.
    fn default() -> Self {
        GroupCommitConfig {
            max_batch_size: 100,
            max_batch_delay_us: 10_000,
            min_batch_siblings: 2,
            dedicated_commit_thread: true,
            adaptive_batching: true,
            adaptive_latency_target_us: 10_000,
            pipelined_sync: false,
        }
    }
}

impl GroupCommitConfig {
    /// Small batches (10 records, 1 ms delay) with adaptive batching off.
    pub fn low_latency() -> Self {
        Self {
            max_batch_size: 10,
            max_batch_delay_us: 1_000,
            min_batch_siblings: 1,
            adaptive_batching: false,
            adaptive_latency_target_us: 2_000,
            ..Self::default()
        }
    }

    /// Large batches (1000 records, 50 ms delay) with pipelined sync requested.
    pub fn high_throughput() -> Self {
        Self {
            max_batch_size: 1000,
            max_batch_delay_us: 50_000,
            min_batch_siblings: 5,
            adaptive_latency_target_us: 50_000,
            pipelined_sync: true,
            ..Self::default()
        }
    }

    /// Short delay (500 µs) and mid-sized batches (50 records) with a 1 ms
    /// adaptive latency target.
    pub fn nvme_optimized() -> Self {
        Self {
            max_batch_size: 50,
            max_batch_delay_us: 500,
            adaptive_latency_target_us: 1_000,
            ..Self::default()
        }
    }
}
/// Snapshot of the coordinator's counters.
#[derive(Debug, Default, Clone)]
pub struct GroupCommitStats {
    /// Total records written in successfully synced batches.
    pub records_committed: u64,
    /// Number of successful batch syncs.
    pub fsync_count: u64,
    /// `records_committed / fsync_count`, refreshed after every successful batch.
    pub avg_batch_size: f64,
    /// NOTE(review): never written in this file; presumably the median
    /// commit latency in microseconds — confirm where it is populated.
    pub latency_p50_us: u64,
    /// NOTE(review): never written in this file; presumably the p99 commit
    /// latency in microseconds — confirm where it is populated.
    pub latency_p99_us: u64,
    /// Sum of the serialized sizes of all committed records.
    pub bytes_written: u64,
    /// NOTE(review): never written in this file; presumably the number of
    /// writes waiting in the submit queue — confirm.
    pub queue_depth: usize,
}

impl GroupCommitStats {
    /// Average number of records made durable per sync.
    ///
    /// Returns `0.0` while no sync has completed, so the ratio is never
    /// computed with a zero denominator.
    pub fn batching_efficiency(&self) -> f64 {
        match self.fsync_count {
            0 => 0.0,
            syncs => self.records_committed as f64 / syncs as f64,
        }
    }
}
/// One write travelling from a submitting thread to the commit thread.
struct PendingWrite {
// The WAL record to append.
record: WalRecord,
// LSN allocated for this record at submission time.
lsn: Lsn,
// One-shot channel on which the commit thread reports the outcome.
response_tx: OneshotSender<Result<Lsn>>,
// Submission timestamp; not read anywhere in this file, hence the allow.
#[allow(dead_code)]
submitted_at: Instant,
// Serialized size in bytes, used for the batch byte budget and `bytes_written`.
serialized_size: usize,
}
/// Tunes the batching delay at runtime from a sliding window of observed
/// commit latencies.
struct AdaptiveController {
    /// Batching delay currently in effect, in microseconds.
    current_delay_us: AtomicU64,
    /// Batch-size limit currently in effect. Initialized from the config;
    /// nothing in this file changes it afterwards.
    current_batch_size: AtomicUsize,
    /// Sliding window of the most recent latency samples (microseconds).
    latency_samples: Mutex<VecDeque<u64>>,
    /// p99 latency the controller steers towards, in microseconds.
    target_latency_us: u64,
    /// Lower bound for `current_delay_us`.
    min_delay_us: u64,
    /// Upper bound for `current_delay_us`.
    max_delay_us: u64,
}

impl AdaptiveController {
    /// Maximum number of latency samples kept in the window.
    const SAMPLE_WINDOW: usize = 1000;
    /// Minimum number of samples required before the delay is adjusted.
    const MIN_SAMPLES: usize = 100;
    /// Additive increase applied when latency is comfortably under target.
    const DELAY_STEP_US: u64 = 1000;

    /// Creates a controller seeded with the delay, batch size and latency
    /// target from `config`. The delay is kept within 100 µs ..= 100 ms.
    fn new(config: &GroupCommitConfig) -> Self {
        Self {
            current_delay_us: AtomicU64::new(config.max_batch_delay_us),
            current_batch_size: AtomicUsize::new(config.max_batch_size),
            latency_samples: Mutex::new(VecDeque::with_capacity(Self::SAMPLE_WINDOW)),
            target_latency_us: config.adaptive_latency_target_us,
            min_delay_us: 100,
            max_delay_us: 100_000,
        }
    }

    /// Adds one latency sample (microseconds), evicting the oldest sample
    /// once the window is full.
    ///
    /// # Panics
    /// Panics if the sample mutex is poisoned.
    fn record_latency(&self, latency_us: u64) {
        let mut samples = self.latency_samples.lock().expect("lock poisoned");
        if samples.len() >= Self::SAMPLE_WINDOW {
            samples.pop_front();
        }
        samples.push_back(latency_us);
    }

    /// Re-evaluates the batching delay from the current sample window.
    ///
    /// Does nothing until at least 100 samples exist. If the p99 latency
    /// exceeds the target, the delay is halved (never below the minimum); if
    /// it is below half the target, the delay grows by 1 ms (never above the
    /// maximum); otherwise it is left alone.
    ///
    /// # Panics
    /// Panics if the sample mutex is poisoned.
    fn adjust(&self) {
        // Copy the window out and release the lock before doing any real
        // work, so writers calling `record_latency` are not stalled behind
        // the percentile computation.
        let mut window: Vec<u64> = {
            let samples = self.latency_samples.lock().expect("lock poisoned");
            if samples.len() < Self::MIN_SAMPLES {
                return;
            }
            samples.iter().copied().collect()
        };

        // Only one order statistic is needed, so an O(n) selection replaces
        // a full sort. The index is always in bounds: len * 99 / 100 < len.
        let p99_idx = (window.len() * 99) / 100;
        let current_p99 = *window.select_nth_unstable(p99_idx).1;

        let current_delay = self.current_delay_us.load(Ordering::Relaxed);
        if current_p99 > self.target_latency_us {
            // Over target: back off multiplicatively.
            let new_delay = (current_delay / 2).max(self.min_delay_us);
            self.current_delay_us.store(new_delay, Ordering::Relaxed);
        } else if current_p99 < self.target_latency_us / 2 {
            // Well under target: grow additively to batch more per sync.
            let new_delay = current_delay
                .saturating_add(Self::DELAY_STEP_US)
                .min(self.max_delay_us);
            self.current_delay_us.store(new_delay, Ordering::Relaxed);
        }
    }

    /// Returns the batching delay currently in effect, in microseconds.
    fn get_current_delay_us(&self) -> u64 {
        self.current_delay_us.load(Ordering::Relaxed)
    }

    /// Returns the batch-size limit currently in effect.
    fn get_current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed)
    }
}
/// Batches WAL appends from many threads so that a single sync covers a
/// whole group of records.
pub struct GroupCommitCoordinator {
// WAL that LSNs are allocated from and records are appended to.
wal: Arc<AsyncWalWriter>,
// Configuration the coordinator was built with; not read after construction.
#[allow(dead_code)]
config: GroupCommitConfig,
// Producer side of the bounded queue feeding the commit thread.
submit_tx: Sender<PendingWrite>,
// Held across LSN allocation and enqueue so queue order matches LSN order.
submit_order: Mutex<()>,
// Handle of the background commit thread, if one was spawned.
commit_thread: Option<JoinHandle<()>>,
// Set on drop to ask the commit thread to exit.
shutdown: Arc<AtomicBool>,
// Highest LSN of the most recent successfully synced batch.
synced_lsn: Arc<AtomicU64>,
// Present only when adaptive batching is enabled.
adaptive: Option<Arc<AdaptiveController>>,
// Counters updated by the commit thread after every successful batch.
stats: Arc<RwLock<GroupCommitStats>>,
}
impl GroupCommitCoordinator {
/// Creates a coordinator that batches appends to `wal` according to `config`.
///
/// When `config.dedicated_commit_thread` is set, a thread named
/// `artrie-group-commit` is spawned to drain the submit queue. Otherwise
/// the receiving end of the queue is dropped when this function returns,
/// so later submissions fail with `GroupCommitChannelClosed`.
///
/// # Panics
/// Panics if the operating system refuses to spawn the commit thread.
pub fn new(wal: Arc<AsyncWalWriter>, config: GroupCommitConfig) -> Result<Self> {
    // The queue holds up to four batches' worth of pending writes.
    let queue_capacity = config.max_batch_size * 4;
    let (submit_tx, submit_rx) = bounded(queue_capacity);

    let shutdown = Arc::new(AtomicBool::new(false));
    let synced_lsn = Arc::new(AtomicU64::new(0));
    let stats = Arc::new(RwLock::new(GroupCommitStats::default()));
    let adaptive = config
        .adaptive_batching
        .then(|| Arc::new(AdaptiveController::new(&config)));

    let commit_thread = if config.dedicated_commit_thread {
        // Give the worker its own handles to all shared state.
        let worker = {
            let wal = Arc::clone(&wal);
            let shutdown = Arc::clone(&shutdown);
            let synced_lsn = Arc::clone(&synced_lsn);
            let stats = Arc::clone(&stats);
            let config = config.clone();
            let adaptive = adaptive.clone();
            move || {
                Self::commit_loop(
                    submit_rx, wal, shutdown, synced_lsn, stats, config, adaptive,
                );
            }
        };
        let handle = thread::Builder::new()
            .name("artrie-group-commit".to_string())
            .spawn(worker)
            .expect("failed to spawn commit thread");
        Some(handle)
    } else {
        None
    };

    Ok(Self {
        wal,
        config,
        submit_tx,
        submit_order: Mutex::new(()),
        commit_thread,
        shutdown,
        synced_lsn,
        adaptive,
        stats,
    })
}
/// Submits `record` and blocks until the batch containing it has been
/// written and synced, returning the record's LSN.
///
/// When adaptive batching is enabled, the submit-to-durable latency of a
/// successful call is fed to the adaptive controller.
///
/// # Errors
/// Returns `GroupCommitChannelClosed` if the record cannot be queued or
/// the response channel reports failure, and forwards any error reported
/// by the commit thread for this record's batch.
///
/// # Panics
/// Panics if the submit-order mutex is poisoned.
pub fn append_with_sync(&self, record: WalRecord) -> Result<Lsn> {
    let submitted_at = Instant::now();
    let serialized_size = record.serialized_size();
    let (response_tx, response_rx) = oneshot_channel();

    // LSN allocation and enqueue happen under one lock so that records
    // reach the commit thread in LSN order. The lock is released before
    // this thread blocks on the response.
    let enqueued = {
        let _order_guard = self
            .submit_order
            .lock()
            .expect("group commit submit lock poisoned");
        let lsn = self.wal.allocate_lsn();
        self.submit_tx.send(PendingWrite {
            record,
            lsn,
            response_tx,
            submitted_at,
            serialized_size,
        })
    };
    if enqueued.is_err() {
        return Err(PersistentARTrieError::GroupCommitChannelClosed);
    }

    let durable_lsn = match response_rx.recv() {
        Ok(outcome) => outcome?,
        Err(_) => return Err(PersistentARTrieError::GroupCommitChannelClosed),
    };

    if let Some(controller) = self.adaptive.as_ref() {
        controller.record_latency(submitted_at.elapsed().as_micros() as u64);
    }
    Ok(durable_lsn)
}
/// Queues `record` for the next batch and returns its LSN immediately,
/// without waiting for the record to become durable.
///
/// Use [`wait_for_lsn`](Self::wait_for_lsn) or [`flush`](Self::flush) to
/// wait for durability afterwards. The commit thread's outcome for this
/// record is discarded because nobody listens on the response channel.
///
/// # Errors
/// Returns `GroupCommitChannelClosed` if the record cannot be queued.
///
/// # Panics
/// Panics if the submit-order mutex is poisoned.
pub fn append_async(&self, record: WalRecord) -> Result<Lsn> {
    // Measure the record like `append_with_sync` does: the commit loop uses
    // this size for its byte-budget flush trigger and for `bytes_written`,
    // so reporting 0 would make async writes invisible to both.
    let serialized_size = record.serialized_size();
    let (response_tx, _response_rx) = oneshot_channel();

    // LSN allocation and enqueue happen under one lock so that records
    // reach the commit thread in LSN order.
    let _submit_guard = self
        .submit_order
        .lock()
        .expect("group commit submit lock poisoned");
    let lsn = self.wal.allocate_lsn();
    let pending = PendingWrite {
        record,
        lsn,
        response_tx,
        submitted_at: Instant::now(),
        serialized_size,
    };
    self.submit_tx
        .send(pending)
        .map_err(|_| PersistentARTrieError::GroupCommitChannelClosed)?;
    Ok(lsn)
}
pub fn wait_for_lsn(&self, target_lsn: Lsn) {
while self.synced_lsn.load(Ordering::Acquire) < target_lsn {
std::hint::spin_loop();
thread::yield_now();
}
}
/// Blocks until the LSN just below the WAL's current LSN has been synced.
///
/// Returns immediately when that LSN is zero.
pub fn flush(&self) {
    let last_lsn = self.wal.current_lsn().saturating_sub(1);
    if last_lsn == 0 {
        return;
    }
    self.wait_for_lsn(last_lsn);
}
pub fn stats(&self) -> GroupCommitStats {
self.stats.read().clone()
}
/// Returns the highest LSN of the most recent successfully synced batch
/// (0 until the first batch has been synced).
pub fn synced_lsn(&self) -> Lsn {
self.synced_lsn.load(Ordering::Acquire)
}
/// Body of the commit thread: collects pending writes into batches and
/// flushes each batch with a single sync.
///
/// A batch is flushed when it reaches the batch-size limit, when it has
/// been open for the batching delay, or when its records total 1 MiB.
/// With adaptive batching the limit and delay come from the controller,
/// which is re-evaluated after every flush; otherwise they come from
/// `config`.
///
/// The loop ends when `shutdown` is set or when every sender has been
/// dropped. In both cases everything still queued is flushed first.
fn commit_loop(
    submit_rx: Receiver<PendingWrite>,
    wal: Arc<AsyncWalWriter>,
    shutdown: Arc<AtomicBool>,
    synced_lsn: Arc<AtomicU64>,
    stats: Arc<RwLock<GroupCommitStats>>,
    config: GroupCommitConfig,
    adaptive: Option<Arc<AdaptiveController>>,
) {
    // How long to block on an empty queue before re-checking `shutdown`.
    const IDLE_POLL: Duration = Duration::from_millis(10);
    // Byte budget that forces a flush regardless of record count.
    const MAX_BATCH_BYTES: usize = 1024 * 1024;

    let mut batch: Vec<PendingWrite> = Vec::with_capacity(config.max_batch_size);
    let mut accumulated_size: usize = 0;
    let mut batch_start = Instant::now();

    loop {
        if shutdown.load(Ordering::Relaxed) {
            // Writes already accepted into the queue must not be lost:
            // `append_async` has handed their LSNs back to callers. Pull
            // everything that is still queued into the final batch.
            while let Ok(pending) = submit_rx.try_recv() {
                batch.push(pending);
            }
            if !batch.is_empty() {
                Self::flush_batch(&mut batch, &wal, &synced_lsn, &stats);
            }
            break;
        }

        let (max_delay_us, max_size) = match adaptive.as_ref() {
            Some(controller) => (
                controller.get_current_delay_us(),
                controller.get_current_batch_size(),
            ),
            None => (config.max_batch_delay_us, config.max_batch_size),
        };
        let max_delay = Duration::from_micros(max_delay_us);

        // With an empty batch there is no deadline to honour, so block for
        // the idle poll interval. Deriving the timeout from a stale
        // `batch_start` would yield zero and turn this loop into a busy
        // spin whenever the coordinator is idle.
        let wait = if batch.is_empty() {
            IDLE_POLL
        } else {
            max_delay.saturating_sub(batch_start.elapsed())
        };

        match submit_rx.recv_timeout(wait) {
            Ok(pending) => {
                if batch.is_empty() {
                    // The batch deadline starts with its first record.
                    batch_start = Instant::now();
                }
                accumulated_size = accumulated_size.saturating_add(pending.serialized_size);
                batch.push(pending);
            }
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {}
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                // All senders are gone and the queue is empty.
                if !batch.is_empty() {
                    Self::flush_batch(&mut batch, &wal, &synced_lsn, &stats);
                }
                break;
            }
        }

        let should_flush = !batch.is_empty()
            && (batch.len() >= max_size
                || batch_start.elapsed() >= max_delay
                || accumulated_size >= MAX_BATCH_BYTES);
        if should_flush {
            Self::flush_batch(&mut batch, &wal, &synced_lsn, &stats);
            accumulated_size = 0;
            batch_start = Instant::now();
            if let Some(controller) = adaptive.as_ref() {
                controller.adjust();
            }
        }
    }
}
fn flush_batch(
batch: &mut Vec<PendingWrite>,
wal: &Arc<AsyncWalWriter>,
synced_lsn: &Arc<AtomicU64>,
stats: &Arc<RwLock<GroupCommitStats>>,
) {
if batch.is_empty() {
return;
}
let batch_size = batch.len();
let mut max_lsn: Lsn = 0;
let mut total_bytes: usize = 0;
let write_result: std::result::Result<(), PersistentARTrieError> = (|| {
for pending in batch.iter() {
wal.append_with_lsn(pending.lsn, pending.record.clone())
.map_err(|e| PersistentARTrieError::Wal(format!("{}", e)))?;
max_lsn = max_lsn.max(pending.lsn);
total_bytes += pending.serialized_size;
}
wal.sync()
.map_err(|e| PersistentARTrieError::Wal(format!("{}", e)))?;
Ok(())
})();
match write_result {
Ok(()) => {
synced_lsn.store(max_lsn, Ordering::Release);
{
let mut stats_guard = stats.write();
stats_guard.records_committed += batch_size as u64;
stats_guard.fsync_count += 1;
stats_guard.bytes_written += total_bytes as u64;
stats_guard.avg_batch_size =
stats_guard.records_committed as f64 / stats_guard.fsync_count as f64;
}
for pending in batch.drain(..) {
let _ = pending.response_tx.send(Ok(pending.lsn));
}
}
Err(e) => {
for pending in batch.drain(..) {
let _ = pending
.response_tx
.send(Err(PersistentARTrieError::Wal(format!("{}", e))));
}
}
}
}
}
impl Drop for GroupCommitCoordinator {
    /// Asks the commit thread to stop and waits for it to exit.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        if let Some(worker) = self.commit_thread.take() {
            // A panic in the worker is deliberately ignored; propagating it
            // from a destructor would risk aborting the process.
            drop(worker.join());
        }
    }
}
/// State shared by the two halves of a one-shot channel.
struct OneshotState<T> {
    /// The value, once sent and until received.
    value: Option<T>,
    /// Set when the sender is dropped, whether or not it sent a value.
    sender_dropped: bool,
}

/// Shared handle type used by both halves of a one-shot channel.
type OneshotShared<T> = Arc<(Mutex<OneshotState<T>>, Condvar)>;

/// Creates a single-use channel that carries exactly one value from the
/// sender to the receiver.
fn oneshot_channel<T>() -> (OneshotSender<T>, OneshotReceiver<T>) {
    let shared: OneshotShared<T> = Arc::new((
        Mutex::new(OneshotState {
            value: None,
            sender_dropped: false,
        }),
        Condvar::new(),
    ));
    (
        OneshotSender {
            shared: Arc::clone(&shared),
        },
        OneshotReceiver { shared },
    )
}

/// Sending half of a one-shot channel.
struct OneshotSender<T> {
    shared: OneshotShared<T>,
}

impl<T> OneshotSender<T> {
    /// Stores `value` for the receiver and wakes it. Always returns `Ok(())`;
    /// the value is simply discarded if the receiver no longer exists.
    ///
    /// # Panics
    /// Panics if the channel mutex is poisoned.
    fn send(self, value: T) -> std::result::Result<(), T> {
        let (lock, cvar) = &*self.shared;
        {
            let mut state = lock.lock().expect("oneshot lock poisoned");
            state.value = Some(value);
        }
        cvar.notify_one();
        Ok(())
    }
}

impl<T> Drop for OneshotSender<T> {
    /// Marks the channel as closed so that a receiver waiting on a sender
    /// that never sent is woken instead of blocking forever.
    fn drop(&mut self) {
        let (lock, cvar) = &*self.shared;
        // Never panic in a destructor: recover the state from a poisoned lock.
        let mut state = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        state.sender_dropped = true;
        drop(state);
        cvar.notify_one();
    }
}

/// Receiving half of a one-shot channel.
struct OneshotReceiver<T> {
    shared: OneshotShared<T>,
}

impl<T> OneshotReceiver<T> {
    /// Blocks until the value arrives and returns it.
    ///
    /// Returns `Err(())` if the sender was dropped without sending. A value
    /// sent before the sender was dropped is always delivered.
    ///
    /// # Panics
    /// Panics if the channel mutex is poisoned.
    fn recv(self) -> std::result::Result<T, ()> {
        let (lock, cvar) = &*self.shared;
        let mut state = lock.lock().expect("oneshot lock poisoned");
        loop {
            // Check the value first: a completed send also marks the sender
            // as dropped, and the value must win in that case.
            if let Some(value) = state.value.take() {
                return Ok(value);
            }
            if state.sender_dropped {
                return Err(());
            }
            state = cvar.wait(state).expect("oneshot condvar poisoned");
        }
    }
}
// Unit tests for the configuration presets, statistics, one-shot channel,
// adaptive controller, and the coordinator running against a WAL created
// in a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::persistent_artrie_core::wal::{AsyncWalConfig, WalConfig};
use tempfile::tempdir;
// The default configuration exposes the documented baseline values.
#[test]
fn test_group_commit_config_default() {
let config = GroupCommitConfig::default();
assert_eq!(config.max_batch_size, 100);
assert_eq!(config.max_batch_delay_us, 10_000);
assert!(config.dedicated_commit_thread);
assert!(config.adaptive_batching);
}
// Each preset carries its own batch size and batching delay.
#[test]
fn test_group_commit_config_presets() {
let low_lat = GroupCommitConfig::low_latency();
assert_eq!(low_lat.max_batch_size, 10);
assert_eq!(low_lat.max_batch_delay_us, 1_000);
let high_tp = GroupCommitConfig::high_throughput();
assert_eq!(high_tp.max_batch_size, 1000);
assert_eq!(high_tp.max_batch_delay_us, 50_000);
let nvme = GroupCommitConfig::nvme_optimized();
assert_eq!(nvme.max_batch_size, 50);
assert_eq!(nvme.max_batch_delay_us, 500);
}
// 100 records over 10 syncs is an efficiency of 10 records per sync.
#[test]
fn test_group_commit_stats() {
let mut stats = GroupCommitStats::default();
stats.records_committed = 100;
stats.fsync_count = 10;
assert!((stats.batching_efficiency() - 10.0).abs() < f64::EPSILON);
}
// A value sent before `recv` is called is delivered without blocking.
#[test]
fn test_oneshot_channel() {
let (tx, rx) = oneshot_channel::<i32>();
tx.send(42).expect("send failed");
let val = rx.recv().expect("recv failed");
assert_eq!(val, 42);
}
// 200 samples from 0 to 19_900 µs put the p99 above the 10_000 µs target,
// so the adjusted delay must not exceed the configured starting delay.
#[test]
fn test_adaptive_controller() {
let config = GroupCommitConfig::default();
let controller = AdaptiveController::new(&config);
for i in 0..200 {
controller.record_latency(i * 100);
}
controller.adjust();
let delay = controller.get_current_delay_us();
assert!(delay <= config.max_batch_delay_us);
}
// Five sequential synchronous appends from one thread are all committed.
#[test]
fn test_group_commit_coordinator_basic() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let async_config = AsyncWalConfig::with_pending_dir(dir.path().join("pending"));
let archive_config = WalConfig {
archive_dir: dir.path().join("archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, async_config, archive_config).expect("create WAL");
let wal = Arc::new(wal);
let config = GroupCommitConfig {
max_batch_size: 10,
max_batch_delay_us: 100_000, dedicated_commit_thread: true,
adaptive_batching: false,
..Default::default()
};
let coordinator =
GroupCommitCoordinator::new(Arc::clone(&wal), config).expect("create coordinator");
for i in 0..5 {
let record = WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
};
// Each call blocks until its batch has been synced.
let lsn = coordinator.append_with_sync(record).expect("append");
assert!(lsn > 0);
}
let stats = coordinator.stats();
assert_eq!(stats.records_committed, 5);
assert!(stats.fsync_count >= 1);
}
// Four writer threads issue 25 synchronous appends each; all 100 records
// must be committed. The achieved batching efficiency is only printed.
#[test]
fn test_group_commit_batching() {
use std::thread;
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let async_config = AsyncWalConfig::with_pending_dir(dir.path().join("pending"));
let archive_config = WalConfig {
archive_dir: dir.path().join("archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, async_config, archive_config).expect("create WAL");
let wal = Arc::new(wal);
let config = GroupCommitConfig {
max_batch_size: 100,
max_batch_delay_us: 50_000, dedicated_commit_thread: true,
adaptive_batching: false,
..Default::default()
};
let coordinator = Arc::new(
GroupCommitCoordinator::new(Arc::clone(&wal), config).expect("create coordinator"),
);
let num_writers = 4;
let writes_per_writer = 25;
let mut handles = Vec::new();
for writer_id in 0..num_writers {
let coord = Arc::clone(&coordinator);
let handle = thread::spawn(move || {
for i in 0..writes_per_writer {
let record = WalRecord::Insert {
term: format!("writer{}term{}", writer_id, i).into_bytes(),
value: None,
};
coord.append_with_sync(record).expect("append");
}
});
handles.push(handle);
}
// Wait for every writer before reading the statistics.
for handle in handles {
handle.join().expect("thread join");
}
let stats = coordinator.stats();
assert_eq!(
stats.records_committed,
(num_writers * writes_per_writer) as u64
);
let efficiency = stats.batching_efficiency();
println!(
"Batching efficiency: {:.2} records/fsync ({} records, {} fsyncs)",
efficiency, stats.records_committed, stats.fsync_count
);
}
}