use anyhow::Result;
use prometheus::{
proto::MetricFamily, Counter as PrometheusCounter, Encoder, Gauge as PrometheusGauge,
Histogram as PrometheusHistogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry,
TextEncoder,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::RwLock;
use tracing::{debug, error, instrument};
use crate::utils::error::OpenCratesError;
use once_cell::sync::Lazy;
#[cfg(feature = "metrics")]
use prometheus::{
opts, register_counter_vec, register_gauge, register_histogram_vec, CounterVec, Gauge,
HistogramVec,
};
/// Construction options for a histogram metric: name, help text and,
/// optionally, explicit bucket upper bounds.
///
/// NOTE(review): presumably `buckets: None` means "use the default buckets" —
/// confirm against the code that consumes these options.
#[derive(Debug, Clone)]
pub struct CustomHistogramOpts {
/// Metric name.
pub name: String,
/// Human-readable help text.
pub help: String,
/// Explicit bucket upper bounds, if any.
pub buckets: Option<Vec<f64>>,
}
/// Construction options for a gauge metric: name and help text.
#[derive(Debug, Clone)]
pub struct CustomGaugeOpts {
/// Metric name.
pub name: String,
/// Human-readable help text.
pub help: String,
}
/// Errors produced by the metric types in this module.
#[derive(Debug, thiserror::Error)]
pub enum MetricsError {
/// A requested metric does not exist.
/// NOTE(review): not constructed by the code in this section — confirm callers.
#[error("Metric not found: {0}")]
MetricNotFound(String),
/// A value was rejected, e.g. a non-finite gauge/histogram input or a
/// counter overflow.
#[error("Invalid metric value: {0}")]
InvalidValue(String),
/// An error reported by the `prometheus` crate, carried as its message text.
#[error("Prometheus error: {0}")]
PrometheusError(String),
/// JSON serialization failed; converted from `serde_json::Error` via `#[from]`.
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
}
/// The kinds of metric this module distinguishes.
///
/// NOTE(review): `Summary` has no corresponding metric type in this section
/// of the file — confirm whether it is implemented elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
/// Monotonic count (see `Counter`).
Counter,
/// Arbitrary settable value (see `Gauge`).
Gauge,
/// Bucketed distribution (see `Histogram`).
Histogram,
/// Quantile summary.
Summary,
}
/// A metric's value as captured in a `MetricSample`.
///
/// Serialized adjacently tagged: the variant name goes under `"type"` and the
/// payload under `"value"`, e.g. `{"type": "Counter", "value": 3}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum MetricValue {
/// Current counter value.
Counter(u64),
/// Current gauge value.
Gauge(f64),
/// Histogram bucket snapshot.
Histogram(HistogramData),
/// Aggregated timing statistics.
/// NOTE(review): not produced by the collection code in this section.
Timer(TimerData),
}
/// Point-in-time snapshot of a histogram.
///
/// As filled in by `MetricRegistry::collect_metrics`, `counts` comes from
/// `Histogram::bucket_counts`. It holds one non-cumulative count per entry of
/// `buckets`, plus a final overflow slot for values above the largest bound.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HistogramData {
/// Bucket upper bounds, in ascending order.
pub buckets: Vec<f64>,
/// Per-bucket observation counts (`buckets.len() + 1` entries).
pub counts: Vec<u64>,
/// Sum of all observed values.
pub sum: f64,
/// Total number of observations.
pub count: u64,
}
/// Aggregated timing statistics for a series of measured durations.
///
/// NOTE(review): not constructed in this section of the file; presumably
/// `avg == sum / count` — confirm with the producer.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TimerData {
/// Number of measurements.
pub count: u64,
/// Total of all measured durations.
pub sum: Duration,
/// Shortest measured duration.
pub min: Duration,
/// Longest measured duration.
pub max: Duration,
/// Mean measured duration.
pub avg: Duration,
}
/// A metric label set: label name → label value.
pub type Labels = HashMap<String, String>;
#[derive(Debug, Clone, Serialize)]
pub struct MetricSample {
pub name: String,
pub labels: Labels,
pub value: MetricValue,
pub timestamp: SystemTime,
pub help: Option<String>,
}
impl MetricSample {
#[must_use]
pub fn new(name: String, value: MetricValue) -> Self {
Self {
name,
labels: HashMap::new(),
value,
timestamp: SystemTime::now(),
help: None,
}
}
#[must_use]
pub fn with_labels(mut self, labels: Labels) -> Self {
self.labels = labels;
self
}
#[must_use]
pub fn with_help(mut self, help: String) -> Self {
self.help = Some(help);
self
}
#[must_use]
pub fn with_timestamp(mut self, timestamp: SystemTime) -> Self {
self.timestamp = timestamp;
self
}
}
/// A `u64` counter that can be shared across tasks.
///
/// Clones share the same underlying value (`Arc<RwLock<u64>>`). The value only
/// grows, except through [`Counter::reset`].
#[derive(Debug, Clone)]
pub struct Counter {
    value: Arc<RwLock<u64>>,
    name: String,
    help: String,
    labels: Labels,
}

