use std::time::Duration;
pub use file::{Exporter, FileExporter};
#[cfg(feature = "prometheus-exporter")]
use std::collections::HashMap;
#[cfg(feature = "prometheus-exporter")]
use std::fmt::Write as _;
#[cfg(feature = "prometheus-exporter")]
use std::io::{self, ErrorKind, Read, Write as IoWrite};
#[cfg(feature = "prometheus-exporter")]
use std::net::{IpAddr, Shutdown, SocketAddr, TcpListener, TcpStream};
#[cfg(feature = "prometheus-exporter")]
use std::time::{Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "prometheus-exporter")]
use varta_vlp::crypto::BearerToken;
#[cfg(feature = "prometheus-exporter")]
use varta_vlp::DecodeError;
#[cfg(feature = "prometheus-exporter")]
use varta_vlp::Status;
#[cfg(feature = "prometheus-exporter")]
use crate::ip_state_table::{IpStateTable, LastSeen};
#[cfg(feature = "prometheus-exporter")]
use crate::log_ratelimit::{LogKind, LOG_RATE_LIMITER};
#[cfg(feature = "prometheus-exporter")]
use crate::observer::Event;
#[cfg(feature = "prometheus-exporter")]
/// `kind` label values for `varta_log_suppressed_total`.
///
/// Position `i` labels element `i` of the array returned by the log rate
/// limiter's `snapshot_totals()`.
const LOG_KIND_LABELS: [&str; LogKind::COUNT] = [
    "file_export_io",
    "audit_io",
    "prom_serve",
    "heartbeat_io",
    "audit_ring_warn",
    "audit_ring_critical",
];
#[cfg(feature = "prometheus-exporter")]
/// `kind` label values for `varta_decode_errors_total`, indexed by
/// [`decode_kind_index`].
const DECODE_KIND_LABELS: [&str; 8] = [
    "bad_magic",
    "bad_version",
    "bad_status",
    "bad_pid",
    "bad_timestamp",
    "bad_nonce",
    "stall_on_wire",
    "bad_crc",
];
#[cfg(feature = "prometheus-exporter")]
/// Maps a VLP decode error to its slot in `DECODE_KIND_LABELS` and the
/// matching `decode_errors_total` counter array.
///
/// The match is exhaustive on purpose: a new `DecodeError` variant fails to
/// compile here until it is given a label.
fn decode_kind_index(err: &DecodeError) -> usize {
    match *err {
        DecodeError::BadMagic => 0,
        DecodeError::BadVersion => 1,
        DecodeError::BadStatus(..) => 2,
        DecodeError::BadPid(..) => 3,
        DecodeError::BadTimestamp(..) => 4,
        DecodeError::BadNonce { .. } => 5,
        DecodeError::StallOnWire => 6,
        DecodeError::BadCrc { .. } => 7,
    }
}
#[cfg(feature = "prometheus-exporter")]
/// Per-agent-pid values rendered as `varta_beats_total`, `varta_stalls_total`
/// and (when known) `varta_status`.
#[derive(Clone, Copy, Debug)]
struct GaugeRow {
    /// Accepted beats for this pid.
    beats_total: u64,
    /// Observer-detected stalls for this pid.
    stalls_total: u64,
    /// Last reported status code; `None` means no `varta_status` line is emitted.
    last_status: Option<u8>,
}
#[cfg(feature = "prometheus-exporter")]
impl GaugeRow {
    /// An empty row: zero counters and no status seen yet.
    const fn new() -> Self {
        Self {
            last_status: None,
            stalls_total: 0,
            beats_total: 0,
        }
    }
}
#[cfg(feature = "prometheus-exporter")]
/// Upper bound on time spent reading one scrape request head.
const PROM_READ_DEADLINE: Duration = Duration::from_millis(10);
#[cfg(feature = "prometheus-exporter")]
/// Deadline for writing one scrape response (headers + body).
const PROM_WRITE_TIMEOUT: Duration = Duration::from_millis(50);
#[cfg(feature = "prometheus-exporter")]
/// Maximum admitted scrape connections handled per `serve_pending` call.
const PROM_MAX_CONNECTIONS_PER_SERVE: usize = 8;
#[cfg(feature = "prometheus-exporter")]
/// Maximum backlog connections accepted-and-closed per `serve_pending` call.
const PROM_MAX_DRAIN_PER_SERVE: usize = 50;
#[cfg(feature = "prometheus-exporter")]
/// Finite upper bounds (seconds, inclusive) shared by every duration
/// histogram; each histogram has one extra overflow (+Inf) slot.
const ITERATION_BUCKET_BOUNDS_S: [f64; 8] = [
    0.001, // 1 ms
    0.005, // 5 ms
    0.010, // 10 ms
    0.050, // 50 ms
    0.100, // 100 ms
    0.250, // 250 ms
    0.500, // 500 ms
    1.000, // 1 s
];
#[cfg(feature = "prometheus-exporter")]
/// Event-loop stage whose duration is recorded via
/// `PromExporter::record_stage_duration`.
///
/// The explicit discriminant is used directly (`stage as usize`) as the index
/// into [`STAGE_LABELS`] and the per-stage histogram arrays, so variant order
/// and label order must stay in sync.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum IterStage {
    DrainPending = 0,
    Poll = 1,
    Maintenance = 2,
    RecoveryReap = 3,
    ServePending = 4,
    Housekeeping = 5,
}
#[cfg(feature = "prometheus-exporter")]
/// `stage` label values, indexed by `IterStage as usize`.
pub const STAGE_LABELS: [&str; 6] = [
    "drain_pending",
    "poll",
    "maintenance",
    "recovery_reap",
    "serve_pending",
    "housekeeping",
];
/// Default per-iteration budget; iterations longer than this are counted as
/// budget overruns by the Prometheus exporter (overridable per exporter).
pub const DEFAULT_ITERATION_BUDGET: Duration = Duration::from_millis(250);
/// Default scrape budget; `serve_pending` calls longer than this are counted
/// as overruns by the Prometheus exporter (overridable per exporter).
pub const DEFAULT_SCRAPE_BUDGET: Duration = Duration::from_millis(250);
#[cfg(feature = "prometheus-exporter")]
/// Maximum number of request-head bytes read from one scrape connection.
const PROM_REQUEST_CAP: usize = 4096;
#[cfg(feature = "prometheus-exporter")]
/// Minimum spacing between fresh renders of the metrics body; scrapes inside
/// this window are answered from the cached body.
const PROM_MIN_SCRAPE_INTERVAL: Duration = Duration::from_secs(1);
#[cfg(feature = "prometheus-exporter")]
/// Capacity of the per-IP rate-limit state table.
const MAX_PROM_IP_STATES: usize = 1024;
#[cfg(feature = "prometheus-exporter")]
/// Per-IP state not seen for this long is eligible for eviction.
const PROM_IP_STATE_TTL: Duration = Duration::from_secs(60);
#[cfg(feature = "prometheus-exporter")]
/// Minimum spacing between periodic TTL sweeps of the per-IP table.
const PROM_IP_STATE_SWEEP_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(feature = "prometheus-exporter")]
/// Token-bucket state for one scraper IP.
#[derive(Clone, Copy, Debug)]
struct PromIpState {
    /// Available tokens in thousandths (1000 = one admitted connection).
    tokens_milli: u32,
    /// Instant up to which refill has been credited.
    last_refill: Instant,
    /// Last time this IP connected; drives TTL eviction.
    last_seen: Instant,
}
#[cfg(feature = "prometheus-exporter")]
impl LastSeen for PromIpState {
    fn last_seen(&self) -> Instant {
        let Self { last_seen, .. } = *self;
        last_seen
    }
}
#[cfg(feature = "prometheus-exporter")]
/// `reason` label values for dropped scrape connections, indexed by
/// `drop_reason_index`.
const DROP_REASON_LABELS: [&str; 3] = [
    "drain",
    "rate_limit",
    "ip_table_full",
];
#[cfg(feature = "prometheus-exporter")]
/// `outcome` label values for `varta_recovery_outcomes_total`, indexed by
/// `recovery_outcome_index`.
const RECOVERY_OUTCOME_LABELS: [&str; 11] = [
    "spawned",
    "debounced",
    "reaped_zero",
    "reaped_nonzero",
    "killed",
    "spawn_failed",
    "refused_unauthenticated_transport",
    "refused_cross_namespace",
    "refused_debounce_capacity",
    "refused_outstanding_capacity",
    "refused_socket_mode_only",
];
#[cfg(feature = "prometheus-exporter")]
/// `reason` label values for `varta_recovery_refused_total`, indexed by
/// `refused_reason_index`.
const RECOVERY_REFUSED_REASON_LABELS: [&str; 5] = [
    "unauthenticated_transport",
    "cross_namespace_agent",
    "debounce_capacity",
    "outstanding_capacity",
    "socket_mode_only",
];
#[cfg(feature = "prometheus-exporter")]
/// Maps a recovery outcome to its slot in `RECOVERY_OUTCOME_LABELS`.
///
/// A reap whose exit status reports success is `reaped_zero`. Any other reap,
/// and a failed reap (`ReapFailed`), are both counted as `reaped_nonzero`.
fn recovery_outcome_index(outcome: &crate::recovery::RecoveryOutcome) -> usize {
    use crate::recovery::RecoveryOutcome as Outcome;
    match outcome {
        Outcome::Spawned { .. } => 0,
        Outcome::Debounced => 1,
        Outcome::Reaped { status, .. } if status.success() => 2,
        Outcome::Reaped { .. } | Outcome::ReapFailed(_) => 3,
        Outcome::Killed { .. } => 4,
        Outcome::SpawnFailed(_) => 5,
        Outcome::RefusedUnauthenticatedSource { .. } => 6,
        Outcome::RefusedCrossNamespace { .. } => 7,
        Outcome::RefusedDebounceCapacity { .. } => 8,
        Outcome::RefusedOutstandingCapacity { .. } => 9,
        Outcome::RefusedSocketModeOnly { .. } => 10,
    }
}
#[cfg(feature = "prometheus-exporter")]
/// Structural safety gate that prevented a recovery command from spawning.
#[derive(Clone, Copy, Debug)]
enum RefusedReason {
    UnauthenticatedTransport,
    CrossNamespaceAgent,
    DebounceCapacity,
    OutstandingCapacity,
    SocketModeOnly,
}
#[cfg(feature = "prometheus-exporter")]
/// Maps a refusal reason to its slot in `RECOVERY_REFUSED_REASON_LABELS`.
fn refused_reason_index(r: RefusedReason) -> usize {
    use self::RefusedReason as R;
    match r {
        R::UnauthenticatedTransport => 0,
        R::CrossNamespaceAgent => 1,
        R::DebounceCapacity => 2,
        R::OutstandingCapacity => 3,
        R::SocketModeOnly => 4,
    }
}
#[cfg(feature = "prometheus-exporter")]
/// Why a scrape connection (or its IP state) was dropped.
#[derive(Clone, Copy, Debug)]
enum DropReason {
    Drain,
    RateLimit,
    IpTableFull,
}
#[cfg(feature = "prometheus-exporter")]
/// Maps a drop reason to its slot in `DROP_REASON_LABELS`.
fn drop_reason_index(r: DropReason) -> usize {
    use self::DropReason as D;
    match r {
        D::Drain => 0,
        D::RateLimit => 1,
        D::IpTableFull => 2,
    }
}
/// Pull-based Prometheus exporter: owns a non-blocking TCP listener, the
/// counters/gauges/histograms fed by the `record_*`/`set_*` methods, and a
/// cached text body that is re-rendered at most once per
/// `PROM_MIN_SCRAPE_INTERVAL`. Scrapes require a bearer token and are
/// rate-limited per source IP.
#[cfg(feature = "prometheus-exporter")]
pub struct PromExporter {
// Non-blocking listener; connections are accepted in `serve_pending`.
listener: TcpListener,
// Per-agent-pid rows; a pid's row is removed by `record_evicted_pid`.
rows: HashMap<u32, GaugeRow>,
// Reused render buffer holding the last rendered metrics body.
body_buf: String,
// When a fresh render was last served; gates PROM_MIN_SCRAPE_INTERVAL.
last_scrape: Option<Instant>,
evicted_total: u64,
// Bumped by `serve_one` on HTTP bearer-auth failures; rendered as
// `varta_frame_auth_failures_total`.
auth_failures_total: u64,
// Bearer token, compared with `varta_vlp::ct_eq` (presumably constant-time).
token: BearerToken,
// Indexed by `decode_kind_index`.
decode_errors_total: [u64; DECODE_KIND_LABELS.len()],
io_errors_total: u64,
ctrl_truncated_total: u64,
capacity_exceeded_total: u64,
decrypt_failures_total: u64,
truncated_total: u64,
sender_state_full_total: u64,
secure_aead_attempts_total: u64,
// [0] = per-pid rate limiting, [1] = global rate limiting.
rate_limited_total: [u64; 2],
// Last-set gauges.
uds_rcvbuf_bytes: u32,
pid_max_current: u32,
clock_regressions_total: u64,
clock_jumps_forward_total: u64,
nonce_wrap_total: u64,
eviction_scan_truncated_total: u64,
// Tracker configuration, set via `set_tracker_config`.
tracker_capacity_cfg: usize,
eviction_scan_window_max: usize,
// Indexed by `recovery_outcome_index` / `refused_reason_index`.
recovery_outcomes_total: [u64; RECOVERY_OUTCOME_LABELS.len()],
recovery_refused_total: [u64; RECOVERY_REFUSED_REASON_LABELS.len()],
recovery_last_fired_evictions_total: u64,
recovery_invariant_violations_total: u64,
origin_conflict_total: u64,
frame_namespace_mismatch_total: u64,
frame_rejected_pid_above_max_total: u64,
tracker_namespace_conflict_total: u64,
tracker_invariant_violations_total: u64,
tracker_pid_index_probe_exhausted_total: u64,
recovery_outstanding_probe_exhausted_total: u64,
recovery_reap_truncated_total: u64,
// Accumulated from `ip_state.take_probe_exhausted()` during rendering.
prom_ip_state_probe_exhausted_total: u64,
recovery_duration_ns_sum: u64,
recovery_duration_count_total: u64,
scrape_skipped_total: u64,
// Bumped when `serve_pending` stops on its time or connection cap.
scrape_budget_exhausted_total: u64,
// Histograms below hold per-bucket (non-cumulative) counts: one slot per
// ITERATION_BUCKET_BOUNDS_S bound plus a final overflow slot.
iteration_buckets: [u64; ITERATION_BUCKET_BOUNDS_S.len() + 1],
iteration_duration_ns_sum: u64,
iteration_count_total: u64,
iteration_budget_exceeded_total: u64,
iteration_budget: Duration,
serve_pending_buckets: [u64; ITERATION_BUCKET_BOUNDS_S.len() + 1],
serve_pending_duration_ns_sum: u64,
serve_pending_count_total: u64,
scrape_budget_exceeded_total: u64,
// Per-stage histograms, indexed by `IterStage as usize`.
stage_buckets: [[u64; ITERATION_BUCKET_BOUNDS_S.len() + 1]; STAGE_LABELS.len()],
stage_duration_ns_sum: [u64; STAGE_LABELS.len()],
stage_count_total: [u64; STAGE_LABELS.len()],
audit_dropped_total: u64,
audit_flush_budget_exceeded_total: u64,
audit_fsync_buckets: [u64; ITERATION_BUCKET_BOUNDS_S.len() + 1],
audit_fsync_duration_ns_sum: u64,
audit_fsync_count_total: u64,
bind_dir_fsync_failed_total: u64,
audit_fsync_budget_exceeded_total: u64,
audit_rotation_budget_exceeded_total: u64,
// [0] = "warn", [1] = "critical".
audit_ring_watermark_total: [u64; 2],
scrape_budget: Duration,
// Per-IP token buckets used by `allow_ip`.
ip_state: IpStateTable<PromIpState>,
rate_per_sec: u32,
// 0 disables per-IP rate limiting.
rate_burst: u32,
last_ip_sweep: Instant,
// Indexed by `drop_reason_index`.
connections_dropped_total: [u64; DROP_REASON_LABELS.len()],
started_at: Instant,
// Wall-clock time of the last `record_loop_tick`.
last_loop_system: SystemTime,
signal_handler_mode: &'static str,
}
#[cfg(feature = "prometheus-exporter")]
impl PromExporter {
/// Binds the metrics listener on `addr` using the default per-IP rate limit
/// from `crate::config`.
///
/// # Errors
/// Same as [`Self::bind_with_rate_limit`].
pub fn bind(addr: SocketAddr, token: BearerToken) -> io::Result<Self> {
    let per_sec = crate::config::DEFAULT_PROM_RATE_LIMIT_PER_SEC;
    let burst = crate::config::DEFAULT_PROM_RATE_LIMIT_BURST;
    Self::bind_with_rate_limit(addr, token, per_sec, burst)
}
pub fn bind_with_rate_limit(
addr: SocketAddr,
token: BearerToken,
rate_per_sec: u32,
rate_burst: u32,
) -> io::Result<Self> {
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let now = Instant::now();
Ok(PromExporter {
listener,
rows: HashMap::new(),
body_buf: String::new(),
last_scrape: None,
evicted_total: 0,
auth_failures_total: 0,
token,
decode_errors_total: [0; DECODE_KIND_LABELS.len()],
io_errors_total: 0,
ctrl_truncated_total: 0,
capacity_exceeded_total: 0,
decrypt_failures_total: 0,
truncated_total: 0,
sender_state_full_total: 0,
secure_aead_attempts_total: 0,
rate_limited_total: [0; 2],
uds_rcvbuf_bytes: 0,
pid_max_current: 0,
clock_regressions_total: 0,
clock_jumps_forward_total: 0,
nonce_wrap_total: 0,
eviction_scan_truncated_total: 0,
tracker_capacity_cfg: 0,
eviction_scan_window_max: 0,
recovery_outcomes_total: [0; RECOVERY_OUTCOME_LABELS.len()],
recovery_refused_total: [0; RECOVERY_REFUSED_REASON_LABELS.len()],
recovery_last_fired_evictions_total: 0,
recovery_invariant_violations_total: 0,
origin_conflict_total: 0,
frame_namespace_mismatch_total: 0,
frame_rejected_pid_above_max_total: 0,
tracker_namespace_conflict_total: 0,
tracker_invariant_violations_total: 0,
tracker_pid_index_probe_exhausted_total: 0,
recovery_outstanding_probe_exhausted_total: 0,
recovery_reap_truncated_total: 0,
prom_ip_state_probe_exhausted_total: 0,
recovery_duration_ns_sum: 0,
recovery_duration_count_total: 0,
scrape_skipped_total: 0,
scrape_budget_exhausted_total: 0,
iteration_buckets: [0; ITERATION_BUCKET_BOUNDS_S.len() + 1],
iteration_duration_ns_sum: 0,
iteration_count_total: 0,
iteration_budget_exceeded_total: 0,
iteration_budget: DEFAULT_ITERATION_BUDGET,
serve_pending_buckets: [0; ITERATION_BUCKET_BOUNDS_S.len() + 1],
serve_pending_duration_ns_sum: 0,
serve_pending_count_total: 0,
scrape_budget_exceeded_total: 0,
stage_buckets: [[0; ITERATION_BUCKET_BOUNDS_S.len() + 1]; STAGE_LABELS.len()],
stage_duration_ns_sum: [0; STAGE_LABELS.len()],
stage_count_total: [0; STAGE_LABELS.len()],
audit_dropped_total: 0,
audit_flush_budget_exceeded_total: 0,
audit_fsync_buckets: [0; ITERATION_BUCKET_BOUNDS_S.len() + 1],
audit_fsync_duration_ns_sum: 0,
audit_fsync_count_total: 0,
bind_dir_fsync_failed_total: 0,
audit_fsync_budget_exceeded_total: 0,
audit_rotation_budget_exceeded_total: 0,
audit_ring_watermark_total: [0; 2],
scrape_budget: DEFAULT_SCRAPE_BUDGET,
ip_state: IpStateTable::with_capacity(MAX_PROM_IP_STATES),
rate_per_sec,
rate_burst,
last_ip_sweep: now,
connections_dropped_total: [0; DROP_REASON_LABELS.len()],
started_at: now,
last_loop_system: SystemTime::now(),
signal_handler_mode: "direct",
})
}
fn allow_ip(&mut self, ip: IpAddr, now: Instant) -> bool {
if self.rate_burst == 0 {
return true;
}
let cap_milli: u32 = self.rate_burst.saturating_mul(1000);
let refill_per_ms: u32 = self.rate_per_sec;
if now.duration_since(self.last_ip_sweep) >= PROM_IP_STATE_SWEEP_INTERVAL {
self.last_ip_sweep = now;
self.ip_state.evict_older_than(now, PROM_IP_STATE_TTL);
}
match self.ip_state.get_mut(ip) {
Some(st) => {
let elapsed_ms = now.duration_since(st.last_refill).as_millis() as u64;
if elapsed_ms > 0 {
let add_milli =
(elapsed_ms as u128 * refill_per_ms as u128).min(u32::MAX as u128) as u32;
st.tokens_milli = st.tokens_milli.saturating_add(add_milli).min(cap_milli);
st.last_refill = now;
}
st.last_seen = now;
if st.tokens_milli >= 1000 {
st.tokens_milli -= 1000;
true
} else {
self.connections_dropped_total[drop_reason_index(DropReason::RateLimit)] = self
.connections_dropped_total[drop_reason_index(DropReason::RateLimit)]
.saturating_add(1);
false
}
}
None => {
if self.ip_state.len() >= MAX_PROM_IP_STATES {
self.ip_state.evict_older_than(now, PROM_IP_STATE_TTL);
}
if self.ip_state.len() >= MAX_PROM_IP_STATES {
if let Some(oldest_ip) = self.ip_state.oldest_ip() {
self.ip_state.remove(oldest_ip);
}
self.connections_dropped_total[drop_reason_index(DropReason::IpTableFull)] =
self.connections_dropped_total[drop_reason_index(DropReason::IpTableFull)]
.saturating_add(1);
}
let tokens_milli = cap_milli.saturating_sub(1000);
let _ = self.ip_state.insert(
ip,
PromIpState {
tokens_milli,
last_refill: now,
last_seen: now,
},
);
true
}
}
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.listener.local_addr()
}
pub fn record_eviction(&mut self, count: u64) {
self.evicted_total = self.evicted_total.saturating_add(count);
}
pub fn record_evicted_pid(&mut self, pid: u32) {
self.rows.remove(&pid);
}
pub fn record_capacity_exceeded(&mut self, count: u64) {
self.capacity_exceeded_total = self.capacity_exceeded_total.saturating_add(count);
}
pub fn record_decrypt_failures(&mut self, count: u64) {
self.decrypt_failures_total = self.decrypt_failures_total.saturating_add(count);
}
pub fn record_truncated(&mut self, count: u64) {
self.truncated_total = self.truncated_total.saturating_add(count);
}
pub fn record_sender_state_full(&mut self, count: u64) {
self.sender_state_full_total = self.sender_state_full_total.saturating_add(count);
}
pub fn record_secure_aead_attempts(&mut self, count: u64) {
self.secure_aead_attempts_total = self.secure_aead_attempts_total.saturating_add(count);
}
pub fn record_per_pid_rate_limited(&mut self, count: u64) {
self.rate_limited_total[0] = self.rate_limited_total[0].saturating_add(count);
}
pub fn record_global_rate_limited(&mut self, count: u64) {
self.rate_limited_total[1] = self.rate_limited_total[1].saturating_add(count);
}
pub fn set_uds_rcvbuf_bytes(&mut self, bytes: u32) {
self.uds_rcvbuf_bytes = bytes;
}
pub fn set_pid_max_current(&mut self, value: u32) {
self.pid_max_current = value;
}
pub fn record_clock_regressions(&mut self, count: u64) {
self.clock_regressions_total = self.clock_regressions_total.saturating_add(count);
}
pub fn record_clock_jumps_forward(&mut self, count: u64) {
self.clock_jumps_forward_total = self.clock_jumps_forward_total.saturating_add(count);
}
pub fn record_nonce_wraps(&mut self, count: u64) {
self.nonce_wrap_total = self.nonce_wrap_total.saturating_add(count);
}
pub fn record_eviction_scan_truncated(&mut self, count: u64) {
self.eviction_scan_truncated_total =
self.eviction_scan_truncated_total.saturating_add(count);
}
pub fn set_signal_handler_mode(&mut self, mode: &'static str) {
self.signal_handler_mode = mode;
}
pub fn set_tracker_config(&mut self, capacity: usize, eviction_scan_window: usize) {
self.tracker_capacity_cfg = capacity;
self.eviction_scan_window_max = eviction_scan_window;
}
pub fn record_recovery_outcome(
&mut self,
outcome: &crate::recovery::RecoveryOutcome,
duration_ns: Option<u64>,
) {
let idx = recovery_outcome_index(outcome);
self.recovery_outcomes_total[idx] = self.recovery_outcomes_total[idx].saturating_add(1);
match outcome {
crate::recovery::RecoveryOutcome::RefusedUnauthenticatedSource { .. } => {
let r_idx = refused_reason_index(RefusedReason::UnauthenticatedTransport);
self.recovery_refused_total[r_idx] =
self.recovery_refused_total[r_idx].saturating_add(1);
}
crate::recovery::RecoveryOutcome::RefusedCrossNamespace { .. } => {
let r_idx = refused_reason_index(RefusedReason::CrossNamespaceAgent);
self.recovery_refused_total[r_idx] =
self.recovery_refused_total[r_idx].saturating_add(1);
}
crate::recovery::RecoveryOutcome::RefusedDebounceCapacity { .. } => {
let r_idx = refused_reason_index(RefusedReason::DebounceCapacity);
self.recovery_refused_total[r_idx] =
self.recovery_refused_total[r_idx].saturating_add(1);
}
crate::recovery::RecoveryOutcome::RefusedOutstandingCapacity { .. } => {
let r_idx = refused_reason_index(RefusedReason::OutstandingCapacity);
self.recovery_refused_total[r_idx] =
self.recovery_refused_total[r_idx].saturating_add(1);
}
crate::recovery::RecoveryOutcome::RefusedSocketModeOnly { .. } => {
let r_idx = refused_reason_index(RefusedReason::SocketModeOnly);
self.recovery_refused_total[r_idx] =
self.recovery_refused_total[r_idx].saturating_add(1);
}
_ => {}
}
if let Some(d) = duration_ns {
self.recovery_duration_ns_sum = self.recovery_duration_ns_sum.saturating_add(d);
self.recovery_duration_count_total =
self.recovery_duration_count_total.saturating_add(1);
}
}
/// Adds `count` beats dropped for a transport-origin mismatch
/// (`varta_origin_conflict_total`).
pub fn record_origin_conflicts(&mut self, count: u64) {
    let total = &mut self.origin_conflict_total;
    *total = total.saturating_add(count);
}
/// Adds `count` frames dropped for a PID-namespace mismatch
/// (`varta_frame_namespace_mismatch_total`).
pub fn record_frame_namespace_mismatches(&mut self, count: u64) {
    let total = &mut self.frame_namespace_mismatch_total;
    *total = total.saturating_add(count);
}
/// Adds `count` frames rejected for a pid above the current pid-max.
pub fn record_pid_above_max_drops(&mut self, count: u64) {
    let total = &mut self.frame_rejected_pid_above_max_total;
    *total = total.saturating_add(count);
}
/// Adds `count` tracker namespace conflicts (saturating).
pub fn record_tracker_namespace_conflicts(&mut self, count: u64) {
    let total = &mut self.tracker_namespace_conflict_total;
    *total = total.saturating_add(count);
}
/// Adds `count` tracker invariant violations (saturating).
pub fn record_tracker_invariant_violations(&mut self, count: u64) {
    let total = &mut self.tracker_invariant_violations_total;
    *total = total.saturating_add(count);
}
/// Adds `count` LastFiredTable evictions
/// (`varta_recovery_last_fired_evictions_total`).
pub fn record_recovery_last_fired_evictions(&mut self, count: u64) {
    let total = &mut self.recovery_last_fired_evictions_total;
    *total = total.saturating_add(count);
}
/// Adds `count` LastFiredTable defensive fall-throughs
/// (`varta_recovery_invariant_violations_total`).
pub fn record_recovery_invariant_violations(&mut self, count: u64) {
    let total = &mut self.recovery_invariant_violations_total;
    *total = total.saturating_add(count);
}
/// Adds `count` exhausted tracker pid-index probes (saturating).
pub fn record_tracker_pid_index_probe_exhausted(&mut self, count: u64) {
    let total = &mut self.tracker_pid_index_probe_exhausted_total;
    *total = total.saturating_add(count);
}
/// Adds `count` exhausted recovery outstanding-table probes (saturating).
pub fn record_recovery_outstanding_probe_exhausted(&mut self, count: u64) {
    let total = &mut self.recovery_outstanding_probe_exhausted_total;
    *total = total.saturating_add(count);
}
/// Adds `count` truncated recovery reap passes (saturating).
pub fn record_recovery_reap_truncated(&mut self, count: u64) {
    let total = &mut self.recovery_reap_truncated_total;
    *total = total.saturating_add(count);
}
/// Adds `count` dropped audit records (saturating).
pub fn record_audit_dropped(&mut self, count: u64) {
    let total = &mut self.audit_dropped_total;
    *total = total.saturating_add(count);
}
/// Adds `count` audit flushes that exceeded their budget (saturating).
pub fn record_audit_flush_budget_exceeded(&mut self, count: u64) {
    let total = &mut self.audit_flush_budget_exceeded_total;
    *total = total.saturating_add(count);
}
/// Records one audit fsync duration: adds it to the nanosecond sum, bumps the
/// count, and increments the single histogram slot whose upper bound (in
/// `ITERATION_BUCKET_BOUNDS_S`) first covers it, or the overflow slot.
pub fn record_audit_fsync_duration(&mut self, d: Duration) {
    let secs = d.as_secs_f64();
    let bucket = ITERATION_BUCKET_BOUNDS_S
        .iter()
        .position(|&upper| secs <= upper)
        .unwrap_or(ITERATION_BUCKET_BOUNDS_S.len());
    let nanos = u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);

