use std::collections::VecDeque;
use std::fmt;
use std::sync::Mutex;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow;
use thiserror::Error;
/// Upper bound on the p99 stage latency, in milliseconds. `MetricsCollector::check_slo`
/// reports `SloViolation::LatencyP99` when the observed value is greater than this.
pub const SLO_LATENCY_P99_MS: f64 = 100.0;
/// Upper bound on the error rate, in percent. `MetricsCollector::check_slo` reports
/// `SloViolation::ErrorRate` when the observed value is greater than this.
pub const SLO_ERROR_RATE_PCT: f64 = 0.1;
/// Lower bound on availability, in percent. `MetricsCollector::check_slo` reports
/// `SloViolation::Availability` when the observed value is less than this.
pub const SLO_AVAILABILITY_PCT: f64 = 99.9;
/// Snapshot of the indicators produced by `MetricsCollector::compute_sli`.
#[derive(Debug, Clone, PartialEq)]
pub struct ServiceLevelIndicators {
/// p99 of the recorded stage latencies, in milliseconds.
pub latency_p99_ms: f64,
/// Recorded stage errors divided by verified receipts, times 100.
pub error_rate_pct: f64,
/// Accepted receipts divided by verified receipts, times 100.
pub availability_pct: f64,
}
/// The objective that `MetricsCollector::check_slo` found to be breached, together with
/// the observed value and the threshold it was compared against.
#[derive(Debug, Clone, PartialEq, Error)]
pub enum SloViolation {
/// Observed p99 latency was above `SLO_LATENCY_P99_MS` (or was NaN).
#[error("p99 latency SLO breach: observed {observed_ms}ms > threshold {threshold_ms}ms")]
LatencyP99 { observed_ms: f64, threshold_ms: f64 },
/// Observed error rate was above `SLO_ERROR_RATE_PCT`.
#[error("error rate SLO breach: observed {observed_pct:.3}% > threshold {threshold_pct:.3}%")]
ErrorRate { observed_pct: f64, threshold_pct: f64 },
/// Observed availability was below `SLO_AVAILABILITY_PCT`.
#[error("availability SLO breach: observed {observed_pct:.3}% < threshold {threshold_pct:.3}%")]
Availability { observed_pct: f64, threshold_pct: f64 },
}
/// A recorded value paired with the monotonic instant at which it was recorded.
struct Observation<T> {
// Set from `Instant::now()` by the `record_*` methods; compared against the window
// when expired entries are purged.
timestamp: Instant,
// Payload: `()` for pure event counts, `Duration` for latency samples.
value: T,
}
/// Collects receipt, error and latency observations and derives service level
/// indicators from those that are no older than `window_duration`.
pub struct MetricsCollector {
// All observation queues, guarded by a single mutex so the `&self` recording
// methods can mutate them.
inner: Mutex<CollectorState>,
// Maximum age of an observation before it is dropped from the queues.
window_duration: Duration,
}
/// Mutable observation queues held behind the collector's mutex. Entries are appended
/// at the back and expired entries are removed from the front.
struct CollectorState {
// One entry per `record_receipt_verified` call.
verified_total: VecDeque<Observation<()>>,
// One entry per `record_receipt_verified` call made with `accepted == true`.
accepted_total: VecDeque<Observation<()>>,
// One entry per `record_stage_error` call.
stage_errors_total: VecDeque<Observation<()>>,
// One latency sample per `record_stage_latency` call.
stage_latencies: VecDeque<Observation<Duration>>,
}
impl MetricsCollector {
pub fn new() -> Self {
Self::with_window(Duration::from_secs(300))
}
pub fn new_noop() -> Self {
Self::new()
}
pub fn with_window(window_duration: Duration) -> Self {
Self {
inner: Mutex::new(CollectorState {
verified_total: VecDeque::new(),
accepted_total: VecDeque::new(),
stage_errors_total: VecDeque::new(),
stage_latencies: VecDeque::new(),
}),
window_duration,
}
}
pub fn record_receipt_verified(&self, accepted: bool) {
let mut state = self.inner.lock().unwrap();
let now = Instant::now();
state.verified_total.push_back(Observation { timestamp: now, value: () });
if accepted {
state.accepted_total.push_back(Observation { timestamp: now, value: () });
}
self.purge_expired(&mut state, now);
}
pub fn record_stage_error(&self, _stage: &str) {
let mut state = self.inner.lock().unwrap();
let now = Instant::now();
state.stage_errors_total.push_back(Observation { timestamp: now, value: () });
self.purge_expired(&mut state, now);
}
pub fn record_stage_latency(&self, _stage: &str, duration: Duration) {
let mut state = self.inner.lock().unwrap();
let now = Instant::now();
state.stage_latencies.push_back(Observation { timestamp: now, value: duration });
self.purge_expired(&mut state, now);
}
pub fn compute_sli(&self) -> anyhow::Result<ServiceLevelIndicators> {
let mut state = self.inner.lock().unwrap();
self.purge_expired(&mut state, Instant::now());
let verified = state.verified_total.len() as f64;
let accepted = state.accepted_total.len() as f64;
let errors = state.stage_errors_total.len() as f64;
if verified == 0.0 {
return Ok(ServiceLevelIndicators {
latency_p99_ms: 0.0,
error_rate_pct: 0.0,
availability_pct: 100.0,
});
}
let mut latencies: Vec<f64> = state.stage_latencies.iter()
.map(|o| o.value.as_secs_f64() * 1000.0)
.collect();
let latency_p99_ms = if latencies.is_empty() {
0.0
} else {
latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());
let index = (latencies.len() * 99 / 100).min(latencies.len() - 1);
latencies[index]
};
let error_rate_pct = (errors / verified) * 100.0;
let availability_pct = (accepted / verified) * 100.0;
Ok(ServiceLevelIndicators {
latency_p99_ms,
error_rate_pct,
availability_pct,
})
}
pub fn check_slo(&self) -> anyhow::Result<(), SloViolation> {
let sli = self.compute_sli().map_err(|_| {
SloViolation::LatencyP99 { observed_ms: f64::NAN, threshold_ms: SLO_LATENCY_P99_MS }
})?;
if sli.latency_p99_ms > SLO_LATENCY_P99_MS || sli.latency_p99_ms.is_nan() {
return Err(SloViolation::LatencyP99 {
observed_ms: sli.latency_p99_ms,
threshold_ms: SLO_LATENCY_P99_MS,
});
}
if sli.error_rate_pct > SLO_ERROR_RATE_PCT {
return Err(SloViolation::ErrorRate {
observed_pct: sli.error_rate_pct,
threshold_pct: SLO_ERROR_RATE_PCT,
});
}
if sli.availability_pct < SLO_AVAILABILITY_PCT {
return Err(SloViolation::Availability {
observed_pct: sli.availability_pct,
threshold_pct: SLO_AVAILABILITY_PCT,
});
}
Ok(())
}
fn purge_expired(&self, state: &mut CollectorState, now: Instant) {
let window = self.window_duration;
let is_expired = |ts: Instant| now.duration_since(ts) > window;
while state.verified_total.front().map_or(false, |o| is_expired(o.timestamp)) {
state.verified_total.pop_front();
}
while state.accepted_total.front().map_or(false, |o| is_expired(o.timestamp)) {
state.accepted_total.pop_front();
}
while state.stage_errors_total.front().map_or(false, |o| is_expired(o.timestamp)) {
state.stage_errors_total.pop_front();
}
while state.stage_latencies.front().map_or(false, |o| is_expired(o.timestamp)) {
state.stage_latencies.pop_front();
}
}
}
/// Renders the state of a borrowed `MetricsCollector` as text made of
/// `# HELP` / `# TYPE` / sample lines.
pub struct PrometheusExporter<'a> {
// Collector whose queues and indicators are read on every `render` call.
collector: &'a MetricsCollector,
}
impl<'a> PrometheusExporter<'a> {
pub fn new(collector: &'a MetricsCollector) -> Self {
Self { collector }
}
pub fn render(&self) -> String {
let sli = self.collector.compute_sli().unwrap_or(ServiceLevelIndicators {
latency_p99_ms: 0.0,
error_rate_pct: 0.0,
availability_pct: 100.0,
});
let mut out = String::new();
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let state = self.collector.inner.lock().unwrap();
out.push_str("# HELP affidavit_receipts_verified_total Total count of receipts processed by the verifier.\n");
out.push_str("# TYPE affidavit_receipts_verified_total counter\n");
out.push_str(&format!("affidavit_receipts_verified_total {} {}\n", state.verified_total.len(), timestamp_ms));
out.push_str("affidavit_receipts_verified_total{verdict=\"accept\"} ");
out.push_str(&format!("{} {}\n", state.accepted_total.len(), timestamp_ms));
out.push_str("# HELP affidavit_stage_errors_total Total count of pipeline stage failures.\n");
out.push_str("# TYPE affidavit_stage_errors_total counter\n");
out.push_str(&format!("affidavit_stage_errors_total {} {}\n", state.stage_errors_total.len(), timestamp_ms));
out.push_str("# HELP affidavit_slo_p99_latency_ms Current p99 stage latency SLI.\n");
out.push_str("# TYPE affidavit_slo_p99_latency_ms gauge\n");
out.push_str(&format!("affidavit_slo_p99_latency_ms {} {}\n", sli.latency_p99_ms, timestamp_ms));
out.push_str("# HELP affidavit_slo_error_rate_pct Current error rate SLI.\n");
out.push_str("# TYPE affidavit_slo_error_rate_pct gauge\n");
out.push_str(&format!("affidavit_slo_error_rate_pct {} {}\n", sli.error_rate_pct, timestamp_ms));
out.push_str("# HELP affidavit_slo_availability_pct Current availability SLI.\n");
out.push_str("# TYPE affidavit_slo_availability_pct gauge\n");
out.push_str(&format!("affidavit_slo_availability_pct {} {}\n", sli.availability_pct, timestamp_ms));
out
}
}
impl fmt::Display for PrometheusExporter<'_> {
    /// Writes the same text that `render` returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.render();
        f.write_str(&rendered)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance for percentages that are the result of float division.
    const EPSILON: f64 = 1e-9;

    /// Asserts that two derived floats agree within `EPSILON` instead of bit-for-bit.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < EPSILON,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_zero_state() {
        let collector = MetricsCollector::new();
        let sli = collector.compute_sli().unwrap();
        // These are literal defaults, not computed values, so exact equality is safe.
        assert_eq!(sli.latency_p99_ms, 0.0);
        assert_eq!(sli.error_rate_pct, 0.0);
        assert_eq!(sli.availability_pct, 100.0);
        assert!(collector.check_slo().is_ok());
    }

    #[test]
    fn test_p99_computation() {
        let collector = MetricsCollector::new();
        for _ in 0..99 {
            collector.record_stage_latency("test", Duration::from_millis(10));
        }
        collector.record_stage_latency("test", Duration::from_millis(500));
        collector.record_receipt_verified(true);
        let sli = collector.compute_sli().unwrap();
        assert!(
            sli.latency_p99_ms >= 400.0 && sli.latency_p99_ms <= 600.0,
            "p99 should be near 500ms, got {}",
            sli.latency_p99_ms
        );
    }

    #[test]
    fn test_error_rate_breach() {
        let collector = MetricsCollector::new();
        for _ in 0..1000 {
            collector.record_receipt_verified(true);
        }
        collector.record_stage_error("test");
        collector.record_stage_error("test");
        let sli = collector.compute_sli().unwrap();
        assert_close(sli.error_rate_pct, 0.2);
        let result = collector.check_slo();
        assert!(matches!(result, Err(SloViolation::ErrorRate { .. })));
    }

    #[test]
    fn test_availability_breach() {
        let collector = MetricsCollector::new();
        for _ in 0..998 {
            collector.record_receipt_verified(true);
        }
        collector.record_receipt_verified(false);
        collector.record_receipt_verified(false);
        let sli = collector.compute_sli().unwrap();
        assert_close(sli.availability_pct, 99.8);
        let result = collector.check_slo();
        assert!(matches!(result, Err(SloViolation::Availability { .. })));
    }

    #[test]
    fn test_prometheus_output() {
        let collector = MetricsCollector::new();
        collector.record_receipt_verified(true);
        collector.record_stage_latency("decode", Duration::from_millis(5));
        let exporter = PrometheusExporter::new(&collector);
        let output = exporter.render();
        assert!(output.contains("affidavit_receipts_verified_total 1"));
        assert!(output.contains("affidavit_slo_p99_latency_ms 5"));
        assert!(output.contains("TYPE affidavit_stage_errors_total counter"));
    }
}