use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default)]
pub struct ConnectionPoolStats {
pub active_connections: u32,
pub idle_connections: u32,
pub total_connections_created: u64,
pub connections_closed: u64,
}
pub trait Metrics: Send + Sync + 'static {
fn on_request(&self, _method: &str) {}
fn on_response(&self, _method: &str) {}
fn on_error(&self, _method: &str, _error: &str) {}
fn on_latency(&self, _method: &str, _duration: Duration) {}
fn on_queue_depth_change(&self, _active_queues: usize) {}
fn on_connection_pool_stats(&self, _stats: &ConnectionPoolStats) {}
}
#[derive(Debug, Default)]
pub struct NoopMetrics;
impl Metrics for NoopMetrics {}
impl<T: Metrics + ?Sized> Metrics for Arc<T> {
fn on_request(&self, method: &str) {
(**self).on_request(method);
}
fn on_response(&self, method: &str) {
(**self).on_response(method);
}
fn on_error(&self, method: &str, error: &str) {
(**self).on_error(method, error);
}
fn on_latency(&self, method: &str, duration: Duration) {
(**self).on_latency(method, duration);
}
fn on_queue_depth_change(&self, active_queues: usize) {
(**self).on_queue_depth_change(active_queues);
}
fn on_connection_pool_stats(&self, stats: &ConnectionPoolStats) {
(**self).on_connection_pool_stats(stats);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
struct RecordingMetrics {
requests: AtomicU64,
responses: AtomicU64,
errors: AtomicU64,
latencies: AtomicU64,
queue_depths: AtomicU64,
pool_stats: AtomicU64,
}
impl RecordingMetrics {
fn new() -> Self {
Self {
requests: AtomicU64::new(0),
responses: AtomicU64::new(0),
errors: AtomicU64::new(0),
latencies: AtomicU64::new(0),
queue_depths: AtomicU64::new(0),
pool_stats: AtomicU64::new(0),
}
}
}
impl Metrics for RecordingMetrics {
fn on_request(&self, _method: &str) {
self.requests.fetch_add(1, Ordering::Relaxed);
}
fn on_response(&self, _method: &str) {
self.responses.fetch_add(1, Ordering::Relaxed);
}
fn on_error(&self, _method: &str, _error: &str) {
self.errors.fetch_add(1, Ordering::Relaxed);
}
fn on_latency(&self, _method: &str, _duration: Duration) {
self.latencies.fetch_add(1, Ordering::Relaxed);
}
fn on_queue_depth_change(&self, _active_queues: usize) {
self.queue_depths.fetch_add(1, Ordering::Relaxed);
}
fn on_connection_pool_stats(&self, _stats: &ConnectionPoolStats) {
self.pool_stats.fetch_add(1, Ordering::Relaxed);
}
}
#[test]
fn arc_delegates_on_request() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_request("test");
assert_eq!(inner.requests.load(Ordering::Relaxed), 1);
}
#[test]
fn arc_delegates_on_response() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_response("test");
assert_eq!(inner.responses.load(Ordering::Relaxed), 1);
}
#[test]
fn arc_delegates_on_error() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_error("test", "err");
assert_eq!(inner.errors.load(Ordering::Relaxed), 1);
}
#[test]
fn arc_delegates_on_latency() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_latency("test", Duration::from_millis(10));
assert_eq!(inner.latencies.load(Ordering::Relaxed), 1);
}
#[test]
fn arc_delegates_on_queue_depth_change() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_queue_depth_change(5);
assert_eq!(inner.queue_depths.load(Ordering::Relaxed), 1);
}
#[test]
fn arc_delegates_on_connection_pool_stats() {
let inner = Arc::new(RecordingMetrics::new());
let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
arc_metrics.on_connection_pool_stats(&ConnectionPoolStats::default());
assert_eq!(inner.pool_stats.load(Ordering::Relaxed), 1);
}
}