Skip to main content

tenx_mcp/
codec.rs

1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::{Decoder, Encoder};
3use tracing::{debug, error};
4
5use crate::{
6    error::{MCPError, Result},
7    schema::{JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse},
8};
9
/// JSON-RPC codec for encoding and decoding messages over a byte stream.
///
/// Uses newline-delimited JSON: every message is a single JSON document
/// terminated by `\n`.
///
/// The codec holds no state, so it derives `Debug` and `Clone` to make it easy
/// to log and to duplicate for independent read and write halves.
#[derive(Debug, Clone)]
pub struct JsonRpcCodec;
13
14impl JsonRpcCodec {
15    pub fn new() -> Self {
16        Self
17    }
18}
19
20impl Default for JsonRpcCodec {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl Decoder for JsonRpcCodec {
27    type Error = MCPError;
28    type Item = JSONRPCMessage;
29
30    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
31        // Look for newline delimiter
32        let Some(n) = src.iter().position(|b| *b == b'\n') else {
33            // Not enough data
34            return Ok(None);
35        };
36
37        // Split off the line including the newline
38        let line = src.split_to(n + 1);
39
40        // Skip empty lines
41        if line.len() <= 1 {
42            return Ok(None);
43        }
44
45        // Parse JSON, excluding the trailing newline
46        let json_bytes = &line[..line.len() - 1];
47
48        debug!(
49            "Decoding JSON-RPC message: {:?}",
50            std::str::from_utf8(json_bytes)
51        );
52
53        let message: JSONRPCMessage = serde_json::from_slice(json_bytes).map_err(|e| {
54            error!("Failed to parse JSON-RPC message: {}", e);
55            if let Ok(text) = std::str::from_utf8(json_bytes) {
56                MCPError::InvalidMessageFormat {
57                    message: format!("Invalid JSON: {e} (content: {text})"),
58                }
59            } else {
60                MCPError::InvalidMessageFormat {
61                    message: format!("Invalid JSON: {e} (non-UTF8 content)"),
62                }
63            }
64        })?;
65        Ok(Some(message))
66    }
67}
68
69impl Encoder<JSONRPCMessage> for JsonRpcCodec {
70    type Error = MCPError;
71
72    fn encode(&mut self, item: JSONRPCMessage, dst: &mut BytesMut) -> Result<()> {
73        let json = serde_json::to_vec(&item)?;
74
75        // Check message size
76        const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; // 10MB
77        if json.len() > MAX_MESSAGE_SIZE {
78            return Err(MCPError::MessageTooLarge {
79                size: json.len(),
80                max_size: MAX_MESSAGE_SIZE,
81            });
82        }
83
84        dst.reserve(json.len() + 1);
85        dst.put_slice(&json);
86        dst.put_u8(b'\n');
87
88        debug!("Encoded JSON-RPC message: {:?}", std::str::from_utf8(&json));
89
90        Ok(())
91    }
92}
93
94impl Encoder<JSONRPCRequest> for JsonRpcCodec {
95    type Error = MCPError;
96
97    fn encode(&mut self, item: JSONRPCRequest, dst: &mut BytesMut) -> Result<()> {
98        self.encode(JSONRPCMessage::Request(item), dst)
99    }
100}
101
102impl Encoder<JSONRPCResponse> for JsonRpcCodec {
103    type Error = MCPError;
104
105    fn encode(&mut self, item: JSONRPCResponse, dst: &mut BytesMut) -> Result<()> {
106        self.encode(JSONRPCMessage::Response(item), dst)
107    }
108}
109
110impl Encoder<JSONRPCNotification> for JsonRpcCodec {
111    type Error = MCPError;
112
113    fn encode(&mut self, item: JSONRPCNotification, dst: &mut BytesMut) -> Result<()> {
114        self.encode(JSONRPCMessage::Notification(item), dst)
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Request, RequestId, JSONRPC_VERSION};

    /// A request written by the encoder must come back from the decoder as a
    /// request with the same id and method.
    #[test]
    fn test_encode_decode_request() {
        let mut codec = JsonRpcCodec::new();
        let mut wire = BytesMut::new();

        let outgoing = JSONRPCRequest {
            jsonrpc: JSONRPC_VERSION.to_string(),
            id: RequestId::String("test-1".to_string()),
            request: Request {
                method: "initialize".to_string(),
                params: None,
            },
        };

        // Write the request into the buffer, then read it straight back.
        codec.encode(outgoing, &mut wire).unwrap();
        let incoming = codec.decode(&mut wire).unwrap().unwrap();

        let JSONRPCMessage::Request(req) = incoming else {
            panic!("Expected request message");
        };
        assert_eq!(req.id, RequestId::String("test-1".to_string()));
        assert_eq!(req.request.method, "initialize");
    }
}
151}