mcp_server_fishcode2025/
lib.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Future, Stream};
7use mcp_core_fishcode2025::protocol::{
8    JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
9};
10use pin_project::pin_project;
11use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
12use tower_service::Service;
13
14mod errors;
15pub use errors::{BoxError, RouterError, ServerError, TransportError};
16
17pub mod router;
18pub use router::Router;
19
20/// A transport layer that handles JSON-RPC messages over byte
21#[pin_project]
22pub struct ByteTransport<R, W> {
23    // Reader is a BufReader on the underlying stream (stdin or similar) buffering
24    // the underlying data across poll calls, we clear one line (\n) during each
25    // iteration of poll_next from this buffer
26    #[pin]
27    reader: BufReader<R>,
28    #[pin]
29    writer: W,
30}
31
32impl<R, W> ByteTransport<R, W>
33where
34    R: AsyncRead,
35    W: AsyncWrite,
36{
37    pub fn new(reader: R, writer: W) -> Self {
38        Self {
39            // Default BufReader capacity is 8 * 1024, increase this to 2MB to the file size limit
40            // allows the buffer to have the capacity to read very large calls
41            reader: BufReader::with_capacity(2 * 1024 * 1024, reader),
42            writer,
43        }
44    }
45}
46
47impl<R, W> Stream for ByteTransport<R, W>
48where
49    R: AsyncRead + Unpin,
50    W: AsyncWrite + Unpin,
51{
52    type Item = Result<JsonRpcMessage, TransportError>;
53
54    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        let mut this = self.project();
56        let mut buf = Vec::new();
57
58        let mut reader = this.reader.as_mut();
59        let mut read_future = Box::pin(reader.read_until(b'\n', &mut buf));
60        match read_future.as_mut().poll(cx) {
61            Poll::Ready(Ok(0)) => Poll::Ready(None), // EOF
62            Poll::Ready(Ok(_)) => {
63                // Convert to UTF-8 string
64                let line = match String::from_utf8(buf) {
65                    Ok(s) => s,
66                    Err(e) => return Poll::Ready(Some(Err(TransportError::Utf8(e)))),
67                };
68                // Log incoming message here before serde conversion to
69                // track incomplete chunks which are not valid JSON
70                tracing::info!(json = %line, "incoming message");
71
72                // Parse JSON and validate message format
73                match serde_json::from_str::<serde_json::Value>(&line) {
74                    Ok(value) => {
75                        // Validate basic JSON-RPC structure
76                        if !value.is_object() {
77                            return Poll::Ready(Some(Err(TransportError::InvalidMessage(
78                                "Message must be a JSON object".into(),
79                            ))));
80                        }
81                        let obj = value.as_object().unwrap(); // Safe due to check above
82
83                        // Check jsonrpc version field
84                        if !obj.contains_key("jsonrpc") || obj["jsonrpc"] != "2.0" {
85                            return Poll::Ready(Some(Err(TransportError::InvalidMessage(
86                                "Missing or invalid jsonrpc version".into(),
87                            ))));
88                        }
89
90                        // Now try to parse as proper message
91                        match serde_json::from_value::<JsonRpcMessage>(value) {
92                            Ok(msg) => Poll::Ready(Some(Ok(msg))),
93                            Err(e) => Poll::Ready(Some(Err(TransportError::Json(e)))),
94                        }
95                    }
96                    Err(e) => Poll::Ready(Some(Err(TransportError::Json(e)))),
97                }
98            }
99            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(TransportError::Io(e)))),
100            Poll::Pending => Poll::Pending,
101        }
102    }
103}
104
105impl<R, W> ByteTransport<R, W>
106where
107    R: AsyncRead + Unpin,
108    W: AsyncWrite + Unpin,
109{
110    pub async fn write_message(&mut self, msg: JsonRpcMessage) -> Result<(), std::io::Error> {
111        let json = serde_json::to_string(&msg)?;
112        Pin::new(&mut self.writer)
113            .write_all(json.as_bytes())
114            .await?;
115        Pin::new(&mut self.writer).write_all(b"\n").await?;
116        Pin::new(&mut self.writer).flush().await?;
117        Ok(())
118    }
119}
120
121/// The main server type that processes incoming requests
122pub struct Server<S> {
123    service: S,
124}
125
126impl<S> Server<S>
127where
128    S: Service<JsonRpcRequest, Response = JsonRpcResponse> + Send,
129    S::Error: Into<BoxError>,
130    S::Future: Send,
131{
132    pub fn new(service: S) -> Self {
133        Self { service }
134    }
135
136    // TODO transport trait instead of byte transport if we implement others
137    pub async fn run<R, W>(self, mut transport: ByteTransport<R, W>) -> Result<(), ServerError>
138    where
139        R: AsyncRead + Unpin,
140        W: AsyncWrite + Unpin,
141    {
142        use futures::StreamExt;
143        let mut service = self.service;
144
145        tracing::info!("Server started");
146        while let Some(msg_result) = transport.next().await {
147            let _span = tracing::span!(tracing::Level::INFO, "message_processing");
148            let _enter = _span.enter();
149            match msg_result {
150                Ok(msg) => {
151                    match msg {
152                        JsonRpcMessage::Request(request) => {
153                            // Serialize request for logging
154                            let id = request.id;
155                            let request_json = serde_json::to_string(&request)
156                                .unwrap_or_else(|_| "Failed to serialize request".to_string());
157
158                            tracing::info!(
159                                request_id = ?id,
160                                method = ?request.method,
161                                json = %request_json,
162                                "Received request"
163                            );
164
165                            // Process the request using our service
166                            let response = match service.call(request).await {
167                                Ok(resp) => resp,
168                                Err(e) => {
169                                    let error_msg = e.into().to_string();
170                                    tracing::error!(error = %error_msg, "Request processing failed");
171                                    JsonRpcResponse {
172                                        jsonrpc: "2.0".to_string(),
173                                        id,
174                                        result: None,
175                                        error: Some(mcp_core_fishcode2025::protocol::ErrorData {
176                                            code: mcp_core_fishcode2025::protocol::INTERNAL_ERROR,
177                                            message: error_msg,
178                                            data: None,
179                                        }),
180                                    }
181                                }
182                            };
183
184                            // Serialize response for logging
185                            let response_json = serde_json::to_string(&response)
186                                .unwrap_or_else(|_| "Failed to serialize response".to_string());
187
188                            tracing::info!(
189                                response_id = ?response.id,
190                                json = %response_json,
191                                "Sending response"
192                            );
193                            // Send the response back
194                            if let Err(e) = transport
195                                .write_message(JsonRpcMessage::Response(response))
196                                .await
197                            {
198                                return Err(ServerError::Transport(TransportError::Io(e)));
199                            }
200                        }
201                        JsonRpcMessage::Response(_)
202                        | JsonRpcMessage::Notification(_)
203                        | JsonRpcMessage::Nil
204                        | JsonRpcMessage::Error(_) => {
205                            // Ignore responses, notifications and nil messages for now
206                            continue;
207                        }
208                    }
209                }
210                Err(e) => {
211                    // Convert transport error to JSON-RPC error response
212                    let error = match e {
213                        TransportError::Json(_) | TransportError::InvalidMessage(_) => {
214                            mcp_core_fishcode2025::protocol::ErrorData {
215                                code: mcp_core_fishcode2025::protocol::PARSE_ERROR,
216                                message: e.to_string(),
217                                data: None,
218                            }
219                        }
220                        TransportError::Protocol(_) => mcp_core_fishcode2025::protocol::ErrorData {
221                            code: mcp_core_fishcode2025::protocol::INVALID_REQUEST,
222                            message: e.to_string(),
223                            data: None,
224                        },
225                        _ => mcp_core_fishcode2025::protocol::ErrorData {
226                            code: mcp_core_fishcode2025::protocol::INTERNAL_ERROR,
227                            message: e.to_string(),
228                            data: None,
229                        },
230                    };
231
232                    let error_response = JsonRpcMessage::Error(JsonRpcError {
233                        jsonrpc: "2.0".to_string(),
234                        id: None,
235                        error,
236                    });
237
238                    if let Err(e) = transport.write_message(error_response).await {
239                        return Err(ServerError::Transport(TransportError::Io(e)));
240                    }
241                }
242            }
243        }
244
245        Ok(())
246    }
247}
248
249// Define a specific service implementation that we need for any
250// Any router implements this
251pub trait BoundedService:
252    Service<
253        JsonRpcRequest,
254        Response = JsonRpcResponse,
255        Error = BoxError,
256        Future = Pin<Box<dyn Future<Output = Result<JsonRpcResponse, BoxError>> + Send>>,
257    > + Send
258    + 'static
259{
260}
261
262// Implement it for any type that meets the bounds
263impl<T> BoundedService for T where
264    T: Service<
265            JsonRpcRequest,
266            Response = JsonRpcResponse,
267            Error = BoxError,
268            Future = Pin<Box<dyn Future<Output = Result<JsonRpcResponse, BoxError>> + Send>>,
269        > + Send
270        + 'static
271{
272}