use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Inclusive upper bounds, in microseconds, of the latency histogram buckets.
///
/// A sample is counted in the first bucket whose bound is `>=` the sample;
/// anything above the last bound goes into one extra overflow bucket
/// (index 13). Keep the list in ascending order.
const LATENCY_BUCKET_BOUNDS_US: [u64; 13] = [
    100,        // 100 µs
    500,        // 500 µs
    1_000,      // 1 ms
    5_000,      // 5 ms
    10_000,     // 10 ms
    25_000,     // 25 ms
    50_000,     // 50 ms
    100_000,    // 100 ms
    250_000,    // 250 ms
    500_000,    // 500 ms
    1_000_000,  // 1 s
    5_000_000,  // 5 s
    10_000_000, // 10 s
];
/// Number of histogram buckets: one per bound in [`LATENCY_BUCKET_BOUNDS_US`]
/// plus a trailing overflow bucket for samples above the largest bound.
const LATENCY_BUCKET_COUNT: usize = LATENCY_BUCKET_BOUNDS_US.len() + 1;

/// Lock-free latency histogram with fixed microsecond buckets.
///
/// Every counter is an independent `AtomicU64` updated with `Relaxed`
/// ordering, so recording never blocks. A [`snapshot`](Self::snapshot) taken
/// while other threads are recording may therefore be slightly inconsistent
/// across fields (e.g. `count` versus the sum of the buckets).
pub struct LatencyHistogram {
    /// Per-bucket sample counts; the last slot is the overflow bucket for
    /// samples larger than every entry of `LATENCY_BUCKET_BOUNDS_US`.
    buckets: [AtomicU64; LATENCY_BUCKET_COUNT],
    /// Total number of recorded samples.
    count: AtomicU64,
    /// Sum of all recorded samples in microseconds (atomic add wraps on overflow).
    sum_us: AtomicU64,
    /// Smallest recorded sample; `u64::MAX` until the first sample arrives.
    min_us: AtomicU64,
    /// Largest recorded sample; `0` until the first sample arrives.
    max_us: AtomicU64,
}

impl LatencyHistogram {
    /// Creates an empty histogram. This is a `const fn`, so it can initialise `static`s.
    pub const fn new() -> Self {
        // `AtomicU64` is not `Copy`, but a path to a `const` item is allowed as
        // an array-repeat operand: every slot gets its own fresh atomic.
        #[allow(clippy::declare_interior_mutable_const)] // only used as a repeat operand
        const ZERO: AtomicU64 = AtomicU64::new(0);
        Self {
            buckets: [ZERO; LATENCY_BUCKET_COUNT],
            count: AtomicU64::new(0),
            sum_us: AtomicU64::new(0),
            min_us: AtomicU64::new(u64::MAX),
            max_us: AtomicU64::new(0),
        }
    }

    /// Records one sample given as a [`Duration`].
    ///
    /// Durations longer than `u64::MAX` microseconds saturate to `u64::MAX`
    /// instead of wrapping around to a small value.
    #[inline]
    pub fn record(&self, duration: Duration) {
        let us = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        self.record_us(us);
    }

    /// Records one sample given in microseconds.
    #[inline]
    pub fn record_us(&self, us: u64) {
        // The bounds are ascending, so binary search finds the first bound
        // `>= us`; samples above every bound map to index `len()`, which is
        // the overflow bucket.
        let bucket_idx = LATENCY_BUCKET_BOUNDS_US.partition_point(|&bound| bound < us);
        self.buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum_us.fetch_add(us, Ordering::Relaxed);
        // `fetch_update` only attempts a compare-exchange while the closure
        // returns `Some`. In the common case (the sample is not a new extreme)
        // it stays a plain load and never writes the shared cache line.
        // An `Err` result just means "unchanged".
        let _ = self
            .min_us
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                if us < cur {
                    Some(us)
                } else {
                    None
                }
            });
        let _ = self
            .max_us
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                if us > cur {
                    Some(us)
                } else {
                    None
                }
            });
    }

    /// Copies the current counters into a plain [`LatencySnapshot`].
    ///
    /// Fields are loaded one at a time, so a snapshot taken during concurrent
    /// recording is approximate. An empty histogram reports `min_us == 0`.
    #[must_use]
    pub fn snapshot(&self) -> LatencySnapshot {
        let count = self.count.load(Ordering::Relaxed);
        let sum_us = self.sum_us.load(Ordering::Relaxed);
        let min_us = self.min_us.load(Ordering::Relaxed);
        let max_us = self.max_us.load(Ordering::Relaxed);
        let mut buckets = [0u64; LATENCY_BUCKET_COUNT];
        for (dst, src) in buckets.iter_mut().zip(self.buckets.iter()) {
            *dst = src.load(Ordering::Relaxed);
        }
        LatencySnapshot {
            count,
            sum_us,
            // `u64::MAX` is the "no samples yet" sentinel of `min_us`.
            min_us: if min_us == u64::MAX { 0 } else { min_us },
            max_us,
            buckets,
        }
    }
}

