use crate::yahoo_error::YahooError;
use std::collections::HashMap;
use std::sync::Arc;
#[allow(unused_imports)] use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[cfg(feature = "metrics")]
use metrics::{
counter, describe_counter, describe_gauge, describe_histogram,
gauge, histogram, Unit
};
#[cfg(feature = "metrics")]
use metrics_exporter_prometheus::PrometheusBuilder;
#[derive(Debug)]
pub struct PrometheusMetrics {
#[allow(dead_code)]
start_time: Instant,
#[allow(dead_code)]
request_durations: Arc<RwLock<Vec<Duration>>>,
#[allow(dead_code)]
error_counts: Arc<RwLock<HashMap<String, AtomicU64>>>,
#[allow(dead_code)]
symbol_counts: Arc<RwLock<HashMap<String, AtomicU64>>>,
}
impl Default for PrometheusMetrics {
fn default() -> Self {
Self::new()
}
}
impl PrometheusMetrics {
pub fn new() -> Self {
Self {
start_time: Instant::now(),
request_durations: Arc::new(RwLock::new(Vec::new())),
error_counts: Arc::new(RwLock::new(HashMap::new())),
symbol_counts: Arc::new(RwLock::new(HashMap::new())),
}
}
#[cfg(feature = "metrics")]
pub async fn start_prometheus_server(port: u16) -> Result<(), YahooError> {
let addr: std::net::SocketAddr = format!("0.0.0.0:{}", port).parse().map_err(|e| {
YahooError::InvalidStatusCode(format!("Invalid port {}: {}", port, e))
})?;
let builder = PrometheusBuilder::new().with_http_listener(addr);
builder.install().map_err(|e| {
YahooError::InvalidStatusCode(format!("Failed to start Prometheus server: {}", e))
})?;
Self::register_metrics();
println!("📊 Prometheus metrics server started on http://0.0.0.0:{}/metrics", port);
Ok(())
}
#[cfg(feature = "metrics")]
fn register_metrics() {
describe_counter!("eeyf_requests_total", "Total number of requests made to Yahoo Finance API");
describe_counter!("eeyf_requests_success_total", "Number of successful requests");
describe_counter!("eeyf_requests_error_total", "Number of failed requests");
describe_histogram!("eeyf_request_duration_seconds", Unit::Seconds, "Request duration in seconds");
describe_counter!("eeyf_symbol_requests_total", "Requests per symbol");
describe_counter!("eeyf_rate_limit_hits_total", "Number of times rate limit was hit");
describe_histogram!("eeyf_rate_limit_wait_seconds", Unit::Seconds, "Time spent waiting for rate limit");
describe_gauge!("eeyf_rate_limit_tokens", "Current available rate limit tokens");
describe_counter!("eeyf_circuit_breaker_opens_total", "Circuit breaker open events");
describe_counter!("eeyf_circuit_breaker_closes_total", "Circuit breaker close events");
describe_counter!("eeyf_circuit_breaker_half_opens_total", "Circuit breaker half-open events");
describe_gauge!("eeyf_circuit_breaker_state", "Circuit breaker state (0=closed, 1=open, 2=half-open)");
describe_counter!("eeyf_requests_rejected_total", "Requests rejected by circuit breaker");
describe_counter!("eeyf_cache_hits_total", "Cache hit count");
describe_counter!("eeyf_cache_misses_total", "Cache miss count");
describe_counter!("eeyf_cache_evictions_total", "Cache eviction count");
describe_gauge!("eeyf_cache_size", "Current cache size");
describe_histogram!("eeyf_cache_lookup_duration_seconds", Unit::Seconds, "Cache lookup duration");
describe_counter!("eeyf_retry_attempts_total", "Total retry attempts");
describe_counter!("eeyf_retry_success_total", "Successful retries");
describe_counter!("eeyf_retry_exhausted_total", "Retries that exhausted all attempts");
describe_histogram!("eeyf_retry_delay_seconds", Unit::Seconds, "Retry delay duration");
describe_gauge!("eeyf_connection_pool_size", "Connection pool size");
describe_gauge!("eeyf_active_connections", "Active connections");
describe_counter!("eeyf_connection_timeouts_total", "Connection timeout count");
describe_histogram!("eeyf_connection_acquire_duration_seconds", Unit::Seconds, "Connection acquire duration");
describe_counter!("eeyf_errors_by_type_total", "Errors categorized by type");
describe_gauge!("eeyf_uptime_seconds", "Library uptime in seconds");
describe_gauge!("eeyf_success_rate", "Success rate (0-1)");
describe_gauge!("eeyf_error_rate", "Error rate (0-1)");
describe_gauge!("eeyf_throughput_rps", "Throughput in requests per second");
}
#[cfg(feature = "metrics")]
pub async fn record_request_success(&self, symbol: &str, duration: Duration, endpoint: &str) {
counter!("eeyf_requests_total", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string(), "status" => "success").increment(1);
counter!("eeyf_requests_success_total", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string()).increment(1);
histogram!("eeyf_request_duration_seconds", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string()).record(duration.as_secs_f64());
counter!("eeyf_symbol_requests_total", "symbol" => symbol.to_string()).increment(1);
self.request_durations.write().await.push(duration);
let mut durations = self.request_durations.write().await;
if durations.len() > 1000 {
durations.remove(0);
}
}
#[cfg(feature = "metrics")]
pub async fn record_request_error(&self, symbol: &str, duration: Duration, endpoint: &str, error_type: &str) {
counter!("eeyf_requests_total", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string(), "status" => "error", "error_type" => error_type.to_string()).increment(1);
counter!("eeyf_requests_error_total", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string(), "error_type" => error_type.to_string()).increment(1);
counter!("eeyf_errors_by_type_total", "error_type" => error_type.to_string()).increment(1);
histogram!("eeyf_request_duration_seconds", "symbol" => symbol.to_string(), "endpoint" => endpoint.to_string()).record(duration.as_secs_f64());
let mut error_counts = self.error_counts.write().await;
error_counts.entry(error_type.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
#[cfg(feature = "metrics")]
pub fn record_rate_limit_hit(&self, wait_duration: Duration) {
counter!("eeyf_rate_limit_hits_total").increment(1);
histogram!("eeyf_rate_limit_wait_seconds").record(wait_duration.as_secs_f64());
}
#[cfg(feature = "metrics")]
pub fn update_rate_limit_tokens(&self, tokens: f64) {
gauge!("eeyf_rate_limit_tokens").set(tokens);
}
#[cfg(feature = "metrics")]
pub fn record_circuit_breaker_open(&self) {
counter!("eeyf_circuit_breaker_opens_total").increment(1);
gauge!("eeyf_circuit_breaker_state").set(1.0); }
#[cfg(feature = "metrics")]
pub fn record_circuit_breaker_close(&self) {
counter!("eeyf_circuit_breaker_closes_total").increment(1);
gauge!("eeyf_circuit_breaker_state").set(0.0); }
#[cfg(feature = "metrics")]
pub fn record_circuit_breaker_half_open(&self) {
counter!("eeyf_circuit_breaker_half_opens_total").increment(1);
gauge!("eeyf_circuit_breaker_state").set(2.0); }
#[cfg(feature = "metrics")]
pub fn record_request_rejected(&self) {
counter!("eeyf_requests_rejected_total").increment(1);
}
#[cfg(feature = "metrics")]
pub fn record_cache_hit(&self, lookup_duration: Duration) {
counter!("eeyf_cache_hits_total").increment(1);
histogram!("eeyf_cache_lookup_duration_seconds").record(lookup_duration.as_secs_f64());
}
#[cfg(feature = "metrics")]
pub fn record_cache_miss(&self, lookup_duration: Duration) {
counter!("eeyf_cache_misses_total").increment(1);
histogram!("eeyf_cache_lookup_duration_seconds").record(lookup_duration.as_secs_f64());
}
#[cfg(feature = "metrics")]
pub fn record_cache_eviction(&self) {
counter!("eeyf_cache_evictions_total").increment(1);
}
#[cfg(feature = "metrics")]
pub fn update_cache_size(&self, size: f64) {
gauge!("eeyf_cache_size").set(size);
}
#[cfg(feature = "metrics")]
pub fn record_retry_attempt(&self, delay: Duration) {
counter!("eeyf_retry_attempts_total").increment(1);
histogram!("eeyf_retry_delay_seconds").record(delay.as_secs_f64());
}
#[cfg(feature = "metrics")]
pub fn record_retry_success(&self) {
counter!("eeyf_retry_success_total").increment(1);
}
#[cfg(feature = "metrics")]
pub fn record_retry_exhausted(&self) {
counter!("eeyf_retry_exhausted_total").increment(1);
}
#[cfg(feature = "metrics")]
pub fn update_connection_pool(&self, pool_size: f64, active_connections: f64) {
gauge!("eeyf_connection_pool_size").set(pool_size);
gauge!("eeyf_active_connections").set(active_connections);
}
#[cfg(feature = "metrics")]
pub fn record_connection_timeout(&self) {
counter!("eeyf_connection_timeouts_total").increment(1);
}
#[cfg(feature = "metrics")]
pub fn record_connection_acquire(&self, duration: Duration) {
histogram!("eeyf_connection_acquire_duration_seconds").record(duration.as_secs_f64());
}
#[cfg(feature = "metrics")]
pub async fn update_performance_metrics(&self) {
let uptime = self.start_time.elapsed().as_secs_f64();
gauge!("eeyf_uptime_seconds").set(uptime);
let durations = self.request_durations.read().await;
if !durations.is_empty() {
let total_requests = durations.len() as f64;
let total_errors = {
let error_counts = self.error_counts.read().await;
error_counts.values().map(|c| c.load(Ordering::Relaxed)).sum::<u64>() as f64
};
let success_rate = (total_requests - total_errors) / total_requests;
let error_rate = total_errors / total_requests;
let throughput = total_requests / uptime;
gauge!("eeyf_success_rate").set(success_rate);
gauge!("eeyf_error_rate").set(error_rate);
gauge!("eeyf_throughput_rps").set(throughput);
}
}
}
pub fn categorize_error(error: &YahooError) -> &'static str {
match error {
YahooError::InvalidStatusCode(_) => "http_error",
YahooError::DeserializeFailed(_) => "parse_error",
YahooError::ConnectionFailed(_) => "connection_error",
YahooError::TooManyRequests(_) => "rate_limit",
YahooError::Unauthorized => "auth_error",
YahooError::FetchFailed(_) => "fetch_error",
YahooError::ApiError(_) => "api_error",
YahooError::NoResult | YahooError::NoQuotes => "no_data",
YahooError::DataInconsistency => "data_error",
YahooError::BuilderFailed => "config_error",
_ => "unknown_error",
}
}
#[cfg(not(feature = "metrics"))]
impl PrometheusMetrics {
pub async fn start_prometheus_server(_port: u16) -> Result<(), YahooError> {
Ok(())
}
pub async fn record_request_success(&self, _symbol: &str, _duration: Duration, _endpoint: &str) {}
pub async fn record_request_error(&self, _symbol: &str, _duration: Duration, _endpoint: &str, _error_type: &str) {}
pub fn record_rate_limit_hit(&self, _wait_duration: Duration) {}
pub fn update_rate_limit_tokens(&self, _tokens: f64) {}
pub fn record_circuit_breaker_open(&self) {}
pub fn record_circuit_breaker_close(&self) {}
pub fn record_circuit_breaker_half_open(&self) {}
pub fn record_request_rejected(&self) {}
pub fn record_cache_hit(&self, _lookup_duration: Duration) {}
pub fn record_cache_miss(&self, _lookup_duration: Duration) {}
pub fn record_cache_eviction(&self) {}
pub fn update_cache_size(&self, _size: f64) {}
pub fn record_retry_attempt(&self, _delay: Duration) {}
pub fn record_retry_success(&self) {}
pub fn record_retry_exhausted(&self) {}
pub fn update_connection_pool(&self, _pool_size: f64, _active_connections: f64) {}
pub fn record_connection_timeout(&self) {}
pub fn record_connection_acquire(&self, _duration: Duration) {}
pub async fn update_performance_metrics(&self) {}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_metrics_creation() {
let metrics = PrometheusMetrics::new();
assert!(metrics.request_durations.read().await.is_empty());
}
#[test]
fn test_error_categorization() {
let fetch_error = YahooError::FetchFailed("test fetch".to_string());
assert_eq!(categorize_error(&fetch_error), "fetch_error");
let connection_error = YahooError::ConnectionFailed("test connection".to_string());
assert_eq!(categorize_error(&connection_error), "connection_error");
}
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_request_tracking() {
let metrics = PrometheusMetrics::new();
metrics.record_request_success("AAPL", Duration::from_millis(100), "quotes").await;
let durations = metrics.request_durations.read().await;
assert_eq!(durations.len(), 1);
assert_eq!(durations[0], Duration::from_millis(100));
}
}