use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Json;
use hdrhistogram::Histogram;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use sysinfo::{Pid, ProcessesToUpdate, System};
use tracing::{debug, instrument};
/// Server version string, captured from `Cargo.toml` at compile time.
pub const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Server (crate) name, captured from `Cargo.toml` at compile time.
pub const SERVER_NAME: &str = env!("CARGO_PKG_NAME");
/// Minimal body returned by the `/health` and `/ready` endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthResponse {
    /// Health state label; `Default` produces `"healthy"`.
    pub status: String,
}
impl Default for HealthResponse {
fn default() -> Self {
Self {
status: "healthy".to_string(),
}
}
}
/// Full status report returned by the `/status` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResponse {
    /// Crate version (see [`SERVER_VERSION`]).
    pub version: String,
    /// Crate name (see [`SERVER_NAME`]).
    pub name: String,
    /// Seconds since the [`AppState`] was constructed.
    pub uptime_seconds: u64,
    /// Cumulative number of captures processed.
    pub captures_processed: u64,
    /// Currently open SSE connections.
    pub active_sse_connections: u64,
    /// Process memory figures gathered via sysinfo.
    pub memory: MemoryMetrics,
    /// Latency percentile snapshot in milliseconds.
    pub latency: LatencyMetrics,
    /// Free-form status label (the handler sets `"running"`).
    pub status: String,
    /// RFC 3339 timestamp of when the report was generated.
    pub timestamp: String,
}
/// Process memory usage as reported by sysinfo.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MemoryMetrics {
    // Units come from sysinfo's `Process::memory` / `virtual_memory`
    // (bytes in recent sysinfo releases — confirm against the pinned version).
    pub rss_bytes: u64,
    pub virtual_bytes: u64,
    // Currently always `None` (see `collect_memory_metrics`); omitted from
    // serialized JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cpu_percent: Option<f32>,
}
/// Latency percentile snapshot, all values in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    /// Median latency.
    pub p50_ms: f64,
    /// 95th-percentile latency.
    pub p95_ms: f64,
    /// 99th-percentile latency.
    pub p99_ms: f64,
    /// Number of samples the snapshot was computed from.
    pub total_requests: u64,
    /// Mean latency.
    pub mean_ms: f64,
    /// Maximum recorded latency.
    pub max_ms: f64,
}
impl Default for LatencyMetrics {
fn default() -> Self {
Self {
p50_ms: 0.0,
p95_ms: 0.0,
p99_ms: 0.0,
total_requests: 0,
mean_ms: 0.0,
max_ms: 0.0,
}
}
}
/// Thread-safe wrapper around an HDR histogram of latencies in microseconds.
#[derive(Debug)]
pub struct LatencyHistogram {
    // parking_lot RwLock: many concurrent readers, exclusive writer for
    // `record`/`reset`.
    inner: RwLock<Histogram<u64>>,
}
impl LatencyHistogram {
    /// Creates a histogram covering 1 µs through 60 s (values in
    /// microseconds) with three significant figures of precision.
    ///
    /// # Panics
    /// Only if the hard-coded bounds were invalid, which they are not.
    pub fn new() -> Self {
        let histogram =
            Histogram::new_with_bounds(1, 60_000_000, 3).expect("Failed to create histogram");
        Self {
            inner: RwLock::new(histogram),
        }
    }

    /// Records one latency sample, in microseconds.
    ///
    /// Samples outside the histogram's trackable range are clamped rather
    /// than dropped. The previous implementation ignored the `Result` of
    /// `Histogram::record`, so any sample above the 60 s upper bound was
    /// silently discarded and never influenced max/p99.
    pub fn record(&self, latency_us: u64) {
        let mut hist = self.inner.write();
        hist.saturating_record(latency_us);
    }

    /// Records a `Duration`, converted to whole microseconds.
    pub fn record_duration(&self, duration: std::time::Duration) {
        // `as_micros` returns u128; the cast cannot truncate for any
        // realistic request latency (u64 µs spans ~584k years).
        self.record(duration.as_micros() as u64);
    }

