1use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::SystemTime};
2
3use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
4use serde::Serialize;
5use tokio::sync::{broadcast, oneshot};
6use tracing::info;
7
8use crate::{ConnectionManager, connection::StatEvent};
9
10#[derive(Debug, Serialize)]
11struct HealthResponse {
12 status: &'static str,
13 tcp_connections: u32,
14 rtu_status: &'static str,
15}
16
17#[derive(Debug, Serialize)]
18struct IpStatsResponse {
19 active_connections: usize,
20 total_requests: u64,
21 total_errors: u64,
22 avg_response_time_ms: u64,
23 last_active: SystemTime,
24 last_error: Option<SystemTime>,
25}
26
27#[derive(Debug, Serialize)]
28struct StatsResponse {
29 total_connections: u64,
31 active_connections: u32,
32 total_requests: u64,
33 total_errors: u64,
34 requests_per_second: f64,
35 avg_response_time_ms: u64,
36
37 per_ip_stats: HashMap<SocketAddr, IpStatsResponse>,
39}
40
41type ApiState = Arc<ConnectionManager>;
42
43async fn health_handler(State(state): State<ApiState>) -> impl IntoResponse {
44 let (tx, rx) = oneshot::channel();
45
46 if (state
47 .stats_tx()
48 .send(StatEvent::QueryConnectionStats { response_tx: tx })
49 .await)
50 .is_err()
51 {
52 return (
53 StatusCode::INTERNAL_SERVER_ERROR,
54 Json(HealthResponse {
55 status: "error",
56 tcp_connections: 0,
57 rtu_status: "unknown",
58 }),
59 );
60 }
61
62 match rx.await {
63 Ok(stats) => {
64 (
65 StatusCode::OK,
66 Json(HealthResponse {
67 status: "ok",
68 tcp_connections: stats.active_connections as u32,
69 rtu_status: "ok", }),
71 )
72 }
73 Err(_) => (
74 StatusCode::INTERNAL_SERVER_ERROR,
75 Json(HealthResponse {
76 status: "error",
77 tcp_connections: 0,
78 rtu_status: "unknown",
79 }),
80 ),
81 }
82}
83
84async fn stats_handler(State(state): State<ApiState>) -> impl IntoResponse {
85 let (tx, rx) = oneshot::channel();
86
87 if (state
88 .stats_tx()
89 .send(StatEvent::QueryConnectionStats { response_tx: tx })
90 .await)
91 .is_err()
92 {
93 return (
94 StatusCode::INTERNAL_SERVER_ERROR,
95 Json(StatsResponse {
96 total_connections: 0,
97 active_connections: 0,
98 total_requests: 0,
99 total_errors: 0,
100 requests_per_second: 0.0,
101 avg_response_time_ms: 0,
102 per_ip_stats: HashMap::new(),
103 }),
104 );
105 }
106
107 match rx.await {
108 Ok(stats) => {
109 let per_ip_stats = stats
110 .per_ip_stats
111 .into_iter()
112 .map(|(addr, ip_stats)| {
113 (
114 addr,
115 IpStatsResponse {
116 active_connections: ip_stats.active_connections,
117 total_requests: ip_stats.total_requests,
118 total_errors: ip_stats.total_errors,
119 avg_response_time_ms: ip_stats.avg_response_time_ms,
120 last_active: ip_stats.last_active,
121 last_error: ip_stats.last_error,
122 },
123 )
124 })
125 .collect();
126
127 (
128 StatusCode::OK,
129 Json(StatsResponse {
130 total_connections: stats.total_connections,
131 active_connections: stats.active_connections as u32,
132 total_requests: stats.total_requests,
133 total_errors: stats.total_errors,
134 requests_per_second: stats.requests_per_second,
135 avg_response_time_ms: stats.avg_response_time_ms,
136 per_ip_stats,
137 }),
138 )
139 }
140 Err(_) => (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(StatsResponse {
143 total_connections: 0,
144 active_connections: 0,
145 total_requests: 0,
146 total_errors: 0,
147 requests_per_second: 0.0,
148 avg_response_time_ms: 0,
149 per_ip_stats: HashMap::new(),
150 }),
151 ),
152 }
153}
154
155pub async fn start_http_server(
156 address: String,
157 port: u16,
158 manager: Arc<ConnectionManager>,
159 mut shutdown_rx: broadcast::Receiver<()>,
160) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
161 let app = Router::new()
162 .route("/health", get(health_handler))
163 .route("/stats", get(stats_handler))
164 .with_state(manager);
165
166 let addr = format!("{}:{}", address, port);
167 let listener = tokio::net::TcpListener::bind(&addr).await?;
168
169 info!("HTTP server listening on {}", addr);
170
171 axum::serve(listener, app)
172 .with_graceful_shutdown(async move {
173 let _ = shutdown_rx.recv().await;
174 info!("HTTP server shutting down");
175 })
176 .await?;
177
178 info!("HTTP server shutdown complete");
179
180 Ok(())
181}
182
#[cfg(test)]
mod tests {
    use crate::{ConnectionConfig, StatsManager};

    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Handles needed to drive a handler test and tear it down afterwards.
    struct Backend {
        manager: Arc<ConnectionManager>,
        shutdown_tx: tokio::sync::watch::Sender<bool>,
        stats_task: tokio::task::JoinHandle<()>,
    }

    impl Backend {
        /// Spawns a stats manager task and builds a connection manager wired to it.
        fn spawn() -> Self {
            let config = ConnectionConfig::default();
            let stats_config = crate::StatsConfig::default();
            let (stats_manager, stats_tx) = StatsManager::new(stats_config);
            let stats_manager = Arc::new(Mutex::new(stats_manager));

            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

            let stats_task = tokio::spawn(async move {
                let mut guard = stats_manager.lock().await;
                guard.run(shutdown_rx).await;
            });

            Self {
                manager: Arc::new(ConnectionManager::new(config, stats_tx)),
                shutdown_tx,
                stats_task,
            }
        }

        /// Signals the stats task to stop and waits for it to finish.
        async fn stop(self) {
            self.shutdown_tx.send(true).unwrap();
            self.stats_task.await.unwrap();
        }
    }

    /// Issues a body-less request for `uri` against `app` and returns the status code.
    async fn status_for(app: Router, uri: &str) -> StatusCode {
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        app.oneshot(request).await.unwrap().status()
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        let backend = Backend::spawn();

        let app = Router::new()
            .route("/health", get(health_handler))
            .with_state(Arc::clone(&backend.manager));

        assert_eq!(status_for(app, "/health").await, StatusCode::OK);

        backend.stop().await;
    }

    #[tokio::test]
    async fn test_stats_endpoint() {
        let backend = Backend::spawn();

        let app = Router::new()
            .route("/stats", get(stats_handler))
            .with_state(Arc::clone(&backend.manager));

        assert_eq!(status_for(app, "/stats").await, StatusCode::OK);

        backend.stop().await;
    }
}