use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
/// Longest key, in bytes, that [`TopDeniedKeys::update`] will count; longer keys are ignored.
const MAX_KEY_LENGTH: usize = 256;
/// Ceiling that `MetricsBuilder::max_denied_keys` applies to the requested tracker size.
const MAX_DENIED_KEYS_LIMIT: usize = 10_000;

/// Bounded tally of how many times each key has been denied.
///
/// The map is allowed to grow to three times `max_size` distinct keys; once it grows past
/// that, it is pruned back to the `max_size` most-denied keys.
pub(crate) struct TopDeniedKeys {
    /// Denial count per key.
    counts: HashMap<String, u64>,
    /// Number of keys returned by `get_top` and kept by a prune.
    max_size: usize,
}

impl TopDeniedKeys {
    /// Creates an empty tally that reports at most `max_size` keys.
    fn new(max_size: usize) -> Self {
        Self {
            counts: HashMap::with_capacity(max_size * 2),
            max_size,
        }
    }

    /// Total order used for ranking: highest count first, equal counts by ascending key.
    ///
    /// The key tie-break makes ranking and pruning independent of `HashMap` iteration
    /// order, so equal-count keys keep a stable rank from one call to the next.
    fn rank_order(left: (&str, u64), right: (&str, u64)) -> std::cmp::Ordering {
        right.1.cmp(&left.1).then_with(|| left.0.cmp(right.0))
    }

    /// Counts one denial for `key`.
    ///
    /// Keys longer than `MAX_KEY_LENGTH` bytes are ignored. When the number of distinct
    /// keys exceeds three times `max_size`, the tally is pruned via [`Self::cleanup`].
    fn update(&mut self, key: String) {
        if key.len() > MAX_KEY_LENGTH {
            return;
        }
        *self.counts.entry(key).or_insert(0) += 1;
        if self.counts.len() > self.max_size.saturating_mul(3) {
            self.cleanup();
        }
    }

    /// Prunes the tally down to the `max_size` highest-ranked keys.
    ///
    /// Does nothing when the tally already holds `max_size` keys or fewer.
    fn cleanup(&mut self) {
        if self.counts.len() <= self.max_size {
            return;
        }
        let mut entries: Vec<(String, u64)> = self.counts.drain().collect();
        entries.sort_unstable_by(|a, b| Self::rank_order((a.0.as_str(), a.1), (b.0.as_str(), b.1)));
        entries.truncate(self.max_size);
        // `drain` leaves the map's allocation in place, so refill it instead of building
        // a fresh map.
        self.counts.extend(entries);
    }