impl Default for LatencyHistogram {
    fn default() -> Self {
        Self::new()
    }
}
/// Point-in-time copy of a [`LatencyHistogram`]'s counters.
#[derive(Debug, Clone)]
pub struct LatencySnapshot {
    /// Number of recorded samples.
    pub count: u64,
    /// Sum of all samples, in microseconds.
    pub sum_us: u64,
    /// Smallest sample in microseconds (`0` for an empty histogram snapshot).
    pub min_us: u64,
    /// Largest sample in microseconds (`0` for an empty histogram snapshot).
    pub max_us: u64,
    /// Per-bucket counts aligned with `LATENCY_BUCKET_BOUNDS_US`; the last
    /// entry is the overflow bucket.
    pub buckets: [u64; 14],
}

impl LatencySnapshot {
    /// Mean sample in microseconds, or `0.0` when there are no samples.
    #[must_use]
    pub fn avg_us(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum_us as f64 / self.count as f64
        }
    }

    /// Estimates the `p`-th percentile (`p` in `0.0..=100.0`) in microseconds.
    ///
    /// The result is the upper bound of the bucket holding the sample of rank
    /// `ceil(p / 100 * count)` (at least rank 1). It can therefore overestimate
    /// the true value. For the overflow bucket, `max_us` is returned instead.
    /// Returns `0` when the snapshot is empty or `p` is outside `0.0..=100.0`
    /// (including NaN).
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        if self.count == 0 || !(0.0..=100.0).contains(&p) {
            return 0;
        }
        // Multiply before dividing. `p / 100.0` is inexact for values such as
        // 99.9 and rounds up, which made `ceil` overshoot by one rank: p99.9
        // of 1000 samples selected rank 1000 instead of 999. Clamping to
        // rank 1 stops p = 0 from "matching" an empty first bucket.
        let target = ((p * self.count as f64) / 100.0).ceil().max(1.0) as u64;
        let mut cumulative = 0u64;
        for (i, &bucket_count) in self.buckets.iter().enumerate() {
            cumulative += bucket_count;
            if cumulative >= target {
                // Indices past the last bound belong to the overflow bucket,
                // which has no upper bound of its own.
                return LATENCY_BUCKET_BOUNDS_US
                    .get(i)
                    .copied()
                    .unwrap_or(self.max_us);
            }
        }
        // Reached when the buckets sum to less than `count`, e.g. a snapshot
        // that raced with concurrent writers.
        self.max_us
    }

    /// Median (50th percentile) estimate in microseconds.
    #[must_use]
    pub fn p50(&self) -> u64 {
        self.percentile(50.0)
    }

    /// 95th percentile estimate in microseconds.
    #[must_use]
    pub fn p95(&self) -> u64 {
        self.percentile(95.0)
    }

    /// 99th percentile estimate in microseconds.
    #[must_use]
    pub fn p99(&self) -> u64 {
        self.percentile(99.0)
    }

    /// 99.9th percentile estimate in microseconds.
    #[must_use]
    pub fn p999(&self) -> u64 {
        self.percentile(99.9)
    }
}
/// Scope guard that records the time elapsed since its creation into a
/// [`LatencyHistogram`].
///
/// The sample is recorded exactly once: when the guard is dropped, or earlier
/// if [`stop`](Self::stop) is called.
pub struct LatencyTimer<'a> {
    histogram: &'a LatencyHistogram,
    start: Instant,
}