impl Counter {
    /// Creates a counter starting at zero with no labels.
    #[must_use]
    pub fn new(name: String, help: String) -> Self {
        let value = Arc::new(RwLock::new(0));
        Self {
            value,
            name,
            help,
            labels: HashMap::new(),
        }
    }

    /// Replaces the label set.
    #[must_use]
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self { labels, ..self }
    }

    /// Adds one.
    ///
    /// # Errors
    /// `MetricsError::InvalidValue` if the counter would overflow `u64`; the
    /// value is left unchanged.
    pub async fn increment(&self) -> Result<(), MetricsError> {
        self.add(1).await
    }

    /// Adds `amount`.
    ///
    /// # Errors
    /// `MetricsError::InvalidValue` if the counter would overflow `u64`; the
    /// value is left unchanged.
    pub async fn add(&self, amount: u64) -> Result<(), MetricsError> {
        let mut guard = self.value.write().await;
        match guard.checked_add(amount) {
            Some(next) => {
                *guard = next;
                Ok(())
            }
            None => Err(MetricsError::InvalidValue("Counter overflow".to_string())),
        }
    }

    /// Returns the current value.
    pub async fn get(&self) -> u64 {
        let guard = self.value.read().await;
        *guard
    }

    /// Sets the value back to zero.
    pub async fn reset(&self) {
        *self.value.write().await = 0;
    }

    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    #[must_use]
    pub fn help(&self) -> &str {
        self.help.as_str()
    }

    #[must_use]
    pub fn labels(&self) -> &Labels {
        &self.labels
    }
}
/// An `f64` gauge that can be shared across tasks; only finite values are ever stored.
///
/// Clones share the same underlying value (`Arc<RwLock<f64>>`).
#[derive(Debug, Clone)]
pub struct Gauge {
    value: Arc<RwLock<f64>>,
    name: String,
    help: String,
    labels: Labels,
}

impl Gauge {
    /// Creates a gauge starting at `0.0` with no labels.
    #[must_use]
    pub fn new(name: String, help: String) -> Self {
        let value = Arc::new(RwLock::new(0.0));
        Self {
            value,
            name,
            help,
            labels: HashMap::new(),
        }
    }