    /// Returns up to `max_size` `(key, count)` pairs, highest-ranked first.
    fn get_top(&self) -> Vec<(String, u64)> {
        // Rank borrowed keys so that only the entries actually returned are cloned.
        let mut ranked: Vec<(&str, u64)> = self
            .counts
            .iter()
            .map(|(key, &count)| (key.as_str(), count))
            .collect();
        ranked.sort_unstable_by(|a, b| Self::rank_order(*a, *b));
        ranked
            .into_iter()
            .take(self.max_size)
            .map(|(key, count)| (key.to_owned(), count))
            .collect()
    }
}
/// Request counters plus optional denied-key tracking, exportable as Prometheus text.
pub struct Metrics {
/// Instant captured when the value was built; the basis for `uptime_seconds`.
start_time: Instant,
/// Every recorded request, including those recorded through `record_error`.
pub total_requests: AtomicU64,
/// Requests recorded with `Transport::Http`.
pub http_requests: AtomicU64,
/// Requests recorded with `Transport::Grpc`.
pub grpc_requests: AtomicU64,
/// Requests recorded with `Transport::Redis`.
pub redis_requests: AtomicU64,
/// Requests recorded with `allowed == true`.
pub requests_allowed: AtomicU64,
/// Requests recorded with `allowed == false`.
pub requests_denied: AtomicU64,
/// Number of `record_error` calls.
pub requests_errors: AtomicU64,
/// Per-key denial tally; `None` when the builder was configured with zero tracked keys.
pub(crate) top_denied_keys: Option<Mutex<TopDeniedKeys>>,
}
/// Builder for [`Metrics`] that configures how many denied keys are tracked.
pub struct MetricsBuilder {
/// Number of denied keys to track; `0` makes `build` leave tracking disabled.
max_denied_keys: usize,
}
impl MetricsBuilder {
pub fn new() -> Self {
Self {
max_denied_keys: 100,
}
}
pub fn max_denied_keys(mut self, count: usize) -> Self {
self.max_denied_keys = count.clamp(0, MAX_DENIED_KEYS_LIMIT);
self
}
pub fn build(self) -> Metrics {
Metrics {
start_time: Instant::now(),
total_requests: AtomicU64::new(0),
http_requests: AtomicU64::new(0),
grpc_requests: AtomicU64::new(0),
redis_requests: AtomicU64::new(0),
requests_allowed: AtomicU64::new(0),
requests_denied: AtomicU64::new(0),
requests_errors: AtomicU64::new(0),
top_denied_keys: if self.max_denied_keys == 0 {
None
} else {
Some(Mutex::new(TopDeniedKeys::new(self.max_denied_keys)))
},
}
}
}
impl Default for MetricsBuilder {
fn default() -> Self {
Self::new()
}
}
impl Metrics {
pub fn builder() -> MetricsBuilder {
MetricsBuilder::new()
}
pub fn new() -> Self {
MetricsBuilder::new().build()
}
pub fn record_request_with_key(&self, transport: Transport, allowed: bool, key: &str) {
self.record_request(transport, allowed);
if !allowed
&& let Some(ref top_denied_keys) = self.top_denied_keys
&& let Ok(mut top_keys) = top_denied_keys.lock()
{
top_keys.update(key.to_string());
}
}
pub fn record_request(&self, transport: Transport, allowed: bool) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
match transport {
Transport::Http => self.http_requests.fetch_add(1, Ordering::Relaxed),
Transport::Grpc => self.grpc_requests.fetch_add(1, Ordering::Relaxed),
Transport::Redis => self.redis_requests.fetch_add(1, Ordering::Relaxed),
};
if allowed {
self.requests_allowed.fetch_add(1, Ordering::Relaxed);
} else {
self.requests_denied.fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_error(&self, transport: Transport) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
self.requests_errors.fetch_add(1, Ordering::Relaxed);
match transport {
Transport::Http => self.http_requests.fetch_add(1, Ordering::Relaxed),
Transport::Grpc => self.grpc_requests.fetch_add(1, Ordering::Relaxed),
Transport::Redis => self.redis_requests.fetch_add(1, Ordering::Relaxed),
};
}
pub fn uptime_seconds(&self) -> u64 {
self.start_time.elapsed().as_secs()
}
fn escape_prometheus_label(s: &str) -> String {
let mut result = String::with_capacity(s.len() * 2);
for ch in s.chars() {
match ch {
'"' => result.push_str("\\\""),
'\\' => result.push_str("\\\\"),
'\n' => result.push_str("\\n"),
'\r' => result.push_str("\\r"),
'\t' => result.push_str("\\t"),
c if c.is_control() => {
result.push_str(&format!("\\x{:02x}", c as u8));
}
c => result.push(c),
}
}
result
}
pub fn export_prometheus(&self) -> String {
let mut output = String::with_capacity(500);
output.push_str("# HELP throttlecrab_uptime_seconds Time since server start in seconds\n");
output.push_str("# TYPE throttlecrab_uptime_seconds gauge\n");
output.push_str(&format!(
"throttlecrab_uptime_seconds {}\n\n",
self.uptime_seconds()
));
output.push_str("# HELP throttlecrab_requests_total Total number of requests processed\n");
output.push_str("# TYPE throttlecrab_requests_total counter\n");
output.push_str(&format!(
"throttlecrab_requests_total {}\n\n",
self.total_requests.load(Ordering::Relaxed)
));
output.push_str(
"# HELP throttlecrab_requests_by_transport Total requests by transport type\n",
);
output.push_str("# TYPE throttlecrab_requests_by_transport counter\n");
output.push_str(&format!(
"throttlecrab_requests_by_transport{{transport=\"http\"}} {}\n",
self.http_requests.load(Ordering::Relaxed)
));
output.push_str(&format!(
"throttlecrab_requests_by_transport{{transport=\"grpc\"}} {}\n",
self.grpc_requests.load(Ordering::Relaxed)
));
output.push_str(&format!(
"throttlecrab_requests_by_transport{{transport=\"redis\"}} {}\n\n",
self.redis_requests.load(Ordering::Relaxed)
));
output.push_str("# HELP throttlecrab_requests_allowed Total requests allowed\n");
output.push_str("# TYPE throttlecrab_requests_allowed counter\n");
output.push_str(&format!(
"throttlecrab_requests_allowed {}\n\n",
self.requests_allowed.load(Ordering::Relaxed)
));
output.push_str("# HELP throttlecrab_requests_denied Total requests denied\n");
output.push_str("# TYPE throttlecrab_requests_denied counter\n");
output.push_str(&format!(
"throttlecrab_requests_denied {}\n\n",
self.requests_denied.load(Ordering::Relaxed)
));
output.push_str("# HELP throttlecrab_requests_errors Total internal errors\n");
output.push_str("# TYPE throttlecrab_requests_errors counter\n");
output.push_str(&format!(
"throttlecrab_requests_errors {}\n\n",
self.requests_errors.load(Ordering::Relaxed)
));
if let Some(ref top_denied_keys) = self.top_denied_keys {
output.push_str("# HELP throttlecrab_top_denied_keys Top keys by denial count\n");
output.push_str("# TYPE throttlecrab_top_denied_keys gauge\n");
if let Ok(top_keys) = top_denied_keys.lock() {
for (rank, (key, count)) in top_keys.get_top().iter().enumerate() {
output.push_str(&format!(
"throttlecrab_top_denied_keys{{key=\"{}\",rank=\"{}\"}} {}\n",
Self::escape_prometheus_label(key),
rank + 1,
count
));
}
}
}
output
}
}
/// Transport a request is attributed to when it is recorded in [`Metrics`].
#[derive(Debug, Clone, Copy)]
pub enum Transport {
/// Counted in `http_requests`; exported with label `transport="http"`.
Http,
/// Counted in `grpc_requests`; exported with label `transport="grpc"`.
Grpc,
/// Counted in `redis_requests`; exported with label `transport="redis"`.
Redis,
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Reads a counter with the same relaxed ordering the production code uses.
    fn read(counter: &std::sync::atomic::AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// A freshly built `Metrics` starts with every outcome counter at zero.
    #[test]
    fn test_metrics_creation() {
        let fresh = Metrics::new();
        assert_eq!(read(&fresh.total_requests), 0);
        assert_eq!(read(&fresh.requests_allowed), 0);
        assert_eq!(read(&fresh.requests_denied), 0);
        assert_eq!(read(&fresh.requests_errors), 0);
    }

    /// Each recorded request bumps the total, its transport and its outcome only.
    #[test]
    fn test_record_request() {
        let m = Metrics::new();

        m.record_request(Transport::Http, true);
        assert_eq!(read(&m.total_requests), 1);
        assert_eq!(read(&m.http_requests), 1);
        assert_eq!(read(&m.requests_allowed), 1);
        assert_eq!(read(&m.requests_denied), 0);

        m.record_request(Transport::Grpc, false);
        assert_eq!(read(&m.total_requests), 2);
        assert_eq!(read(&m.grpc_requests), 1);
        assert_eq!(read(&m.http_requests), 1);
        assert_eq!(read(&m.requests_allowed), 1);
        assert_eq!(read(&m.requests_denied), 1);
    }

    /// The exported text contains the expected sample lines.
    #[test]
    fn test_prometheus_export() {
        let m = Metrics::new();
        m.record_request(Transport::Http, true);
        m.record_request(Transport::Grpc, false);

        let rendered = m.export_prometheus();
        let expected_fragments = [
            "throttlecrab_uptime_seconds",
            "throttlecrab_requests_total 2",
            "throttlecrab_requests_allowed 1",
            "throttlecrab_requests_denied 1",
            "throttlecrab_requests_by_transport{transport=\"http\"} 1",
            "throttlecrab_requests_by_transport{transport=\"grpc\"} 1",
        ];
        for fragment in expected_fragments {
            assert!(rendered.contains(fragment), "missing fragment: {}", fragment);
        }
    }

    /// Transport counters and outcome counters each add up to the request total.
    #[test]
    fn test_counter_consistency() {
        let m = Metrics::new();
        m.record_request(Transport::Http, true);
        m.record_request(Transport::Http, false);
        m.record_request(Transport::Grpc, true);
        m.record_request(Transport::Grpc, false);
        m.record_error(Transport::Http);

        assert_eq!(read(&m.total_requests), 5);

        let by_transport = read(&m.http_requests) + read(&m.grpc_requests);
        assert_eq!(by_transport, 5);

        let by_outcome =
            read(&m.requests_allowed) + read(&m.requests_denied) + read(&m.requests_errors);
        assert_eq!(by_outcome, 5);

        assert_eq!(read(&m.requests_allowed), 2);
        assert_eq!(read(&m.requests_denied), 2);
        assert_eq!(read(&m.requests_errors), 1);
    }
}