    /// Value in microseconds at the given percentile (e.g. `99.0`).
    pub fn percentile(&self, percentile: f64) -> u64 {
        let hist = self.inner.read();
        hist.value_at_percentile(percentile)
    }

    /// Total number of recorded samples.
    pub fn count(&self) -> u64 {
        let hist = self.inner.read();
        hist.len()
    }

    /// Mean latency in microseconds.
    pub fn mean(&self) -> f64 {
        let hist = self.inner.read();
        hist.mean()
    }

    /// Maximum recorded latency in microseconds (bucket-equivalent value,
    /// so it may exceed the raw sample by up to the bucket precision).
    pub fn max(&self) -> u64 {
        let hist = self.inner.read();
        hist.max()
    }

    /// Snapshot of p50/p95/p99/mean/max converted to milliseconds.
    ///
    /// Taken under a single read lock so the fields are mutually consistent.
    pub fn metrics(&self) -> LatencyMetrics {
        let hist = self.inner.read();
        LatencyMetrics {
            p50_ms: hist.value_at_percentile(50.0) as f64 / 1000.0,
            p95_ms: hist.value_at_percentile(95.0) as f64 / 1000.0,
            p99_ms: hist.value_at_percentile(99.0) as f64 / 1000.0,
            total_requests: hist.len(),
            mean_ms: hist.mean() / 1000.0,
            max_ms: hist.max() as f64 / 1000.0,
        }
    }

