use bytes::Bytes;
use http::{Response, StatusCode};
use http_body_util::Full;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, trace};
use zentinel_config::{BuiltinHandler, Config};
use crate::cache::{CacheManager, HttpCacheStats};
/// State shared by the builtin handlers.
///
/// Remembers when the instance was created plus the version and instance
/// identifier that the status endpoint echoes back.
pub struct BuiltinHandlerState {
    /// Instant captured in [`BuiltinHandlerState::new`]; uptime is measured from it.
    start_time: Instant,
    /// Version string reported by the status endpoint.
    version: String,
    /// Instance identifier reported by the status endpoint.
    instance_id: String,
}

impl BuiltinHandlerState {
    /// Creates the state and starts the uptime clock at the moment of the call.
    pub fn new(version: String, instance_id: String) -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            version,
            instance_id,
        }
    }

    /// Returns the time elapsed since this state was created.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Formats the current uptime as a compact string.
    ///
    /// Only the units from the largest non-zero one downwards are printed:
    /// `"5s"`, `"3m 5s"`, `"2h 3m 5s"` or `"1d 2h 3m 5s"`. Sub-second
    /// precision is discarded.
    pub fn uptime_string(&self) -> String {
        let total = self.uptime().as_secs();
        let (d, h, m, s) = (
            total / 86_400,
            total / 3_600 % 24,
            total / 60 % 60,
            total % 60,
        );

        // The leading unit is the largest one that is non-zero.
        match (d, h, m) {
            (0, 0, 0) => format!("{}s", s),
            (0, 0, _) => format!("{}m {}s", m, s),
            (0, _, _) => format!("{}h {}m {}s", h, m, s),
            _ => format!("{}d {}h {}m {}s", d, h, m, s),
        }
    }
}
/// JSON body produced by the builtin status handler.
#[derive(Debug, Serialize)]
pub struct StatusResponse {
/// Status marker; the status handler always sets this to `"ok"`.
pub status: &'static str,
/// Version string copied from `BuiltinHandlerState`.
pub version: String,
/// Human-readable uptime as formatted by `BuiltinHandlerState::uptime_string`.
pub uptime: String,
/// Uptime in whole seconds.
pub uptime_secs: u64,
/// Instance identifier copied from `BuiltinHandlerState`.
pub instance_id: String,
/// RFC 3339 UTC timestamp taken when the response was generated.
pub timestamp: String,
}
/// JSON body produced by the builtin health handler.
#[derive(Debug, Serialize)]
pub struct HealthResponse {
/// Health marker; the health handler always sets this to `"healthy"`.
pub status: &'static str,
/// RFC 3339 UTC timestamp taken when the response was generated.
pub timestamp: String,
}
/// Upstream health data rendered by the builtin upstreams handler.
///
/// The `Default` value has no upstreams.
#[derive(Debug, Clone, Default)]
pub struct UpstreamHealthSnapshot {
/// Upstream status entries keyed by a string. The unit tests use the same
/// value for the key and `UpstreamStatus::id`, so the key is presumably the
/// upstream id — confirm against the code that builds the snapshot.
pub upstreams: HashMap<String, UpstreamStatus>,
}
/// Status of one upstream and its targets, serialized into the upstreams response.
#[derive(Debug, Clone, Serialize)]
pub struct UpstreamStatus {
/// Upstream identifier.
pub id: String,
/// Load-balancing strategy as a string (the unit tests use `"round_robin"`).
pub load_balancing: String,
/// Per-target health entries; the upstreams handler tallies these by `status`.
pub targets: Vec<TargetStatus>,
}
/// Health information for a single upstream target.
#[derive(Debug, Clone, Serialize)]
pub struct TargetStatus {
/// Target address string (the unit tests use values such as `10.0.0.1:8080`).
pub address: String,
/// Target weight — presumably the load-balancing weight; confirm with the producer.
pub weight: u32,
/// Health classification counted by the upstreams handler summary.
pub status: TargetHealthStatus,
/// Optional failure rate. The unit tests use `0.0` and `0.8`, so this is
/// presumably a fraction in `0.0..=1.0` — TODO confirm.
pub failure_rate: Option<f64>,
/// Optional description of the most recent error for this target.
pub last_error: Option<String>,
}
/// Health classification of an upstream target.
///
/// Because of `rename_all = "lowercase"`, the variants serialize as
/// `"healthy"`, `"unhealthy"` and `"unknown"`.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TargetHealthStatus {
/// Counted under `healthy` in the upstreams summary.
Healthy,
/// Counted under `unhealthy` in the upstreams summary.
Unhealthy,
/// Counted under `unknown` in the upstreams summary.
Unknown,
}
/// Parameters of a cache purge, consumed by the builtin cache-purge handler.
#[derive(Debug, Clone)]
pub struct CachePurgeRequest {
/// Pattern passed to the cache manager's purge call.
pub pattern: String,
/// When `true` the handler calls `purge_wildcard`; otherwise it calls `purge`.
pub wildcard: bool,
}
/// Dispatches a builtin route to its handler and returns the finished response.
///
/// The optional arguments are only consulted by the handlers that need them:
/// `config` by `Config`, `upstreams` by `Upstreams`, `cache_stats` by
/// `Metrics` and `CacheStats`, and `cache_purge` / `cache_manager` by
/// `CachePurge`. The dispatch is traced on entry, and the resulting status
/// code is logged at debug level before returning.
pub fn execute_handler(
    handler: BuiltinHandler,
    state: &BuiltinHandlerState,
    request_id: &str,
    config: Option<Arc<Config>>,
    upstreams: Option<UpstreamHealthSnapshot>,
    cache_stats: Option<Arc<HttpCacheStats>>,
    cache_purge: Option<CachePurgeRequest>,
    cache_manager: Option<&Arc<CacheManager>>,
) -> Response<Full<Bytes>> {
    trace!(
        handler = ?handler,
        request_id = %request_id,
        "Executing builtin handler"
    );

    let reply = match handler {
        // Liveness / introspection endpoints.
        BuiltinHandler::Health => health_handler(request_id),
        BuiltinHandler::Status => status_handler(state, request_id),
        BuiltinHandler::Metrics => metrics_handler(request_id, cache_stats.as_ref()),
        BuiltinHandler::Config => config_handler(config, request_id),
        BuiltinHandler::Upstreams => upstreams_handler(upstreams, request_id),
        // Cache administration endpoints.
        BuiltinHandler::CacheStats => cache_stats_handler(cache_stats, request_id),
        BuiltinHandler::CachePurge => cache_purge_handler(cache_purge, cache_manager, request_id),
        // Fallback.
        BuiltinHandler::NotFound => not_found_handler(request_id),
    };

    debug!(
        handler = ?handler,
        request_id = %request_id,
        status = reply.status().as_u16(),
        "Builtin handler completed"
    );

    reply
}
/// Builds the `200 OK` JSON status document (version, uptime, instance id, timestamp).
///
/// The body is pretty-printed JSON; if serialization fails a minimal
/// `{"status":"ok"}` document is sent instead. The response is marked
/// non-cacheable and echoes `request_id` in `X-Request-Id`.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn status_handler(state: &BuiltinHandlerState, request_id: &str) -> Response<Full<Bytes>> {
    trace!(
        request_id = %request_id,
        uptime_secs = state.uptime().as_secs(),
        "Generating status response"
    );

    // `uptime` and `uptime_secs` each sample the clock on their own.
    let payload = StatusResponse {
        status: "ok",
        version: state.version.clone(),
        uptime: state.uptime_string(),
        uptime_secs: state.uptime().as_secs(),
        instance_id: state.instance_id.clone(),
        timestamp: chrono::Utc::now().to_rfc3339(),
    };

    let json = match serde_json::to_vec_pretty(&payload) {
        Ok(bytes) => bytes,
        Err(_) => b"{\"status\":\"ok\"}".to_vec(),
    };

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json; charset=utf-8")
        .header("X-Request-Id", request_id)
        .header("Cache-Control", "no-cache, no-store, must-revalidate")
        .body(Full::new(Bytes::from(json)))
        .expect("static response builder with valid headers cannot fail")
}
/// Builds the `200 OK` JSON health document (`status` plus a timestamp).
///
/// The body is compact JSON; if serialization fails a minimal
/// `{"status":"healthy"}` document is sent instead. The response is marked
/// non-cacheable and echoes `request_id` in `X-Request-Id`.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn health_handler(request_id: &str) -> Response<Full<Bytes>> {
    let timestamp = chrono::Utc::now().to_rfc3339();
    let payload = HealthResponse {
        status: "healthy",
        timestamp,
    };

    let json = match serde_json::to_vec(&payload) {
        Ok(bytes) => bytes,
        Err(_) => b"{\"status\":\"healthy\"}".to_vec(),
    };

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json; charset=utf-8")
        .header("X-Request-Id", request_id)
        .header("Cache-Control", "no-cache, no-store, must-revalidate")
        .body(Full::new(Bytes::from(json)))
        .expect("static response builder with valid headers cannot fail")
}
/// Renders the Prometheus text exposition for this process.
///
/// The output is the encoded result of `prometheus::gather()`, followed by
/// hand-written `zentinel_up` and `zentinel_build_info` series and, when
/// `cache_stats` is provided, the `zentinel_cache_*` series. If encoding
/// fails, a `500` plain-text response describing the error is returned.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn metrics_handler(
    request_id: &str,
    cache_stats: Option<&Arc<HttpCacheStats>>,
) -> Response<Full<Bytes>> {
    use prometheus::{Encoder, TextEncoder};

    let encoder = TextEncoder::new();
    let families = prometheus::gather();
    let mut payload = Vec::new();

    // Bail out early when the registry cannot be encoded.
    if let Err(e) = encoder.encode(&families, &mut payload) {
        tracing::error!(error = %e, "Failed to encode Prometheus metrics");
        let error_body = format!("# ERROR: Failed to encode metrics: {}\n", e);
        return Response::builder()
            .status(StatusCode::INTERNAL_SERVER_ERROR)
            .header("Content-Type", "text/plain; charset=utf-8")
            .header("X-Request-Id", request_id)
            .body(Full::new(Bytes::from(error_body)))
            .expect("static response builder with valid headers cannot fail");
    }

    // Process-level series appended after the registry output.
    let process_series = format!(
        "# HELP zentinel_up Zentinel proxy is up and running\n\
         # TYPE zentinel_up gauge\n\
         zentinel_up 1\n\
         # HELP zentinel_build_info Build information\n\
         # TYPE zentinel_build_info gauge\n\
         zentinel_build_info{{version=\"{}\"}} 1\n",
        env!("CARGO_PKG_VERSION")
    );
    payload.extend_from_slice(process_series.as_bytes());

    // Cache series are only emitted when statistics were supplied.
    if let Some(stats) = cache_stats {
        let hits = stats.hits();
        let misses = stats.misses();
        let stores = stats.stores();
        let ratio = stats.hit_ratio();
        let memory_hits = stats.memory_hits();
        let disk_hits = stats.disk_hits();

        let cache_series = format!(
            "# HELP zentinel_cache_hits_total Total number of cache hits\n\
             # TYPE zentinel_cache_hits_total counter\n\
             zentinel_cache_hits_total {}\n\
             # HELP zentinel_cache_misses_total Total number of cache misses\n\
             # TYPE zentinel_cache_misses_total counter\n\
             zentinel_cache_misses_total {}\n\
             # HELP zentinel_cache_stores_total Total number of cache stores\n\
             # TYPE zentinel_cache_stores_total counter\n\
             zentinel_cache_stores_total {}\n\
             # HELP zentinel_cache_hit_ratio Cache hit ratio (0.0 to 1.0)\n\
             # TYPE zentinel_cache_hit_ratio gauge\n\
             zentinel_cache_hit_ratio {:.4}\n\
             # HELP zentinel_cache_memory_hits_total Cache hits from memory tier\n\
             # TYPE zentinel_cache_memory_hits_total counter\n\
             zentinel_cache_memory_hits_total {}\n\
             # HELP zentinel_cache_disk_hits_total Cache hits from disk tier\n\
             # TYPE zentinel_cache_disk_hits_total counter\n\
             zentinel_cache_disk_hits_total {}\n",
            hits, misses, stores, ratio, memory_hits, disk_hits
        );
        payload.extend_from_slice(cache_series.as_bytes());
    }

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", encoder.format_type())
        .header("X-Request-Id", request_id)
        .body(Full::new(Bytes::from(payload)))
        .expect("static response builder with valid headers cannot fail")
}
/// Builds the `404 Not Found` JSON error document.
///
/// The body contains the error label, the numeric status, a human-readable
/// message, the `request_id` and an RFC 3339 UTC timestamp. If serialization
/// fails, a minimal `{"error":"Not Found","status":404}` document is sent.
///
/// The response is marked non-cacheable, like the other JSON handlers. The
/// body embeds the per-request id and timestamp, and 404 is a heuristically
/// cacheable status, so without the header an intermediary could replay one
/// request's body for another.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn not_found_handler(request_id: &str) -> Response<Full<Bytes>> {
    let body = serde_json::json!({
        "error": "Not Found",
        "status": 404,
        "message": "The requested resource could not be found.",
        "request_id": request_id,
        "timestamp": chrono::Utc::now().to_rfc3339(),
    });

    let body_bytes = serde_json::to_vec_pretty(&body)
        .unwrap_or_else(|_| b"{\"error\":\"Not Found\",\"status\":404}".to_vec());

    Response::builder()
        .status(StatusCode::NOT_FOUND)
        .header("Content-Type", "application/json; charset=utf-8")
        .header("X-Request-Id", request_id)
        // Keep the per-request body out of shared and private caches.
        .header("Cache-Control", "no-cache, no-store, must-revalidate")
        .body(Full::new(Bytes::from(body_bytes)))
        .expect("static response builder with valid headers cannot fail")
}
fn config_handler(config: Option<Arc<Config>>, request_id: &str) -> Response<Full<Bytes>> {
let body = match &config {
Some(cfg) => {
let response = serde_json::json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"request_id": request_id,
"config": {
"server": &cfg.server,
"listeners": cfg.listeners.iter().map(|l| {
serde_json::json!({
"id": l.id,
"address": l.address,
"protocol": l.protocol,
"default_route": l.default_route,
"request_timeout_secs": l.request_timeout_secs,
"keepalive_timeout_secs": l.keepalive_timeout_secs,
"tls_enabled": l.tls.is_some(),
})
}).collect::<Vec<_>>(),
"routes": cfg.routes.iter().map(|r| {
serde_json::json!({
"id": r.id,
"priority": r.priority,
"matches": r.matches,
"upstream": r.upstream,
"service_type": r.service_type,
"builtin_handler": r.builtin_handler,
"filters": r.filters,
"waf_enabled": r.waf_enabled,
})
}).collect::<Vec<_>>(),
"upstreams": cfg.upstreams.iter().map(|(id, u)| {
serde_json::json!({
"id": id,
"targets": u.targets.iter().map(|t| {
serde_json::json!({
"address": t.address,
"weight": t.weight,
})
}).collect::<Vec<_>>(),
"load_balancing": u.load_balancing,
"health_check": u.health_check.as_ref().map(|h| {
serde_json::json!({
"interval_secs": h.interval_secs,
"timeout_secs": h.timeout_secs,
"healthy_threshold": h.healthy_threshold,
"unhealthy_threshold": h.unhealthy_threshold,
})
}),
"tls_enabled": u.tls.is_some(),
})
}).collect::<Vec<_>>(),
"agents": cfg.agents.iter().map(|a| {
serde_json::json!({
"id": a.id,
"agent_type": a.agent_type,
"timeout_ms": a.timeout_ms,
})
}).collect::<Vec<_>>(),
"filters": cfg.filters.keys().collect::<Vec<_>>(),
"waf": cfg.waf.as_ref().map(|w| {
serde_json::json!({
"mode": w.mode,
"engine": w.engine,
"audit_log": w.audit_log,
})
}),
"limits": &cfg.limits,
}
});
serde_json::to_vec_pretty(&response).unwrap_or_else(|e| {
serde_json::to_vec(&serde_json::json!({
"error": "Failed to serialize config",
"message": e.to_string(),
}))
.unwrap_or_default()
})
}
None => serde_json::to_vec_pretty(&serde_json::json!({
"error": "Configuration unavailable",
"status": 503,
"message": "Config manager not available",
"request_id": request_id,
"timestamp": chrono::Utc::now().to_rfc3339(),
}))
.unwrap_or_default(),
};
let status = if config.is_some() {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
Response::builder()
.status(status)
.header("Content-Type", "application/json; charset=utf-8")
.header("X-Request-Id", request_id)
.header("Cache-Control", "no-cache, no-store, must-revalidate")
.body(Full::new(Bytes::from(body)))
.expect("static response builder with valid headers cannot fail")
}
fn upstreams_handler(
snapshot: Option<UpstreamHealthSnapshot>,
request_id: &str,
) -> Response<Full<Bytes>> {
let body = match snapshot {
Some(data) => {
let mut total_healthy = 0;
let mut total_unhealthy = 0;
let mut total_unknown = 0;
for upstream in data.upstreams.values() {
for target in &upstream.targets {
match target.status {
TargetHealthStatus::Healthy => total_healthy += 1,
TargetHealthStatus::Unhealthy => total_unhealthy += 1,
TargetHealthStatus::Unknown => total_unknown += 1,
}
}
}
let response = serde_json::json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"request_id": request_id,
"summary": {
"total_upstreams": data.upstreams.len(),
"total_targets": total_healthy + total_unhealthy + total_unknown,
"healthy": total_healthy,
"unhealthy": total_unhealthy,
"unknown": total_unknown,
},
"upstreams": data.upstreams.values().collect::<Vec<_>>(),
});
serde_json::to_vec_pretty(&response).unwrap_or_else(|e| {
serde_json::to_vec(&serde_json::json!({
"error": "Failed to serialize upstreams",
"message": e.to_string(),
}))
.unwrap_or_default()
})
}
None => {
serde_json::to_vec_pretty(&serde_json::json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"request_id": request_id,
"summary": {
"total_upstreams": 0,
"total_targets": 0,
"healthy": 0,
"unhealthy": 0,
"unknown": 0,
},
"upstreams": [],
"message": "No upstreams configured",
}))
.unwrap_or_default()
}
};
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json; charset=utf-8")
.header("X-Request-Id", request_id)
.header("Cache-Control", "no-cache, no-store, must-revalidate")
.body(Full::new(Bytes::from(body)))
.expect("static response builder with valid headers cannot fail")
}
/// Executes a cache purge and reports the outcome as JSON.
///
/// * Request and manager present: calls `purge_wildcard` or `purge`
///   depending on `wildcard`, logs before and after, and returns `200 OK`
///   with the purged entry count and the manager's active purge count.
/// * Request present, manager missing: logs a warning and returns `200 OK`
///   with `"status": "warning"` and zero purged entries.
/// * Request missing: returns `400 Bad Request`.
///
/// The response is marked non-cacheable and echoes `request_id`.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn cache_purge_handler(
    purge_request: Option<CachePurgeRequest>,
    cache_manager: Option<&Arc<CacheManager>>,
    request_id: &str,
) -> Response<Full<Bytes>> {
    // Each arm yields the status code together with its JSON document.
    let (status, document) = match (purge_request.as_ref(), cache_manager) {
        (None, _) => (
            StatusCode::BAD_REQUEST,
            serde_json::json!({
                "error": "Bad Request",
                "status": 400,
                "message": "Cache purge requires a pattern. Use PURGE /path or X-Purge-Pattern header.",
                "request_id": request_id,
                "timestamp": chrono::Utc::now().to_rfc3339(),
            }),
        ),
        (Some(request), None) => {
            tracing::warn!(
                pattern = %request.pattern,
                request_id = %request_id,
                "Cache purge requested but cache manager not available"
            );
            (
                StatusCode::OK,
                serde_json::json!({
                    "status": "warning",
                    "message": "Cache purge acknowledged but cache manager unavailable",
                    "pattern": request.pattern,
                    "wildcard": request.wildcard,
                    "purged_entries": 0,
                    "request_id": request_id,
                    "timestamp": chrono::Utc::now().to_rfc3339(),
                }),
            )
        }
        (Some(request), Some(manager)) => {
            info!(
                pattern = %request.pattern,
                wildcard = request.wildcard,
                request_id = %request_id,
                "Processing cache purge request"
            );

            let purged_count = match request.wildcard {
                true => manager.purge_wildcard(&request.pattern),
                false => manager.purge(&request.pattern),
            };

            info!(
                pattern = %request.pattern,
                wildcard = request.wildcard,
                purged_count = purged_count,
                request_id = %request_id,
                "Cache purge completed"
            );

            (
                StatusCode::OK,
                serde_json::json!({
                    "status": "ok",
                    "message": "Cache purge request processed",
                    "pattern": request.pattern,
                    "wildcard": request.wildcard,
                    "purged_entries": purged_count,
                    "active_purges": manager.active_purge_count(),
                    "request_id": request_id,
                    "timestamp": chrono::Utc::now().to_rfc3339(),
                }),
            )
        }
    };

    let body = serde_json::to_vec_pretty(&document).unwrap_or_default();

    Response::builder()
        .status(status)
        .header("Content-Type", "application/json; charset=utf-8")
        .header("X-Request-Id", request_id)
        .header("Cache-Control", "no-cache, no-store, must-revalidate")
        .body(Full::new(Bytes::from(body)))
        .expect("static response builder with valid headers cannot fail")
}
/// JSON body produced by the builtin cache-stats handler when statistics are available.
///
/// Each counter is copied from the same-named accessor on `HttpCacheStats`.
#[derive(Debug, Serialize)]
struct CacheStatsResponse {
/// Value of `HttpCacheStats::hits()`.
hits: u64,
/// Value of `HttpCacheStats::misses()`.
misses: u64,
/// Value of `HttpCacheStats::stores()`.
stores: u64,
/// Value of `HttpCacheStats::evictions()`.
evictions: u64,
/// Value of `HttpCacheStats::hit_ratio()`.
hit_ratio: f64,
/// Value of `HttpCacheStats::memory_hits()`.
memory_hits: u64,
/// Value of `HttpCacheStats::disk_hits()`.
disk_hits: u64,
/// Request id echoed back to the caller.
request_id: String,
/// RFC 3339 UTC timestamp taken when the response was generated.
timestamp: String,
}
/// Renders cache statistics as a `200 OK` JSON document.
///
/// With statistics available the counters are read from `HttpCacheStats`;
/// otherwise an all-zero document with a "Cache statistics not available"
/// message is returned. The response is marked non-cacheable and echoes
/// `request_id`.
///
/// # Panics
///
/// Panics if the response builder fails, for example if `request_id` is not
/// a legal HTTP header value.
fn cache_stats_handler(
    cache_stats: Option<Arc<HttpCacheStats>>,
    request_id: &str,
) -> Response<Full<Bytes>> {
    let body = if let Some(stats) = cache_stats {
        let report = CacheStatsResponse {
            hits: stats.hits(),
            misses: stats.misses(),
            stores: stats.stores(),
            evictions: stats.evictions(),
            hit_ratio: stats.hit_ratio(),
            memory_hits: stats.memory_hits(),
            disk_hits: stats.disk_hits(),
            request_id: request_id.to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        };
        match serde_json::to_vec_pretty(&report) {
            Ok(bytes) => bytes,
            Err(_) => b"{\"error\":\"Failed to serialize stats\"}".to_vec(),
        }
    } else {
        // No statistics source: report zeros rather than an error.
        let placeholder = serde_json::json!({
            "hits": 0,
            "misses": 0,
            "stores": 0,
            "evictions": 0,
            "hit_ratio": 0.0,
            "memory_hits": 0,
            "disk_hits": 0,
            "message": "Cache statistics not available",
            "request_id": request_id,
            "timestamp": chrono::Utc::now().to_rfc3339(),
        });
        serde_json::to_vec_pretty(&placeholder).unwrap_or_default()
    };

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json; charset=utf-8")
        .header("X-Request-Id", request_id)
        .header("Cache-Control", "no-cache, no-store, must-revalidate")
        .body(Full::new(Bytes::from(body)))
        .expect("static response builder with valid headers cannot fail")
}
// Unit tests for the builtin handlers. They call the private handler
// functions directly and check status codes and, where relevant, the
// Content-Type header; response bodies are not inspected.
#[cfg(test)]
mod tests {
use super::*;
// Status endpoint answers 200 with a JSON content type.
#[test]
fn test_status_handler() {
let state = BuiltinHandlerState::new("0.1.0".to_string(), "test-instance".to_string());
let response = status_handler(&state, "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
let content_type = response.headers().get("Content-Type").unwrap();
assert_eq!(content_type, "application/json; charset=utf-8");
}
// Health endpoint answers 200.
#[test]
fn test_health_handler() {
let response = health_handler("test-request-id");
assert_eq!(response.status(), StatusCode::OK);
}
// Metrics endpoint without cache stats answers 200 with a text/plain content type.
#[test]
fn test_metrics_handler() {
let response = metrics_handler("test-request-id", None);
assert_eq!(response.status(), StatusCode::OK);
let content_type = response.headers().get("Content-Type").unwrap();
assert!(content_type.to_str().unwrap().contains("text/plain"));
}
// Metrics endpoint still answers 200 when cache stats are supplied.
#[test]
fn test_metrics_handler_with_cache_stats() {
let stats = Arc::new(HttpCacheStats::default());
stats.record_hit();
stats.record_miss();
stats.record_store();
let response = metrics_handler("test-request-id", Some(&stats));
assert_eq!(response.status(), StatusCode::OK);
}
// A wildcard purge answers 200 and leaves at least one active purge on the manager.
#[test]
fn test_cache_purge_handler_with_request() {
let cache_manager = Arc::new(CacheManager::new());
let request = CachePurgeRequest {
pattern: "/api/users/*".to_string(),
wildcard: true,
};
let response = cache_purge_handler(Some(request), Some(&cache_manager), "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
assert!(cache_manager.active_purge_count() > 0);
}
// A non-wildcard purge answers 200 and marks the exact path for invalidation.
#[test]
fn test_cache_purge_handler_single_entry() {
let cache_manager = Arc::new(CacheManager::new());
let request = CachePurgeRequest {
pattern: "/api/users/123".to_string(),
wildcard: false,
};
let response = cache_purge_handler(Some(request), Some(&cache_manager), "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
assert!(cache_manager.should_invalidate("/api/users/123"));
}
// A purge without a request is rejected with 400.
#[test]
fn test_cache_purge_handler_without_request() {
let cache_manager = Arc::new(CacheManager::new());
let response = cache_purge_handler(None, Some(&cache_manager), "test-request-id");
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
// A purge without a cache manager is still acknowledged with 200.
#[test]
fn test_cache_purge_handler_without_manager() {
let request = CachePurgeRequest {
pattern: "/api/users/*".to_string(),
wildcard: true,
};
let response = cache_purge_handler(Some(request), None, "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
}
// Cache stats endpoint with stats answers 200 with a JSON content type.
#[test]
fn test_cache_stats_handler_with_stats() {
let stats = Arc::new(HttpCacheStats::default());
stats.record_hit();
stats.record_hit();
stats.record_miss();
let response = cache_stats_handler(Some(stats), "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
let content_type = response.headers().get("Content-Type").unwrap();
assert_eq!(content_type, "application/json; charset=utf-8");
}
// Cache stats endpoint without stats still answers 200.
#[test]
fn test_cache_stats_handler_without_stats() {
let response = cache_stats_handler(None, "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
}
// Not-found handler answers 404.
#[test]
fn test_not_found_handler() {
let response = not_found_handler("test-request-id");
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
// Config endpoint with a config answers 200 with a JSON content type.
#[test]
fn test_config_handler_with_config() {
let config = Arc::new(Config::default_for_testing());
let response = config_handler(Some(config), "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
let content_type = response.headers().get("Content-Type").unwrap();
assert_eq!(content_type, "application/json; charset=utf-8");
}
// Config endpoint without a config answers 503.
#[test]
fn test_config_handler_without_config() {
let response = config_handler(None, "test-request-id");
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}
// Upstreams endpoint with one healthy and one unhealthy target answers 200 with JSON.
#[test]
fn test_upstreams_handler_with_data() {
let mut upstreams = HashMap::new();
upstreams.insert(
"backend".to_string(),
UpstreamStatus {
id: "backend".to_string(),
load_balancing: "round_robin".to_string(),
targets: vec![
TargetStatus {
address: "10.0.0.1:8080".to_string(),
weight: 1,
status: TargetHealthStatus::Healthy,
failure_rate: Some(0.0),
last_error: None,
},
TargetStatus {
address: "10.0.0.2:8080".to_string(),
weight: 1,
status: TargetHealthStatus::Unhealthy,
failure_rate: Some(0.8),
last_error: Some("connection refused".to_string()),
},
],
},
);
let snapshot = UpstreamHealthSnapshot { upstreams };
let response = upstreams_handler(Some(snapshot), "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
let content_type = response.headers().get("Content-Type").unwrap();
assert_eq!(content_type, "application/json; charset=utf-8");
}
// Upstreams endpoint without a snapshot still answers 200.
#[test]
fn test_upstreams_handler_no_upstreams() {
let response = upstreams_handler(None, "test-request-id");
assert_eq!(response.status(), StatusCode::OK);
}
// Smoke test: the uptime string is never empty.
#[test]
fn test_uptime_formatting() {
let state = BuiltinHandlerState::new("0.1.0".to_string(), "test".to_string());
let uptime = state.uptime_string();
assert!(!uptime.is_empty());
}
}