sochdb_storage/
group_commit.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Event-Driven Group Commit Buffer
16//!
17//! This module implements a proper group commit mechanism with:
18//! - Event-driven wait using condition variables (not polling)
19//! - Single fsync per batch with durability guarantee
20//! - Adaptive batch sizing based on Little's Law
21//!
22//! ## Algorithm
23//!
24//! Group Commit Queueing Model:
25//!
26//! Little's Law: N = λ × W
27//!   Where: N = avg number of requests in system
28//!          λ = arrival rate (req/sec)
29//!          W = avg time in system (sec)
30//!
31//! Optimal Batch Size: N* = sqrt(2 × L_fsync × λ / C_wait)
32//!   Where: L_fsync = fsync latency
33//!          C_wait = normalized waiting cost
34//!
35//! Example: L = 5ms, λ = 1000 req/s, C_wait = 1.0
36//!   N* = sqrt(2 × 0.005 × 1000 / 1.0) ≈ 3.16 → 3 commits/batch
37//!
38//! ## Throughput Analysis
39//!
40//! Without group commit:
41//!   Throughput = 1 / L = 200 commits/sec (for L = 5ms)
42//!
43//! With group commit (batch size N):
44//!   Throughput = N / L = N × 200 commits/sec
45//!
46//! For N = 100:
47//!   Throughput = 20,000 commits/sec
48//!   Speedup = 100x
49
50use std::collections::VecDeque;
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::sync::{Arc, Condvar, Mutex};
53use std::thread::JoinHandle;
54use std::time::{Duration, Instant};
55
56/// Pending commit with notification channel
57pub struct PendingCommitV2 {
58    /// Transaction ID
59    pub txn_id: u64,
60    /// Enqueue timestamp
61    pub enqueue_time: Instant,
62    /// Notification channel (oneshot-style via Arc<Condvar>)
63    pub notifier: Arc<(Mutex<CommitResult>, Condvar)>,
64}
65
/// Outcome of a commit request as seen by the submitting thread.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare outcomes directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitResult {
    /// The request is still queued or being flushed (initial state).
    Pending,
    /// The batch was flushed; carries the commit timestamp returned by the flush callback.
    Success(u64),
    /// The flush failed; carries the error message returned by the flush callback.
    Error(String),
}
76
77/// Event-driven group commit buffer with proper synchronization
78#[allow(dead_code)]
79pub struct EventDrivenGroupCommit {
80    /// Pending commits queue
81    pending: Mutex<VecDeque<PendingCommitV2>>,
82    /// Signal that new commits are available
83    commit_available: Condvar,
84    /// Configuration
85    config: GroupCommitConfig,
86    /// Metrics
87    metrics: GroupCommitMetrics,
88    /// Flush callback (performs actual WAL fsync)
89    #[allow(clippy::type_complexity)]
90    flush_fn: Arc<dyn Fn(&[u64]) -> Result<u64, String> + Send + Sync>,
91    /// Running flag
92    running: AtomicU64, // 1 = running, 0 = stopped
93    /// Flush thread handle
94    flush_thread: Mutex<Option<JoinHandle<()>>>,
95}
96
/// Tuning parameters for the group commit buffer.
///
/// `Debug` and `PartialEq` are derived so configurations can be logged and compared.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupCommitConfig {
    /// Lower bound applied to the adaptive batch size.
    pub min_batch_size: usize,
    /// Upper bound applied to the adaptive batch size.
    pub max_batch_size: usize,
    /// Maximum time a queued commit should wait before a flush is forced (microseconds).
    pub max_wait_us: u64,
    /// Initial fsync latency estimate (microseconds); refined at runtime from measurements.
    pub fsync_latency_us: u64,
    /// Smoothing factor for the exponential moving averages (expected range 0.0-1.0).
    ///
    /// The same factor is used for both the arrival-rate and the fsync-latency estimate.
    pub ema_alpha: f64,
}
111
112impl Default for GroupCommitConfig {
113    fn default() -> Self {
114        Self {
115            min_batch_size: 1,
116            max_batch_size: 1000,
117            max_wait_us: 10_000,     // 10ms max wait
118            fsync_latency_us: 5_000, // 5ms default
119            ema_alpha: 0.1,
120        }
121    }
122}
123
/// Live counters and moving averages maintained by the group commit buffer.
///
/// Every field is an atomic so it can be updated through a shared reference. `Debug` is
/// derived so a snapshot of the counters can be logged.
#[derive(Debug)]
pub struct GroupCommitMetrics {
    /// Most recently computed adaptive batch size.
    pub adaptive_batch_size: AtomicU64,
    /// Arrival-rate moving average, stored as requests/second × 1000 for precision.
    pub arrival_rate_ema: AtomicU64,
    /// Fsync latency moving average (microseconds).
    pub fsync_latency_us: AtomicU64,
    /// Total number of commits handed to the flush callback.
    pub total_commits: AtomicU64,
    /// Total number of batches handed to the flush callback.
    pub total_batches: AtomicU64,
    /// Accumulated time spent inside the flush callback (microseconds).
    pub total_fsync_time_us: AtomicU64,
    /// Timestamp of the latest submission (microseconds since the Unix epoch); 0 = none yet.
    pub last_arrival_us: AtomicU64,
}
141
142impl Default for GroupCommitMetrics {
143    fn default() -> Self {
144        Self {
145            adaptive_batch_size: AtomicU64::new(10),
146            arrival_rate_ema: AtomicU64::new(100_000), // 100 req/s initial
147            fsync_latency_us: AtomicU64::new(5_000),
148            total_commits: AtomicU64::new(0),
149            total_batches: AtomicU64::new(0),
150            total_fsync_time_us: AtomicU64::new(0),
151            last_arrival_us: AtomicU64::new(0),
152        }
153    }
154}
155
156impl EventDrivenGroupCommit {
157    /// Create a new event-driven group commit buffer
158    ///
159    /// # Arguments
160    /// * `flush_fn` - Callback that performs WAL fsync. Takes list of txn_ids, returns commit timestamp.
161    pub fn new<F>(flush_fn: F) -> Self
162    where
163        F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
164    {
165        Self::with_config(flush_fn, GroupCommitConfig::default())
166    }
167
168    /// Create with custom configuration
169    pub fn with_config<F>(flush_fn: F, config: GroupCommitConfig) -> Self
170    where
171        F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
172    {
173        let gc = Self {
174            pending: Mutex::new(VecDeque::new()),
175            commit_available: Condvar::new(),
176            config,
177            metrics: GroupCommitMetrics::default(),
178            flush_fn: Arc::new(flush_fn),
179            running: AtomicU64::new(0),
180            flush_thread: Mutex::new(None),
181        };
182
183        gc.metrics
184            .fsync_latency_us
185            .store(gc.config.fsync_latency_us, Ordering::Relaxed);
186        gc
187    }
188
189    /// Start the background flush thread
190    pub fn start(&self) -> Result<(), String> {
191        if self
192            .running
193            .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Relaxed)
194            .is_err()
195        {
196            return Err("Already running".into());
197        }
198
199        // We can't easily start a thread that references self
200        // In a real implementation, we'd use Arc<Self> pattern
201        // For now, document that flush_loop should be called from the owner
202        Ok(())
203    }
204
205    /// Stop the flush thread
206    pub fn stop(&self) {
207        self.running.store(0, Ordering::SeqCst);
208
209        // Wake up the flush thread
210        let _lock = self.pending.lock().unwrap();
211        self.commit_available.notify_all();
212    }
213
214    /// Check if running
215    pub fn is_running(&self) -> bool {
216        self.running.load(Ordering::SeqCst) == 1
217    }
218
219    /// Submit a commit and wait for it to complete
220    ///
221    /// This blocks until the transaction's batch has been fsynced.
222    /// Returns the commit timestamp on success.
223    pub fn submit_and_wait(&self, txn_id: u64) -> Result<u64, String> {
224        // Update arrival rate
225        self.update_arrival_rate();
226
227        // Create notification channel
228        let notifier = Arc::new((Mutex::new(CommitResult::Pending), Condvar::new()));
229        let commit = PendingCommitV2 {
230            txn_id,
231            enqueue_time: Instant::now(),
232            notifier: notifier.clone(),
233        };
234
235        // Enqueue and check if we should trigger flush
236        let should_flush = {
237            let mut pending = self.pending.lock().unwrap();
238            pending.push_back(commit);
239
240            let batch_size = self.optimal_batch_size();
241            pending.len() >= batch_size
242        };
243
244        // Signal availability
245        self.commit_available.notify_one();
246
247        // If we should flush immediately and no background thread, flush inline
248        if should_flush && !self.is_running() {
249            self.flush_batch();
250        }
251
252        // Wait for result
253        let (lock, cvar) = &*notifier;
254        let mut result = lock.lock().unwrap();
255
256        while matches!(*result, CommitResult::Pending) {
257            // Wait with timeout for defensive programming
258            let timeout = Duration::from_micros(self.config.max_wait_us * 2);
259            let (new_result, timeout_result) = cvar.wait_timeout(result, timeout).unwrap();
260            result = new_result;
261
262            if timeout_result.timed_out() {
263                // Timeout - try flushing ourselves if no background thread
264                if !self.is_running() {
265                    drop(result);
266                    self.flush_batch();
267                    result = lock.lock().unwrap();
268                }
269            }
270        }
271
272        match &*result {
273            CommitResult::Success(ts) => Ok(*ts),
274            CommitResult::Error(e) => Err(e.clone()),
275            CommitResult::Pending => Err("Unexpected pending state".into()),
276        }
277    }
278
279    /// Flush one batch of pending commits
280    ///
281    /// Called by background flush thread or inline when needed.
282    pub fn flush_batch(&self) {
283        let batch = {
284            let mut pending = self.pending.lock().unwrap();
285            if pending.is_empty() {
286                return;
287            }
288
289            let batch_size = self.optimal_batch_size().min(pending.len());
290            pending.drain(..batch_size).collect::<Vec<_>>()
291        };
292
293        if batch.is_empty() {
294            return;
295        }
296
297        let txn_ids: Vec<_> = batch.iter().map(|c| c.txn_id).collect();
298        let batch_size = batch.len();
299
300        // Measure fsync time
301        let start = Instant::now();
302        let result = (self.flush_fn)(&txn_ids);
303        let elapsed_us = start.elapsed().as_micros() as u64;
304
305        // Update metrics
306        self.update_fsync_latency(elapsed_us);
307        self.metrics.total_batches.fetch_add(1, Ordering::Relaxed);
308        self.metrics
309            .total_commits
310            .fetch_add(batch_size as u64, Ordering::Relaxed);
311        self.metrics
312            .total_fsync_time_us
313            .fetch_add(elapsed_us, Ordering::Relaxed);
314
315        // Notify all waiters
316        for commit in batch {
317            let (lock, cvar) = &*commit.notifier;
318            let mut result_lock = lock.lock().unwrap();
319            *result_lock = match &result {
320                Ok(ts) => CommitResult::Success(*ts),
321                Err(e) => CommitResult::Error(e.clone()),
322            };
323            cvar.notify_one();
324        }
325    }
326
327    /// Background flush loop (call from owner thread)
328    pub fn flush_loop(&self) {
329        while self.is_running() {
330            let should_flush = {
331                let pending = self.pending.lock().unwrap();
332                let batch_size = self.optimal_batch_size();
333
334                if pending.len() >= batch_size {
335                    true
336                } else if pending.is_empty() {
337                    // Wait for commits
338                    let _pending = self
339                        .commit_available
340                        .wait_timeout(pending, Duration::from_micros(self.config.max_wait_us))
341                        .unwrap()
342                        .0;
343                    false
344                } else {
345                    // Have some commits, check if we should wait longer
346                    let oldest = pending
347                        .front()
348                        .map(|c| c.enqueue_time.elapsed().as_micros() as u64)
349                        .unwrap_or(0);
350
351                    if oldest > self.config.max_wait_us {
352                        true
353                    } else {
354                        // Wait for more commits
355                        let remaining =
356                            Duration::from_micros(self.config.max_wait_us.saturating_sub(oldest));
357                        let _pending = self
358                            .commit_available
359                            .wait_timeout(pending, remaining)
360                            .unwrap()
361                            .0;
362                        true // Flush after wait
363                    }
364                }
365            };
366
367            if should_flush {
368                self.flush_batch();
369            }
370        }
371    }
372
373    /// Compute optimal batch size using Little's Law
374    ///
375    /// N* = sqrt(2 × L_fsync × λ / C_wait)
376    fn optimal_batch_size(&self) -> usize {
377        let lambda = self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
378        let l_fsync = self.metrics.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
379        let c_wait = 1.0; // Normalized waiting cost
380
381        let n_opt = (2.0 * l_fsync * lambda / c_wait).sqrt();
382        let batch_size = (n_opt as usize)
383            .max(self.config.min_batch_size)
384            .min(self.config.max_batch_size);
385
386        self.metrics
387            .adaptive_batch_size
388            .store(batch_size as u64, Ordering::Relaxed);
389        batch_size
390    }
391
392    /// Update arrival rate using exponential moving average
393    fn update_arrival_rate(&self) {
394        let now_us = Self::now_us();
395        let last = self.metrics.last_arrival_us.swap(now_us, Ordering::Relaxed);
396
397        if last > 0 {
398            let delta_us = now_us.saturating_sub(last);
399            if delta_us > 0 {
400                // Rate = 1_000_000 / delta_us (requests per second)
401                // Stored as rate × 1000 for precision
402                let instant_rate = 1_000_000_000 / delta_us;
403
404                let old_rate = self.metrics.arrival_rate_ema.load(Ordering::Relaxed);
405                let alpha = (self.config.ema_alpha * 1000.0) as u64;
406                let new_rate = (old_rate * (1000 - alpha) + instant_rate * alpha) / 1000;
407                self.metrics
408                    .arrival_rate_ema
409                    .store(new_rate, Ordering::Relaxed);
410            }
411        }
412    }
413
414    /// Update fsync latency estimate
415    fn update_fsync_latency(&self, latency_us: u64) {
416        let old = self.metrics.fsync_latency_us.load(Ordering::Relaxed);
417        let alpha = (self.config.ema_alpha * 1000.0) as u64;
418        let new = (old * (1000 - alpha) + latency_us * alpha) / 1000;
419        self.metrics.fsync_latency_us.store(new, Ordering::Relaxed);
420    }
421
422    /// Get current time in microseconds
423    fn now_us() -> u64 {
424        use std::time::{SystemTime, UNIX_EPOCH};
425        SystemTime::now()
426            .duration_since(UNIX_EPOCH)
427            .unwrap()
428            .as_micros() as u64
429    }
430
431    /// Get statistics for monitoring
432    pub fn stats(&self) -> GroupCommitStatsV2 {
433        GroupCommitStatsV2 {
434            adaptive_batch_size: self.metrics.adaptive_batch_size.load(Ordering::Relaxed) as usize,
435            arrival_rate: self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0,
436            fsync_latency_us: self.metrics.fsync_latency_us.load(Ordering::Relaxed),
437            pending_count: self.pending.lock().unwrap().len(),
438            total_commits: self.metrics.total_commits.load(Ordering::Relaxed),
439            total_batches: self.metrics.total_batches.load(Ordering::Relaxed),
440            avg_batch_size: {
441                let batches = self.metrics.total_batches.load(Ordering::Relaxed);
442                let commits = self.metrics.total_commits.load(Ordering::Relaxed);
443                if batches > 0 {
444                    commits as f64 / batches as f64
445                } else {
446                    0.0
447                }
448            },
449            avg_fsync_time_us: {
450                let batches = self.metrics.total_batches.load(Ordering::Relaxed);
451                let time = self.metrics.total_fsync_time_us.load(Ordering::Relaxed);
452                if batches > 0 { time / batches } else { 0 }
453            },
454        }
455    }
456}
457
/// Point-in-time snapshot of the group commit metrics, in plain (non-atomic) values.
#[derive(Debug, Clone)]
pub struct GroupCommitStatsV2 {
    /// Adaptive batch size most recently computed by the buffer.
    pub adaptive_batch_size: usize,
    /// Smoothed arrival rate in requests per second.
    pub arrival_rate: f64,
    /// Smoothed fsync latency in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits queued at the time of the snapshot.
    pub pending_count: usize,
    /// Commits processed since the buffer was created.
    pub total_commits: u64,
    /// Batches processed since the buffer was created.
    pub total_batches: u64,
    /// Mean commits per batch (0.0 before the first batch).
    pub avg_batch_size: f64,
    /// Mean flush time per batch in microseconds (0 before the first batch).
    pub avg_fsync_time_us: u64,
}
478
#[cfg(test)]
mod tests {
    // `super::*` already brings `Arc`, `Mutex`, `AtomicU64` and `Ordering` into scope.
    use super::*;
    use std::thread;

