Skip to main content

mcp_postgres/
http.rs

1use axum::{
2    extract::State,
3    response::sse::{Event, Sse},
4    routing::{get, post},
5    Json, Router,
6};
7use futures::stream::{self, Stream};
8use serde_json::{json, Value};
9use std::sync::Arc;
10use tracing::debug;
11
12use crate::config::Config;
13use crate::pool::ConnectionPool;
14use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
15
16#[derive(Clone)]
17pub struct HttpState {
18    pool: Arc<ConnectionPool>,
19    config: Config,
20}
21
22pub async fn create_http_server(
23    pool: Arc<ConnectionPool>,
24    config: Config,
25    port: u16,
26) -> Result<(), Box<dyn std::error::Error>> {
27    let http_state = HttpState {
28        pool,
29        config: config.clone(),
30    };
31
32    let app = Router::new()
33        .route("/rpc", post(handle_rpc))
34        .route("/subscribe", get(handle_subscribe))
35        .route("/health", get(handle_health))
36        .with_state(http_state);
37
38    let addr = format!("{}:{}", config.server.host, port);
39    let listener = tokio::net::TcpListener::bind(&addr).await?;
40
41    tracing::info!("HTTP/2 server listening on {}", addr);
42
43    axum::serve(listener, app).await?;
44
45    Ok(())
46}
47
48/// Handle JSON-RPC requests via HTTP POST
49async fn handle_rpc(
50    State(state): State<HttpState>,
51    Json(req): Json<JsonRpcRequest>,
52) -> Json<JsonRpcResponse> {
53    debug!("HTTP RPC request: {:?}", req.method);
54
55    // Reuse server request processing
56    let response = crate::server::process_request_http(&req, &state.pool, &state.config).await;
57
58    Json(response)
59}
60
61/// Handle SSE subscriptions
62async fn handle_subscribe(
63    State(_state): State<HttpState>,
64) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
65    debug!("SSE subscription established");
66
67    // For now, just send a simple hello message
68    // In production, this would subscribe to query results or database changes
69    let stream = stream::iter(vec![
70        Ok(Event::default()
71            .event("message")
72            .json_data(json!({
73                "jsonrpc": "2.0",
74                "result": {
75                    "type": "subscription",
76                    "message": "Connected to MCP server"
77                },
78                "id": null
79            }))
80            .expect("valid json")),
81    ]);
82
83    Sse::new(stream)
84}
85
86/// Health check endpoint
87async fn handle_health() -> Json<Value> {
88    Json(json!({
89        "status": "healthy",
90        "service": "mcp-postgres",
91        "version": env!("CARGO_PKG_VERSION")
92    }))
93}