1use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::Arc;
42use std::time::Instant;
43
44use axum::extract::State;
45use axum::http::StatusCode;
46use axum::response::IntoResponse;
47use axum::Json;
48use hdrhistogram::Histogram;
49use parking_lot::RwLock;
50use serde::{Deserialize, Serialize};
51use sysinfo::{Pid, ProcessesToUpdate, System};
52use tracing::{debug, instrument};
53
/// Server version string, taken from `Cargo.toml` (`CARGO_PKG_VERSION`) at compile time.
pub const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");

/// Server (crate) name, taken from `Cargo.toml` (`CARGO_PKG_NAME`) at compile time.
pub const SERVER_NAME: &str = env!("CARGO_PKG_NAME");
59
/// Response body for the `/health` and `/ready` endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthResponse {
    // Liveness indicator; `"healthy"` by default (see `Default` impl below).
    pub status: String,
}
72
73impl Default for HealthResponse {
74 fn default() -> Self {
75 Self {
76 status: "healthy".to_string(),
77 }
78 }
79}
80
/// Response body for the `/status` endpoint: build info plus runtime metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResponse {
    // Crate version (`CARGO_PKG_VERSION`).
    pub version: String,

    // Crate name (`CARGO_PKG_NAME`).
    pub name: String,

    // Whole seconds elapsed since the server state was created.
    pub uptime_seconds: u64,

    // Total captures processed since startup (or last metrics reset).
    pub captures_processed: u64,

    // Number of currently open SSE connections.
    pub active_sse_connections: u64,

    // Point-in-time process memory snapshot.
    pub memory: MemoryMetrics,

    // Latency percentiles/counters derived from the HDR histogram.
    pub latency: LatencyMetrics,

    // Overall server status string (the handler sets "running").
    pub status: String,

    // RFC 3339 timestamp of when this response was generated.
    pub timestamp: String,
}
114
/// Process memory usage as reported by `sysinfo`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MemoryMetrics {
    // Resident set size in bytes.
    pub rss_bytes: u64,

    // Virtual memory size in bytes.
    pub virtual_bytes: u64,

    // CPU usage percentage; omitted from JSON when `None`.
    // `collect_memory_metrics` currently always leaves this `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cpu_percent: Option<f32>,
}
128
/// Latency summary statistics; all times are in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    // 50th percentile (median) latency in ms.
    pub p50_ms: f64,

    // 95th percentile latency in ms.
    pub p95_ms: f64,

    // 99th percentile latency in ms.
    pub p99_ms: f64,

    // Total number of requests recorded into the histogram.
    pub total_requests: u64,

    // Mean latency in ms.
    pub mean_ms: f64,

    // Maximum recorded latency in ms.
    pub max_ms: f64,
}
150
151impl Default for LatencyMetrics {
152 fn default() -> Self {
153 Self {
154 p50_ms: 0.0,
155 p95_ms: 0.0,
156 p99_ms: 0.0,
157 total_requests: 0,
158 mean_ms: 0.0,
159 max_ms: 0.0,
160 }
161 }
162}
163
/// Thread-safe latency histogram backed by an HDR histogram behind a
/// `parking_lot::RwLock`. All recorded values are in microseconds.
#[derive(Debug)]
pub struct LatencyHistogram {
    // Underlying HDR histogram; guarded so reads (percentiles) and
    // writes (record/reset) can come from any thread.
    inner: RwLock<Histogram<u64>>,
}
178
179impl LatencyHistogram {
180 pub fn new() -> Self {
184 let histogram =
186 Histogram::new_with_bounds(1, 60_000_000, 3).expect("Failed to create histogram");
187 Self {
188 inner: RwLock::new(histogram),
189 }
190 }
191
192 pub fn record(&self, latency_us: u64) {
196 let mut hist = self.inner.write();
197 let _ = hist.record(latency_us);
199 }
200
201 pub fn record_duration(&self, duration: std::time::Duration) {
205 self.record(duration.as_micros() as u64);
206 }
207
208 pub fn percentile(&self, percentile: f64) -> u64 {
216 let hist = self.inner.read();
217 hist.value_at_percentile(percentile)
218 }
219
220 pub fn count(&self) -> u64 {
222 let hist = self.inner.read();
223 hist.len()
224 }
225
226 pub fn mean(&self) -> f64 {
228 let hist = self.inner.read();
229 hist.mean()
230 }
231
232 pub fn max(&self) -> u64 {
234 let hist = self.inner.read();
235 hist.max()
236 }
237
238 pub fn metrics(&self) -> LatencyMetrics {
242 let hist = self.inner.read();
243 LatencyMetrics {
244 p50_ms: hist.value_at_percentile(50.0) as f64 / 1000.0,
245 p95_ms: hist.value_at_percentile(95.0) as f64 / 1000.0,
246 p99_ms: hist.value_at_percentile(99.0) as f64 / 1000.0,
247 total_requests: hist.len(),
248 mean_ms: hist.mean() / 1000.0,
249 max_ms: hist.max() as f64 / 1000.0,
250 }
251 }
252
253 pub fn reset(&self) {
255 let mut hist = self.inner.write();
256 hist.reset();
257 }
258}
259
260impl Default for LatencyHistogram {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
/// Shared application state: uptime clock, request/capture counters,
/// and latency tracking. All counters are relaxed atomics, so the
/// struct can be shared across threads behind an `Arc` without locks.
#[derive(Debug)]
pub struct AppState {
    // Instant the state was created; basis for uptime reporting.
    start_time: Instant,

    // Total captures processed since startup (or last reset).
    captures_processed: AtomicU64,

    // Number of currently open SSE connections.
    active_sse_connections: AtomicU64,

    // Histogram of request latencies, recorded in microseconds.
    latency_histogram: LatencyHistogram,

    // Total requests whose latency has been recorded.
    total_requests: AtomicU64,

    // Total errors recorded.
    error_count: AtomicU64,
}
321
322impl AppState {
323 pub fn new() -> Self {
327 Self {
328 start_time: Instant::now(),
329 captures_processed: AtomicU64::new(0),
330 active_sse_connections: AtomicU64::new(0),
331 latency_histogram: LatencyHistogram::new(),
332 total_requests: AtomicU64::new(0),
333 error_count: AtomicU64::new(0),
334 }
335 }
336
337 #[inline]
339 pub fn uptime_seconds(&self) -> u64 {
340 self.start_time.elapsed().as_secs()
341 }
342
343 #[inline]
345 pub fn start_time(&self) -> Instant {
346 self.start_time
347 }
348
349 #[inline]
351 pub fn captures_processed(&self) -> u64 {
352 self.captures_processed.load(Ordering::Relaxed)
353 }
354
355 #[inline]
357 pub fn record_capture(&self) -> u64 {
358 self.captures_processed.fetch_add(1, Ordering::Relaxed) + 1
359 }
360
361 #[inline]
363 pub fn active_sse_connections(&self) -> u64 {
364 self.active_sse_connections.load(Ordering::Relaxed)
365 }
366
367 #[inline]
369 pub fn increment_sse_connections(&self) -> u64 {
370 self.active_sse_connections.fetch_add(1, Ordering::Relaxed) + 1
371 }
372
373 #[inline]
377 pub fn decrement_sse_connections(&self) -> u64 {
378 loop {
380 let current = self.active_sse_connections.load(Ordering::Relaxed);
381 if current == 0 {
382 return 0;
383 }
384 match self.active_sse_connections.compare_exchange_weak(
385 current,
386 current - 1,
387 Ordering::Relaxed,
388 Ordering::Relaxed,
389 ) {
390 Ok(_) => return current - 1,
391 Err(_) => continue,
392 }
393 }
394 }
395
396 #[inline]
398 pub fn record_latency_us(&self, latency_us: u64) {
399 self.latency_histogram.record(latency_us);
400 self.total_requests.fetch_add(1, Ordering::Relaxed);
401 }
402
403 #[inline]
405 pub fn record_latency(&self, duration: std::time::Duration) {
406 self.latency_histogram.record_duration(duration);
407 self.total_requests.fetch_add(1, Ordering::Relaxed);
408 }
409
410 #[inline]
412 pub fn latency_metrics(&self) -> LatencyMetrics {
413 self.latency_histogram.metrics()
414 }
415
416 #[inline]
418 pub fn total_requests(&self) -> u64 {
419 self.total_requests.load(Ordering::Relaxed)
420 }
421
422 #[inline]
424 pub fn record_error(&self) -> u64 {
425 self.error_count.fetch_add(1, Ordering::Relaxed) + 1
426 }
427
428 #[inline]
430 pub fn error_count(&self) -> u64 {
431 self.error_count.load(Ordering::Relaxed)
432 }
433
434 pub fn reset_metrics(&self) {
436 self.captures_processed.store(0, Ordering::Relaxed);
437 self.active_sse_connections.store(0, Ordering::Relaxed);
438 self.total_requests.store(0, Ordering::Relaxed);
439 self.error_count.store(0, Ordering::Relaxed);
440 self.latency_histogram.reset();
441 }
442}
443
444impl Default for AppState {
445 fn default() -> Self {
446 Self::new()
447 }
448}
449
450fn collect_memory_metrics() -> MemoryMetrics {
459 let pid = Pid::from_u32(std::process::id());
460 let mut system = System::new();
461
462 system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
465
466 match system.process(pid) {
467 Some(process) => MemoryMetrics {
468 rss_bytes: process.memory(),
469 virtual_bytes: process.virtual_memory(),
470 cpu_percent: None, },
472 None => {
473 debug!("Could not find current process in sysinfo");
474 MemoryMetrics::default()
475 }
476 }
477}
478
479#[instrument(skip_all)]
501pub async fn health_handler() -> impl IntoResponse {
502 debug!("Health check requested");
503 (StatusCode::OK, Json(HealthResponse::default()))
504}
505
506#[instrument(skip_all)]
550pub async fn status_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
551 debug!("Status check requested");
552
553 let memory = collect_memory_metrics();
554 let latency = state.latency_metrics();
555
556 let response = StatusResponse {
557 version: SERVER_VERSION.to_string(),
558 name: SERVER_NAME.to_string(),
559 uptime_seconds: state.uptime_seconds(),
560 captures_processed: state.captures_processed(),
561 active_sse_connections: state.active_sse_connections(),
562 memory,
563 latency,
564 status: "running".to_string(),
565 timestamp: chrono::Utc::now().to_rfc3339(),
566 };
567
568 (StatusCode::OK, Json(response))
569}
570
571#[instrument(skip_all)]
584pub async fn readiness_handler() -> impl IntoResponse {
585 debug!("Readiness check requested");
586 (StatusCode::OK, Json(HealthResponse::default()))
588}
589
590pub fn status_router(state: Arc<AppState>) -> axum::Router<Arc<AppState>> {
620 use axum::routing::get;
621
622 axum::Router::new()
623 .route("/health", get(health_handler))
624 .route("/status", get(status_handler))
625 .route("/ready", get(readiness_handler))
626 .with_state(state)
627}
628
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_response_default() {
        let health = HealthResponse::default();
        assert_eq!(health.status, "healthy");
    }

    #[test]
    fn test_app_state_new() {
        let state = AppState::new();
        assert_eq!(state.captures_processed(), 0);
        assert_eq!(state.active_sse_connections(), 0);
        assert!(state.uptime_seconds() < 1);
    }

    #[test]
    fn test_app_state_capture_counter() {
        let state = AppState::new();

        assert_eq!(state.record_capture(), 1);
        assert_eq!(state.record_capture(), 2);
        assert_eq!(state.record_capture(), 3);
        assert_eq!(state.captures_processed(), 3);
    }

    #[test]
    fn test_app_state_sse_connections() {
        let state = AppState::new();

        assert_eq!(state.increment_sse_connections(), 1);
        assert_eq!(state.increment_sse_connections(), 2);
        assert_eq!(state.active_sse_connections(), 2);

        assert_eq!(state.decrement_sse_connections(), 1);
        assert_eq!(state.active_sse_connections(), 1);

        assert_eq!(state.decrement_sse_connections(), 0);
        assert_eq!(state.active_sse_connections(), 0);

        // Decrementing at zero must saturate, not underflow.
        assert_eq!(state.decrement_sse_connections(), 0);
        assert_eq!(state.active_sse_connections(), 0);
    }

    #[test]
    fn test_latency_histogram() {
        let histogram = LatencyHistogram::new();

        // One statement per sample (previously crammed on one line).
        histogram.record(1000);
        histogram.record(2000);
        histogram.record(5000);
        histogram.record(10000);
        histogram.record(50000);

        assert_eq!(histogram.count(), 5);
        assert!(histogram.mean() > 0.0);
        // HDR histograms are approximate at 3 significant digits.
        let max = histogram.max();
        assert!(
            (50000..=51000).contains(&max),
            "max should be ~50000, got {max}"
        );

        let metrics = histogram.metrics();
        assert!(metrics.p50_ms > 0.0);
        assert!(metrics.p95_ms >= metrics.p50_ms);
        assert!(metrics.p99_ms >= metrics.p95_ms);
    }

    #[test]
    fn test_latency_histogram_reset() {
        let histogram = LatencyHistogram::new();

        histogram.record(1000);
        histogram.record(2000);
        assert_eq!(histogram.count(), 2);

        histogram.reset();
        assert_eq!(histogram.count(), 0);
    }

    #[test]
    fn test_app_state_latency_recording() {
        let state = AppState::new();

        // One statement per line (previously crammed on one line).
        state.record_latency_us(5000);
        state.record_latency_us(10000);
        assert_eq!(state.total_requests(), 2);

        let metrics = state.latency_metrics();
        assert!(metrics.total_requests == 2);
    }

    #[test]
    fn test_app_state_error_tracking() {
        let state = AppState::new();

        assert_eq!(state.error_count(), 0);
        assert_eq!(state.record_error(), 1);
        assert_eq!(state.record_error(), 2);
        assert_eq!(state.error_count(), 2);
    }

    #[test]
    fn test_app_state_reset_metrics() {
        let state = AppState::new();

        state.record_capture();
        state.increment_sse_connections();
        state.record_latency_us(1000);
        state.record_error();

        state.reset_metrics();

        assert_eq!(state.captures_processed(), 0);
        assert_eq!(state.active_sse_connections(), 0);
        assert_eq!(state.total_requests(), 0);
        assert_eq!(state.error_count(), 0);
    }

    #[test]
    fn test_memory_metrics_default() {
        let metrics = MemoryMetrics::default();
        assert_eq!(metrics.rss_bytes, 0);
        assert_eq!(metrics.virtual_bytes, 0);
        assert!(metrics.cpu_percent.is_none());
    }

    #[test]
    fn test_latency_metrics_default() {
        let metrics = LatencyMetrics::default();
        assert_eq!(metrics.p50_ms, 0.0);
        assert_eq!(metrics.p95_ms, 0.0);
        assert_eq!(metrics.p99_ms, 0.0);
        assert_eq!(metrics.total_requests, 0);
    }

    #[test]
    fn test_collect_memory_metrics() {
        let metrics = collect_memory_metrics();
        assert!(metrics.rss_bytes > 0);
    }

    #[test]
    fn test_status_response_serialization() {
        let response = StatusResponse {
            version: "0.1.0".to_string(),
            name: "test-server".to_string(),
            uptime_seconds: 3600,
            captures_processed: 100,
            active_sse_connections: 5,
            memory: MemoryMetrics::default(),
            latency: LatencyMetrics::default(),
            status: "running".to_string(),
            timestamp: "2026-01-01T00:00:00Z".to_string(),
        };

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"version\":\"0.1.0\""));
        assert!(json.contains("\"uptime_seconds\":3600"));
        assert!(json.contains("\"status\":\"running\""));
    }

    #[test]
    fn test_server_constants() {
        assert!(!SERVER_VERSION.is_empty());
        assert!(!SERVER_NAME.is_empty());
        assert_eq!(SERVER_NAME, "reasonkit-web");
    }

    #[tokio::test]
    async fn test_health_handler() {
        let response = health_handler().await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_status_handler() {
        let state = Arc::new(AppState::new());

        state.record_capture();
        state.record_capture();
        state.increment_sse_connections();
        state.record_latency_us(5000);

        let response = status_handler(State(state)).await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readiness_handler() {
        let response = readiness_handler().await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[test]
    fn test_app_state_thread_safety() {
        use std::thread;

        let state = Arc::new(AppState::new());
        let mut handles = vec![];

        for _ in 0..10 {
            let state_clone = Arc::clone(&state);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    state_clone.record_capture();
                    state_clone.increment_sse_connections();
                    state_clone.decrement_sse_connections();
                    state_clone.record_latency_us(1000);
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        assert_eq!(state.captures_processed(), 10_000);
        assert_eq!(state.total_requests(), 10_000);
        assert_eq!(state.active_sse_connections(), 0);
    }
}