runbound 0.4.5

RFC-compliant DNS resolver — drop-in Unbound with REST API, ACME auto-TLS, HMAC audit log, and master/slave HA
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2024-2026 RedLemonBe — https://github.com/redlemonbe/Runbound
// Query statistics — shared between DNS hot path and REST API.
//
// All counters are lock-free AtomicU64s: DNS-path increments and API reads never take a lock.
// Latency histogram uses fixed buckets — zero allocation per query.
// QPS ring buffer: 300 one-second slots (5-minute window), updated by a
// dedicated background task that reads the total counter each second.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

// ── Latency histogram ──────────────────────────────────────────────────────
//
// 9 upper-bound thresholds in microseconds define 10 buckets:
//   [0]  ≤ 0.1 ms   [1]  ≤ 0.5 ms   [2]  ≤ 1 ms    [3]  ≤ 2 ms
//   [4]  ≤ 5 ms     [5]  ≤ 10 ms    [6]  ≤ 50 ms   [7]  ≤ 100 ms
//   [8]  ≤ 500 ms   [9]  > 500 ms  (overflow)

/// Inclusive upper bounds, in microseconds, of the non-overflow latency
/// buckets. Must stay sorted ascending (bucket lookup uses `partition_point`).
pub const HIST_BOUNDS_US: [u64; 9] = [
    100, 500, 1_000, 2_000, 5_000, 10_000, 50_000, 100_000, 500_000,
];

/// Number of latency buckets: one per bound plus a trailing overflow bucket.
/// Derived from `HIST_BOUNDS_US` so the two can never fall out of sync.
pub const HIST_BUCKETS: usize = HIST_BOUNDS_US.len() + 1;

// ── QPS ring buffer ────────────────────────────────────────────────────────
/// Number of one-second QPS slots: 300 × 1 s = 5-minute sliding window.
pub const QPS_RING_SIZE: usize = 300;

// ── Hickory resolver cache size ────────────────────────────────────────────
/// Mirrors the cache size configured in build_resolver(); caps the
/// cache_entries approximation. Keep the two values in sync.
const HICKORY_CACHE_SIZE: u64 = 8_192;

// ── Cache hit threshold ────────────────────────────────────────────────────
/// Forward lookups completing in < 2 ms are almost certainly served from
/// hickory's in-process cache (real upstream RTT is typically 5–200 ms).
pub const CACHE_HIT_THRESHOLD_US: u64 = 2_000;

/// Lock-free query statistics shared by the DNS hot path and the REST API.
///
/// Every metric is an `AtomicU64`; each individual value is exact, but a group
/// of fields read one after another is not a mutually consistent snapshot.
#[derive(Debug)]
pub struct Stats {
    // Core query counters, one per outcome.
    pub total:     AtomicU64,
    pub blocked:   AtomicU64,
    pub forwarded: AtomicU64,
    pub nxdomain:  AtomicU64,
    pub refused:   AtomicU64,
    pub servfail:  AtomicU64,
    /// Creation time; uptime is measured from here.
    pub started_at: Instant,

    /// Latency histogram — one counter per bucket defined by `HIST_BOUNDS_US`
    /// (`HIST_BUCKETS` entries, last one is overflow); zero alloc per query.
    pub lat_hist:  Vec<AtomicU64>,

    /// QPS ring buffer — `QPS_RING_SIZE` one-second slots of query counts.
    pub qps_ring:  Vec<AtomicU64>,
    /// Monotonic number of samples written to `qps_ring` so far (never wrapped);
    /// the next slot to write is `qps_head % QPS_RING_SIZE`.
    pub qps_head:  AtomicU64,
    /// All-time peak: highest query count recorded in any one-second sample.
    pub qps_peak:  AtomicU64,

    // Cache / local resolution metrics.
    /// Forwarded lookups faster than `CACHE_HIT_THRESHOLD_US` (likely served
    /// from hickory's in-process cache).
    pub cache_hits:    AtomicU64,
    /// Forwarded lookups at or above the threshold (network round-trip).
    pub cache_misses:  AtomicU64,
    /// Rough cache-fill estimate: misses since the last reset, saturating at
    /// `HICKORY_CACHE_SIZE`. Repeated misses for the same name are counted
    /// again, so this is not a count of distinct cached domains.
    pub cache_entries: AtomicU64,
    /// Queries answered from local zone data (config + API dns_entries).
    pub local_hits:    AtomicU64,

