Skip to main content

agentic_codebase/mcp/
sse.rs

1//! SSE transport — HTTP server with auth, multi-tenant routing, and /health.
2
3#[cfg(feature = "sse")]
4use std::sync::Arc;
5
6#[cfg(feature = "sse")]
7use axum::{
8    extract::State,
9    http::{HeaderMap, StatusCode},
10    middleware,
11    response::{IntoResponse, Json as AxumJson, Response},
12    routing::{get, post},
13    Router,
14};
15
16#[cfg(feature = "sse")]
17use tokio::sync::Mutex;
18
19#[cfg(feature = "sse")]
20use super::server::McpServer;
21#[cfg(feature = "sse")]
22use super::tenant::TenantRegistry;
23
24/// Server operating mode.
25#[cfg(feature = "sse")]
26pub enum ServerMode {
27    /// Single-user: one shared MCP server instance.
28    Single(Arc<Mutex<McpServer>>),
29    /// Multi-tenant: per-user graph files in a data directory.
30    MultiTenant {
31        data_dir: std::path::PathBuf,
32        registry: Arc<Mutex<TenantRegistry>>,
33    },
34}
35
36/// Shared server state passed to all handlers via axum State.
37#[cfg(feature = "sse")]
38pub struct ServerState {
39    pub token: Option<String>,
40    pub mode: ServerMode,
41}
42
43/// SSE transport for web-based MCP clients.
44#[cfg(feature = "sse")]
45pub struct SseTransport {
46    state: Arc<ServerState>,
47}
48
49#[cfg(feature = "sse")]
50impl SseTransport {
51    /// Create a single-user SSE transport.
52    pub fn new(server: McpServer) -> Self {
53        Self {
54            state: Arc::new(ServerState {
55                token: None,
56                mode: ServerMode::Single(Arc::new(Mutex::new(server))),
57            }),
58        }
59    }
60
61    /// Create an SSE transport with full configuration.
62    pub fn with_config(token: Option<String>, mode: ServerMode) -> Self {
63        Self {
64            state: Arc::new(ServerState { token, mode }),
65        }
66    }
67
68    /// Run the HTTP server on the given address.
69    pub async fn run(&self, addr: &str) -> crate::AcbResult<()> {
70        let state = self.state.clone();
71
72        let app = Router::new()
73            .route("/mcp", post(handle_request))
74            .layer(middleware::from_fn_with_state(state.clone(), auth_layer))
75            .route("/health", get(handle_health))
76            .with_state(state);
77
78        let listener = tokio::net::TcpListener::bind(addr)
79            .await
80            .map_err(crate::AcbError::Io)?;
81
82        tracing::info!("HTTP transport listening on {addr}");
83
84        axum::serve(listener, app)
85            .await
86            .map_err(|e| crate::AcbError::Io(std::io::Error::other(e.to_string())))?;
87
88        Ok(())
89    }
90}
91
92/// Auth middleware — checks Bearer token if configured.
93#[cfg(feature = "sse")]
94async fn auth_layer(
95    State(state): State<Arc<ServerState>>,
96    headers: HeaderMap,
97    request: axum::extract::Request,
98    next: middleware::Next,
99) -> Response {
100    if let Some(expected) = &state.token {
101        let authorized = headers
102            .get("authorization")
103            .and_then(|v| v.to_str().ok())
104            .and_then(|v| v.strip_prefix("Bearer "))
105            .is_some_and(|token| token == expected);
106
107        if !authorized {
108            return (
109                StatusCode::UNAUTHORIZED,
110                AxumJson(serde_json::json!({
111                    "jsonrpc": "2.0",
112                    "id": null,
113                    "error": {
114                        "code": -32900,
115                        "message": "Unauthorized"
116                    }
117                })),
118            )
119                .into_response();
120        }
121    }
122
123    next.run(request).await
124}
125
126/// Handle JSON-RPC requests. In multi-tenant mode, routes by X-User-ID header.
127#[cfg(feature = "sse")]
128async fn handle_request(
129    State(state): State<Arc<ServerState>>,
130    headers: HeaderMap,
131    AxumJson(body): AxumJson<serde_json::Value>,
132) -> Result<AxumJson<serde_json::Value>, Response> {
133    let raw = serde_json::to_string(&body).map_err(|_| {
134        (
135            StatusCode::BAD_REQUEST,
136            AxumJson(serde_json::json!({
137                "jsonrpc": "2.0",
138                "id": null,
139                "error": {
140                    "code": -32700,
141                    "message": "Parse error"
142                }
143            })),
144        )
145            .into_response()
146    })?;
147
148    let server = match &state.mode {
149        ServerMode::Single(server) => server.clone(),
150        ServerMode::MultiTenant { registry, .. } => {
151            let user_id = headers
152                .get("x-user-id")
153                .and_then(|v| v.to_str().ok())
154                .ok_or_else(|| {
155                    (
156                        StatusCode::BAD_REQUEST,
157                        AxumJson(serde_json::json!({
158                            "jsonrpc": "2.0",
159                            "id": null,
160                            "error": {
161                                "code": -32901,
162                                "message": "Missing X-User-ID header (required in multi-tenant mode)"
163                            }
164                        })),
165                    )
166                        .into_response()
167                })?;
168
169            let mut reg = registry.lock().await;
170            reg.get_or_create(user_id).map_err(|e| {
171                (
172                    StatusCode::INTERNAL_SERVER_ERROR,
173                    AxumJson(serde_json::json!({
174                        "jsonrpc": "2.0",
175                        "id": null,
176                        "error": {
177                            "code": -32603,
178                            "message": format!("Failed to open graph for user '{user_id}': {e}")
179                        }
180                    })),
181                )
182                    .into_response()
183            })?
184        }
185    };
186
187    let response_str = {
188        let mut srv = server.lock().await;
189        srv.handle_raw(&raw)
190    };
191
192    let response_value: serde_json::Value =
193        serde_json::from_str(&response_str).unwrap_or(serde_json::Value::Null);
194
195    Ok(AxumJson(response_value))
196}
197
198/// Health check endpoint — no auth required.
199#[cfg(feature = "sse")]
200async fn handle_health(State(state): State<Arc<ServerState>>) -> AxumJson<serde_json::Value> {
201    let profile = std::env::var("ACB_AUTONOMIC_PROFILE")
202        .unwrap_or_else(|_| "desktop".to_string())
203        .trim()
204        .to_ascii_lowercase();
205    let migration_policy = std::env::var("ACB_STORAGE_MIGRATION_POLICY")
206        .unwrap_or_else(|_| "auto-safe".to_string())
207        .trim()
208        .to_ascii_lowercase();
209    let ledger_dir = std::env::var("ACB_HEALTH_LEDGER_DIR")
210        .ok()
211        .or_else(|| std::env::var("AGENTRA_HEALTH_LEDGER_DIR").ok())
212        .unwrap_or_else(|| "~/.agentra/health-ledger".to_string());
213
214    let mut health = serde_json::json!({
215        "status": "ok",
216        "version": env!("CARGO_PKG_VERSION"),
217        "autonomic": {
218            "profile": profile,
219            "migration_policy": migration_policy,
220            "health_ledger_dir": ledger_dir,
221        }
222    });
223
224    if let ServerMode::MultiTenant { registry, .. } = &state.mode {
225        let reg = registry.lock().await;
226        health["tenants"] = serde_json::json!(reg.count());
227    }
228
229    AxumJson(health)
230}