    /// Clears all recorded samples; bounds and precision are unchanged.
    pub fn reset(&self) {
        let mut hist = self.inner.write();
        hist.reset();
    }
}
impl Default for LatencyHistogram {
fn default() -> Self {
Self::new()
}
}
/// Shared, lock-free (except the histogram) application metrics state.
///
/// Intended to be wrapped in an `Arc` and cloned into handlers; all counters
/// use relaxed atomics, so reads are cheap but not mutually synchronized.
#[derive(Debug)]
pub struct AppState {
    // Set once at construction; basis for uptime_seconds().
    start_time: Instant,
    // Cumulative captures counter.
    captures_processed: AtomicU64,
    // Live gauge of currently-open SSE connections.
    active_sse_connections: AtomicU64,
    // Latency samples in microseconds.
    latency_histogram: LatencyHistogram,
    // Cumulative count of latency-recorded requests.
    total_requests: AtomicU64,
    // Cumulative error counter, used for the health() error rate.
    error_count: AtomicU64,
}
impl AppState {
    /// Creates a fresh state with all counters at zero and the uptime
    /// clock started now.
    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
            captures_processed: AtomicU64::new(0),
            active_sse_connections: AtomicU64::new(0),
            latency_histogram: LatencyHistogram::new(),
            total_requests: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
        }
    }

    /// Seconds elapsed since this state was constructed.
    #[inline]
    pub fn uptime_seconds(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }

    /// The instant at which this state was constructed.
    #[inline]
    pub fn start_time(&self) -> Instant {
        self.start_time
    }

    /// Total captures recorded so far.
    #[inline]
    pub fn captures_processed(&self) -> u64 {
        self.captures_processed.load(Ordering::Relaxed)
    }

    /// Increments the capture counter and returns the new total.
    #[inline]
    pub fn record_capture(&self) -> u64 {
        self.captures_processed.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Current number of active SSE connections.
    #[inline]
    pub fn active_sse_connections(&self) -> u64 {
        self.active_sse_connections.load(Ordering::Relaxed)
    }

    /// Increments the SSE connection gauge and returns the new value.
    #[inline]
    pub fn increment_sse_connections(&self) -> u64 {
        self.active_sse_connections.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Decrements the SSE connection gauge, saturating at zero, and
    /// returns the new value.
    #[inline]
    pub fn decrement_sse_connections(&self) -> u64 {
        // `fetch_update` runs the compare-exchange retry loop for us;
        // `checked_sub` yields `None` at zero, aborting the update so the
        // gauge never underflows. `Err` carries the unchanged value (0).
        self.active_sse_connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_sub(1)
            })
            .map_or(0, |previous| previous - 1)
    }

    /// Records a latency sample in microseconds and bumps the request count.
    #[inline]
    pub fn record_latency_us(&self, latency_us: u64) {
        self.latency_histogram.record(latency_us);
        self.total_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a latency sample from a `Duration` and bumps the request count.
    #[inline]
    pub fn record_latency(&self, duration: std::time::Duration) {
        self.latency_histogram.record_duration(duration);
        self.total_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Consistent snapshot of the latency percentiles, in milliseconds.
    #[inline]
    pub fn latency_metrics(&self) -> LatencyMetrics {
        self.latency_histogram.metrics()
    }

    /// Total requests whose latency has been recorded.
    #[inline]
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Relaxed)
    }

    /// Increments the error counter and returns the new total.
    #[inline]
    pub fn record_error(&self) -> u64 {
        self.error_count.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Total errors recorded so far.
    #[inline]
    pub fn error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Builds an ad-hoc JSON health report.
    ///
    /// Status is `"healthy"` below a 5% error rate, `"degraded"` below 20%,
    /// and `"unhealthy"` otherwise; with zero requests the rate is 0%.
    pub fn health(&self) -> serde_json::Value {
        let total_requests = self.total_requests();
        let error_count = self.error_count();
        let error_rate = if total_requests > 0 {
            (error_count as f64 / total_requests as f64) * 100.0
        } else {
            0.0
        };
        let latency = self.latency_metrics();
        serde_json::json!({
            "status": if error_rate < 5.0 { "healthy" } else if error_rate < 20.0 { "degraded" } else { "unhealthy" },
            "uptime_seconds": self.uptime_seconds(),
            "requests": {
                "total": total_requests,
                "errors": error_count,
                "error_rate_percent": format!("{:.2}", error_rate)
            },
            "connections": {
                "active_sse": self.active_sse_connections(),
                "captures_processed": self.captures_processed()
            },
            "latency": {
                "p50_ms": latency.p50_ms,
                "p95_ms": latency.p95_ms,
                "p99_ms": latency.p99_ms,
                "mean_ms": latency.mean_ms,
                "max_ms": latency.max_ms
            }
        })
    }

    /// Resets all counters and the latency histogram to zero.
    ///
    /// NOTE(review): this also zeroes `active_sse_connections`, which is a
    /// live gauge rather than a cumulative metric — connections that remain
    /// open afterwards make the gauge stale until they cycle. Kept as-is
    /// because existing tests depend on this behavior.
    pub fn reset_metrics(&self) {
        self.captures_processed.store(0, Ordering::Relaxed);
        self.active_sse_connections.store(0, Ordering::Relaxed);
        self.total_requests.store(0, Ordering::Relaxed);
        self.error_count.store(0, Ordering::Relaxed);
        self.latency_histogram.reset();
    }
}
impl Default for AppState {
fn default() -> Self {
Self::new()
}
}
/// Gathers memory figures for the current process via sysinfo.
///
/// Refreshes only this process's entry, then reads its RSS and virtual
/// memory. `cpu_percent` is left `None`: a single refresh gives no usable
/// CPU delta. Falls back to all-zero metrics if sysinfo cannot find the
/// process.
fn collect_memory_metrics() -> MemoryMetrics {
    let pid = Pid::from_u32(std::process::id());
    let mut system = System::new();
    system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
    if let Some(process) = system.process(pid) {
        MemoryMetrics {
            rss_bytes: process.memory(),
            virtual_bytes: process.virtual_memory(),
            cpu_percent: None,
        }
    } else {
        debug!("Could not find current process in sysinfo");
        MemoryMetrics::default()
    }
}
/// `GET /health` — liveness probe; always returns 200 with `"healthy"`.
#[instrument(skip_all)]
pub async fn health_handler() -> impl IntoResponse {
    debug!("Health check requested");
    let body = HealthResponse::default();
    (StatusCode::OK, Json(body))
}
/// `GET /status` — full status report: version, uptime, counters, memory,
/// and latency percentiles, stamped with the current UTC time (RFC 3339).
#[instrument(skip_all)]
pub async fn status_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    debug!("Status check requested");
    let response = StatusResponse {
        version: SERVER_VERSION.to_string(),
        name: SERVER_NAME.to_string(),
        uptime_seconds: state.uptime_seconds(),
        captures_processed: state.captures_processed(),
        active_sse_connections: state.active_sse_connections(),
        memory: collect_memory_metrics(),
        latency: state.latency_metrics(),
        status: String::from("running"),
        timestamp: chrono::Utc::now().to_rfc3339(),
    };
    (StatusCode::OK, Json(response))
}
/// `GET /ready` — readiness probe; always returns 200 with `"healthy"`.
#[instrument(skip_all)]
pub async fn readiness_handler() -> impl IntoResponse {
    debug!("Readiness check requested");
    let body = HealthResponse::default();
    (StatusCode::OK, Json(body))
}
/// Builds the monitoring router: `/health`, `/ready`, and `/status`, with
/// the shared [`AppState`] attached.
pub fn status_router(state: Arc<AppState>) -> axum::Router {
    use axum::routing::get;
    let routes = axum::Router::new()
        .route("/health", get(health_handler))
        .route("/ready", get(readiness_handler))
        .route("/status", get(status_handler));
    routes.with_state(state)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default health body must report "healthy".
    #[test]
    fn test_health_response_default() {
        let health = HealthResponse::default();
        assert_eq!(health.status, "healthy");
    }

    // Fresh state: zeroed counters, uptime effectively zero.
    #[test]
    fn test_app_state_new() {
        let state = AppState::new();
        assert_eq!(state.captures_processed(), 0);
        assert_eq!(state.active_sse_connections(), 0);
        assert!(state.uptime_seconds() < 1);
    }

    // record_capture returns the post-increment value.
    #[test]
    fn test_app_state_capture_counter() {
        let state = AppState::new();
        assert_eq!(state.record_capture(), 1);
        assert_eq!(state.record_capture(), 2);
        assert_eq!(state.record_capture(), 3);
        assert_eq!(state.captures_processed(), 3);
    }

    // SSE gauge goes up/down and saturates at zero instead of underflowing.
    #[test]
    fn test_app_state_sse_connections() {
        let state = AppState::new();
        assert_eq!(state.increment_sse_connections(), 1);
        assert_eq!(state.increment_sse_connections(), 2);
        assert_eq!(state.active_sse_connections(), 2);
        assert_eq!(state.decrement_sse_connections(), 1);
        assert_eq!(state.active_sse_connections(), 1);
        assert_eq!(state.decrement_sse_connections(), 0);
        assert_eq!(state.active_sse_connections(), 0);
        // Decrement at zero must be a no-op returning 0.
        assert_eq!(state.decrement_sse_connections(), 0);
        assert_eq!(state.active_sse_connections(), 0);
    }

    // Basic histogram recording plus percentile ordering invariants.
    #[test]
    fn test_latency_histogram() {
        let histogram = LatencyHistogram::new();
        histogram.record(1000); histogram.record(2000); histogram.record(5000); histogram.record(10000); histogram.record(50000);
        assert_eq!(histogram.count(), 5);
        assert!(histogram.mean() > 0.0);
        let max = histogram.max();
        // HDR histograms store bucket-equivalent values at 3 significant
        // figures, so max may slightly exceed the raw 50000 sample.
        assert!(
            (50000..=51000).contains(&max),
            "max should be ~50000, got {max}"
        );
        let metrics = histogram.metrics();
        assert!(metrics.p50_ms > 0.0);
        assert!(metrics.p95_ms >= metrics.p50_ms);
        assert!(metrics.p99_ms >= metrics.p95_ms);
    }

    // reset() must drop all samples.
    #[test]
    fn test_latency_histogram_reset() {
        let histogram = LatencyHistogram::new();
        histogram.record(1000);
        histogram.record(2000);
        assert_eq!(histogram.count(), 2);
        histogram.reset();
        assert_eq!(histogram.count(), 0);
    }

    // record_latency_us must bump total_requests alongside the histogram.
    #[test]
    fn test_app_state_latency_recording() {
        let state = AppState::new();
        state.record_latency_us(5000); state.record_latency_us(10000);
        assert_eq!(state.total_requests(), 2);
        let metrics = state.latency_metrics();
        assert!(metrics.total_requests == 2);
    }

    // record_error returns the post-increment error total.
    #[test]
    fn test_app_state_error_tracking() {
        let state = AppState::new();
        assert_eq!(state.error_count(), 0);
        assert_eq!(state.record_error(), 1);
        assert_eq!(state.record_error(), 2);
        assert_eq!(state.error_count(), 2);
    }

    // reset_metrics zeroes every counter, including the SSE gauge.
    #[test]
    fn test_app_state_reset_metrics() {
        let state = AppState::new();
        state.record_capture();
        state.increment_sse_connections();
        state.record_latency_us(1000);
        state.record_error();
        state.reset_metrics();
        assert_eq!(state.captures_processed(), 0);
        assert_eq!(state.active_sse_connections(), 0);
        assert_eq!(state.total_requests(), 0);
        assert_eq!(state.error_count(), 0);
    }

    // Default memory metrics are all-zero with cpu_percent absent.
    #[test]
    fn test_memory_metrics_default() {
        let metrics = MemoryMetrics::default();
        assert_eq!(metrics.rss_bytes, 0);
        assert_eq!(metrics.virtual_bytes, 0);
        assert!(metrics.cpu_percent.is_none());
    }

    // Default latency metrics are all-zero.
    #[test]
    fn test_latency_metrics_default() {
        let metrics = LatencyMetrics::default();
        assert_eq!(metrics.p50_ms, 0.0);
        assert_eq!(metrics.p95_ms, 0.0);
        assert_eq!(metrics.p99_ms, 0.0);
        assert_eq!(metrics.total_requests, 0);
    }

    // The running test process must report nonzero RSS via sysinfo.
    #[test]
    fn test_collect_memory_metrics() {
        let metrics = collect_memory_metrics();
        assert!(metrics.rss_bytes > 0);
    }

    // StatusResponse serializes its fields under the expected JSON keys.
    #[test]
    fn test_status_response_serialization() {
        let response = StatusResponse {
            version: "0.1.0".to_string(),
            name: "test-server".to_string(),
            uptime_seconds: 3600,
            captures_processed: 100,
            active_sse_connections: 5,
            memory: MemoryMetrics::default(),
            latency: LatencyMetrics::default(),
            status: "running".to_string(),
            timestamp: "2026-01-01T00:00:00Z".to_string(),
        };
        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"version\":\"0.1.0\""));
        assert!(json.contains("\"uptime_seconds\":3600"));
        assert!(json.contains("\"status\":\"running\""));
    }

    // Compile-time constants must be non-empty and match the crate name.
    #[test]
    #[allow(clippy::const_is_empty)]
    fn test_server_constants() {
        assert!(!SERVER_VERSION.is_empty());
        assert!(!SERVER_NAME.is_empty());
        assert_eq!(SERVER_NAME, "reasonkit-web");
    }

    // Handler smoke tests: each endpoint returns 200 OK.
    #[tokio::test]
    async fn test_health_handler() {
        let response = health_handler().await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_status_handler() {
        let state = Arc::new(AppState::new());
        state.record_capture();
        state.record_capture();
        state.increment_sse_connections();
        state.record_latency_us(5000);
        let response = status_handler(State(state)).await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readiness_handler() {
        let response = readiness_handler().await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    // 10 threads x 1000 iterations: no lost updates on the atomic counters,
    // and balanced inc/dec leaves the SSE gauge at zero.
    #[test]
    fn test_app_state_thread_safety() {
        use std::thread;
        let state = Arc::new(AppState::new());
        let mut handles = vec![];
        for _ in 0..10 {
            let state_clone = Arc::clone(&state);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    state_clone.record_capture();
                    state_clone.increment_sse_connections();
                    state_clone.decrement_sse_connections();
                    state_clone.record_latency_us(1000);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("Thread panicked");
        }
        assert_eq!(state.captures_processed(), 10_000);
        assert_eq!(state.total_requests(), 10_000);
        assert_eq!(state.active_sse_connections(), 0);
    }
}