1use mcp_core_rs::protocol::{
2 constants::{INTERNAL_ERROR, INVALID_REQUEST, PARSE_ERROR},
3 error::ErrorData,
4 message::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse},
5};
6use mcp_error_rs::{BoxError, Error, Result};
7use mcp_transport_rs::server::traits::ServerTransport;
8use tower_service::Service;
9
/// JSON-RPC server that drives a request-handling service over a transport.
///
/// `S` is the service invoked for every incoming request; the trait bounds it
/// must satisfy are declared on the `impl` block rather than on the struct.
pub struct Server<S> {
    /// Handler that turns each incoming request into a response.
    service: S,
}
13
14impl<S> Server<S>
15where
16 S: Service<JsonRpcRequest, Response = JsonRpcResponse> + Send,
17 S::Error: Into<BoxError>,
18 S::Future: Send,
19{
20 pub fn new(service: S) -> Self {
21 Self { service }
22 }
23
24 pub async fn run(self, mut transport: impl ServerTransport) -> Result<()> {
25 let mut service = self.service;
26
27 tracing::info!("Server started");
28 while let Some(msg_result) = transport.read_message().await {
29 let _span = tracing::span!(tracing::Level::INFO, "message_processing");
30 let _enter = _span.enter();
31
32 match msg_result {
33 Ok(msg) => {
34 Self::handle_message(&mut service, &mut transport, msg).await?;
35 }
36 Err(e) => {
37 Self::handle_error(&mut transport, e).await?;
38 }
39 }
40 }
41 tracing::info!("Server transport closed, exiting run loop");
42
43 Ok(())
44 }
45
46 async fn handle_message(
47 service: &mut S,
48 transport: &mut impl ServerTransport,
49 msg: JsonRpcMessage,
50 ) -> Result<()> {
51 match msg {
52 JsonRpcMessage::Request(request) => {
53 let response = Self::process_request(service, request).await;
54 Self::send_response(transport, response).await?;
55 }
56 JsonRpcMessage::Response(_)
57 | JsonRpcMessage::Notification(_)
58 | JsonRpcMessage::Nil
59 | JsonRpcMessage::Error(_) => {
60 }
62 }
63 Ok(())
64 }
65
66 async fn process_request(service: &mut S, request: JsonRpcRequest) -> JsonRpcResponse {
67 let id = request.id;
68 let request_json = serde_json::to_string(&request)
69 .unwrap_or_else(|_| "Failed to serialize request".to_string());
70
71 tracing::debug!(
72 request_id = ?id,
73 method = ?request.method,
74 json = %request_json,
75 "Received request"
76 );
77
78 match service.call(request).await {
79 Ok(resp) => resp,
80 Err(e) => {
81 let error_msg = e.into().to_string();
82 tracing::error!(error = %error_msg, "Request processing failed");
83 JsonRpcResponse {
84 jsonrpc: "2.0".to_string(),
85 id,
86 result: None,
87 error: Some(ErrorData {
88 code: INTERNAL_ERROR,
89 message: error_msg,
90 data: None,
91 }),
92 }
93 }
94 }
95 }
96
97 async fn send_response(
98 transport: &mut impl ServerTransport,
99 response: JsonRpcResponse,
100 ) -> Result<()> {
101 let response_json = serde_json::to_string(&response)
102 .unwrap_or_else(|_| "Failed to serialize response".to_string());
103
104 tracing::debug!(
105 response_id = ?response.id,
106 json = %response_json,
107 "Sending response"
108 );
109
110 transport
111 .write_message(JsonRpcMessage::Response(response))
112 .await
113 }
114
115 async fn handle_error(transport: &mut impl ServerTransport, e: Error) -> Result<()> {
116 let error = match e {
117 Error::Json(_) | Error::InvalidMessage(_) => ErrorData {
118 code: PARSE_ERROR,
119 message: e.to_string(),
120 data: None,
121 },
122 Error::Protocol(_) => ErrorData {
123 code: INVALID_REQUEST,
124 message: e.to_string(),
125 data: None,
126 },
127 _ => ErrorData {
128 code: INTERNAL_ERROR,
129 message: e.to_string(),
130 data: None,
131 },
132 };
133
134 let error_response = JsonRpcMessage::Error(JsonRpcError {
135 jsonrpc: "2.0".to_string(),
136 id: None,
137 error,
138 });
139
140 transport.write_message(error_response).await
141 }
142}