1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use bytes::Bytes;
13use http::{Request, Response, StatusCode};
14use tower::Service;
15use tracing::{error, info, warn};
16
17use turbomcp_protocol::RequestContext;
18use turbomcp_protocol::jsonrpc::{
19 JsonRpcError, JsonRpcMessage, JsonRpcResponse, JsonRpcResponsePayload, JsonRpcVersion,
20 ResponseId,
21};
22
23use crate::{
24 ServerError, metrics::ServerMetrics, registry::HandlerRegistry, routing::RequestRouter,
25};
26
27#[derive(Debug, Clone)]
29pub struct McpService {
30 registry: Arc<HandlerRegistry>,
31 router: Arc<RequestRouter>,
32 metrics: Arc<ServerMetrics>,
33}
34
35impl McpService {
36 pub fn new(
38 registry: Arc<HandlerRegistry>,
39 router: Arc<RequestRouter>,
40 metrics: Arc<ServerMetrics>,
41 ) -> Self {
42 Self {
43 registry,
44 router,
45 metrics,
46 }
47 }
48
49 async fn process_jsonrpc(
51 &self,
52 message: JsonRpcMessage,
53 ctx: RequestContext,
54 ) -> Option<JsonRpcResponse> {
55 match message {
56 JsonRpcMessage::Request(req) => {
57 info!(
58 request_id = ?req.id,
59 method = %req.method,
60 "Processing JSON-RPC request"
61 );
62
63 self.metrics.record_request_start();
65
66 let start_time = std::time::Instant::now();
67
68 let response = self.router.route(req, ctx).await;
70
71 let duration = start_time.elapsed();
72
73 match &response.payload {
75 JsonRpcResponsePayload::Success { .. } => {
76 self.metrics.record_request_success(duration);
77 }
78 JsonRpcResponsePayload::Error { error } => {
79 let error_type = match error.code {
81 -32700 => "validation", -32600 => "validation", -32601 => "validation", -32602 => "validation", -32603 => "internal", _ => "unknown",
87 };
88 self.metrics.record_request_failure(error_type, duration);
89 }
90 }
91
92 Some(response)
93 }
94 JsonRpcMessage::Notification(notif) => {
95 info!(method = %notif.method, "Received notification (fire-and-forget)");
98
99 None
102 }
103 JsonRpcMessage::Response(_) => {
104 warn!("Received JSON-RPC response (unexpected)");
105 Some(JsonRpcResponse {
106 jsonrpc: JsonRpcVersion,
107 payload: JsonRpcResponsePayload::Error {
108 error: JsonRpcError {
109 code: -32600,
110 message: "Invalid request: unexpected response".to_string(),
111 data: None,
112 },
113 },
114 id: ResponseId::null(),
115 })
116 }
117 #[allow(deprecated)]
120 JsonRpcMessage::RequestBatch(_) => {
121 warn!("Received JSON-RPC request batch (not supported per MCP 2025-06-18)");
122 Some(JsonRpcResponse {
123 jsonrpc: JsonRpcVersion,
124 payload: JsonRpcResponsePayload::Error {
125 error: JsonRpcError {
126 code: -32601,
127 message: "Batch requests are not supported per MCP specification"
128 .to_string(),
129 data: None,
130 },
131 },
132 id: ResponseId::null(),
133 })
134 }
135 #[allow(deprecated)]
136 JsonRpcMessage::ResponseBatch(_) => {
137 warn!(
138 "Received JSON-RPC response batch (unexpected, not supported per MCP 2025-06-18)"
139 );
140 Some(JsonRpcResponse {
141 jsonrpc: JsonRpcVersion,
142 payload: JsonRpcResponsePayload::Error {
143 error: JsonRpcError {
144 code: -32600,
145 message: "Invalid request: response batches not supported per MCP specification".to_string(),
146 data: None,
147 },
148 },
149 id: ResponseId::null(),
150 })
151 }
152 #[allow(deprecated)]
153 JsonRpcMessage::MessageBatch(_) => {
154 warn!("Received JSON-RPC message batch (not supported per MCP 2025-06-18)");
155 Some(JsonRpcResponse {
156 jsonrpc: JsonRpcVersion,
157 payload: JsonRpcResponsePayload::Error {
158 error: JsonRpcError {
159 code: -32601,
160 message: "Message batches are not supported per MCP specification"
161 .to_string(),
162 data: None,
163 },
164 },
165 id: ResponseId::null(),
166 })
167 }
168 }
169 }
170}
171
172impl Service<Request<Bytes>> for McpService {
173 type Response = Response<Bytes>;
174 type Error = ServerError;
175 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
176
177 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178 Poll::Ready(Ok(()))
179 }
180
181 fn call(&mut self, req: Request<Bytes>) -> Self::Future {
182 let registry = Arc::clone(&self.registry);
183 let router = Arc::clone(&self.router);
184 let metrics = Arc::clone(&self.metrics);
185
186 Box::pin(async move {
187 let (parts, body) = req.into_parts();
189
190 let headers: std::collections::HashMap<String, String> = parts
192 .headers
193 .iter()
194 .filter_map(|(name, value)| {
195 value
196 .to_str()
197 .ok()
198 .map(|v| (name.to_string(), v.to_string()))
199 })
200 .collect();
201
202 let json_str = match std::str::from_utf8(&body) {
204 Ok(s) => s,
205 Err(e) => {
206 error!("Invalid UTF-8 in request body: {}", e);
207 let error_response = JsonRpcResponse {
208 jsonrpc: JsonRpcVersion,
209 payload: JsonRpcResponsePayload::Error {
210 error: JsonRpcError {
211 code: -32700,
212 message: "Parse error: invalid UTF-8".to_string(),
213 data: None,
214 },
215 },
216 id: ResponseId::null(),
217 };
218 let response_json = serde_json::to_string(&error_response)
219 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#.to_string());
220
221 return Ok(Response::builder()
222 .status(StatusCode::BAD_REQUEST)
223 .header("content-type", "application/json")
224 .body(Bytes::from(response_json))
225 .unwrap());
226 }
227 };
228
229 let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
231 let response_opt = match parsed {
232 Ok(message) => {
233 let headers_map: HashMap<String, String> = headers
236 .iter()
237 .map(|(name, value)| (name.clone(), value.clone()))
238 .collect();
239
240 let ctx = router.create_context(Some(headers_map), Some("http"));
241
242 let service = McpService::new(registry, router, metrics);
243 service.process_jsonrpc(message, ctx).await
244 }
245 Err(e) => {
246 error!("Failed to parse JSON-RPC: {}", e);
247 Some(JsonRpcResponse {
248 jsonrpc: JsonRpcVersion,
249 payload: JsonRpcResponsePayload::Error {
250 error: JsonRpcError {
251 code: -32700,
252 message: format!("Parse error: {}", e),
253 data: None,
254 },
255 },
256 id: ResponseId::null(),
257 })
258 }
259 };
260
261 let Some(response) = response_opt else {
263 return Ok(Response::builder()
264 .status(StatusCode::NO_CONTENT)
265 .body(Bytes::new())
266 .unwrap());
267 };
268
269 let response_json = match serde_json::to_string(&response) {
271 Ok(json) => json,
272 Err(e) => {
273 error!("Failed to serialize JSON-RPC response: {}", e);
274 r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error: failed to serialize response"}}"#.to_string()
275 }
276 };
277
278 Ok(Response::builder()
279 .status(StatusCode::OK)
280 .header("content-type", "application/json")
281 .body(Bytes::from(response_json))
282 .unwrap())
283 })
284 }
285}
286
287pub type WrappedMcpService = Box<
289 dyn Service<
290 Request<Bytes>,
291 Response = Response<Bytes>,
292 Error = ServerError,
293 Future = Pin<Box<dyn Future<Output = Result<Response<Bytes>, ServerError>> + Send>>,
294 > + Send
295 + Sync,
296>;