1use axum::{
2 Json, Router,
3 extract::State,
4 http::{HeaderMap, StatusCode, header},
5 response::sse::{Event, Sse},
6 response::{IntoResponse, Response},
7 routing::{get, post},
8};
9use futures::stream::{self, Stream};
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, warn};
13
14use crate::config::Config;
15use crate::pool::ConnectionPool;
16use crate::protocol::JsonRpcRequest;
17
18#[derive(Clone)]
19pub struct HttpState {
20 pool: Arc<ConnectionPool>,
21 config: Config,
22}
23
24pub async fn create_http_server(
25 pool: Arc<ConnectionPool>,
26 config: Config,
27 port: u16,
28) -> Result<(), Box<dyn std::error::Error>> {
29 let http_state = HttpState {
30 pool,
31 config: config.clone(),
32 };
33
34 let app = Router::new()
35 .route("/rpc", post(handle_rpc))
36 .route("/subscribe", get(handle_subscribe))
37 .route("/health", get(handle_health))
38 .with_state(http_state);
39
40 let addr = format!("{}:{}", config.server.host, port);
41 let listener = tokio::net::TcpListener::bind(&addr).await?;
42
43 tracing::info!("HTTP/2 server listening on {}", addr);
44
45 axum::serve(listener, app).await?;
46
47 Ok(())
48}
49
50fn http_authorized(state: &HttpState, headers: &HeaderMap) -> bool {
53 let Some(ref token) = state.config.server.auth_token else {
54 return true;
55 };
56 headers
57 .get(header::AUTHORIZATION)
58 .and_then(|v| v.to_str().ok())
59 .and_then(|v| v.strip_prefix("Bearer "))
60 .map(|presented| crate::auth::verify_token(token, presented))
61 .unwrap_or(false)
62}
63
64fn unauthorized() -> Response {
65 (
66 StatusCode::UNAUTHORIZED,
67 Json(json!({
68 "jsonrpc": "2.0",
69 "error": { "code": -32600, "message": "Unauthorized" },
70 "id": null
71 })),
72 )
73 .into_response()
74}
75
76async fn handle_rpc(
78 State(state): State<HttpState>,
79 headers: HeaderMap,
80 Json(req): Json<JsonRpcRequest>,
81) -> Response {
82 if !http_authorized(&state, &headers) {
83 warn!("HTTP RPC request rejected: unauthorized");
84 return unauthorized();
85 }
86
87 debug!("HTTP RPC request: {:?}", req.method);
88
89 let response = crate::server::process_request_http(&req, &state.pool, &state.config).await;
91
92 Json(response).into_response()
93}
94
95async fn handle_subscribe(State(state): State<HttpState>, headers: HeaderMap) -> Response {
97 if !http_authorized(&state, &headers) {
98 warn!("SSE subscription rejected: unauthorized");
99 return unauthorized();
100 }
101 Sse::new(subscribe_stream()).into_response()
102}
103
104fn subscribe_stream() -> impl Stream<Item = Result<Event, axum::Error>> {
105 debug!("SSE subscription established");
106
107 stream::iter(vec![Ok(Event::default()
110 .event("message")
111 .json_data(json!({
112 "jsonrpc": "2.0",
113 "result": {
114 "type": "subscription",
115 "message": "Connected to MCP server"
116 },
117 "id": null
118 }))
119 .expect("valid json"))])
120}
121
122async fn handle_health() -> Json<Value> {
124 Json(json!({
125 "status": "healthy",
126 "service": "mcp-postgres",
127 "version": env!("CARGO_PKG_VERSION")
128 }))
129}