// sochdb_storage/group_commit.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Event-Driven Group Commit Buffer
//!
//! This module implements a proper group commit mechanism with:
//! - Event-driven wait using condition variables (not polling)
//! - Single fsync per batch with durability guarantee
//! - Adaptive batch sizing based on Little's Law
//!
//! ## Algorithm
//!
//! Group Commit Queueing Model:
//!
//! Little's Law: N = λ × W
//!   Where: N = avg number of requests in system
//!          λ = arrival rate (req/sec)
//!          W = avg time in system (sec)
//!
//! Optimal Batch Size: N* = sqrt(2 × L_fsync × λ / C_wait)
//!   Where: L_fsync = fsync latency
//!          C_wait = normalized waiting cost
//!
//! Example: L = 5ms, λ = 1000 req/s, C_wait = 1.0
//!   N* = sqrt(2 × 0.005 × 1000 / 1.0) ≈ 3.16 → 3 commits/batch
//!
//! ## Throughput Analysis
//!
//! Without group commit:
//!   Throughput = 1 / L = 200 commits/sec (for L = 5ms)
//!
//! With group commit (batch size N):
//!   Throughput = N / L = N × 200 commits/sec
//!
//! For N = 100:
//!   Throughput = 20,000 commits/sec
//!   Speedup = 100x

use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

59/// Pending commit with notification channel
60pub struct PendingCommitV2 {
61    /// Transaction ID
62    pub txn_id: u64,
63    /// Enqueue timestamp
64    pub enqueue_time: Instant,
65    /// Notification channel (oneshot-style via Arc<Condvar>)
66    pub notifier: Arc<(Mutex<CommitResult>, Condvar)>,
67}
68
/// Outcome slot for a single queued commit.
///
/// A slot starts as [`CommitResult::Pending`] and is overwritten exactly once
/// per flush attempt with either the commit timestamp or the flush error.
/// `PartialEq`/`Eq` are derived so callers and tests can compare outcomes
/// directly instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitResult {
    /// Commit pending (initial state, not yet flushed).
    Pending,
    /// Commit succeeded; carries the timestamp returned by the flush callback.
    Success(u64),
    /// Commit failed; carries the error message returned by the flush callback.
    Error(String),
}

80/// Event-driven group commit buffer with proper synchronization
81#[allow(dead_code)]
82pub struct EventDrivenGroupCommit {
83    /// Pending commits queue
84    pending: Mutex<VecDeque<PendingCommitV2>>,
85    /// Signal that new commits are available
86    commit_available: Condvar,
87    /// Configuration
88    config: GroupCommitConfig,
89    /// Metrics
90    metrics: GroupCommitMetrics,
91    /// Flush callback (performs actual WAL fsync)
92    #[allow(clippy::type_complexity)]
93    flush_fn: Arc<dyn Fn(&[u64]) -> Result<u64, String> + Send + Sync>,
94    /// Running flag
95    running: AtomicU64, // 1 = running, 0 = stopped
96    /// Flush thread handle
97    flush_thread: Mutex<Option<JoinHandle<()>>>,
98}
99
/// Tuning parameters for the group commit buffer.
///
/// `Debug` and `PartialEq` are derived so configurations can be logged and
/// compared; `Eq` is not available because of the `f64` field.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupCommitConfig {
    /// Lower bound applied to the adaptive batch size.
    pub min_batch_size: usize,
    /// Upper bound applied to the adaptive batch size.
    pub max_batch_size: usize,
    /// Maximum time a queued commit may wait before a flush (microseconds).
    pub max_wait_us: u64,
    /// Initial fsync latency estimate (microseconds); seeds the latency average.
    pub fsync_latency_us: u64,
    /// Weight given to the newest sample in the moving averages (0.0-1.0).
    pub ema_alpha: f64,
}

impl Default for GroupCommitConfig {
    /// Defaults: batches of 1..=1000 commits, 10 ms maximum wait, 5 ms initial
    /// fsync estimate and an EMA weight of 0.1.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms max wait
            fsync_latency_us: 5_000, // 5ms default
            ema_alpha: 0.1,
        }
    }
}

