turbomcp_server/
service.rs

1//! Core MCP service implementation using Tower pattern
2//!
3//! This module provides the core MCP service that can be wrapped with middleware
4//! layers to create a complete, production-ready MCP server.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use bytes::Bytes;
13use http::{Request, Response, StatusCode};
14use tower::Service;
15use tracing::{error, info, warn};
16
17use turbomcp_protocol::RequestContext;
18use turbomcp_protocol::jsonrpc::{
19    JsonRpcError, JsonRpcMessage, JsonRpcResponse, JsonRpcResponsePayload, JsonRpcVersion,
20    ResponseId,
21};
22
23use crate::{
24    ServerError, metrics::ServerMetrics, registry::HandlerRegistry, routing::RequestRouter,
25};
26
/// Core MCP service that handles JSON-RPC requests
///
/// All collaborators are held behind `Arc`, so cloning the service only
/// copies reference-counted pointers.
#[derive(Debug, Clone)]
pub struct McpService {
    /// Handler registry; this module stores it and passes it along but does
    /// not consult it directly.
    registry: Arc<HandlerRegistry>,
    /// Router used to build request contexts and to dispatch JSON-RPC requests.
    router: Arc<RequestRouter>,
    /// Metrics recorder notified of request start, success and failure.
    metrics: Arc<ServerMetrics>,
}
34
35impl McpService {
36    /// Create a new MCP service
37    pub fn new(
38        registry: Arc<HandlerRegistry>,
39        router: Arc<RequestRouter>,
40        metrics: Arc<ServerMetrics>,
41    ) -> Self {
42        Self {
43            registry,
44            router,
45            metrics,
46        }
47    }
48
49    /// Process a JSON-RPC message and return a response (None for notifications)
50    async fn process_jsonrpc(
51        &self,
52        message: JsonRpcMessage,
53        ctx: RequestContext,
54    ) -> Option<JsonRpcResponse> {
55        match message {
56            JsonRpcMessage::Request(req) => {
57                info!(
58                    request_id = ?req.id,
59                    method = %req.method,
60                    "Processing JSON-RPC request"
61                );
62
63                // Record request start
64                self.metrics.record_request_start();
65
66                let start_time = std::time::Instant::now();
67
68                // Route the request through our business logic
69                let response = self.router.route(req, ctx).await;
70
71                let duration = start_time.elapsed();
72
73                // Update metrics based on response
74                match &response.payload {
75                    JsonRpcResponsePayload::Success { .. } => {
76                        self.metrics.record_request_success(duration);
77                    }
78                    JsonRpcResponsePayload::Error { error } => {
79                        // Categorize error type for metrics
80                        let error_type = match error.code {
81                            -32700 => "validation", // Parse error
82                            -32600 => "validation", // Invalid Request
83                            -32601 => "validation", // Method not found
84                            -32602 => "validation", // Invalid params
85                            -32603 => "internal",   // Internal error
86                            _ => "unknown",
87                        };
88                        self.metrics.record_request_failure(error_type, duration);
89                    }
90                }
91
92                Some(response)
93            }
94            JsonRpcMessage::Notification(notif) => {
95                // JSON-RPC 2.0 spec: "The Server MUST NOT reply to a Notification"
96                // Notifications are fire-and-forget. We log them but don't respond.
97                info!(method = %notif.method, "Received notification (fire-and-forget)");
98
99                // For MCP protocol, notifications/initialized is expected and valid
100                // We acknowledge it but send no response per JSON-RPC spec
101                None
102            }
103            JsonRpcMessage::Response(_) => {
104                warn!("Received JSON-RPC response (unexpected)");
105                Some(JsonRpcResponse {
106                    jsonrpc: JsonRpcVersion,
107                    payload: JsonRpcResponsePayload::Error {
108                        error: JsonRpcError {
109                            code: -32600,
110                            message: "Invalid request: unexpected response".to_string(),
111                            data: None,
112                        },
113                    },
114                    id: ResponseId::null(),
115                })
116            }
117            // Allow deprecated for defensive pattern matching on batch types
118            // These exist only to return proper errors per MCP 2025-06-18 spec
119            #[allow(deprecated)]
120            JsonRpcMessage::RequestBatch(_) => {
121                warn!("Received JSON-RPC request batch (not supported per MCP 2025-06-18)");
122                Some(JsonRpcResponse {
123                    jsonrpc: JsonRpcVersion,
124                    payload: JsonRpcResponsePayload::Error {
125                        error: JsonRpcError {
126                            code: -32601,
127                            message: "Batch requests are not supported per MCP specification"
128                                .to_string(),
129                            data: None,
130                        },
131                    },
132                    id: ResponseId::null(),
133                })
134            }
135            #[allow(deprecated)]
136            JsonRpcMessage::ResponseBatch(_) => {
137                warn!(
138                    "Received JSON-RPC response batch (unexpected, not supported per MCP 2025-06-18)"
139                );
140                Some(JsonRpcResponse {
141                    jsonrpc: JsonRpcVersion,
142                    payload: JsonRpcResponsePayload::Error {
143                        error: JsonRpcError {
144                            code: -32600,
145                            message: "Invalid request: response batches not supported per MCP specification".to_string(),
146                            data: None,
147                        },
148                    },
149                    id: ResponseId::null(),
150                })
151            }
152            #[allow(deprecated)]
153            JsonRpcMessage::MessageBatch(_) => {
154                warn!("Received JSON-RPC message batch (not supported per MCP 2025-06-18)");
155                Some(JsonRpcResponse {
156                    jsonrpc: JsonRpcVersion,
157                    payload: JsonRpcResponsePayload::Error {
158                        error: JsonRpcError {
159                            code: -32601,
160                            message: "Message batches are not supported per MCP specification"
161                                .to_string(),
162                            data: None,
163                        },
164                    },
165                    id: ResponseId::null(),
166                })
167            }
168        }
169    }
170}
171
172impl Service<Request<Bytes>> for McpService {
173    type Response = Response<Bytes>;
174    type Error = ServerError;
175    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
176
177    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178        Poll::Ready(Ok(()))
179    }
180
181    fn call(&mut self, req: Request<Bytes>) -> Self::Future {
182        let registry = Arc::clone(&self.registry);
183        let router = Arc::clone(&self.router);
184        let metrics = Arc::clone(&self.metrics);
185
186        Box::pin(async move {
187            // Extract headers before consuming the request
188            let (parts, body) = req.into_parts();
189
190            // Convert headers to a HashMap for metadata
191            let headers: std::collections::HashMap<String, String> = parts
192                .headers
193                .iter()
194                .filter_map(|(name, value)| {
195                    value
196                        .to_str()
197                        .ok()
198                        .map(|v| (name.to_string(), v.to_string()))
199                })
200                .collect();
201
202            // Extract the body as a string
203            let json_str = match std::str::from_utf8(&body) {
204                Ok(s) => s,
205                Err(e) => {
206                    error!("Invalid UTF-8 in request body: {}", e);
207                    let error_response = JsonRpcResponse {
208                        jsonrpc: JsonRpcVersion,
209                        payload: JsonRpcResponsePayload::Error {
210                            error: JsonRpcError {
211                                code: -32700,
212                                message: "Parse error: invalid UTF-8".to_string(),
213                                data: None,
214                            },
215                        },
216                        id: ResponseId::null(),
217                    };
218                    let response_json = serde_json::to_string(&error_response)
219                        .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#.to_string());
220
221                    return Ok(Response::builder()
222                        .status(StatusCode::BAD_REQUEST)
223                        .header("content-type", "application/json")
224                        .body(Bytes::from(response_json))
225                        .unwrap());
226                }
227            };
228
229            // Parse JSON-RPC message
230            let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
231            let response_opt = match parsed {
232                Ok(message) => {
233                    // Create properly configured context with server-to-client capabilities
234                    // Extract headers into HashMap for propagation
235                    let headers_map: HashMap<String, String> = headers
236                        .iter()
237                        .map(|(name, value)| (name.clone(), value.clone()))
238                        .collect();
239
240                    let ctx = router.create_context(Some(headers_map), Some("http"));
241
242                    let service = McpService::new(registry, router, metrics);
243                    service.process_jsonrpc(message, ctx).await
244                }
245                Err(e) => {
246                    error!("Failed to parse JSON-RPC: {}", e);
247                    Some(JsonRpcResponse {
248                        jsonrpc: JsonRpcVersion,
249                        payload: JsonRpcResponsePayload::Error {
250                            error: JsonRpcError {
251                                code: -32700,
252                                message: format!("Parse error: {}", e),
253                                data: None,
254                            },
255                        },
256                        id: ResponseId::null(),
257                    })
258                }
259            };
260
261            // If no response (notification), return 204 No Content
262            let Some(response) = response_opt else {
263                return Ok(Response::builder()
264                    .status(StatusCode::NO_CONTENT)
265                    .body(Bytes::new())
266                    .unwrap());
267            };
268
269            // Serialize response
270            let response_json = match serde_json::to_string(&response) {
271                Ok(json) => json,
272                Err(e) => {
273                    error!("Failed to serialize JSON-RPC response: {}", e);
274                    r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error: failed to serialize response"}}"#.to_string()
275                }
276            };
277
278            Ok(Response::builder()
279                .status(StatusCode::OK)
280                .header("content-type", "application/json")
281                .body(Bytes::from(response_json))
282                .unwrap())
283        })
284    }
285}
286
/// Type alias for the complete middleware-wrapped MCP service
///
/// A boxed, `Send + Sync` trait object implementing [`Service`] over
/// `Request<Bytes>`. It yields `Response<Bytes>` or [`ServerError`] through a
/// boxed `Send` future, the same associated types that the `Service`
/// implementation for `McpService` declares.
pub type WrappedMcpService = Box<
    dyn Service<
            Request<Bytes>,
            Response = Response<Bytes>,
            Error = ServerError,
            Future = Pin<Box<dyn Future<Output = Result<Response<Bytes>, ServerError>> + Send>>,
        > + Send
        + Sync,
>;