impl<'a> LatencyTimer<'a> {
    /// Starts timing now; the sample will be recorded into `histogram`.
    pub fn new(histogram: &'a LatencyHistogram) -> Self {
        Self {
            histogram,
            start: Instant::now(),
        }
    }

    /// Stops the timer and records the elapsed time.
    pub fn stop(self) {
        // Recording lives solely in `Drop`; consuming `self` here triggers it
        // exactly once. Recording here as well would double-count the sample,
        // because `self` is still dropped at the end of this function.
        drop(self);
    }
}

impl Drop for LatencyTimer<'_> {
    fn drop(&mut self) {
        self.histogram.record(self.start.elapsed());
    }
}
/// Ingest latency histogram; fed by `record_ingest_latency`, `time_ingest` and their sampled variants.
pub static LATENCY_INGEST: LatencyHistogram = LatencyHistogram::new();
/// Fetch latency histogram; fed by `record_fetch_latency`, `time_fetch` and their sampled variants.
pub static LATENCY_FETCH: LatencyHistogram = LatencyHistogram::new();
/// I/O latency histogram; fed by `record_io_latency`, `time_io` and their sampled variants.
pub static LATENCY_IO: LatencyHistogram = LatencyHistogram::new();
/// Replication latency histogram; fed by `record_replication_latency` and
/// `record_peer_replication_latency` (all peers share this one histogram).
pub static LATENCY_REPLICATION: LatencyHistogram = LatencyHistogram::new();
/// Network latency histogram; fed by `record_network_latency`.
pub static LATENCY_NETWORK: LatencyHistogram = LatencyHistogram::new();
/// Derives a per-second rate from successive samples of a cumulative counter.
///
/// The latest rate is stored as a fixed-point value scaled by 1000, so it fits
/// in an `AtomicU64` with milli-unit precision. The fields are loaded and
/// stored independently, so concurrent [`update`](Self::update) calls may
/// interleave.
pub struct RateTracker {
    /// Timestamp (ms) of the previous sample; `0` before the first update.
    last_sample_ms: AtomicU64,
    /// Counter value at the previous sample.
    last_value: AtomicU64,
    /// Most recent rate in units per second, multiplied by 1000.
    rate_scaled: AtomicU64,
}

impl RateTracker {
    /// Creates a tracker with no history and a rate of zero.
    pub const fn new() -> Self {
        Self {
            last_sample_ms: AtomicU64::new(0),
            last_value: AtomicU64::new(0),
            rate_scaled: AtomicU64::new(0),
        }
    }

    /// Feeds a new counter sample taken at `current_time_ms`.
    ///
    /// The rate is recomputed only when `current_time_ms` is later than the
    /// previous sample's timestamp; otherwise the previous rate is kept. A
    /// counter that went backwards (e.g. after a reset) yields a rate of zero.
    /// Rates too large for the scaled representation saturate at `u64::MAX`
    /// (about `1.8e16` units per second).
    pub fn update(&self, current_value: u64, current_time_ms: u64) {
        let last_ms = self.last_sample_ms.load(Ordering::Relaxed);
        let last_val = self.last_value.load(Ordering::Relaxed);
        let elapsed_ms = current_time_ms.saturating_sub(last_ms);
        if elapsed_ms > 0 {
            let delta = current_value.saturating_sub(last_val);
            // units/sec * 1000 == delta * 1_000_000 / elapsed_ms. Widen to u128:
            // in u64 the multiplication overflows once `delta` exceeds ~1.8e13,
            // which panics in debug builds and wraps to garbage in release.
            let scaled = u128::from(delta) * 1_000_000 / u128::from(elapsed_ms);
            let rate = scaled.min(u128::from(u64::MAX)) as u64;
            self.rate_scaled.store(rate, Ordering::Relaxed);
        }
        self.last_sample_ms
            .store(current_time_ms, Ordering::Relaxed);
        self.last_value.store(current_value, Ordering::Relaxed);
    }

    /// Most recently computed rate, in units per second.
    #[must_use]
    pub fn rate_per_second(&self) -> f64 {
        self.rate_scaled.load(Ordering::Relaxed) as f64 / 1000.0
    }
}