    /// Replaces the label set.
    #[must_use]
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self { labels, ..self }
    }

    /// Overwrites the value.
    ///
    /// # Errors
    /// `MetricsError::InvalidValue` if `value` is NaN or infinite.
    pub async fn set(&self, value: f64) -> Result<(), MetricsError> {
        if value.is_finite() {
            *self.value.write().await = value;
            Ok(())
        } else {
            Err(MetricsError::InvalidValue(
                "Gauge value must be finite".to_string(),
            ))
        }
    }

    /// Adds `1.0`; see [`Gauge::add`] for errors.
    pub async fn increment(&self) -> Result<(), MetricsError> {
        self.add(1.0).await
    }

    /// Subtracts `1.0`; see [`Gauge::add`] for errors.
    pub async fn decrement(&self) -> Result<(), MetricsError> {
        self.add(-1.0).await
    }

    /// Adds `amount` (which may be negative).
    ///
    /// # Errors
    /// `MetricsError::InvalidValue` if `amount` is not finite, or if the sum
    /// would not be finite; in both cases the value is left unchanged.
    pub async fn add(&self, amount: f64) -> Result<(), MetricsError> {
        if !amount.is_finite() {
            return Err(MetricsError::InvalidValue(
                "Amount must be finite".to_string(),
            ));
        }
        let mut current = self.value.write().await;
        let updated = *current + amount;
        if updated.is_finite() {
            *current = updated;
            Ok(())
        } else {
            Err(MetricsError::InvalidValue(
                "Resulting gauge value would be infinite".to_string(),
            ))
        }
    }

    /// Returns the current value.
    pub async fn get(&self) -> f64 {
        let guard = self.value.read().await;
        *guard
    }

    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    #[must_use]
    pub fn help(&self) -> &str {
        self.help.as_str()
    }

    #[must_use]
    pub fn labels(&self) -> &Labels {
        &self.labels
    }
}
#[derive(Debug, Clone)]
pub struct Histogram {
name: String,
buckets: Vec<f64>,
counts: Arc<RwLock<Vec<u64>>>,
sum: Arc<RwLock<f64>>,
count: Arc<RwLock<u64>>,
labels: Labels,
help: Option<String>,
}
impl Histogram {
#[must_use]
pub fn new(name: String) -> Self {
Self::with_buckets(name, Self::default_buckets())
}
#[must_use]
pub fn with_buckets(name: String, mut buckets: Vec<f64>) -> Self {
buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let counts = vec![0; buckets.len() + 1];
Self {
name,
buckets: buckets.clone(),
counts: Arc::new(RwLock::new(counts)),
sum: Arc::new(RwLock::new(0.0)),
count: Arc::new(RwLock::new(0)),
labels: HashMap::new(),
help: None,
}
}
#[must_use]
pub fn with_help(mut self, help: String) -> Self {
self.help = Some(help);
self
}
#[must_use]
pub fn with_labels(mut self, labels: Labels) -> Self {
self.labels = labels;
self
}
fn default_buckets() -> Vec<f64> {
vec![
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
]
}
pub async fn observe(&self, value: f64) -> Result<(), MetricsError> {
if !value.is_finite() {
return Err(MetricsError::InvalidValue(
"Histogram value must be finite".to_string(),
));
}
{
let mut sum = self.sum.write().await;
*sum += value;
}
{
let mut count = self.count.write().await;
*count += 1;
}
{
let mut counts = self.counts.write().await;
let mut index = self.buckets.len();
for (i, &bucket) in self.buckets.iter().enumerate() {
if value <= bucket {
index = i;
break;
}
}
counts[index] += 1;
}
Ok(())
}
pub async fn time<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let start = Instant::now();
let result = f();
let duration = start.elapsed();
let _ = self.observe(duration.as_secs_f64()).await; result
}
pub async fn time_async<F, Fut, R>(&self, f: F) -> R
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = R>,
{
let start = Instant::now();
let result = f().await;
let duration = start.elapsed();
let _ = self.observe(duration.as_secs_f64()).await; result
}
pub async fn get_sum(&self) -> f64 {
*self.sum.read().await
}
pub async fn get_count(&self) -> u64 {
*self.count.read().await
}
pub async fn bucket_counts(&self) -> Vec<u64> {
self.counts.read().await.clone()
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn buckets(&self) -> &[f64] {
&self.buckets
}
#[must_use]
pub fn help(&self) -> Option<&str> {
self.help.as_deref()
}
#[must_use]
pub fn labels(&self) -> &Labels {
&self.labels
}
}
/// Measures wall-clock time from construction. It can optionally report the
/// measurement to a [`Histogram`] when finished.
#[derive(Debug)]
pub struct Timer {
    start: Instant,
    histogram: Option<Arc<Histogram>>,
}

impl Timer {
    /// Starts a timer that is not attached to any histogram.
    #[must_use]
    pub fn new() -> Self {
        let start = Instant::now();
        Self {
            start,
            histogram: None,
        }
    }

    /// Starts a timer that records into `histogram` on [`Timer::finish`].
    #[must_use]
    pub fn with_histogram(histogram: Arc<Histogram>) -> Self {
        let start = Instant::now();
        Self {
            start,
            histogram: Some(histogram),
        }
    }

    /// Time elapsed since the timer was started.
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Stops the timer and returns the elapsed time.
    ///
    /// If a histogram is attached, the elapsed time in seconds is also recorded
    /// there. Recording errors are ignored.
    pub async fn finish(self) -> Duration {
        let elapsed = self.elapsed();
        if let Some(histogram) = self.histogram.as_deref() {
            let _ = histogram.observe(elapsed.as_secs_f64()).await;
        }
        elapsed
    }
}

