// agentic_codebase/mcp/sse.rs
#[cfg(feature = "sse")]
4use std::sync::Arc;
5
6#[cfg(feature = "sse")]
7use axum::{
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 middleware,
11 response::{IntoResponse, Json as AxumJson, Response},
12 routing::{get, post},
13 Router,
14};
15
16#[cfg(feature = "sse")]
17use tokio::sync::Mutex;
18
19#[cfg(feature = "sse")]
20use super::server::McpServer;
21#[cfg(feature = "sse")]
22use super::tenant::TenantRegistry;
23
/// Selects how the transport finds the MCP server that answers a request.
#[cfg(feature = "sse")]
pub enum ServerMode {
    /// A single shared server instance answers every request.
    Single(Arc<Mutex<McpServer>>),
    /// Requests are routed to a per-user server looked up in a tenant registry.
    MultiTenant {
        /// Directory associated with tenant data. The handlers in this file never
        /// read it — presumably the root for per-user storage; confirm against
        /// `TenantRegistry`.
        data_dir: std::path::PathBuf,
        /// Registry asked for (or asked to create) the server belonging to the
        /// user named by the request's `X-User-ID` header.
        registry: Arc<Mutex<TenantRegistry>>,
    },
}
35
/// State shared by the auth middleware and every route handler.
#[cfg(feature = "sse")]
pub struct ServerState {
    /// Bearer token clients must present; `None` turns the authorization check off.
    pub token: Option<String>,
    /// Decides whether requests go to one shared server or to per-user servers.
    pub mode: ServerMode,
}
42
/// HTTP transport exposing an MCP server through an axum router.
///
/// The routes registered by `run` are plain request/response endpoints
/// (`POST /mcp` and `GET /health`).
#[cfg(feature = "sse")]
pub struct SseTransport {
    /// Configuration shared with the handlers; cloned into the router on `run`.
    state: Arc<ServerState>,
}
48
#[cfg(feature = "sse")]
impl SseTransport {
    /// Creates a single-server transport with no bearer-token check.
    pub fn new(server: McpServer) -> Self {
        let shared_server = Arc::new(Mutex::new(server));
        Self::with_config(None, ServerMode::Single(shared_server))
    }

    /// Creates a transport from an explicit token and server mode.
    ///
    /// Passing `None` as `token` disables the authorization check.
    pub fn with_config(token: Option<String>, mode: ServerMode) -> Self {
        let state = Arc::new(ServerState { token, mode });
        Self { state }
    }

