1use std::collections::VecDeque;
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::{Arc, Condvar, Mutex};
56use std::thread::JoinHandle;
57use std::time::{Duration, Instant};
58
59pub struct PendingCommitV2 {
61 pub txn_id: u64,
63 pub enqueue_time: Instant,
65 pub notifier: Arc<(Mutex<CommitResult>, Condvar)>,
67}
68
/// Outcome slot shared between a submitter and the thread that flushes its batch.
#[derive(Debug, Clone)]
pub enum CommitResult {
    /// No flush has reported a result for this commit yet.
    Pending,
    /// The batch was flushed; carries the value returned by the flush function.
    Success(u64),
    /// The flush function failed; carries its error message.
    Error(String),
}
79
80#[allow(dead_code)]
82pub struct EventDrivenGroupCommit {
83 pending: Mutex<VecDeque<PendingCommitV2>>,
85 commit_available: Condvar,
87 config: GroupCommitConfig,
89 metrics: GroupCommitMetrics,
91 #[allow(clippy::type_complexity)]
93 flush_fn: Arc<dyn Fn(&[u64]) -> Result<u64, String> + Send + Sync>,
94 running: AtomicU64, flush_thread: Mutex<Option<JoinHandle<()>>>,
98}
99
/// Tuning parameters for [`EventDrivenGroupCommit`].
#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Lower bound applied to the adaptive batch size.
    pub min_batch_size: usize,
    /// Upper bound applied to the adaptive batch size.
    pub max_batch_size: usize,
    /// Maximum time, in microseconds, the flush loop lets the oldest pending
    /// commit wait before flushing. Submitters wait in slices of twice this value.
    pub max_wait_us: u64,
    /// Initial fsync latency estimate in microseconds; seeds the metric of the
    /// same name when the committer is constructed.
    pub fsync_latency_us: u64,
    /// Smoothing factor for the exponential moving averages. It is scaled by
    /// 1000 and used as the weight of the newest sample out of 1000.
    pub ema_alpha: f64,
}
114
115impl Default for GroupCommitConfig {
116 fn default() -> Self {
117 Self {
118 min_batch_size: 1,
119 max_batch_size: 1000,
120 max_wait_us: 10_000, fsync_latency_us: 5_000, ema_alpha: 0.1,
123 }
124 }
125}
126
/// Live counters and running estimates maintained by [`EventDrivenGroupCommit`].
///
/// Every field is an atomic so it can be updated through `&self` from
/// submitting and flushing threads alike.
#[derive(Debug)]
pub struct GroupCommitMetrics {
    /// Batch size most recently computed by the adaptive sizing formula.
    pub adaptive_batch_size: AtomicU64,
    /// Smoothed arrival rate in fixed point: commits per second multiplied by 1000.
    pub arrival_rate_ema: AtomicU64,
    /// Smoothed duration of one flush call, in microseconds.
    pub fsync_latency_us: AtomicU64,
    /// Number of commits flushed so far.
    pub total_commits: AtomicU64,
    /// Number of batches flushed so far.
    pub total_batches: AtomicU64,
    /// Sum of all flush call durations, in microseconds.
    pub total_fsync_time_us: AtomicU64,
    /// Wall-clock time of the previous submission, in microseconds since the
    /// Unix epoch; 0 means no submission has been recorded yet.
    pub last_arrival_us: AtomicU64,
}
144
145impl Default for GroupCommitMetrics {
146 fn default() -> Self {
147 Self {
148 adaptive_batch_size: AtomicU64::new(10),
149 arrival_rate_ema: AtomicU64::new(100_000), fsync_latency_us: AtomicU64::new(5_000),
151 total_commits: AtomicU64::new(0),
152 total_batches: AtomicU64::new(0),
153 total_fsync_time_us: AtomicU64::new(0),
154 last_arrival_us: AtomicU64::new(0),
155 }
156 }
157}
158
159impl EventDrivenGroupCommit {
160 pub fn new<F>(flush_fn: F) -> Self
165 where
166 F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
167 {
168 Self::with_config(flush_fn, GroupCommitConfig::default())
169 }
170
171 pub fn with_config<F>(flush_fn: F, config: GroupCommitConfig) -> Self
173 where
174 F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
175 {
176 let gc = Self {
177 pending: Mutex::new(VecDeque::new()),
178 commit_available: Condvar::new(),
179 config,
180 metrics: GroupCommitMetrics::default(),
181 flush_fn: Arc::new(flush_fn),
182 running: AtomicU64::new(0),
183 flush_thread: Mutex::new(None),
184 };
185
186 gc.metrics
187 .fsync_latency_us
188 .store(gc.config.fsync_latency_us, Ordering::Relaxed);
189 gc
190 }
191
192 pub fn start(&self) -> Result<(), String> {
194 if self
195 .running
196 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Relaxed)
197 .is_err()
198 {
199 return Err("Already running".into());
200 }
201
202 Ok(())
206 }
207
208 pub fn stop(&self) {
210 self.running.store(0, Ordering::SeqCst);
211
212 let _lock = self.pending.lock().unwrap();
214 self.commit_available.notify_all();
215 }
216
217 pub fn is_running(&self) -> bool {
219 self.running.load(Ordering::SeqCst) == 1
220 }
221
222 pub fn submit_and_wait(&self, txn_id: u64) -> Result<u64, String> {
227 self.update_arrival_rate();
229
230 let notifier = Arc::new((Mutex::new(CommitResult::Pending), Condvar::new()));
232 let commit = PendingCommitV2 {
233 txn_id,
234 enqueue_time: Instant::now(),
235 notifier: notifier.clone(),
236 };
237
238 let should_flush = {
240 let mut pending = self.pending.lock().unwrap();
241 pending.push_back(commit);
242
243 let batch_size = self.optimal_batch_size();
244 pending.len() >= batch_size
245 };
246
247 self.commit_available.notify_one();
249
250 if should_flush && !self.is_running() {
252 self.flush_batch();
253 }
254
255 let (lock, cvar) = &*notifier;
257 let mut result = lock.lock().unwrap();
258
259 while matches!(*result, CommitResult::Pending) {
260 let timeout = Duration::from_micros(self.config.max_wait_us * 2);
262 let (new_result, timeout_result) = cvar.wait_timeout(result, timeout).unwrap();
263 result = new_result;
264
265 if timeout_result.timed_out() {
266 if !self.is_running() {
268 drop(result);
269 self.flush_batch();
270 result = lock.lock().unwrap();
271 }
272 }
273 }
274
275 match &*result {
276 CommitResult::Success(ts) => Ok(*ts),
277 CommitResult::Error(e) => Err(e.clone()),
278 CommitResult::Pending => Err("Unexpected pending state".into()),
279 }
280 }
281
282 pub fn flush_batch(&self) {
286 let batch = {
287 let mut pending = self.pending.lock().unwrap();
288 if pending.is_empty() {
289 return;
290 }
291
292 let batch_size = self.optimal_batch_size().min(pending.len());
293 pending.drain(..batch_size).collect::<Vec<_>>()
294 };
295
296 if batch.is_empty() {
297 return;
298 }
299
300 let txn_ids: Vec<_> = batch.iter().map(|c| c.txn_id).collect();
301 let batch_size = batch.len();
302
303 let start = Instant::now();
305 let result = (self.flush_fn)(&txn_ids);
306 let elapsed_us = start.elapsed().as_micros() as u64;
307
308 self.update_fsync_latency(elapsed_us);
310 self.metrics.total_batches.fetch_add(1, Ordering::Relaxed);
311 self.metrics
312 .total_commits
313 .fetch_add(batch_size as u64, Ordering::Relaxed);
314 self.metrics
315 .total_fsync_time_us
316 .fetch_add(elapsed_us, Ordering::Relaxed);
317
318 for commit in batch {
320 let (lock, cvar) = &*commit.notifier;
321 let mut result_lock = lock.lock().unwrap();
322 *result_lock = match &result {
323 Ok(ts) => CommitResult::Success(*ts),
324 Err(e) => CommitResult::Error(e.clone()),
325 };
326 cvar.notify_one();
327 }
328 }
329
330 pub fn flush_loop(&self) {
332 while self.is_running() {
333 let should_flush = {
334 let pending = self.pending.lock().unwrap();
335 let batch_size = self.optimal_batch_size();
336
337 if pending.len() >= batch_size {
338 true
339 } else if pending.is_empty() {
340 let _pending = self
342 .commit_available
343 .wait_timeout(pending, Duration::from_micros(self.config.max_wait_us))
344 .unwrap()
345 .0;
346 false
347 } else {
348 let oldest = pending
350 .front()
351 .map(|c| c.enqueue_time.elapsed().as_micros() as u64)
352 .unwrap_or(0);
353
354 if oldest > self.config.max_wait_us {
355 true
356 } else {
357 let remaining =
359 Duration::from_micros(self.config.max_wait_us.saturating_sub(oldest));
360 let _pending = self
361 .commit_available
362 .wait_timeout(pending, remaining)
363 .unwrap()
364 .0;
365 true }
367 }
368 };
369
370 if should_flush {
371 self.flush_batch();
372 }
373 }
374 }
375
376 fn optimal_batch_size(&self) -> usize {
380 let lambda = self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
381 let l_fsync = self.metrics.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
382 let c_wait = 1.0; let n_opt = (2.0 * l_fsync * lambda / c_wait).sqrt();
385 let batch_size = (n_opt as usize)
386 .max(self.config.min_batch_size)
387 .min(self.config.max_batch_size);
388
389 self.metrics
390 .adaptive_batch_size
391 .store(batch_size as u64, Ordering::Relaxed);
392 batch_size
393 }
394
395 fn update_arrival_rate(&self) {
397 let now_us = Self::now_us();
398 let last = self.metrics.last_arrival_us.swap(now_us, Ordering::Relaxed);
399
400 if last > 0 {
401 let delta_us = now_us.saturating_sub(last);
402 if delta_us > 0 {
403 let instant_rate = 1_000_000_000 / delta_us;
406
407 let old_rate = self.metrics.arrival_rate_ema.load(Ordering::Relaxed);
408 let alpha = (self.config.ema_alpha * 1000.0) as u64;
409 let new_rate = (old_rate * (1000 - alpha) + instant_rate * alpha) / 1000;
410 self.metrics
411 .arrival_rate_ema
412 .store(new_rate, Ordering::Relaxed);
413 }
414 }
415 }
416
417 fn update_fsync_latency(&self, latency_us: u64) {
419 let old = self.metrics.fsync_latency_us.load(Ordering::Relaxed);
420 let alpha = (self.config.ema_alpha * 1000.0) as u64;
421 let new = (old * (1000 - alpha) + latency_us * alpha) / 1000;
422 self.metrics.fsync_latency_us.store(new, Ordering::Relaxed);
423 }
424
425 fn now_us() -> u64 {
427 use std::time::{SystemTime, UNIX_EPOCH};
428 SystemTime::now()
429 .duration_since(UNIX_EPOCH)
430 .unwrap()
431 .as_micros() as u64
432 }
433
434 pub fn stats(&self) -> GroupCommitStatsV2 {
436 GroupCommitStatsV2 {
437 adaptive_batch_size: self.metrics.adaptive_batch_size.load(Ordering::Relaxed) as usize,
438 arrival_rate: self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0,
439 fsync_latency_us: self.metrics.fsync_latency_us.load(Ordering::Relaxed),
440 pending_count: self.pending.lock().unwrap().len(),
441 total_commits: self.metrics.total_commits.load(Ordering::Relaxed),
442 total_batches: self.metrics.total_batches.load(Ordering::Relaxed),
443 avg_batch_size: {
444 let batches = self.metrics.total_batches.load(Ordering::Relaxed);
445 let commits = self.metrics.total_commits.load(Ordering::Relaxed);
446 if batches > 0 {
447 commits as f64 / batches as f64
448 } else {
449 0.0
450 }
451 },
452 avg_fsync_time_us: {
453 let batches = self.metrics.total_batches.load(Ordering::Relaxed);
454 let time = self.metrics.total_fsync_time_us.load(Ordering::Relaxed);
455 if batches > 0 { time / batches } else { 0 }
456 },
457 }
458 }
459}
460
/// Point-in-time snapshot returned by `EventDrivenGroupCommit::stats`.
#[derive(Debug, Clone)]
pub struct GroupCommitStatsV2 {
    /// Batch size most recently computed by the adaptive sizing formula.
    pub adaptive_batch_size: usize,
    /// Smoothed arrival rate in commits per second.
    pub arrival_rate: f64,
    /// Smoothed flush latency in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits still queued when the snapshot was taken.
    pub pending_count: usize,
    /// Commits flushed so far.
    pub total_commits: u64,
    /// Batches flushed so far.
    pub total_batches: u64,
    /// `total_commits / total_batches`, or 0.0 before the first batch.
    pub avg_batch_size: f64,
    /// Mean flush duration in microseconds, or 0 before the first batch.
    pub avg_fsync_time_us: u64,
}
481
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;

    /// A lone commit on a non-running committer is flushed by the submitter.
    #[test]
    fn test_single_commit() {
        let commit_ts = AtomicU64::new(100);
        let gc = EventDrivenGroupCommit::new(move |_txn_ids| {
            Ok(commit_ts.fetch_add(1, Ordering::SeqCst))
        });

        let result = gc.submit_and_wait(1);
        assert!(result.is_ok());
        assert!(result.unwrap() >= 100);
    }

    /// Three concurrent submitters are all flushed, in one or more batches.
    #[test]
    fn test_batch_commit() {
        use parking_lot::RwLock;
        use std::sync::Arc;
        use std::thread;

        let batch_sizes = Arc::new(RwLock::new(Vec::new()));
        let batch_sizes_clone = batch_sizes.clone();

        let gc = Arc::new(EventDrivenGroupCommit::with_config(
            move |txn_ids| {
                batch_sizes_clone.write().push(txn_ids.len());
                Ok(100)
            },
            GroupCommitConfig {
                min_batch_size: 3,
                // Pin the adaptive batch size to exactly 3. Otherwise the
                // arrival-rate EMA, fed by back-to-back submissions, can push
                // it above 3, and the test then sits out the 2 s submitter
                // timeout before anything is flushed.
                max_batch_size: 3,
                max_wait_us: 1_000_000,
                ..Default::default()
            },
        ));

        let mut handles = vec![];
        for i in 0..3 {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || gc.submit_and_wait(i)));
        }

        for h in handles {
            assert!(h.join().unwrap().is_ok());
        }

        let sizes = batch_sizes.read();
        assert!(!sizes.is_empty());
        let total: usize = sizes.iter().sum();
        assert_eq!(total, 3);
    }

    /// The sizing formula reacts to the arrival rate.
    #[test]
    fn test_adaptive_sizing() {
        let gc = EventDrivenGroupCommit::with_config(
            |_| Ok(1),
            GroupCommitConfig {
                fsync_latency_us: 5000,
                ..Default::default()
            },
        );

        // 1_000_000 in the ×1000 fixed-point form is 1000 commits per second.
        gc.metrics
            .arrival_rate_ema
            .store(1_000_000, Ordering::Relaxed);

        let batch_size = gc.optimal_batch_size();

        // sqrt(2 * 0.005 s * 1000 /s) = sqrt(10) ≈ 3.16
        assert!((3..=10).contains(&batch_size));
    }

    /// `stats` derives the averages from the raw counters.
    #[test]
    fn test_stats() {
        let gc = EventDrivenGroupCommit::new(|_| Ok(1));

        gc.metrics.total_commits.store(100, Ordering::Relaxed);
        gc.metrics.total_batches.store(10, Ordering::Relaxed);
        gc.metrics
            .total_fsync_time_us
            .store(50_000, Ordering::Relaxed);

        let stats = gc.stats();
        assert_eq!(stats.total_commits, 100);
        assert_eq!(stats.total_batches, 10);
        assert_eq!(stats.avg_batch_size, 10.0);
        assert_eq!(stats.avg_fsync_time_us, 5000);
    }
}