tetsy_jsonrpc_server_utils/
stream_codec.rs1use bytes::BytesMut;
2use std::{io, str};
3use tokio_codec::{Decoder, Encoder};
4
5#[derive(Debug, Clone)]
7pub enum Separator {
8 Empty,
12 Byte(u8),
14}
15
16impl Default for Separator {
17 fn default() -> Self {
18 Separator::Byte(b'\n')
19 }
20}
21
22#[derive(Debug, Default)]
24pub struct StreamCodec {
25 incoming_separator: Separator,
26 outgoing_separator: Separator,
27}
28
29impl StreamCodec {
30 pub fn stream_incoming() -> Self {
32 StreamCodec::new(Separator::Empty, Default::default())
33 }
34
35 pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
37 StreamCodec {
38 incoming_separator,
39 outgoing_separator,
40 }
41 }
42}
43
44fn is_whitespace(byte: u8) -> bool {
45 match byte {
46 0x0D | 0x0A | 0x20 | 0x09 => true,
47 _ => false,
48 }
49}
50
51impl Decoder for StreamCodec {
52 type Item = String;
53 type Error = io::Error;
54
55 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
56 if let Separator::Byte(separator) = self.incoming_separator {
57 if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) {
58 let line = buf.split_to(i);
59 buf.split_to(1);
60
61 match str::from_utf8(&line.as_ref()) {
62 Ok(s) => Ok(Some(s.to_string())),
63 Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
64 }
65 } else {
66 Ok(None)
67 }
68 } else {
69 let mut depth = 0;
70 let mut in_str = false;
71 let mut is_escaped = false;
72 let mut start_idx = 0;
73 let mut whitespaces = 0;
74
75 for idx in 0..buf.as_ref().len() {
76 let byte = buf.as_ref()[idx];
77
78 if (byte == b'{' || byte == b'[') && !in_str {
79 if depth == 0 {
80 start_idx = idx;
81 }
82 depth += 1;
83 } else if (byte == b'}' || byte == b']') && !in_str {
84 depth -= 1;
85 } else if byte == b'"' && !is_escaped {
86 in_str = !in_str;
87 } else if is_whitespace(byte) {
88 whitespaces += 1;
89 }
90 if byte == b'\\' && !is_escaped && in_str {
91 is_escaped = true;
92 } else {
93 is_escaped = false;
94 }
95
96 if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
97 let bts = buf.split_to(idx + 1);
98 match String::from_utf8(bts.as_ref().to_vec()) {
99 Ok(val) => return Ok(Some(val)),
100 Err(_) => {
101 return Ok(None);
102 } };
104 }
105 }
106 Ok(None)
107 }
108 }
109}
110
111impl Encoder for StreamCodec {
112 type Item = String;
113 type Error = io::Error;
114
115 fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
116 let mut payload = msg.into_bytes();
117 if let Separator::Byte(separator) = self.outgoing_separator {
118 payload.push(separator);
119 }
120 buf.extend_from_slice(&payload);
121 Ok(())
122 }
123}
124
125#[cfg(test)]
126mod tests {
127
128 use super::StreamCodec;
129 use bytes::{BufMut, BytesMut};
130 use tokio_codec::Decoder;
131
132 #[test]
133 fn simple_encode() {
134 let mut buf = BytesMut::with_capacity(2048);
135 buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
136
137 let mut codec = StreamCodec::stream_incoming();
138
139 let request = codec
140 .decode(&mut buf)
141 .expect("There should be no error in simple test")
142 .expect("There should be at least one request in simple test");
143
144 assert_eq!(request, "{ test: 1 }");
145 }
146
147 #[test]
148 fn escape() {
149 let mut buf = BytesMut::with_capacity(2048);
150 buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#);
151
152 let mut codec = StreamCodec::stream_incoming();
153
154 let request = codec
155 .decode(&mut buf)
156 .expect("There should be no error in first escape test")
157 .expect("There should be a request in first escape test");
158
159 assert_eq!(request, r#"{ test: "\"\\" }"#);
160
161 let request2 = codec
162 .decode(&mut buf)
163 .expect("There should be no error in 2nd escape test")
164 .expect("There should be a request in 2nd escape test");
165 assert_eq!(request2, r#"{ test: "\ " }"#);
166
167 let request3 = codec
168 .decode(&mut buf)
169 .expect("There should be no error in 3rd escape test")
170 .expect("There should be a request in 3rd escape test");
171 assert_eq!(request3, r#"{ test: "\}" }"#);
172
173 let request4 = codec
174 .decode(&mut buf)
175 .expect("There should be no error in 4th escape test")
176 .expect("There should be a request in 4th escape test");
177 assert_eq!(request4, r#"[ test: "\]" ]"#);
178 }
179
180 #[test]
181 fn whitespace() {
182 let mut buf = BytesMut::with_capacity(2048);
183 buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } ");
184
185 let mut codec = StreamCodec::stream_incoming();
186
187 let request = codec
188 .decode(&mut buf)
189 .expect("There should be no error in first whitespace test")
190 .expect("There should be a request in first whitespace test");
191
192 assert_eq!(request, "{ test: 1 }");
193
194 let request2 = codec
195 .decode(&mut buf)
196 .expect("There should be no error in first 2nd test")
197 .expect("There should be aa request in 2nd whitespace test");
198 assert_eq!(request2, "\n\n\n\n{ test: 2 }");
200
201 let request3 = codec
202 .decode(&mut buf)
203 .expect("There should be no error in first 3rd test")
204 .expect("There should be a request in 3rd whitespace test");
205 assert_eq!(request3, "\n\r{\n test: 3 }");
206
207 let request4 = codec
208 .decode(&mut buf)
209 .expect("There should be no error in first 4th test");
210 assert!(
211 request4.is_none(),
212 "There should be no 4th request because it contains only whitespaces"
213 );
214 }
215
216 #[test]
217 fn fragmented_encode() {
218 let mut buf = BytesMut::with_capacity(2048);
219 buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
220
221 let mut codec = StreamCodec::stream_incoming();
222
223 let request = codec
224 .decode(&mut buf)
225 .expect("There should be no error in first fragmented test")
226 .expect("There should be at least one request in first fragmented test");
227 assert_eq!(request, "{ test: 1 }");
228 codec
229 .decode(&mut buf)
230 .expect("There should be no error in second fragmented test")
231 .expect("There should be at least one request in second fragmented test");
232 assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
233
234 buf.put_slice(b"t: 3 }");
235 let request = codec
236 .decode(&mut buf)
237 .expect("There should be no error in third fragmented test")
238 .expect("There should be at least one request in third fragmented test");
239 assert_eq!(request, "{ test: 3 }");
240 }
241
242 #[test]
243 fn huge() {
244 let request = r#"
245 {
246 "jsonrpc":"2.0",
247 "method":"say_hello",
248 "params": [
249 42,
250 0,
251 {
252 "from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155",
253 "gas":"0x2dc6c0",
254 "data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a90046001604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f01602080910003423423094734987103498712093847102938740192387401349857109487501938475"
255 }
256 ]
257 }"#;
258
259 let mut buf = BytesMut::with_capacity(65536);
260 buf.put_slice(request.as_bytes());
261
262 let mut codec = StreamCodec::stream_incoming();
263
264 let parsed_request = codec
265 .decode(&mut buf)
266 .expect("There should be no error in huge test")
267 .expect("There should be at least one request huge test");
268 assert_eq!(request, parsed_request);
269 }
270
271 #[test]
272 fn simple_line_codec() {
273 let mut buf = BytesMut::with_capacity(2048);
274 buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
275
276 let mut codec = StreamCodec::default();
277
278 let request = codec
279 .decode(&mut buf)
280 .expect("There should be no error in simple test")
281 .expect("There should be at least one request in simple test");
282 let request2 = codec
283 .decode(&mut buf)
284 .expect("There should be no error in simple test")
285 .expect("There should be at least one request in simple test");
286
287 assert_eq!(request, "{ test: 1 }");
288 assert_eq!(request2, "{ test: 2 }");
289 }
290}