impl Default for RateTracker {
    fn default() -> Self {
        Self::new()
    }
}
/// Ingest operations per second; updated by `update_rates`.
pub static RATE_INGEST_OPS: RateTracker = RateTracker::new();
/// Ingest bytes per second; updated by `update_rates`.
pub static RATE_INGEST_BYTES: RateTracker = RateTracker::new();
/// Read operations per second; updated by `update_rates`.
pub static RATE_READ_OPS: RateTracker = RateTracker::new();
/// Read bytes per second; updated by `update_rates`.
pub static RATE_READ_BYTES: RateTracker = RateTracker::new();
/// Category of a recorded error.
///
/// The explicit `u8` discriminants double as indices into the per-category
/// counter array, so they must stay dense and start at zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ErrorType {
    Protocol = 0,
    Io = 1,
    Timeout = 2,
    Checksum = 3,
    Replication = 4,
    Backpressure = 5,
    Auth = 6,
    Internal = 7,
}

impl ErrorType {
    /// Number of variants; sizes the per-category counter arrays.
    pub const COUNT: usize = 8;

    /// Lowercase name of this category.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ErrorType::Protocol => "protocol",
            ErrorType::Io => "io",
            ErrorType::Timeout => "timeout",
            ErrorType::Checksum => "checksum",
            ErrorType::Replication => "replication",
            ErrorType::Backpressure => "backpressure",
            ErrorType::Auth => "auth",
            ErrorType::Internal => "internal",
        }
    }
}
/// Per-category error counters, indexed by `ErrorType as usize`.
pub static ERRORS_BY_TYPE: [AtomicU64; ErrorType::COUNT] = [
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
];

/// Errors across all categories; incremented alongside the per-category counter by [`record_error`].
pub static ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Counts one error of `error_type`, bumping both its own counter and the total.
#[inline]
pub fn record_error(error_type: ErrorType) {
    let slot = usize::from(error_type as u8);
    ERRORS_BY_TYPE[slot].fetch_add(1, Ordering::Relaxed);
    ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
}

/// Current count for a single error category.
#[inline]
#[must_use]
pub fn get_error_count(error_type: ErrorType) -> u64 {
    let counter = &ERRORS_BY_TYPE[usize::from(error_type as u8)];
    counter.load(Ordering::Relaxed)
}

/// Current count of errors across all categories.
#[inline]
#[must_use]
pub fn get_total_errors() -> u64 {
    let total = &ERRORS_TOTAL;
    total.load(Ordering::Relaxed)
}
/// Last `current` value passed to [`set_queue_depth`].
pub static SATURATION_QUEUE_DEPTH: AtomicU64 = AtomicU64::new(0);
/// Last `capacity` value passed to [`set_queue_depth`].
pub static SATURATION_QUEUE_CAPACITY: AtomicU64 = AtomicU64::new(0);
/// Last value passed to [`set_pending_io`].
pub static SATURATION_PENDING_IO: AtomicU64 = AtomicU64::new(0);
/// Last `used` value passed to [`set_memory_usage`].
pub static SATURATION_MEMORY_USED: AtomicU64 = AtomicU64::new(0);
/// Last `total` value passed to [`set_memory_usage`].
pub static SATURATION_MEMORY_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Last `used` value passed to [`set_buffer_pool_usage`].
pub static SATURATION_BUFFER_POOL_USED: AtomicU64 = AtomicU64::new(0);
/// Last `total` value passed to [`set_buffer_pool_usage`].
pub static SATURATION_BUFFER_POOL_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Last `used` value passed to [`set_connection_usage`].
pub static SATURATION_CONNECTIONS_USED: AtomicU64 = AtomicU64::new(0);
/// Last `max` value passed to [`set_connection_usage`].
pub static SATURATION_CONNECTIONS_MAX: AtomicU64 = AtomicU64::new(0);

/// Publishes a `(used, total)` gauge pair. `used` is stored first, then
/// `total`, both with `Relaxed` ordering, so a concurrent reader may briefly
/// observe a mixed old/new pair.
#[inline]
fn store_gauge_pair(used_gauge: &AtomicU64, used: u64, total_gauge: &AtomicU64, total: u64) {
    used_gauge.store(used, Ordering::Relaxed);
    total_gauge.store(total, Ordering::Relaxed);
}

