use axum::{
extract::State,
http::{Request, StatusCode},
middleware::Next,
response::Response,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MetricEvent {
#[serde(rename = "@timestamp")]
pub timestamp: DateTime<Utc>,
pub event_id: String,
pub event_type: String,
pub method: String,
pub path: String,
pub status_code: u16,
pub duration_ms: u64,
pub client_ip: Option<String>,
pub user_agent: Option<String>,
pub error_message: Option<String>,
pub api_key_id: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SystemMetrics {
#[serde(rename = "@timestamp")]
pub timestamp: DateTime<Utc>,
pub metric_type: String,
pub cpu_usage: f32,
pub memory_usage_mb: u64,
pub active_connections: u32,
pub total_requests: u64,
pub error_rate: f32,
pub avg_response_time_ms: f64,
}
pub async fn metrics_middleware(
State(state): State<Arc<crate::AppState>>,
request: Request<axum::body::Body>,
next: Next,
) -> Result<Response, StatusCode> {
let start = Instant::now();
let method = request.method().to_string();
let path = request.uri().path().to_string();
let client_ip = request
.headers()
.get("x-forwarded-for")
.or_else(|| request.headers().get("x-real-ip"))
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let user_agent = request
.headers()
.get("user-agent")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let api_key_id = request
.headers()
.get("x-api-key")
.and_then(|v| v.to_str().ok())
.map(|k| format!("key_{}...", &k[..8.min(k.len())]));
let response = next.run(request).await;
let duration = start.elapsed();
let status_code = response.status().as_u16();
let metric_event = MetricEvent {
timestamp: Utc::now(),
event_id: Uuid::new_v4().to_string(),
event_type: "api_request".to_string(),
method,
path: path.clone(),
status_code,
duration_ms: duration.as_millis() as u64,
client_ip,
user_agent,
error_message: if status_code >= 400 {
Some(format!("HTTP {}", status_code))
} else {
None
},
api_key_id,
};
let state_clone = state.clone();
tokio::spawn(async move {
let _ = index_metric(&state_clone, metric_event).await;
});
Ok(response)
}
async fn index_metric(
state: &Arc<crate::AppState>,
metric: MetricEvent,
) -> Result<(), reqwest::Error> {
let index_name = format!("opensearch-api-metrics-{}", Utc::now().format("%Y.%m"));
let url = format!("{}/{}/_doc", state.config.opensearch_url, index_name);
state.client.post(&url).json(&metric).send().await?;
Ok(())
}
pub async fn collect_system_metrics(state: Arc<crate::AppState>) {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let metrics = SystemMetrics {
timestamp: Utc::now(),
metric_type: "system".to_string(),
cpu_usage: get_cpu_usage(),
memory_usage_mb: get_memory_usage(),
active_connections: get_active_connections(),
total_requests: get_total_requests(),
error_rate: calculate_error_rate(),
avg_response_time_ms: calculate_avg_response_time(),
};
let index_name = format!("opensearch-api-system-{}", Utc::now().format("%Y.%m"));
let url = format!("{}/{}/_doc", state.config.opensearch_url, index_name);
let _ = state.client.post(&url).json(&metrics).send().await;
}
}
pub async fn get_metrics(
State(state): State<Arc<crate::AppState>>,
) -> Result<axum::Json<MetricsResponse>, StatusCode> {
let query = serde_json::json!({
"query": {
"range": {
"@timestamp": {
"gte": "now-5m"
}
}
},
"aggs": {
"avg_duration": {
"avg": {
"field": "duration_ms"
}
},
"status_codes": {
"terms": {
"field": "status_code"
}
},
"top_endpoints": {
"terms": {
"field": "path.keyword",
"size": 10
}
}
},
"size": 0
});
let index_pattern = format!("opensearch-api-metrics-*");
let url = format!("{}/{}/_search", state.config.opensearch_url, index_pattern);
let response = state
.client
.post(&url)
.json(&query)
.send()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if !response.status().is_success() {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
let body: serde_json::Value = response
.json()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let avg_duration = body["aggregations"]["avg_duration"]["value"]
.as_f64()
.unwrap_or(0.0);
let total_requests = body["hits"]["total"]["value"].as_u64().unwrap_or(0);
let status_codes: Vec<StatusCodeCount> = body["aggregations"]["status_codes"]["buckets"]
.as_array()
.unwrap_or(&Vec::new())
.iter()
.map(|bucket| StatusCodeCount {
status_code: bucket["key"].as_u64().unwrap_or(0) as u16,
count: bucket["doc_count"].as_u64().unwrap_or(0),
})
.collect();
let top_endpoints: Vec<EndpointCount> = body["aggregations"]["top_endpoints"]["buckets"]
.as_array()
.unwrap_or(&Vec::new())
.iter()
.map(|bucket| EndpointCount {
endpoint: bucket["key"].as_str().unwrap_or("").to_string(),
count: bucket["doc_count"].as_u64().unwrap_or(0),
})
.collect();
Ok(axum::Json(MetricsResponse {
period: "last_5_minutes".to_string(),
total_requests,
avg_duration_ms: avg_duration,
status_codes,
top_endpoints,
}))
}
#[derive(Serialize)]
pub struct MetricsResponse {
pub period: String,
pub total_requests: u64,
pub avg_duration_ms: f64,
pub status_codes: Vec<StatusCodeCount>,
pub top_endpoints: Vec<EndpointCount>,
}
#[derive(Serialize)]
pub struct StatusCodeCount {
pub status_code: u16,
pub count: u64,
}
#[derive(Serialize)]
pub struct EndpointCount {
pub endpoint: String,
pub count: u64,
}
fn get_cpu_usage() -> f32 {
0.0
}
fn get_memory_usage() -> u64 {
0
}
fn get_active_connections() -> u32 {
0
}
fn get_total_requests() -> u64 {
0
}
fn calculate_error_rate() -> f32 {
0.0
}
fn calculate_avg_response_time() -> f64 {
0.0
}
pub async fn setup_metrics_indices(state: &Arc<crate::AppState>) -> Result<(), reqwest::Error> {
let template = serde_json::json!({
"index_patterns": ["opensearch-api-metrics-*", "opensearch-api-system-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index.refresh_interval": "5s"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"event_id": {
"type": "keyword"
},
"event_type": {
"type": "keyword"
},
"method": {
"type": "keyword"
},
"path": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"status_code": {
"type": "integer"
},
"duration_ms": {
"type": "long"
},
"client_ip": {
"type": "ip"
},
"user_agent": {
"type": "text"
},
"error_message": {
"type": "text"
},
"api_key_id": {
"type": "keyword"
},
"cpu_usage": {
"type": "float"
},
"memory_usage_mb": {
"type": "long"
},
"active_connections": {
"type": "integer"
},
"total_requests": {
"type": "long"
},
"error_rate": {
"type": "float"
},
"avg_response_time_ms": {
"type": "float"
}
}
}
});
let url = format!(
"{}/_index_template/opensearch-api-metrics",
state.config.opensearch_url
);
state.client.put(&url).json(&template).send().await?;
Ok(())
}