modbus_relay/
http_api.rs

1use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::SystemTime};
2
3use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
4use serde::Serialize;
5use tokio::sync::{broadcast, oneshot};
6use tracing::info;
7
8use crate::{ConnectionManager, connection::StatEvent};
9
10#[derive(Debug, Serialize)]
11struct HealthResponse {
12    status: &'static str,
13    tcp_connections: u32,
14    rtu_status: &'static str,
15}
16
17#[derive(Debug, Serialize)]
18struct IpStatsResponse {
19    active_connections: usize,
20    total_requests: u64,
21    total_errors: u64,
22    avg_response_time_ms: u64,
23    last_active: SystemTime,
24    last_error: Option<SystemTime>,
25}
26
27#[derive(Debug, Serialize)]
28struct StatsResponse {
29    // Basic stats
30    total_connections: u64,
31    active_connections: u32,
32    total_requests: u64,
33    total_errors: u64,
34    requests_per_second: f64,
35    avg_response_time_ms: u64,
36
37    // Stats per IP
38    per_ip_stats: HashMap<SocketAddr, IpStatsResponse>,
39}
40
41type ApiState = Arc<ConnectionManager>;
42
43async fn health_handler(State(state): State<ApiState>) -> impl IntoResponse {
44    let (tx, rx) = oneshot::channel();
45
46    if (state
47        .stats_tx()
48        .send(StatEvent::QueryConnectionStats { response_tx: tx })
49        .await)
50        .is_err()
51    {
52        return (
53            StatusCode::INTERNAL_SERVER_ERROR,
54            Json(HealthResponse {
55                status: "error",
56                tcp_connections: 0,
57                rtu_status: "unknown",
58            }),
59        );
60    }
61
62    match rx.await {
63        Ok(stats) => {
64            (
65                StatusCode::OK,
66                Json(HealthResponse {
67                    status: "ok",
68                    tcp_connections: stats.active_connections as u32,
69                    rtu_status: "ok", // TODO(aljen): Implement RTU status check
70                }),
71            )
72        }
73        Err(_) => (
74            StatusCode::INTERNAL_SERVER_ERROR,
75            Json(HealthResponse {
76                status: "error",
77                tcp_connections: 0,
78                rtu_status: "unknown",
79            }),
80        ),
81    }
82}
83
84async fn stats_handler(State(state): State<ApiState>) -> impl IntoResponse {
85    let (tx, rx) = oneshot::channel();
86
87    if (state
88        .stats_tx()
89        .send(StatEvent::QueryConnectionStats { response_tx: tx })
90        .await)
91        .is_err()
92    {
93        return (
94            StatusCode::INTERNAL_SERVER_ERROR,
95            Json(StatsResponse {
96                total_connections: 0,
97                active_connections: 0,
98                total_requests: 0,
99                total_errors: 0,
100                requests_per_second: 0.0,
101                avg_response_time_ms: 0,
102                per_ip_stats: HashMap::new(),
103            }),
104        );
105    }
106
107    match rx.await {
108        Ok(stats) => {
109            let per_ip_stats = stats
110                .per_ip_stats
111                .into_iter()
112                .map(|(addr, ip_stats)| {
113                    (
114                        addr,
115                        IpStatsResponse {
116                            active_connections: ip_stats.active_connections,
117                            total_requests: ip_stats.total_requests,
118                            total_errors: ip_stats.total_errors,
119                            avg_response_time_ms: ip_stats.avg_response_time_ms,
120                            last_active: ip_stats.last_active,
121                            last_error: ip_stats.last_error,
122                        },
123                    )
124                })
125                .collect();
126
127            (
128                StatusCode::OK,
129                Json(StatsResponse {
130                    total_connections: stats.total_connections,
131                    active_connections: stats.active_connections as u32,
132                    total_requests: stats.total_requests,
133                    total_errors: stats.total_errors,
134                    requests_per_second: stats.requests_per_second,
135                    avg_response_time_ms: stats.avg_response_time_ms,
136                    per_ip_stats,
137                }),
138            )
139        }
140        Err(_) => (
141            StatusCode::INTERNAL_SERVER_ERROR,
142            Json(StatsResponse {
143                total_connections: 0,
144                active_connections: 0,
145                total_requests: 0,
146                total_errors: 0,
147                requests_per_second: 0.0,
148                avg_response_time_ms: 0,
149                per_ip_stats: HashMap::new(),
150            }),
151        ),
152    }
153}
154
155pub async fn start_http_server(
156    address: String,
157    port: u16,
158    manager: Arc<ConnectionManager>,
159    mut shutdown_rx: broadcast::Receiver<()>,
160) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
161    let app = Router::new()
162        .route("/health", get(health_handler))
163        .route("/stats", get(stats_handler))
164        .with_state(manager);
165
166    let addr = format!("{}:{}", address, port);
167    let listener = tokio::net::TcpListener::bind(&addr).await?;
168
169    info!("HTTP server listening on {}", addr);
170
171    axum::serve(listener, app)
172        .with_graceful_shutdown(async move {
173            let _ = shutdown_rx.recv().await;
174            info!("HTTP server shutting down");
175        })
176        .await?;
177
178    info!("HTTP server shutdown complete");
179
180    Ok(())
181}
182
183#[cfg(test)]
184mod tests {
185    use crate::{ConnectionConfig, StatsManager};
186
187    use super::*;
188    use axum::body::Body;
189    use axum::http::Request;
190    use tokio::sync::Mutex;
191    use tower::ServiceExt;
192
193    #[tokio::test]
194    async fn test_health_endpoint() {
195        // Create a test stats manager
196        let config = ConnectionConfig::default();
197        let stats_config = crate::StatsConfig::default();
198        let (stats_manager, stats_tx) = StatsManager::new(stats_config);
199        let stats_manager = Arc::new(Mutex::new(stats_manager));
200
201        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
202
203        let stats_handle = tokio::spawn({
204            async move {
205                let mut stats_manager = stats_manager.lock().await;
206                stats_manager.run(shutdown_rx).await;
207            }
208        });
209
210        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
211
212        // Build test app
213        let app = Router::new()
214            .route("/health", get(health_handler))
215            .with_state(manager);
216
217        // Create test request
218        let req = Request::builder()
219            .uri("/health")
220            .body(Body::empty())
221            .unwrap();
222
223        // Get response
224        let response = app.oneshot(req).await.unwrap();
225
226        assert_eq!(response.status(), StatusCode::OK);
227
228        shutdown_tx.send(true).unwrap();
229        stats_handle.await.unwrap();
230    }
231
232    #[tokio::test]
233    async fn test_stats_endpoint() {
234        let config = ConnectionConfig::default();
235        let stats_config = crate::StatsConfig::default();
236        let (stats_manager, stats_tx) = StatsManager::new(stats_config);
237        let stats_manager = Arc::new(Mutex::new(stats_manager));
238
239        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
240
241        let stats_handle = tokio::spawn({
242            async move {
243                let mut stats_manager = stats_manager.lock().await;
244                stats_manager.run(shutdown_rx).await;
245            }
246        });
247
248        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
249
250        let app = Router::new()
251            .route("/stats", get(stats_handler))
252            .with_state(manager);
253
254        let req = Request::builder()
255            .uri("/stats")
256            .body(Body::empty())
257            .unwrap();
258
259        let response = app.oneshot(req).await.unwrap();
260
261        assert_eq!(response.status(), StatusCode::OK);
262
263        shutdown_tx.send(true).unwrap();
264        stats_handle.await.unwrap();
265    }
266}