use mod_interface::mod_interface;
mod private
{
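//! Metrics collection and analysis : configurable snapshots of connection,
//! cache, circuit-breaker, timing, and error statistics, background history
//! pruning, health reports, and JSON / Prometheus export.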
use crate::
{
error ::{ OpenAIError, Result },
connection_manager ::{ ConnectionEfficiencyMetrics, PoolStatistics },
};
#[ cfg( feature = "caching" ) ]
use crate::response_cache::CacheStatistics;
use std::
{
collections ::HashMap,
sync ::Arc,
time ::{ Instant, SystemTime, UNIX_EPOCH },
};
use core::
{
fmt ::Write,
time ::Duration,
};
use tokio::sync::RwLock;
use serde::{ Serialize, Deserialize };
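/// Controls which metric families are collected, how much history is kept,
/// and how often the background task prunes it.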
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
#[ allow( clippy::struct_excessive_bools ) ]
pub struct MetricsConfig
{
pub collect_connection_metrics : bool,
pub collect_cache_metrics : bool,
pub collect_circuit_breaker_metrics : bool,
pub collect_timing_metrics : bool,
pub collect_error_metrics : bool,
pub max_entries : usize,
pub collection_interval : Duration,
pub retention_period : Duration,
pub enable_streaming : bool,
}
impl Default for MetricsConfig
{
#[ inline ]
fn default() -> Self
{
Self
{
collect_connection_metrics : true,
collect_cache_metrics : true,
collect_circuit_breaker_metrics : true,
collect_timing_metrics : true,
collect_error_metrics : true,
max_entries : 10_000,
collection_interval : Duration::from_secs( 10 ),
retention_period : Duration::from_secs( 3600 ),
enable_streaming : false,
}
}
}
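/// Point-in-time snapshot of all enabled metric families; families disabled
/// in `MetricsConfig` are `None`.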
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct MetricsSnapshot
{
pub timestamp : u64,
pub connection_metrics : Option< ConnectionMetrics >,
pub cache_metrics : Option< CacheMetrics >,
#[ cfg( feature = "circuit_breaker" ) ]
pub circuit_breaker_metrics : Option< CircuitBreakerMetrics >,
pub timing_metrics : Option< TimingMetrics >,
pub error_metrics : Option< ErrorMetrics >,
}
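/// Connection pool efficiency and utilization figures.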
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct ConnectionMetrics
{
pub efficiency_score : f64,
pub connection_reuse_ratio : f64,
pub average_pool_utilization : f64,
pub total_requests_served : u64,
pub average_response_time_seconds : f64,
pub active_connections : usize,
pub health_score : f64,
}
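/// Response cache effectiveness figures.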
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct CacheMetrics
{
pub total_requests : u64,
pub cache_hits : u64,
pub cache_misses : u64,
pub hit_ratio : f64,
pub current_entries : usize,
pub total_cached_bytes : usize,
pub average_ttl_seconds : f64,
pub efficiency_score : f64,
}
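/// Circuit breaker state and failure statistics.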
#[ cfg( feature = "circuit_breaker" ) ]
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct CircuitBreakerMetrics
{
pub state : String,
pub total_requests : u64,
pub total_failures : u64,
pub trip_count : u64,
pub failure_rate : f64,
pub time_in_state_seconds : f64,
}
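/// Request latency distribution derived from recorded timings.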
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct TimingMetrics
{
pub average_duration_ms : f64,
pub min_duration_ms : f64,
pub max_duration_ms : f64,
pub p95_duration_ms : f64,
pub p99_duration_ms : f64,
pub total_requests : u64,
}
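/// Error counts by type, overall rate, and a coarse trend label.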
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct ErrorMetrics
{
pub total_errors : u64,
pub error_types : HashMap< String, u64 >,
pub error_rate_per_minute : f64,
pub most_common_error : Option< String >,
pub trend : String,
}
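/// A single named measurement with a timestamp and free-form tags.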
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct MetricsDataPoint
{
pub timestamp : u64,
pub metric_name : String,
pub value : f64,
pub tags : HashMap< String, String >,
}
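/// Metrics aggregated over a fixed time window.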
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct MetricsAggregation
{
pub period_seconds : u64,
pub start_timestamp : u64,
pub end_timestamp : u64,
pub aggregated_data : HashMap< String, f64 >,
pub quality_score : f64,
}
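/// Human-readable health assessment derived from collected snapshots.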
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct MetricsAnalysisReport
{
pub timestamp : u64,
pub health_score : f64,
pub performance_grade : String,
pub kpis : Vec< String >,
pub trends : Vec< String >,
pub issues : Vec< String >,
pub recommendations : Vec< String >,
pub risk_level : String,
}
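/// Collects, retains, and analyzes client metrics.
///
/// A minimal usage sketch ( marked `ignore` since the public path of this
/// type depends on how the crate re-exports it ) :
///
/// ```ignore
/// let mut collector = MetricsCollector::new();
/// collector.start_collection();
/// collector.record_timing( core::time::Duration::from_millis( 42 ) ).await;
/// collector.record_error( "timeout" ).await;
/// let report = collector.generate_analysis_report().await;
/// println!( "health : {:.2}, grade : {}", report.health_score, report.performance_grade );
/// ```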
#[ derive( Debug ) ]
pub struct MetricsCollector
{
config : MetricsConfig,
metrics_history : Arc< RwLock< Vec< MetricsSnapshot > > >,
timing_data : Arc< RwLock< Vec< f64 > > >,
error_counts : Arc< RwLock< HashMap< String, u64 > > >,
start_time : Instant,
collection_handle : Option< tokio::task::JoinHandle< () > >,
}
impl Default for MetricsCollector
{
#[ inline ]
fn default() -> Self
{
Self::with_config( MetricsConfig::default() )
}
}
impl MetricsCollector
{
#[ inline ]
#[ must_use ]
pub fn new() -> Self
{
Self::default()
}
#[ inline ]
#[ must_use ]
pub fn with_config( config : MetricsConfig ) -> Self
{
Self
{
config,
metrics_history : Arc::new( RwLock::new( Vec::new() ) ),
timing_data : Arc::new( RwLock::new( Vec::new() ) ),
error_counts : Arc::new( RwLock::new( HashMap::new() ) ),
start_time : Instant::now(),
collection_handle : None,
}
}
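/// Spawns a background task that periodically prunes stored history to the
/// configured retention period and `max_entries` limit.
///
/// Requires a running Tokio runtime; does nothing when `collection_interval`
/// is zero.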
#[ inline ]
pub fn start_collection( &mut self )
{
if self.config.collection_interval > Duration::ZERO
{
let metrics_history = Arc::clone( &self.metrics_history );
let timing_data = Arc::clone( &self.timing_data );
let _error_counts = Arc::clone( &self.error_counts );
let config = self.config.clone();
let handle = tokio::spawn( async move
{
let mut interval = tokio::time::interval( config.collection_interval );
loop
{
interval.tick().await;
let retention_cutoff = SystemTime::now()
.duration_since( UNIX_EPOCH )
.unwrap_or_default()
.as_secs()
.saturating_sub( config.retention_period.as_secs() );
{
let mut history = metrics_history.write().await;
history.retain( | snapshot | snapshot.timestamp > retention_cutoff );
if history.len() > config.max_entries
{
let excess = history.len() - config.max_entries;
history.drain( 0..excess );
}
}
{
let mut timing = timing_data.write().await;
if timing.len() > config.max_entries
{
let excess = timing.len() - config.max_entries;
timing.drain( 0..excess );
}
}
}
} );
self.collection_handle = Some( handle );
}
}
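/// Aborts the background pruning task, if one is running.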
#[ inline ]
pub fn stop_collection( &mut self )
{
if let Some( handle ) = self.collection_handle.take()
{
handle.abort();
}
}
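/// Records one request duration, if timing collection is enabled.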
#[ inline ]
pub async fn record_timing( &self, duration : Duration )
{
if self.config.collect_timing_metrics
{
let mut timing_data = self.timing_data.write().await;
// `as_secs_f64` keeps sub-millisecond precision, unlike truncating `as_millis`
timing_data.push( duration.as_secs_f64() * 1000.0 );
}
}
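/// Increments the count for `error_type`, if error collection is enabled.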
#[ inline ]
pub async fn record_error( &self, error_type : &str )
{
if self.config.collect_error_metrics
{
let mut error_counts = self.error_counts.write().await;
*error_counts.entry( error_type.to_string() ).or_insert( 0 ) += 1;
}
}
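/// Builds a snapshot from the supplied component statistics, honoring the
/// per-family `collect_*` switches. The cache parameter degrades to a
/// placeholder when the `caching` feature is off; the circuit-breaker
/// parameter is currently always a placeholder.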
#[ inline ]
pub async fn collect_snapshot(
&self,
connection_metrics : Option< &ConnectionEfficiencyMetrics >,
pool_stats : Option< &Vec< PoolStatistics > >,
#[ cfg( feature = "caching" ) ]
cache_stats : Option< &CacheStatistics >,
#[ cfg( not( feature = "caching" ) ) ]
_cache_stats : Option< &() >,
_circuit_breaker_stats : Option< &() >,
) -> MetricsSnapshot
{
let timestamp = SystemTime::now()
.duration_since( UNIX_EPOCH )
.unwrap_or_default()
.as_secs();
let connection_metrics = if self.config.collect_connection_metrics
{
connection_metrics.map( | cm | Self::build_connection_metrics( cm, pool_stats ) )
}
else
{
None
};
#[ cfg( feature = "caching" ) ]
let cache_metrics = if self.config.collect_cache_metrics
{
cache_stats.map( Self::build_cache_metrics )
}
else
{
None
};
#[ cfg( not( feature = "caching" ) ) ]
let cache_metrics = None;
// the circuit breaker parameter is still a placeholder ( `Option< &() >` ),
// so no statistics are recorded for it yet
#[ cfg( feature = "circuit_breaker" ) ]
let circuit_breaker_metrics = None;
let timing_metrics = if self.config.collect_timing_metrics
{
Some( self.build_timing_metrics().await )
}
else
{
None
};
let error_metrics = if self.config.collect_error_metrics
{
Some( self.build_error_metrics().await )
}
else
{
None
};
MetricsSnapshot
{
timestamp,
connection_metrics,
cache_metrics,
#[ cfg( feature = "circuit_breaker" ) ]
circuit_breaker_metrics,
timing_metrics,
error_metrics,
}
}
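/// Appends a snapshot to history, evicting the oldest entry once
/// `max_entries` is exceeded.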
#[ inline ]
pub async fn store_snapshot( &self, snapshot : MetricsSnapshot )
{
let mut history = self.metrics_history.write().await;
history.push( snapshot );
if history.len() > self.config.max_entries
{
history.remove( 0 );
}
}
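/// Returns a clone of the full snapshot history.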
#[ inline ]
pub async fn get_history( &self ) -> Vec< MetricsSnapshot >
{
let history = self.metrics_history.read().await;
history.clone()
}
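/// Derives a health score, letter grade, KPIs, issues, and recommendations
/// from the most recent snapshot.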
#[ inline ]
pub async fn generate_analysis_report( &self ) -> MetricsAnalysisReport
{
let history = self.metrics_history.read().await;
let timestamp = SystemTime::now()
.duration_since( UNIX_EPOCH )
.unwrap_or_default()
.as_secs();
if history.is_empty()
{
return MetricsAnalysisReport
{
timestamp,
health_score : 0.0,
performance_grade : "N/A".to_string(),
kpis : vec![ "No metrics data available".to_string() ],
trends : vec![],
issues : vec![ "Insufficient metrics data for analysis".to_string() ],
recommendations : vec![ "Enable metrics collection and allow time for data accumulation".to_string() ],
risk_level : "Unknown".to_string(),
};
}
let latest = &history[ history.len() - 1 ];
let mut health_scores = Vec::new();
let mut kpis = Vec::new();
let mut trends = Vec::new();
let mut issues = Vec::new();
let mut recommendations = Vec::new();
if let Some( ref conn_metrics ) = latest.connection_metrics
{
health_scores.push( conn_metrics.health_score );
kpis.push( format!( "Connection Efficiency : {:.1}%", conn_metrics.efficiency_score * 100.0 ) );
if conn_metrics.efficiency_score < 0.7
{
issues.push( "Low connection efficiency detected".to_string() );
recommendations.push( "Review connection pool configuration".to_string() );
}
}
if let Some( ref cache_metrics ) = latest.cache_metrics
{
health_scores.push( cache_metrics.efficiency_score );
kpis.push( format!( "Cache Hit Ratio : {:.1}%", cache_metrics.hit_ratio * 100.0 ) );
if cache_metrics.hit_ratio < 0.5
{
issues.push( "Low cache hit ratio".to_string() );
recommendations.push( "Review cache TTL settings and request patterns".to_string() );
}
}
if let Some( ref error_metrics ) = latest.error_metrics
{
kpis.push( format!( "Error Rate : {:.1}/min", error_metrics.error_rate_per_minute ) );
if error_metrics.error_rate_per_minute > 5.0
{
issues.push( "High error rate detected".to_string() );
recommendations.push( "Investigate root cause of errors".to_string() );
}
}
let health_score = if health_scores.is_empty()
{
0.5
}
else
{
health_scores.iter().sum::< f64 >() / health_scores.len() as f64
};
let performance_grade = match health_score
{
s if s >= 0.9 => "A",
s if s >= 0.8 => "B",
s if s >= 0.7 => "C",
s if s >= 0.6 => "D",
_ => "F",
};
let risk_level = match health_score
{
s if s >= 0.8 => "Low",
s if s >= 0.6 => "Medium",
_ => "High",
};
if history.len() >= 5
{
trends.push( "Trend analysis available".to_string() );
}
else
{
trends.push( "Insufficient data for trend analysis".to_string() );
}
MetricsAnalysisReport
{
timestamp,
health_score,
performance_grade : performance_grade.to_string(),
kpis,
trends,
issues,
recommendations,
risk_level : risk_level.to_string(),
}
}
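/// Serializes the snapshot history to pretty-printed JSON.
///
/// # Errors
///
/// Returns an error if JSON serialization fails.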
#[ inline ]
pub async fn export_json( &self ) -> Result< String >
{
let history = self.get_history().await;
serde_json::to_string_pretty( &history )
.map_err( | e | OpenAIError::Internal( format!( "Failed to export metrics to JSON: {e}" ) ).into() )
}
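/// Renders the latest snapshot in the Prometheus text exposition format;
/// returns an empty string when no snapshots have been stored.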
#[ inline ]
pub async fn export_prometheus( &self ) -> String
{
let mut output = String::new();
let history = self.metrics_history.read().await;
if let Some( latest ) = history.last()
{
if let Some( ref conn_metrics ) = latest.connection_metrics
{
output.push_str( "# HELP openai_connection_efficiency Connection efficiency score\n" );
output.push_str( "# TYPE openai_connection_efficiency gauge\n" );
let _ = writeln!( output, "openai_connection_efficiency {}", conn_metrics.efficiency_score );
output.push_str( "# HELP openai_connection_reuse_ratio Connection reuse ratio\n" );
output.push_str( "# TYPE openai_connection_reuse_ratio gauge\n" );
let _ = writeln!( output, "openai_connection_reuse_ratio {}", conn_metrics.connection_reuse_ratio );
}
if let Some( ref cache_metrics ) = latest.cache_metrics
{
output.push_str( "# HELP openai_cache_hit_ratio Cache hit ratio\n" );
output.push_str( "# TYPE openai_cache_hit_ratio gauge\n" );
let _ = writeln!( output, "openai_cache_hit_ratio {}", cache_metrics.hit_ratio );
output.push_str( "# HELP openai_cache_entries Current cache entries\n" );
output.push_str( "# TYPE openai_cache_entries gauge\n" );
let _ = writeln!( output, "openai_cache_entries {}", cache_metrics.current_entries );
}
}
output
}
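/// Returns the current collector configuration.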
#[ inline ]
#[ must_use ]
pub fn get_config( &self ) -> &MetricsConfig
{
&self.config
}
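/// Replaces the collector configuration. A background task started earlier
/// keeps the configuration it was spawned with until it is restarted.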
#[ inline ]
pub fn update_config( &mut self, new_config : MetricsConfig )
{
self.config = new_config;
}
fn build_connection_metrics(
conn_metrics : &ConnectionEfficiencyMetrics,
pool_stats : Option< &Vec< PoolStatistics > >
) -> ConnectionMetrics
{
let active_connections = pool_stats
.map_or( 0, | pools | pools.iter().map( | p | p.in_use_connections ).sum() );
let health_score = if conn_metrics.efficiency_score > 0.8 { 0.9 } else { conn_metrics.efficiency_score };
ConnectionMetrics
{
efficiency_score : conn_metrics.efficiency_score,
connection_reuse_ratio : conn_metrics.connection_reuse_ratio,
average_pool_utilization : conn_metrics.average_pool_utilization,
total_requests_served : conn_metrics.total_requests_served,
// average response time is not yet derived from the connection layer
average_response_time_seconds : 0.0,
active_connections,
health_score,
}
}
#[ cfg( feature = "caching" ) ]
fn build_cache_metrics( cache_stats : &CacheStatistics ) -> CacheMetrics
{
let efficiency_score = if cache_stats.hit_ratio > 0.8 { 0.9 } else { cache_stats.hit_ratio };
CacheMetrics
{
total_requests : cache_stats.total_requests,
cache_hits : cache_stats.cache_hits,
cache_misses : cache_stats.cache_misses,
hit_ratio : cache_stats.hit_ratio,
current_entries : cache_stats.current_entries,
total_cached_bytes : cache_stats.total_cached_bytes,
average_ttl_seconds : cache_stats.average_ttl_seconds,
efficiency_score,
}
}
async fn build_timing_metrics( &self ) -> TimingMetrics
{
let timing_data = self.timing_data.read().await;
if timing_data.is_empty()
{
return TimingMetrics
{
average_duration_ms : 0.0,
min_duration_ms : 0.0,
max_duration_ms : 0.0,
p95_duration_ms : 0.0,
p99_duration_ms : 0.0,
total_requests : 0,
};
}
let mut sorted_data = timing_data.clone();
sorted_data.sort_by( | a, b | a.partial_cmp( b ).unwrap_or( core::cmp::Ordering::Equal ) );
let average_duration_ms = sorted_data.iter().sum::< f64 >() / sorted_data.len() as f64;
let min_duration_ms = sorted_data[ 0 ];
let max_duration_ms = sorted_data[ sorted_data.len() - 1 ];
let len = sorted_data.len();
// nearest-rank style index, clamped so small samples fall back to the last element
let p95_index = ( ( len * 95 ) / 100 ).min( len.saturating_sub( 1 ) );
let p99_index = ( ( len * 99 ) / 100 ).min( len.saturating_sub( 1 ) );
let p95_duration_ms = sorted_data.get( p95_index ).copied().unwrap_or( max_duration_ms );
let p99_duration_ms = sorted_data.get( p99_index ).copied().unwrap_or( max_duration_ms );
TimingMetrics
{
average_duration_ms,
min_duration_ms,
max_duration_ms,
p95_duration_ms,
p99_duration_ms,
total_requests : sorted_data.len() as u64,
}
}
async fn build_error_metrics( &self ) -> ErrorMetrics
{
let error_counts = self.error_counts.read().await;
let total_errors = error_counts.values().sum();
let most_common_error = error_counts
.iter()
.max_by_key( | ( _, count ) | *count )
.map( | ( error_type, _ ) | error_type.clone() );
let elapsed_minutes = self.start_time.elapsed().as_secs_f64() / 60.0;
let error_rate_per_minute = if elapsed_minutes > 0.0
{
total_errors as f64 / elapsed_minutes
}
else
{
0.0
};
let trend = if error_rate_per_minute < 1.0
{
"Stable".to_string()
}
else if error_rate_per_minute < 5.0
{
"Increasing".to_string()
}
else
{
"Critical".to_string()
};
ErrorMetrics
{
total_errors,
error_types : error_counts.clone(),
error_rate_per_minute,
most_common_error,
trend,
}
}
}
impl Drop for MetricsCollector
{
#[ inline ]
fn drop( &mut self )
{
self.stop_collection();
}
}
#[ cfg( test ) ]
mod tests
{
use super::*;
#[ tokio::test ]
async fn test_metrics_collector_creation()
{
let collector = MetricsCollector::new();
assert!( collector.get_config().collect_connection_metrics );
}
#[ tokio::test ]
async fn test_timing_recording()
{
let collector = MetricsCollector::new();
collector.record_timing( Duration::from_millis( 100 ) ).await;
let timing_metrics = collector.build_timing_metrics().await;
assert_eq!( timing_metrics.total_requests, 1 );
assert!
(
( timing_metrics.average_duration_ms - 100.0 ).abs() < 1e-9,
"Expected average_duration_ms to be approximately 100.0, got {}",
timing_metrics.average_duration_ms
);
}
#[ tokio::test ]
async fn test_error_recording()
{
let collector = MetricsCollector::new();
collector.record_error( "timeout" ).await;
collector.record_error( "timeout" ).await;
collector.record_error( "network" ).await;
let error_metrics = collector.build_error_metrics().await;
assert_eq!( error_metrics.total_errors, 3 );
assert_eq!( error_metrics.most_common_error, Some( "timeout".to_string() ) );
}
#[ tokio::test ]
async fn test_metrics_export()
{
let collector = MetricsCollector::new();
let json_export = collector.export_json().await.unwrap();
assert!( json_export.starts_with( '[' ) );
let prometheus_export = collector.export_prometheus().await;
assert!( prometheus_export.contains( "# HELP" ) || prometheus_export.is_empty() );
}
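// additional sketch tests for the analysis-report and percentile paths;
// the percentile test asserts only ordering so it stays robust to
// floating-point rounding
#[ tokio::test ]
async fn test_empty_analysis_report()
{
let collector = MetricsCollector::new();
let report = collector.generate_analysis_report().await;
assert_eq!( report.performance_grade, "N/A" );
assert_eq!( report.risk_level, "Unknown" );
}
#[ tokio::test ]
async fn test_percentile_ordering()
{
let collector = MetricsCollector::new();
for ms in 1..=100u64
{
collector.record_timing( Duration::from_millis( ms ) ).await;
}
let timing = collector.build_timing_metrics().await;
assert_eq!( timing.total_requests, 100 );
assert!( timing.min_duration_ms <= timing.p95_duration_ms );
assert!( timing.p95_duration_ms <= timing.p99_duration_ms );
assert!( timing.p99_duration_ms <= timing.max_duration_ms );
}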
}
}
mod_interface!
{
orphan use private::
{
MetricsConfig,
MetricsSnapshot,
ConnectionMetrics,
CacheMetrics,
TimingMetrics,
ErrorMetrics,
MetricsDataPoint,
MetricsAggregation,
MetricsAnalysisReport,
MetricsCollector,
};
#[ cfg( feature = "circuit_breaker" ) ]
orphan use private::
{
CircuitBreakerMetrics,
};
}