impl Default for Timer {
    /// Same as [`Timer::new`].
    fn default() -> Self {
        Timer::new()
    }
}
/// Token counts for one model interaction.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TokenUsage {
    pub prompt_tokens: usize,
    pub completion_tokens: usize,
    pub total_tokens: usize,
}

impl TokenUsage {
    /// Builds a usage record whose `total_tokens` is the sum of the prompt and
    /// completion counts.
    #[must_use]
    pub fn new(prompt_tokens: usize, completion_tokens: usize) -> Self {
        let total_tokens = prompt_tokens + completion_tokens;
        Self {
            prompt_tokens,
            completion_tokens,
            total_tokens,
        }
    }
}
/// Process-wide request counter shared by every [`ProviderMetrics`].
///
/// It is registered in the default Prometheus registry exactly once, on first
/// use. Each provider writes to its own `provider_name` series.
#[cfg(feature = "metrics")]
static PROVIDER_REQUESTS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        opts!("opencrates_provider_requests_total", "Total number of requests"),
        &["provider_name"]
    )
    .expect("opencrates_provider_requests_total is registered only here")
});

/// Process-wide latency histogram shared by every [`ProviderMetrics`].
#[cfg(feature = "metrics")]
static PROVIDER_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        HistogramOpts::new("opencrates_provider_latency_seconds", "Latency of requests"),
        &["provider_name"]
    )
    .expect("opencrates_provider_latency_seconds is registered only here")
});

/// Process-wide error counter shared by every [`ProviderMetrics`].
#[cfg(feature = "metrics")]
static PROVIDER_ERRORS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        opts!("opencrates_provider_errors_total", "Total number of errors"),
        &["provider_name"]
    )
    .expect("opencrates_provider_errors_total is registered only here")
});

/// Handle to the shared request counter. Clones refer to the same series, and
/// this also serves as the serde default for the `requests` field.
#[cfg(feature = "metrics")]
fn provider_requests_vec() -> CounterVec {
    CounterVec::clone(&PROVIDER_REQUESTS)
}

/// Handle to the shared latency histogram; also the serde default for `latency`.
#[cfg(feature = "metrics")]
fn provider_latency_vec() -> HistogramVec {
    HistogramVec::clone(&PROVIDER_LATENCY)
}

/// Handle to the shared error counter; also the serde default for `errors`.
#[cfg(feature = "metrics")]
fn provider_errors_vec() -> CounterVec {
    CounterVec::clone(&PROVIDER_ERRORS)
}

/// Per-provider request metadata. With the `metrics` feature it also carries
/// Prometheus series labelled by `provider_name`.
///
/// The Prometheus vectors are process-wide and registered once. Creating many
/// `ProviderMetrics` therefore never re-registers a metric name, which the
/// default registry would reject. The vectors are not serialized; on
/// deserialization they are re-attached to the shared instances.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderMetrics {
    pub provider_name: String,
    pub model_name: String,
    pub duration: Duration,
    pub usage: Option<TokenUsage>,
    #[cfg(feature = "metrics")]
    #[serde(skip, default = "provider_requests_vec")]
    pub requests: CounterVec,
    #[cfg(feature = "metrics")]
    #[serde(skip, default = "provider_latency_vec")]
    pub latency: HistogramVec,
    #[cfg(feature = "metrics")]
    #[serde(skip, default = "provider_errors_vec")]
    pub errors: CounterVec,
}

impl ProviderMetrics {
    /// Creates metrics for `provider_name`, attached to the shared Prometheus
    /// vectors.
    ///
    /// # Panics
    /// Panics on first use only if another component has already registered
    /// one of the `opencrates_provider_*` names in the default registry.
    #[cfg(feature = "metrics")]
    #[must_use]
    pub fn new(provider_name: &str) -> Self {
        Self {
            provider_name: provider_name.to_string(),
            model_name: String::new(),
            duration: Duration::default(),
            usage: None,
            requests: provider_requests_vec(),
            latency: provider_latency_vec(),
            errors: provider_errors_vec(),
        }
    }

    /// Creates metrics for `provider_name`. Without the `metrics` feature
    /// nothing is recorded.
    #[cfg(not(feature = "metrics"))]
    #[must_use]
    pub fn new(provider_name: &str) -> Self {
        Self {
            provider_name: provider_name.to_string(),
            model_name: String::new(),
            duration: Duration::default(),
            usage: None,
        }
    }

