use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::time::interval;
/// Length in seconds of one fine-grained sampling window.
pub const WINDOW_SECS: u64 = 5;

// Ring-buffer capacities (number of slots) for each archive tier.
const FINE_SLOTS: usize = 720;
const MINUTE_SLOTS: usize = 1440;
const HOURLY_SLOTS: usize = 720;
const DAILY_SLOTS: usize = 365;

// Multipliers between the step sizes of consecutive tiers
// (fine -> minute -> hourly -> daily); see `TimePeriod::step_secs`.
const MINUTE_RATIO: u64 = 12;
const HOURLY_RATIO: u64 = 60;
const DAILY_RATIO: u64 = 24;

// Cap on distinct keys kept per path map in `Metrics::record_path`.
const MAX_TRACKED_PATHS: usize = 200;
// NOTE(review): the users of the two limits below are outside this view;
// presumably they bound top-N path lists — confirm against callers.
const COARSE_TOP_PATHS: usize = 20;
const TOP_PATHS_LIMIT: usize = 20;
/// Time range that can be requested for sparkline and path queries.
///
/// The query-string spelling of each variant is given by `as_str`, and the
/// archive tier / slot count it reads from by `archive`.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so the type can be used
/// as a map or set key, matching `HandlerKind`, which already derives `Eq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TimePeriod {
    Min5,
    Min15,
    Min60,
    Hr3,
    Hr6,
    Hr12,
    Day1,
    Day7,
    Day30,
    Month1,
    Month3,
    Month6,
    Month12,
}
impl TimePeriod {
pub fn from_query(s: &str) -> Self {
match s {
"5min" | "5m" => Self::Min5,
"15min" | "15m" => Self::Min15,
"1h" | "60min" | "60m" => Self::Min60,
"3h" => Self::Hr3,
"6h" => Self::Hr6,
"12h" => Self::Hr12,
"1d" => Self::Day1,
"7d" => Self::Day7,
"30d" => Self::Day30,
"1mo" => Self::Month1,
"3mo" => Self::Month3,
"6mo" => Self::Month6,
"1y" | "12mo" => Self::Month12,
_ => Self::Min15,
}
}
pub fn as_str(self) -> &'static str {
match self {
Self::Min5 => "5min",
Self::Min15 => "15min",
Self::Min60 => "1h",
Self::Hr3 => "3h",
Self::Hr6 => "6h",
Self::Hr12 => "12h",
Self::Day1 => "1d",
Self::Day7 => "7d",
Self::Day30 => "30d",
Self::Month1 => "1mo",
Self::Month3 => "3mo",
Self::Month6 => "6mo",
Self::Month12 => "1y",
}
}
fn archive(&self) -> (u8, usize) {
match self {
Self::Min5 => (0, 60),
Self::Min15 => (0, 180),
Self::Min60 => (0, FINE_SLOTS),
Self::Hr3 => (1, 180),
Self::Hr6 => (1, 360),
Self::Hr12 => (1, 720),
Self::Day1 => (1, MINUTE_SLOTS),
Self::Day7 => (2, 168),
Self::Day30 => (2, HOURLY_SLOTS),
Self::Month1 => (2, HOURLY_SLOTS),
Self::Month3 => (3, 90),
Self::Month6 => (3, 180),
Self::Month12 => (3, DAILY_SLOTS),
}
}
pub fn step_secs(self) -> u64 {
match self.archive().0 {
0 => WINDOW_SECS,
1 => WINDOW_SECS * MINUTE_RATIO,
2 => WINDOW_SECS * MINUTE_RATIO * HOURLY_RATIO,
_ => WINDOW_SECS * MINUTE_RATIO * HOURLY_RATIO * DAILY_RATIO,
}
}
}
/// Per-slot time series returned by `Metrics::sparkline_for_period`.
///
/// All vectors have the same length and are ordered oldest slot first.
/// `Debug`, `Clone` and `PartialEq` are derived so results can be logged,
/// copied and compared (`Eq` is not possible because of the `f64` series).
#[derive(Debug, Clone, PartialEq)]
pub struct SparklineData {
    /// Seconds covered by each slot.
    pub step_secs: u64,
    /// Requests per second in each slot (slot count divided by `step_secs`).
    pub req_rate: Vec<f64>,
    /// Memory sample per slot; `None` where the stored value was 0.
    pub mem_kb: Vec<Option<u32>>,
    /// CPU percentage per slot, as decoded by `cpu_pct_from_hp`.
    pub cpu_pct: Vec<Option<f64>>,
    pub auth_fail: Vec<u32>,
    pub jwt_fail: Vec<u32>,
    pub jwt_expiry: Vec<u32>,
    pub jwt_issued: Vec<u32>,
    pub err4xx: Vec<u32>,
    pub err5xx: Vec<u32>,
    pub active: Vec<u32>,
}
/// Point-in-time copy of the `stream_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StreamSnap {
    /// Copied from `stream_conns_active`.
    pub conns_active: i64,
    /// Copied from `stream_conns_total`.
    pub conns_total: u64,
    /// Copied from `stream_bytes_in_total`.
    pub bytes_in: u64,
    /// Copied from `stream_bytes_out_total`.
    pub bytes_out: u64,
}
/// Point-in-time copy of the `compress_*_total` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CompressionSnap {
    pub responses: u64,
    pub skipped: u64,
    pub bytes_in: u64,
    pub bytes_out: u64,
    pub gzip: u64,
    pub brotli: u64,
    pub zstd: u64,
}
/// Point-in-time copy of the `tls_handshake*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct TlsSnap {
    /// Copied from `tls_handshakes_total`.
    pub handshakes: u64,
    /// Copied from `tls_handshake_failures_total`.
    pub failures: u64,
    /// Copied from `tls_handshake_timeouts_total`.
    pub timeouts: u64,
}
/// Point-in-time copy of the `geoip_lookup*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct GeoipSnap {
    /// Copied from `geoip_lookups_total`.
    pub lookups: u64,
    /// Copied from `geoip_lookup_misses_total`.
    pub misses: u64,
}
/// Point-in-time copy of the `shutdown_*_total` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ShutdownSnap {
    /// Copied from `shutdown_drained_total`.
    pub drained: u64,
    /// Copied from `shutdown_abandoned_total`.
    pub abandoned: u64,
}
/// Point-in-time copy of the `acme_*_total` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct AcmeSnap {
    pub issuances: u64,
    pub issuance_failures: u64,
    pub renewals: u64,
    pub renewal_failures: u64,
}
/// Point-in-time copy of the `ocsp_refresh*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct OcspSnap {
    /// Copied from `ocsp_refreshes`.
    pub refreshes: u64,
    /// Copied from `ocsp_refresh_failures`.
    pub refresh_failures: u64,
}
/// Point-in-time copy of the `proxy_lb_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LbSnap {
    pub picks: u64,
    pub no_upstream: u64,
    pub retries: u64,
    pub ejections: u64,
    pub health_failures: u64,
    pub health_recoveries: u64,
    /// Copied from `proxy_lb_health_checks_total`.
    pub health_checks: u64,
}
/// Point-in-time copy of the `proxy_upstream_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct UpstreamSnap {
    pub bytes_in: u64,
    pub bytes_out: u64,
    pub connect_errors: u64,
    /// Histogram of upstream latencies; the bucket index is the one
    /// `latency_bucket` assigns (<1, <10, <50, <200, <1000, otherwise; ms).
    pub latency: [u64; 6],
}
/// Point-in-time copy of the `rate_limit_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RateLimitSnap {
    /// Copied from `rate_limit_triggers`.
    pub triggers: u64,
    /// Copied from `rate_limit_active_keys`.
    pub active_keys: u64,
}
/// Point-in-time copy of the `cache_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CacheSnap {
    pub hits: u64,
    pub misses: u64,
    pub stores: u64,
    pub bypass: u64,
    pub evictions: u64,
    pub revalidations: u64,
    pub entries: u64,
    pub bytes: u64,
}
/// Point-in-time copy of the datagram counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct DatagramSnap {
    /// Copied from `datagram_flows_active`.
    pub flows_active: u64,
    /// Copied from `datagrams_in_total`.
    pub datagrams_in: u64,
    /// Copied from `datagrams_out_total`.
    pub datagrams_out: u64,
    /// Copied from the unprefixed `bytes_in_total` counter.
    pub bytes_in: u64,
    /// Copied from the unprefixed `bytes_out_total` counter.
    pub bytes_out: u64,
    /// Copied from `datagram_flow_create_total`.
    pub flow_create: u64,
    /// Copied from `datagram_flow_evict_total`.
    pub flow_evict: u64,
}
/// Point-in-time copy of the `http_conns_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct HttpConnSnap {
    /// Copied from `http_conns_active`.
    pub active: i64,
    /// Copied from `http_conns_total`.
    pub total: u64,
}
/// Point-in-time copy of one backend's request counters; `Snapshot` uses it
/// for both the `fcgi_*` and the `scgi_*` counters of `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BackendSnap {
    pub requests: u64,
    pub errors: u64,
    pub in_flight: i64,
}
/// Point-in-time copy of the `cgi_*` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CgiSnap {
    pub requests: u64,
    pub errors: u64,
    pub in_flight: i64,
    pub spawn_failures: u64,
    pub timeouts: u64,
}
/// Point-in-time copy of the `static_*_total` counters held by `Metrics`.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StaticSnap {
    /// Copied from `static_bytes_served_total`.
    pub bytes_served: u64,
    /// Copied from `static_not_modified_total`.
    pub not_modified: u64,
    /// Copied from `static_range_total`.
    pub range: u64,
}
/// Point-in-time copy of the `oidc_*` counters held by `Metrics`; each field
/// mirrors the counter of the same name with the `oidc_` prefix.
///
/// Derives `Debug`/`PartialEq`/`Eq` so snapshots can be logged and compared.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct OidcSnap {
    pub refreshes: u64,
    pub refresh_failures: u64,
    pub logouts: u64,
    pub discoveries: u64,
    pub discovery_failures: u64,
    pub userinfo_failures: u64,
    pub backchannel_logouts: u64,
    pub backchannel_failures: u64,
    pub bearer_validations: u64,
    pub bearer_failures: u64,
    pub revocations: u64,
    pub revocation_failures: u64,
    pub callback_iss_mismatches: u64,
}
/// Everything `Metrics::snapshot` reports in one call: cumulative counters,
/// derived request rates, resource samples and per-class breakdowns.
#[derive(Default)]
pub struct Snapshot {
    /// Time elapsed since `Metrics::start_time`.
    pub uptime: Duration,

    // Request totals and status classes.
    pub requests_total: u64,
    pub requests_active: i64,
    pub status_2xx: u64,
    pub status_3xx: u64,
    pub status_4xx: u64,
    pub status_5xx: u64,
    /// Request-latency histogram, indexed by `latency_bucket`.
    pub latency: [u64; 6],

    // Request rates, in requests per second.
    /// Requests seen since the last recorded fine window, over `WINDOW_SECS`.
    pub rate_current: f64,
    /// Average over the 12 most recent fine windows.
    pub rate_1min: f64,
    /// Average over the 60 most recent fine windows.
    pub rate_5min: f64,
    /// Average over the 180 most recent fine windows.
    pub rate_15min: f64,

