use super::{MetricStats, MetricType, MetricValue, TimeResolution};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use crate::{read_lock_safe, write_lock_safe};
#[derive(Debug)]
pub struct Metric {
pub name: String,
pub metric_type: MetricType,
values: RwLock<VecDeque<MetricValue>>,
max_values: usize,
current: AtomicU64,
labels: HashMap<String, String>,
}
impl Metric {
pub fn new(name: impl Into<String>, metric_type: MetricType) -> Self {
Metric {
name: name.into(),
metric_type,
values: RwLock::new(VecDeque::with_capacity(1000)),
max_values: 10000,
current: AtomicU64::new(0),
labels: HashMap::new(),
}
}
pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
self.labels = labels;
self
}
pub fn record(&self, value: f64) {
self.record_with_labels(value, HashMap::new());
}
pub fn record_with_labels(&self, value: f64, labels: HashMap<String, String>) {
let metric_value = MetricValue {
timestamp: Instant::now(),
value,
labels,
};
if let Ok(mut values) = self.values.write() {
values.push_back(metric_value);
while values.len() > self.max_values {
values.pop_front();
}
}
match self.metric_type {
MetricType::Counter => {
self.current.fetch_add(value as u64, Ordering::Relaxed);
}
MetricType::Gauge => {
self.current.store(value.to_bits(), Ordering::Relaxed);
}
_ => {}
}
}
pub fn increment(&self) {
self.increment_by(1);
}
pub fn increment_by(&self, amount: u64) {
self.record(amount as f64);
}
pub fn set(&self, value: f64) {
self.current.store(value.to_bits(), Ordering::Relaxed);
self.record(value);
}
pub fn current(&self) -> f64 {
match self.metric_type {
MetricType::Gauge => f64::from_bits(self.current.load(Ordering::Relaxed)),
_ => self.current.load(Ordering::Relaxed) as f64,
}
}
pub fn values_in_window(&self, window: Duration) -> Vec<f64> {
let cutoff = Instant::now() - window;
self.values
.read()
.map(|values| {
values
.iter()
.filter(|v| v.timestamp >= cutoff)
.map(|v| v.value)
.collect()
})
.unwrap_or_default()
}
pub fn stats_in_window(&self, window: Duration) -> MetricStats {
let values = self.values_in_window(window);
MetricStats::from_values(&values)
}
pub fn stats(&self) -> MetricStats {
self.values
.read()
.map(|values| {
let vals: Vec<f64> = values.iter().map(|v| v.value).collect();
MetricStats::from_values(&vals)
})
.unwrap_or_default()
}
pub fn clear(&self) {
if let Ok(mut values) = self.values.write() {
values.clear();
}
self.current.store(0, Ordering::Relaxed);
}
pub fn count(&self) -> usize {
self.values.read().map(|v| v.len()).unwrap_or(0)
}
}
#[derive(Debug)]
pub struct MetricsCollector {
metrics: RwLock<HashMap<String, Arc<Metric>>>,
global_labels: HashMap<String, String>,
enabled: bool,
start_time: Instant,
}
impl MetricsCollector {
pub fn new() -> Self {
MetricsCollector {
metrics: RwLock::new(HashMap::new()),
global_labels: HashMap::new(),
enabled: true,
start_time: Instant::now(),
}
}
pub fn with_global_labels(mut self, labels: HashMap<String, String>) -> Self {
self.global_labels = labels;
self
}
pub fn set_enabled(&mut self, enabled: bool) {
self.enabled = enabled;
}
pub fn register(&self, name: &str, metric_type: MetricType) -> Arc<Metric> {
let metric = Arc::new(Metric::new(name, metric_type));
if let Ok(mut metrics) = self.metrics.write() {
metrics.insert(name.to_string(), Arc::clone(&metric));
}
metric
}
pub fn counter(&self, name: &str) -> Arc<Metric> {
self.get_or_create(name, MetricType::Counter)
}
pub fn gauge(&self, name: &str) -> Arc<Metric> {
self.get_or_create(name, MetricType::Gauge)
}
pub fn histogram(&self, name: &str) -> Arc<Metric> {
self.get_or_create(name, MetricType::Histogram)
}
pub fn timer(&self, name: &str) -> Arc<Metric> {
self.get_or_create(name, MetricType::Timer)
}
fn get_or_create(&self, name: &str, metric_type: MetricType) -> Arc<Metric> {
if let Ok(metrics) = self.metrics.read() {
if let Some(metric) = metrics.get(name) {
return Arc::clone(metric);
}
}
self.register(name, metric_type)
}
pub fn get(&self, name: &str) -> Option<Arc<Metric>> {
self.metrics.read().ok().and_then(|m| m.get(name).cloned())
}
pub fn record(&self, name: &str, value: f64) {
if !self.enabled {
return;
}
if let Some(metric) = self.get(name) {
metric.record(value);
}
}
pub fn increment(&self, name: &str) {
self.increment_by(name, 1);
}
pub fn increment_by(&self, name: &str, amount: u64) {
if !self.enabled {
return;
}
let metric = self.counter(name);
metric.increment_by(amount);
}
pub fn set_gauge(&self, name: &str, value: f64) {
if !self.enabled {
return;
}
let metric = self.gauge(name);
metric.set(value);
}
pub fn record_duration(&self, name: &str, duration: Duration) {
if !self.enabled {
return;
}
let metric = self.timer(name);
metric.record(duration.as_micros() as f64);
}
pub fn time<F, R>(&self, name: &str, f: F) -> R
where
F: FnOnce() -> R,
{
let start = Instant::now();
let result = f();
self.record_duration(name, start.elapsed());
result
}
pub fn metric_names(&self) -> Vec<String> {
self.metrics
.read()
.map(|m| m.keys().cloned().collect())
.unwrap_or_default()
}
pub fn snapshot(&self) -> HashMap<String, MetricStats> {
self.metrics
.read()
.map(|metrics| {
metrics
.iter()
.map(|(name, metric)| (name.clone(), metric.stats()))
.collect()
})
.unwrap_or_default()
}
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
pub fn clear(&self) {
if let Ok(metrics) = self.metrics.read() {
for metric in metrics.values() {
metric.clear();
}
}
}
pub fn remove(&self, name: &str) -> Option<Arc<Metric>> {
self.metrics.write().ok().and_then(|mut m| m.remove(name))
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
pub struct ScopedTimer<'a> {
metric: &'a Metric,
start: Instant,
}
impl<'a> ScopedTimer<'a> {
pub fn new(metric: &'a Metric) -> Self {
ScopedTimer {
metric,
start: Instant::now(),
}
}
}
impl<'a> Drop for ScopedTimer<'a> {
fn drop(&mut self) {
let duration = self.start.elapsed();
self.metric.record(duration.as_micros() as f64);
}
}
#[derive(Debug)]
pub struct RateCalculator {
prev_count: AtomicU64,
prev_time: RwLock<Instant>,
smoothing: f64,
current_rate: RwLock<f64>,
}
impl RateCalculator {
pub fn new(smoothing: f64) -> Self {
RateCalculator {
prev_count: AtomicU64::new(0),
prev_time: RwLock::new(Instant::now()),
smoothing: smoothing.clamp(0.0, 1.0),
current_rate: RwLock::new(0.0),
}
}
pub fn update(&self, count: u64) -> f64 {
let now = Instant::now();
let (prev_count, elapsed) = {
let prev_time =
match read_lock_safe!(self.prev_time, "analytics metrics prev time read") {
Ok(pt) => *pt,
Err(_) => return 0.0,
};
let elapsed = now.duration_since(prev_time);
(self.prev_count.load(Ordering::Relaxed), elapsed)
};
if elapsed.as_secs_f64() > 0.0 {
let delta = count.saturating_sub(prev_count) as f64;
let instant_rate = delta / elapsed.as_secs_f64();
if let Ok(mut current) =
write_lock_safe!(self.current_rate, "analytics metrics current rate write")
{
*current = self.smoothing * instant_rate + (1.0 - self.smoothing) * *current;
self.prev_count.store(count, Ordering::Relaxed);
if let Ok(mut prev_time) = self.prev_time.write() {
*prev_time = now;
}
*current
} else {
0.0
}
} else {
read_lock_safe!(self.current_rate, "analytics metrics current rate read")
.map(|r| *r)
.unwrap_or(0.0)
}
}
pub fn rate(&self) -> f64 {
read_lock_safe!(self.current_rate, "analytics metrics current rate read")
.map(|r| *r)
.unwrap_or(0.0)
}
pub fn reset(&self) {
self.prev_count.store(0, Ordering::Relaxed);
if let Ok(mut prev_time) = self.prev_time.write() {
*prev_time = Instant::now();
}
if let Ok(mut rate) = self.current_rate.write() {
*rate = 0.0;
}
}
}
impl Default for RateCalculator {
fn default() -> Self {
Self::new(0.3) }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_metric_counter() {
let metric = Metric::new("test_counter", MetricType::Counter);
metric.increment();
metric.increment();
metric.increment();
assert_eq!(metric.current(), 3.0);
assert_eq!(metric.count(), 3);
}
#[test]
fn test_metric_gauge() {
let metric = Metric::new("test_gauge", MetricType::Gauge);
metric.set(42.0);
assert_eq!(metric.current(), 42.0);
metric.set(100.0);
assert_eq!(metric.current(), 100.0);
}
#[test]
fn test_metric_stats() {
let metric = Metric::new("test_histogram", MetricType::Histogram);
for i in 1..=10 {
metric.record(i as f64);
}
let stats = metric.stats();
assert_eq!(stats.count, 10);
assert_eq!(stats.min, 1.0);
assert_eq!(stats.max, 10.0);
}
#[test]
fn test_metrics_collector() {
let collector = MetricsCollector::new();
collector.increment("requests");
collector.increment("requests");
collector.set_gauge("memory", 1024.0);
let requests = collector.get("requests").expect("operation should succeed");
assert_eq!(requests.current(), 2.0);
let memory = collector.get("memory").expect("operation should succeed");
assert_eq!(memory.current(), 1024.0);
}
#[test]
fn test_collector_time() {
let collector = MetricsCollector::new();
let result = collector.time("operation", || {
std::thread::sleep(Duration::from_millis(10));
42
});
assert_eq!(result, 42);
let timer = collector
.get("operation")
.expect("operation should succeed");
let stats = timer.stats();
assert!(stats.mean >= 10000.0); }
#[test]
fn test_rate_calculator() {
let calc = RateCalculator::new(1.0);
assert_eq!(calc.rate(), 0.0);
std::thread::sleep(Duration::from_millis(100));
let rate = calc.update(100);
assert!(rate > 0.0);
}
#[test]
fn test_metric_window() {
let metric = Metric::new("windowed", MetricType::Histogram);
for i in 0..100 {
metric.record(i as f64);
}
let values = metric.values_in_window(Duration::from_secs(60));
assert_eq!(values.len(), 100);
let stats = metric.stats_in_window(Duration::from_secs(60));
assert_eq!(stats.count, 100);
}
#[test]
fn test_collector_snapshot() {
let collector = MetricsCollector::new();
collector.increment("counter1");
collector.increment("counter2");
collector.set_gauge("gauge1", 50.0);
let snapshot = collector.snapshot();
assert_eq!(snapshot.len(), 3);
}
}