    /// Increments this provider's request counter (no-op without `metrics`).
    #[instrument(skip(self))]
    pub fn record_request(&self) {
        #[cfg(feature = "metrics")]
        self.requests
            .with_label_values(&[self.provider_name.as_str()])
            .inc();
    }

    /// Records a request latency. Presumably seconds, given the metric name —
    /// confirm with callers. No-op without `metrics`.
    #[instrument(skip(self))]
    pub fn record_latency(&self, duration: f64) {
        #[cfg(feature = "metrics")]
        self.latency
            .with_label_values(&[self.provider_name.as_str()])
            .observe(duration);
    }

    /// Increments this provider's error counter (no-op without `metrics`).
    #[instrument(skip(self))]
    pub fn record_error(&self) {
        #[cfg(feature = "metrics")]
        self.errors
            .with_label_values(&[self.provider_name.as_str()])
            .inc();
    }
}
#[derive(Debug, Clone)]
pub struct MetricRegistry {
counters: Arc<RwLock<HashMap<String, Arc<Counter>>>>,
gauges: Arc<RwLock<HashMap<String, Arc<Gauge>>>>,
histograms: Arc<RwLock<HashMap<String, Arc<Histogram>>>>,
prometheus_registry: Arc<Registry>,
}
impl MetricRegistry {
#[must_use]
pub fn new() -> Self {
Self {
counters: Arc::new(RwLock::new(HashMap::new())),
gauges: Arc::new(RwLock::new(HashMap::new())),
histograms: Arc::new(RwLock::new(HashMap::new())),
prometheus_registry: Arc::new(Registry::new()),
}
}
pub async fn register_counter(
&self,
name: &str,
help: &str,
) -> Result<Arc<Counter>, MetricsError> {
let counter = Arc::new(Counter::new(name.to_string(), help.to_string()));
self.counters
.write()
.await
.insert(name.to_string(), counter.clone());
Ok(counter)
}
pub async fn register_gauge(&self, name: &str, help: &str) -> Result<Arc<Gauge>, MetricsError> {
let gauge = Arc::new(Gauge::new(name.to_string(), help.to_string()));
self.gauges
.write()
.await
.insert(name.to_string(), gauge.clone());
Ok(gauge)
}
pub async fn register_histogram(
&self,
name: &str,
help: &str,
) -> Result<Arc<Histogram>, MetricsError> {
let histogram = Arc::new(Histogram::new(name.to_string()).with_help(help.to_string()));
self.histograms
.write()
.await
.insert(name.to_string(), histogram.clone());
Ok(histogram)
}
pub async fn get_counter(&self, name: &str) -> Option<Arc<Counter>> {
self.counters.read().await.get(name).cloned()
}
pub async fn get_gauge(&self, name: &str) -> Option<Arc<Gauge>> {
self.gauges.read().await.get(name).cloned()
}
pub async fn get_histogram(&self, name: &str) -> Option<Arc<Histogram>> {
self.histograms.read().await.get(name).cloned()
}
pub async fn collect_metrics(&self) -> Vec<MetricSample> {
let mut samples = Vec::new();
for (name, counter) in self.counters.read().await.iter() {
let value = counter.get().await;
samples.push(MetricSample::new(name.clone(), MetricValue::Counter(value)));
}
for (name, gauge) in self.gauges.read().await.iter() {
let value = gauge.get().await;
samples.push(MetricSample::new(name.clone(), MetricValue::Gauge(value)));
}
for (name, histogram) in self.histograms.read().await.iter() {
let buckets = histogram.buckets().to_vec();
let counts = histogram.bucket_counts().await;
let sum = histogram.get_sum().await;
let count = histogram.get_count().await;
let histogram_data = HistogramData {
buckets,
counts,
sum,
count,
};
samples.push(MetricSample::new(
name.clone(),
MetricValue::Histogram(histogram_data),
));
}
samples
}
pub async fn export_prometheus(&self) -> Result<String, MetricsError> {
let encoder = TextEncoder::new();
let metric_families = self.prometheus_registry.gather();
encoder
.encode_to_string(&metric_families)
.map_err(|e| MetricsError::PrometheusError(e.to_string()))
}
}
impl Default for MetricRegistry {
fn default() -> Self {
Self::new()
}
}
/// Application-level OpenCrates metrics, backed by a private [`MetricRegistry`].
#[derive(Debug, Clone)]
pub struct OpenCratesMetrics {
    registry: Arc<MetricRegistry>,
    pub crate_generations: Arc<Counter>,
    pub api_requests: Arc<Counter>,
    pub generation_duration: Arc<Histogram>,
    pub active_connections: Arc<Gauge>,
    pub cache_hits: Arc<Counter>,
    pub cache_misses: Arc<Counter>,
}