/// Updates the queue depth gauge and its capacity.
#[inline]
pub fn set_queue_depth(current: u64, capacity: u64) {
    store_gauge_pair(
        &SATURATION_QUEUE_DEPTH,
        current,
        &SATURATION_QUEUE_CAPACITY,
        capacity,
    );
}

/// Updates the pending-I/O gauge.
#[inline]
pub fn set_pending_io(count: u64) {
    let gauge = &SATURATION_PENDING_IO;
    gauge.store(count, Ordering::Relaxed);
}

/// Updates the memory usage gauge pair.
#[inline]
pub fn set_memory_usage(used: u64, total: u64) {
    store_gauge_pair(&SATURATION_MEMORY_USED, used, &SATURATION_MEMORY_TOTAL, total);
}

/// Updates the buffer pool usage gauge pair.
#[inline]
pub fn set_buffer_pool_usage(used: u64, total: u64) {
    store_gauge_pair(
        &SATURATION_BUFFER_POOL_USED,
        used,
        &SATURATION_BUFFER_POOL_TOTAL,
        total,
    );
}

/// Updates the connection usage gauge pair.
#[inline]
pub fn set_connection_usage(used: u64, max: u64) {
    store_gauge_pair(
        &SATURATION_CONNECTIONS_USED,
        used,
        &SATURATION_CONNECTIONS_MAX,
        max,
    );
}
/// Returns `used / total` as a fraction, or `0.0` when `total` is zero.
///
/// The result is not clamped, so `used > total` yields a value above `1.0`.
#[inline]
#[must_use]
pub fn saturation_ratio(used: u64, total: u64) -> f64 {
    match total {
        0 => 0.0,
        capacity => used as f64 / capacity as f64,
    }
}
#[inline]
#[must_use]
pub fn queue_saturation() -> f64 {
saturation_ratio(
SATURATION_QUEUE_DEPTH.load(Ordering::Relaxed),
SATURATION_QUEUE_CAPACITY.load(Ordering::Relaxed),
)
}
#[inline]
#[must_use]
pub fn memory_saturation() -> f64 {
saturation_ratio(
SATURATION_MEMORY_USED.load(Ordering::Relaxed),
SATURATION_MEMORY_TOTAL.load(Ordering::Relaxed),
)
}
#[inline]
#[must_use]
pub fn buffer_pool_saturation() -> f64 {
saturation_ratio(
SATURATION_BUFFER_POOL_USED.load(Ordering::Relaxed),
SATURATION_BUFFER_POOL_TOTAL.load(Ordering::Relaxed),
)
}
/// Point-in-time view of latency, traffic, errors and saturation, read from
/// this module's global metrics by [`capture`](Self::capture).
#[derive(Debug, Clone)]
pub struct GoldenSignalsSnapshot {
    /// Snapshot of `LATENCY_INGEST`.
    pub latency_ingest: LatencySnapshot,
    /// Snapshot of `LATENCY_FETCH`.
    pub latency_fetch: LatencySnapshot,
    /// Snapshot of `LATENCY_IO`.
    pub latency_io: LatencySnapshot,
    /// Snapshot of `LATENCY_REPLICATION`.
    pub latency_replication: LatencySnapshot,
    /// Snapshot of `LATENCY_NETWORK`.
    pub latency_network: LatencySnapshot,
    /// Rate reported by `RATE_INGEST_OPS`.
    pub traffic_ingest_ops_per_sec: f64,
    /// Rate reported by `RATE_INGEST_BYTES`.
    pub traffic_ingest_bytes_per_sec: f64,
    /// Rate reported by `RATE_READ_OPS`.
    pub traffic_read_ops_per_sec: f64,
    /// Rate reported by `RATE_READ_BYTES`.
    pub traffic_read_bytes_per_sec: f64,
    /// Value of `ERRORS_TOTAL`.
    pub errors_total: u64,
    /// Values of `ERRORS_BY_TYPE`, indexed by `ErrorType as usize`.
    pub errors_by_type: [u64; ErrorType::COUNT],
    /// Queue depth / capacity ratio.
    pub saturation_queue: f64,
    /// Memory used / total ratio.
    pub saturation_memory: f64,
    /// Buffer pool used / total ratio.
    pub saturation_buffer_pool: f64,
    /// Connections used / max ratio.
    pub saturation_connections: f64,
    /// Raw pending-I/O gauge (a count, not a ratio).
    pub saturation_pending_io: u64,
}

