mcp_server_rs/
server.rs

1use mcp_core_rs::protocol::{
2    constants::{INTERNAL_ERROR, INVALID_REQUEST, PARSE_ERROR},
3    error::ErrorData,
4    message::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse},
5};
6use mcp_error_rs::{BoxError, Error, Result};
7use mcp_transport_rs::server::traits::ServerTransport;
8use tower_service::Service;
9
10pub struct Server<S> {
11    service: S,
12}
13
14impl<S> Server<S>
15where
16    S: Service<JsonRpcRequest, Response = JsonRpcResponse> + Send,
17    S::Error: Into<BoxError>,
18    S::Future: Send,
19{
20    pub fn new(service: S) -> Self {
21        Self { service }
22    }
23
24    pub async fn run(self, mut transport: impl ServerTransport) -> Result<()> {
25        let mut service = self.service;
26
27        tracing::info!("Server started");
28        while let Some(msg_result) = transport.read_message().await {
29            let _span = tracing::span!(tracing::Level::INFO, "message_processing");
30            let _enter = _span.enter();
31
32            match msg_result {
33                Ok(msg) => {
34                    Self::handle_message(&mut service, &mut transport, msg).await?;
35                }
36                Err(e) => {
37                    Self::handle_error(&mut transport, e).await?;
38                }
39            }
40        }
41        tracing::info!("Server transport closed, exiting run loop");
42
43        Ok(())
44    }
45
46    async fn handle_message(
47        service: &mut S,
48        transport: &mut impl ServerTransport,
49        msg: JsonRpcMessage,
50    ) -> Result<()> {
51        match msg {
52            JsonRpcMessage::Request(request) => {
53                let response = Self::process_request(service, request).await;
54                Self::send_response(transport, response).await?;
55            }
56            JsonRpcMessage::Response(_)
57            | JsonRpcMessage::Notification(_)
58            | JsonRpcMessage::Nil
59            | JsonRpcMessage::Error(_) => {
60                // TODO: Handle other message types
61            }
62        }
63        Ok(())
64    }
65
66    async fn process_request(service: &mut S, request: JsonRpcRequest) -> JsonRpcResponse {
67        let id = request.id;
68        let request_json = serde_json::to_string(&request)
69            .unwrap_or_else(|_| "Failed to serialize request".to_string());
70
71        tracing::debug!(
72            request_id = ?id,
73            method = ?request.method,
74            json = %request_json,
75            "Received request"
76        );
77
78        match service.call(request).await {
79            Ok(resp) => resp,
80            Err(e) => {
81                let error_msg = e.into().to_string();
82                tracing::error!(error = %error_msg, "Request processing failed");
83                JsonRpcResponse {
84                    jsonrpc: "2.0".to_string(),
85                    id,
86                    result: None,
87                    error: Some(ErrorData {
88                        code: INTERNAL_ERROR,
89                        message: error_msg,
90                        data: None,
91                    }),
92                }
93            }
94        }
95    }
96
97    async fn send_response(
98        transport: &mut impl ServerTransport,
99        response: JsonRpcResponse,
100    ) -> Result<()> {
101        let response_json = serde_json::to_string(&response)
102            .unwrap_or_else(|_| "Failed to serialize response".to_string());
103
104        tracing::debug!(
105            response_id = ?response.id,
106            json = %response_json,
107            "Sending response"
108        );
109
110        transport
111            .write_message(JsonRpcMessage::Response(response))
112            .await
113    }
114
115    async fn handle_error(transport: &mut impl ServerTransport, e: Error) -> Result<()> {
116        let error = match e {
117            Error::Json(_) | Error::InvalidMessage(_) => ErrorData {
118                code: PARSE_ERROR,
119                message: e.to_string(),
120                data: None,
121            },
122            Error::Protocol(_) => ErrorData {
123                code: INVALID_REQUEST,
124                message: e.to_string(),
125                data: None,
126            },
127            _ => ErrorData {
128                code: INTERNAL_ERROR,
129                message: e.to_string(),
130                data: None,
131            },
132        };
133
134        let error_response = JsonRpcMessage::Error(JsonRpcError {
135            jsonrpc: "2.0".to_string(),
136            id: None,
137            error,
138        });
139
140        transport.write_message(error_response).await
141    }
142}