model_context_protocol/server/
http.rs1use std::sync::Arc;
21
22use actix_web::{web, App, HttpResponse, HttpServer};
23use tokio::sync::mpsc;
24
25use super::{McpServer, McpServerConfig, ServerError};
26use crate::protocol::{ClientInbound, JsonRpcId, JsonRpcMessage, JsonRpcResponse, ServerOutbound};
27
28struct AppState {
30 inbound_tx: mpsc::Sender<ClientInbound>,
31 server: Arc<McpServer>,
32}
33
34pub struct McpHttpServer;
41
42impl McpHttpServer {
43 pub async fn run(config: McpServerConfig, host: &str, port: u16) -> Result<(), ServerError> {
67 let (server, mut channels) = McpServer::new(config);
68 let inbound_tx = channels.inbound_tx.clone();
69
70 let _outbound_handle = tokio::spawn(async move {
72 while let Some(outbound) = channels.outbound_rx.recv().await {
73 match &outbound {
76 ServerOutbound::Notification(n) => {
77 eprintln!("[MCP] Notification: {}", n.method);
78 }
79 ServerOutbound::Request(r) => {
80 eprintln!("[MCP] Server request: {}", r.method);
81 }
82 _ => {}
83 }
84 }
85 });
86
87 let state = web::Data::new(AppState {
88 inbound_tx,
89 server: Arc::clone(&server),
90 });
91
92 HttpServer::new(move || {
93 let state = state.clone();
94
95 App::new()
96 .app_data(state)
97 .route("/rpc", web::post().to(handle_rpc))
98 .route("/tools", web::get().to(handle_tools_list))
99 .route("/call", web::post().to(handle_tool_call))
100 .route("/health", web::get().to(handle_health))
101 })
102 .bind((host, port))
103 .map_err(|e| ServerError::Io(std::io::Error::new(std::io::ErrorKind::AddrInUse, e)))?
104 .run()
105 .await
106 .map_err(|e| ServerError::Io(std::io::Error::other(e)))
107 }
108}
109
110async fn handle_rpc(state: web::Data<AppState>, body: String) -> HttpResponse {
112 let message = match JsonRpcMessage::parse(&body) {
113 Ok(m) => m,
114 Err(e) => {
115 let error_response = JsonRpcResponse::error(
116 JsonRpcId::Null,
117 -32700,
118 format!("Parse error: {}", e),
119 None,
120 );
121 return HttpResponse::Ok().json(error_response);
122 }
123 };
124
125 match message {
128 JsonRpcMessage::Request(request) => {
129 let response = handle_request_directly(&state.server, request).await;
131 HttpResponse::Ok().json(response)
132 }
133 JsonRpcMessage::Notification(notification) => {
134 let inbound = ClientInbound::Notification(notification);
136 let _ = state.inbound_tx.send(inbound).await;
137 HttpResponse::NoContent().finish()
138 }
139 JsonRpcMessage::Response(_) => HttpResponse::BadRequest().json(serde_json::json!({
140 "error": "Unexpected response message"
141 })),
142 }
143}
144
145async fn handle_request_directly(
147 server: &McpServer,
148 request: crate::protocol::JsonRpcRequest,
149) -> JsonRpcResponse {
150 match request.method.as_str() {
151 "initialize" => {
152 JsonRpcResponse::success(
153 request.id,
154 serde_json::json!({
155 "protocolVersion": crate::protocol::MCP_PROTOCOL_VERSION,
156 "serverInfo": server.server_info(),
157 "capabilities": {} }),
159 )
160 }
161 "tools/list" => {
162 let tools = server.list_tools();
163 JsonRpcResponse::success(request.id, serde_json::json!({ "tools": tools }))
164 }
165 "tools/call" => {
166 let params = match request.params {
167 Some(p) => p,
168 None => {
169 return JsonRpcResponse::error(
170 request.id,
171 -32602,
172 "Missing params".to_string(),
173 None,
174 );
175 }
176 };
177
178 let name = match params.get("name").and_then(|n| n.as_str()) {
179 Some(n) => n,
180 None => {
181 return JsonRpcResponse::error(
182 request.id,
183 -32602,
184 "Missing tool name".to_string(),
185 None,
186 );
187 }
188 };
189
190 let arguments = params
191 .get("arguments")
192 .cloned()
193 .unwrap_or(serde_json::json!({}));
194
195 let result = server.call_tool(name, arguments).await;
196
197 match result {
198 Ok(content) => JsonRpcResponse::success(
199 request.id,
200 serde_json::json!({
201 "content": content,
202 "isError": false
203 }),
204 ),
205 Err(e) => JsonRpcResponse::success(
206 request.id,
207 serde_json::json!({
208 "content": [{ "type": "text", "text": e.to_string() }],
209 "isError": true
210 }),
211 ),
212 }
213 }
214 "ping" => JsonRpcResponse::success(request.id, serde_json::json!({})),
215 _ => JsonRpcResponse::error(
216 request.id,
217 -32601,
218 format!("Method not found: {}", request.method),
219 None,
220 ),
221 }
222}
223
224async fn handle_tools_list(state: web::Data<AppState>) -> HttpResponse {
226 let tools = state.server.list_tools();
227 HttpResponse::Ok().json(tools)
228}
229
230#[derive(serde::Deserialize)]
232struct CallToolRequest {
233 name: String,
234 arguments: serde_json::Value,
235}
236
237async fn handle_tool_call(
239 state: web::Data<AppState>,
240 body: web::Json<CallToolRequest>,
241) -> HttpResponse {
242 let result = state
243 .server
244 .call_tool(&body.name, body.arguments.clone())
245 .await;
246
247 match result {
248 Ok(content) => HttpResponse::Ok().json(content),
249 Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({
250 "error": e.to_string()
251 })),
252 }
253}
254
255async fn handle_health(state: web::Data<AppState>) -> HttpResponse {
257 let status = state.server.status();
258 HttpResponse::Ok().json(serde_json::json!({
259 "status": format!("{:?}", status),
260 "name": state.server.name(),
261 "version": state.server.version()
262 }))
263}
264
#[cfg(test)]
mod tests {
    use super::{CallToolRequest, McpHttpServer};

    /// Smoke test: the server type is nameable and constructible.
    #[test]
    fn test_http_server_module_exists() {
        let _server = McpHttpServer;
    }

    /// A `/call` body carrying a tool name and arguments deserializes into
    /// `CallToolRequest` with both fields intact.
    #[test]
    fn test_call_tool_request_deserializes() {
        let request: CallToolRequest =
            serde_json::from_str(r#"{"name":"echo","arguments":{"text":"hi"}}"#)
                .expect("well-formed /call body should deserialize");

        assert_eq!(request.name, "echo");
        assert_eq!(request.arguments["text"], "hi");
    }
}