impl GoldenSignalsSnapshot {
    /// Reads every global metric of this module into a new snapshot.
    ///
    /// Each value is loaded independently with `Relaxed` ordering, so the
    /// result is not an atomic cut across metrics.
    #[must_use]
    pub fn capture() -> Self {
        let mut errors_by_type = [0u64; ErrorType::COUNT];
        for (dst, counter) in errors_by_type.iter_mut().zip(ERRORS_BY_TYPE.iter()) {
            *dst = counter.load(Ordering::Relaxed);
        }
        Self {
            latency_ingest: LATENCY_INGEST.snapshot(),
            latency_fetch: LATENCY_FETCH.snapshot(),
            latency_io: LATENCY_IO.snapshot(),
            latency_replication: LATENCY_REPLICATION.snapshot(),
            latency_network: LATENCY_NETWORK.snapshot(),
            traffic_ingest_ops_per_sec: RATE_INGEST_OPS.rate_per_second(),
            traffic_ingest_bytes_per_sec: RATE_INGEST_BYTES.rate_per_second(),
            traffic_read_ops_per_sec: RATE_READ_OPS.rate_per_second(),
            traffic_read_bytes_per_sec: RATE_READ_BYTES.rate_per_second(),
            errors_total: ERRORS_TOTAL.load(Ordering::Relaxed),
            errors_by_type,
            saturation_queue: queue_saturation(),
            saturation_memory: memory_saturation(),
            saturation_buffer_pool: buffer_pool_saturation(),
            saturation_connections: saturation_ratio(
                SATURATION_CONNECTIONS_USED.load(Ordering::Relaxed),
                SATURATION_CONNECTIONS_MAX.load(Ordering::Relaxed),
            ),
            saturation_pending_io: SATURATION_PENDING_IO.load(Ordering::Relaxed),
        }
    }

    /// `true` when ingest or fetch p99 latency is above 100 ms (100 000 µs),
    /// or any saturation ratio (queue, memory, buffer pool, connections) is
    /// above 0.7.
    #[must_use]
    pub fn has_warnings(&self) -> bool {
        self.exceeds(100_000, 0.7)
    }

    /// `true` when ingest or fetch p99 latency is above 1 s (1 000 000 µs),
    /// or any saturation ratio is above 0.9.
    #[must_use]
    pub fn has_critical(&self) -> bool {
        self.exceeds(1_000_000, 0.9)
    }

    /// Shared threshold check behind `has_warnings` and `has_critical`; all
    /// comparisons are strict (`>`).
    fn exceeds(&self, p99_limit_us: u64, saturation_limit: f64) -> bool {
        let latency_breached = [&self.latency_ingest, &self.latency_fetch]
            .iter()
            .any(|snap| snap.p99() > p99_limit_us);
        latency_breached
            || [
                self.saturation_queue,
                self.saturation_memory,
                self.saturation_buffer_pool,
                self.saturation_connections,
            ]
            .iter()
            .any(|&ratio| ratio > saturation_limit)
    }
}
#[inline]
pub fn record_ingest_latency(duration: Duration) {
LATENCY_INGEST.record(duration);
}
#[inline]
pub fn record_fetch_latency(duration: Duration) {
LATENCY_FETCH.record(duration);
}
#[inline]
pub fn record_io_latency(duration: Duration) {
LATENCY_IO.record(duration);
}
#[inline]
pub fn record_replication_latency(duration: Duration) {
LATENCY_REPLICATION.record(duration);
}
#[inline]
pub fn record_peer_replication_latency(_peer_id: u16, duration: Duration) {
LATENCY_REPLICATION.record(duration);
}
#[inline]
pub fn record_network_latency(duration: Duration) {
LATENCY_NETWORK.record(duration);
}
#[inline]
#[must_use]
pub fn time_ingest() -> LatencyTimer<'static> {
LatencyTimer::new(&LATENCY_INGEST)
}
#[inline]
#[must_use]
pub fn time_fetch() -> LatencyTimer<'static> {
LatencyTimer::new(&LATENCY_FETCH)
}
#[inline]
#[must_use]
pub fn time_io() -> LatencyTimer<'static> {
LatencyTimer::new(&LATENCY_IO)
}
/// Sampling rate for the `*_sampled` helpers. A value of `n > 0` keeps one
/// event out of every `n` per thread; `0` keeps every event. Defaults to 100.
pub static LATENCY_SAMPLE_RATE: AtomicU64 = AtomicU64::new(100);

