Skip to main content

rstmdb_protocol/
codec.rs

1//! Encoder and decoder for RCP frames and messages.
2
3use crate::error::ProtocolError;
4use crate::frame::Frame;
5use crate::message::{Request, Response};
6use bytes::{Bytes, BytesMut};
7
8/// Encodes requests and responses into frames.
9pub struct Encoder;
10
11impl Encoder {
12    /// Encodes a request into a frame.
13    pub fn encode_request(request: &Request) -> Result<BytesMut, ProtocolError> {
14        let frame = Frame::from_json(request)?;
15        frame.encode()
16    }
17
18    /// Encodes a response into a frame.
19    pub fn encode_response(response: &Response) -> Result<BytesMut, ProtocolError> {
20        let frame = Frame::from_json(response)?;
21        frame.encode()
22    }
23
24    /// Encodes any JSON-serializable value into a frame.
25    pub fn encode_json<T: serde::Serialize>(value: &T) -> Result<BytesMut, ProtocolError> {
26        let frame = Frame::from_json(value)?;
27        frame.encode()
28    }
29}
30
31/// Decodes frames into requests and responses.
32pub struct Decoder {
33    buffer: BytesMut,
34}
35
36impl Decoder {
37    pub fn new() -> Self {
38        Self {
39            buffer: BytesMut::with_capacity(8192),
40        }
41    }
42
43    /// Appends data to the internal buffer.
44    pub fn extend(&mut self, data: &[u8]) {
45        self.buffer.extend_from_slice(data);
46    }
47
48    /// Appends bytes to the internal buffer.
49    pub fn extend_bytes(&mut self, data: Bytes) {
50        self.buffer.extend_from_slice(&data);
51    }
52
53    /// Attempts to decode the next frame from the buffer.
54    pub fn decode_frame(&mut self) -> Result<Option<Frame>, ProtocolError> {
55        Frame::decode(&mut self.buffer)
56    }
57
58    /// Attempts to decode the next request from the buffer.
59    pub fn decode_request(&mut self) -> Result<Option<Request>, ProtocolError> {
60        match self.decode_frame()? {
61            Some(frame) => {
62                let payload =
63                    std::str::from_utf8(&frame.payload).map_err(|_| ProtocolError::InvalidUtf8)?;
64                let request: Request = serde_json::from_str(payload)?;
65                Ok(Some(request))
66            }
67            None => Ok(None),
68        }
69    }
70
71    /// Attempts to decode the next response from the buffer.
72    pub fn decode_response(&mut self) -> Result<Option<Response>, ProtocolError> {
73        match self.decode_frame()? {
74            Some(frame) => {
75                let payload =
76                    std::str::from_utf8(&frame.payload).map_err(|_| ProtocolError::InvalidUtf8)?;
77                let response: Response = serde_json::from_str(payload)?;
78                Ok(Some(response))
79            }
80            None => Ok(None),
81        }
82    }
83
84    /// Returns the number of bytes currently buffered.
85    pub fn buffered(&self) -> usize {
86        self.buffer.len()
87    }
88
89    /// Clears the internal buffer.
90    pub fn clear(&mut self) {
91        self.buffer.clear();
92    }
93}
94
95impl Default for Decoder {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101/// Line-delimited JSON codec for debug mode.
102pub mod jsonl {
103    use super::*;
104
105    /// Encodes a value as a JSON line (no framing).
106    pub fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>, ProtocolError> {
107        let mut bytes = serde_json::to_vec(value)?;
108        bytes.push(b'\n');
109        Ok(bytes)
110    }
111
112    /// Line-delimited JSON decoder.
113    pub struct LineDecoder {
114        buffer: Vec<u8>,
115    }
116
117    impl LineDecoder {
118        pub fn new() -> Self {
119            Self {
120                buffer: Vec::with_capacity(4096),
121            }
122        }
123
124        pub fn extend(&mut self, data: &[u8]) {
125            self.buffer.extend_from_slice(data);
126        }
127
128        /// Attempts to decode the next JSON line.
129        pub fn decode_line<T: serde::de::DeserializeOwned>(
130            &mut self,
131        ) -> Result<Option<T>, ProtocolError> {
132            if let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
133                let line = self.buffer.drain(..=pos).collect::<Vec<_>>();
134                let json = std::str::from_utf8(&line[..line.len() - 1])
135                    .map_err(|_| ProtocolError::InvalidUtf8)?;
136                let value: T = serde_json::from_str(json)?;
137                Ok(Some(value))
138            } else {
139                Ok(None)
140            }
141        }
142    }
143
144    impl Default for LineDecoder {
145        fn default() -> Self {
146            Self::new()
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Operation;

    /// Builds a framed decoder that has already been fed `bytes`.
    fn decoder_with(bytes: &[u8]) -> Decoder {
        let mut decoder = Decoder::new();
        decoder.extend(bytes);
        decoder
    }

    /// A framed request survives an encode/decode cycle.
    #[test]
    fn test_encoder_decoder_roundtrip() {
        let wire = Encoder::encode_request(&Request::new("42", Operation::Ping)).unwrap();

        let got = decoder_with(&wire).decode_request().unwrap().unwrap();

        assert_eq!(got.id, "42");
        assert_eq!(got.op, Operation::Ping);
    }

    /// A JSON-line request survives an encode/decode cycle.
    #[test]
    fn test_jsonl_roundtrip() {
        let line = jsonl::encode(&Request::new("1", Operation::Info)).unwrap();

        let mut lines = jsonl::LineDecoder::new();
        lines.extend(&line);

        let got: Request = lines.decode_line().unwrap().unwrap();
        assert_eq!(got.id, "1");
        assert_eq!(got.op, Operation::Info);
    }

    /// A frame delivered in two pieces only decodes once the second piece arrives.
    #[test]
    fn test_partial_frame_decoding() {
        let wire = Encoder::encode_request(&Request::new("1", Operation::Ping)).unwrap();

        // Only the first ten bytes: not enough for a full frame.
        let mut decoder = decoder_with(&wire[..10]);
        assert!(decoder.decode_request().unwrap().is_none());

        // Remaining bytes complete the frame.
        decoder.extend(&wire[10..]);
        let got = decoder.decode_request().unwrap().unwrap();
        assert_eq!(got.id, "1");
    }

    /// A framed response survives an encode/decode cycle.
    #[test]
    fn test_encode_response() {
        use crate::message::{Response, ResponseStatus};

        let wire =
            Encoder::encode_response(&Response::ok("req-1", serde_json::json!({"pong": true})))
                .unwrap();

        let got = decoder_with(&wire).decode_response().unwrap().unwrap();

        assert_eq!(got.id, "req-1");
        assert_eq!(got.status, ResponseStatus::Ok);
    }

    /// `buffered` tracks appended bytes and `clear` resets it to zero.
    #[test]
    fn test_decoder_buffered() {
        let mut decoder = Decoder::new();
        assert_eq!(decoder.buffered(), 0);

        decoder.extend(b"some data");
        assert_eq!(decoder.buffered(), 9);

        decoder.clear();
        assert_eq!(decoder.buffered(), 0);
    }

    /// Feeding the decoder through `extend_bytes` works like `extend`.
    #[test]
    fn test_decoder_extend_bytes() {
        use bytes::Bytes;

        let wire = Encoder::encode_request(&Request::new("1", Operation::Ping)).unwrap();
        let owned = Bytes::from(wire.to_vec());

        let mut decoder = Decoder::new();
        decoder.extend_bytes(owned);

        let got = decoder.decode_request().unwrap().unwrap();
        assert_eq!(got.id, "1");
    }

    /// Two JSON lines in one chunk are returned one at a time, in order.
    #[test]
    fn test_jsonl_multiple_lines() {
        let mut stream = jsonl::encode(&Request::new("1", Operation::Ping)).unwrap();
        stream.extend_from_slice(&jsonl::encode(&Request::new("2", Operation::Info)).unwrap());

        let mut lines = jsonl::LineDecoder::new();
        lines.extend(&stream);

        let first: Request = lines.decode_line().unwrap().unwrap();
        assert_eq!(first.id, "1");

        let second: Request = lines.decode_line().unwrap().unwrap();
        assert_eq!(second.id, "2");
    }

    /// A line without its terminating newline is held back until it is completed.
    #[test]
    fn test_jsonl_partial_line() {
        let mut lines = jsonl::LineDecoder::new();
        lines.extend(b"{\"type\":\"request\"");

        // No newline yet, so nothing can be decoded.
        assert!(lines.decode_line::<Request>().unwrap().is_none());

        // The remainder, newline included, completes the line.
        lines.extend(b",\"id\":\"1\",\"op\":\"PING\",\"params\":{}}\n");
        let got: Request = lines.decode_line().unwrap().unwrap();
        assert_eq!(got.id, "1");
    }

    /// `encode_json` accepts arbitrary serializable types, not only protocol messages.
    #[test]
    fn test_encode_json_generic() {
        #[derive(serde::Serialize)]
        struct CustomMsg {
            action: String,
        }

        let wire = Encoder::encode_json(&CustomMsg {
            action: "test".to_string(),
        })
        .unwrap();

        assert!(!wire.is_empty());
    }

    /// A default-constructed decoder starts with nothing buffered.
    #[test]
    fn test_decoder_default() {
        assert_eq!(Decoder::default().buffered(), 0);
    }

    /// A default-constructed line decoder can be created and dropped.
    #[test]
    fn test_jsonl_decoder_default() {
        // Construction succeeding is the only thing under test here.
        drop(jsonl::LineDecoder::default());
    }
}