impl OpenCratesMetrics {
    /// Creates a fresh registry and registers the standard OpenCrates metrics
    /// in it.
    ///
    /// # Errors
    /// Propagates any error returned by the registry's `register_*` methods.
    pub async fn new() -> Result<Self, MetricsError> {
        let registry = Arc::new(MetricRegistry::new());

        let crate_generations = registry
            .register_counter(
                "opencrates_crate_generations_total",
                "Total number of crate generations",
            )
            .await?;
        let api_requests = registry
            .register_counter("opencrates_api_requests_total", "Total number of API requests")
            .await?;
        let generation_duration = registry
            .register_histogram(
                "opencrates_generation_duration_seconds",
                "Duration of crate generation in seconds",
            )
            .await?;
        let active_connections = registry
            .register_gauge("opencrates_active_connections", "Number of active connections")
            .await?;
        let cache_hits = registry
            .register_counter("opencrates_cache_hits_total", "Total number of cache hits")
            .await?;
        let cache_misses = registry
            .register_counter("opencrates_cache_misses_total", "Total number of cache misses")
            .await?;

        Ok(Self {
            registry,
            crate_generations,
            api_requests,
            generation_duration,
            active_connections,
            cache_hits,
            cache_misses,
        })
    }

    /// Counts one crate generation and records its duration in seconds.
    ///
    /// # Errors
    /// Propagates counter-overflow or histogram errors.
    pub async fn record_generation(&self, duration: Duration) -> Result<(), MetricsError> {
        let seconds = duration.as_secs_f64();
        self.crate_generations.increment().await?;
        self.generation_duration.observe(seconds).await
    }

    /// Counts one API request.
    pub async fn record_api_request(&self) -> Result<(), MetricsError> {
        self.api_requests.increment().await
    }

    /// Sets the active-connection gauge to `count`.
    pub async fn set_active_connections(&self, count: i64) -> Result<(), MetricsError> {
        let connections = count as f64;
        self.active_connections.set(connections).await
    }

    /// Counts one cache hit.
    pub async fn record_cache_hit(&self) -> Result<(), MetricsError> {
        self.cache_hits.increment().await
    }

    /// Counts one cache miss.
    pub async fn record_cache_miss(&self) -> Result<(), MetricsError> {
        self.cache_misses.increment().await
    }

    /// The registry holding all of these metrics.
    #[must_use]
    pub fn registry(&self) -> &Arc<MetricRegistry> {
        &self.registry
    }

    /// Returns all collected samples as pretty-printed JSON.
    ///
    /// # Errors
    /// `MetricsError::SerializationError` if JSON serialization fails.
    pub async fn export_metrics(&self) -> Result<String, MetricsError> {
        let samples = self.registry.collect_metrics().await;
        let json = serde_json::to_string_pretty(&samples)?;
        Ok(json)
    }
}
/// Snapshot of host resource usage.
///
/// NOTE(review): not populated anywhere in this section; the units below are
/// inferred from field names only — confirm with the producer.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemMetrics {
/// CPU usage; the scale (fraction vs. percent) is not established here — TODO confirm.
pub cpu_usage: f32,
/// Memory usage, presumably in megabytes given the `_mb` suffix.
pub memory_usage_mb: u64,
}
/// Global crate-generation counter in the default Prometheus registry. It is
/// registered with no labels.
#[cfg(feature = "metrics")]
pub static CRATE_GENERATION_COUNTER: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!(
        opts!("opencrates_crate_generations_total", "Total number of crate generations"),
        &[]
    )
    .expect("opencrates_crate_generations_total is registered only once, here")
});

/// Global gauge of active AI agents in the default Prometheus registry.
///
/// The type is spelled `PrometheusGauge` because `register_gauge!` returns
/// `prometheus::Gauge`, and this module defines its own, unrelated `Gauge`.
#[cfg(feature = "metrics")]
pub static ACTIVE_AGENTS_GAUGE: Lazy<PrometheusGauge> = Lazy::new(|| {
    register_gauge!(opts!("opencrates_active_agents", "Active AI agents"))
        .expect("opencrates_active_agents is registered only once, here")
});