use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Atomic counters and gauges describing a batcher's activity.
///
/// Every field is an [`AtomicU64`], so all methods take `&self` and the value can be
/// updated through a shared reference. All accesses use [`Ordering::Relaxed`]: each
/// individual counter is updated atomically, but no ordering is established between
/// different counters.
#[derive(Debug, Default)]
pub struct BatcherMetrics {
    /// Running total, increased by [`BatcherMetrics::inc_rows_inserted`].
    pub rows_inserted: AtomicU64,
    /// Running total, increased by [`BatcherMetrics::inc_bytes_sent`].
    pub bytes_sent: AtomicU64,
    /// Running total, increased by one per [`BatcherMetrics::inc_flush_count`] call.
    pub flush_count: AtomicU64,
    /// Running total, increased by one per [`BatcherMetrics::inc_insert_errors`] call.
    pub insert_errors: AtomicU64,
    /// Running total, increased by one per [`BatcherMetrics::inc_retry_count`] call.
    pub retry_count: AtomicU64,
    /// Gauge: the last value passed to [`BatcherMetrics::set_pending_rows`].
    pub pending_rows: AtomicU64,
    /// Gauge: the last value passed to [`BatcherMetrics::set_pending_bytes`].
    pub pending_bytes: AtomicU64,
    /// Accumulated flush time in nanoseconds, see [`BatcherMetrics::add_flush_time`].
    pub total_flush_time_ns: AtomicU64,
}

impl BatcherMetrics {
    /// Creates a metrics set with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Copies the current counter values into a plain [`BatcherMetricsSnapshot`].
    ///
    /// The average flush time is `total_flush_time_ns / flush_count` (integer division),
    /// or [`Duration::ZERO`] when no flush has been counted.
    ///
    /// Each counter is loaded on its own, so updates that happen while the snapshot is
    /// being taken may be reflected in some fields and not in others.
    pub fn snapshot(&self) -> BatcherMetricsSnapshot {
        let flush_count = self.flush_count.load(Ordering::Relaxed);
        let total_flush_ns = self.total_flush_time_ns.load(Ordering::Relaxed);

        // `checked_div` is `None` only for a zero divisor, i.e. before the first flush.
        let avg_flush_time = total_flush_ns
            .checked_div(flush_count)
            .map_or(Duration::ZERO, Duration::from_nanos);

        BatcherMetricsSnapshot {
            rows_inserted: self.rows_inserted.load(Ordering::Relaxed),
            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
            flush_count,
            insert_errors: self.insert_errors.load(Ordering::Relaxed),
            retry_count: self.retry_count.load(Ordering::Relaxed),
            pending_rows: self.pending_rows.load(Ordering::Relaxed),
            pending_bytes: self.pending_bytes.load(Ordering::Relaxed),
            avg_flush_time,
            total_flush_time: Duration::from_nanos(total_flush_ns),
        }
    }

    /// Adds `count` to the inserted-rows total.
    pub fn inc_rows_inserted(&self, count: u64) {
        self.rows_inserted.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds `count` to the sent-bytes total.
    pub fn inc_bytes_sent(&self, count: u64) {
        self.bytes_sent.fetch_add(count, Ordering::Relaxed);
    }

    /// Increments the flush counter by one.
    pub fn inc_flush_count(&self) {
        self.flush_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the insert-error counter by one.
    pub fn inc_insert_errors(&self) {
        self.insert_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the retry counter by one.
    pub fn inc_retry_count(&self) {
        self.retry_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the pending-rows gauge with `count`.
    pub fn set_pending_rows(&self, count: u64) {
        self.pending_rows.store(count, Ordering::Relaxed);
    }

    /// Overwrites the pending-bytes gauge with `count`.
    pub fn set_pending_bytes(&self, count: u64) {
        self.pending_bytes.store(count, Ordering::Relaxed);
    }

    /// Adds `duration`, expressed in whole nanoseconds, to the accumulated flush time.
    ///
    /// A duration whose nanosecond count does not fit in a `u64` is recorded as
    /// `u64::MAX` nanoseconds rather than being truncated to its low 64 bits. The
    /// accumulator itself is updated with `fetch_add`, which wraps around on overflow.
    pub fn add_flush_time(&self, duration: Duration) {
        // `as_nanos` returns a `u128`. Clamping first makes the narrowing cast lossless;
        // a bare `as u64` would turn a huge duration into a small, misleading value.
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.total_flush_time_ns.fetch_add(nanos, Ordering::Relaxed);
    }

    /// Stores zero into every counter and gauge.
    ///
    /// The stores are independent of each other, so updates made concurrently with a
    /// reset may survive in some counters.
    pub fn reset(&self) {
        let counters = [
            &self.rows_inserted,
            &self.bytes_sent,
            &self.flush_count,
            &self.insert_errors,
            &self.retry_count,
            &self.pending_rows,
            &self.pending_bytes,
            &self.total_flush_time_ns,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
/// Plain-value copy of the batcher counters, as produced by `BatcherMetrics::snapshot`.
///
/// The struct is `Copy` and holds no atomics, so it can be passed around and inspected
/// freely; it does not change after it has been created.
#[derive(Debug, Clone, Copy)]
pub struct BatcherMetricsSnapshot {
    /// Inserted-rows total at snapshot time.
    pub rows_inserted: u64,
    /// Sent-bytes total at snapshot time.
    pub bytes_sent: u64,
    /// Number of flushes counted at snapshot time.
    pub flush_count: u64,
    /// Number of insert errors counted at snapshot time.
    pub insert_errors: u64,
    /// Number of retries counted at snapshot time.
    pub retry_count: u64,
    /// Pending-rows gauge at snapshot time.
    pub pending_rows: u64,
    /// Pending-bytes gauge at snapshot time.
    pub pending_bytes: u64,
    /// Average flush duration.
    pub avg_flush_time: Duration,
    /// Accumulated flush duration.
    pub total_flush_time: Duration,
}

impl BatcherMetricsSnapshot {
    /// Returns `rows_inserted` divided by `total_flush_time` in seconds.
    ///
    /// The rate is relative to the accumulated flush time held in this snapshot.
    /// Yields `0.0` when `total_flush_time` is zero.
    pub fn rows_per_second(&self) -> f64 {
        let elapsed_secs = self.total_flush_time.as_secs_f64();
        // A `Duration` is never negative, so this guard only triggers for exactly zero.
        if elapsed_secs <= 0.0 {
            return 0.0;
        }
        self.rows_inserted as f64 / elapsed_secs
    }

    /// Returns `bytes_sent` divided by `total_flush_time` in seconds.
    ///
    /// Yields `0.0` when `total_flush_time` is zero.
    pub fn bytes_per_second(&self) -> f64 {
        let elapsed_secs = self.total_flush_time.as_secs_f64();
        // A `Duration` is never negative, so this guard only triggers for exactly zero.
        if elapsed_secs <= 0.0 {
            return 0.0;
        }
        self.bytes_sent as f64 / elapsed_secs
    }

    /// Returns `insert_errors` divided by `flush_count`.
    ///
    /// Yields `0.0` when `flush_count` is zero. The result is a plain ratio and is not
    /// clamped, so it exceeds `1.0` whenever `insert_errors` is larger than `flush_count`.
    pub fn error_rate(&self) -> f64 {
        match self.flush_count {
            0 => 0.0,
            flushes => self.insert_errors as f64 / flushes as f64,
        }
    }
}