    // DNSSEC counters — only incremented when dnssec-validation is enabled.
    /// Resolved with a valid DNSSEC signature chain (RRSIG present).
    pub dnssec_secure:   AtomicU64,
    /// DNSSEC validation failed (ProtoErrorKind::RrsigsNotPresent).
    pub dnssec_bogus:    AtomicU64,
    /// Resolved OK but unsigned (no RRSIG — delegation proven unsigned by parent).
    pub dnssec_insecure: AtomicU64,
}

impl Stats {
    /// Create a zeroed statistics block, wrapped in an [`Arc`] so the DNS hot
    /// path, the REST API and the background tasks can share one instance.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            total:      AtomicU64::new(0),
            blocked:    AtomicU64::new(0),
            forwarded:  AtomicU64::new(0),
            nxdomain:   AtomicU64::new(0),
            refused:    AtomicU64::new(0),
            servfail:   AtomicU64::new(0),
            started_at: Instant::now(),
            lat_hist:   (0..HIST_BUCKETS).map(|_| AtomicU64::new(0)).collect(),
            qps_ring:   (0..QPS_RING_SIZE).map(|_| AtomicU64::new(0)).collect(),
            qps_head:   AtomicU64::new(0),
            qps_peak:   AtomicU64::new(0),
            cache_hits:    AtomicU64::new(0),
            cache_misses:  AtomicU64::new(0),
            cache_entries: AtomicU64::new(0),
            local_hits:    AtomicU64::new(0),
            dnssec_secure:   AtomicU64::new(0),
            dnssec_bogus:    AtomicU64::new(0),
            dnssec_insecure: AtomicU64::new(0),
        })
    }

    // Hot-path counters: one relaxed atomic add each, no locking.
    #[inline] pub fn inc_total(&self)           { self.total.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_blocked(&self)         { self.blocked.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_forwarded(&self)       { self.forwarded.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_nxdomain(&self)        { self.nxdomain.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_refused(&self)         { self.refused.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_servfail(&self)        { self.servfail.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_local_hits(&self)      { self.local_hits.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_dnssec_secure(&self)   { self.dnssec_secure.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_dnssec_bogus(&self)    { self.dnssec_bogus.fetch_add(1, Ordering::Relaxed); }
    #[inline] pub fn inc_dnssec_insecure(&self) { self.dnssec_insecure.fetch_add(1, Ordering::Relaxed); }

    /// Record one query latency, in microseconds.
    ///
    /// Zero allocation, single atomic increment. The bucket is the first one
    /// whose upper bound in `HIST_BOUNDS_US` is ≥ `us`; anything slower than
    /// the last bound lands in the trailing overflow bucket.
    #[inline]
    pub fn record_latency_us(&self, us: u64) {
        // partition_point returns the first index whose bound is ≥ `us`, or
        // HIST_BOUNDS_US.len() (== the overflow bucket) when every bound is < `us`.
        let bucket = HIST_BOUNDS_US.partition_point(|&bound| bound < us);
        self.lat_hist[bucket].fetch_add(1, Ordering::Relaxed);
    }

    /// Record a completed forwarded lookup and update cache metrics.
    ///
    /// `elapsed_us < CACHE_HIT_THRESHOLD_US` counts as a cache hit (hickory
    /// served it from its in-process DNS cache); anything slower counts as a
    /// miss (round-trip to the upstream resolver) and bumps the saturating
    /// `cache_entries` estimate.
    #[inline]
    pub fn record_forward(&self, elapsed_us: u64) {
        if elapsed_us < CACHE_HIT_THRESHOLD_US {
            self.cache_hits.fetch_add(1, Ordering::Relaxed);
            return;
        }
        self.cache_misses.fetch_add(1, Ordering::Relaxed);
        // Saturates at HICKORY_CACHE_SIZE (matching hickory's eviction behaviour).
        // fetch_update returns Err once saturated; that is expected, not an error.
        let _ = self.cache_entries.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |n| (n < HICKORY_CACHE_SIZE).then(|| n + 1),
        );
    }

    /// Reset cache counters after a resolver cache flush (called by memory_guard_loop).
    pub fn reset_cache(&self) {
        self.cache_hits.store(0, Ordering::Relaxed);
        self.cache_misses.store(0, Ordering::Relaxed);
        self.cache_entries.store(0, Ordering::Relaxed);
    }

    /// Compute a latency percentile from the current histogram, in milliseconds.
    ///
    /// Uses the nearest-rank method: the answer is the bucket holding the
    /// `ceil(pct% × total)`-th smallest sample, reported as that bucket's
    /// midpoint rounded to 0.1 ms (the overflow bucket is treated as ending at
    /// 1 s, i.e. 750 ms). `pct` is clamped to 0–100 and NaN is treated as 0.
    /// Returns 0.0 when no latency has been recorded.
    pub fn percentile_ms(&self, pct: f64) -> f64 {
        Self::percentile_from_counts(&self.hist_counts(), pct)
    }

    /// Load every histogram bucket once into a plain array.
    fn hist_counts(&self) -> [u64; HIST_BUCKETS] {
        std::array::from_fn(|i| self.lat_hist[i].load(Ordering::Relaxed))
    }

    /// Nearest-rank percentile (ms) over an already-loaded histogram.
    fn percentile_from_counts(counts: &[u64; HIST_BUCKETS], pct: f64) -> f64 {
        let total: u64 = counts.iter().sum();
        if total == 0 {
            return 0.0;
        }
        let pct = if pct.is_nan() { 0.0 } else { pct.clamp(0.0, 100.0) };
        // Ceil, not floor: flooring under-reports the tail (e.g. p75 of two
        // samples would pick the faster one).
        let rank = ((total as f64 * pct / 100.0).ceil() as u64).clamp(1, total);
        let mut cum = 0u64;
        for (i, &c) in counts.iter().enumerate() {
            cum += c;
            if cum >= rank {
                return Self::bucket_midpoint_ms(i);
            }
        }
        // Not reached: `cum` equals `total` (≥ rank) after the last bucket.
        Self::bucket_midpoint_ms(HIST_BUCKETS - 1)
    }

    /// Midpoint of histogram bucket `i`, in ms rounded to 0.1 ms.
    fn bucket_midpoint_ms(i: usize) -> f64 {
        let lo = if i == 0 { 0 } else { HIST_BOUNDS_US[i - 1] };
        // The overflow bucket has no upper bound; assume it ends at 1 s.
        let hi = HIST_BOUNDS_US.get(i).copied().unwrap_or(1_000_000);
        let mid_us = (lo + hi) / 2;
        (mid_us as f64 / 1000.0 * 10.0).round() / 10.0
    }

    /// Compute QPS statistics from the ring buffer.
    ///
    /// Returns `(qps_1m, qps_5m, qps_peak)`: the mean queries per second over
    /// the most recent 60 and 300 one-second samples (rounded to 0.1), and the
    /// all-time peak. Until a window has filled (shortly after startup) the
    /// mean covers only the samples written so far instead of being diluted by
    /// never-written zero slots; with no samples at all both means are 0.0.
    pub fn qps_stats(&self) -> (f64, f64, u64) {
        const WINDOW_1M: usize = 60;

        let head = self.qps_head.load(Ordering::Relaxed);
        let ring = QPS_RING_SIZE as u64;
        // Number of slots holding real samples, and the slot the writer fills next.
        let filled = head.min(ring) as usize;
        let next = (head % ring) as usize;

        let (mut sum_1m, mut sum_5m) = (0u64, 0u64);
        for age in 0..filled {
            // Walk backwards from the most recently written slot.
            let slot = (next + QPS_RING_SIZE - 1 - age) % QPS_RING_SIZE;
            let v = self.qps_ring[slot].load(Ordering::Relaxed);
            if age < WINDOW_1M {
                sum_1m += v;
            }
            sum_5m += v;
        }

        let mean = |sum: u64, samples: usize| -> f64 {
            if samples == 0 {
                0.0
            } else {
                (sum as f64 / samples as f64 * 10.0).round() / 10.0
            }
        };
        (
            mean(sum_1m, filled.min(WINDOW_1M)),
            mean(sum_5m, filled),
            self.qps_peak.load(Ordering::Relaxed),
        )
    }

    /// Take a point-in-time copy of every metric for the REST API.
    ///
    /// Fields are loaded independently, so they are not mutually consistent
    /// under concurrent updates. The three latency percentiles are computed
    /// from a single load of the histogram, so p50 ≤ p95 ≤ p99 always holds.
    pub fn snapshot(&self) -> StatsSnapshot {
        let ch = self.cache_hits.load(Ordering::Relaxed);
        let cm = self.cache_misses.load(Ordering::Relaxed);
        // Percentage with one decimal place.
        let cache_hit_rate = if ch + cm > 0 {
            (ch as f64 / (ch + cm) as f64 * 1000.0).round() / 10.0
        } else {
            0.0
        };
        let (qps_1m, qps_5m, qps_peak) = self.qps_stats();
        let hist = self.hist_counts();
        StatsSnapshot {
            total:           self.total.load(Ordering::Relaxed),
            blocked:         self.blocked.load(Ordering::Relaxed),
            forwarded:       self.forwarded.load(Ordering::Relaxed),
            nxdomain:        self.nxdomain.load(Ordering::Relaxed),
            refused:         self.refused.load(Ordering::Relaxed),
            servfail:        self.servfail.load(Ordering::Relaxed),
            uptime_secs:     self.started_at.elapsed().as_secs(),
            qps_1m,
            qps_5m,
            qps_peak,
            latency_p50_ms:  Self::percentile_from_counts(&hist, 50.0),
            latency_p95_ms:  Self::percentile_from_counts(&hist, 95.0),
            latency_p99_ms:  Self::percentile_from_counts(&hist, 99.0),
            cache_hit_rate,
            cache_entries:   self.cache_entries.load(Ordering::Relaxed),
            local_hits:      self.local_hits.load(Ordering::Relaxed),
            dnssec_secure:   self.dnssec_secure.load(Ordering::Relaxed),
            dnssec_bogus:    self.dnssec_bogus.load(Ordering::Relaxed),
            dnssec_insecure: self.dnssec_insecure.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of every query statistic, as produced by `Stats::snapshot()`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StatsSnapshot {
    /// Total queries counted.
    pub total:          u64,
    /// Queries blocked.
    pub blocked:        u64,
    /// Queries forwarded upstream.
    pub forwarded:      u64,
    /// Queries answered NXDOMAIN.
    pub nxdomain:       u64,
    /// Queries answered REFUSED.
    pub refused:        u64,
    /// Queries answered SERVFAIL.
    pub servfail:       u64,
    /// Whole seconds since the statistics block was created.
    pub uptime_secs:    u64,
    /// Mean queries per second over the last minute (one decimal place).
    pub qps_1m:         f64,
    /// Mean queries per second over the last five minutes (one decimal place).
    pub qps_5m:         f64,
    /// Highest query count recorded in any one-second sample.
    pub qps_peak:       u64,
    /// Median latency in ms (histogram bucket midpoint, 0.1 ms resolution).
    pub latency_p50_ms: f64,
    /// 95th-percentile latency in ms (histogram bucket midpoint).
    pub latency_p95_ms: f64,
    /// 99th-percentile latency in ms (histogram bucket midpoint).
    pub latency_p99_ms: f64,
    /// Share of forwarded lookups classed as cache hits, in percent (0–100).
    pub cache_hit_rate: f64,
    /// Approximate resolver-cache fill (an estimate, not an exact count).
    pub cache_entries:  u64,
    /// Queries answered from local zone data.
    pub local_hits:     u64,
    /// Responses with a valid DNSSEC signature chain.
    pub dnssec_secure:   u64,
    /// Responses that failed DNSSEC validation.
    pub dnssec_bogus:    u64,
    /// Responses resolved OK but unsigned.
    pub dnssec_insecure: u64,
}

// ── QPS background task ────────────────────────────────────────────────────
//
// Runs every second, computes the queries seen in the last second (delta of
// the total counter), stores it in the ring buffer, and updates the all-time
// peak. Spawned once in main.rs alongside the other background tasks.
pub async fn qps_update_loop(stats: Arc<Stats>) {
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    // With Skip, a runtime stall yields one late sample whose delta can cover
    // more than one second, rather than a burst of back-to-back samples.
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    // A tokio interval's first tick completes immediately. Consume it and take
    // the baseline only then, so every recorded sample spans one full period
    // and queries counted before this task started are not attributed to a
    // single second (which would inflate qps_peak).
    interval.tick().await;
    let mut prev_total = stats.total.load(Ordering::Relaxed);

    loop {
        interval.tick().await;
        let total = stats.total.load(Ordering::Relaxed);
        let qps = total.saturating_sub(prev_total);
        prev_total = total;

        // Claim the next slot, then publish its value. These are two separate
        // atomic operations: a concurrent reader may briefly see the new head
        // with the slot's previous contents, which is acceptable for stats.
        let seq = stats.qps_head.fetch_add(1, Ordering::Relaxed);
        let slot = (seq % QPS_RING_SIZE as u64) as usize;
        stats.qps_ring[slot].store(qps, Ordering::Relaxed);

        // Update peak (lock-free max).
        stats.qps_peak.fetch_max(qps, Ordering::Relaxed);
    }
}