    /// A lone commit on a default buffer is flushed inline and receives the first timestamp.
    #[test]
    fn test_single_commit() {
        let next_ts = AtomicU64::new(100);
        let gc = EventDrivenGroupCommit::new(move |_txn_ids| {
            Ok(next_ts.fetch_add(1, Ordering::SeqCst))
        });

        assert_eq!(gc.submit_and_wait(1), Ok(100));
    }

    /// An error returned by the flush callback reaches the submitter unchanged.
    #[test]
    fn test_flush_error_is_reported() {
        let gc = EventDrivenGroupCommit::new(|_txn_ids| Err("disk full".to_string()));

        assert_eq!(gc.submit_and_wait(7), Err("disk full".to_string()));
    }

    /// `start` is rejected while running and `stop` clears the running flag.
    #[test]
    fn test_start_stop_flag() {
        let gc = EventDrivenGroupCommit::new(|_txn_ids| Ok(1));

        assert!(!gc.is_running());
        assert!(gc.start().is_ok());
        assert!(gc.is_running());
        assert_eq!(gc.start(), Err("Already running".to_string()));
        gc.stop();
        assert!(!gc.is_running());
    }

    /// Three concurrent submitters are all committed, and the callback sees three ids in total.
    #[test]
    fn test_batch_commit() {
        let batch_sizes = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&batch_sizes);