    // Resource samples.
    /// Result of `read_memory_kb()` at snapshot time.
    pub memory_kb: Option<u64>,
    /// CPU value of the newest fine slot, decoded by `cpu_pct_from_hp`.
    pub cpu_percent: Option<f64>,

    // Authentication / JWT: cumulative totals.
    pub auth_failures_total: u64,
    pub jwt_failures_total: u64,
    pub jwt_expiries_total: u64,
    pub jwt_issued_total: u64,
    // Authentication / JWT: sums over up to the 60 newest minute-archive slots.
    pub auth_fail_1h: u64,
    pub jwt_fail_1h: u64,
    pub jwt_expiry_1h: u64,
    pub jwt_issued_1h: u64,

    // QUIC counters.
    pub quic_handshakes_total: u64,
    pub quic_handshake_failures_total: u64,
    pub quic_connections_active: i64,
    pub quic_requests_total: u64,
    pub quic_outbound_handshakes_total: u64,

    // Per-subsystem counter groups.
    pub stream: StreamSnap,
    pub datagram: DatagramSnap,
    pub compression: CompressionSnap,
    pub tls: TlsSnap,
    pub geoip: GeoipSnap,
    pub shutdown: ShutdownSnap,
    pub acme: AcmeSnap,
    pub ocsp: OcspSnap,
    pub lb: LbSnap,
    pub upstream: UpstreamSnap,
    pub rate_limit: RateLimitSnap,
    pub cache: CacheSnap,
    pub oidc: OidcSnap,
    pub http_conns: HttpConnSnap,
    pub fcgi: BackendSnap,
    pub scgi: BackendSnap,
    pub cgi: CgiSnap,
    pub static_files: StaticSnap,

    /// Status-class counts per handler kind, labelled with `HandlerKind::label`.
    pub by_handler: Vec<(&'static str, ClassSnapshot)>,
    /// Status-class counts per virtual-host name.
    pub by_vhost: Vec<(String, ClassSnapshot)>,
}
impl Snapshot {
    /// Formats `uptime` using its three most significant non-zero-led units:
    /// `"{d}d {h}h {m}m"`, `"{h}h {m}m {s}s"`, `"{m}m {s}s"` or `"{s}s"`.
    pub fn uptime_human(&self) -> String {
        let total = self.uptime.as_secs();
        let days = total / 86400;
        let hours = total % 86400 / 3600;
        let mins = total % 3600 / 60;
        let secs = total % 60;
        // The leading unit is the largest one that is non-zero.
        match (days, hours, mins) {
            (0, 0, 0) => format!("{secs}s"),
            (0, 0, _) => format!("{mins}m {secs}s"),
            (0, _, _) => format!("{hours}h {mins}m {secs}s"),
            _ => format!("{days}d {hours}h {mins}m"),
        }
    }
}
/// Ring buffers with `FINE_SLOTS` entries each, one entry per fine window,
/// plus the bookkeeping needed to fill them.
struct FineHistory {
    // One ring per series; all rings share `head`.
    req: Vec<u32>,
    mem: Vec<u32>,
    /// Stored in the encoding that `cpu_pct_from_hp` decodes.
    cpu: Vec<u16>,
    auth: Vec<u16>,
    jwt: Vec<u16>,
    jwt_expiry: Vec<u16>,
    jwt_issued: Vec<u16>,
    err4xx: Vec<u16>,
    err5xx: Vec<u16>,
    active: Vec<u16>,
    /// Ring cursor; readers treat `head - 1` as the newest slot.
    head: usize,
    written: u64,
    /// Value of `requests_total` at the last recorded window; `snapshot`
    /// subtracts it from the live total to get the in-progress count.
    last_total: u64,
    // NOTE(review): presumably the equivalent baselines for the other
    // series; the code that updates them is outside this view.
    last_auth: u64,
    last_jwt: u64,
    last_jwt_expiry: u64,
    last_jwt_issued: u64,
    last_4xx: u64,
    last_5xx: u64,
    last_cpu_ticks: u64,
}

impl FineHistory {
    /// Creates a history with every ring zero-filled and all baselines at 0.
    fn new() -> Self {
        let ring32 = || vec![0u32; FINE_SLOTS];
        let ring16 = || vec![0u16; FINE_SLOTS];
        Self {
            req: ring32(),
            mem: ring32(),
            cpu: ring16(),
            auth: ring16(),
            jwt: ring16(),
            jwt_expiry: ring16(),
            jwt_issued: ring16(),
            err4xx: ring16(),
            err5xx: ring16(),
            active: ring16(),
            head: 0,
            written: 0,
            last_total: 0,
            last_auth: 0,
            last_jwt: 0,
            last_jwt_expiry: 0,
            last_jwt_issued: 0,
            last_4xx: 0,
            last_5xx: 0,
            last_cpu_ticks: 0,
        }
    }

    /// Sums the request counts of the `n` newest slots (clamped to
    /// `FINE_SLOTS`), walking backwards from the slot just before `head`.
    fn window_req(&self, n: usize) -> u64 {
        let span = n.min(FINE_SLOTS);
        let mut sum = 0u64;
        for back in 1..=span {
            // Adding FINE_SLOTS before subtracting keeps the index non-negative.
            let slot = (self.head + FINE_SLOTS - back) % FINE_SLOTS;
            sum += u64::from(self.req[slot]);
        }
        sum
    }
}
/// Per-path request counts kept by `Metrics`.
struct PathData {
    /// Ring of per-window path counts; readers walk it backwards from
    /// `head - 1`.
    slots: Vec<HashMap<String, u64>>,
    /// Counts for the window in progress; incremented by `record_path`.
    current: HashMap<String, u64>,
    /// Counts over the whole lifetime; incremented by `record_path`.
    total: HashMap<String, u64>,
    /// Ring cursor into `slots`.
    head: usize,
}
/// Ring-buffer archive with `cap` slots per series; used for the minute,
/// hourly and daily tiers of `Metrics`.
struct CoarseArchive {
    req: Vec<u32>,
    mem: Vec<u32>,
    /// Stored in the encoding that `cpu_pct_from_hp` decodes.
    cpu: Vec<u16>,
    auth: Vec<u16>,
    jwt: Vec<u16>,
    jwt_expiry: Vec<u16>,
    jwt_issued: Vec<u16>,
    err4xx: Vec<u16>,
    err5xx: Vec<u16>,
    active: Vec<u16>,
    /// Per-slot list of `(path, count)` pairs.
    paths: Vec<Vec<(String, u64)>>,
    /// Number of slots in every ring.
    cap: usize,
    /// Ring cursor; readers treat `head - 1` as the newest slot.
    head: usize,
    /// Readers clamp their look-back to this value.
    written: u64,
}

impl CoarseArchive {
    /// Creates an archive of `cap` zeroed slots with the cursor at 0.
    fn new(cap: usize) -> Self {
        let ring32 = || vec![0u32; cap];
        let ring16 = || vec![0u16; cap];
        Self {
            req: ring32(),
            mem: ring32(),
            cpu: ring16(),
            auth: ring16(),
            jwt: ring16(),
            jwt_expiry: ring16(),
            jwt_issued: ring16(),
            err4xx: ring16(),
            err5xx: ring16(),
            active: ring16(),
            paths: (0..cap).map(|_| Vec::new()).collect(),
            cap,
            head: 0,
            written: 0,
        }
    }
}
/// Kind of handler a request was served by; used to pick the per-kind
/// counters in `Metrics::per_kind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandlerKind {
    Static,
    Proxy,
    Redirect,
    Respond,
    FastCgi,
    Scgi,
    Cgi,
    Status,
    AuthRequest,
}

/// Number of `HandlerKind` variants.
pub const HANDLER_KINDS: usize = 9;

/// Every `HandlerKind`, in declaration order, so that position `i` holds the
/// variant whose `index()` is `i`.
pub const HANDLER_KIND_ALL: [HandlerKind; HANDLER_KINDS] = {
    use HandlerKind::*;
    [
        Static,
        Proxy,
        Redirect,
        Respond,
        FastCgi,
        Scgi,
        Cgi,
        Status,
        AuthRequest,
    ]
};

impl HandlerKind {
    /// Position of this kind in per-kind arrays: the variant's discriminant.
    fn index(self) -> usize {
        self as usize
    }

    /// Lower-case label reported for this kind.
    pub fn label(self) -> &'static str {
        match self {
            HandlerKind::Static => "static",
            HandlerKind::Proxy => "proxy",
            HandlerKind::Redirect => "redirect",
            HandlerKind::Respond => "respond",
            HandlerKind::FastCgi => "fastcgi",
            HandlerKind::Scgi => "scgi",
            HandlerKind::Cgi => "cgi",
            HandlerKind::Status => "status",
            HandlerKind::AuthRequest => "auth-request",
        }
    }
}
/// Live status-class counters for one handler kind or one virtual host.
#[derive(Debug, Default)]
pub struct ClassCounters {
    pub total: AtomicU64,
    pub s2xx: AtomicU64,
    pub s3xx: AtomicU64,
    pub s4xx: AtomicU64,
    pub s5xx: AtomicU64,
}

impl ClassCounters {
    /// Counts one response: always bumps `total`, and additionally the
    /// counter for the status class when `status` is 2xx..5xx.
    fn record(&self, status: u16) {
        self.total.fetch_add(1, Ordering::Relaxed);
        let class = match status / 100 {
            2 => &self.s2xx,
            3 => &self.s3xx,
            4 => &self.s4xx,
            5 => &self.s5xx,
            // 1xx and anything outside 100..600 only count towards `total`.
            _ => return,
        };
        class.fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current counter values. Each load is independent
    /// (`Relaxed`), so the fields are not read as one atomic unit.
    fn snapshot(&self) -> ClassSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        ClassSnapshot {
            total: read(&self.total),
            s2xx: read(&self.s2xx),
            s3xx: read(&self.s3xx),
            s4xx: read(&self.s4xx),
            s5xx: read(&self.s5xx),
        }
    }
}

