use crate::server::{AppState, QueryCache};
use axum::{
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Json},
};
use otelite_core::api::{
ErrorResponse, HistogramBucket, HistogramValue, MetricResponse, MetricValue, Quantile,
Resource, SummaryValue,
};
use otelite_core::storage::QueryParams;
use otelite_core::telemetry::metric::MetricType;
use otelite_core::telemetry::Metric;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Query-string parameters for the metric listing and export endpoints.
#[derive(Debug, Deserialize, Serialize, utoipa::IntoParams)]
pub struct MetricsQuery {
    /// Substring filter applied to the metric name.
    pub name: Option<String>,
    /// Resource-attribute filter in `key=value` form (exact value match).
    pub resource: Option<String>,
    /// Lower time bound (nanoseconds since Unix epoch).
    pub start_time: Option<i64>,
    /// Upper time bound (nanoseconds since Unix epoch).
    pub end_time: Option<i64>,
    /// Maximum number of rows to return (listing defaults to 100).
    pub limit: Option<usize>,
    /// Number of rows to skip before returning results (defaults to 0).
    pub offset: Option<usize>,
}
/// Query-string parameters for the `/api/metrics/aggregate` endpoint.
#[derive(Debug, Deserialize, Serialize, utoipa::IntoParams)]
pub struct AggregateQuery {
    /// Exact metric name to aggregate (no substring matching here).
    pub name: String,
    /// Aggregation function: "sum", "avg", "min" or "max".
    pub function: String,
    /// Optional time-bucket width in seconds; when set, the response also
    /// carries per-bucket results.
    pub bucket_size: Option<i64>,
    /// Lower time bound (nanoseconds since Unix epoch).
    pub start_time: Option<i64>,
    /// Upper time bound (nanoseconds since Unix epoch).
    pub end_time: Option<i64>,
}
/// Result of a metric aggregation request.
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
pub struct AggregateResponse {
    /// Metric name that was aggregated.
    pub name: String,
    /// Aggregation function that produced `result`.
    pub function: String,
    /// Aggregated value over the whole queried window.
    pub result: f64,
    /// Number of contributing samples (for "avg", histograms/summaries count
    /// as their recorded number of observations).
    pub count: usize,
    /// Per-time-bucket results; present only when `bucket_size` was requested.
    pub buckets: Option<Vec<TimeBucket>>,
}
/// One fixed-width time bucket of aggregated metric data.
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
pub struct TimeBucket {
    /// Bucket start (nanoseconds since Unix epoch), i.e. the sample timestamp
    /// truncated down to a multiple of the bucket width.
    pub timestamp: i64,
    /// Aggregated value for this bucket.
    pub value: f64,
    /// Number of samples/observations that fell into this bucket.
    pub count: usize,
}
#[utoipa::path(
get,
path = "/api/metrics",
params(MetricsQuery),
responses(
(status = 200, description = "List of metrics", body = Vec<MetricResponse>),
(status = 500, description = "Internal server error", body = ErrorResponse)
),
tag = "metrics"
)]
pub async fn list_metrics(
State(state): State<AppState>,
Query(query): Query<MetricsQuery>,
) -> Result<Json<Vec<MetricResponse>>, (StatusCode, Json<ErrorResponse>)> {
let cache_key = QueryCache::make_key(&query);
if let Some(cached) = state.cache.metrics.get(&cache_key) {
if let Ok(response) = serde_json::from_str(&cached) {
return Ok(Json(response));
}
}
let mut params = QueryParams::default();
if let Some(start) = query.start_time {
params.start_time = Some(start);
}
if let Some(end) = query.end_time {
params.end_time = Some(end);
}
if let Some(limit) = query.limit {
params.limit = Some(limit);
}
let mut metrics = state
.storage
.query_latest_metrics(¶ms)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::storage_error(format!(
"query metrics: {}",
e
))),
)
})?;
if let Some(name_filter) = &query.name {
metrics.retain(|m| m.name.contains(name_filter));
}
if let Some(resource_filter) = &query.resource {
if let Some((key, value)) = resource_filter.split_once('=') {
metrics.retain(|m| {
m.resource
.as_ref()
.and_then(|r| r.attributes.get(key))
.map(|v| v == value)
.unwrap_or(false)
});
}
}
let offset = query.offset.unwrap_or(0);
let limit = query.limit.unwrap_or(100);
let metrics: Vec<_> = metrics.into_iter().skip(offset).take(limit).collect();
let response: Vec<MetricResponse> = metrics
.into_iter()
.map(|metric| {
let (metric_type_str, value) = match &metric.metric_type {
MetricType::Gauge(v) => ("gauge", MetricValue::Gauge(*v)),
MetricType::Counter(v) => ("counter", MetricValue::Counter(*v as i64)),
MetricType::Histogram {
count,
sum,
buckets,
} => (
"histogram",
MetricValue::Histogram(HistogramValue {
count: *count,
sum: *sum,
buckets: buckets
.iter()
.map(|b| HistogramBucket {
upper_bound: b.upper_bound,
count: b.count,
})
.collect(),
}),
),
MetricType::Summary {
count,
sum,
quantiles,
} => (
"summary",
MetricValue::Summary(SummaryValue {
count: *count,
sum: *sum,
quantiles: quantiles
.iter()
.map(|q| Quantile {
quantile: q.quantile,
value: q.value,
})
.collect(),
}),
),
};
MetricResponse {
name: metric.name,
description: metric.description,
unit: metric.unit,
metric_type: metric_type_str.to_string(),
value,
timestamp: metric.timestamp,
attributes: metric.attributes,
resource: metric.resource.map(|r| Resource {
attributes: r.attributes,
}),
}
})
.collect();
Ok(Json(response))
}
/// Returns every distinct metric name in storage, sorted ascending.
#[utoipa::path(
    get,
    path = "/api/metrics/names",
    responses(
        (status = 200, description = "List of unique metric names", body = Vec<String>),
        (status = 500, description = "Internal server error", body = ErrorResponse)
    ),
    tag = "metrics"
)]
pub async fn list_metric_names(
    State(state): State<AppState>,
) -> Result<Json<Vec<String>>, (StatusCode, Json<ErrorResponse>)> {
    let params = QueryParams::default();
    let metrics = state.storage.query_metrics(&params).await.map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ErrorResponse::storage_error(format!(
                "list metric names: {}",
                e
            ))),
        )
    })?;
    // A BTreeSet both deduplicates and yields names in ascending order, so no
    // separate sort pass is needed.
    let unique: std::collections::BTreeSet<String> =
        metrics.into_iter().map(|m| m.name).collect();
    Ok(Json(unique.into_iter().collect()))
}
/// Aggregates all samples of a named metric with `sum`, `avg`, `min` or
/// `max`, optionally grouped into fixed-width time buckets.
#[utoipa::path(
    get,
    path = "/api/metrics/aggregate",
    params(AggregateQuery),
    responses(
        (status = 200, description = "Aggregated metric result", body = AggregateResponse),
        (status = 400, description = "Invalid aggregation function", body = ErrorResponse),
        (status = 404, description = "Metric not found", body = ErrorResponse),
        (status = 500, description = "Internal server error", body = ErrorResponse)
    ),
    tag = "metrics"
)]
pub async fn aggregate_metrics(
    State(state): State<AppState>,
    Query(query): Query<AggregateQuery>,
) -> Result<Json<AggregateResponse>, (StatusCode, Json<ErrorResponse>)> {
    // Serve an identical query from the cache when possible.
    let cache_key = QueryCache::make_key(&query);
    if let Some(cached) = state.cache.metrics.get(&cache_key) {
        if let Ok(response) = serde_json::from_str(&cached) {
            return Ok(Json(response));
        }
    }
    let mut params = QueryParams::default();
    params.start_time = query.start_time.or(params.start_time);
    params.end_time = query.end_time.or(params.end_time);
    let metrics = state.storage.query_metrics(&params).await.map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ErrorResponse::storage_error(format!(
                "aggregate metrics: {}",
                e
            ))),
        )
    })?;
    // Aggregation requires an exact name match (unlike the listing endpoint's
    // substring filter).
    let metrics: Vec<_> = metrics
        .into_iter()
        .filter(|m| m.name == query.name)
        .collect();
    if metrics.is_empty() {
        return Err((
            StatusCode::NOT_FOUND,
            Json(ErrorResponse::not_found(format!("Metric '{}'", query.name))),
        ));
    }
    let (value, count) = match query.function.as_str() {
        "sum" => sum_and_count(&metrics),
        "avg" => avg_and_count(&metrics),
        "min" => extremum_and_count(&metrics, f64::MAX, f64::min),
        "max" => extremum_and_count(&metrics, f64::MIN, f64::max),
        _ => {
            return Err((
                StatusCode::BAD_REQUEST,
                Json(ErrorResponse::bad_request(format!(
                    "Invalid aggregation function '{}'. Use: sum, avg, min, max",
                    query.function
                ))),
            ))
        },
    };
    // `query.function` equals the matched literal ("sum"/"avg"/"min"/"max").
    let mut response = AggregateResponse {
        name: query.name.clone(),
        function: query.function.clone(),
        result: value,
        count,
        buckets: None,
    };
    // A non-positive bucket size would cause a divide-by-zero panic in the
    // timestamp truncation below; treat it as "no bucketing" instead.
    if let Some(bucket_size) = query.bucket_size.filter(|&b| b > 0) {
        let bucket_size_ns = bucket_size * 1_000_000_000;
        let mut grouped: HashMap<i64, Vec<&Metric>> = HashMap::new();
        for metric in &metrics {
            // Truncate each sample timestamp down to its bucket's start.
            let bucket_timestamp = (metric.timestamp / bucket_size_ns) * bucket_size_ns;
            grouped.entry(bucket_timestamp).or_default().push(metric);
        }
        let mut time_buckets: Vec<TimeBucket> = grouped
            .into_iter()
            .map(|(timestamp, bucket_metrics)| {
                let (value, count) = match query.function.as_str() {
                    "sum" => sum_and_count(bucket_metrics.iter().copied()),
                    "avg" => avg_and_count(bucket_metrics.iter().copied()),
                    // Per-bucket min/max was never implemented; zeroed buckets
                    // are preserved for backward compatibility.
                    _ => (0.0, 0),
                };
                TimeBucket {
                    timestamp,
                    value,
                    count,
                }
            })
            .collect();
        time_buckets.sort_by_key(|b| b.timestamp);
        response.buckets = Some(time_buckets);
    }
    // Populate the cache: it was read above but never written before, so
    // cache hits could never occur for this endpoint.
    if let Ok(json) = serde_json::to_string(&response) {
        state.cache.metrics.insert(cache_key, json);
    }
    Ok(Json(response))
}

