clicktype-batch 0.2.0

Async batching system for ClickType with backpressure and metrics
Documentation
//! Batcher metrics: atomic counters and point-in-time snapshots of them.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Batcher metrics with atomic counters.
///
/// Every field is an [`AtomicU64`], so values can be read and updated through
/// a shared reference. The derived [`Default`] starts every counter at zero.
/// Fields are public, so callers may also access the atomics directly.
#[derive(Debug, Default)]
pub struct BatcherMetrics {
    /// Total rows inserted successfully (cumulative).
    pub rows_inserted: AtomicU64,

    /// Total bytes sent to ClickHouse (cumulative).
    pub bytes_sent: AtomicU64,

    /// Number of flush operations completed (cumulative).
    pub flush_count: AtomicU64,

    /// Number of insertion errors (cumulative).
    pub insert_errors: AtomicU64,

    /// Number of retry attempts (cumulative).
    pub retry_count: AtomicU64,

    /// Rows currently pending in buffer (a gauge: overwritten, not accumulated).
    pub pending_rows: AtomicU64,

    /// Bytes currently pending in buffer (a gauge: overwritten, not accumulated).
    pub pending_bytes: AtomicU64,

    /// Total time spent in flush operations, in nanoseconds (cumulative).
    pub total_flush_time_ns: AtomicU64,
}

impl BatcherMetrics {
    /// Creates a metrics instance with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Takes a snapshot of the current metrics as plain values.
    ///
    /// Each counter is read with its own relaxed load, so the snapshot is not
    /// atomic across fields: an update landing between two loads can make the
    /// fields slightly inconsistent with one another.
    ///
    /// The average flush time is the total flush time divided by the flush
    /// count (integer division on nanoseconds), or zero when no flush has
    /// been counted.
    pub fn snapshot(&self) -> BatcherMetricsSnapshot {
        let flush_count = self.flush_count.load(Ordering::Relaxed);
        let total_flush_ns = self.total_flush_time_ns.load(Ordering::Relaxed);

        // Guard the division: no flushes means no meaningful average.
        let avg_flush_time = if flush_count > 0 {
            Duration::from_nanos(total_flush_ns / flush_count)
        } else {
            Duration::ZERO
        };

        BatcherMetricsSnapshot {
            rows_inserted: self.rows_inserted.load(Ordering::Relaxed),
            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
            flush_count,
            insert_errors: self.insert_errors.load(Ordering::Relaxed),
            retry_count: self.retry_count.load(Ordering::Relaxed),
            pending_rows: self.pending_rows.load(Ordering::Relaxed),
            pending_bytes: self.pending_bytes.load(Ordering::Relaxed),
            avg_flush_time,
            total_flush_time: Duration::from_nanos(total_flush_ns),
        }
    }

    /// Adds `count` to the total of rows inserted.
    pub fn inc_rows_inserted(&self, count: u64) {
        self.rows_inserted.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `count` to the total of bytes sent.
    pub fn inc_bytes_sent(&self, count: u64) {
        self.bytes_sent.fetch_add(count, Ordering::Relaxed);
    }

    /// Increments the flush count by one.
    pub fn inc_flush_count(&self) {
        self.flush_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the insertion error count by one.
    pub fn inc_insert_errors(&self) {
        self.insert_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the retry count by one.
    pub fn inc_retry_count(&self) {
        self.retry_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the pending-row gauge with `count`.
    pub fn set_pending_rows(&self, count: u64) {
        self.pending_rows.store(count, Ordering::Relaxed);
    }

    /// Overwrites the pending-byte gauge with `count`.
    pub fn set_pending_bytes(&self, count: u64) {
        self.pending_bytes.store(count, Ordering::Relaxed);
    }

    /// Adds `duration` to the accumulated flush time.
    ///
    /// `Duration::as_nanos` returns a `u128`. A duration too long to fit in a
    /// `u64` nanosecond count is recorded as `u64::MAX` instead of being
    /// silently truncated to its low 64 bits. The running total itself still
    /// wraps on overflow, like the other counters.
    pub fn add_flush_time(&self, duration: Duration) {
        // Clamp first so the narrowing cast below is lossless.
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.total_flush_time_ns.fetch_add(nanos, Ordering::Relaxed);
    }

    /// Resets every metric, including the pending gauges, to zero.
    ///
    /// Each field is stored separately, so a concurrent reader may observe a
    /// partially reset state.
    pub fn reset(&self) {
        self.rows_inserted.store(0, Ordering::Relaxed);
        self.bytes_sent.store(0, Ordering::Relaxed);
        self.flush_count.store(0, Ordering::Relaxed);
        self.insert_errors.store(0, Ordering::Relaxed);
        self.retry_count.store(0, Ordering::Relaxed);
        self.pending_rows.store(0, Ordering::Relaxed);
        self.pending_bytes.store(0, Ordering::Relaxed);
        self.total_flush_time_ns.store(0, Ordering::Relaxed);
    }
}

/// Immutable snapshot of batcher metrics.
///
/// A plain-value copy of the counters at the moment the snapshot was taken.
/// All fields are `Copy`, so the snapshot is cheap to pass by value. It can be
/// compared for equality and has an all-zero [`Default`], which is convenient
/// in tests and for struct-update syntax.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct BatcherMetricsSnapshot {
    /// Total rows inserted
    pub rows_inserted: u64,

    /// Total bytes sent
    pub bytes_sent: u64,

    /// Number of flushes
    pub flush_count: u64,

    /// Number of insertion errors
    pub insert_errors: u64,

    /// Number of retry attempts
    pub retry_count: u64,

    /// Rows pending in the buffer when the snapshot was taken
    pub pending_rows: u64,

    /// Bytes pending in the buffer when the snapshot was taken
    pub pending_bytes: u64,

    /// Average time per flush; zero when no flush had been counted
    pub avg_flush_time: Duration,

    /// Total time spent in flush operations
    pub total_flush_time: Duration,
}

impl BatcherMetricsSnapshot {
    /// Rows inserted per second of accumulated flush time.
    ///
    /// Yields `0.0` when no flush time has been recorded.
    pub fn rows_per_second(&self) -> f64 {
        self.per_flush_second(self.rows_inserted)
    }

    /// Bytes sent per second of accumulated flush time.
    ///
    /// Yields `0.0` when no flush time has been recorded.
    pub fn bytes_per_second(&self) -> f64 {
        self.per_flush_second(self.bytes_sent)
    }

    /// Insertion errors divided by the number of flushes.
    ///
    /// Yields `0.0` when no flush has been counted. The ratio is not clamped,
    /// so it exceeds `1.0` if errors outnumber flushes.
    pub fn error_rate(&self) -> f64 {
        match self.flush_count {
            0 => 0.0,
            flushes => self.insert_errors as f64 / flushes as f64,
        }
    }

    /// Divides `total` by the accumulated flush time in seconds, falling back
    /// to `0.0` when that time is zero so the division is always defined.
    fn per_flush_second(&self, total: u64) -> f64 {
        let elapsed_secs = self.total_flush_time.as_secs_f64();
        if elapsed_secs > 0.0 {
            total as f64 / elapsed_secs
        } else {
            0.0
        }
    }
}