    /// Binds `addr` and serves `POST /mcp` and `GET /health` until the server stops.
    ///
    /// # Errors
    ///
    /// Returns `AcbError::Io` when the listener cannot be bound or when serving
    /// ends with an error.
    pub async fn run(&self, addr: &str) -> crate::AcbResult<()> {
        let shared = Arc::clone(&self.state);

        // Per axum's `Router::layer`, a layer wraps only the routes added before
        // it: `/mcp` passes through `auth_layer`, `/health` does not.
        let app = Router::new()
            .route("/mcp", post(handle_request))
            .layer(middleware::from_fn_with_state(
                Arc::clone(&shared),
                auth_layer,
            ))
            .route("/health", get(handle_health))
            .with_state(shared);

        let bound = tokio::net::TcpListener::bind(addr).await;
        let listener = bound.map_err(crate::AcbError::Io)?;

        tracing::info!("HTTP transport listening on {addr}");

        let served = axum::serve(listener, app).await;
        served.map_err(|err| {
            let wrapped = std::io::Error::other(err.to_string());
            crate::AcbError::Io(wrapped)
        })?;

        Ok(())
    }
}
91
/// Middleware enforcing the optional bearer token on the routes it wraps.
///
/// When `state.token` is `None` the request is forwarded unchanged. Otherwise the
/// request must carry `Authorization: Bearer <token>` with the configured token.
/// Any other request is answered with `401 Unauthorized`, a
/// `WWW-Authenticate: Bearer` challenge (a 401 response is required to carry one)
/// and a JSON-RPC error body with code `-32900`.
#[cfg(feature = "sse")]
async fn auth_layer(
    State(state): State<Arc<ServerState>>,
    headers: HeaderMap,
    request: axum::extract::Request,
    next: middleware::Next,
) -> Response {
    /// Compares two byte strings without stopping at the first mismatch, so the
    /// time taken does not reveal how many leading bytes of a guess were right.
    /// Only the length is allowed to leak. This is a best-effort measure.
    fn tokens_match(presented: &[u8], expected: &[u8]) -> bool {
        if presented.len() != expected.len() {
            return false;
        }
        let diff = presented
            .iter()
            .zip(expected)
            .fold(0u8, |acc, (a, b)| acc | (a ^ b));
        // `black_box` discourages the optimizer from reintroducing an early exit.
        std::hint::black_box(diff) == 0
    }

    if let Some(expected) = state.token.as_deref() {
        let presented = headers
            .get("authorization")
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.strip_prefix("Bearer "));

        let authorized = presented
            .is_some_and(|token| tokens_match(token.as_bytes(), expected.as_bytes()));

        if !authorized {
            return (
                StatusCode::UNAUTHORIZED,
                [(axum::http::header::WWW_AUTHENTICATE, "Bearer")],
                AxumJson(serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": null,
                    "error": {
                        "code": -32900,
                        "message": "Unauthorized"
                    }
                })),
            )
                .into_response();
        }
    }

    next.run(request).await
}
125
/// Handles `POST /mcp`: forwards the JSON body to the right MCP server and
/// returns the server's reply as JSON.
///
/// In single mode the shared server is used. In multi-tenant mode the server is
/// looked up (or created) in the registry under the `X-User-ID` header value.
///
/// # Errors
///
/// Every error is a JSON-RPC 2.0 error object with a null `id`:
/// - `400` / `-32700` when the body cannot be re-serialized,
/// - `400` / `-32901` when `X-User-ID` is absent, not valid text, or empty in
///   multi-tenant mode,
/// - `500` / `-32603` when the tenant's server cannot be opened, or when the
///   server's reply is non-empty but not valid JSON.
#[cfg(feature = "sse")]
async fn handle_request(
    State(state): State<Arc<ServerState>>,
    headers: HeaderMap,
    AxumJson(body): AxumJson<serde_json::Value>,
) -> Result<AxumJson<serde_json::Value>, Response> {
    // The server consumes raw text, so the already-parsed body is serialized back.
    let raw = serde_json::to_string(&body)
        .map_err(|_| rpc_error_response(StatusCode::BAD_REQUEST, -32700, "Parse error"))?;

    let server = match &state.mode {
        ServerMode::Single(server) => Arc::clone(server),
        ServerMode::MultiTenant { registry, .. } => {
            let user_id = headers
                .get("x-user-id")
                .and_then(|value| value.to_str().ok())
                // An empty header value names no tenant; treat it as missing.
                .filter(|id| !id.is_empty())
                .ok_or_else(|| {
                    rpc_error_response(
                        StatusCode::BAD_REQUEST,
                        -32901,
                        "Missing X-User-ID header (required in multi-tenant mode)",
                    )
                })?;

            // NOTE(review): `user_id` is client-controlled. If the registry derives
            // a filesystem path from it, confirm that `get_or_create` rejects path
            // separators and `..` — that cannot be verified from this file.
            let mut reg = registry.lock().await;
            reg.get_or_create(user_id).map_err(|e| {
                rpc_error_response(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    -32603,
                    &format!("Failed to open graph for user '{user_id}': {e}"),
                )
            })?
        }
    };

    // Hold the server lock only for the duration of the call itself.
    let response_str = {
        let mut srv = server.lock().await;
        srv.handle_raw(&raw)
    };

    // An empty reply keeps its previous mapping to JSON `null` (presumably what
    // the server returns for notifications — confirm against `handle_raw`).
    if response_str.trim().is_empty() {
        return Ok(AxumJson(serde_json::Value::Null));
    }

    match serde_json::from_str::<serde_json::Value>(&response_str) {
        Ok(value) => Ok(AxumJson(value)),
        Err(err) => {
            // A non-empty reply that is not JSON is a server-side fault; report it
            // rather than silently answering `null` with a success status.
            tracing::error!("MCP server produced a non-JSON response: {err}");
            Err(rpc_error_response(
                StatusCode::INTERNAL_SERVER_ERROR,
                -32603,
                "Internal error",
            ))
        }
    }
}

/// Builds a JSON-RPC 2.0 error response with a null `id` and the given HTTP status.
#[cfg(feature = "sse")]
fn rpc_error_response(status: StatusCode, code: i64, message: &str) -> Response {
    (
        status,
        AxumJson(serde_json::json!({
            "jsonrpc": "2.0",
            "id": null,
            "error": {
                "code": code,
                "message": message
            }
        })),
    )
        .into_response()
}
197
/// Handles `GET /health`: reports status, crate version and autonomic settings
/// read from the environment, plus the tenant count in multi-tenant mode.
#[cfg(feature = "sse")]
async fn handle_health(State(state): State<Arc<ServerState>>) -> AxumJson<serde_json::Value> {
    // Reads `key`, substituting `fallback` when the variable is unset or not
    // valid Unicode, then trims and lower-cases the result.
    let normalized_env = |key: &str, fallback: &str| -> String {
        let value = match std::env::var(key) {
            Ok(found) => found,
            Err(_) => fallback.to_string(),
        };
        value.trim().to_ascii_lowercase()
    };

    let profile = normalized_env("ACB_AUTONOMIC_PROFILE", "desktop");
    let migration_policy = normalized_env("ACB_STORAGE_MIGRATION_POLICY", "auto-safe");

    // The ACB-specific variable wins; the AGENTRA one is the fallback. The value
    // is reported verbatim (no trimming, no `~` expansion).
    let ledger_dir = std::env::var("ACB_HEALTH_LEDGER_DIR")
        .or_else(|_| std::env::var("AGENTRA_HEALTH_LEDGER_DIR"))
        .unwrap_or_else(|_| "~/.agentra/health-ledger".to_string());

    let autonomic = serde_json::json!({
        "profile": profile,
        "migration_policy": migration_policy,
        "health_ledger_dir": ledger_dir,
    });

    let mut health = serde_json::json!({
        "status": "ok",
        "version": env!("CARGO_PKG_VERSION"),
        "autonomic": autonomic
    });

    match &state.mode {
        ServerMode::Single(_) => {}
        ServerMode::MultiTenant { registry, .. } => {
            let reg = registry.lock().await;
            health["tenants"] = serde_json::json!(reg.count());
        }
    }

    AxumJson(health)
}