1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::{Decoder, Encoder};
3use tracing::{debug, error};
4
5use crate::{
6 error::{MCPError, Result},
7 schema::{JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse},
8};
9
/// Codec for JSON-RPC messages framed as one JSON document per `\n`-terminated line.
///
/// The type carries no state, so it is zero-sized and freely copyable.
#[derive(Debug, Clone, Copy)]
pub struct JsonRpcCodec;
13
14impl JsonRpcCodec {
15 pub fn new() -> Self {
16 Self
17 }
18}
19
20impl Default for JsonRpcCodec {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl Decoder for JsonRpcCodec {
27 type Error = MCPError;
28 type Item = JSONRPCMessage;
29
30 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
31 let Some(n) = src.iter().position(|b| *b == b'\n') else {
33 return Ok(None);
35 };
36
37 let line = src.split_to(n + 1);
39
40 if line.len() <= 1 {
42 return Ok(None);
43 }
44
45 let json_bytes = &line[..line.len() - 1];
47
48 debug!(
49 "Decoding JSON-RPC message: {:?}",
50 std::str::from_utf8(json_bytes)
51 );
52
53 let message: JSONRPCMessage = serde_json::from_slice(json_bytes).map_err(|e| {
54 error!("Failed to parse JSON-RPC message: {}", e);
55 if let Ok(text) = std::str::from_utf8(json_bytes) {
56 MCPError::InvalidMessageFormat {
57 message: format!("Invalid JSON: {e} (content: {text})"),
58 }
59 } else {
60 MCPError::InvalidMessageFormat {
61 message: format!("Invalid JSON: {e} (non-UTF8 content)"),
62 }
63 }
64 })?;
65 Ok(Some(message))
66 }
67}
68
69impl Encoder<JSONRPCMessage> for JsonRpcCodec {
70 type Error = MCPError;
71
72 fn encode(&mut self, item: JSONRPCMessage, dst: &mut BytesMut) -> Result<()> {
73 let json = serde_json::to_vec(&item)?;
74
75 const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; if json.len() > MAX_MESSAGE_SIZE {
78 return Err(MCPError::MessageTooLarge {
79 size: json.len(),
80 max_size: MAX_MESSAGE_SIZE,
81 });
82 }
83
84 dst.reserve(json.len() + 1);
85 dst.put_slice(&json);
86 dst.put_u8(b'\n');
87
88 debug!("Encoded JSON-RPC message: {:?}", std::str::from_utf8(&json));
89
90 Ok(())
91 }
92}
93
94impl Encoder<JSONRPCRequest> for JsonRpcCodec {
95 type Error = MCPError;
96
97 fn encode(&mut self, item: JSONRPCRequest, dst: &mut BytesMut) -> Result<()> {
98 self.encode(JSONRPCMessage::Request(item), dst)
99 }
100}
101
102impl Encoder<JSONRPCResponse> for JsonRpcCodec {
103 type Error = MCPError;
104
105 fn encode(&mut self, item: JSONRPCResponse, dst: &mut BytesMut) -> Result<()> {
106 self.encode(JSONRPCMessage::Response(item), dst)
107 }
108}
109
110impl Encoder<JSONRPCNotification> for JsonRpcCodec {
111 type Error = MCPError;
112
113 fn encode(&mut self, item: JSONRPCNotification, dst: &mut BytesMut) -> Result<()> {
114 self.encode(JSONRPCMessage::Notification(item), dst)
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Request, RequestId, JSONRPC_VERSION};

    /// Encodes a request, decodes the resulting frame, and checks that the id
    /// and method survive the round trip.
    #[test]
    fn test_encode_decode_request() {
        let mut codec = JsonRpcCodec::new();
        let mut buf = BytesMut::new();

        let request = JSONRPCRequest {
            jsonrpc: JSONRPC_VERSION.to_string(),
            id: RequestId::String("test-1".to_string()),
            request: Request {
                method: "initialize".to_string(),
                params: None,
            },
        };

        // `request` is not used again, so it is moved instead of cloned.
        codec.encode(request, &mut buf).unwrap();
        assert_eq!(
            buf.last(),
            Some(&b'\n'),
            "encoded frame must end with the newline delimiter"
        );

        let decoded = codec.decode(&mut buf).unwrap().unwrap();
        assert!(buf.is_empty(), "decode must consume the whole frame");

        let JSONRPCMessage::Request(req) = decoded else {
            panic!("Expected request message");
        };
        assert_eq!(req.id, RequestId::String("test-1".to_string()));
        assert_eq!(req.request.method, "initialize");
    }
}