/// Live counters and moving averages for group commit monitoring.
///
/// Every field is an atomic so it can be read and updated through a shared
/// reference.
#[derive(Debug)]
pub struct GroupCommitMetrics {
    /// Most recently computed adaptive batch size.
    pub adaptive_batch_size: AtomicU64,
    /// Estimated arrival rate, stored as req/s × 1000 for precision.
    pub arrival_rate_ema: AtomicU64,
    /// Estimated fsync latency (microseconds).
    pub fsync_latency_us: AtomicU64,
    /// Total commits processed.
    pub total_commits: AtomicU64,
    /// Total batches processed.
    pub total_batches: AtomicU64,
    /// Total time spent inside the flush callback (microseconds).
    pub total_fsync_time_us: AtomicU64,
    /// Last arrival timestamp (microseconds since epoch); 0 means no arrival
    /// has been recorded yet.
    pub last_arrival_us: AtomicU64,
}

impl Default for GroupCommitMetrics {
    /// Starts from a batch size of 10, an arrival rate of 100 req/s and a 5 ms
    /// fsync latency estimate; all counters start at zero.
    fn default() -> Self {
        Self {
            adaptive_batch_size: AtomicU64::new(10),
            arrival_rate_ema: AtomicU64::new(100_000), // 100 req/s initial
            fsync_latency_us: AtomicU64::new(5_000),
            total_commits: AtomicU64::new(0),
            total_batches: AtomicU64::new(0),
            total_fsync_time_us: AtomicU64::new(0),
            last_arrival_us: AtomicU64::new(0),
        }
    }
}