// Per-thread count of `should_sample` calls made while a non-zero rate was set.
thread_local! {
    static SAMPLE_COUNTER: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
}

/// Decides whether the current event should be timed.
///
/// With rate `n > 0`, this returns `true` for the 1st, `(n+1)`-th, `(2n+1)`-th, …
/// call on each thread. With rate `0`, it always returns `true` and leaves the
/// per-thread counter untouched.
#[inline]
pub fn should_sample() -> bool {
    match LATENCY_SAMPLE_RATE.load(Ordering::Relaxed) {
        0 => true,
        every_nth => SAMPLE_COUNTER.with(|seen| {
            // `replace` stores the incremented count and returns the old one.
            let before = seen.replace(seen.get().wrapping_add(1));
            before % every_nth == 0
        }),
    }
}

/// Sets [`LATENCY_SAMPLE_RATE`], which `should_sample` reads on every call.
#[inline]
pub fn set_latency_sample_rate(rate: u64) {
    let target = &LATENCY_SAMPLE_RATE;
    target.store(rate, Ordering::Relaxed);
}
/// Records `duration` into [`LATENCY_INGEST`] only if [`should_sample`] selects this event.
#[inline]
pub fn record_ingest_latency_sampled(duration: Duration) {
    if !should_sample() {
        return;
    }
    LATENCY_INGEST.record(duration);
}

/// Records `duration` into [`LATENCY_FETCH`] only if [`should_sample`] selects this event.
#[inline]
pub fn record_fetch_latency_sampled(duration: Duration) {
    if !should_sample() {
        return;
    }
    LATENCY_FETCH.record(duration);
}

/// Records `duration` into [`LATENCY_IO`] only if [`should_sample`] selects this event.
#[inline]
pub fn record_io_latency_sampled(duration: Duration) {
    if !should_sample() {
        return;
    }
    LATENCY_IO.record(duration);
}
/// Drop guard like [`LatencyTimer`], but only active for events selected by
/// [`should_sample`]. Unsampled timers never read the clock and record nothing.
pub struct SampledTimer {
    /// Start time if this event was sampled, `None` otherwise.
    start: Option<Instant>,
    /// Histogram that receives the sample on drop.
    histogram: &'static LatencyHistogram,
}

impl SampledTimer {
    /// Consults [`should_sample`] once and, if selected, starts the clock.
    #[inline]
    pub fn new(histogram: &'static LatencyHistogram) -> Self {
        Self {
            start: should_sample().then(Instant::now),
            histogram,
        }
    }

    /// Whether this timer was selected for sampling and will record on drop.
    #[inline]
    #[must_use]
    pub fn is_active(&self) -> bool {
        self.start.is_some()
    }
}

impl Drop for SampledTimer {
    #[inline]
    fn drop(&mut self) {
        let Self { start, histogram } = self;
        if let Some(started_at) = start {
            histogram.record(started_at.elapsed());
        }
    }
}

/// Starts a [`SampledTimer`] for [`LATENCY_INGEST`].
#[inline]
#[must_use]
pub fn time_ingest_sampled() -> SampledTimer {
    SampledTimer::new(&LATENCY_INGEST)
}

/// Starts a [`SampledTimer`] for [`LATENCY_FETCH`].
#[inline]
#[must_use]
pub fn time_fetch_sampled() -> SampledTimer {
    SampledTimer::new(&LATENCY_FETCH)
}

