1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{Future, Stream};
7use mcp_core_fishcode2025::protocol::{
8 JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
9};
10use pin_project::pin_project;
11use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
12use tower_service::Service;
13
14mod errors;
15pub use errors::{BoxError, RouterError, ServerError, TransportError};
16
17pub mod router;
18pub use router::Router;
19
20#[pin_project]
22pub struct ByteTransport<R, W> {
23 #[pin]
27 reader: BufReader<R>,
28 #[pin]
29 writer: W,
30}
31
32impl<R, W> ByteTransport<R, W>
33where
34 R: AsyncRead,
35 W: AsyncWrite,
36{
37 pub fn new(reader: R, writer: W) -> Self {
38 Self {
39 reader: BufReader::with_capacity(2 * 1024 * 1024, reader),
42 writer,
43 }
44 }
45}
46
47impl<R, W> Stream for ByteTransport<R, W>
48where
49 R: AsyncRead + Unpin,
50 W: AsyncWrite + Unpin,
51{
52 type Item = Result<JsonRpcMessage, TransportError>;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 let mut this = self.project();
56 let mut buf = Vec::new();
57
58 let mut reader = this.reader.as_mut();
59 let mut read_future = Box::pin(reader.read_until(b'\n', &mut buf));
60 match read_future.as_mut().poll(cx) {
61 Poll::Ready(Ok(0)) => Poll::Ready(None), Poll::Ready(Ok(_)) => {
63 let line = match String::from_utf8(buf) {
65 Ok(s) => s,
66 Err(e) => return Poll::Ready(Some(Err(TransportError::Utf8(e)))),
67 };
68 tracing::info!(json = %line, "incoming message");
71
72 match serde_json::from_str::<serde_json::Value>(&line) {
74 Ok(value) => {
75 if !value.is_object() {
77 return Poll::Ready(Some(Err(TransportError::InvalidMessage(
78 "Message must be a JSON object".into(),
79 ))));
80 }
81 let obj = value.as_object().unwrap(); if !obj.contains_key("jsonrpc") || obj["jsonrpc"] != "2.0" {
85 return Poll::Ready(Some(Err(TransportError::InvalidMessage(
86 "Missing or invalid jsonrpc version".into(),
87 ))));
88 }
89
90 match serde_json::from_value::<JsonRpcMessage>(value) {
92 Ok(msg) => Poll::Ready(Some(Ok(msg))),
93 Err(e) => Poll::Ready(Some(Err(TransportError::Json(e)))),
94 }
95 }
96 Err(e) => Poll::Ready(Some(Err(TransportError::Json(e)))),
97 }
98 }
99 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(TransportError::Io(e)))),
100 Poll::Pending => Poll::Pending,
101 }
102 }
103}
104
105impl<R, W> ByteTransport<R, W>
106where
107 R: AsyncRead + Unpin,
108 W: AsyncWrite + Unpin,
109{
110 pub async fn write_message(&mut self, msg: JsonRpcMessage) -> Result<(), std::io::Error> {
111 let json = serde_json::to_string(&msg)?;
112 Pin::new(&mut self.writer)
113 .write_all(json.as_bytes())
114 .await?;
115 Pin::new(&mut self.writer).write_all(b"\n").await?;
116 Pin::new(&mut self.writer).flush().await?;
117 Ok(())
118 }
119}
120
121pub struct Server<S> {
123 service: S,
124}
125
126impl<S> Server<S>
127where
128 S: Service<JsonRpcRequest, Response = JsonRpcResponse> + Send,
129 S::Error: Into<BoxError>,
130 S::Future: Send,
131{
132 pub fn new(service: S) -> Self {
133 Self { service }
134 }
135
136 pub async fn run<R, W>(self, mut transport: ByteTransport<R, W>) -> Result<(), ServerError>
138 where
139 R: AsyncRead + Unpin,
140 W: AsyncWrite + Unpin,
141 {
142 use futures::StreamExt;
143 let mut service = self.service;
144
145 tracing::info!("Server started");
146 while let Some(msg_result) = transport.next().await {
147 let _span = tracing::span!(tracing::Level::INFO, "message_processing");
148 let _enter = _span.enter();
149 match msg_result {
150 Ok(msg) => {
151 match msg {
152 JsonRpcMessage::Request(request) => {
153 let id = request.id;
155 let request_json = serde_json::to_string(&request)
156 .unwrap_or_else(|_| "Failed to serialize request".to_string());
157
158 tracing::info!(
159 request_id = ?id,
160 method = ?request.method,
161 json = %request_json,
162 "Received request"
163 );
164
165 let response = match service.call(request).await {
167 Ok(resp) => resp,
168 Err(e) => {
169 let error_msg = e.into().to_string();
170 tracing::error!(error = %error_msg, "Request processing failed");
171 JsonRpcResponse {
172 jsonrpc: "2.0".to_string(),
173 id,
174 result: None,
175 error: Some(mcp_core_fishcode2025::protocol::ErrorData {
176 code: mcp_core_fishcode2025::protocol::INTERNAL_ERROR,
177 message: error_msg,
178 data: None,
179 }),
180 }
181 }
182 };
183
184 let response_json = serde_json::to_string(&response)
186 .unwrap_or_else(|_| "Failed to serialize response".to_string());
187
188 tracing::info!(
189 response_id = ?response.id,
190 json = %response_json,
191 "Sending response"
192 );
193 if let Err(e) = transport
195 .write_message(JsonRpcMessage::Response(response))
196 .await
197 {
198 return Err(ServerError::Transport(TransportError::Io(e)));
199 }
200 }
201 JsonRpcMessage::Response(_)
202 | JsonRpcMessage::Notification(_)
203 | JsonRpcMessage::Nil
204 | JsonRpcMessage::Error(_) => {
205 continue;
207 }
208 }
209 }
210 Err(e) => {
211 let error = match e {
213 TransportError::Json(_) | TransportError::InvalidMessage(_) => {
214 mcp_core_fishcode2025::protocol::ErrorData {
215 code: mcp_core_fishcode2025::protocol::PARSE_ERROR,
216 message: e.to_string(),
217 data: None,
218 }
219 }
220 TransportError::Protocol(_) => mcp_core_fishcode2025::protocol::ErrorData {
221 code: mcp_core_fishcode2025::protocol::INVALID_REQUEST,
222 message: e.to_string(),
223 data: None,
224 },
225 _ => mcp_core_fishcode2025::protocol::ErrorData {
226 code: mcp_core_fishcode2025::protocol::INTERNAL_ERROR,
227 message: e.to_string(),
228 data: None,
229 },
230 };
231
232 let error_response = JsonRpcMessage::Error(JsonRpcError {
233 jsonrpc: "2.0".to_string(),
234 id: None,
235 error,
236 });
237
238 if let Err(e) = transport.write_message(error_response).await {
239 return Err(ServerError::Transport(TransportError::Io(e)));
240 }
241 }
242 }
243 }
244
245 Ok(())
246 }
247}
248
249pub trait BoundedService:
252 Service<
253 JsonRpcRequest,
254 Response = JsonRpcResponse,
255 Error = BoxError,
256 Future = Pin<Box<dyn Future<Output = Result<JsonRpcResponse, BoxError>> + Send>>,
257 > + Send
258 + 'static
259{
260}
261
262impl<T> BoundedService for T where
264 T: Service<
265 JsonRpcRequest,
266 Response = JsonRpcResponse,
267 Error = BoxError,
268 Future = Pin<Box<dyn Future<Output = Result<JsonRpcResponse, BoxError>> + Send>>,
269 > + Send
270 + 'static
271{
272}