    self.audit_fsync_duration_ns_sum = self.audit_fsync_duration_ns_sum.saturating_add(nanos);
    self.audit_fsync_count_total = self.audit_fsync_count_total.saturating_add(1);
    let slot = &mut self.audit_fsync_buckets[bucket];
    *slot = slot.saturating_add(1);
}
/// Adds `count` failed bind-directory fsyncs (saturating).
pub fn record_bind_dir_fsync_failed(&mut self, count: u64) {
    let total = &mut self.bind_dir_fsync_failed_total;
    *total = total.saturating_add(count);
}
/// Adds `count` audit fsyncs that exceeded their budget (saturating).
pub fn record_audit_fsync_budget_exceeded(&mut self, count: u64) {
    let total = &mut self.audit_fsync_budget_exceeded_total;
    *total = total.saturating_add(count);
}
/// Adds `count` audit rotations that exceeded their budget (saturating).
pub fn record_audit_rotation_budget_exceeded(&mut self, count: u64) {
    let total = &mut self.audit_rotation_budget_exceeded_total;
    *total = total.saturating_add(count);
}
/// Adds `count` audit-ring watermark crossings for `level`.
///
/// Only `"warn"` and `"critical"` are recognised; any other level is ignored.
pub fn record_audit_ring_watermark(&mut self, level: &str, count: u64) {
    if let Some(idx) = ["warn", "critical"].iter().position(|&known| known == level) {
        let slot = &mut self.audit_ring_watermark_total[idx];
        *slot = slot.saturating_add(count);
    }
}
/// Adds `count` scrapes answered from the cached body (saturating).
pub fn record_scrape_skipped(&mut self, count: u64) {
    let total = &mut self.scrape_skipped_total;
    *total = total.saturating_add(count);
}
/// Stamps the wall-clock time of the latest event-loop tick.
pub fn record_loop_tick(&mut self) {
    let now = SystemTime::now();
    self.last_loop_system = now;
}
/// Builder: overrides the iteration budget (default `DEFAULT_ITERATION_BUDGET`).
pub fn with_iteration_budget(self, budget: Duration) -> Self {
    let mut this = self;
    this.iteration_budget = budget;
    this
}
/// Builder: overrides the scrape budget (default `DEFAULT_SCRAPE_BUDGET`).
pub fn with_scrape_budget(self, budget: Duration) -> Self {
    let mut this = self;
    this.scrape_budget = budget;
    this
}
/// Records one `serve_pending` duration into its per-bucket histogram, sum
/// and count. Also counts a scrape-budget overrun when `d` is strictly longer
/// than `scrape_budget`.
pub fn record_serve_pending_duration(&mut self, d: Duration) {
    let secs = d.as_secs_f64();
    let bucket = ITERATION_BUCKET_BOUNDS_S
        .iter()
        .position(|&upper| secs <= upper)
        .unwrap_or(ITERATION_BUCKET_BOUNDS_S.len());
    let nanos = u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);

    self.serve_pending_duration_ns_sum = self.serve_pending_duration_ns_sum.saturating_add(nanos);
    self.serve_pending_count_total = self.serve_pending_count_total.saturating_add(1);
    let slot = &mut self.serve_pending_buckets[bucket];
    *slot = slot.saturating_add(1);

    let over_budget = d > self.scrape_budget;
    if over_budget {
        self.scrape_budget_exceeded_total = self.scrape_budget_exceeded_total.saturating_add(1);
    }
}
/// Records one event-loop iteration duration into its per-bucket histogram,
/// sum and count. Also counts an overrun when `d` is strictly longer than
/// `iteration_budget`.
pub fn record_iteration_duration(&mut self, d: Duration) {
    let secs = d.as_secs_f64();
    let bucket = ITERATION_BUCKET_BOUNDS_S
        .iter()
        .position(|&upper| secs <= upper)
        .unwrap_or(ITERATION_BUCKET_BOUNDS_S.len());
    let nanos = u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);

    self.iteration_duration_ns_sum = self.iteration_duration_ns_sum.saturating_add(nanos);
    self.iteration_count_total = self.iteration_count_total.saturating_add(1);
    let slot = &mut self.iteration_buckets[bucket];
    *slot = slot.saturating_add(1);

    let over_budget = d > self.iteration_budget;
    if over_budget {
        self.iteration_budget_exceeded_total =
            self.iteration_budget_exceeded_total.saturating_add(1);
    }
}
/// Records the duration of one event-loop `stage` into that stage's
/// per-bucket histogram, nanosecond sum and count.
pub fn record_stage_duration(&mut self, stage: IterStage, d: Duration) {
    let s = stage as usize;
    let secs = d.as_secs_f64();
    let bucket = ITERATION_BUCKET_BOUNDS_S
        .iter()
        .position(|&upper| secs <= upper)
        .unwrap_or(ITERATION_BUCKET_BOUNDS_S.len());
    let nanos = u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);

    self.stage_duration_ns_sum[s] = self.stage_duration_ns_sum[s].saturating_add(nanos);
    self.stage_count_total[s] = self.stage_count_total[s].saturating_add(1);
    let slot = &mut self.stage_buckets[s][bucket];
    *slot = slot.saturating_add(1);
}
/// Adds `count` ancillary-data truncation events
/// (`varta_ctrl_truncated_total`), saturating.
pub fn record_ctrl_truncated(&mut self, count: u64) {
    let total = &mut self.ctrl_truncated_total;
    *total = total.saturating_add(count);
}
/// Accepts and answers pending scrape connections for one event-loop tick.
///
/// Work is bounded: at most `PROM_MAX_CONNECTIONS_PER_SERVE` admitted
/// connections or ~100 ms, whichever comes first. Hitting either limit bumps
/// `scrape_budget_exhausted_total`. Connections refused by the per-IP rate
/// limiter are closed without a response and do not count toward the cap.
/// Afterwards, up to `PROM_MAX_DRAIN_PER_SERVE` further queued connections are
/// accepted and closed immediately (counted as `drain` drops) for at most
/// another ~100 ms.
///
/// The body is re-rendered only when `PROM_MIN_SCRAPE_INTERVAL` has passed
/// since the last fresh render. Otherwise authorized scrapers get the cached
/// body and `scrape_skipped_total` is bumped. Only an authorized 200 response
/// refreshes `last_scrape`, so unauthenticated or malformed requests cannot
/// freeze the body that legitimate scrapers see.
///
/// # Errors
///
/// Returns the non-`WouldBlock` `accept` error that ended the loop, if any.
/// Otherwise returns the first per-connection I/O error, after the rest of
/// this tick's connections and the drain have still been handled.
pub fn serve_pending(&mut self) -> io::Result<()> {
    let render_fresh = self
        .last_scrape
        .map_or(true, |last| last.elapsed() >= PROM_MIN_SCRAPE_INTERVAL);
    let serve_deadline = Instant::now() + Duration::from_millis(100);
    // Admitted connections handled this tick (successful or not).
    let mut served: usize = 0;
    // Whether a fresh body was actually rendered and sent this tick.
    let mut rendered = false;
    let mut conn_err: Option<io::Error> = None;

    let accept_result = loop {
        if Instant::now() >= serve_deadline || served >= PROM_MAX_CONNECTIONS_PER_SERVE {
            self.scrape_budget_exhausted_total =
                self.scrape_budget_exhausted_total.saturating_add(1);
            break Ok(());
        }
        match self.listener.accept() {
            Ok((stream, peer)) => {
                if !self.allow_ip(peer.ip(), Instant::now()) {
                    drop(stream);
                    continue;
                }
                served += 1;
                match self.serve_one(stream, render_fresh) {
                    Ok(true) if render_fresh => rendered = true,
                    Ok(true) => {
                        self.scrape_skipped_total = self.scrape_skipped_total.saturating_add(1);
                    }
                    // 401/405: no body sent, nothing to account for here.
                    Ok(false) => {}
                    // One client's I/O failure must not abort the others.
                    Err(e) => {
                        if conn_err.is_none() {
                            conn_err = Some(e);
                        }
                    }
                }
            }
            Err(e) if e.kind() == ErrorKind::WouldBlock => break Ok(()),
            Err(e) => break Err(e),
        }
    };

    if rendered {
        self.last_scrape = Some(Instant::now());
    }

    // Shed whatever backlog remains so it cannot pile up across ticks.
    let drain_deadline = serve_deadline + Duration::from_millis(100);
    let mut drained = 0;
    while drained < PROM_MAX_DRAIN_PER_SERVE && Instant::now() < drain_deadline {
        match self.listener.accept() {
            Ok((stream, peer)) => {
                // Still charged against the peer's bucket.
                let _ = self.allow_ip(peer.ip(), Instant::now());
                drop(stream);
                drained += 1;
                let idx = drop_reason_index(DropReason::Drain);
                self.connections_dropped_total[idx] =
                    self.connections_dropped_total[idx].saturating_add(1);
            }
            // WouldBlock (queue empty) or any other error: stop draining.
            Err(_) => break,
        }
    }

    accept_result.and(conn_err.map_or(Ok(()), Err))
}
/// Handles one accepted scrape connection: reads the request head, checks
/// the method and bearer token, and writes a response.
///
/// The request is read non-blockingly into a `PROM_REQUEST_CAP`-byte buffer.
/// Reading stops at the blank line ending the headers, EOF, `WouldBlock`, a
/// full buffer, or `PROM_READ_DEADLINE`. Anything not starting with `GET `
/// gets a 405. A request whose bearer token (as parsed by
/// `parse_authorization_bearer`) is missing or wrong gets a 401 and bumps
/// `auth_failures_total`. Otherwise the metrics body is written, re-rendered
/// first when `render_fresh`. Write errors are ignored, and every path
/// half-closes the write side.
///
/// NOTE(review): `WouldBlock` ends reading immediately, so a request whose
/// bytes have not arrived by the first read is answered with 405. The read
/// deadline only bounds back-to-back successful reads.
///
/// Returns `Ok(true)` only when the metrics body was sent (authorized 200).
///
/// # Errors
/// Propagates a failure to set non-blocking mode and non-`WouldBlock` read
/// errors. Both happen before any response is written.
fn serve_one(&mut self, mut stream: TcpStream, render_fresh: bool) -> io::Result<bool> {
    stream.set_nonblocking(true)?;
    let deadline = Instant::now() + PROM_READ_DEADLINE;
    // Sized to the request cap (the old 512-byte buffer made the cap dead and
    // truncated header blocks with long Accept/Authorization values).
    let mut buf = [0u8; PROM_REQUEST_CAP];
    let mut total = 0usize;
    while total < buf.len() && Instant::now() < deadline {
        match stream.read(&mut buf[total..]) {
            Ok(0) => break,
            Ok(n) => {
                // A terminator must include at least one new byte, so only
                // rescan from three bytes before the previous end.
                let scan_from = total.saturating_sub(3);
                total += n;
                if buf[scan_from..total].windows(4).any(|w| w == b"\r\n\r\n") {
                    break;
                }
            }
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }
    let request = &buf[..total];

    if !request.starts_with(b"GET ") {
        let response = b"HTTP/1.0 405 Method Not Allowed\r\nAllow: GET\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
        let _ = write_all_nonblocking(&mut stream, response, Instant::now() + PROM_WRITE_TIMEOUT);
        drain_read_to_would_block(&mut stream);
        let _ = stream.shutdown(Shutdown::Write);
        return Ok(false);
    }

    let authorized = match parse_authorization_bearer(request) {
        Some(presented) => varta_vlp::ct_eq(&presented, self.token.as_bytes()),
        None => false,
    };
    if !authorized {
        self.auth_failures_total = self.auth_failures_total.saturating_add(1);
        let response = b"HTTP/1.0 401 Unauthorized\r\nWWW-Authenticate: Bearer realm=\"varta\"\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
        let _ = write_all_nonblocking(&mut stream, response, Instant::now() + PROM_WRITE_TIMEOUT);
        drain_read_to_would_block(&mut stream);
        let _ = stream.shutdown(Shutdown::Write);
        return Ok(false);
    }

    if render_fresh {
        self.render_body();
    }
    let body_len = self.body_buf.len();
    let write_deadline = Instant::now() + PROM_WRITE_TIMEOUT;
    let _ = write_headers_with_len(&mut stream, body_len, write_deadline);
    let _ = write_all_nonblocking(&mut stream, self.body_buf.as_bytes(), write_deadline);
    drain_read_to_would_block(&mut stream);
    let _ = stream.shutdown(Shutdown::Write);
    Ok(true)
}
fn render_body(&mut self) {
self.body_buf.clear();
const BODY_BUF_MAX_CAPACITY: usize = 65_536;
if self.body_buf.capacity() > BODY_BUF_MAX_CAPACITY {
self.body_buf = String::with_capacity(BODY_BUF_MAX_CAPACITY);
}
let prom_ip_probes = self.ip_state.take_probe_exhausted();
if prom_ip_probes > 0 {
self.prom_ip_state_probe_exhausted_total = self
.prom_ip_state_probe_exhausted_total
.saturating_add(prom_ip_probes);
}
let mut pids: Vec<u32> = self.rows.keys().copied().collect();
pids.sort_unstable();
self.body_buf
.push_str("# HELP varta_beats_total Total accepted beats per agent pid.\n");
self.body_buf.push_str("# TYPE varta_beats_total counter\n");
for pid in &pids {
let row = &self.rows[pid];
let _ = writeln!(
self.body_buf,
"varta_beats_total{{pid=\"{pid}\"}} {}",
row.beats_total
);
}
self.body_buf
.push_str("# HELP varta_stalls_total Total observer-detected stalls per agent pid.\n");
self.body_buf
.push_str("# TYPE varta_stalls_total counter\n");
for pid in &pids {
let row = &self.rows[pid];
let _ = writeln!(
self.body_buf,
"varta_stalls_total{{pid=\"{pid}\"}} {}",
row.stalls_total
);
}
self.body_buf.push_str("# HELP varta_status Last reported status code per agent pid (0=ok,1=degraded,2=critical,3=stall).\n");
self.body_buf.push_str("# TYPE varta_status gauge\n");
for pid in &pids {
let row = &self.rows[pid];
if let Some(code) = row.last_status {
let _ = writeln!(self.body_buf, "varta_status{{pid=\"{pid}\"}} {code}");
}
}
self.body_buf.push_str(
"# HELP varta_tracker_evicted_total Total tracker slots reclaimed from dead agents.\n",
);
self.body_buf
.push_str("# TYPE varta_tracker_evicted_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_evicted_total {}",
self.evicted_total
);
self.body_buf.push_str(
"# HELP varta_frame_auth_failures_total Frames rejected due to PID spoofing or authentication failure.\n",
);
self.body_buf
.push_str("# TYPE varta_frame_auth_failures_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_frame_auth_failures_total {}",
self.auth_failures_total
);
self.body_buf
.push_str("# HELP varta_decode_errors_total Total VLP decode failures by kind.\n");
self.body_buf
.push_str("# TYPE varta_decode_errors_total counter\n");
for (idx, kind) in DECODE_KIND_LABELS.iter().enumerate() {
let _ = writeln!(
self.body_buf,
"varta_decode_errors_total{{kind=\"{kind}\"}} {}",
self.decode_errors_total[idx]
);
}
self.body_buf
.push_str("# HELP varta_io_errors_total Total socket receive errors.\n");
self.body_buf
.push_str("# TYPE varta_io_errors_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_io_errors_total {}",
self.io_errors_total
);
self.body_buf
.push_str("# HELP varta_ctrl_truncated_total Total ancillary-data truncation events (MSG_CTRUNC on Linux).\n");
self.body_buf
.push_str("# TYPE varta_ctrl_truncated_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_ctrl_truncated_total {}",
self.ctrl_truncated_total
);
self.body_buf.push_str("# HELP varta_tracker_capacity_exceeded_total Total beats dropped because tracker is full.\n");
self.body_buf
.push_str("# TYPE varta_tracker_capacity_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_capacity_exceeded_total {}",
self.capacity_exceeded_total
);
self.body_buf.push_str("# HELP varta_tracker_eviction_scan_truncated_total Total bounded eviction scans that exhausted the window without finding a victim.\n");
self.body_buf
.push_str("# TYPE varta_tracker_eviction_scan_truncated_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_eviction_scan_truncated_total {}",
self.eviction_scan_truncated_total
);
self.body_buf.push_str("# HELP varta_tracker_capacity Configured tracker capacity (max distinct agent pids).\n");
self.body_buf
.push_str("# TYPE varta_tracker_capacity gauge\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_capacity {}",
self.tracker_capacity_cfg
);
self.body_buf.push_str("# HELP varta_tracker_eviction_scan_window_max Configured eviction scan window; per-frame WCET = ceil(capacity / window_max) calls.\n");
self.body_buf
.push_str("# TYPE varta_tracker_eviction_scan_window_max gauge\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_eviction_scan_window_max {}",
self.eviction_scan_window_max
);
self.body_buf
.push_str("# HELP varta_recovery_outcomes_total Total recovery outcomes by kind.\n");
self.body_buf
.push_str("# TYPE varta_recovery_outcomes_total counter\n");
for (idx, outcome) in RECOVERY_OUTCOME_LABELS.iter().enumerate() {
let _ = writeln!(
self.body_buf,
"varta_recovery_outcomes_total{{outcome=\"{outcome}\"}} {}",
self.recovery_outcomes_total[idx]
);
}
self.body_buf.push_str(
"# HELP varta_recovery_duration_ns_sum Sum of recovery child wall-clock durations in ns.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_duration_ns_sum counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_duration_ns_sum {}",
self.recovery_duration_ns_sum
);
self.body_buf.push_str(
"# HELP varta_recovery_duration_count_total Number of recovery completions contributing to varta_recovery_duration_ns_sum.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_duration_count_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_duration_count_total {}",
self.recovery_duration_count_total
);
self.body_buf.push_str(
"# HELP varta_recovery_refused_total Recovery commands NOT spawned because of a structural safety gate, by reason.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_refused_total counter\n");
for (idx, reason) in RECOVERY_REFUSED_REASON_LABELS.iter().enumerate() {
let _ = writeln!(
self.body_buf,
"varta_recovery_refused_total{{reason=\"{reason}\"}} {}",
self.recovery_refused_total[idx]
);
}
self.body_buf.push_str(
"# HELP varta_recovery_last_fired_evictions_total LastFiredTable entries dropped (debounce-respecting) to make room for a new pid at table capacity.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_last_fired_evictions_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_last_fired_evictions_total {}",
self.recovery_last_fired_evictions_total
);
self.body_buf.push_str(
"# HELP varta_recovery_invariant_violations_total LastFiredTable defensive fall-throughs — should remain at 0 in correct operation.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_invariant_violations_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_invariant_violations_total {}",
self.recovery_invariant_violations_total
);
{
let suppressed = LOG_RATE_LIMITER
.lock()
.map(|g| g.snapshot_totals())
.unwrap_or([0; LogKind::COUNT]);
self.body_buf.push_str(
"# HELP varta_log_suppressed_total Log messages suppressed by the per-kind cooldown rate limiter.\n",
);
self.body_buf
.push_str("# TYPE varta_log_suppressed_total counter\n");
for (idx, kind) in LOG_KIND_LABELS.iter().enumerate() {
let _ = writeln!(
self.body_buf,
"varta_log_suppressed_total{{kind=\"{kind}\"}} {}",
suppressed[idx]
);
}
}
self.body_buf.push_str(
"# HELP varta_origin_conflict_total Beats dropped because the slot's pinned transport origin disagreed with the beat's origin.\n",
);
self.body_buf
.push_str("# TYPE varta_origin_conflict_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_origin_conflict_total {}",
self.origin_conflict_total
);
self.body_buf.push_str(
"# HELP varta_frame_namespace_mismatch_total Frames dropped at receive because the peer's PID-namespace inode differs from the observer's.\n",
);
self.body_buf
.push_str("# TYPE varta_frame_namespace_mismatch_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_frame_namespace_mismatch_total {}",
self.frame_namespace_mismatch_total
);
self.body_buf.push_str(
"# HELP varta_frame_rejected_pid_above_max_total Frames dropped at receive because frame.pid exceeded the kernel's configured pid_max.\n",
);
self.body_buf
.push_str("# TYPE varta_frame_rejected_pid_above_max_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_frame_rejected_pid_above_max_total {}",
self.frame_rejected_pid_above_max_total
);
self.body_buf.push_str(
"# HELP varta_pid_max_current Observer's cached /proc/sys/kernel/pid_max (refreshed every 60s).\n",
);
self.body_buf
.push_str("# TYPE varta_pid_max_current gauge\n");
let _ = writeln!(
self.body_buf,
"varta_pid_max_current {}",
self.pid_max_current
);
self.body_buf.push_str(
"# HELP varta_tracker_namespace_conflict_total Beats dropped because the slot's pinned PID-namespace inode disagreed with the beat's (first-namespace-wins).\n",
);
self.body_buf
.push_str("# TYPE varta_tracker_namespace_conflict_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_namespace_conflict_total {}",
self.tracker_namespace_conflict_total
);
self.body_buf.push_str(
"# HELP varta_tracker_invariant_violations_total Tracker hot-path invariant violations recovered by defensive .get() fall-throughs (e.g. stale PidIndex entry pointing at an OOB slot). Non-zero = bug, not a panic.\n",
);
self.body_buf
.push_str("# TYPE varta_tracker_invariant_violations_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_invariant_violations_total {}",
self.tracker_invariant_violations_total
);
self.body_buf.push_str(
"# HELP varta_tracker_pid_index_probe_exhausted_total PidIndex lookups/inserts that ran the full MAX_PROBE budget. Should stay at 0 at load factor ≤ 0.5.\n",
);
self.body_buf
.push_str("# TYPE varta_tracker_pid_index_probe_exhausted_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_tracker_pid_index_probe_exhausted_total {}",
self.tracker_pid_index_probe_exhausted_total
);
self.body_buf.push_str(
"# HELP varta_recovery_outstanding_probe_exhausted_total OutstandingTable pid-index lookups/inserts that ran the full MAX_PROBE budget.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_outstanding_probe_exhausted_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_outstanding_probe_exhausted_total {}",
self.recovery_outstanding_probe_exhausted_total
);
self.body_buf.push_str(
"# HELP varta_recovery_reap_truncated_total try_reap calls cut short because outstanding children exceeded the per-tick cap (REAP_MAX_PER_TICK=64).\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_reap_truncated_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_reap_truncated_total {}",
self.recovery_reap_truncated_total
);
self.body_buf.push_str(
"# HELP varta_recovery_audit_dropped_total Audit lines dropped because the ring was full when they arrived.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_audit_dropped_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_audit_dropped_total {}",
self.audit_dropped_total
);
self.body_buf.push_str(
"# HELP varta_recovery_audit_flush_budget_exceeded_total Ticks where flush_pending hit its budget before emptying the audit ring.\n",
);
self.body_buf
.push_str("# TYPE varta_recovery_audit_flush_budget_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_recovery_audit_flush_budget_exceeded_total {}",
self.audit_flush_budget_exceeded_total
);
self.body_buf.push_str(
"# HELP varta_audit_fsync_seconds Wall time per fdatasync(2) on the recovery audit log. Bounded by --audit-fsync-budget-ms; overruns increment varta_audit_fsync_budget_exceeded_total.\n",
);
self.body_buf
.push_str("# TYPE varta_audit_fsync_seconds histogram\n");
let mut cum_af: u64 = 0;
for (idx, bound) in ITERATION_BUCKET_BOUNDS_S.iter().enumerate() {
cum_af = cum_af.saturating_add(self.audit_fsync_buckets[idx]);
let _ = writeln!(
self.body_buf,
"varta_audit_fsync_seconds_bucket{{le=\"{bound}\"}} {cum_af}",
);
}
let inf_idx_af = ITERATION_BUCKET_BOUNDS_S.len();
cum_af = cum_af.saturating_add(self.audit_fsync_buckets[inf_idx_af]);
let _ = writeln!(
self.body_buf,
"varta_audit_fsync_seconds_bucket{{le=\"+Inf\"}} {cum_af}"
);
let sum_s_af = (self.audit_fsync_duration_ns_sum as f64) / 1e9;
let _ = writeln!(self.body_buf, "varta_audit_fsync_seconds_sum {sum_s_af:.9}");
let _ = writeln!(
self.body_buf,
"varta_audit_fsync_seconds_count {}",
self.audit_fsync_count_total
);
self.body_buf.push_str(
"# HELP varta_socket_bind_dir_fsync_failed_total fsync(2) calls on the UDS socket parent directory during observer bind that returned an error. Non-zero indicates a durability degradation — the unlink+bind sequence may not survive a power-loss journal replay.\n",
);
self.body_buf
.push_str("# TYPE varta_socket_bind_dir_fsync_failed_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_socket_bind_dir_fsync_failed_total {}",
self.bind_dir_fsync_failed_total
);
self.body_buf.push_str(
"# HELP varta_audit_fsync_budget_exceeded_total fdatasync(2) calls on the recovery audit log whose wall time exceeded --audit-fsync-budget-ms. Remaining records in the affected drain are written-to-BufWriter only; the next maintenance tick reattempts the sync.\n",
);
self.body_buf
.push_str("# TYPE varta_audit_fsync_budget_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_audit_fsync_budget_exceeded_total {}",
self.audit_fsync_budget_exceeded_total
);
self.body_buf.push_str(
"# HELP varta_audit_rotation_budget_exceeded_total drive_audit_rotation calls that exceeded --audit-rotation-budget-ms. The state machine preserves progress and the next tick resumes.\n",
);
self.body_buf
.push_str("# TYPE varta_audit_rotation_budget_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_audit_rotation_budget_exceeded_total {}",
self.audit_rotation_budget_exceeded_total
);
self.body_buf.push_str(
"# HELP varta_audit_ring_watermark_total Rising-edge transitions of the audit-record ring fill across warning (75%) and critical (95%) thresholds. Increment indicates drain pressure that has not yet caused records to drop.\n",
);
self.body_buf
.push_str("# TYPE varta_audit_ring_watermark_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_audit_ring_watermark_total{{level=\"warn\"}} {}",
self.audit_ring_watermark_total[0]
);
let _ = writeln!(
self.body_buf,
"varta_audit_ring_watermark_total{{level=\"critical\"}} {}",
self.audit_ring_watermark_total[1]
);
self.body_buf.push_str(
"# HELP varta_prom_ip_state_probe_exhausted_total IpStateTable lookups/inserts that ran the full MAX_PROBE budget.\n",
);
self.body_buf
.push_str("# TYPE varta_prom_ip_state_probe_exhausted_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_prom_ip_state_probe_exhausted_total {}",
self.prom_ip_state_probe_exhausted_total
);
self.body_buf.push_str(
"# HELP varta_frame_decrypt_failures_total Total AEAD decryption/tag-verification failures.\n",
);
self.body_buf
.push_str("# TYPE varta_frame_decrypt_failures_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_frame_decrypt_failures_total {}",
self.decrypt_failures_total
);
self.body_buf.push_str(
"# HELP varta_truncated_datagrams_total Total datagrams received with wrong size.\n",
);
self.body_buf
.push_str("# TYPE varta_truncated_datagrams_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_truncated_datagrams_total {}",
self.truncated_total
);
self.body_buf.push_str(
"# HELP varta_sender_state_full_total Total times the sender-state map was full and an entry was force-evicted.\n",
);
self.body_buf
.push_str("# TYPE varta_sender_state_full_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_sender_state_full_total {}",
self.sender_state_full_total
);
self.body_buf.push_str(
"# HELP varta_secure_aead_attempts_total Total ChaCha20-Poly1305 decryption attempts across the loaded key set. The listener trials every loaded key (and the master-key derivation, if configured) on every frame, removing the linear-in-key-index timing side-channel. In steady state this equals frames_received * (keys.len() + master_key_configured as u64).\n",
);
self.body_buf
.push_str("# TYPE varta_secure_aead_attempts_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_secure_aead_attempts_total {}",
self.secure_aead_attempts_total
);
self.body_buf
.push_str("# HELP varta_rate_limited_total Frames dropped due to rate limiting.\n");
self.body_buf
.push_str("# TYPE varta_rate_limited_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_rate_limited_total{{reason=\"per_pid\"}} {}",
self.rate_limited_total[0]
);
let _ = writeln!(
self.body_buf,
"varta_rate_limited_total{{reason=\"global\"}} {}",
self.rate_limited_total[1]
);
self.body_buf.push_str(
"# HELP varta_observer_uds_rcvbuf_bytes Effective SO_RCVBUF size on the observer UDS, in bytes.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_uds_rcvbuf_bytes gauge\n");
let _ = writeln!(
self.body_buf,
"varta_observer_uds_rcvbuf_bytes {}",
self.uds_rcvbuf_bytes
);
self.body_buf.push_str(
"# HELP varta_observer_clock_regression_total Times the observer monotonic clock returned a value strictly less than the previously observed one and the forward clamp absorbed the regression. Non-zero values indicate TSC drift, VM live migration, or another clock anomaly.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_clock_regression_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_observer_clock_regression_total {}",
self.clock_regressions_total
);
self.body_buf.push_str(
"# HELP varta_observer_clock_jump_forward_total Times the observer monotonic clock advanced by more than 5 s between adjacent poll ticks. Non-zero values indicate sleep/wake on monotonic-raw/boottime, VM live migration, or a hypervisor pause.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_clock_jump_forward_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_observer_clock_jump_forward_total {}",
self.clock_jumps_forward_total
);
self.body_buf.push_str(
"# HELP varta_scrape_skipped_total Number of /metrics scrapes served from cache (rate-limited).\n",
);
self.body_buf
.push_str("# TYPE varta_scrape_skipped_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_scrape_skipped_total {}",
self.scrape_skipped_total
);
self.body_buf.push_str(
"# HELP varta_scrape_budget_exhausted_total Times the serve budget (max connections or deadline) was exhausted during a poll tick.\n",
);
self.body_buf
.push_str("# TYPE varta_scrape_budget_exhausted_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_scrape_budget_exhausted_total {}",
self.scrape_budget_exhausted_total
);
self.body_buf.push_str(
"# HELP varta_observer_iteration_seconds Observer poll-loop iteration wall time (excludes idle sleep and test-hooks wedge).\n",
);
self.body_buf
.push_str("# TYPE varta_observer_iteration_seconds histogram\n");
let mut cum: u64 = 0;
for (idx, bound) in ITERATION_BUCKET_BOUNDS_S.iter().enumerate() {
cum = cum.saturating_add(self.iteration_buckets[idx]);
let _ = writeln!(
self.body_buf,
"varta_observer_iteration_seconds_bucket{{le=\"{bound}\"}} {cum}",
);
}
let inf_idx = ITERATION_BUCKET_BOUNDS_S.len();
cum = cum.saturating_add(self.iteration_buckets[inf_idx]);
let _ = writeln!(
self.body_buf,
"varta_observer_iteration_seconds_bucket{{le=\"+Inf\"}} {cum}"
);
let sum_s = (self.iteration_duration_ns_sum as f64) / 1e9;
let _ = writeln!(
self.body_buf,
"varta_observer_iteration_seconds_sum {sum_s:.9}"
);
let _ = writeln!(
self.body_buf,
"varta_observer_iteration_seconds_count {}",
self.iteration_count_total
);
self.body_buf.push_str(
"# HELP varta_observer_iteration_budget_exceeded_total Observer poll iterations that exceeded the soft --iteration-budget-ms.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_iteration_budget_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_observer_iteration_budget_exceeded_total {}",
self.iteration_budget_exceeded_total
);
self.body_buf.push_str(
"# HELP varta_observer_serve_pending_seconds Wall time spent in PromExporter::serve_pending per poll-loop tick. Subtract from iteration_seconds to derive beat-path latency.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_serve_pending_seconds histogram\n");
let mut cum_sp: u64 = 0;
for (idx, bound) in ITERATION_BUCKET_BOUNDS_S.iter().enumerate() {
cum_sp = cum_sp.saturating_add(self.serve_pending_buckets[idx]);
let _ = writeln!(
self.body_buf,
"varta_observer_serve_pending_seconds_bucket{{le=\"{bound}\"}} {cum_sp}",
);
}
let inf_idx_sp = ITERATION_BUCKET_BOUNDS_S.len();
cum_sp = cum_sp.saturating_add(self.serve_pending_buckets[inf_idx_sp]);
let _ = writeln!(
self.body_buf,
"varta_observer_serve_pending_seconds_bucket{{le=\"+Inf\"}} {cum_sp}"
);
let sum_s_sp = (self.serve_pending_duration_ns_sum as f64) / 1e9;
let _ = writeln!(
self.body_buf,
"varta_observer_serve_pending_seconds_sum {sum_s_sp:.9}"
);
let _ = writeln!(
self.body_buf,
"varta_observer_serve_pending_seconds_count {}",
self.serve_pending_count_total
);
self.body_buf.push_str(
"# HELP varta_observer_scrape_budget_exceeded_total serve_pending calls that exceeded the soft --scrape-budget-ms.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_scrape_budget_exceeded_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_observer_scrape_budget_exceeded_total {}",
self.scrape_budget_exceeded_total
);
self.body_buf.push_str(
"# HELP varta_observer_stage_seconds Per-stage observer poll-loop wall time for latency attribution.\n",
);
self.body_buf
.push_str("# TYPE varta_observer_stage_seconds histogram\n");
for (stage_idx, stage_label) in STAGE_LABELS.iter().enumerate() {
let mut cum_st: u64 = 0;
for (b_idx, bound) in ITERATION_BUCKET_BOUNDS_S.iter().enumerate() {
cum_st = cum_st.saturating_add(self.stage_buckets[stage_idx][b_idx]);
let _ = writeln!(
self.body_buf,
"varta_observer_stage_seconds_bucket{{stage=\"{stage_label}\",le=\"{bound}\"}} {cum_st}",
);
}
let inf_i = ITERATION_BUCKET_BOUNDS_S.len();
cum_st = cum_st.saturating_add(self.stage_buckets[stage_idx][inf_i]);
let _ = writeln!(
self.body_buf,
"varta_observer_stage_seconds_bucket{{stage=\"{stage_label}\",le=\"+Inf\"}} {cum_st}"
);
let sum_s = (self.stage_duration_ns_sum[stage_idx] as f64) / 1e9;
let _ = writeln!(
self.body_buf,
"varta_observer_stage_seconds_sum{{stage=\"{stage_label}\"}} {sum_s:.9}"
);
let _ = writeln!(
self.body_buf,
"varta_observer_stage_seconds_count{{stage=\"{stage_label}\"}} {}",
self.stage_count_total[stage_idx]
);
}
self.body_buf.push_str(
"# HELP varta_prom_auth_failures_total Number of /metrics scrapes rejected because the bearer token was missing or wrong.\n",
);
self.body_buf
.push_str("# TYPE varta_prom_auth_failures_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_prom_auth_failures_total {}",
self.auth_failures_total
);
self.body_buf.push_str(
"# HELP varta_prom_connections_dropped_total Connections accepted on /metrics but closed before serving, by reason.\n",
);
self.body_buf
.push_str("# TYPE varta_prom_connections_dropped_total counter\n");
for (idx, reason) in DROP_REASON_LABELS.iter().enumerate() {
let _ = writeln!(
self.body_buf,
"varta_prom_connections_dropped_total{{reason=\"{reason}\"}} {}",
self.connections_dropped_total[idx]
);
}
self.body_buf.push_str(
"# HELP varta_nonce_wrap_total Total nonce-space wrap events detected (agent exhausted u64 nonces).\n",
);
self.body_buf
.push_str("# TYPE varta_nonce_wrap_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_nonce_wrap_total {}",
self.nonce_wrap_total
);
self.body_buf.push_str(
"# HELP varta_signal_handler_install_total Signal-handler installation events since startup, labelled by mode (direct or libc). Always 1 in steady state; 0 means install was skipped or the label was never set.\n",
);
self.body_buf
.push_str("# TYPE varta_signal_handler_install_total counter\n");
let _ = writeln!(
self.body_buf,
"varta_signal_handler_install_total{{mode=\"{}\"}} 1",
self.signal_handler_mode,
);
self.body_buf
.push_str("# HELP varta_watch_uptime_seconds Observer process uptime in seconds.\n");
self.body_buf
.push_str("# TYPE varta_watch_uptime_seconds gauge\n");
let uptime = self.started_at.elapsed().as_secs_f64();
let _ = writeln!(self.body_buf, "varta_watch_uptime_seconds {uptime:.3}");
self.body_buf.push_str(
"# HELP varta_watch_last_poll_loop_timestamp_seconds Unix timestamp of the most recent poll loop iteration.\n",
);
self.body_buf
.push_str("# TYPE varta_watch_last_poll_loop_timestamp_seconds gauge\n");
let loop_ts = self
.last_loop_system
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
let _ = writeln!(
self.body_buf,
"varta_watch_last_poll_loop_timestamp_seconds {loop_ts:.3}"
);
self.body_buf.push_str(
"# HELP varta_watch_pids_tracked Current number of agent PIDs in the tracker.\n",
);
self.body_buf
.push_str("# TYPE varta_watch_pids_tracked gauge\n");
let _ = writeln!(
self.body_buf,
"varta_watch_pids_tracked {}",
self.rows.len()
);
}
}
#[cfg(feature = "prometheus-exporter")]
impl Exporter for PromExporter {
    /// Folds a single observer event into this exporter's in-memory counters.
    ///
    /// `Beat` and `Stall` update (creating on first sight) the per-pid row in
    /// `self.rows`; every other handled event only bumps a process-wide
    /// counter. All counters saturate instead of wrapping. Always returns
    /// `Ok(())`.
    fn record(&mut self, ev: &Event) -> io::Result<()> {
        match ev {
            Event::Beat { pid, status, .. } => {
                let gauge = self.rows.entry(*pid).or_insert_with(GaugeRow::new);
                gauge.beats_total = gauge.beats_total.saturating_add(1);
                gauge.last_status = Some(*status as u8);
            }
            Event::Stall { pid, .. } => {
                let gauge = self.rows.entry(*pid).or_insert_with(GaugeRow::new);
                gauge.stalls_total = gauge.stalls_total.saturating_add(1);
                // A stall overrides whatever status the last beat reported.
                gauge.last_status = Some(Status::Stall as u8);
            }
            Event::AuthFailure { .. } => {
                // NOTE(review): this counter is rendered as
                // `varta_prom_auth_failures_total`, whose HELP text describes
                // rejected /metrics scrapes — confirm this event belongs there.
                self.auth_failures_total = self.auth_failures_total.saturating_add(1);
            }
            // Deliberately not counted here: the rendered conflict series read
            // `origin_conflict_total` / `tracker_namespace_conflict_total`,
            // which are presumably populated elsewhere — TODO confirm.
            Event::OriginConflict { .. } | Event::NamespaceConflict { .. } => {}
            Event::Decode(err, _) => {
                let counter = &mut self.decode_errors_total[decode_kind_index(err)];
                *counter = counter.saturating_add(1);
            }
            Event::Io(..) => {
                self.io_errors_total = self.io_errors_total.saturating_add(1);
            }
            Event::CtrlTruncated(..) => {
                self.ctrl_truncated_total = self.ctrl_truncated_total.saturating_add(1);
            }
        }
        Ok(())
    }