/// Pair returned by `Metrics::class_snapshots`: per-handler-kind entries
/// keyed by label, then per-vhost entries keyed by name.
pub type ClassSnapshots =
    (Vec<(&'static str, ClassSnapshot)>, Vec<(String, ClassSnapshot)>);

/// Plain-value copy of a `ClassCounters`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ClassSnapshot {
    pub total: u64,
    pub s2xx: u64,
    pub s3xx: u64,
    pub s4xx: u64,
    pub s5xx: u64,
}
/// Atomic counters plus mutex-guarded ring-buffer histories.
/// `Metrics::snapshot` copies the counters into a `Snapshot`.
pub struct Metrics {
    /// Set to `Instant::now()` by `Metrics::new`; basis for the uptime.
    pub start_time: Instant,

    // Requests, status classes and latency histogram (see `record`).
    pub requests_total: AtomicU64,
    pub requests_active: AtomicI64,
    pub status_2xx: AtomicU64,
    pub status_3xx: AtomicU64,
    pub status_4xx: AtomicU64,
    pub status_5xx: AtomicU64,
    pub latency: [AtomicU64; 6],

    // Authentication and JWT.
    pub auth_failures: AtomicU64,
    pub jwt_failures: AtomicU64,
    pub jwt_expiries: AtomicU64,
    pub jwt_issued: AtomicU64,

    // OIDC (reported through `OidcSnap`).
    pub oidc_refreshes: AtomicU64,
    pub oidc_refresh_failures: AtomicU64,
    pub oidc_logouts: AtomicU64,
    pub oidc_discoveries: AtomicU64,
    pub oidc_discovery_failures: AtomicU64,
    pub oidc_userinfo_failures: AtomicU64,
    pub oidc_backchannel_logouts: AtomicU64,
    pub oidc_backchannel_failures: AtomicU64,
    pub oidc_bearer_validations: AtomicU64,
    pub oidc_bearer_failures: AtomicU64,
    pub oidc_revocations: AtomicU64,
    pub oidc_revocation_failures: AtomicU64,
    pub oidc_callback_iss_mismatches: AtomicU64,

    // QUIC.
    pub quic_handshakes_total: AtomicU64,
    pub quic_handshake_failures_total: AtomicU64,
    pub quic_connections_active: AtomicI64,
    pub quic_requests_total: AtomicU64,
    pub quic_outbound_handshakes_total: AtomicU64,

    // Proxy load balancing (reported through `LbSnap`).
    pub proxy_lb_picks: AtomicU64,
    pub proxy_lb_no_upstream: AtomicU64,
    pub proxy_lb_retries: AtomicU64,
    pub proxy_lb_ejections: AtomicU64,
    pub proxy_lb_health_failures: AtomicU64,
    pub proxy_lb_health_recoveries: AtomicU64,

    // Rate limiting (reported through `RateLimitSnap`).
    pub rate_limit_triggers: AtomicU64,
    pub rate_limit_active_keys: AtomicU64,

    // Cache (reported through `CacheSnap`).
    pub cache_hits: AtomicU64,
    pub cache_misses: AtomicU64,
    pub cache_stores: AtomicU64,
    pub cache_bypass: AtomicU64,
    pub cache_evictions: AtomicU64,
    pub cache_revalidations: AtomicU64,
    pub cache_entries: AtomicU64,
    pub cache_bytes: AtomicU64,

    // OCSP (reported through `OcspSnap`).
    pub ocsp_refreshes: AtomicU64,
    pub ocsp_refresh_failures: AtomicU64,

    // Datagrams (reported through `DatagramSnap`, including the two
    // unprefixed byte counters).
    pub datagram_flows_active: AtomicU64,
    pub datagrams_in_total: AtomicU64,
    pub datagrams_out_total: AtomicU64,
    pub bytes_in_total: AtomicU64,
    pub bytes_out_total: AtomicU64,
    // Not copied into any snapshot within this view, hence the allow.
    #[allow(dead_code)]
    pub datagrams_dropped_oversize_total: AtomicU64,
    pub datagram_flow_create_total: AtomicU64,
    pub datagram_flow_evict_total: AtomicU64,

    // Streams (reported through `StreamSnap`).
    pub stream_conns_active: AtomicI64,
    pub stream_conns_total: AtomicU64,
    pub stream_bytes_in_total: AtomicU64,
    pub stream_bytes_out_total: AtomicU64,

    // Compression (reported through `CompressionSnap`).
    pub compress_responses_total: AtomicU64,
    pub compress_skipped_total: AtomicU64,
    pub compress_bytes_in_total: AtomicU64,
    pub compress_bytes_out_total: AtomicU64,
    pub compress_gzip_total: AtomicU64,
    pub compress_brotli_total: AtomicU64,
    pub compress_zstd_total: AtomicU64,

    // TLS (reported through `TlsSnap`).
    pub tls_handshakes_total: AtomicU64,
    pub tls_handshake_failures_total: AtomicU64,
    pub tls_handshake_timeouts_total: AtomicU64,

    // GeoIP (reported through `GeoipSnap`).
    pub geoip_lookups_total: AtomicU64,
    pub geoip_lookup_misses_total: AtomicU64,

    // Shutdown (reported through `ShutdownSnap`).
    pub shutdown_drained_total: AtomicU64,
    pub shutdown_abandoned_total: AtomicU64,

    // ACME (reported through `AcmeSnap`).
    pub acme_issuances_total: AtomicU64,
    pub acme_issuance_failures_total: AtomicU64,
    pub acme_renewals_total: AtomicU64,
    pub acme_renewal_failures_total: AtomicU64,

    // Reported through `LbSnap::health_checks`.
    pub proxy_lb_health_checks_total: AtomicU64,

    // Proxy upstream (reported through `UpstreamSnap`).
    pub proxy_upstream_bytes_in_total: AtomicU64,
    pub proxy_upstream_bytes_out_total: AtomicU64,
    pub proxy_upstream_connect_errors_total: AtomicU64,
    pub proxy_upstream_latency: [AtomicU64; 6],

    // HTTP connections (reported through `HttpConnSnap`).
    pub http_conns_active: AtomicI64,
    pub http_conns_total: AtomicU64,

    // FastCGI / SCGI (each reported through a `BackendSnap`).
    pub fcgi_requests_total: AtomicU64,
    pub fcgi_errors_total: AtomicU64,
    pub fcgi_in_flight: AtomicI64,
    pub scgi_requests_total: AtomicU64,
    pub scgi_errors_total: AtomicU64,
    pub scgi_in_flight: AtomicI64,

    // CGI (reported through `CgiSnap`).
    pub cgi_requests_total: AtomicU64,
    pub cgi_errors_total: AtomicU64,
    pub cgi_in_flight: AtomicI64,
    pub cgi_spawn_failures_total: AtomicU64,
    pub cgi_timeouts_total: AtomicU64,

    // Static files (reported through `StaticSnap`).
    pub static_bytes_served_total: AtomicU64,
    pub static_not_modified_total: AtomicU64,
    pub static_range_total: AtomicU64,

    /// Status-class counters per handler kind, indexed by `HandlerKind::index`.
    pub per_kind: [ClassCounters; HANDLER_KINDS],
    /// Status-class counters per vhost name; entries are created on first
    /// use by `record_class`.
    pub per_vhost: RwLock<HashMap<String, Arc<ClassCounters>>>,

    // Histories, each behind its own mutex.
    fine: Mutex<FineHistory>,
    paths: Mutex<PathData>,
    minute: Mutex<CoarseArchive>,
    hourly: Mutex<CoarseArchive>,
    daily: Mutex<CoarseArchive>,
}
/// Maps a latency in milliseconds to one of six histogram buckets:
/// `<1`, `<10`, `<50`, `<200`, `<1000`, and everything from 1000 ms up.
fn latency_bucket(latency_ms: u128) -> usize {
    match latency_ms {
        0 => 0,
        1..=9 => 1,
        10..=49 => 2,
        50..=199 => 3,
        200..=999 => 4,
        _ => 5,
    }
}
impl Metrics {
/// Creates a `Metrics` with `start_time` set to now, every counter at zero,
/// no vhost entries and empty histories.
pub fn new() -> Self {
    // Shorthands for the two kinds of zeroed atomic used below.
    let zero_u64 = || AtomicU64::new(0);
    let zero_i64 = || AtomicI64::new(0);
    Self {
        start_time: Instant::now(),
        requests_total: zero_u64(),
        requests_active: zero_i64(),
        status_2xx: zero_u64(),
        status_3xx: zero_u64(),
        status_4xx: zero_u64(),
        status_5xx: zero_u64(),
        latency: std::array::from_fn(|_| zero_u64()),
        auth_failures: zero_u64(),
        jwt_failures: zero_u64(),
        jwt_expiries: zero_u64(),
        jwt_issued: zero_u64(),
        oidc_refreshes: zero_u64(),
        oidc_refresh_failures: zero_u64(),
        oidc_logouts: zero_u64(),
        oidc_discoveries: zero_u64(),
        oidc_discovery_failures: zero_u64(),
        oidc_userinfo_failures: zero_u64(),
        oidc_backchannel_logouts: zero_u64(),
        oidc_backchannel_failures: zero_u64(),
        oidc_bearer_validations: zero_u64(),
        oidc_bearer_failures: zero_u64(),
        oidc_revocations: zero_u64(),
        oidc_revocation_failures: zero_u64(),
        oidc_callback_iss_mismatches: zero_u64(),
        quic_handshakes_total: zero_u64(),
        quic_handshake_failures_total: zero_u64(),
        quic_connections_active: zero_i64(),
        quic_requests_total: zero_u64(),
        quic_outbound_handshakes_total: zero_u64(),
        proxy_lb_picks: zero_u64(),
        proxy_lb_no_upstream: zero_u64(),
        proxy_lb_retries: zero_u64(),
        proxy_lb_ejections: zero_u64(),
        proxy_lb_health_failures: zero_u64(),
        proxy_lb_health_recoveries: zero_u64(),
        rate_limit_triggers: zero_u64(),
        rate_limit_active_keys: zero_u64(),
        cache_hits: zero_u64(),
        cache_misses: zero_u64(),
        cache_stores: zero_u64(),
        cache_bypass: zero_u64(),
        cache_evictions: zero_u64(),
        cache_revalidations: zero_u64(),
        cache_entries: zero_u64(),
        cache_bytes: zero_u64(),
        ocsp_refreshes: zero_u64(),
        ocsp_refresh_failures: zero_u64(),
        datagram_flows_active: zero_u64(),
        datagrams_in_total: zero_u64(),
        datagrams_out_total: zero_u64(),
        bytes_in_total: zero_u64(),
        bytes_out_total: zero_u64(),
        datagrams_dropped_oversize_total: zero_u64(),
        datagram_flow_create_total: zero_u64(),
        datagram_flow_evict_total: zero_u64(),
        stream_conns_active: zero_i64(),
        stream_conns_total: zero_u64(),
        stream_bytes_in_total: zero_u64(),
        stream_bytes_out_total: zero_u64(),
        compress_responses_total: zero_u64(),
        compress_skipped_total: zero_u64(),
        compress_bytes_in_total: zero_u64(),
        compress_bytes_out_total: zero_u64(),
        compress_gzip_total: zero_u64(),
        compress_brotli_total: zero_u64(),
        compress_zstd_total: zero_u64(),
        tls_handshakes_total: zero_u64(),
        tls_handshake_failures_total: zero_u64(),
        tls_handshake_timeouts_total: zero_u64(),
        geoip_lookups_total: zero_u64(),
        geoip_lookup_misses_total: zero_u64(),
        shutdown_drained_total: zero_u64(),
        shutdown_abandoned_total: zero_u64(),
        acme_issuances_total: zero_u64(),
        acme_issuance_failures_total: zero_u64(),
        acme_renewals_total: zero_u64(),
        acme_renewal_failures_total: zero_u64(),
        proxy_lb_health_checks_total: zero_u64(),
        proxy_upstream_bytes_in_total: zero_u64(),
        proxy_upstream_bytes_out_total: zero_u64(),
        proxy_upstream_connect_errors_total: zero_u64(),
        proxy_upstream_latency: std::array::from_fn(|_| zero_u64()),
        http_conns_active: zero_i64(),
        http_conns_total: zero_u64(),
        fcgi_requests_total: zero_u64(),
        fcgi_errors_total: zero_u64(),
        fcgi_in_flight: zero_i64(),
        scgi_requests_total: zero_u64(),
        scgi_errors_total: zero_u64(),
        scgi_in_flight: zero_i64(),
        cgi_requests_total: zero_u64(),
        cgi_errors_total: zero_u64(),
        cgi_in_flight: zero_i64(),
        cgi_spawn_failures_total: zero_u64(),
        cgi_timeouts_total: zero_u64(),
        static_bytes_served_total: zero_u64(),
        static_not_modified_total: zero_u64(),
        static_range_total: zero_u64(),
        per_kind: std::array::from_fn(|_| ClassCounters::default()),
        per_vhost: RwLock::new(HashMap::new()),
        fine: Mutex::new(FineHistory::new()),
        paths: Mutex::new(PathData {
            // One empty map per fine slot.
            slots: std::iter::repeat_with(HashMap::new)
                .take(FINE_SLOTS)
                .collect(),
            current: HashMap::new(),
            total: HashMap::new(),
            head: 0,
        }),
        minute: Mutex::new(CoarseArchive::new(MINUTE_SLOTS)),
        hourly: Mutex::new(CoarseArchive::new(HOURLY_SLOTS)),
        daily: Mutex::new(CoarseArchive::new(DAILY_SLOTS)),
    }
}
/// Counts one request for `path` in both the lifetime (`total`) and the
/// in-progress (`current`) path maps.
///
/// Paths longer than 128 bytes are clipped. The cut is moved back to the
/// nearest UTF-8 character boundary, because slicing a `str` in the middle
/// of a multi-byte character panics. Each map accepts new keys only while it
/// holds fewer than `MAX_TRACKED_PATHS` entries; known keys are always
/// counted.
pub fn record_path(&self, path: &str) {
    const MAX_PATH_BYTES: usize = 128;
    let key = if path.len() > MAX_PATH_BYTES {
        let mut end = MAX_PATH_BYTES;
        // Index 0 is always a boundary, so this loop terminates.
        while !path.is_char_boundary(end) {
            end -= 1;
        }
        &path[..end]
    } else {
        path
    };

    let mut guard = self.paths.lock().unwrap_or_else(|e| e.into_inner());
    // Reborrow once so both maps can be borrowed mutably at the same time.
    let data = &mut *guard;
    for map in [&mut data.total, &mut data.current] {
        if let Some(count) = map.get_mut(key) {
            // Known path: no allocation needed.
            *count += 1;
        } else if map.len() < MAX_TRACKED_PATHS {
            map.insert(key.to_owned(), 1);
        }
    }
}
/// Counts one finished request: bumps `requests_total`, the status-class
/// counter for 2xx..5xx statuses, and the latency bucket for `latency_ms`.
pub fn record(&self, status: u16, latency_ms: u128) {
    self.requests_total.fetch_add(1, Ordering::Relaxed);

    // Statuses outside 200..600 have no class counter.
    let class = match status / 100 {
        2 => Some(&self.status_2xx),
        3 => Some(&self.status_3xx),
        4 => Some(&self.status_4xx),
        5 => Some(&self.status_5xx),
        _ => None,
    };
    if let Some(counter) = class {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    let bucket = latency_bucket(latency_ms);
    self.latency[bucket].fetch_add(1, Ordering::Relaxed);
}
pub fn record_class(
&self,
kind: HandlerKind,
vhost: &str,
status: u16,
) {
self.per_kind[kind.index()].record(status);
{
let map =
self.per_vhost.read().unwrap_or_else(|p| p.into_inner());
if let Some(c) = map.get(vhost) {
c.record(status);
return;
}
}
let entry = {
let mut map = self
.per_vhost
.write()
.unwrap_or_else(|p| p.into_inner());
map.entry(vhost.to_owned())
.or_insert_with(|| Arc::new(ClassCounters::default()))
.clone()
};
entry.record(status);
}
/// Adds one observation to the upstream-latency histogram, in the bucket
/// `latency_bucket` selects for `latency_ms`.
pub fn record_proxy_upstream_latency(&self, latency_ms: u128) {
    let bucket = latency_bucket(latency_ms);
    let counter = &self.proxy_upstream_latency[bucket];
    counter.fetch_add(1, Ordering::Relaxed);
}
pub fn class_snapshots(&self) -> ClassSnapshots {
let by_kind = HANDLER_KIND_ALL
.iter()
.map(|k| (k.label(), self.per_kind[k.index()].snapshot()))
.collect();
let map = self.per_vhost.read().unwrap_or_else(|p| p.into_inner());
let by_vhost =
map.iter().map(|(n, c)| (n.clone(), c.snapshot())).collect();
(by_kind, by_vhost)
}
/// Adds one to `requests_active`.
pub fn inc_active(&self) {
    let active = &self.requests_active;
    active.fetch_add(1, Ordering::Relaxed);
}
/// Subtracts one from `requests_active`.
pub fn dec_active(&self) {
    let active = &self.requests_active;
    active.fetch_sub(1, Ordering::Relaxed);
}
pub fn snapshot(&self) -> Snapshot {
let total = self.requests_total.load(Ordering::Relaxed);
let auth_total = self.auth_failures.load(Ordering::Relaxed);
let jwt_total = self.jwt_failures.load(Ordering::Relaxed);
let jwt_expiry_total =
self.jwt_expiries.load(Ordering::Relaxed);
let jwt_issued_total = self.jwt_issued.load(Ordering::Relaxed);
let fine = self.fine.lock().unwrap_or_else(|p| p.into_inner());
let since_last = total.saturating_sub(fine.last_total);
let rate_current = since_last as f64 / WINDOW_SECS as f64;
let rate_1min =
fine.window_req(12) as f64 / (12.0 * WINDOW_SECS as f64);
let rate_5min =
fine.window_req(60) as f64 / (60.0 * WINDOW_SECS as f64);
let rate_15min =
fine.window_req(180) as f64 / (180.0 * WINDOW_SECS as f64);
let latest = (fine.head + FINE_SLOTS - 1) % FINE_SLOTS;
let cpu_percent = cpu_pct_from_hp(fine.cpu[latest]);
drop(fine);
let (auth_fail_1h, jwt_fail_1h, jwt_expiry_1h, jwt_issued_1h) = {
let m = self.minute.lock().unwrap_or_else(|p| p.into_inner());
let n = 60.min(m.written as usize);
let auth: u64 = (0..n)
.map(|i| {
let idx = (m.head + m.cap - 1 - i) % m.cap;
m.auth[idx] as u64
})
.sum();
let jwt: u64 = (0..n)
.map(|i| {
let idx = (m.head + m.cap - 1 - i) % m.cap;
m.jwt[idx] as u64
})
.sum();
let jwt_exp: u64 = (0..n)
.map(|i| {
let idx = (m.head + m.cap - 1 - i) % m.cap;
m.jwt_expiry[idx] as u64
})
.sum();
let jwt_iss: u64 = (0..n)
.map(|i| {
let idx = (m.head + m.cap - 1 - i) % m.cap;
m.jwt_issued[idx] as u64
})
.sum();
(auth, jwt, jwt_exp, jwt_iss)
};
let (by_handler, by_vhost) = self.class_snapshots();
Snapshot {
uptime: self.start_time.elapsed(),
requests_total: total,
requests_active: self.requests_active.load(Ordering::Relaxed),
status_2xx: self.status_2xx.load(Ordering::Relaxed),
status_3xx: self.status_3xx.load(Ordering::Relaxed),
status_4xx: self.status_4xx.load(Ordering::Relaxed),
status_5xx: self.status_5xx.load(Ordering::Relaxed),
latency: std::array::from_fn(|i| {
self.latency[i].load(Ordering::Relaxed)
}),
rate_current,
rate_1min,
rate_5min,
rate_15min,
memory_kb: read_memory_kb(),
cpu_percent,
auth_failures_total: auth_total,
jwt_failures_total: jwt_total,
jwt_expiries_total: jwt_expiry_total,
jwt_issued_total,
auth_fail_1h,
jwt_fail_1h,
jwt_expiry_1h,
jwt_issued_1h,
quic_handshakes_total: self
.quic_handshakes_total
.load(Ordering::Relaxed),
quic_handshake_failures_total: self
.quic_handshake_failures_total
.load(Ordering::Relaxed),
quic_connections_active: self
.quic_connections_active
.load(Ordering::Relaxed),
quic_requests_total: self
.quic_requests_total
.load(Ordering::Relaxed),
quic_outbound_handshakes_total: self
.quic_outbound_handshakes_total
.load(Ordering::Relaxed),
stream: self.stream_snap(),
datagram: self.datagram_snap(),
compression: self.compression_snap(),
tls: self.tls_snap(),
geoip: self.geoip_snap(),
shutdown: self.shutdown_snap(),
acme: self.acme_snap(),
ocsp: self.ocsp_snap(),
lb: self.lb_snap(),
upstream: self.upstream_snap(),
rate_limit: self.rate_limit_snap(),
cache: self.cache_snap(),
oidc: self.oidc_snap(),
http_conns: HttpConnSnap {
active: self.http_conns_active.load(Ordering::Relaxed),
total: self.http_conns_total.load(Ordering::Relaxed),
},
fcgi: BackendSnap {
requests: self.fcgi_requests_total.load(Ordering::Relaxed),
errors: self.fcgi_errors_total.load(Ordering::Relaxed),
in_flight: self.fcgi_in_flight.load(Ordering::Relaxed),
},
scgi: BackendSnap {
requests: self.scgi_requests_total.load(Ordering::Relaxed),
errors: self.scgi_errors_total.load(Ordering::Relaxed),
in_flight: self.scgi_in_flight.load(Ordering::Relaxed),
},
cgi: CgiSnap {
requests: self.cgi_requests_total.load(Ordering::Relaxed),
errors: self.cgi_errors_total.load(Ordering::Relaxed),
in_flight: self.cgi_in_flight.load(Ordering::Relaxed),
spawn_failures: self
.cgi_spawn_failures_total
.load(Ordering::Relaxed),
timeouts: self.cgi_timeouts_total.load(Ordering::Relaxed),
},
static_files: StaticSnap {
bytes_served: self
.static_bytes_served_total
.load(Ordering::Relaxed),
not_modified: self
.static_not_modified_total
.load(Ordering::Relaxed),
range: self.static_range_total.load(Ordering::Relaxed),
},
by_handler,
by_vhost,
}
}
fn stream_snap(&self) -> StreamSnap {
StreamSnap {
conns_active: self.stream_conns_active.load(Ordering::Relaxed),
conns_total: self.stream_conns_total.load(Ordering::Relaxed),
bytes_in: self.stream_bytes_in_total.load(Ordering::Relaxed),
bytes_out: self.stream_bytes_out_total.load(Ordering::Relaxed),
}
}
fn datagram_snap(&self) -> DatagramSnap {
DatagramSnap {
flows_active: self.datagram_flows_active.load(Ordering::Relaxed),
datagrams_in: self.datagrams_in_total.load(Ordering::Relaxed),
datagrams_out: self.datagrams_out_total.load(Ordering::Relaxed),
bytes_in: self.bytes_in_total.load(Ordering::Relaxed),
bytes_out: self.bytes_out_total.load(Ordering::Relaxed),
flow_create: self
.datagram_flow_create_total
.load(Ordering::Relaxed),
flow_evict: self
.datagram_flow_evict_total
.load(Ordering::Relaxed),
}
}
fn compression_snap(&self) -> CompressionSnap {
CompressionSnap {
responses: self.compress_responses_total.load(Ordering::Relaxed),
skipped: self.compress_skipped_total.load(Ordering::Relaxed),
bytes_in: self.compress_bytes_in_total.load(Ordering::Relaxed),
bytes_out: self.compress_bytes_out_total.load(Ordering::Relaxed),
gzip: self.compress_gzip_total.load(Ordering::Relaxed),
brotli: self.compress_brotli_total.load(Ordering::Relaxed),
zstd: self.compress_zstd_total.load(Ordering::Relaxed),
}
}
fn tls_snap(&self) -> TlsSnap {
TlsSnap {
handshakes: self.tls_handshakes_total.load(Ordering::Relaxed),
failures: self
.tls_handshake_failures_total
.load(Ordering::Relaxed),
timeouts: self
.tls_handshake_timeouts_total
.load(Ordering::Relaxed),
}
}
fn geoip_snap(&self) -> GeoipSnap {
GeoipSnap {
lookups: self.geoip_lookups_total.load(Ordering::Relaxed),
misses: self.geoip_lookup_misses_total.load(Ordering::Relaxed),
}
}
fn shutdown_snap(&self) -> ShutdownSnap {
ShutdownSnap {
drained: self.shutdown_drained_total.load(Ordering::Relaxed),
abandoned: self.shutdown_abandoned_total.load(Ordering::Relaxed),
}
}
fn acme_snap(&self) -> AcmeSnap {
AcmeSnap {
issuances: self.acme_issuances_total.load(Ordering::Relaxed),
issuance_failures: self
.acme_issuance_failures_total
.load(Ordering::Relaxed),
renewals: self.acme_renewals_total.load(Ordering::Relaxed),
renewal_failures: self
.acme_renewal_failures_total
.load(Ordering::Relaxed),
}
}
fn ocsp_snap(&self) -> OcspSnap {
OcspSnap {
refreshes: self.ocsp_refreshes.load(Ordering::Relaxed),
refresh_failures: self
.ocsp_refresh_failures
.load(Ordering::Relaxed),
}
}
fn lb_snap(&self) -> LbSnap {
LbSnap {
picks: self.proxy_lb_picks.load(Ordering::Relaxed),
no_upstream: self.proxy_lb_no_upstream.load(Ordering::Relaxed),
retries: self.proxy_lb_retries.load(Ordering::Relaxed),
ejections: self.proxy_lb_ejections.load(Ordering::Relaxed),
health_failures: self
.proxy_lb_health_failures
.load(Ordering::Relaxed),
health_recoveries: self
.proxy_lb_health_recoveries
.load(Ordering::Relaxed),
health_checks: self
.proxy_lb_health_checks_total
.load(Ordering::Relaxed),
}
}
fn upstream_snap(&self) -> UpstreamSnap {
UpstreamSnap {
bytes_in: self
.proxy_upstream_bytes_in_total
.load(Ordering::Relaxed),
bytes_out: self
.proxy_upstream_bytes_out_total
.load(Ordering::Relaxed),
connect_errors: self
.proxy_upstream_connect_errors_total
.load(Ordering::Relaxed),
latency: std::array::from_fn(|i| {
self.proxy_upstream_latency[i].load(Ordering::Relaxed)
}),
}
}
fn rate_limit_snap(&self) -> RateLimitSnap {
RateLimitSnap {
triggers: self.rate_limit_triggers.load(Ordering::Relaxed),
active_keys: self.rate_limit_active_keys.load(Ordering::Relaxed),
}
}
fn cache_snap(&self) -> CacheSnap {
CacheSnap {
hits: self.cache_hits.load(Ordering::Relaxed),
misses: self.cache_misses.load(Ordering::Relaxed),
stores: self.cache_stores.load(Ordering::Relaxed),
bypass: self.cache_bypass.load(Ordering::Relaxed),
evictions: self.cache_evictions.load(Ordering::Relaxed),
revalidations: self.cache_revalidations.load(Ordering::Relaxed),
entries: self.cache_entries.load(Ordering::Relaxed),
bytes: self.cache_bytes.load(Ordering::Relaxed),
}
}
fn oidc_snap(&self) -> OidcSnap {
OidcSnap {
refreshes: self.oidc_refreshes.load(Ordering::Relaxed),
refresh_failures: self
.oidc_refresh_failures
.load(Ordering::Relaxed),
logouts: self.oidc_logouts.load(Ordering::Relaxed),
discoveries: self.oidc_discoveries.load(Ordering::Relaxed),
discovery_failures: self
.oidc_discovery_failures
.load(Ordering::Relaxed),
userinfo_failures: self
.oidc_userinfo_failures
.load(Ordering::Relaxed),
backchannel_logouts: self
.oidc_backchannel_logouts
.load(Ordering::Relaxed),
backchannel_failures: self
.oidc_backchannel_failures
.load(Ordering::Relaxed),
bearer_validations: self
.oidc_bearer_validations
.load(Ordering::Relaxed),
bearer_failures: self
.oidc_bearer_failures
.load(Ordering::Relaxed),
revocations: self.oidc_revocations.load(Ordering::Relaxed),
revocation_failures: self
.oidc_revocation_failures
.load(Ordering::Relaxed),
callback_iss_mismatches: self
.oidc_callback_iss_mismatches
.load(Ordering::Relaxed),
}
}
/// Returns the per-slot series for `period`, read from the archive tier
/// that `TimePeriod::archive` selects (0 = fine, 1 = minute, 2 = hourly,
/// anything else = daily).
pub fn sparkline_for_period(&self, period: TimePeriod) -> SparklineData {
    let (tier, slots) = period.archive();
    let step = period.step_secs();
    let coarse = match tier {
        0 => return self.fine_sparkline(slots, step),
        1 => &self.minute,
        2 => &self.hourly,
        _ => &self.daily,
    };
    self.coarse_sparkline(coarse, slots, step)
}
/// Reads the `n` newest fine slots (clamped to `FINE_SLOTS`), oldest first
/// and ending with the slot just before `head`.
///
/// Request counts are divided by `step` to give a per-second rate; memory
/// samples equal to 0 are reported as `None`.
fn fine_sparkline(&self, n: usize, step: u64) -> SparklineData {
    let hist = self.fine.lock().unwrap_or_else(|p| p.into_inner());
    let count = n.min(FINE_SLOTS);
    // Ring positions to read, computed once and shared by every series.
    let order: Vec<usize> = (0..count)
        .map(|i| (hist.head + FINE_SLOTS - count + i) % FINE_SLOTS)
        .collect();
    // Picks the selected slots out of a u16 ring, widened to u32.
    let widen = |series: &[u16]| -> Vec<u32> {
        order.iter().map(|&ix| u32::from(series[ix])).collect()
    };

    SparklineData {
        step_secs: step,
        req_rate: order
            .iter()
            .map(|&ix| hist.req[ix] as f64 / step as f64)
            .collect(),
        mem_kb: order
            .iter()
            .map(|&ix| match hist.mem[ix] {
                0 => None,
                kb => Some(kb),
            })
            .collect(),
        cpu_pct: order
            .iter()
            .map(|&ix| cpu_pct_from_hp(hist.cpu[ix]))
            .collect(),
        auth_fail: widen(&hist.auth),
        jwt_fail: widen(&hist.jwt),
        jwt_expiry: widen(&hist.jwt_expiry),
        jwt_issued: widen(&hist.jwt_issued),
        err4xx: widen(&hist.err4xx),
        err5xx: widen(&hist.err5xx),
        active: widen(&hist.active),
    }
}
/// Reads the `n` newest slots of `archive` (clamped to its capacity),
/// oldest first and ending with the slot just before `head`.
///
/// Request counts are divided by `step` to give a per-second rate; memory
/// samples equal to 0 are reported as `None`.
fn coarse_sparkline(
    &self,
    archive: &Mutex<CoarseArchive>,
    n: usize,
    step: u64,
) -> SparklineData {
    let arch = archive.lock().unwrap_or_else(|p| p.into_inner());
    let count = n.min(arch.cap);
    // Ring positions to read, computed once and shared by every series.
    // With `count == 0` nothing is evaluated, so a zero `cap` never reaches
    // the modulo.
    let order: Vec<usize> = (0..count)
        .map(|i| (arch.head + arch.cap - count + i) % arch.cap)
        .collect();
    // Picks the selected slots out of a u16 ring, widened to u32.
    let widen = |series: &[u16]| -> Vec<u32> {
        order.iter().map(|&ix| u32::from(series[ix])).collect()
    };

    SparklineData {
        step_secs: step,
        req_rate: order
            .iter()
            .map(|&ix| arch.req[ix] as f64 / step as f64)
            .collect(),
        mem_kb: order
            .iter()
            .map(|&ix| match arch.mem[ix] {
                0 => None,
                kb => Some(kb),
            })
            .collect(),
        cpu_pct: order
            .iter()
            .map(|&ix| cpu_pct_from_hp(arch.cpu[ix]))
            .collect(),
        auth_fail: widen(&arch.auth),
        jwt_fail: widen(&arch.jwt),
        jwt_expiry: widen(&arch.jwt_expiry),
        jwt_issued: widen(&arch.jwt_issued),
        err4xx: widen(&arch.err4xx),
        err5xx: widen(&arch.err5xx),
        active: widen(&arch.active),
    }
}
/// Returns the busiest request paths over `period` as `(path, count)` pairs.
///
/// The period selects which archive is read and how many of its newest slots are
/// included: archive kind `0` is the fine ring, `1` the minute archive, `2` the
/// hourly archive, and any other kind the daily archive.
pub fn paths_for_period(
    &self,
    period: TimePeriod,
) -> Vec<(String, u64)> {
    let (kind, n_slots) = period.archive();
    if kind == 0 {
        return self.fine_paths(n_slots);
    }
    let archive = match kind {
        1 => &self.minute,
        2 => &self.hourly,
        _ => &self.daily,
    };
    self.coarse_paths(archive, n_slots)
}
/// Sums per-path hit counts over the newest `n` fine slots plus the window that is
/// still being filled, and returns the `TOP_PATHS_LIMIT` busiest paths.
///
/// `n` is capped at `FINE_SLOTS`. A poisoned lock is recovered rather than propagated.
fn fine_paths(&self, n: usize) -> Vec<(String, u64)> {
    let history = self
        .paths
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let window = n.min(FINE_SLOTS);
    let mut totals: HashMap<String, u64> = HashMap::new();
    // Walk backwards from the slot just before `head`.
    for back in 1..=window {
        let idx = (history.head + FINE_SLOTS - back) % FINE_SLOTS;
        for (path, hits) in &history.slots[idx] {
            *totals.entry(path.clone()).or_default() += hits;
        }
    }
    // Hits recorded since the last rotation are not in any slot yet.
    for (path, hits) in &history.current {
        *totals.entry(path.clone()).or_default() += hits;
    }
    top_n(totals.into_iter(), TOP_PATHS_LIMIT)
}
/// Sums per-path hit counts over the newest `n` slots of a coarse archive and
/// returns the `TOP_PATHS_LIMIT` busiest paths.
///
/// `n` is capped at the archive's `cap`. A poisoned lock is recovered rather than
/// propagated.
fn coarse_paths(
    &self,
    archive: &Mutex<CoarseArchive>,
    n: usize,
) -> Vec<(String, u64)> {
    let arch = archive
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let window = n.min(arch.cap);
    let mut totals: HashMap<String, u64> = HashMap::new();
    // Walk backwards from the slot just before `head`.
    for back in 1..=window {
        let idx = (arch.head + arch.cap - back) % arch.cap;
        for (path, hits) in &arch.paths[idx] {
            *totals.entry(path.clone()).or_default() += hits;
        }
    }
    top_n(totals.into_iter(), TOP_PATHS_LIMIT)
}
/// Background sampler that never returns: once per `WINDOW_SECS` it stores one
/// sample in the fine ring, rotates the per-path counters, and cascades
/// consolidation into the minute, hourly and daily archives.
///
/// Per tick:
/// 1. Read the cumulative counters, the active-request gauge, and the process
///    memory / CPU readings (both fall back to `0` when unavailable).
/// 2. Under the `fine` lock, store the delta of each counter since the previous
///    tick in the slot at `head`, then advance `head`. Deltas that do not fit the
///    slot type saturate at the type's maximum instead of wrapping.
/// 3. Under the `paths` lock, move the `current` path counts into the slot at the
///    path ring's `head` and advance it.
/// 4. Every `MINUTE_RATIO` fine slots consolidate into the minute archive; every
///    `HOURLY_RATIO` minute slots into the hourly archive; every `DAILY_RATIO`
///    hourly slots into the daily archive.
pub async fn tick_loop(self: std::sync::Arc<Self>) {
    /// Narrows a counter delta to `u16`, saturating at `u16::MAX`.
    fn sat_u16<T>(value: T) -> u16
    where
        u16: TryFrom<T>,
    {
        u16::try_from(value).unwrap_or(u16::MAX)
    }

    let mut iv = interval(Duration::from_secs(WINDOW_SECS));
    // A tokio interval's first tick completes immediately; consume it so the
    // first stored sample covers a full window.
    iv.tick().await;
    loop {
        iv.tick().await;
        let total = self.requests_total.load(Ordering::Relaxed);
        let auth = self.auth_failures.load(Ordering::Relaxed);
        let jwt = self.jwt_failures.load(Ordering::Relaxed);
        let jwt_exp = self.jwt_expiries.load(Ordering::Relaxed);
        let jwt_iss = self.jwt_issued.load(Ordering::Relaxed);
        let s4xx = self.status_4xx.load(Ordering::Relaxed);
        let s5xx = self.status_5xx.load(Ordering::Relaxed);
        let active_now = self.requests_active.load(Ordering::Relaxed);
        let mem_kb = read_memory_kb().unwrap_or(0);
        let cpu_now = read_cpu_ticks().unwrap_or(0);

        let consolidate_minute = {
            let mut h = self.fine.lock().unwrap_or_else(|p| p.into_inner());
            // Clamp on the full-width delta: casting to a narrower type first
            // would drop high bits before the clamp could see them.
            let req_delta = u32::try_from(total.saturating_sub(h.last_total))
                .unwrap_or(u32::MAX);
            let auth_delta = sat_u16(auth.saturating_sub(h.last_auth));
            let jwt_delta = sat_u16(jwt.saturating_sub(h.last_jwt));
            let jwt_exp_delta =
                sat_u16(jwt_exp.saturating_sub(h.last_jwt_expiry));
            let jwt_iss_delta =
                sat_u16(jwt_iss.saturating_sub(h.last_jwt_issued));
            let err4xx_delta = sat_u16(s4xx.saturating_sub(h.last_4xx));
            let err5xx_delta = sat_u16(s5xx.saturating_sub(h.last_5xx));
            // The gauge is signed; clamp it into the slot's `u16` range.
            let active_sample =
                active_now.clamp(0, i64::from(u16::MAX)) as u16;
            // `last_cpu_ticks == 0` is treated as "no previous reading", so no
            // CPU usage is reported for that tick.
            let cpu_delta = if h.last_cpu_ticks == 0 {
                0u64
            } else {
                cpu_now.saturating_sub(h.last_cpu_ticks)
            };
            // Ticks per second, capped at 100, stored in hundredths.
            // NOTE(review): reading ticks/second as a percentage presumes a
            // 100 Hz tick clock — confirm.
            let cpu_hp = ((cpu_delta as f64 / WINDOW_SECS as f64).min(100.0)
                * 100.0) as u16;

            let head = h.head;
            h.req[head] = req_delta;
            h.mem[head] = u32::try_from(mem_kb).unwrap_or(u32::MAX);
            h.cpu[head] = cpu_hp;
            h.auth[head] = auth_delta;
            h.jwt[head] = jwt_delta;
            h.jwt_expiry[head] = jwt_exp_delta;
            h.jwt_issued[head] = jwt_iss_delta;
            h.err4xx[head] = err4xx_delta;
            h.err5xx[head] = err5xx_delta;
            h.active[head] = active_sample;
            h.head = (head + 1) % FINE_SLOTS;
            h.written += 1;

            // Remember the cumulative values so the next tick can diff against them.
            h.last_total = total;
            h.last_auth = auth;
            h.last_jwt = jwt;
            h.last_jwt_expiry = jwt_exp;
            h.last_jwt_issued = jwt_iss;
            h.last_4xx = s4xx;
            h.last_5xx = s5xx;
            h.last_cpu_ticks = cpu_now;

            h.written.is_multiple_of(MINUTE_RATIO)
        };

        {
            // Rotate the path counters: `current` becomes the newest slot and is
            // left empty for the next window.
            let mut p = self.paths.lock().unwrap_or_else(|e| e.into_inner());
            let ph = p.head;
            p.slots[ph] = std::mem::take(&mut p.current)
                .into_iter()
                .collect();
            p.head = (ph + 1) % FINE_SLOTS;
        }

        if !consolidate_minute {
            continue;
        }
        let minute_written = self.consolidate_fine_to_minute();
        if !minute_written.is_multiple_of(HOURLY_RATIO) {
            continue;
        }
        let hourly_written = self.consolidate_coarse(
            &self.minute,
            &self.hourly,
            HOURLY_RATIO as usize,
        );
        if !hourly_written.is_multiple_of(DAILY_RATIO) {
            continue;
        }
        let _daily_written = self.consolidate_coarse(
            &self.hourly,
            &self.daily,
            DAILY_RATIO as usize,
        );
    }
}
/// Folds the newest `MINUTE_RATIO` fine slots into one slot of the minute archive.
///
/// Event counters (requests, auth/JWT events, 4xx/5xx) are summed. Memory and CPU
/// are averaged over the slots that carry a non-zero memory sample. The
/// active-request value is averaged over all slots. Per-path counts are merged and
/// cut to the `COARSE_TOP_PATHS` busiest paths.
///
/// Sums that do not fit the archive's slot types saturate at the type's maximum
/// instead of wrapping, so a burst shows up as a pegged value rather than a
/// misleadingly small one.
///
/// Returns the minute archive's `written` counter after the new slot is stored.
fn consolidate_fine_to_minute(&self) -> u64 {
    /// Narrows to `u16`, saturating at `u16::MAX`.
    fn sat_u16(v: u64) -> u16 {
        u16::try_from(v).unwrap_or(u16::MAX)
    }
    /// Narrows to `u32`, saturating at `u32::MAX`.
    fn sat_u32(v: u64) -> u32 {
        u32::try_from(v).unwrap_or(u32::MAX)
    }

    let n = MINUTE_RATIO as usize;
    let (
        req_sum,
        mem_avg,
        cpu_avg,
        auth_sum,
        jwt_sum,
        jwt_exp_sum,
        jwt_iss_sum,
        err4xx_sum,
        err5xx_sum,
        active_avg,
        paths,
    ) = {
        // Both rings are read under their locks so the same window is aggregated
        // from each; the guards are released before the minute archive is locked.
        let h = self.fine.lock().unwrap_or_else(|p| p.into_inner());
        let ph = self.paths.lock().unwrap_or_else(|p| p.into_inner());
        let mut req: u64 = 0;
        let mut mem: u64 = 0;
        let mut cpu: u64 = 0;
        let mut auth: u64 = 0;
        let mut jwt: u64 = 0;
        let mut jwt_exp: u64 = 0;
        let mut jwt_iss: u64 = 0;
        let mut e4xx: u64 = 0;
        let mut e5xx: u64 = 0;
        let mut active: u64 = 0;
        let mut path_counts: HashMap<String, u64> = HashMap::new();
        let mut mem_count: u64 = 0;
        for i in 0..n {
            let idx = (h.head + FINE_SLOTS - n + i) % FINE_SLOTS;
            req += h.req[idx] as u64;
            auth += h.auth[idx] as u64;
            jwt += h.jwt[idx] as u64;
            jwt_exp += h.jwt_expiry[idx] as u64;
            jwt_iss += h.jwt_issued[idx] as u64;
            e4xx += h.err4xx[idx] as u64;
            e5xx += h.err5xx[idx] as u64;
            active += h.active[idx] as u64;
            // A zero memory value marks a slot without a resource sample; its
            // CPU value is left out of the average as well.
            if h.mem[idx] > 0 {
                mem += h.mem[idx] as u64;
                mem_count += 1;
                cpu += h.cpu[idx] as u64;
            }
            let pidx = (ph.head + FINE_SLOTS - n + i) % FINE_SLOTS;
            for (k, v) in &ph.slots[pidx] {
                *path_counts.entry(k.clone()).or_insert(0) += v;
            }
        }
        // `checked_div` yields 0 when no slot carried a resource sample.
        let mem_avg = sat_u32(mem.checked_div(mem_count).unwrap_or(0));
        let cpu_avg = sat_u16(cpu.checked_div(mem_count).unwrap_or(0));
        let active_avg = sat_u16(active / n as u64);
        let paths = top_n(path_counts.into_iter(), COARSE_TOP_PATHS);
        (
            sat_u32(req),
            mem_avg,
            cpu_avg,
            sat_u16(auth),
            sat_u16(jwt),
            sat_u16(jwt_exp),
            sat_u16(jwt_iss),
            sat_u16(e4xx),
            sat_u16(e5xx),
            active_avg,
            paths,
        )
    };
    let mut m = self.minute.lock().unwrap_or_else(|p| p.into_inner());
    let head = m.head;
    m.req[head] = req_sum;
    m.mem[head] = mem_avg;
    m.cpu[head] = cpu_avg;
    m.auth[head] = auth_sum;
    m.jwt[head] = jwt_sum;
    m.jwt_expiry[head] = jwt_exp_sum;
    m.jwt_issued[head] = jwt_iss_sum;
    m.err4xx[head] = err4xx_sum;
    m.err5xx[head] = err5xx_sum;
    m.active[head] = active_avg;
    m.paths[head] = paths;
    m.head = (head + 1) % m.cap;
    m.written += 1;
    m.written
}
/// Folds the newest `n` slots of the `src` archive into one slot of `dst`.
///
/// `n` is capped at `src`'s `cap`. Event counters are summed. Memory and CPU are
/// averaged over the slots that carry a non-zero memory value. The active-request
/// value is averaged over all `n` slots. Per-path counts are merged and cut to the
/// `COARSE_TOP_PATHS` busiest paths.
///
/// Sums that do not fit the archive's slot types saturate at the type's maximum
/// instead of wrapping, and an empty window (`n == 0`) yields zero averages
/// instead of dividing by zero.
///
/// The `src` lock is released before `dst` is locked. Returns `dst`'s `written`
/// counter after the new slot is stored.
fn consolidate_coarse(
    &self,
    src: &Mutex<CoarseArchive>,
    dst: &Mutex<CoarseArchive>,
    n: usize,
) -> u64 {
    /// Narrows to `u16`, saturating at `u16::MAX`.
    fn sat_u16(v: u64) -> u16 {
        u16::try_from(v).unwrap_or(u16::MAX)
    }
    /// Narrows to `u32`, saturating at `u32::MAX`.
    fn sat_u32(v: u64) -> u32 {
        u32::try_from(v).unwrap_or(u32::MAX)
    }

    let (
        req,
        mem_avg,
        cpu_avg,
        auth,
        jwt,
        jwt_exp,
        jwt_iss,
        e4xx,
        e5xx,
        active_avg,
        paths,
    ) = {
        let s = src.lock().unwrap_or_else(|p| p.into_inner());
        let n = n.min(s.cap);
        let mut req: u64 = 0;
        let mut mem: u64 = 0;
        let mut cpu: u64 = 0;
        let mut auth: u64 = 0;
        let mut jwt: u64 = 0;
        let mut jwt_exp: u64 = 0;
        let mut jwt_iss: u64 = 0;
        let mut e4xx: u64 = 0;
        let mut e5xx: u64 = 0;
        let mut active: u64 = 0;
        let mut mem_count: u64 = 0;
        let mut path_counts: HashMap<String, u64> = HashMap::new();
        for i in 0..n {
            let idx = (s.head + s.cap - n + i) % s.cap;
            req += s.req[idx] as u64;
            auth += s.auth[idx] as u64;
            jwt += s.jwt[idx] as u64;
            jwt_exp += s.jwt_expiry[idx] as u64;
            jwt_iss += s.jwt_issued[idx] as u64;
            e4xx += s.err4xx[idx] as u64;
            e5xx += s.err5xx[idx] as u64;
            active += s.active[idx] as u64;
            // A zero memory value marks a slot without a resource sample; its
            // CPU value is left out of the average as well.
            if s.mem[idx] > 0 {
                mem += s.mem[idx] as u64;
                mem_count += 1;
                cpu += s.cpu[idx] as u64;
            }
            for (k, v) in &s.paths[idx] {
                *path_counts.entry(k.clone()).or_insert(0) += v;
            }
        }
        // `checked_div` yields 0 when the divisor is 0 (no samples / empty window).
        let mem_avg = sat_u32(mem.checked_div(mem_count).unwrap_or(0));
        let cpu_avg = sat_u16(cpu.checked_div(mem_count).unwrap_or(0));
        let active_avg = sat_u16(active.checked_div(n as u64).unwrap_or(0));
        let paths = top_n(path_counts.into_iter(), COARSE_TOP_PATHS);
        (
            sat_u32(req),
            mem_avg,
            cpu_avg,
            sat_u16(auth),
            sat_u16(jwt),
            sat_u16(jwt_exp),
            sat_u16(jwt_iss),
            sat_u16(e4xx),
            sat_u16(e5xx),
            active_avg,
            paths,
        )
    };
    let mut d = dst.lock().unwrap_or_else(|p| p.into_inner());
    let head = d.head;
    d.req[head] = req;
    d.mem[head] = mem_avg;
    d.cpu[head] = cpu_avg;
    d.auth[head] = auth;
    d.jwt[head] = jwt;
    d.jwt_expiry[head] = jwt_exp;
    d.jwt_issued[head] = jwt_iss;
    d.err4xx[head] = e4xx;
    d.err5xx[head] = e5xx;
    d.active[head] = active_avg;
    d.paths[head] = paths;
    d.head = (head + 1) % d.cap;
    d.written += 1;
    d.written
}
}
/// Returns at most `limit` entries with the highest counts, largest first.
///
/// Entries with equal counts are ordered by ascending path. Without that
/// tie-break the result would follow the order of `iter`, and callers feed this
/// from `HashMap` iteration, so both the cut-off and the display order could
/// differ from one call to the next.
fn top_n(
    iter: impl Iterator<Item = (String, u64)>,
    limit: usize,
) -> Vec<(String, u64)> {
    let mut ranked: Vec<(String, u64)> = iter.collect();
    // Entries that compare equal are identical pairs, so an unstable sort cannot
    // change the visible result.
    ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked.truncate(limit);
    ranked
}
/// Reads the value on the `VmRSS:` line of `/proc/self/status`.
///
/// Returns `None` when the file cannot be read, when it has no `VmRSS:` line, or
/// when the first token after the prefix on the first such line is not an integer.
#[cfg(target_os = "linux")]
fn read_memory_kb() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?
        .split_whitespace()
        .next()?
        .parse()
        .ok()
}
/// Non-Linux targets have no memory reading; always `None`.
#[cfg(not(target_os = "linux"))]
fn read_memory_kb() -> Option<u64> {
    None
}
/// Returns `utime + stime` for the current process, read from `/proc/self/stat`.
///
/// The second field of that file is the executable name in parentheses, and the
/// name itself may contain spaces and `)`. The name is therefore ended at the
/// *last* `)` in the file; everything after it is whitespace-separated, with
/// `utime` and `stime` at positions 11 and 12 (0-based).
///
/// Returns `None` when the file cannot be read or does not have that shape.
#[cfg(target_os = "linux")]
fn read_cpu_ticks() -> Option<u64> {
    let stat = std::fs::read_to_string("/proc/self/stat").ok()?;
    let (_, after_comm) = stat.rsplit_once(')')?;
    let mut fields = after_comm.split_whitespace().skip(11);
    let utime: u64 = fields.next()?.parse().ok()?;
    let stime: u64 = fields.next()?.parse().ok()?;
    Some(utime + stime)
}
/// Non-Linux targets have no CPU reading; always `None`.
#[cfg(not(target_os = "linux"))]
fn read_cpu_ticks() -> Option<u64> {
    None
}
/// Converts a CPU value stored in hundredths of a percent into a percentage.
///
/// Only Linux builds produce a value; every other target reports `None`.
fn cpu_pct_from_hp(hp: u16) -> Option<f64> {
    cfg!(target_os = "linux").then(|| f64::from(hp) / 100.0)
}
// Unit tests for the metrics registry: counters, snapshots, the ring-buffer
// archives and the period helpers. Several tests write ring-buffer slots by hand
// instead of running `tick_loop`, so they do not wait on a timer.
#[cfg(test)]
mod tests {
use super::*;
// A freshly constructed `Metrics` reports zero for the checked counters and for
// every latency bucket.
#[test]
fn new_metrics_start_at_zero() {
let m = Metrics::new();
assert_eq!(m.requests_total.load(Ordering::Relaxed), 0);
assert_eq!(m.requests_active.load(Ordering::Relaxed), 0);
assert_eq!(m.status_2xx.load(Ordering::Relaxed), 0);
assert_eq!(m.status_5xx.load(Ordering::Relaxed), 0);
assert_eq!(m.auth_failures.load(Ordering::Relaxed), 0);
assert_eq!(m.jwt_failures.load(Ordering::Relaxed), 0);
for b in &m.latency {
assert_eq!(b.load(Ordering::Relaxed), 0);
}
}
// `record` counts each response under its status class and in the grand total.
#[test]
fn record_increments_correct_status_bucket() {
let m = Metrics::new();
m.record(200, 1);
m.record(204, 1);
m.record(301, 1);
m.record(404, 1);
m.record(503, 1);
assert_eq!(m.status_2xx.load(Ordering::Relaxed), 2);
assert_eq!(m.status_3xx.load(Ordering::Relaxed), 1);
assert_eq!(m.status_4xx.load(Ordering::Relaxed), 1);
assert_eq!(m.status_5xx.load(Ordering::Relaxed), 1);
assert_eq!(m.requests_total.load(Ordering::Relaxed), 5);
}
// The six recorded values are expected to land in six different latency
// buckets, leaving every bucket with a count of exactly one.
// NOTE(review): the second argument of `record` is presumably a latency in
// milliseconds — confirm against `record`.
#[test]
fn record_increments_correct_latency_bucket() {
let m = Metrics::new();
m.record(200, 0); m.record(200, 5); m.record(200, 30); m.record(200, 100); m.record(200, 500); m.record(200, 2000); for (i, b) in m.latency.iter().enumerate() {
assert_eq!(
b.load(Ordering::Relaxed),
1,
"bucket {i} should have count 1"
);
}
}
// `inc_active` / `dec_active` move the active-request gauge up and down by one.
#[test]
fn inc_dec_active_tracks_concurrency() {
let m = Metrics::new();
m.inc_active();
m.inc_active();
m.inc_active();
assert_eq!(m.requests_active.load(Ordering::Relaxed), 3);
m.dec_active();
assert_eq!(m.requests_active.load(Ordering::Relaxed), 2);
}
// With no slot written yet the windowed rates are zero, while `rate_current`
// already reflects the recorded request.
#[test]
fn rate_is_zero_before_any_tick() {
let m = Metrics::new();
m.record(200, 1);
let snap = m.snapshot();
assert_eq!(snap.rate_1min, 0.0);
assert_eq!(snap.rate_5min, 0.0);
assert_eq!(snap.rate_15min, 0.0);
assert!(snap.rate_current > 0.0);
}
// Writes one fine slot by hand (the request part of what a tick stores) and
// checks that `rate_1min` spreads those 5 requests over 12 windows.
#[test]
fn tick_advances_ring_buffer() {
let m = Metrics::new();
for _ in 0..5 {
m.record(200, 1);
}
let total = m.requests_total.load(Ordering::Relaxed);
{
let mut h = m.fine.lock().unwrap();
let delta = total.saturating_sub(h.last_total);
let head = h.head;
h.req[head] = delta as u32;
h.head = (head + 1) % FINE_SLOTS;
h.written += 1;
h.last_total = total;
}
let snap = m.snapshot();
let expected = 5.0 / (12.0 * WINDOW_SECS as f64);
assert!(
(snap.rate_1min - expected).abs() < 0.001,
"rate_1min={} expected={}",
snap.rate_1min,
expected
);
}
// `uptime_human` formatting at the seconds, minutes, hours and days scale; once
// days are shown the seconds component is dropped.
#[test]
fn uptime_human_formats_correctly() {
// Builds a snapshot whose only interesting field is `uptime` and formats it.
let snap = |secs: u64| -> String {
Snapshot {
uptime: Duration::from_secs(secs),
requests_total: 0,
requests_active: 0,
status_2xx: 0,
status_3xx: 0,
status_4xx: 0,
status_5xx: 0,
latency: [0; 6],
rate_current: 0.0,
rate_1min: 0.0,
rate_5min: 0.0,
rate_15min: 0.0,
memory_kb: None,
cpu_percent: None,
auth_failures_total: 0,
jwt_failures_total: 0,
jwt_expiries_total: 0,
jwt_issued_total: 0,
auth_fail_1h: 0,
jwt_fail_1h: 0,
jwt_expiry_1h: 0,
jwt_issued_1h: 0,
quic_handshakes_total: 0,
quic_handshake_failures_total: 0,
quic_connections_active: 0,
quic_requests_total: 0,
quic_outbound_handshakes_total: 0,
..Default::default()
}
.uptime_human()
};
assert_eq!(snap(30), "30s");
assert_eq!(snap(90), "1m 30s");
assert_eq!(snap(3661), "1h 1m 1s");
assert_eq!(snap(86400 + 3661), "1d 1h 1m");
}
// Series length equals the slot count of the period: 180 for `Min15`, 60 for
// `Min5`, even when nothing has been recorded.
#[test]
fn sparkline_for_period_returns_correct_length() {
let m = Metrics::new();
let sd = m.sparkline_for_period(TimePeriod::Min15);
assert_eq!(sd.req_rate.len(), 180);
assert_eq!(sd.mem_kb.len(), 180);
assert_eq!(sd.cpu_pct.len(), 180);
assert_eq!(sd.auth_fail.len(), 180);
assert_eq!(sd.jwt_fail.len(), 180);
let sd5 = m.sparkline_for_period(TimePeriod::Min5);
assert_eq!(sd5.req_rate.len(), 60);
}
// `step_secs` is the slot width of the archive backing each period.
#[test]
fn sparkline_step_secs_matches_archive() {
assert_eq!(TimePeriod::Min5.step_secs(), WINDOW_SECS);
assert_eq!(
TimePeriod::Hr3.step_secs(),
WINDOW_SECS * MINUTE_RATIO
);
assert_eq!(
TimePeriod::Day7.step_secs(),
WINDOW_SECS * MINUTE_RATIO * HOURLY_RATIO
);
}
// Paths recorded since the last rotation are already visible through
// `paths_for_period`, with their hit counts.
#[test]
fn record_path_appears_in_paths_for_period() {
let m = Metrics::new();
m.record_path("/foo");
m.record_path("/foo");
m.record_path("/bar");
let paths = m.paths_for_period(TimePeriod::Min5);
assert!(
paths.iter().any(|(p, c)| p == "/foo" && *c == 2),
"expected /foo × 2 in paths"
);
assert!(
paths.iter().any(|(p, c)| p == "/bar" && *c == 1),
"expected /bar × 1 in paths"
);
}
// Writes one fine slot holding 10 requests by hand; the newest sparkline point
// must then read 10 / WINDOW_SECS = 2 requests per second.
#[test]
fn sparkline_req_rate_reflects_ticked_data() {
let m = Metrics::new();
for _ in 0..10 {
m.record(200, 1);
}
let total = m.requests_total.load(Ordering::Relaxed);
{
let mut h = m.fine.lock().unwrap();
let delta = total.saturating_sub(h.last_total) as u32;
let head = h.head;
h.req[head] = delta;
h.head = (head + 1) % FINE_SLOTS;
h.written += 1;
h.last_total = total;
}
let sd = m.sparkline_for_period(TimePeriod::Min5);
let last = *sd.req_rate.last().unwrap();
assert!(
(last - 2.0).abs() < 0.01,
"expected ~2.0 req/s, got {last}"
);
}
// The cumulative auth and JWT failure counters are surfaced by `snapshot`.
#[test]
fn auth_failure_counter_increments() {
let m = Metrics::new();
m.auth_failures.fetch_add(3, Ordering::Relaxed);
m.jwt_failures.fetch_add(1, Ordering::Relaxed);
let snap = m.snapshot();
assert_eq!(snap.auth_failures_total, 3);
assert_eq!(snap.jwt_failures_total, 1);
}
// `from_query` parses each label, and `as_str` produces a label that parses
// back to the same period.
#[test]
fn time_period_roundtrip() {
let pairs = [
("5min", TimePeriod::Min5),
("15min", TimePeriod::Min15),
("1h", TimePeriod::Min60),
("3h", TimePeriod::Hr3),
("1d", TimePeriod::Day1),
("7d", TimePeriod::Day7),
("1y", TimePeriod::Month12),
];
for (s, p) in pairs {
assert_eq!(TimePeriod::from_query(s), p, "from_query({s})");
assert_eq!(TimePeriod::from_query(p.as_str()), p, "roundtrip {s}");
}
}
// Fills twelve fine slots with 1..=12 requests and twelve path slots with
// 1..=12 hits on "/test"; consolidation must store the sum 78 for both in the
// newest minute slot.
#[test]
fn consolidation_writes_minute_slot() {
let m = Metrics::new();
for i in 0u32..12 {
let mut h = m.fine.lock().unwrap();
let head = h.head;
h.req[head] = i + 1;
h.head = (head + 1) % FINE_SLOTS;
h.written += 1;
}
{
let mut p = m.paths.lock().unwrap();
for i in 0..12 {
let ph = p.head;
p.slots[ph].insert("/test".to_owned(), (i + 1) as u64);
p.head = (ph + 1) % FINE_SLOTS;
}
}
m.consolidate_fine_to_minute();
let minute = m.minute.lock().unwrap();
// The slot just written sits one position before the archive's `head`.
let idx = (minute.head + minute.cap - 1) % minute.cap;
assert_eq!(minute.req[idx], 78, "consolidated req sum");
assert!(
minute.paths[idx].iter().any(|(p, c)| p == "/test" && *c == 78),
"consolidated path count"
);
}
// Stream connection and byte counters appear under `snapshot().stream`.
#[test]
fn snapshot_surfaces_stream_counters() {
let m = Metrics::new();
m.stream_conns_total.fetch_add(3, Ordering::Relaxed);
m.stream_conns_active.fetch_add(2, Ordering::Relaxed);
m.stream_bytes_in_total.fetch_add(100, Ordering::Relaxed);
m.stream_bytes_out_total.fetch_add(200, Ordering::Relaxed);
let s = m.snapshot();
assert_eq!(s.stream.conns_total, 3);
assert_eq!(s.stream.conns_active, 2);
assert_eq!(s.stream.bytes_in, 100);
assert_eq!(s.stream.bytes_out, 200);
}
// Datagram flow and byte counters appear under `snapshot().datagram`; the
// inbound byte count is fed from `bytes_in_total`.
#[test]
fn snapshot_surfaces_datagram_counters() {
let m = Metrics::new();
m.datagram_flows_active.fetch_add(5, Ordering::Relaxed);
m.bytes_in_total.fetch_add(42, Ordering::Relaxed);
let s = m.snapshot();
assert_eq!(s.datagram.flows_active, 5);
assert_eq!(s.datagram.bytes_in, 42);
}
// Load-balancer and upstream counters appear under `snapshot().lb` and
// `snapshot().upstream`; an upstream latency of 5 is expected in bucket 1.
#[test]
fn snapshot_surfaces_lb_and_upstream_counters() {
let m = Metrics::new();
m.proxy_lb_picks.fetch_add(7, Ordering::Relaxed);
m.proxy_lb_health_checks_total.fetch_add(4, Ordering::Relaxed);
m.proxy_upstream_connect_errors_total
.fetch_add(1, Ordering::Relaxed);
m.record_proxy_upstream_latency(5); let s = m.snapshot();
assert_eq!(s.lb.picks, 7);
assert_eq!(s.lb.health_checks, 4);
assert_eq!(s.upstream.connect_errors, 1);
assert_eq!(s.upstream.latency[1], 1);
}
// Compression, TLS and GeoIP counters appear in their snapshot sections.
#[test]
fn snapshot_surfaces_compression_tls_geoip() {
let m = Metrics::new();
m.compress_responses_total.fetch_add(2, Ordering::Relaxed);
m.compress_zstd_total.fetch_add(2, Ordering::Relaxed);
m.tls_handshakes_total.fetch_add(9, Ordering::Relaxed);
m.geoip_lookups_total.fetch_add(3, Ordering::Relaxed);
m.geoip_lookup_misses_total.fetch_add(1, Ordering::Relaxed);
let s = m.snapshot();
assert_eq!(s.compression.responses, 2);
assert_eq!(s.compression.zstd, 2);
assert_eq!(s.tls.handshakes, 9);
assert_eq!(s.geoip.lookups, 3);
assert_eq!(s.geoip.misses, 1);
}
// OIDC, FastCGI, CGI and static-file counters appear in their snapshot sections.
#[test]
fn snapshot_surfaces_oidc_and_backends() {
let m = Metrics::new();
m.oidc_bearer_validations.fetch_add(6, Ordering::Relaxed);
m.fcgi_requests_total.fetch_add(2, Ordering::Relaxed);
m.cgi_spawn_failures_total.fetch_add(1, Ordering::Relaxed);
m.static_bytes_served_total.fetch_add(4096, Ordering::Relaxed);
let s = m.snapshot();
assert_eq!(s.oidc.bearer_validations, 6);
assert_eq!(s.fcgi.requests, 2);
assert_eq!(s.cgi.spawn_failures, 1);
assert_eq!(s.static_files.bytes_served, 4096);
}
// `record_class` tallies each response both per handler kind and per vhost,
// split by status class.
#[test]
fn record_class_aggregates_by_handler_and_vhost() {
let m = Metrics::new();
m.record_class(HandlerKind::Proxy, "example.com", 200);
m.record_class(HandlerKind::Proxy, "example.com", 502);
m.record_class(HandlerKind::Static, "other.com", 404);
let s = m.snapshot();
let proxy = s
.by_handler
.iter()
.find(|(n, _)| *n == "proxy")
.map(|(_, c)| *c)
.unwrap();
assert_eq!(proxy.total, 2);
assert_eq!(proxy.s2xx, 1);
assert_eq!(proxy.s5xx, 1);
let ex = s
.by_vhost
.iter()
.find(|(n, _)| n == "example.com")
.map(|(_, c)| *c)
.unwrap();
assert_eq!(ex.total, 2);
// The 404 was recorded for "other.com", so this vhost has no 4xx.
assert_eq!(ex.s4xx, 0);
}
}