/// Sums the metric set, returning (sum, number of samples). Histograms and
/// summaries contribute their pre-aggregated `sum` and count as one sample.
fn sum_and_count<'a>(metrics: impl IntoIterator<Item = &'a Metric>) -> (f64, usize) {
    let mut sum = 0.0;
    let mut count = 0usize;
    for metric in metrics {
        match &metric.metric_type {
            MetricType::Gauge(v) => {
                sum += v;
                count += 1;
            },
            MetricType::Counter(v) => {
                sum += *v as f64;
                count += 1;
            },
            MetricType::Histogram { sum: s, .. } | MetricType::Summary { sum: s, .. } => {
                sum += s;
                count += 1;
            },
        }
    }
    (sum, count)
}

/// Averages the metric set, returning (mean, total observation count).
/// Histograms and summaries are weighted by their recorded observation
/// counts, so the result is a per-observation mean.
fn avg_and_count<'a>(metrics: impl IntoIterator<Item = &'a Metric>) -> (f64, usize) {
    let mut sum = 0.0;
    let mut count = 0usize;
    for metric in metrics {
        match &metric.metric_type {
            MetricType::Gauge(v) => {
                sum += v;
                count += 1;
            },
            MetricType::Counter(v) => {
                sum += *v as f64;
                count += 1;
            },
            MetricType::Histogram { sum: s, count: c, .. }
            | MetricType::Summary { sum: s, count: c, .. } => {
                sum += s;
                count += *c as usize;
            },
        }
    }
    let avg = if count > 0 { sum / count as f64 } else { 0.0 };
    (avg, count)
}

