turbomcp_server/
service.rs

1//! Core MCP service implementation using Tower pattern
2//!
3//! This module provides the core MCP service that can be wrapped with middleware
4//! layers to create a complete, production-ready MCP server.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use bytes::Bytes;
13use http::{Request, Response, StatusCode};
14use tower::Service;
15use tracing::{error, info, warn};
16
17use turbomcp_protocol::RequestContext;
18use turbomcp_protocol::jsonrpc::{
19    JsonRpcError, JsonRpcMessage, JsonRpcResponse, JsonRpcResponsePayload, JsonRpcVersion,
20    ResponseId,
21};
22
23use crate::{
24    ServerError, metrics::ServerMetrics, registry::HandlerRegistry, routing::RequestRouter,
25};
26
27/// Core MCP service that handles JSON-RPC requests
28#[derive(Debug, Clone)]
29pub struct McpService {
30    registry: Arc<HandlerRegistry>,
31    router: Arc<RequestRouter>,
32    metrics: Arc<ServerMetrics>,
33}
34
35impl McpService {
36    /// Create a new MCP service
37    pub fn new(
38        registry: Arc<HandlerRegistry>,
39        router: Arc<RequestRouter>,
40        metrics: Arc<ServerMetrics>,
41    ) -> Self {
42        Self {
43            registry,
44            router,
45            metrics,
46        }
47    }
48
49    /// Process a JSON-RPC message and return a response (None for notifications)
50    async fn process_jsonrpc(
51        &self,
52        message: JsonRpcMessage,
53        ctx: RequestContext,
54    ) -> Option<JsonRpcResponse> {
55        match message {
56            JsonRpcMessage::Request(req) => {
57                info!(
58                    request_id = ?req.id,
59                    method = %req.method,
60                    "Processing JSON-RPC request"
61                );
62
63                // Record request start
64                self.metrics.record_request_start();
65
66                let start_time = std::time::Instant::now();
67
68                // Route the request through our business logic
69                let response = self.router.route(req, ctx).await;
70
71                let duration = start_time.elapsed();
72
73                // Update metrics based on response
74                match &response.payload {
75                    JsonRpcResponsePayload::Success { .. } => {
76                        self.metrics.record_request_success(duration);
77                    }
78                    JsonRpcResponsePayload::Error { error } => {
79                        // Categorize error type for metrics
80                        let error_type = match error.code {
81                            -32700 => "validation", // Parse error
82                            -32600 => "validation", // Invalid Request
83                            -32601 => "validation", // Method not found
84                            -32602 => "validation", // Invalid params
85                            -32603 => "internal",   // Internal error
86                            _ => "unknown",
87                        };
88                        self.metrics.record_request_failure(error_type, duration);
89                    }
90                }
91
92                Some(response)
93            }
94            JsonRpcMessage::Notification(notif) => {
95                // JSON-RPC 2.0 spec: "The Server MUST NOT reply to a Notification"
96                // Notifications are fire-and-forget. We log them but don't respond.
97                info!(method = %notif.method, "Received notification (fire-and-forget)");
98
99                // For MCP protocol, notifications/initialized is expected and valid
100                // We acknowledge it but send no response per JSON-RPC spec
101                None
102            }
103            JsonRpcMessage::Response(_) => {
104                warn!("Received JSON-RPC response (unexpected)");
105                Some(JsonRpcResponse {
106                    jsonrpc: JsonRpcVersion,
107                    payload: JsonRpcResponsePayload::Error {
108                        error: JsonRpcError {
109                            code: -32600,
110                            message: "Invalid request: unexpected response".to_string(),
111                            data: None,
112                        },
113                    },
114                    id: ResponseId::null(),
115                })
116            }
117            JsonRpcMessage::RequestBatch(_) => {
118                warn!("Received JSON-RPC request batch (not yet supported)");
119                Some(JsonRpcResponse {
120                    jsonrpc: JsonRpcVersion,
121                    payload: JsonRpcResponsePayload::Error {
122                        error: JsonRpcError {
123                            code: -32601,
124                            message: "Batch requests are not yet supported".to_string(),
125                            data: None,
126                        },
127                    },
128                    id: ResponseId::null(),
129                })
130            }
131            JsonRpcMessage::ResponseBatch(_) => {
132                warn!("Received JSON-RPC response batch (unexpected)");
133                Some(JsonRpcResponse {
134                    jsonrpc: JsonRpcVersion,
135                    payload: JsonRpcResponsePayload::Error {
136                        error: JsonRpcError {
137                            code: -32600,
138                            message: "Invalid request: unexpected response batch".to_string(),
139                            data: None,
140                        },
141                    },
142                    id: ResponseId::null(),
143                })
144            }
145            JsonRpcMessage::MessageBatch(_) => {
146                warn!("Received JSON-RPC message batch (not yet supported)");
147                Some(JsonRpcResponse {
148                    jsonrpc: JsonRpcVersion,
149                    payload: JsonRpcResponsePayload::Error {
150                        error: JsonRpcError {
151                            code: -32601,
152                            message: "Message batches are not yet supported".to_string(),
153                            data: None,
154                        },
155                    },
156                    id: ResponseId::null(),
157                })
158            }
159        }
160    }
161}
162
163impl Service<Request<Bytes>> for McpService {
164    type Response = Response<Bytes>;
165    type Error = ServerError;
166    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
167
168    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169        Poll::Ready(Ok(()))
170    }
171
172    fn call(&mut self, req: Request<Bytes>) -> Self::Future {
173        let registry = Arc::clone(&self.registry);
174        let router = Arc::clone(&self.router);
175        let metrics = Arc::clone(&self.metrics);
176
177        Box::pin(async move {
178            // Extract headers before consuming the request
179            let (parts, body) = req.into_parts();
180
181            // Convert headers to a HashMap for metadata
182            let headers: std::collections::HashMap<String, String> = parts
183                .headers
184                .iter()
185                .filter_map(|(name, value)| {
186                    value
187                        .to_str()
188                        .ok()
189                        .map(|v| (name.to_string(), v.to_string()))
190                })
191                .collect();
192
193            // Extract the body as a string
194            let json_str = match std::str::from_utf8(&body) {
195                Ok(s) => s,
196                Err(e) => {
197                    error!("Invalid UTF-8 in request body: {}", e);
198                    let error_response = JsonRpcResponse {
199                        jsonrpc: JsonRpcVersion,
200                        payload: JsonRpcResponsePayload::Error {
201                            error: JsonRpcError {
202                                code: -32700,
203                                message: "Parse error: invalid UTF-8".to_string(),
204                                data: None,
205                            },
206                        },
207                        id: ResponseId::null(),
208                    };
209                    let response_json = serde_json::to_string(&error_response)
210                        .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#.to_string());
211
212                    return Ok(Response::builder()
213                        .status(StatusCode::BAD_REQUEST)
214                        .header("content-type", "application/json")
215                        .body(Bytes::from(response_json))
216                        .unwrap());
217                }
218            };
219
220            // Parse JSON-RPC message
221            let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
222            let response_opt = match parsed {
223                Ok(message) => {
224                    // Create properly configured context with server-to-client capabilities
225                    // Extract headers into HashMap for propagation
226                    let headers_map: HashMap<String, String> = headers
227                        .iter()
228                        .map(|(name, value)| (name.clone(), value.clone()))
229                        .collect();
230
231                    let ctx = router.create_context(Some(headers_map), Some("http"));
232
233                    let service = McpService::new(registry, router, metrics);
234                    service.process_jsonrpc(message, ctx).await
235                }
236                Err(e) => {
237                    error!("Failed to parse JSON-RPC: {}", e);
238                    Some(JsonRpcResponse {
239                        jsonrpc: JsonRpcVersion,
240                        payload: JsonRpcResponsePayload::Error {
241                            error: JsonRpcError {
242                                code: -32700,
243                                message: format!("Parse error: {}", e),
244                                data: None,
245                            },
246                        },
247                        id: ResponseId::null(),
248                    })
249                }
250            };
251
252            // If no response (notification), return 204 No Content
253            let Some(response) = response_opt else {
254                return Ok(Response::builder()
255                    .status(StatusCode::NO_CONTENT)
256                    .body(Bytes::new())
257                    .unwrap());
258            };
259
260            // Serialize response
261            let response_json = match serde_json::to_string(&response) {
262                Ok(json) => json,
263                Err(e) => {
264                    error!("Failed to serialize JSON-RPC response: {}", e);
265                    r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error: failed to serialize response"}}"#.to_string()
266                }
267            };
268
269            Ok(Response::builder()
270                .status(StatusCode::OK)
271                .header("content-type", "application/json")
272                .body(Bytes::from(response_json))
273                .unwrap())
274        })
275    }
276}
277
278/// Type alias for the complete middleware-wrapped MCP service
279pub type WrappedMcpService = Box<
280    dyn Service<
281            Request<Bytes>,
282            Response = Response<Bytes>,
283            Error = ServerError,
284            Future = Pin<Box<dyn Future<Output = Result<Response<Bytes>, ServerError>> + Send>>,
285        > + Send
286        + Sync,
287>;