use bytes::{BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
use tracing::{debug, error};
use crate::{
error::{MCPError, Result},
schema::{JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse},
};
pub struct JsonRpcCodec;
impl JsonRpcCodec {
pub fn new() -> Self {
Self
}
}
impl Default for JsonRpcCodec {
fn default() -> Self {
Self::new()
}
}
impl Decoder for JsonRpcCodec {
type Error = MCPError;
type Item = JSONRPCMessage;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
let Some(n) = src.iter().position(|b| *b == b'\n') else {
return Ok(None);
};
let line = src.split_to(n + 1);
if line.len() <= 1 {
return Ok(None);
}
let json_bytes = &line[..line.len() - 1];
debug!(
"Decoding JSON-RPC message: {:?}",
std::str::from_utf8(json_bytes)
);
let message: JSONRPCMessage = serde_json::from_slice(json_bytes).map_err(|e| {
error!("Failed to parse JSON-RPC message: {}", e);
if let Ok(text) = std::str::from_utf8(json_bytes) {
MCPError::InvalidMessageFormat {
message: format!("Invalid JSON: {e} (content: {text})"),
}
} else {
MCPError::InvalidMessageFormat {
message: format!("Invalid JSON: {e} (non-UTF8 content)"),
}
}
})?;
Ok(Some(message))
}
}
impl Encoder<JSONRPCMessage> for JsonRpcCodec {
type Error = MCPError;
fn encode(&mut self, item: JSONRPCMessage, dst: &mut BytesMut) -> Result<()> {
let json = serde_json::to_vec(&item)?;
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; if json.len() > MAX_MESSAGE_SIZE {
return Err(MCPError::MessageTooLarge {
size: json.len(),
max_size: MAX_MESSAGE_SIZE,
});
}
dst.reserve(json.len() + 1);
dst.put_slice(&json);
dst.put_u8(b'\n');
debug!("Encoded JSON-RPC message: {:?}", std::str::from_utf8(&json));
Ok(())
}
}
impl Encoder<JSONRPCRequest> for JsonRpcCodec {
type Error = MCPError;
fn encode(&mut self, item: JSONRPCRequest, dst: &mut BytesMut) -> Result<()> {
self.encode(JSONRPCMessage::Request(item), dst)
}
}
impl Encoder<JSONRPCResponse> for JsonRpcCodec {
type Error = MCPError;
fn encode(&mut self, item: JSONRPCResponse, dst: &mut BytesMut) -> Result<()> {
self.encode(JSONRPCMessage::Response(item), dst)
}
}
impl Encoder<JSONRPCNotification> for JsonRpcCodec {
type Error = MCPError;
fn encode(&mut self, item: JSONRPCNotification, dst: &mut BytesMut) -> Result<()> {
self.encode(JSONRPCMessage::Notification(item), dst)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{Request, RequestId, JSONRPC_VERSION};
#[test]
fn test_encode_decode_request() {
let mut codec = JsonRpcCodec::new();
let mut buf = BytesMut::new();
let request = JSONRPCRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::String("test-1".to_string()),
request: Request {
method: "initialize".to_string(),
params: None,
},
};
codec.encode(request.clone(), &mut buf).unwrap();
let decoded = codec.decode(&mut buf).unwrap().unwrap();
match decoded {
JSONRPCMessage::Request(req) => {
assert_eq!(req.id, RequestId::String("test-1".to_string()));
assert_eq!(req.request.method, "initialize");
}
_ => panic!("Expected request message"),
}
}
}