1use std::collections::VecDeque;
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::sync::{Arc, Condvar, Mutex};
53use std::thread::JoinHandle;
54use std::time::{Duration, Instant};
55
56pub struct PendingCommitV2 {
58 pub txn_id: u64,
60 pub enqueue_time: Instant,
62 pub notifier: Arc<(Mutex<CommitResult>, Condvar)>,
64}
65
/// Outcome of a queued commit, shared between the submitter and the flusher.
#[derive(Debug, Clone)]
pub enum CommitResult {
    /// The commit has not been flushed yet.
    Pending,
    /// The batch was flushed; carries the value returned by the flush callback.
    Success(u64),
    /// The flush callback failed; carries its error message.
    Error(String),
}
76
77#[allow(dead_code)]
79pub struct EventDrivenGroupCommit {
80 pending: Mutex<VecDeque<PendingCommitV2>>,
82 commit_available: Condvar,
84 config: GroupCommitConfig,
86 metrics: GroupCommitMetrics,
88 #[allow(clippy::type_complexity)]
90 flush_fn: Arc<dyn Fn(&[u64]) -> Result<u64, String> + Send + Sync>,
91 running: AtomicU64, flush_thread: Mutex<Option<JoinHandle<()>>>,
95}
96
/// Tuning parameters for [`EventDrivenGroupCommit`].
#[derive(Clone)]
pub struct GroupCommitConfig {
    /// Lower bound applied to the adaptive batch size.
    pub min_batch_size: usize,
    /// Upper bound applied to the adaptive batch size.
    pub max_batch_size: usize,
    /// Wait budget in microseconds: the flush loop flushes once the oldest
    /// queued commit is older than this, and submitters re-check every
    /// `2 * max_wait_us`.
    pub max_wait_us: u64,
    /// Initial fsync latency estimate in microseconds, copied into the
    /// metrics at construction time.
    pub fsync_latency_us: u64,
    /// Smoothing factor of the exponential moving averages (arrival rate and
    /// fsync latency).
    pub ema_alpha: f64,
}
111
112impl Default for GroupCommitConfig {
113 fn default() -> Self {
114 Self {
115 min_batch_size: 1,
116 max_batch_size: 1000,
117 max_wait_us: 10_000, fsync_latency_us: 5_000, ema_alpha: 0.1,
120 }
121 }
122}
123
/// Lock-free counters and running estimates kept by the group-commit
/// coordinator.
pub struct GroupCommitMetrics {
    /// Batch size most recently computed by the sizing heuristic.
    pub adaptive_batch_size: AtomicU64,
    /// Smoothed arrival rate in fixed point: arrivals per second times 1000.
    pub arrival_rate_ema: AtomicU64,
    /// Smoothed flush latency in microseconds.
    pub fsync_latency_us: AtomicU64,
    /// Number of commits that went through a flush.
    pub total_commits: AtomicU64,
    /// Number of batches flushed.
    pub total_batches: AtomicU64,
    /// Sum of the time spent in the flush callback, in microseconds.
    pub total_fsync_time_us: AtomicU64,
    /// Wall-clock timestamp (microseconds since the Unix epoch) of the most
    /// recent submission; 0 means no submission has been seen yet.
    pub last_arrival_us: AtomicU64,
}
141
142impl Default for GroupCommitMetrics {
143 fn default() -> Self {
144 Self {
145 adaptive_batch_size: AtomicU64::new(10),
146 arrival_rate_ema: AtomicU64::new(100_000), fsync_latency_us: AtomicU64::new(5_000),
148 total_commits: AtomicU64::new(0),
149 total_batches: AtomicU64::new(0),
150 total_fsync_time_us: AtomicU64::new(0),
151 last_arrival_us: AtomicU64::new(0),
152 }
153 }
154}
155
156impl EventDrivenGroupCommit {
157 pub fn new<F>(flush_fn: F) -> Self
162 where
163 F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
164 {
165 Self::with_config(flush_fn, GroupCommitConfig::default())
166 }
167
168 pub fn with_config<F>(flush_fn: F, config: GroupCommitConfig) -> Self
170 where
171 F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
172 {
173 let gc = Self {
174 pending: Mutex::new(VecDeque::new()),
175 commit_available: Condvar::new(),
176 config,
177 metrics: GroupCommitMetrics::default(),
178 flush_fn: Arc::new(flush_fn),
179 running: AtomicU64::new(0),
180 flush_thread: Mutex::new(None),
181 };
182
183 gc.metrics
184 .fsync_latency_us
185 .store(gc.config.fsync_latency_us, Ordering::Relaxed);
186 gc
187 }
188
189 pub fn start(&self) -> Result<(), String> {
191 if self
192 .running
193 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Relaxed)
194 .is_err()
195 {
196 return Err("Already running".into());
197 }
198
199 Ok(())
203 }
204
205 pub fn stop(&self) {
207 self.running.store(0, Ordering::SeqCst);
208
209 let _lock = self.pending.lock().unwrap();
211 self.commit_available.notify_all();
212 }
213
214 pub fn is_running(&self) -> bool {
216 self.running.load(Ordering::SeqCst) == 1
217 }
218
219 pub fn submit_and_wait(&self, txn_id: u64) -> Result<u64, String> {
224 self.update_arrival_rate();
226
227 let notifier = Arc::new((Mutex::new(CommitResult::Pending), Condvar::new()));
229 let commit = PendingCommitV2 {
230 txn_id,
231 enqueue_time: Instant::now(),
232 notifier: notifier.clone(),
233 };
234
235 let should_flush = {
237 let mut pending = self.pending.lock().unwrap();
238 pending.push_back(commit);
239
240 let batch_size = self.optimal_batch_size();
241 pending.len() >= batch_size
242 };
243
244 self.commit_available.notify_one();
246
247 if should_flush && !self.is_running() {
249 self.flush_batch();
250 }
251
252 let (lock, cvar) = &*notifier;
254 let mut result = lock.lock().unwrap();
255
256 while matches!(*result, CommitResult::Pending) {
257 let timeout = Duration::from_micros(self.config.max_wait_us * 2);
259 let (new_result, timeout_result) = cvar.wait_timeout(result, timeout).unwrap();
260 result = new_result;
261
262 if timeout_result.timed_out() {
263 if !self.is_running() {
265 drop(result);
266 self.flush_batch();
267 result = lock.lock().unwrap();
268 }
269 }
270 }
271
272 match &*result {
273 CommitResult::Success(ts) => Ok(*ts),
274 CommitResult::Error(e) => Err(e.clone()),
275 CommitResult::Pending => Err("Unexpected pending state".into()),
276 }
277 }
278
279 pub fn flush_batch(&self) {
283 let batch = {
284 let mut pending = self.pending.lock().unwrap();
285 if pending.is_empty() {
286 return;
287 }
288
289 let batch_size = self.optimal_batch_size().min(pending.len());
290 pending.drain(..batch_size).collect::<Vec<_>>()
291 };
292
293 if batch.is_empty() {
294 return;
295 }
296
297 let txn_ids: Vec<_> = batch.iter().map(|c| c.txn_id).collect();
298 let batch_size = batch.len();
299
300 let start = Instant::now();
302 let result = (self.flush_fn)(&txn_ids);
303 let elapsed_us = start.elapsed().as_micros() as u64;
304
305 self.update_fsync_latency(elapsed_us);
307 self.metrics.total_batches.fetch_add(1, Ordering::Relaxed);
308 self.metrics
309 .total_commits
310 .fetch_add(batch_size as u64, Ordering::Relaxed);
311 self.metrics
312 .total_fsync_time_us
313 .fetch_add(elapsed_us, Ordering::Relaxed);
314
315 for commit in batch {
317 let (lock, cvar) = &*commit.notifier;
318 let mut result_lock = lock.lock().unwrap();
319 *result_lock = match &result {
320 Ok(ts) => CommitResult::Success(*ts),
321 Err(e) => CommitResult::Error(e.clone()),
322 };
323 cvar.notify_one();
324 }
325 }
326
327 pub fn flush_loop(&self) {
329 while self.is_running() {
330 let should_flush = {
331 let pending = self.pending.lock().unwrap();
332 let batch_size = self.optimal_batch_size();
333
334 if pending.len() >= batch_size {
335 true
336 } else if pending.is_empty() {
337 let _pending = self
339 .commit_available
340 .wait_timeout(pending, Duration::from_micros(self.config.max_wait_us))
341 .unwrap()
342 .0;
343 false
344 } else {
345 let oldest = pending
347 .front()
348 .map(|c| c.enqueue_time.elapsed().as_micros() as u64)
349 .unwrap_or(0);
350
351 if oldest > self.config.max_wait_us {
352 true
353 } else {
354 let remaining =
356 Duration::from_micros(self.config.max_wait_us.saturating_sub(oldest));
357 let _pending = self
358 .commit_available
359 .wait_timeout(pending, remaining)
360 .unwrap()
361 .0;
362 true }
364 }
365 };
366
367 if should_flush {
368 self.flush_batch();
369 }
370 }
371 }
372
373 fn optimal_batch_size(&self) -> usize {
377 let lambda = self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
378 let l_fsync = self.metrics.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
379 let c_wait = 1.0; let n_opt = (2.0 * l_fsync * lambda / c_wait).sqrt();
382 let batch_size = (n_opt as usize)
383 .max(self.config.min_batch_size)
384 .min(self.config.max_batch_size);
385
386 self.metrics
387 .adaptive_batch_size
388 .store(batch_size as u64, Ordering::Relaxed);
389 batch_size
390 }
391
392 fn update_arrival_rate(&self) {
394 let now_us = Self::now_us();
395 let last = self.metrics.last_arrival_us.swap(now_us, Ordering::Relaxed);
396
397 if last > 0 {
398 let delta_us = now_us.saturating_sub(last);
399 if delta_us > 0 {
400 let instant_rate = 1_000_000_000 / delta_us;
403
404 let old_rate = self.metrics.arrival_rate_ema.load(Ordering::Relaxed);
405 let alpha = (self.config.ema_alpha * 1000.0) as u64;
406 let new_rate = (old_rate * (1000 - alpha) + instant_rate * alpha) / 1000;
407 self.metrics
408 .arrival_rate_ema
409 .store(new_rate, Ordering::Relaxed);
410 }
411 }
412 }
413
414 fn update_fsync_latency(&self, latency_us: u64) {
416 let old = self.metrics.fsync_latency_us.load(Ordering::Relaxed);
417 let alpha = (self.config.ema_alpha * 1000.0) as u64;
418 let new = (old * (1000 - alpha) + latency_us * alpha) / 1000;
419 self.metrics.fsync_latency_us.store(new, Ordering::Relaxed);
420 }
421
/// Returns the wall-clock time in microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch instead of
/// panicking, and clamps to `u64::MAX` rather than truncating.
fn now_us() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_micros().min(u128::from(u64::MAX)) as u64
}
430
431 pub fn stats(&self) -> GroupCommitStatsV2 {
433 GroupCommitStatsV2 {
434 adaptive_batch_size: self.metrics.adaptive_batch_size.load(Ordering::Relaxed) as usize,
435 arrival_rate: self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0,
436 fsync_latency_us: self.metrics.fsync_latency_us.load(Ordering::Relaxed),
437 pending_count: self.pending.lock().unwrap().len(),
438 total_commits: self.metrics.total_commits.load(Ordering::Relaxed),
439 total_batches: self.metrics.total_batches.load(Ordering::Relaxed),
440 avg_batch_size: {
441 let batches = self.metrics.total_batches.load(Ordering::Relaxed);
442 let commits = self.metrics.total_commits.load(Ordering::Relaxed);
443 if batches > 0 {
444 commits as f64 / batches as f64
445 } else {
446 0.0
447 }
448 },
449 avg_fsync_time_us: {
450 let batches = self.metrics.total_batches.load(Ordering::Relaxed);
451 let time = self.metrics.total_fsync_time_us.load(Ordering::Relaxed);
452 if batches > 0 { time / batches } else { 0 }
453 },
454 }
455 }
456}
457
/// Point-in-time snapshot of the coordinator's metrics, as returned by
/// `EventDrivenGroupCommit::stats`.
#[derive(Debug, Clone)]
pub struct GroupCommitStatsV2 {
    /// Batch size most recently computed by the sizing heuristic.
    pub adaptive_batch_size: usize,
    /// Smoothed arrival rate in arrivals per second.
    pub arrival_rate: f64,
    /// Smoothed flush latency in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits waiting in the queue when the snapshot was taken.
    pub pending_count: usize,
    /// Number of commits flushed so far.
    pub total_commits: u64,
    /// Number of batches flushed so far.
    pub total_batches: u64,
    /// `total_commits / total_batches`, or 0.0 before the first batch.
    pub avg_batch_size: f64,
    /// Mean time per flush in microseconds, or 0 before the first batch.
    pub avg_fsync_time_us: u64,
}
478
479#[cfg(test)]
480mod tests {
481 use super::*;
482 use std::sync::atomic::AtomicU64;
483
/// A lone submission on a stopped coordinator is flushed inline and returns
/// the callback's value.
#[test]
fn test_single_commit() {
    let next_ts = AtomicU64::new(100);
    let gc = EventDrivenGroupCommit::new(move |_txn_ids| {
        Ok(next_ts.fetch_add(1, Ordering::SeqCst))
    });

    let outcome = gc.submit_and_wait(1);
    assert!(outcome.is_ok());

    let ts = outcome.unwrap();
    assert!(ts >= 100);
}
495
/// Three concurrent submissions must all succeed, and the flush callback
/// must see exactly three transaction ids in total across its batches.
#[test]
fn test_batch_commit() {
    use std::sync::{Arc, Mutex};
    use std::thread;

    // Sizes of the batches handed to the flush callback.
    let batch_sizes = Arc::new(Mutex::new(Vec::new()));
    let recorder = Arc::clone(&batch_sizes);

    let gc = Arc::new(EventDrivenGroupCommit::with_config(
        move |txn_ids| {
            recorder.lock().unwrap().push(txn_ids.len());
            Ok(100)
        },
        GroupCommitConfig {
            min_batch_size: 3,
            max_wait_us: 1_000_000,
            ..Default::default()
        },
    ));

    let handles: Vec<_> = (0..3)
        .map(|i| {
            let gc = Arc::clone(&gc);
            thread::spawn(move || gc.submit_and_wait(i))
        })
        .collect();

    for handle in handles {
        assert!(handle.join().unwrap().is_ok());
    }

    let sizes = batch_sizes.lock().unwrap();
    assert!(!sizes.is_empty());
    let total: usize = sizes.iter().sum();
    assert_eq!(total, 3);
}
537
/// With 1000 arrivals per second and a 5 ms fsync estimate the heuristic
/// gives sqrt(2 * 0.005 * 1000), about 3.16, so the size must be in 3..=10.
#[test]
fn test_adaptive_sizing() {
    let config = GroupCommitConfig {
        fsync_latency_us: 5000,
        ..Default::default()
    };
    let gc = EventDrivenGroupCommit::with_config(|_| Ok(1), config);

    // Fixed-point rate: 1_000_000 / 1000 = 1000 arrivals per second.
    gc.metrics
        .arrival_rate_ema
        .store(1_000_000, Ordering::Relaxed);

    let computed = gc.optimal_batch_size();

    assert!((3..=10).contains(&computed));
}
558
/// Totals written straight into the metrics must show up in the snapshot,
/// together with the averages derived from them.
#[test]
fn test_stats() {
    let gc = EventDrivenGroupCommit::new(|_| Ok(1));

    let metrics = &gc.metrics;
    metrics.total_commits.store(100, Ordering::Relaxed);
    metrics.total_batches.store(10, Ordering::Relaxed);
    metrics.total_fsync_time_us.store(50_000, Ordering::Relaxed);

    let snapshot = gc.stats();
    assert_eq!(snapshot.total_commits, 100);
    assert_eq!(snapshot.total_batches, 10);
    assert_eq!(snapshot.avg_batch_size, 10.0);
    assert_eq!(snapshot.avg_fsync_time_us, 5000);
}
575}