1use axum::{
2 extract::State,
3 response::sse::{Event, Sse},
4 routing::{get, post},
5 Json, Router,
6};
7use futures::stream::{self, Stream};
8use serde_json::{json, Value};
9use std::sync::Arc;
10use tracing::debug;
11
12use crate::config::Config;
13use crate::pool::ConnectionPool;
14use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
15
16#[derive(Clone)]
17pub struct HttpState {
18 pool: Arc<ConnectionPool>,
19 config: Config,
20}
21
22pub async fn create_http_server(
23 pool: Arc<ConnectionPool>,
24 config: Config,
25 port: u16,
26) -> Result<(), Box<dyn std::error::Error>> {
27 let http_state = HttpState {
28 pool,
29 config: config.clone(),
30 };
31
32 let app = Router::new()
33 .route("/rpc", post(handle_rpc))
34 .route("/subscribe", get(handle_subscribe))
35 .route("/health", get(handle_health))
36 .with_state(http_state);
37
38 let addr = format!("{}:{}", config.server.host, port);
39 let listener = tokio::net::TcpListener::bind(&addr).await?;
40
41 tracing::info!("HTTP/2 server listening on {}", addr);
42
43 axum::serve(listener, app).await?;
44
45 Ok(())
46}
47
48async fn handle_rpc(
50 State(state): State<HttpState>,
51 Json(req): Json<JsonRpcRequest>,
52) -> Json<JsonRpcResponse> {
53 debug!("HTTP RPC request: {:?}", req.method);
54
55 let response = crate::server::process_request_http(&req, &state.pool, &state.config).await;
57
58 Json(response)
59}
60
61async fn handle_subscribe(
63 State(_state): State<HttpState>,
64) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
65 debug!("SSE subscription established");
66
67 let stream = stream::iter(vec![
70 Ok(Event::default()
71 .event("message")
72 .json_data(json!({
73 "jsonrpc": "2.0",
74 "result": {
75 "type": "subscription",
76 "message": "Connected to MCP server"
77 },
78 "id": null
79 }))
80 .expect("valid json")),
81 ]);
82
83 Sse::new(stream)
84}
85
86async fn handle_health() -> Json<Value> {
88 Json(json!({
89 "status": "healthy",
90 "service": "mcp-postgres",
91 "version": env!("CARGO_PKG_VERSION")
92 }))
93}