1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use bytes::Bytes;
13use http::{Request, Response, StatusCode};
14use tower::Service;
15use tracing::{error, info, warn};
16
17use turbomcp_protocol::RequestContext;
18use turbomcp_protocol::jsonrpc::{
19 JsonRpcError, JsonRpcMessage, JsonRpcResponse, JsonRpcResponsePayload, JsonRpcVersion,
20 ResponseId,
21};
22
23use crate::{
24 ServerError, metrics::ServerMetrics, registry::HandlerRegistry, routing::RequestRouter,
25};
26
27#[derive(Debug, Clone)]
29pub struct McpService {
30 registry: Arc<HandlerRegistry>,
31 router: Arc<RequestRouter>,
32 metrics: Arc<ServerMetrics>,
33}
34
35impl McpService {
36 pub fn new(
38 registry: Arc<HandlerRegistry>,
39 router: Arc<RequestRouter>,
40 metrics: Arc<ServerMetrics>,
41 ) -> Self {
42 Self {
43 registry,
44 router,
45 metrics,
46 }
47 }
48
49 async fn process_jsonrpc(
51 &self,
52 message: JsonRpcMessage,
53 ctx: RequestContext,
54 ) -> Option<JsonRpcResponse> {
55 match message {
56 JsonRpcMessage::Request(req) => {
57 info!(
58 request_id = ?req.id,
59 method = %req.method,
60 "Processing JSON-RPC request"
61 );
62
63 self.metrics.record_request_start();
65
66 let start_time = std::time::Instant::now();
67
68 let response = self.router.route(req, ctx).await;
70
71 let duration = start_time.elapsed();
72
73 match &response.payload {
75 JsonRpcResponsePayload::Success { .. } => {
76 self.metrics.record_request_success(duration);
77 }
78 JsonRpcResponsePayload::Error { error } => {
79 let error_type = match error.code {
81 -32700 => "validation", -32600 => "validation", -32601 => "validation", -32602 => "validation", -32603 => "internal", _ => "unknown",
87 };
88 self.metrics.record_request_failure(error_type, duration);
89 }
90 }
91
92 Some(response)
93 }
94 JsonRpcMessage::Notification(notif) => {
95 info!(method = %notif.method, "Received notification (fire-and-forget)");
98
99 None
102 }
103 JsonRpcMessage::Response(_) => {
104 warn!("Received JSON-RPC response (unexpected)");
105 Some(JsonRpcResponse {
106 jsonrpc: JsonRpcVersion,
107 payload: JsonRpcResponsePayload::Error {
108 error: JsonRpcError {
109 code: -32600,
110 message: "Invalid request: unexpected response".to_string(),
111 data: None,
112 },
113 },
114 id: ResponseId::null(),
115 })
116 }
117 JsonRpcMessage::RequestBatch(_) => {
118 warn!("Received JSON-RPC request batch (not yet supported)");
119 Some(JsonRpcResponse {
120 jsonrpc: JsonRpcVersion,
121 payload: JsonRpcResponsePayload::Error {
122 error: JsonRpcError {
123 code: -32601,
124 message: "Batch requests are not yet supported".to_string(),
125 data: None,
126 },
127 },
128 id: ResponseId::null(),
129 })
130 }
131 JsonRpcMessage::ResponseBatch(_) => {
132 warn!("Received JSON-RPC response batch (unexpected)");
133 Some(JsonRpcResponse {
134 jsonrpc: JsonRpcVersion,
135 payload: JsonRpcResponsePayload::Error {
136 error: JsonRpcError {
137 code: -32600,
138 message: "Invalid request: unexpected response batch".to_string(),
139 data: None,
140 },
141 },
142 id: ResponseId::null(),
143 })
144 }
145 JsonRpcMessage::MessageBatch(_) => {
146 warn!("Received JSON-RPC message batch (not yet supported)");
147 Some(JsonRpcResponse {
148 jsonrpc: JsonRpcVersion,
149 payload: JsonRpcResponsePayload::Error {
150 error: JsonRpcError {
151 code: -32601,
152 message: "Message batches are not yet supported".to_string(),
153 data: None,
154 },
155 },
156 id: ResponseId::null(),
157 })
158 }
159 }
160 }
161}
162
163impl Service<Request<Bytes>> for McpService {
164 type Response = Response<Bytes>;
165 type Error = ServerError;
166 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
167
168 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 Poll::Ready(Ok(()))
170 }
171
172 fn call(&mut self, req: Request<Bytes>) -> Self::Future {
173 let registry = Arc::clone(&self.registry);
174 let router = Arc::clone(&self.router);
175 let metrics = Arc::clone(&self.metrics);
176
177 Box::pin(async move {
178 let (parts, body) = req.into_parts();
180
181 let headers: std::collections::HashMap<String, String> = parts
183 .headers
184 .iter()
185 .filter_map(|(name, value)| {
186 value
187 .to_str()
188 .ok()
189 .map(|v| (name.to_string(), v.to_string()))
190 })
191 .collect();
192
193 let json_str = match std::str::from_utf8(&body) {
195 Ok(s) => s,
196 Err(e) => {
197 error!("Invalid UTF-8 in request body: {}", e);
198 let error_response = JsonRpcResponse {
199 jsonrpc: JsonRpcVersion,
200 payload: JsonRpcResponsePayload::Error {
201 error: JsonRpcError {
202 code: -32700,
203 message: "Parse error: invalid UTF-8".to_string(),
204 data: None,
205 },
206 },
207 id: ResponseId::null(),
208 };
209 let response_json = serde_json::to_string(&error_response)
210 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#.to_string());
211
212 return Ok(Response::builder()
213 .status(StatusCode::BAD_REQUEST)
214 .header("content-type", "application/json")
215 .body(Bytes::from(response_json))
216 .unwrap());
217 }
218 };
219
220 let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
222 let response_opt = match parsed {
223 Ok(message) => {
224 let headers_map: HashMap<String, String> = headers
227 .iter()
228 .map(|(name, value)| (name.clone(), value.clone()))
229 .collect();
230
231 let ctx = router.create_context(Some(headers_map), Some("http"));
232
233 let service = McpService::new(registry, router, metrics);
234 service.process_jsonrpc(message, ctx).await
235 }
236 Err(e) => {
237 error!("Failed to parse JSON-RPC: {}", e);
238 Some(JsonRpcResponse {
239 jsonrpc: JsonRpcVersion,
240 payload: JsonRpcResponsePayload::Error {
241 error: JsonRpcError {
242 code: -32700,
243 message: format!("Parse error: {}", e),
244 data: None,
245 },
246 },
247 id: ResponseId::null(),
248 })
249 }
250 };
251
252 let Some(response) = response_opt else {
254 return Ok(Response::builder()
255 .status(StatusCode::NO_CONTENT)
256 .body(Bytes::new())
257 .unwrap());
258 };
259
260 let response_json = match serde_json::to_string(&response) {
262 Ok(json) => json,
263 Err(e) => {
264 error!("Failed to serialize JSON-RPC response: {}", e);
265 r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error: failed to serialize response"}}"#.to_string()
266 }
267 };
268
269 Ok(Response::builder()
270 .status(StatusCode::OK)
271 .header("content-type", "application/json")
272 .body(Bytes::from(response_json))
273 .unwrap())
274 })
275 }
276}
277
278pub type WrappedMcpService = Box<
280 dyn Service<
281 Request<Bytes>,
282 Response = Response<Bytes>,
283 Error = ServerError,
284 Future = Pin<Box<dyn Future<Output = Result<Response<Bytes>, ServerError>> + Send>>,
285 > + Send
286 + Sync,
287>;