Skip to main content

mcp_postgres/
http.rs

1use axum::{
2    Json, Router,
3    extract::State,
4    response::sse::{Event, Sse},
5    routing::{get, post},
6};
7use futures::stream::{self, Stream};
8use serde_json::{Value, json};
9use std::sync::Arc;
10use tracing::debug;
11
12use crate::config::Config;
13use crate::pool::ConnectionPool;
14use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
15
16#[derive(Clone)]
17pub struct HttpState {
18    pool: Arc<ConnectionPool>,
19    config: Config,
20}
21
22pub async fn create_http_server(
23    pool: Arc<ConnectionPool>,
24    config: Config,
25    port: u16,
26) -> Result<(), Box<dyn std::error::Error>> {
27    let http_state = HttpState {
28        pool,
29        config: config.clone(),
30    };
31
32    let app = Router::new()
33        .route("/rpc", post(handle_rpc))
34        .route("/subscribe", get(handle_subscribe))
35        .route("/health", get(handle_health))
36        .with_state(http_state);
37
38    let addr = format!("{}:{}", config.server.host, port);
39    let listener = tokio::net::TcpListener::bind(&addr).await?;
40
41    tracing::info!("HTTP/2 server listening on {}", addr);
42
43    axum::serve(listener, app).await?;
44
45    Ok(())
46}
47
48/// Handle JSON-RPC requests via HTTP POST
49async fn handle_rpc(
50    State(state): State<HttpState>,
51    Json(req): Json<JsonRpcRequest>,
52) -> Json<JsonRpcResponse> {
53    debug!("HTTP RPC request: {:?}", req.method);
54
55    // Reuse server request processing
56    let response = crate::server::process_request_http(&req, &state.pool, &state.config).await;
57
58    Json(response)
59}
60
61/// Handle SSE subscriptions
62async fn handle_subscribe(
63    State(_state): State<HttpState>,
64) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
65    debug!("SSE subscription established");
66
67    // For now, just send a simple hello message
68    // In production, this would subscribe to query results or database changes
69    let stream = stream::iter(vec![Ok(Event::default()
70        .event("message")
71        .json_data(json!({
72            "jsonrpc": "2.0",
73            "result": {
74                "type": "subscription",
75                "message": "Connected to MCP server"
76            },
77            "id": null
78        }))
79        .expect("valid json"))]);
80
81    Sse::new(stream)
82}
83
84/// Health check endpoint
85async fn handle_health() -> Json<Value> {
86    Json(json!({
87        "status": "healthy",
88        "service": "mcp-postgres",
89        "version": env!("CARGO_PKG_VERSION")
90    }))
91}