use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use axum::{
    extract::{Query, State},
    http::StatusCode,
    response::IntoResponse,
    routing::get,
    Json, Router,
};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::info;

use super::prometheus_export::PrometheusMetrics;
/// Configuration for the metrics HTTP server.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
    /// Address the TCP listener binds to. Defaults to `0.0.0.0:9090`.
    pub bind_addr: SocketAddr,
    /// When `true`, a CORS layer allowing any origin is added to the router.
    /// Only takes effect when the crate is built with the `metrics` feature.
    pub enable_cors: bool,
    /// Defaults to 5 seconds.
    ///
    /// NOTE(review): this field is not read by `MetricsHttpServer::start`;
    /// graceful shutdown currently has no deadline applied from here.
    pub shutdown_timeout: Duration,
}

impl Default for HttpServerConfig {
    fn default() -> Self {
        Self {
            // All IPv4 interfaces on port 9090, constructed directly so there is
            // no string parse (and no `unwrap`) on this path.
            bind_addr: SocketAddr::from(([0, 0, 0, 0], 9090)),
            enable_cors: true,
            shutdown_timeout: Duration::from_secs(5),
        }
    }
}
/// State shared by every HTTP handler; the router clones it per request via
/// axum's `State` extractor.
#[derive(Clone)]
pub struct HttpServerState {
    /// Registry rendered by the `/metrics` endpoint.
    pub metrics: Arc<PrometheusMetrics>,
    /// Supplies the body of the `/status` endpoint.
    pub status_provider: Arc<dyn StatusProvider + Send + Sync>,
    /// Answers the `/health*` and `/ready` endpoints.
    pub health_checker: Arc<dyn HealthChecker + Send + Sync>,
    /// Moment the server object was created; `/status` reports uptime from it.
    pub start_time: Instant,
}
/// Source of the snapshot served by `GET /status`.
pub trait StatusProvider: Send + Sync {
    /// Returns the current status. The `/status` handler replaces the
    /// `uptime_seconds` field with its own measurement before responding.
    fn get_status(&self) -> StatusResponse;
}
/// Health information backing the probe endpoints.
pub trait HealthChecker: Send + Sync {
    /// Drives `/health/live`; when `false`, `/health` also answers 503.
    fn is_alive(&self) -> bool;
    /// Drives `/health/ready` and `/ready`.
    fn is_ready(&self) -> bool;
    /// Extra detail included in `/health` when the `verbose` query flag is set.
    fn health_details(&self) -> HealthDetails;
}
/// [`StatusProvider`] that reports uplink and session lists pushed into it by
/// the application.
pub struct DefaultStatusProvider {
    /// Latest list passed to `update_uplinks`.
    uplinks: Arc<RwLock<Vec<UplinkStatus>>>,
    /// Latest list passed to `update_sessions`.
    sessions: Arc<RwLock<Vec<SessionStatus>>>,
}
impl DefaultStatusProvider {
    /// Creates a provider that reports no uplinks and no sessions.
    pub fn new() -> Self {
        Self {
            uplinks: Arc::new(RwLock::new(Vec::new())),
            sessions: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Replaces the whole uplink list reported by `get_status`.
    pub fn update_uplinks(&self, uplinks: Vec<UplinkStatus>) {
        *self.uplinks.write() = uplinks;
    }

    /// Replaces the whole session list reported by `get_status`.
    pub fn update_sessions(&self, sessions: Vec<SessionStatus>) {
        *self.sessions.write() = sessions;
    }
}

// A public zero-argument `new()` should be mirrored by `Default`
// (clippy::new_without_default) so the type works with `..Default::default()`
// and generic `T: Default` code.
impl Default for DefaultStatusProvider {
    fn default() -> Self {
        Self::new()
    }
}
impl StatusProvider for DefaultStatusProvider {
    /// Builds a status from copies of the stored uplink/session lists.
    ///
    /// `uptime_seconds` and the three totals are always 0 here; the `/status`
    /// handler overwrites `uptime_seconds` itself.
    fn get_status(&self) -> StatusResponse {
        let uplinks = self.uplinks.read().clone();
        let sessions = self.sessions.read().clone();
        StatusResponse {
            version: crate::VERSION.to_string(),
            uptime_seconds: 0,
            state: "running".to_string(),
            uplinks,
            sessions,
            total_bytes_sent: 0,
            total_bytes_received: 0,
            total_connections: 0,
        }
    }
}
pub struct DefaultHealthChecker {
alive: Arc<RwLock<bool>>,
ready: Arc<RwLock<bool>>,
}
impl DefaultHealthChecker {
pub fn new() -> Self {
Self {
alive: Arc::new(RwLock::new(true)),
ready: Arc::new(RwLock::new(false)),
}
}
pub fn set_alive(&self, alive: bool) {
*self.alive.write() = alive;
}
pub fn set_ready(&self, ready: bool) {
*self.ready.write() = ready;
}
}
impl HealthChecker for DefaultHealthChecker {
fn is_alive(&self) -> bool {
*self.alive.read()
}
fn is_ready(&self) -> bool {
*self.ready.read()
}
fn health_details(&self) -> HealthDetails {
HealthDetails {
alive: self.is_alive(),
ready: self.is_ready(),
checks: HashMap::new(),
}
}
}
/// JSON body returned by `GET /status`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StatusResponse {
    pub version: String,
    /// Whole seconds since the server was constructed (set by the `/status` handler).
    pub uptime_seconds: u64,
    pub state: String,
    pub uplinks: Vec<UplinkStatus>,
    pub sessions: Vec<SessionStatus>,
    pub total_bytes_sent: u64,
    pub total_bytes_received: u64,
    pub total_connections: u64,
}
/// One entry of `StatusResponse::uplinks`.
///
/// NOTE(review): units are implied only by the field-name suffixes
/// (`_ms`, `_percent`, `_mbps`); confirm against whatever fills these in.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct UplinkStatus {
    pub id: String,
    pub interface: Option<String>,
    pub remote_addr: String,
    pub state: String,
    pub health: String,
    pub rtt_ms: f64,
    pub loss_percent: f64,
    pub bandwidth_mbps: f64,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub nat_type: String,
    pub external_addr: Option<String>,
}
/// One entry of `StatusResponse::sessions`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SessionStatus {
    pub id: String,
    pub user_id: Option<String>,
    pub remote_addrs: Vec<String>,
    /// Pre-formatted timestamp string. NOTE(review): format is chosen by the
    /// producer; confirm before parsing it.
    pub connected_at: String,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub uplinks_used: Vec<String>,
}
/// JSON body returned by `GET /health`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HealthResponse {
    /// One of `"healthy"`, `"degraded"` or `"unhealthy"`.
    pub status: String,
    /// Present only for verbose requests; the key is omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<HealthDetails>,
}
/// Verbose health information attached to `HealthResponse::details`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HealthDetails {
    pub alive: bool,
    pub ready: bool,
    /// Named individual checks and their outcomes.
    pub checks: HashMap<String, CheckResult>,
}
/// Outcome of one named entry in `HealthDetails::checks`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CheckResult {
    pub status: String,
    pub message: Option<String>,
    pub duration_ms: Option<u64>,
}
pub struct MetricsHttpServer {
config: HttpServerConfig,
state: HttpServerState,
shutdown_tx: broadcast::Sender<()>,
}
impl MetricsHttpServer {
pub fn new(
config: HttpServerConfig,
metrics: Arc<PrometheusMetrics>,
status_provider: Arc<dyn StatusProvider + Send + Sync>,
health_checker: Arc<dyn HealthChecker + Send + Sync>,
) -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
Self {
config,
state: HttpServerState {
metrics,
status_provider,
health_checker,
start_time: Instant::now(),
},
shutdown_tx,
}
}
pub async fn start(&self) -> Result<(), std::io::Error> {
let app = self.build_router();
let addr = self.config.bind_addr;
info!("Starting metrics HTTP server on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
let mut shutdown_rx = self.shutdown_tx.subscribe();
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.recv().await;
})
.await
}
fn build_router(&self) -> Router {
let state = self.state.clone();
let router = Router::new()
.route("/metrics", get(metrics_handler))
.route("/health", get(health_handler))
.route("/health/live", get(liveness_handler))
.route("/health/ready", get(readiness_handler))
.route("/ready", get(readiness_handler))
.route("/status", get(status_handler))
.route("/", get(root_handler))
.with_state(state);
#[cfg(feature = "metrics")]
let router = {
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
if self.config.enable_cors {
router
.layer(CorsLayer::new().allow_origin(Any))
.layer(TraceLayer::new_for_http())
} else {
router.layer(TraceLayer::new_for_http())
}
};
router
}
pub fn stop(&self) {
let _ = self.shutdown_tx.send(());
}
}
/// `GET /`: service name, version and the list of routes this server exposes.
async fn root_handler() -> impl IntoResponse {
    // Keep this list in step with `build_router`; `/ready` is routed there as
    // an alias of `/health/ready` and was previously missing here.
    Json(serde_json::json!({
        "service": "triglav",
        "version": crate::VERSION,
        "endpoints": ["/metrics", "/health", "/health/live", "/health/ready", "/ready", "/status"]
    }))
}
/// `GET /metrics`: the encoded metrics as plain text, or a 500 with the
/// encoder's error message.
async fn metrics_handler(State(state): State<HttpServerState>) -> impl IntoResponse {
    let headers = [("content-type", "text/plain; charset=utf-8")];
    let (code, body) = match state.metrics.encode() {
        Ok(text) => (StatusCode::OK, text),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to encode metrics: {}", err),
        ),
    };
    (code, headers, body)
}
/// Query parameters accepted by `GET /health`.
#[derive(Debug, Deserialize)]
struct HealthQuery {
    /// Include `HealthDetails` in the response; `false` when absent.
    #[serde(default)]
    verbose: bool,
}
/// `GET /health`: overall status label plus optional details.
///
/// The HTTP code depends only on liveness (200 when alive, otherwise 503);
/// readiness only changes the label between `"healthy"` and `"degraded"`.
async fn health_handler(
    State(state): State<HttpServerState>,
    Query(query): Query<HealthQuery>,
) -> impl IntoResponse {
    let checker = &state.health_checker;
    let alive = checker.is_alive();
    let ready = checker.is_ready();

    let (label, code) = match (alive, ready) {
        (true, true) => ("healthy", StatusCode::OK),
        (true, false) => ("degraded", StatusCode::OK),
        (false, _) => ("unhealthy", StatusCode::SERVICE_UNAVAILABLE),
    };

    let body = HealthResponse {
        status: label.to_string(),
        details: query.verbose.then(|| checker.health_details()),
    };
    (code, Json(body))
}
/// `GET /health/live`: 200 `OK` while alive, otherwise 503 `NOT OK`.
async fn liveness_handler(State(state): State<HttpServerState>) -> impl IntoResponse {
    match state.health_checker.is_alive() {
        true => (StatusCode::OK, "OK"),
        false => (StatusCode::SERVICE_UNAVAILABLE, "NOT OK"),
    }
}
/// `GET /health/ready` and `GET /ready`: 200 `READY` when ready, otherwise
/// 503 `NOT READY`.
async fn readiness_handler(State(state): State<HttpServerState>) -> impl IntoResponse {
    if !state.health_checker.is_ready() {
        return (StatusCode::SERVICE_UNAVAILABLE, "NOT READY");
    }
    (StatusCode::OK, "READY")
}
async fn status_handler(State(state): State<HttpServerState>) -> impl IntoResponse {
let mut status = state.status_provider.get_status();
status.uptime_seconds = state.start_time.elapsed().as_secs();
Json(status)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_response_serialization() {
        let body = HealthResponse {
            status: "healthy".to_string(),
            details: None,
        };
        let json = serde_json::to_string(&body).unwrap();
        assert!(json.contains("healthy"));
        // `details: None` must be skipped entirely, not serialized as null.
        assert!(!json.contains("details"));
    }

    #[test]
    fn test_default_health_checker() {
        let checker = DefaultHealthChecker::new();
        // Fresh checker: alive, not ready.
        assert!(checker.is_alive());
        assert!(!checker.is_ready());

        checker.set_ready(true);
        assert!(checker.is_ready());
    }
}