nodedb 0.0.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
//! Lock-free histogram for latency distributions.
//!
//! Fixed bucket boundaries with atomic counters. O(1) recording,
//! O(buckets) serialization. Compatible with Prometheus histogram format.

use std::sync::atomic::{AtomicU64, Ordering};

/// Default latency bucket boundaries in microseconds.
///
/// Covers 10µs to 10s — suitable for database query latency.
pub const DEFAULT_BUCKETS_US: &[u64] = &[
    10,         // 10µs
    50,         // 50µs
    100,        // 100µs
    500,        // 500µs
    1_000,      // 1ms
    5_000,      // 5ms
    10_000,     // 10ms
    50_000,     // 50ms
    100_000,    // 100ms
    500_000,    // 500ms
    1_000_000,  // 1s
    5_000_000,  // 5s
    10_000_000, // 10s
];

/// Atomic histogram with fixed bucket boundaries.
///
/// Each bucket counts observations `≤ boundary`. An `+Inf` overflow
/// bucket is implicit (tracked via `count`).
pub struct AtomicHistogram {
    /// Upper bounds in microseconds.
    boundaries: &'static [u64],
    /// Bucket counters: `buckets[i]` counts observations `≤ boundaries[i]`.
    buckets: Vec<AtomicU64>,
    /// Total observations.
    count: AtomicU64,
    /// Sum of all observed values (microseconds).
    sum: AtomicU64,
}

impl AtomicHistogram {
    /// Create with default latency buckets.
    pub fn new() -> Self {
        Self::with_buckets(DEFAULT_BUCKETS_US)
    }

    /// Create with custom bucket boundaries (must be sorted ascending).
    pub fn with_buckets(boundaries: &'static [u64]) -> Self {
        let buckets = (0..boundaries.len()).map(|_| AtomicU64::new(0)).collect();
        Self {
            boundaries,
            buckets,
            count: AtomicU64::new(0),
            sum: AtomicU64::new(0),
        }
    }

    /// Record an observation in microseconds.
    pub fn observe(&self, value_us: u64) {
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value_us, Ordering::Relaxed);
        for (i, &boundary) in self.boundaries.iter().enumerate() {
            if value_us <= boundary {
                self.buckets[i].fetch_add(1, Ordering::Relaxed);
                return;
            }
        }
        // Overflow: beyond all buckets (counted in count but not in any bucket).
    }

    /// Total observation count.
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Sum of all observations in microseconds.
    pub fn sum_us(&self) -> u64 {
        self.sum.load(Ordering::Relaxed)
    }

    /// Estimate a percentile value in microseconds from bucket boundaries.
    ///
    /// Uses linear interpolation within the bucket that contains the target rank.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.count.load(Ordering::Relaxed);
        if total == 0 {
            return 0;
        }
        let target = (p * total as f64) as u64;
        let mut cumulative = 0u64;
        let mut prev_boundary = 0u64;

        for (i, &boundary) in self.boundaries.iter().enumerate() {
            let bucket_count = self.buckets[i].load(Ordering::Relaxed);
            cumulative += bucket_count;
            if cumulative >= target {
                // Linear interpolation within this bucket.
                let bucket_start = prev_boundary;
                let bucket_width = boundary - bucket_start;
                if bucket_count == 0 {
                    return boundary;
                }
                let fraction = if cumulative > target {
                    (bucket_count - (cumulative - target)) as f64 / bucket_count as f64
                } else {
                    1.0
                };
                return bucket_start + (fraction * bucket_width as f64) as u64;
            }
            prev_boundary = boundary;
        }
        // Beyond all buckets — return last boundary.
        self.boundaries.last().copied().unwrap_or(0)
    }

    /// Write Prometheus histogram format to the output string.
    ///
    /// Produces `_bucket{le="..."}`, `_count`, `_sum` lines.
    pub fn write_prometheus(&self, out: &mut String, name: &str, help: &str) {
        use std::fmt::Write;
        let _ = writeln!(out, "# HELP {name} {help}");
        let _ = writeln!(out, "# TYPE {name} histogram");

        let mut cumulative = 0u64;
        for (i, &boundary) in self.boundaries.iter().enumerate() {
            cumulative += self.buckets[i].load(Ordering::Relaxed);
            // All boundaries stored in microseconds; Prometheus expects seconds.
            let le = format!("{}", boundary as f64 / 1_000_000.0);
            let _ = writeln!(out, "{name}_bucket{{le=\"{le}\"}} {cumulative}");
        }
        let total = self.count.load(Ordering::Relaxed);
        let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {total}");
        let _ = writeln!(out, "{name}_sum {}", self.sum_us() as f64 / 1_000_000.0);
        let _ = writeln!(out, "{name}_count {total}");
    }
}

impl Default for AtomicHistogram {
    fn default() -> Self {
        Self::new()
    }
}

// Debug impl — don't print all bucket contents.
impl std::fmt::Debug for AtomicHistogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AtomicHistogram")
            .field("count", &self.count.load(Ordering::Relaxed))
            .field("sum_us", &self.sum.load(Ordering::Relaxed))
            .field("buckets", &self.boundaries.len())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic_observation() {
        let h = AtomicHistogram::new();
        h.observe(50); // 50µs → falls in ≤50µs bucket
        h.observe(500); // 500µs bucket
        h.observe(5000); // 5ms bucket
        assert_eq!(h.count(), 3);
        assert_eq!(h.sum_us(), 5550);
    }

    #[test]
    fn percentile_estimation() {
        let h = AtomicHistogram::new();
        // All observations in 1ms bucket.
        for _ in 0..100 {
            h.observe(800); // 800µs → ≤1000µs bucket
        }
        let p50 = h.percentile(0.5);
        // Should be somewhere in the 500-1000µs range.
        assert!((500..=1000).contains(&p50), "p50={p50}");
    }

    #[test]
    fn prometheus_output() {
        let h = AtomicHistogram::new();
        h.observe(100);
        h.observe(5000);
        let mut out = String::new();
        h.write_prometheus(&mut out, "nodedb_query_latency_seconds", "Query latency");
        assert!(out.contains("# TYPE nodedb_query_latency_seconds histogram"));
        assert!(out.contains("nodedb_query_latency_seconds_count 2"));
        assert!(out.contains("le=\"+Inf\""));
    }

    #[test]
    fn overflow_beyond_all_buckets() {
        let h = AtomicHistogram::new();
        h.observe(99_000_000); // 99 seconds — beyond all buckets
        assert_eq!(h.count(), 1);
        // None of the fixed buckets should contain it.
        let mut found_in_bucket = false;
        for i in 0..DEFAULT_BUCKETS_US.len() {
            if h.buckets[i].load(Ordering::Relaxed) > 0 {
                found_in_bucket = true;
            }
        }
        assert!(!found_in_bucket);
    }

    #[test]
    fn empty_histogram() {
        let h = AtomicHistogram::new();
        assert_eq!(h.count(), 0);
        assert_eq!(h.percentile(0.5), 0);
    }
}