Skip to main content

mcp_postgres/
http.rs

1use axum::{
2    Json, Router,
3    extract::State,
4    http::{HeaderMap, StatusCode, header},
5    response::sse::{Event, Sse},
6    response::{IntoResponse, Response},
7    routing::{get, post},
8};
9use futures::stream::{self, Stream};
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, warn};
13
14use crate::config::Config;
15use crate::pool::ConnectionPool;
16use crate::protocol::JsonRpcRequest;
17
18#[derive(Clone)]
19pub struct HttpState {
20    pool: Arc<ConnectionPool>,
21    config: Config,
22}
23
24pub async fn create_http_server(
25    pool: Arc<ConnectionPool>,
26    config: Config,
27    port: u16,
28) -> Result<(), Box<dyn std::error::Error>> {
29    let http_state = HttpState {
30        pool,
31        config: config.clone(),
32    };
33
34    let app = Router::new()
35        .route("/rpc", post(handle_rpc))
36        .route("/subscribe", get(handle_subscribe))
37        .route("/health", get(handle_health))
38        .with_state(http_state);
39
40    let addr = format!("{}:{}", config.server.host, port);
41    let listener = tokio::net::TcpListener::bind(&addr).await?;
42
43    tracing::info!("HTTP/2 server listening on {}", addr);
44
45    axum::serve(listener, app).await?;
46
47    Ok(())
48}
49
50/// Check the `Authorization: Bearer <token>` header against the configured
51/// secret in constant time. Returns `true` when no token is configured.
52fn http_authorized(state: &HttpState, headers: &HeaderMap) -> bool {
53    let Some(ref token) = state.config.server.auth_token else {
54        return true;
55    };
56    headers
57        .get(header::AUTHORIZATION)
58        .and_then(|v| v.to_str().ok())
59        .and_then(|v| v.strip_prefix("Bearer "))
60        .map(|presented| crate::auth::verify_token(token, presented))
61        .unwrap_or(false)
62}
63
64fn unauthorized() -> Response {
65    (
66        StatusCode::UNAUTHORIZED,
67        Json(json!({
68            "jsonrpc": "2.0",
69            "error": { "code": -32600, "message": "Unauthorized" },
70            "id": null
71        })),
72    )
73        .into_response()
74}
75
76/// Handle JSON-RPC requests via HTTP POST
77async fn handle_rpc(
78    State(state): State<HttpState>,
79    headers: HeaderMap,
80    Json(req): Json<JsonRpcRequest>,
81) -> Response {
82    if !http_authorized(&state, &headers) {
83        warn!("HTTP RPC request rejected: unauthorized");
84        return unauthorized();
85    }
86
87    debug!("HTTP RPC request: {:?}", req.method);
88
89    // Reuse server request processing
90    let response = crate::server::process_request_http(&req, &state.pool, &state.config).await;
91
92    Json(response).into_response()
93}
94
95/// Handle SSE subscriptions
96async fn handle_subscribe(State(state): State<HttpState>, headers: HeaderMap) -> Response {
97    if !http_authorized(&state, &headers) {
98        warn!("SSE subscription rejected: unauthorized");
99        return unauthorized();
100    }
101    Sse::new(subscribe_stream()).into_response()
102}
103
104fn subscribe_stream() -> impl Stream<Item = Result<Event, axum::Error>> {
105    debug!("SSE subscription established");
106
107    // For now, just send a simple hello message
108    // In production, this would subscribe to query results or database changes
109    stream::iter(vec![Ok(Event::default()
110        .event("message")
111        .json_data(json!({
112            "jsonrpc": "2.0",
113            "result": {
114                "type": "subscription",
115                "message": "Connected to MCP server"
116            },
117            "id": null
118        }))
119        .expect("valid json"))])
120}
121
122/// Health check endpoint
123async fn handle_health() -> Json<Value> {
124    Json(json!({
125        "status": "healthy",
126        "service": "mcp-postgres",
127        "version": env!("CARGO_PKG_VERSION")
128    }))
129}