    /// No-op: `record` only mutates in-memory state, so there is nothing to flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
#[cfg(feature = "prometheus-exporter")]
/// Sends the fixed `200 OK` HTTP/1.0 response head for a metrics body of
/// `body_len` bytes (Prometheus text format 0.0.4, `Connection: close`).
///
/// The head is assembled in a 128-byte stack array and handed to
/// `write_all_nonblocking`, so it inherits that helper's best-effort,
/// `deadline`-bounded semantics.
fn write_headers_with_len(
    stream: &mut TcpStream,
    body_len: usize,
    deadline: Instant,
) -> io::Result<()> {
    const HEAD: &[u8] =
        b"HTTP/1.0 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: ";
    const TAIL: &[u8] = b"\r\nConnection: close\r\n\r\n";
    // 74-byte HEAD + at most 20 decimal digits (64-bit usize) + 23-byte TAIL = 117 <= 128.
    let mut header = [0u8; 128];
    header[..HEAD.len()].copy_from_slice(HEAD);
    let digits_end = HEAD.len() + write_usize(&mut header[HEAD.len()..], body_len);
    let head_end = digits_end + TAIL.len();
    header[digits_end..head_end].copy_from_slice(TAIL);
    write_all_nonblocking(stream, &header[..head_end], deadline)
}
#[cfg(feature = "prometheus-exporter")]
/// Formats `n` in decimal at the start of `buf` and returns the digit count.
///
/// Digits are produced least-significant first into the tail of `buf` and
/// then slid to the front, so the bytes after the returned length may hold
/// leftover digits. `buf` is expected to hold at least 20 bytes (checked only
/// in debug builds); a buffer too short for `n` panics on indexing.
fn write_usize(buf: &mut [u8], mut n: usize) -> usize {
    debug_assert!(
        buf.len() >= 20,
        "write_usize: buffer too small ({})",
        buf.len()
    );
    // Zero is the only value for which the digit loop would emit nothing.
    if n == 0 {
        buf[0] = b'0';
        return 1;
    }
    let end = buf.len();
    let mut start = end;
    while n != 0 {
        let digit = (n % 10) as u8;
        start -= 1;
        buf[start] = b'0' + digit;
        n /= 10;
    }
    let digit_count = end - start;
    // Move the right-aligned digits to the front so callers use buf[..digit_count].
    buf.copy_within(start..end, 0);
    digit_count
}
#[cfg(feature = "prometheus-exporter")]
/// Maximum number of `WouldBlock` retries (each followed by
/// `std::thread::yield_now`) that a single `write_all_nonblocking` call makes
/// before giving up on the remaining bytes. The count is per call and is not
/// reset when a write makes progress.
const MAX_WRITE_YIELDS: usize = 10;
#[cfg(feature = "prometheus-exporter")]
/// Best-effort `write_all` for a (typically non-blocking) `TcpStream`.
///
/// Keeps writing until all of `buf` is sent, but stops early — still
/// returning `Ok(())` — when `deadline` has passed, the stream accepts 0
/// bytes, or `MAX_WRITE_YIELDS` `WouldBlock` retries have been used up. A
/// partial write is therefore *not* reported to the caller.
///
/// `ErrorKind::Interrupted` (EINTR) is retried, as `std::io::Write::write_all`
/// does; the deadline check at the top of the loop keeps those retries
/// bounded. Any other error is returned immediately.
fn write_all_nonblocking(stream: &mut TcpStream, buf: &[u8], deadline: Instant) -> io::Result<()> {
    let mut written = 0;
    let mut yields = 0;
    while written < buf.len() {
        if Instant::now() >= deadline {
            break;
        }
        match stream.write(&buf[written..]) {
            Ok(0) => break,
            Ok(n) => written += n,
            // A signal interrupted the syscall before any byte was written:
            // retry rather than surfacing a spurious error.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                if yields >= MAX_WRITE_YIELDS {
                    break;
                }
                yields += 1;
                std::thread::yield_now();
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
#[cfg(feature = "prometheus-exporter")]
/// Extracts the 32-byte bearer token from a raw HTTP/1.x request head.
///
/// The first CRLF-terminated line (the request line) is skipped. Header lines
/// are then scanned up to the blank line that ends the head. The first
/// `Authorization` header (name matched ASCII case-insensitively) decides the
/// outcome:
/// - its value, after trimming spaces/tabs, must start with `Bearer `
///   (case-insensitive, exactly one space as in RFC 6750);
/// - the credential, after trimming spaces/tabs on both ends, must be
///   *exactly* 64 characters and decode via `varta_vlp::decode_hex_32`.
///
/// Returns `None` in these cases:
/// - there is no request line, or no `Authorization` header appears among the
///   complete header lines;
/// - the scheme is not `Bearer`;
/// - the credential length is not 64;
/// - decoding fails.
///
/// Trailing bytes after the 64 hex characters are rejected rather than
/// silently ignored.
fn parse_authorization_bearer(buf: &[u8]) -> Option<[u8; 32]> {
    const HDR: &[u8] = b"authorization:";
    const BEARER: &[u8] = b"bearer ";
    // Hex characters needed to encode a 32-byte token.
    const TOKEN_HEX_LEN: usize = 64;

    // Strips optional whitespace (SP / HTAB) from both ends of a header value.
    fn trim_ows(mut s: &[u8]) -> &[u8] {
        while let [b' ' | b'\t', tail @ ..] = s {
            s = tail;
        }
        while let [head @ .., b' ' | b'\t'] = s {
            s = head;
        }
        s
    }

    // Skip the request line; without one there are no headers to inspect.
    let request_line_end = find_crlf(buf)?;
    let mut rest = &buf[request_line_end + 2..];
    while let Some(eol) = find_crlf(rest) {
        let line = &rest[..eol];
        rest = &rest[eol + 2..];
        if line.is_empty() {
            // Blank line: end of the header section, no Authorization seen.
            return None;
        }
        let is_authorization =
            line.len() >= HDR.len() && line[..HDR.len()].eq_ignore_ascii_case(HDR);
        if !is_authorization {
            continue;
        }
        let value = trim_ows(&line[HDR.len()..]);
        if value.len() < BEARER.len() || !value[..BEARER.len()].eq_ignore_ascii_case(BEARER) {
            return None;
        }
        let token = trim_ows(&value[BEARER.len()..]);
        if token.len() != TOKEN_HEX_LEN {
            return None;
        }
        return varta_vlp::decode_hex_32(token).ok();
    }
    None
}
#[cfg(feature = "prometheus-exporter")]
/// Returns the index of the `\r` in the first `\r\n` pair in `buf`, if any.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    let current = buf.iter();
    let next = buf.iter().skip(1);
    current
        .zip(next)
        .position(|(&cr, &lf)| cr == b'\r' && lf == b'\n')
}
#[cfg(feature = "prometheus-exporter")]
/// Best-effort discard of bytes still queued on `stream`'s receive side.
///
/// Reads into a 128-byte scratch buffer and stops when any of these occurs:
/// - the peer reaches EOF (`Ok(0)`);
/// - the socket reports `WouldBlock`;
/// - any other non-`Interrupted` error occurs;
/// - `MAX_DRAIN_READS` reads have been issued.
///
/// `Interrupted` is retried and counts toward the cap. All errors are
/// swallowed.
///
/// The cap keeps this call bounded. Previously the loop ran for as long as
/// reads returned data, so a peer that kept the receive buffer non-empty
/// could hold the caller here indefinitely. Anything still unread after the
/// cap is left in the kernel buffer.
fn drain_read_to_would_block(stream: &mut TcpStream) {
    // 64 reads x 128 bytes: at most 8 KiB discarded per call.
    const MAX_DRAIN_READS: usize = 64;
    let mut buf = [0u8; 128];
    for _ in 0..MAX_DRAIN_READS {
        match stream.read(&mut buf) {
            Ok(0) => return,
            Ok(_) => {}
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            // WouldBlock: queue is empty. Anything else: give up quietly.
            Err(_) => return,
        }
    }
}
#[cfg(all(test, feature = "prometheus-exporter"))]
mod tests {
use super::*;
const TEST_TOKEN: [u8; 32] = [0xab; 32];
const TEST_TOKEN_HEX: &str = "abababababababababababababababababababababababababababababababab";
fn make_token() -> BearerToken {
BearerToken::from_bytes(TEST_TOKEN)
}
#[test]
fn render_body_sorts_pids_numerically() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.record(&Event::Beat {
pid: 30,
status: Status::Ok,
nonce: 1,
payload: 0,
observer_ns: 0,
origin: crate::peer_cred::BeatOrigin::KernelAttested,
pid_ns_inode: None,
})
.unwrap();
prom.record(&Event::Beat {
pid: 2,
status: Status::Ok,
nonce: 1,
payload: 0,
observer_ns: 0,
origin: crate::peer_cred::BeatOrigin::KernelAttested,
pid_ns_inode: None,
})
.unwrap();
prom.record(&Event::Beat {
pid: 11,
status: Status::Ok,
nonce: 1,
payload: 0,
observer_ns: 0,
origin: crate::peer_cred::BeatOrigin::KernelAttested,
pid_ns_inode: None,
})
.unwrap();
prom.render_body();
let body = &prom.body_buf;
let pos2 = body.find("pid=\"2\"").expect("pid 2");
let pos11 = body.find("pid=\"11\"").expect("pid 11");
let pos30 = body.find("pid=\"30\"").expect("pid 30");
assert!(pos2 < pos11 && pos11 < pos30, "sort order broken:\n{body}");
}
#[test]
fn decode_and_io_events_do_not_create_rows() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.record(&Event::Decode(varta_vlp::DecodeError::BadMagic, 0))
.unwrap();
prom.record(&Event::Io(io::Error::other("x"), 0)).unwrap();
assert!(prom.rows.is_empty());
}
#[test]
fn decode_errors_emit_kind_label_for_every_variant_even_at_zero() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.record(&Event::Decode(DecodeError::BadMagic, 0))
.unwrap();
prom.record(&Event::Decode(DecodeError::BadMagic, 0))
.unwrap();
prom.record(&Event::Decode(DecodeError::BadStatus(0xff), 0))
.unwrap();
prom.render_body();
let body = &prom.body_buf;
assert!(
body.contains("varta_decode_errors_total{kind=\"bad_magic\"} 2"),
"missing or wrong bad_magic series:\n{body}"
);
assert!(
body.contains("varta_decode_errors_total{kind=\"bad_version\"} 0"),
"missing zero-valued bad_version series:\n{body}"
);
assert!(
body.contains("varta_decode_errors_total{kind=\"bad_status\"} 1"),
"missing or wrong bad_status series:\n{body}"
);
}
#[test]
fn non_get_request_returns_405() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
let addr = prom.local_addr().expect("local_addr");
let mut stream = TcpStream::connect(addr).expect("connect");
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.expect("read timeout");
stream
.write_all(b"POST /metrics HTTP/1.0\r\n\r\n")
.expect("write");
std::thread::sleep(Duration::from_millis(5));
prom.serve_pending().expect("serve_pending");
let mut response = String::new();
stream.read_to_string(&mut response).expect("read");
assert!(
response.starts_with("HTTP/1.0 405 Method Not Allowed"),
"expected 405, got: {response}"
);
assert!(
response.contains("Allow: GET"),
"missing Allow header: {response}"
);
}
fn one_get(prom: &mut PromExporter, addr: SocketAddr, auth: Option<&str>) -> String {
let mut stream = TcpStream::connect(addr).expect("connect");
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.expect("read timeout");
let mut req = String::from("GET /metrics HTTP/1.0\r\nHost: localhost\r\n");
if let Some(a) = auth {
req.push_str("Authorization: ");
req.push_str(a);
req.push_str("\r\n");
}
req.push_str("Connection: close\r\n\r\n");
stream.write_all(req.as_bytes()).expect("write");
for _ in 0..20 {
std::thread::sleep(Duration::from_millis(5));
prom.serve_pending().expect("serve_pending");
}
let mut response = String::new();
stream.read_to_string(&mut response).expect("read");
response
}
#[test]
fn metrics_requires_bearer_token() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
let addr = prom.local_addr().expect("local_addr");
let response = one_get(&mut prom, addr, None);
assert!(
response.starts_with("HTTP/1.0 401 Unauthorized"),
"expected 401 on missing auth, got: {response}"
);
assert!(
response.contains("WWW-Authenticate: Bearer"),
"missing WWW-Authenticate header: {response}"
);
assert_eq!(
prom.auth_failures_total, 1,
"auth_failures_total must bump on missing auth"
);
}
#[test]
fn metrics_rejects_wrong_token() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
let addr = prom.local_addr().expect("local_addr");
let bad = "Bearer 0000000000000000000000000000000000000000000000000000000000000000";
let response = one_get(&mut prom, addr, Some(bad));
assert!(
response.starts_with("HTTP/1.0 401 Unauthorized"),
"expected 401 on wrong token, got: {response}"
);
assert_eq!(
prom.auth_failures_total, 1,
"auth_failures_total must bump on wrong token"
);
}
#[test]
fn metrics_accepts_valid_token() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
let addr = prom.local_addr().expect("local_addr");
let good = format!("Bearer {TEST_TOKEN_HEX}");
let response = one_get(&mut prom, addr, Some(&good));
assert!(
response.starts_with("HTTP/1.0 200 OK"),
"expected 200 on valid token, got: {response}"
);
assert_eq!(
prom.auth_failures_total, 0,
"auth_failures_total must not bump on success"
);
}
#[test]
fn metrics_authorization_header_is_case_insensitive() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
let addr = prom.local_addr().expect("local_addr");
let token_upper = TEST_TOKEN_HEX.to_uppercase();
let mut stream = TcpStream::connect(addr).expect("connect");
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.expect("read timeout");
let req = format!(
"GET /metrics HTTP/1.0\r\nauthorization: bearer {token_upper}\r\nConnection: close\r\n\r\n"
);
stream.write_all(req.as_bytes()).expect("write");
std::thread::sleep(Duration::from_millis(5));
prom.serve_pending().expect("serve_pending");
let mut response = String::new();
stream.read_to_string(&mut response).expect("read");
assert!(
response.starts_with("HTTP/1.0 200 OK"),
"expected 200 with case-insensitive header, got: {response}"
);
}
#[test]
fn auth_failures_counter_emitted_at_zero_in_body() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.render_body();
assert!(
prom.body_buf.contains("varta_prom_auth_failures_total 0"),
"auth_failures_total must emit at zero; body:\n{}",
prom.body_buf
);
}
#[test]
fn parse_authorization_bearer_finds_token_among_many_headers() {
let req = format!(
"GET /metrics HTTP/1.0\r\nHost: localhost\r\nX-Foo: bar\r\nAuthorization: Bearer {TEST_TOKEN_HEX}\r\nUser-Agent: prom/2\r\n\r\n"
);
let parsed =
parse_authorization_bearer(req.as_bytes()).expect("token must parse out of headers");
assert_eq!(parsed, TEST_TOKEN);
}
#[test]
fn parse_authorization_bearer_rejects_non_bearer_scheme() {
let req = "GET /metrics HTTP/1.0\r\nAuthorization: Basic dXNlcjpwYXNz\r\n\r\n";
assert!(parse_authorization_bearer(req.as_bytes()).is_none());
}
#[test]
fn parse_authorization_bearer_rejects_short_token() {
let req = "GET /metrics HTTP/1.0\r\nAuthorization: Bearer abc\r\n\r\n";
assert!(parse_authorization_bearer(req.as_bytes()).is_none());
}
#[test]
fn record_evicted_pid_removes_row() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.record(&Event::Beat {
pid: 42,
status: Status::Ok,
nonce: 1,
payload: 0,
observer_ns: 0,
origin: crate::peer_cred::BeatOrigin::KernelAttested,
pid_ns_inode: None,
})
.unwrap();
assert!(prom.rows.contains_key(&42), "row should exist after beat");
prom.record_evicted_pid(42);
assert!(
!prom.rows.contains_key(&42),
"row should be removed after eviction"
);
}
#[test]
fn record_evicted_pid_ignores_unknown_pid() {
let mut prom =
PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
prom.record_evicted_pid(99);
assert!(prom.rows.is_empty());
}
/// Self-health gauges are rendered. The uptime gauge starts near zero, and
/// `varta_watch_pids_tracked` follows a beat (1) and the pid's eviction (0).
///
/// Values are checked by parsing the rendered `name value` sample line instead of matching a
/// substring prefix, for two reasons:
/// - a prefix like `"varta_watch_pids_tracked 1"` would also accept `…tracked 10`;
/// - `"varta_watch_uptime_seconds 0."` depends on the float formatting and fails whenever
///   more than a second elapses between `bind` and `render_body` on a loaded host.
#[test]
fn self_health_metrics_are_emitted() {
    /// Value of the first unlabeled sample line `"{series} {value}…"` in `body`, if any.
    /// `# HELP` / `# TYPE` lines and longer metric names sharing the prefix are skipped.
    fn sample(body: &str, series: &str) -> Option<f64> {
        body.lines().find_map(|line| {
            line.strip_prefix(series)?
                .strip_prefix(' ')?
                .split_whitespace()
                .next()?
                .parse()
                .ok()
        })
    }

    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.record(&Event::Beat {
        pid: 7,
        status: Status::Ok,
        nonce: 1,
        payload: 0,
        observer_ns: 1,
        origin: crate::peer_cred::BeatOrigin::KernelAttested,
        pid_ns_inode: None,
    })
    .unwrap();
    prom.record_loop_tick();
    prom.render_body();
    let body = &prom.body_buf;

    let uptime = sample(body, "varta_watch_uptime_seconds")
        .unwrap_or_else(|| panic!("missing varta_watch_uptime_seconds:\n{body}"));
    // The generous upper bound still distinguishes "seconds since start" from e.g. epoch
    // seconds, without failing on scheduler stalls.
    assert!(
        (0.0..60.0).contains(&uptime),
        "uptime should start near 0, got {uptime}:\n{body}"
    );
    assert!(
        body.contains("varta_watch_last_poll_loop_timestamp_seconds"),
        "missing varta_watch_last_poll_loop_timestamp_seconds:\n{body}"
    );
    assert_eq!(
        sample(body, "varta_watch_pids_tracked"),
        Some(1.0),
        "missing/incorrect varta_watch_pids_tracked:\n{body}"
    );

    prom.record_evicted_pid(7);
    prom.render_body();
    let body2 = &prom.body_buf;
    assert_eq!(
        sample(body2, "varta_watch_pids_tracked"),
        Some(0.0),
        "pids_tracked should be 0 after eviction:\n{body2}"
    );
}
/// Before any connection is dropped, each label in `DROP_REASON_LABELS` is rendered as
/// `varta_prom_connections_dropped_total{reason="…"} 0`.
#[test]
fn connections_dropped_emit_every_reason_label_at_zero() {
    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.render_body();
    let body = &prom.body_buf;
    DROP_REASON_LABELS.iter().for_each(|reason| {
        let series = format!("varta_prom_connections_dropped_total{{reason=\"{reason}\"}} 0");
        assert!(
            body.contains(&series),
            "missing zero-emission for reason={reason}:\n{body}"
        );
    });
}
/// Configured with (1, 3), the exporter admits three connections from one IP at the same
/// instant and refuses the fourth. The refusal bumps the `RateLimit` drop counter exactly
/// once. Two seconds later the IP is admitted again.
#[test]
fn allow_ip_denies_after_burst_and_records_rate_limit() {
    let (rate, burst) = (1, 3);
    let mut prom = PromExporter::bind_with_rate_limit(
        "127.0.0.1:0".parse().unwrap(),
        make_token(),
        rate,
        burst,
    )
    .expect("bind");
    let client: std::net::IpAddr = "127.0.0.1".parse().unwrap();
    let start = Instant::now();

    for _ in 0..burst {
        assert!(prom.allow_ip(client, start));
    }
    let over_burst = prom.allow_ip(client, start);
    assert!(!over_burst);

    let rate_limited = prom.connections_dropped_total[drop_reason_index(DropReason::RateLimit)];
    assert_eq!(
        rate_limited, 1,
        "rate_limit drop counter must increment on denial"
    );

    assert!(prom.allow_ip(client, start + Duration::from_secs(2)));
}
/// With a burst of 0, every one of 1000 same-instant connections from one IP is admitted.
/// The per-IP state table stays empty.
#[test]
fn allow_ip_burst_zero_is_unlimited() {
    let mut prom = PromExporter::bind_with_rate_limit(
        "127.0.0.1:0".parse().unwrap(),
        make_token(),
        5,
        0,
    )
    .expect("bind");
    let client: std::net::IpAddr = "127.0.0.1".parse().unwrap();
    let now = Instant::now();

    let all_admitted = (0..1000).all(|_| prom.allow_ip(client, now));
    assert!(all_admitted);

    assert!(
        prom.ip_state.is_empty(),
        "burst=0 path must not allocate per-IP state"
    );
}
/// Once the per-IP table holds `MAX_PROM_IP_STATES` entries, a new IP is still admitted.
/// The table size stays at the cap, and the `IpTableFull` drop counter is non-zero.
#[test]
fn allow_ip_table_full_force_evicts_and_records() {
    let mut prom = PromExporter::bind_with_rate_limit(
        "127.0.0.1:0".parse().unwrap(),
        make_token(),
        1000,
        1000,
    )
    .expect("bind");
    let now = Instant::now();

    // Fill the table with distinct 10.x.y.z addresses taken from the low 24 bits of `slot`.
    for slot in 0..MAX_PROM_IP_STATES {
        let octets = [10, (slot >> 16) as u8, (slot >> 8) as u8, slot as u8];
        let addr = std::net::IpAddr::V4(std::net::Ipv4Addr::from(octets));
        assert!(prom.allow_ip(addr, now));
    }
    assert_eq!(prom.ip_state.len(), MAX_PROM_IP_STATES);

    let newcomer = std::net::IpAddr::V4(std::net::Ipv4Addr::new(11, 0, 0, 1));
    assert!(prom.allow_ip(newcomer, now));
    assert_eq!(
        prom.ip_state.len(),
        MAX_PROM_IP_STATES,
        "table size must remain capped"
    );

    let forced = prom.connections_dropped_total[drop_reason_index(DropReason::IpTableFull)];
    assert!(
        forced >= 1,
        "ip_table_full drop counter must increment on force-eviction"
    );
}
/// On the first scrape, with no recovery activity yet, three groups of series render as 0:
/// - every `varta_recovery_refused_total{reason="…"}` series, including `debounce_capacity`;
/// - the last-fired-evictions counter;
/// - the invariant-violations counter.
#[test]
fn recovery_refused_debounce_capacity_label_emitted_at_zero() {
    // Pin the label this test is named after. Without this check, the loop below would still
    // pass if `debounce_capacity` were dropped from the label table.
    assert!(
        RECOVERY_REFUSED_REASON_LABELS.contains(&"debounce_capacity"),
        "debounce_capacity must be a registered recovery-refused reason label"
    );

    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.render_body();
    let body = &prom.body_buf;
    for reason in RECOVERY_REFUSED_REASON_LABELS.iter() {
        let needle = format!("varta_recovery_refused_total{{reason=\"{reason}\"}} 0");
        assert!(
            body.contains(&needle),
            "missing first-scrape zero line for reason {reason:?}; body:\n{body}"
        );
    }
    assert!(
        body.contains("varta_recovery_last_fired_evictions_total 0"),
        "evictions counter missing zero line in first scrape; body:\n{body}"
    );
    assert!(
        body.contains("varta_recovery_invariant_violations_total 0"),
        "invariant-violations counter missing zero line in first scrape; body:\n{body}"
    );
}
/// A single `RefusedDebounceCapacity` outcome increments two counters to 1:
/// - the outcome series `refused_debounce_capacity`;
/// - the refused-reason series `debounce_capacity`.
#[test]
fn recovery_refused_debounce_capacity_outcome_drives_counters() {
    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.record_recovery_outcome(
        &crate::recovery::RecoveryOutcome::RefusedDebounceCapacity { pid: 42 },
        None,
    );
    prom.render_body();

    let body = &prom.body_buf;
    let outcome_series =
        "varta_recovery_outcomes_total{outcome=\"refused_debounce_capacity\"} 1";
    assert!(
        body.contains(outcome_series),
        "outcome counter must increment under refused_debounce_capacity; body:\n{body}"
    );
    let reason_series = "varta_recovery_refused_total{reason=\"debounce_capacity\"} 1";
    assert!(
        body.contains(reason_series),
        "refused-reason counter must increment under debounce_capacity; body:\n{body}"
    );
}
/// Before any stage duration is recorded, the stage histogram renders a zero `+Inf` bucket
/// and a zero `_count` for every label in `STAGE_LABELS`.
#[test]
fn stage_histogram_emits_all_labels_at_zero_on_first_scrape() {
    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.render_body();
    let body = &prom.body_buf;
    for stage in STAGE_LABELS.iter() {
        let expectations = [
            (
                format!("varta_observer_stage_seconds_bucket{{stage=\"{stage}\",le=\"+Inf\"}} 0"),
                "+Inf bucket missing or non-zero at first scrape",
            ),
            (
                format!("varta_observer_stage_seconds_count{{stage=\"{stage}\"}} 0"),
                "_count missing at first scrape",
            ),
        ];
        for (series, problem) in &expectations {
            assert!(
                body.contains(series.as_str()),
                "stage={stage} {problem}; body:\n{body}"
            );
        }
    }
}
/// A 2 ms `Poll` observation produces three results:
/// - it is counted in the `le="0.005"` bucket;
/// - it sets the poll `_count` to 1;
/// - it leaves the `drain_pending` stage count at 0.
#[test]
fn stage_histogram_records_observation_in_correct_bucket() {
    let mut prom =
        PromExporter::bind("127.0.0.1:0".parse().unwrap(), make_token()).expect("bind");
    prom.record_stage_duration(IterStage::Poll, Duration::from_millis(2));
    prom.render_body();

    let body = &prom.body_buf;
    let expectations = [
        (
            "varta_observer_stage_seconds_bucket{stage=\"poll\",le=\"0.005\"} 1",
            "Poll 2 ms must land in le=0.005",
        ),
        (
            "varta_observer_stage_seconds_count{stage=\"poll\"} 1",
            "Poll count must be 1",
        ),
        (
            "varta_observer_stage_seconds_count{stage=\"drain_pending\"} 0",
            "drain_pending count must remain 0",
        ),
    ];
    for &(series, why) in &expectations {
        assert!(body.contains(series), "{why}; body:\n{body}");
    }
}
}
#[cfg(feature = "prometheus-exporter")]
mod bearer_token;
mod file;
#[cfg(feature = "prometheus-exporter")]
mod http;
#[cfg(feature = "prometheus-exporter")]
mod prometheus;