net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
//! Adaptive batch sizing for Net.
//!
//! This module provides dynamic batch sizing based on queue pressure
//! and latency targets, optimizing throughput for different workload patterns.

use std::sync::atomic::{AtomicU64, Ordering};

use super::protocol::MAX_PAYLOAD_SIZE;

/// Default minimum batch size (1KB)
pub const DEFAULT_MIN_BATCH_SIZE: usize = 1024;

/// Default maximum batch size (8KB - aligned with MAX_PAYLOAD_SIZE)
pub const DEFAULT_MAX_BATCH_SIZE: usize = MAX_PAYLOAD_SIZE;

/// Default target latency in microseconds (100μs)
pub const DEFAULT_TARGET_LATENCY_US: u64 = 100;

/// Queue depth threshold for burst detection
const BURST_THRESHOLD: usize = 50;

/// High queue depth threshold for maximum batching
const HIGH_QUEUE_THRESHOLD: usize = 100;

/// Adaptive batch sizing based on queue pressure and latency.
///
/// This batcher dynamically adjusts batch size to optimize throughput:
///
/// - **Burst mode**: When queue depth > 100, uses maximum batch size
/// - **Latency pressure**: When latency exceeds target, reduces batch size
/// - **Normal mode**: Gradually grows toward maximum batch size
///
/// # Performance
///
/// Adaptive batching can improve throughput by 15-30% for bursty workloads
/// by amortizing per-packet overhead while maintaining latency SLOs.
pub struct AdaptiveBatcher {
    /// Minimum batch size in bytes
    min_batch_size: usize,
    /// Maximum batch size in bytes
    max_batch_size: usize,
    /// Target latency in microseconds
    target_latency_us: u64,
    /// Exponential moving average of batch latency (in microseconds * 1000 for precision)
    avg_batch_latency_us_x1000: AtomicU64,
    /// Current queue depth
    queue_depth: AtomicU64,
    /// Burst detected flag
    burst_detected: std::sync::atomic::AtomicBool,
    /// Total batches processed (for metrics)
    total_batches: AtomicU64,
    /// Total bytes sent (for metrics)
    total_bytes: AtomicU64,
}

impl AdaptiveBatcher {
    /// Create a new adaptive batcher with default settings.
    pub fn new() -> Self {
        Self::with_config(
            DEFAULT_MIN_BATCH_SIZE,
            DEFAULT_MAX_BATCH_SIZE,
            DEFAULT_TARGET_LATENCY_US,
        )
    }

    /// Create a new adaptive batcher with custom configuration.
    pub fn with_config(
        min_batch_size: usize,
        max_batch_size: usize,
        target_latency_us: u64,
    ) -> Self {
        Self {
            min_batch_size,
            max_batch_size,
            target_latency_us,
            // Start at half target, using saturating arithmetic so an
            // unusually large `target_latency_us` cannot wrap u64.
            avg_batch_latency_us_x1000: AtomicU64::new(target_latency_us.saturating_mul(1000) / 2),
            queue_depth: AtomicU64::new(0),
            burst_detected: std::sync::atomic::AtomicBool::new(false),
            total_batches: AtomicU64::new(0),
            total_bytes: AtomicU64::new(0),
        }
    }

    /// Get the optimal batch size based on current conditions.
    ///
    /// This method is called before building a batch to determine
    /// how much data to accumulate before sending.
    #[inline]
    pub fn optimal_size(&self) -> usize {
        let queue_depth = self.queue_depth.load(Ordering::Relaxed) as usize;
        let burst = self.burst_detected.load(Ordering::Relaxed);
        let avg_latency = self.avg_batch_latency_us_x1000.load(Ordering::Relaxed) / 1000;

        if burst || queue_depth > HIGH_QUEUE_THRESHOLD {
            // Burst mode: maximize batch size for throughput
            self.max_batch_size
        } else if avg_latency > self.target_latency_us {
            // Latency pressure: reduce batch size
            // Scale down based on how much we exceed the target
            let ratio = self.target_latency_us as f64 / avg_latency as f64;
            let scaled = (self.max_batch_size as f64 * ratio) as usize;
            scaled.clamp(self.min_batch_size, self.max_batch_size)
        } else if queue_depth > BURST_THRESHOLD {
            // Medium pressure: use larger batches
            (self.min_batch_size + self.max_batch_size * 3) / 4
        } else {
            // Normal mode: use moderate batch size
            (self.min_batch_size + self.max_batch_size) / 2
        }
    }