/// Starts a [`SampledTimer`] for [`LATENCY_IO`].
#[inline]
#[must_use]
pub fn time_io_sampled() -> SampledTimer {
    SampledTimer::new(&LATENCY_IO)
}
pub fn update_rates(
ingest_ops: u64,
ingest_bytes: u64,
read_ops: u64,
read_bytes: u64,
current_time_ms: u64,
) {
RATE_INGEST_OPS.update(ingest_ops, current_time_ms);
RATE_INGEST_BYTES.update(ingest_bytes, current_time_ms);
RATE_READ_OPS.update(read_ops, current_time_ms);
RATE_READ_BYTES.update(read_bytes, current_time_ms);
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_latency_histogram() {
        let hist = LatencyHistogram::new();
        hist.record_us(50);
        hist.record_us(200);
        hist.record_us(1_500);
        hist.record_us(50_000);
        hist.record_us(150_000);
        let snap = hist.snapshot();
        assert_eq!(snap.count, 5);
        assert_eq!(snap.sum_us, 50 + 200 + 1_500 + 50_000 + 150_000);
        assert_eq!(snap.min_us, 50);
        assert_eq!(snap.max_us, 150_000);
        // Every sample lands in exactly one bucket.
        assert_eq!(snap.buckets.iter().sum::<u64>(), 5);
    }

    #[test]
    fn test_empty_snapshot() {
        let snap = LatencyHistogram::new().snapshot();
        assert_eq!(snap.count, 0);
        assert_eq!(snap.min_us, 0);
        assert_eq!(snap.max_us, 0);
        assert_eq!(snap.p50(), 0);
        assert!(snap.avg_us().abs() < f64::EPSILON);
    }

    #[test]
    fn test_percentile_calculation() {
        let hist = LatencyHistogram::new();
        for _ in 0..100 {
            hist.record_us(50);
        }
        let snap = hist.snapshot();
        // Percentiles report the upper bound of the matching bucket (100 µs here).
        assert_eq!(snap.p50(), 100);
        assert_eq!(snap.p99(), 100);
    }

    #[test]
    fn test_percentile_out_of_range() {
        let hist = LatencyHistogram::new();
        hist.record_us(1_000);
        let snap = hist.snapshot();
        assert_eq!(snap.percentile(-1.0), 0);
        assert_eq!(snap.percentile(100.1), 0);
        assert_eq!(snap.percentile(f64::NAN), 0);
    }

    #[test]
    fn test_percentile_overflow_bucket_reports_max() {
        let hist = LatencyHistogram::new();
        // Above the largest bucket bound (10 s), so it lands in the overflow bucket.
        hist.record_us(20_000_000);
        let snap = hist.snapshot();
        assert_eq!(snap.p50(), 20_000_000);
    }

    #[test]
    fn test_rate_tracker() {
        let tracker = RateTracker::new();
        tracker.update(0, 1_000);
        tracker.update(500, 2_000);
        assert!((tracker.rate_per_second() - 500.0).abs() < 1e-9);
    }

    #[test]
    fn test_error_tracking() {
        // The error counters are process-wide statics that any other test in
        // this binary may also bump (tests run concurrently), so assert on
        // lower-bounded deltas rather than absolute values.
        let io_before = get_error_count(ErrorType::Io);
        let timeout_before = get_error_count(ErrorType::Timeout);
        let total_before = get_total_errors();
        record_error(ErrorType::Io);
        record_error(ErrorType::Io);
        record_error(ErrorType::Timeout);
        assert!(get_error_count(ErrorType::Io) - io_before >= 2);
        assert!(get_error_count(ErrorType::Timeout) - timeout_before >= 1);
        assert!(get_total_errors() - total_before >= 3);
    }

    #[test]
    fn test_saturation_ratio() {
        assert!((saturation_ratio(50, 100) - 0.5).abs() < f64::EPSILON);
        assert!((saturation_ratio(0, 100) - 0.0).abs() < f64::EPSILON);
        assert!((saturation_ratio(100, 100) - 1.0).abs() < f64::EPSILON);
        assert!((saturation_ratio(0, 0) - 0.0).abs() < f64::EPSILON);
    }
}