        let gc = Arc::new(EventDrivenGroupCommit::with_config(
            move |txn_ids| {
                recorder.lock().unwrap().push(txn_ids.len());
                Ok(100)
            },
            GroupCommitConfig {
                min_batch_size: 3,
                max_wait_us: 1_000_000, // 1 second - long enough to batch
                ..Default::default()
            },
        ));

        // Submit 3 commits in parallel.
        let handles: Vec<_> = (0..3)
            .map(|i| {
                let gc = Arc::clone(&gc);
                thread::spawn(move || gc.submit_and_wait(i))
            })
            .collect();

        for handle in handles {
            assert!(handle.join().unwrap().is_ok());
        }

        // Every commit must have gone through the callback exactly once.
        let sizes = batch_sizes.lock().unwrap();
        assert!(!sizes.is_empty());
        let total: usize = sizes.iter().sum();
        assert_eq!(total, 3);
    }

    /// The adaptive batch size follows the square-root formula for a known rate and latency.
    #[test]
    fn test_adaptive_sizing() {
        let gc = EventDrivenGroupCommit::with_config(
            |_| Ok(1),
            GroupCommitConfig {
                fsync_latency_us: 5000, // 5ms
                ..Default::default()
            },
        );

        // Simulate high arrival rate (1000 req/s, stored × 1000).
        gc.metrics
            .arrival_rate_ema
            .store(1_000_000, Ordering::Relaxed);

        let batch_size = gc.optimal_batch_size();

        // N* = sqrt(2 × 0.005 × 1000 / 1) ≈ 3.16
        assert!((3..=10).contains(&batch_size));
    }

    /// `stats` reports the raw totals and the averages derived from them.
    #[test]
    fn test_stats() {
        let gc = EventDrivenGroupCommit::new(|_| Ok(1));

        gc.metrics.total_commits.store(100, Ordering::Relaxed);
        gc.metrics.total_batches.store(10, Ordering::Relaxed);
        gc.metrics
            .total_fsync_time_us
            .store(50_000, Ordering::Relaxed);

        let stats = gc.stats();
        assert_eq!(stats.total_commits, 100);
        assert_eq!(stats.total_batches, 10);
        assert_eq!(stats.avg_batch_size, 10.0);
        assert_eq!(stats.avg_fsync_time_us, 5000);
    }
}