1use crate::error::ProtocolError;
4use crate::frame::Frame;
5use crate::message::{Request, Response};
6use bytes::{Bytes, BytesMut};
7
/// Stateless namespace for the frame-encoding functions.
///
/// `Encoder` carries no data; every operation is an associated function, so
/// no instance is needed to encode a message. The common traits are derived
/// so the type can still be logged, copied, or default-constructed when it is
/// embedded in other structures.
#[derive(Debug, Clone, Copy, Default)]
pub struct Encoder;
10
11impl Encoder {
12 pub fn encode_request(request: &Request) -> Result<BytesMut, ProtocolError> {
14 let frame = Frame::from_json(request)?;
15 frame.encode()
16 }
17
18 pub fn encode_response(response: &Response) -> Result<BytesMut, ProtocolError> {
20 let frame = Frame::from_json(response)?;
21 frame.encode()
22 }
23
24 pub fn encode_json<T: serde::Serialize>(value: &T) -> Result<BytesMut, ProtocolError> {
26 let frame = Frame::from_json(value)?;
27 frame.encode()
28 }
29}
30
31pub struct Decoder {
33 buffer: BytesMut,
34}
35
36impl Decoder {
37 pub fn new() -> Self {
38 Self {
39 buffer: BytesMut::with_capacity(8192),
40 }
41 }
42
43 pub fn extend(&mut self, data: &[u8]) {
45 self.buffer.extend_from_slice(data);
46 }
47
48 pub fn extend_bytes(&mut self, data: Bytes) {
50 self.buffer.extend_from_slice(&data);
51 }
52
53 pub fn decode_frame(&mut self) -> Result<Option<Frame>, ProtocolError> {
55 Frame::decode(&mut self.buffer)
56 }
57
58 pub fn decode_request(&mut self) -> Result<Option<Request>, ProtocolError> {
60 match self.decode_frame()? {
61 Some(frame) => {
62 let payload =
63 std::str::from_utf8(&frame.payload).map_err(|_| ProtocolError::InvalidUtf8)?;
64 let request: Request = serde_json::from_str(payload)?;
65 Ok(Some(request))
66 }
67 None => Ok(None),
68 }
69 }
70
71 pub fn decode_response(&mut self) -> Result<Option<Response>, ProtocolError> {
73 match self.decode_frame()? {
74 Some(frame) => {
75 let payload =
76 std::str::from_utf8(&frame.payload).map_err(|_| ProtocolError::InvalidUtf8)?;
77 let response: Response = serde_json::from_str(payload)?;
78 Ok(Some(response))
79 }
80 None => Ok(None),
81 }
82 }
83
84 pub fn buffered(&self) -> usize {
86 self.buffer.len()
87 }
88
89 pub fn clear(&mut self) {
91 self.buffer.clear();
92 }
93}
94
95impl Default for Decoder {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101pub mod jsonl {
103 use super::*;
104
105 pub fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>, ProtocolError> {
107 let mut bytes = serde_json::to_vec(value)?;
108 bytes.push(b'\n');
109 Ok(bytes)
110 }
111
112 pub struct LineDecoder {
114 buffer: Vec<u8>,
115 }
116
117 impl LineDecoder {
118 pub fn new() -> Self {
119 Self {
120 buffer: Vec::with_capacity(4096),
121 }
122 }
123
124 pub fn extend(&mut self, data: &[u8]) {
125 self.buffer.extend_from_slice(data);
126 }
127
128 pub fn decode_line<T: serde::de::DeserializeOwned>(
130 &mut self,
131 ) -> Result<Option<T>, ProtocolError> {
132 if let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
133 let line = self.buffer.drain(..=pos).collect::<Vec<_>>();
134 let json = std::str::from_utf8(&line[..line.len() - 1])
135 .map_err(|_| ProtocolError::InvalidUtf8)?;
136 let value: T = serde_json::from_str(json)?;
137 Ok(Some(value))
138 } else {
139 Ok(None)
140 }
141 }
142 }
143
144 impl Default for LineDecoder {
145 fn default() -> Self {
146 Self::new()
147 }
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Operation;

    /// A framed request handed to a `Decoder` in one piece comes back with
    /// the same id and operation.
    #[test]
    fn test_encoder_decoder_roundtrip() {
        let original = Request::new("42", Operation::Ping);
        let wire = Encoder::encode_request(&original).unwrap();

        let mut decoder = Decoder::new();
        decoder.extend(&wire);

        let roundtripped = decoder.decode_request().unwrap().unwrap();
        assert_eq!(roundtripped.id, "42");
        assert_eq!(roundtripped.op, Operation::Ping);
    }

    /// A request encoded as one JSON line decodes back through `LineDecoder`.
    #[test]
    fn test_jsonl_roundtrip() {
        let original = Request::new("1", Operation::Info);
        let line = jsonl::encode(&original).unwrap();

        let mut decoder = jsonl::LineDecoder::new();
        decoder.extend(&line);

        let roundtripped: Request = decoder.decode_line().unwrap().unwrap();
        assert_eq!(roundtripped.id, "1");
        assert_eq!(roundtripped.op, Operation::Info);
    }

    /// Feeding only the first 10 bytes of a frame yields `None`; supplying
    /// the remainder afterwards yields the request.
    #[test]
    fn test_partial_frame_decoding() {
        let original = Request::new("1", Operation::Ping);
        let wire = Encoder::encode_request(&original).unwrap();
        let (head, tail) = wire.split_at(10);

        let mut decoder = Decoder::new();

        decoder.extend(head);
        let pending = decoder.decode_request().unwrap();
        assert!(pending.is_none());

        decoder.extend(tail);
        let completed = decoder.decode_request().unwrap().unwrap();
        assert_eq!(completed.id, "1");
    }

    /// A framed response decodes back with the same id and an `Ok` status.
    #[test]
    fn test_encode_response() {
        use crate::message::{Response, ResponseStatus};

        let original = Response::ok("req-1", serde_json::json!({"pong": true}));
        let wire = Encoder::encode_response(&original).unwrap();

        let mut decoder = Decoder::new();
        decoder.extend(&wire);
        let roundtripped = decoder.decode_response().unwrap().unwrap();

        assert_eq!(roundtripped.id, "req-1");
        assert_eq!(roundtripped.status, ResponseStatus::Ok);
    }

    /// `buffered` tracks the bytes added by `extend` and drops to zero after
    /// `clear`.
    #[test]
    fn test_decoder_buffered() {
        let mut decoder = Decoder::new();
        assert_eq!(decoder.buffered(), 0);

        decoder.extend(b"some data");
        assert_eq!(decoder.buffered(), 9);

        decoder.clear();
        assert_eq!(decoder.buffered(), 0);
    }

    /// Input supplied as an owned `Bytes` decodes the same way as a slice.
    #[test]
    fn test_decoder_extend_bytes() {
        use bytes::Bytes;

        let original = Request::new("1", Operation::Ping);
        let wire = Encoder::encode_request(&original).unwrap();
        let owned = Bytes::from(wire.to_vec());

        let mut decoder = Decoder::new();
        decoder.extend_bytes(owned);

        let roundtripped = decoder.decode_request().unwrap().unwrap();
        assert_eq!(roundtripped.id, "1");
    }

    /// Two lines buffered together are returned one per call, in order.
    #[test]
    fn test_jsonl_multiple_lines() {
        let first = Request::new("1", Operation::Ping);
        let second = Request::new("2", Operation::Info);

        let mut stream = jsonl::encode(&first).unwrap();
        stream.extend_from_slice(&jsonl::encode(&second).unwrap());

        let mut decoder = jsonl::LineDecoder::new();
        decoder.extend(&stream);

        let got_first: Request = decoder.decode_line().unwrap().unwrap();
        assert_eq!(got_first.id, "1");

        let got_second: Request = decoder.decode_line().unwrap().unwrap();
        assert_eq!(got_second.id, "2");
    }

    /// Without a newline nothing is decoded; once the rest of the line
    /// arrives the request is returned.
    #[test]
    fn test_jsonl_partial_line() {
        let mut decoder = jsonl::LineDecoder::new();
        decoder.extend(b"{\"type\":\"request\"");

        let pending = decoder.decode_line::<Request>().unwrap();
        assert!(pending.is_none());

        decoder.extend(b",\"id\":\"1\",\"op\":\"PING\",\"params\":{}}\n");
        let completed: Request = decoder.decode_line().unwrap().unwrap();
        assert_eq!(completed.id, "1");
    }

    /// `encode_json` accepts an arbitrary serializable type and produces
    /// non-empty output.
    #[test]
    fn test_encode_json_generic() {
        #[derive(serde::Serialize)]
        struct CustomMsg {
            action: String,
        }

        let payload = CustomMsg {
            action: "test".to_string(),
        };
        let wire = Encoder::encode_json(&payload).unwrap();
        assert!(!wire.is_empty());
    }

    /// A default-constructed `Decoder` starts with nothing buffered.
    #[test]
    fn test_decoder_default() {
        assert_eq!(Decoder::default().buffered(), 0);
    }

    /// `LineDecoder::default` can be constructed and dropped.
    #[test]
    fn test_jsonl_decoder_default() {
        let _decoder = jsonl::LineDecoder::default();
    }
}