159impl EventDrivenGroupCommit {
160    /// Create a new event-driven group commit buffer
161    ///
162    /// # Arguments
163    /// * `flush_fn` - Callback that performs WAL fsync. Takes list of txn_ids, returns commit timestamp.
164    pub fn new<F>(flush_fn: F) -> Self
165    where
166        F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
167    {
168        Self::with_config(flush_fn, GroupCommitConfig::default())
169    }
170
171    /// Create with custom configuration
172    pub fn with_config<F>(flush_fn: F, config: GroupCommitConfig) -> Self
173    where
174        F: Fn(&[u64]) -> Result<u64, String> + Send + Sync + 'static,
175    {
176        let gc = Self {
177            pending: Mutex::new(VecDeque::new()),
178            commit_available: Condvar::new(),
179            config,
180            metrics: GroupCommitMetrics::default(),
181            flush_fn: Arc::new(flush_fn),
182            running: AtomicU64::new(0),
183            flush_thread: Mutex::new(None),
184        };
185
186        gc.metrics
187            .fsync_latency_us
188            .store(gc.config.fsync_latency_us, Ordering::Relaxed);
189        gc
190    }
191
192    /// Start the background flush thread
193    pub fn start(&self) -> Result<(), String> {
194        if self
195            .running
196            .compare_exchange(0, 1, Ordering::SeqCst, Ordering::Relaxed)
197            .is_err()
198        {
199            return Err("Already running".into());
200        }
201
202        // We can't easily start a thread that references self
203        // In a real implementation, we'd use Arc<Self> pattern
204        // For now, document that flush_loop should be called from the owner
205        Ok(())
206    }
207
208    /// Stop the flush thread
209    pub fn stop(&self) {
210        self.running.store(0, Ordering::SeqCst);
211
212        // Wake up the flush thread
213        let _lock = self.pending.lock().unwrap();
214        self.commit_available.notify_all();
215    }
216
217    /// Check if running
218    pub fn is_running(&self) -> bool {
219        self.running.load(Ordering::SeqCst) == 1
220    }
221
222    /// Submit a commit and wait for it to complete
223    ///
224    /// This blocks until the transaction's batch has been fsynced.
225    /// Returns the commit timestamp on success.
226    pub fn submit_and_wait(&self, txn_id: u64) -> Result<u64, String> {
227        // Update arrival rate
228        self.update_arrival_rate();
229
230        // Create notification channel
231        let notifier = Arc::new((Mutex::new(CommitResult::Pending), Condvar::new()));
232        let commit = PendingCommitV2 {
233            txn_id,
234            enqueue_time: Instant::now(),
235            notifier: notifier.clone(),
236        };
237
238        // Enqueue and check if we should trigger flush
239        let should_flush = {
240            let mut pending = self.pending.lock().unwrap();
241            pending.push_back(commit);
242
243            let batch_size = self.optimal_batch_size();
244            pending.len() >= batch_size
245        };
246
247        // Signal availability
248        self.commit_available.notify_one();
249
250        // If we should flush immediately and no background thread, flush inline
251        if should_flush && !self.is_running() {
252            self.flush_batch();
253        }
254
255        // Wait for result
256        let (lock, cvar) = &*notifier;
257        let mut result = lock.lock().unwrap();
258
259        while matches!(*result, CommitResult::Pending) {
260            // Wait with timeout for defensive programming
261            let timeout = Duration::from_micros(self.config.max_wait_us * 2);
262            let (new_result, timeout_result) = cvar.wait_timeout(result, timeout).unwrap();
263            result = new_result;
264
265            if timeout_result.timed_out() {
266                // Timeout - try flushing ourselves if no background thread
267                if !self.is_running() {
268                    drop(result);
269                    self.flush_batch();
270                    result = lock.lock().unwrap();
271                }
272            }
273        }
274
275        match &*result {
276            CommitResult::Success(ts) => Ok(*ts),
277            CommitResult::Error(e) => Err(e.clone()),
278            CommitResult::Pending => Err("Unexpected pending state".into()),
279        }
280    }
281
282    /// Flush one batch of pending commits
283    ///
284    /// Called by background flush thread or inline when needed.
285    pub fn flush_batch(&self) {
286        let batch = {
287            let mut pending = self.pending.lock().unwrap();
288            if pending.is_empty() {
289                return;
290            }
291
292            let batch_size = self.optimal_batch_size().min(pending.len());
293            pending.drain(..batch_size).collect::<Vec<_>>()
294        };
295
296        if batch.is_empty() {
297            return;
298        }
299
300        let txn_ids: Vec<_> = batch.iter().map(|c| c.txn_id).collect();
301        let batch_size = batch.len();
302
303        // Measure fsync time
304        let start = Instant::now();
305        let result = (self.flush_fn)(&txn_ids);
306        let elapsed_us = start.elapsed().as_micros() as u64;
307
308        // Update metrics
309        self.update_fsync_latency(elapsed_us);
310        self.metrics.total_batches.fetch_add(1, Ordering::Relaxed);
311        self.metrics
312            .total_commits
313            .fetch_add(batch_size as u64, Ordering::Relaxed);
314        self.metrics
315            .total_fsync_time_us
316            .fetch_add(elapsed_us, Ordering::Relaxed);
317
318        // Notify all waiters
319        for commit in batch {
320            let (lock, cvar) = &*commit.notifier;
321            let mut result_lock = lock.lock().unwrap();
322            *result_lock = match &result {
323                Ok(ts) => CommitResult::Success(*ts),
324                Err(e) => CommitResult::Error(e.clone()),
325            };
326            cvar.notify_one();
327        }
328    }
329
330    /// Background flush loop (call from owner thread)
331    pub fn flush_loop(&self) {
332        while self.is_running() {
333            let should_flush = {
334                let pending = self.pending.lock().unwrap();
335                let batch_size = self.optimal_batch_size();
336
337                if pending.len() >= batch_size {
338                    true
339                } else if pending.is_empty() {
340                    // Wait for commits
341                    let _pending = self
342                        .commit_available
343                        .wait_timeout(pending, Duration::from_micros(self.config.max_wait_us))
344                        .unwrap()
345                        .0;
346                    false
347                } else {
348                    // Have some commits, check if we should wait longer
349                    let oldest = pending
350                        .front()
351                        .map(|c| c.enqueue_time.elapsed().as_micros() as u64)
352                        .unwrap_or(0);
353
354                    if oldest > self.config.max_wait_us {
355                        true
356                    } else {
357                        // Wait for more commits
358                        let remaining =
359                            Duration::from_micros(self.config.max_wait_us.saturating_sub(oldest));
360                        let _pending = self
361                            .commit_available
362                            .wait_timeout(pending, remaining)
363                            .unwrap()
364                            .0;
365                        true // Flush after wait
366                    }
367                }
368            };
369
370            if should_flush {
371                self.flush_batch();
372            }
373        }
374    }
375
376    /// Compute optimal batch size using Little's Law
377    ///
378    /// N* = sqrt(2 × L_fsync × λ / C_wait)
379    fn optimal_batch_size(&self) -> usize {
380        let lambda = self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
381        let l_fsync = self.metrics.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
382        let c_wait = 1.0; // Normalized waiting cost
383
384        let n_opt = (2.0 * l_fsync * lambda / c_wait).sqrt();
385        let batch_size = (n_opt as usize)
386            .max(self.config.min_batch_size)
387            .min(self.config.max_batch_size);
388
389        self.metrics
390            .adaptive_batch_size
391            .store(batch_size as u64, Ordering::Relaxed);
392        batch_size
393    }
394
395    /// Update arrival rate using exponential moving average
396    fn update_arrival_rate(&self) {
397        let now_us = Self::now_us();
398        let last = self.metrics.last_arrival_us.swap(now_us, Ordering::Relaxed);
399
400        if last > 0 {
401            let delta_us = now_us.saturating_sub(last);
402            if delta_us > 0 {
403                // Rate = 1_000_000 / delta_us (requests per second)
404                // Stored as rate × 1000 for precision
405                let instant_rate = 1_000_000_000 / delta_us;
406
407                let old_rate = self.metrics.arrival_rate_ema.load(Ordering::Relaxed);
408                let alpha = (self.config.ema_alpha * 1000.0) as u64;
409                let new_rate = (old_rate * (1000 - alpha) + instant_rate * alpha) / 1000;
410                self.metrics
411                    .arrival_rate_ema
412                    .store(new_rate, Ordering::Relaxed);
413            }
414        }
415    }
416
417    /// Update fsync latency estimate
418    fn update_fsync_latency(&self, latency_us: u64) {
419        let old = self.metrics.fsync_latency_us.load(Ordering::Relaxed);
420        let alpha = (self.config.ema_alpha * 1000.0) as u64;
421        let new = (old * (1000 - alpha) + latency_us * alpha) / 1000;
422        self.metrics.fsync_latency_us.store(new, Ordering::Relaxed);
423    }
424
425    /// Get current time in microseconds
426    fn now_us() -> u64 {
427        use std::time::{SystemTime, UNIX_EPOCH};
428        SystemTime::now()
429            .duration_since(UNIX_EPOCH)
430            .unwrap()
431            .as_micros() as u64
432    }
433
434    /// Get statistics for monitoring
435    pub fn stats(&self) -> GroupCommitStatsV2 {
436        GroupCommitStatsV2 {
437            adaptive_batch_size: self.metrics.adaptive_batch_size.load(Ordering::Relaxed) as usize,
438            arrival_rate: self.metrics.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0,
439            fsync_latency_us: self.metrics.fsync_latency_us.load(Ordering::Relaxed),
440            pending_count: self.pending.lock().unwrap().len(),
441            total_commits: self.metrics.total_commits.load(Ordering::Relaxed),
442            total_batches: self.metrics.total_batches.load(Ordering::Relaxed),
443            avg_batch_size: {
444                let batches = self.metrics.total_batches.load(Ordering::Relaxed);
445                let commits = self.metrics.total_commits.load(Ordering::Relaxed);
446                if batches > 0 {
447                    commits as f64 / batches as f64
448                } else {
449                    0.0
450                }
451            },
452            avg_fsync_time_us: {
453                let batches = self.metrics.total_batches.load(Ordering::Relaxed);
454                let time = self.metrics.total_fsync_time_us.load(Ordering::Relaxed);
455                if batches > 0 { time / batches } else { 0 }
456            },
457        }
458    }
459}
460
/// Point-in-time snapshot of the event-driven group commit metrics.
///
/// `PartialEq` lets snapshots be compared (for example in tests) and
/// `Default` provides an all-zero snapshot to build on with struct-update
/// syntax.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct GroupCommitStatsV2 {
    /// Current adaptive batch size
    pub adaptive_batch_size: usize,
    /// Estimated arrival rate (requests/second)
    pub arrival_rate: f64,
    /// Estimated fsync latency (microseconds)
    pub fsync_latency_us: u64,
    /// Number of commits queued at the moment the snapshot was taken
    pub pending_count: usize,
    /// Total commits processed
    pub total_commits: u64,
    /// Total batches processed
    pub total_batches: u64,
    /// Average batch size (0.0 when no batch has been processed)
    pub avg_batch_size: f64,
    /// Average fsync time in microseconds (0 when no batch has been processed)
    pub avg_fsync_time_us: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;

    /// A lone commit is flushed inline (no flush loop is running) and receives
    /// the timestamp produced by the flush callback.
    #[test]
    fn test_single_commit() {
        let commit_ts = AtomicU64::new(100);
        let gc = EventDrivenGroupCommit::new(move |_txn_ids| {
            Ok(commit_ts.fetch_add(1, Ordering::SeqCst))
        });

        // The callback runs exactly once, so the first timestamp is returned.
        assert_eq!(gc.submit_and_wait(1), Ok(100));
    }

    /// Three concurrent submitters end up in one batch of three.
    #[test]
    fn test_batch_commit() {
        use std::sync::{Arc, Mutex};
        use std::thread;

        let batch_sizes = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&batch_sizes);

        let gc = Arc::new(EventDrivenGroupCommit::with_config(
            move |txn_ids| {
                recorder.lock().unwrap().push(txn_ids.len());
                Ok(100)
            },
            GroupCommitConfig {
                // min == max pins the adaptive batch size to exactly 3, so the
                // third submitter triggers the inline flush regardless of the
                // measured arrival rate.
                min_batch_size: 3,
                max_batch_size: 3,
                // Long enough that no waiter times out and flushes a partial
                // batch on its own while the other threads are still starting.
                max_wait_us: 5_000_000,
                ..Default::default()
            },
        ));

        // Submit 3 commits in parallel
        let handles: Vec<_> = (0..3)
            .map(|txn_id| {
                let gc = Arc::clone(&gc);
                thread::spawn(move || gc.submit_and_wait(txn_id))
            })
            .collect();

        // Every waiter sees the timestamp of the shared batch
        for handle in handles {
            assert_eq!(handle.join().unwrap(), Ok(100));
        }

        // The 3 commits were flushed together in a single batch
        let sizes = batch_sizes.lock().unwrap();
        assert_eq!(*sizes, vec![3]);
    }

    /// The adaptive batch size follows N* = sqrt(2 × L_fsync × λ / C_wait).
    #[test]
    fn test_adaptive_sizing() {
        let gc = EventDrivenGroupCommit::with_config(
            |_| Ok(1),
            GroupCommitConfig {
                fsync_latency_us: 5000, // 5ms
                ..Default::default()
            },
        );

        // Simulate high arrival rate (1000 req/s, stored × 1000)
        gc.metrics
            .arrival_rate_ema
            .store(1_000_000, Ordering::Relaxed);

        // N* = sqrt(2 × 0.005 × 1000 / 1) ≈ 3.16, truncated to 3
        assert_eq!(gc.optimal_batch_size(), 3);
    }

    /// Averages in the stats snapshot are derived from the raw counters.
    #[test]
    fn test_stats() {
        let gc = EventDrivenGroupCommit::new(|_| Ok(1));

        gc.metrics.total_commits.store(100, Ordering::Relaxed);
        gc.metrics.total_batches.store(10, Ordering::Relaxed);
        gc.metrics
            .total_fsync_time_us
            .store(50_000, Ordering::Relaxed);

        let stats = gc.stats();
        assert_eq!(stats.total_commits, 100);
        assert_eq!(stats.total_batches, 10);
        assert_eq!(stats.avg_batch_size, 10.0);
        assert_eq!(stats.avg_fsync_time_us, 5000);
    }
}