/// Folds gauge/counter samples with `pick` (`f64::min` / `f64::max`) starting
/// from `init`. Histogram and summary metrics are skipped, mirroring the
/// original behavior; returns 0.0 when no scalar samples exist.
fn extremum_and_count(metrics: &[Metric], init: f64, pick: fn(f64, f64) -> f64) -> (f64, usize) {
    let mut extremum = init;
    let mut count = 0usize;
    for metric in metrics {
        match &metric.metric_type {
            MetricType::Gauge(v) => {
                extremum = pick(extremum, *v);
                count += 1;
            },
            MetricType::Counter(v) => {
                extremum = pick(extremum, *v as f64);
                count += 1;
            },
            _ => {},
        }
    }
    (if count > 0 { extremum } else { 0.0 }, count)
}
#[utoipa::path(
get,
path = "/api/metrics/{name}/timeseries",
params(
("name" = String, Path, description = "Metric name"),
("start_time" = Option<i64>, Query, description = "Start time (nanoseconds since Unix epoch)"),
("end_time" = Option<i64>, Query, description = "End time (nanoseconds since Unix epoch)"),
("step" = Option<i64>, Query, description = "Time step in seconds (default: 60)")
),
responses(
(status = 200, description = "Time-series data points", body = Vec<TimeBucket>),
(status = 404, description = "Metric not found", body = ErrorResponse),
(status = 500, description = "Internal server error", body = ErrorResponse)
),
tag = "metrics"
)]
pub async fn get_metric_timeseries(
State(state): State<AppState>,
axum::extract::Path(name): axum::extract::Path<String>,
Query(query): Query<TimeseriesQuery>,
) -> Result<Json<Vec<TimeBucket>>, (StatusCode, Json<ErrorResponse>)> {
let cache_key = QueryCache::make_key(&(&name, &query));
if let Some(cached) = state.cache.metrics.get(&cache_key) {
if let Ok(response) = serde_json::from_str(&cached) {
return Ok(Json(response));
}
}
let mut params = QueryParams::default();
if let Some(start) = query.start_time {
params.start_time = Some(start);
}
if let Some(end) = query.end_time {
params.end_time = Some(end);
}
let metrics = state.storage.query_metrics(¶ms).await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::storage_error(format!(
"query metric timeseries: {}",
e
))),
)
})?;
let metrics: Vec<_> = metrics.into_iter().filter(|m| m.name == name).collect();
if metrics.is_empty() {
let all_params = QueryParams::default();
let all_metrics = state
.storage
.query_metrics(&all_params)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::storage_error(format!(
"check metric existence: {}",
e
))),
)
})?;
let exists = all_metrics.iter().any(|m| m.name == name);
if !exists {
return Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse::not_found(format!("Metric '{}'", name))),
));
}
return Ok(Json(vec![]));
}
let step_seconds = query.step.unwrap_or(60);
let bucket_size_ns = step_seconds * 1_000_000_000;
let mut buckets: HashMap<i64, Vec<&Metric>> = HashMap::new();
for metric in &metrics {
let bucket_timestamp = (metric.timestamp / bucket_size_ns) * bucket_size_ns;
buckets.entry(bucket_timestamp).or_default().push(metric);
}
let mut time_buckets: Vec<TimeBucket> = buckets
.into_iter()
.map(|(timestamp, bucket_metrics)| {
let mut sum = 0.0;
let mut count = 0;
for metric in bucket_metrics {
match &metric.metric_type {
MetricType::Gauge(v) => {
sum += v;
count += 1;
},
MetricType::Counter(v) => {
sum += *v as f64;
count += 1;
},
MetricType::Histogram {
sum: s, count: c, ..
} => {
sum += s;
count += *c as usize;
},
MetricType::Summary {
sum: s, count: c, ..
} => {
sum += s;
count += *c as usize;
},
}
}
let value = if count > 0 { sum / count as f64 } else { 0.0 };
TimeBucket {
timestamp,
value,
count,
}
})
.collect();
time_buckets.sort_by_key(|b| b.timestamp);
if let Ok(json) = serde_json::to_string(&time_buckets) {
state.cache.metrics.insert(cache_key, json);
}
Ok(Json(time_buckets))
}
/// Query-string parameters for the per-metric timeseries endpoint.
#[derive(Debug, Deserialize, Serialize, utoipa::IntoParams, utoipa::ToSchema)]
pub struct TimeseriesQuery {
    /// Lower time bound (nanoseconds since Unix epoch).
    pub start_time: Option<i64>,
    /// Upper time bound (nanoseconds since Unix epoch).
    pub end_time: Option<i64>,
    /// Bucket width in seconds (default: 60).
    pub step: Option<i64>,
}
/// Exports all metrics in the requested window as pretty-printed JSON,
/// optionally filtered by a name substring. Pagination parameters are
/// accepted but not applied to exports.
#[utoipa::path(
    get,
    path = "/api/metrics/export",
    params(MetricsQuery),
    responses(
        (status = 200, description = "Exported metrics in JSON format"),
        (status = 500, description = "Internal server error", body = ErrorResponse)
    ),
    tag = "metrics"
)]
pub async fn export_metrics(
    State(state): State<AppState>,
    Query(query): Query<MetricsQuery>,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
    // Build the storage query window from the optional time bounds; `or`
    // keeps the default when the query did not supply a bound.
    let mut params = QueryParams::default();
    params.start_time = query.start_time.or(params.start_time);
    params.end_time = query.end_time.or(params.end_time);
    let mut metrics = state.storage.query_metrics(&params).await.map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ErrorResponse::storage_error(format!(
                "export metrics: {}",
                e
            ))),
        )
    })?;
    // Keep only metrics whose name contains the requested substring, if any.
    if let Some(name_filter) = &query.name {
        metrics.retain(|m| m.name.contains(name_filter));
    }
    let body = serde_json::to_string_pretty(&metrics).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ErrorResponse::internal_error(format!(
                "Failed to serialize metrics: {}",
                e
            ))),
        )
    })?;
    Ok((StatusCode::OK, [("Content-Type", "application/json")], body))
}