    /// Record metrics after sending a batch.
    ///
    /// This updates the internal state used by `optimal_size()`.
    ///
    /// # Arguments
    ///
    /// * `batch_size` - Size of the batch in bytes
    /// * `latency_us` - Time taken to send the batch in microseconds
    /// * `queue_depth` - Current pending queue depth
    #[inline]
    pub fn record(&self, batch_size: usize, latency_us: u64, queue_depth: usize) {
        // Update exponential moving average (alpha = 0.1)
        // EMA = alpha * new + (1 - alpha) * old
        // Using fixed-point: new_x1000 = (100 * new * 1000 + 900 * old) / 1000
        //
        // Saturating arithmetic prevents a pathological `latency_us` (e.g.
        // from a stuck timer) from wrapping the u64 and corrupting the EMA.
        let old = self.avg_batch_latency_us_x1000.load(Ordering::Relaxed);
        let new_scaled = latency_us.saturating_mul(100).saturating_mul(1000);
        let old_scaled = old.saturating_mul(900);
        let new_x1000 = new_scaled.saturating_add(old_scaled) / 1000;
        self.avg_batch_latency_us_x1000
            .store(new_x1000, Ordering::Relaxed);

        // Update queue depth
        self.queue_depth
            .store(queue_depth as u64, Ordering::Relaxed);

        // Detect burst
        self.burst_detected
            .store(queue_depth > BURST_THRESHOLD, Ordering::Relaxed);

        // Update metrics
        self.total_batches.fetch_add(1, Ordering::Relaxed);
        self.total_bytes
            .fetch_add(batch_size as u64, Ordering::Relaxed);
    }

    /// Get the minimum batch size.
    #[inline]
    pub fn min_batch_size(&self) -> usize {
        self.min_batch_size
    }

    /// Get the maximum batch size.
    #[inline]
    pub fn max_batch_size(&self) -> usize {
        self.max_batch_size
    }

    /// Get the target latency in microseconds.
    #[inline]
    pub fn target_latency_us(&self) -> u64 {
        self.target_latency_us
    }

    /// Get the current average batch latency in microseconds.
    #[inline]
    pub fn avg_latency_us(&self) -> u64 {
        self.avg_batch_latency_us_x1000.load(Ordering::Relaxed) / 1000
    }

    /// Get the current queue depth.
    #[inline]
    pub fn current_queue_depth(&self) -> usize {
        self.queue_depth.load(Ordering::Relaxed) as usize
    }

    /// Check if burst mode is active.
    #[inline]
    pub fn is_burst_mode(&self) -> bool {
        self.burst_detected.load(Ordering::Relaxed)
    }

    /// Get the total number of batches processed.
    #[inline]
    pub fn total_batches(&self) -> u64 {
        self.total_batches.load(Ordering::Relaxed)
    }

    /// Get the total bytes sent.
    #[inline]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    /// Get the average batch size.
    #[inline]
    pub fn avg_batch_size(&self) -> usize {
        let batches = self.total_batches.load(Ordering::Relaxed);
        if batches == 0 {
            return 0;
        }
        (self.total_bytes.load(Ordering::Relaxed) / batches) as usize
    }

    /// Reset all metrics.
    pub fn reset_metrics(&self) {
        self.total_batches.store(0, Ordering::Relaxed);
        self.total_bytes.store(0, Ordering::Relaxed);
    }
}

impl Default for AdaptiveBatcher {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for AdaptiveBatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AdaptiveBatcher")
            .field("min_batch_size", &self.min_batch_size)
            .field("max_batch_size", &self.max_batch_size)
            .field("target_latency_us", &self.target_latency_us)
            .field("avg_latency_us", &self.avg_latency_us())
            .field("queue_depth", &self.current_queue_depth())
            .field("burst_mode", &self.is_burst_mode())
            .field("total_batches", &self.total_batches())
            .field("total_bytes", &self.total_bytes())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_batcher_default() {
        let batcher = AdaptiveBatcher::new();
        assert_eq!(batcher.min_batch_size(), DEFAULT_MIN_BATCH_SIZE);
        assert_eq!(batcher.max_batch_size(), DEFAULT_MAX_BATCH_SIZE);
        assert_eq!(batcher.target_latency_us(), DEFAULT_TARGET_LATENCY_US);
    }

    #[test]
    fn test_adaptive_batcher_custom_config() {
        let batcher = AdaptiveBatcher::with_config(512, 4096, 50);
        assert_eq!(batcher.min_batch_size(), 512);
        assert_eq!(batcher.max_batch_size(), 4096);
        assert_eq!(batcher.target_latency_us(), 50);
    }

    #[test]
    fn test_optimal_size_normal() {
        let batcher = AdaptiveBatcher::new();
        let size = batcher.optimal_size();
        // Should be between min and max
        assert!(size >= batcher.min_batch_size());
        assert!(size <= batcher.max_batch_size());
    }

    #[test]
    fn test_optimal_size_burst_mode() {
        let batcher = AdaptiveBatcher::new();

        // Simulate burst: high queue depth
        batcher.record(1000, 50, 150);

        // Should use maximum batch size
        let size = batcher.optimal_size();
        assert_eq!(size, batcher.max_batch_size());
    }

    #[test]
    fn test_optimal_size_latency_pressure() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);

        // Simulate high latency (200μs when target is 100μs)
        for _ in 0..10 {
            batcher.record(4096, 200, 10);
        }

        // Should reduce batch size due to latency pressure
        let size = batcher.optimal_size();
        assert!(size < batcher.max_batch_size());
        assert!(size >= batcher.min_batch_size());
    }

    #[test]
    fn test_record_updates_metrics() {
        let batcher = AdaptiveBatcher::new();

        batcher.record(1000, 50, 20);
        batcher.record(2000, 60, 30);
        batcher.record(1500, 55, 25);

        assert_eq!(batcher.total_batches(), 3);
        assert_eq!(batcher.total_bytes(), 4500);
        assert_eq!(batcher.avg_batch_size(), 1500);
    }

    #[test]
    fn test_burst_detection() {
        let batcher = AdaptiveBatcher::new();

        // Low queue depth - no burst
        batcher.record(1000, 50, 10);
        assert!(!batcher.is_burst_mode());

        // High queue depth - burst detected
        batcher.record(1000, 50, 100);
        assert!(batcher.is_burst_mode());

        // Queue drains - burst ends
        batcher.record(1000, 50, 10);
        assert!(!batcher.is_burst_mode());
    }

    #[test]
    fn test_ema_convergence() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);

        // Record consistent latency
        for _ in 0..100 {
            batcher.record(4096, 80, 10);
        }

        // EMA should converge close to 80
        let avg = batcher.avg_latency_us();
        assert!(
            (75..=85).contains(&avg),
            "EMA should converge to ~80, got {}",
            avg
        );
    }

    #[test]
    fn test_reset_metrics() {
        let batcher = AdaptiveBatcher::new();

        batcher.record(1000, 50, 20);
        batcher.record(2000, 60, 30);

        assert!(batcher.total_batches() > 0);
        assert!(batcher.total_bytes() > 0);

        batcher.reset_metrics();

        assert_eq!(batcher.total_batches(), 0);
        assert_eq!(batcher.total_bytes(), 0);
    }

    #[test]
    fn test_debug_format() {
        let batcher = AdaptiveBatcher::new();
        let debug = format!("{:?}", batcher);
        assert!(debug.contains("AdaptiveBatcher"));
        assert!(debug.contains("min_batch_size"));
        assert!(debug.contains("max_batch_size"));
    }

    #[test]
    fn test_ema_init_does_not_overflow_on_huge_target() {
        // Regression: `target_latency_us * 1000 / 2` previously wrapped for
        // large `target_latency_us`. Saturating arithmetic must keep the
        // seed within u64 and leave the batcher in a sane state.
        let batcher = AdaptiveBatcher::with_config(1024, 8192, u64::MAX);
        // No panic, and the seed is clamped rather than wrapped to a small value.
        assert!(batcher.avg_latency_us() >= u64::MAX / 2000);
    }

    #[test]
    fn test_ema_update_does_not_overflow_on_huge_latency() {
        // Regression: `100 * latency_us * 1000 + 900 * old` previously
        // wrapped u64 for pathological inputs. Saturating arithmetic must
        // clamp to u64::MAX without corrupting the stored EMA.
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        batcher.record(4096, u64::MAX, 10);
        // No panic in debug; no wrapping in release. EMA should be huge,
        // not a tiny wrapped value.
        assert!(batcher.avg_latency_us() > 1_000_000);

        // optimal_size must still return a value within bounds rather than
        // panicking or returning 0 from an infinity cast.
        let size = batcher.optimal_size();
        assert!(size >= batcher.min_batch_size());
        assert!(size <= batcher.max_batch_size());
    }
}