1use crate::{
2 error::{DecodeError, DecodeResult},
3 ClientMessage as CM, Export, Get, GraveGoods, HandshakeRequest, Import, Key, KeyLength,
4 LastWill, NumGraveGoods, NumLastWill, NumProtocolVersions, PGet, PSubscribe, Path, PathLength,
5 ProtocolVersion, ProtocolVersionSegment, ProtocolVersions, RequestPattern,
6 RequestPatternLength, Set, Subscribe, TransactionId, Unsubscribe, Value, ValueLength, EXP, GET,
7 HSHKR, IMP, KEY_LENGTH_BYTES, MESSAGE_LENGTH_BYTES, NUM_GRAVE_GOODS_BYTES, NUM_LAST_WILL_BYTES,
8 NUM_PROTOCOL_VERSION_BYTES, PATH_LENGTH_BYTES, PGET, PROTOCOL_VERSION_SEGMENT_BYTES, PSUB,
9 REQUEST_PATTERN_LENGTH_BYTES, SET, SUB, TRANSACTION_ID_BYTES, UNIQUE_FLAG_BYTES, USUB,
10 VALUE_LENGTH_BYTES,
11};
12use std::io::Read;
13
14pub fn read_client_message(mut data: impl Read) -> DecodeResult<CM> {
15 let mut buf = [0; MESSAGE_LENGTH_BYTES];
16 data.read_exact(&mut buf)?;
17 let mut buf = [0];
18 data.read_exact(&mut buf)?;
19 match buf[0] {
20 HSHKR => read_handshake_request_message(data).map(CM::HandshakeRequest),
21 GET => read_get_message(data).map(CM::Get),
22 PGET => read_pget_message(data).map(CM::PGet),
23 SET => read_set_message(data).map(CM::Set),
24 SUB => read_subscribe_message(data).map(CM::Subscribe),
25 PSUB => read_psubscribe_message(data).map(CM::PSubscribe),
26 EXP => read_export_message(data).map(CM::Export),
27 IMP => read_import_message(data).map(CM::Import),
28 USUB => read_unsubscribe_message(data).map(CM::Unsubscribe),
29 _ => Err(DecodeError::UndefinedType(buf[0])),
30 }
31}
32
33fn read_get_message(mut data: impl Read) -> DecodeResult<Get> {
34 let mut buf = [0; TRANSACTION_ID_BYTES];
35 data.read_exact(&mut buf)?;
36 let transaction_id = TransactionId::from_be_bytes(buf);
37
38 let mut buf = [0; KEY_LENGTH_BYTES];
39 data.read_exact(&mut buf)?;
40 let key_length = KeyLength::from_be_bytes(buf);
41
42 let mut buf = vec![0; key_length as usize];
43 data.read_exact(&mut buf)?;
44 let key = Key::from_utf8_lossy(&buf).to_string();
45
46 Ok(Get {
47 transaction_id,
48 key,
49 })
50}
51
52fn read_pget_message(mut data: impl Read) -> DecodeResult<PGet> {
53 let mut buf = [0; TRANSACTION_ID_BYTES];
54 data.read_exact(&mut buf)?;
55 let transaction_id = TransactionId::from_be_bytes(buf);
56
57 let mut buf = [0; REQUEST_PATTERN_LENGTH_BYTES];
58 data.read_exact(&mut buf)?;
59 let request_pattern_length = RequestPatternLength::from_be_bytes(buf);
60
61 let mut buf = vec![0; request_pattern_length as usize];
62 data.read_exact(&mut buf)?;
63 let request_pattern = RequestPattern::from_utf8_lossy(&buf).to_string();
64
65 Ok(PGet {
66 transaction_id,
67 request_pattern,
68 })
69}
70
71fn read_set_message(mut data: impl Read) -> DecodeResult<Set> {
72 let mut buf = [0; TRANSACTION_ID_BYTES];
73 data.read_exact(&mut buf)?;
74 let transaction_id = TransactionId::from_be_bytes(buf);
75
76 let mut buf = [0; KEY_LENGTH_BYTES];
77 data.read_exact(&mut buf)?;
78 let key_length = KeyLength::from_be_bytes(buf);
79
80 let mut buf = [0; VALUE_LENGTH_BYTES];
81 data.read_exact(&mut buf)?;
82 let value_length = ValueLength::from_be_bytes(buf);
83
84 let mut buf = vec![0; key_length as usize];
85 data.read_exact(&mut buf)?;
86 let key = Key::from_utf8_lossy(&buf).to_string();
87
88 let mut buf = vec![0; value_length as usize];
89 data.read_exact(&mut buf)?;
90 let value = Value::from_utf8_lossy(&buf).to_string();
91
92 Ok(Set {
93 transaction_id,
94 key,
95 value,
96 })
97}
98
99fn read_subscribe_message(mut data: impl Read) -> DecodeResult<Subscribe> {
100 let mut buf = [0; TRANSACTION_ID_BYTES];
101 data.read_exact(&mut buf)?;
102 let transaction_id = TransactionId::from_be_bytes(buf);
103
104 let mut buf = [0; KEY_LENGTH_BYTES];
105 data.read_exact(&mut buf)?;
106 let key_length = RequestPatternLength::from_be_bytes(buf);
107
108 let mut buf = vec![0; key_length as usize];
109 data.read_exact(&mut buf)?;
110 let key = RequestPattern::from_utf8_lossy(&buf).to_string();
111
112 let mut buf = vec![0; UNIQUE_FLAG_BYTES];
113 data.read_exact(&mut buf)?;
114 let unique = buf[0] != 0;
115
116 Ok(Subscribe {
117 transaction_id,
118 key,
119 unique,
120 })
121}
122
123fn read_psubscribe_message(mut data: impl Read) -> DecodeResult<PSubscribe> {
124 let mut buf = [0; TRANSACTION_ID_BYTES];
125 data.read_exact(&mut buf)?;
126 let transaction_id = TransactionId::from_be_bytes(buf);
127
128 let mut buf = [0; REQUEST_PATTERN_LENGTH_BYTES];
129 data.read_exact(&mut buf)?;
130 let request_pattern_length = RequestPatternLength::from_be_bytes(buf);
131
132 let mut buf = vec![0; request_pattern_length as usize];
133 data.read_exact(&mut buf)?;
134 let request_pattern = RequestPattern::from_utf8_lossy(&buf).to_string();
135
136 let mut buf = vec![0; UNIQUE_FLAG_BYTES];
137 data.read_exact(&mut buf)?;
138 let unique = buf[0] != 0;
139
140 Ok(PSubscribe {
141 transaction_id,
142 request_pattern,
143 unique,
144 })
145}
146
147fn read_import_message(mut data: impl Read) -> DecodeResult<Import> {
148 let mut buf = [0; TRANSACTION_ID_BYTES];
149 data.read_exact(&mut buf)?;
150 let transaction_id = TransactionId::from_be_bytes(buf);
151
152 let mut buf = [0; PATH_LENGTH_BYTES];
153 data.read_exact(&mut buf)?;
154 let path_length = PathLength::from_be_bytes(buf);
155
156 let mut buf = vec![0; path_length as usize];
157 data.read_exact(&mut buf)?;
158 let path = Path::from_utf8_lossy(&buf).to_string();
159
160 Ok(Import {
161 transaction_id,
162 path,
163 })
164}
165
166fn read_unsubscribe_message(mut data: impl Read) -> DecodeResult<Unsubscribe> {
167 let mut buf = [0; TRANSACTION_ID_BYTES];
168 data.read_exact(&mut buf)?;
169 let transaction_id = TransactionId::from_be_bytes(buf);
170
171 Ok(Unsubscribe { transaction_id })
172}
173
174fn read_export_message(mut data: impl Read) -> DecodeResult<Export> {
175 let mut buf = [0; TRANSACTION_ID_BYTES];
176 data.read_exact(&mut buf)?;
177 let transaction_id = TransactionId::from_be_bytes(buf);
178
179 let mut buf = [0; PATH_LENGTH_BYTES];
180 data.read_exact(&mut buf)?;
181 let path_length = PathLength::from_be_bytes(buf);
182
183 let mut buf = vec![0; path_length as usize];
184 data.read_exact(&mut buf)?;
185 let path = Path::from_utf8_lossy(&buf).to_string();
186
187 Ok(Export {
188 transaction_id,
189 path,
190 })
191}
192
193fn read_handshake_request_message(mut data: impl Read) -> DecodeResult<HandshakeRequest> {
194 let mut buf = [0; NUM_PROTOCOL_VERSION_BYTES];
195 data.read_exact(&mut buf)?;
196 let num_protocol_versions = NumProtocolVersions::from_be_bytes(buf);
197
198 let mut buf = [0; NUM_LAST_WILL_BYTES];
199 data.read_exact(&mut buf)?;
200 let num_last_will = NumLastWill::from_be_bytes(buf);
201
202 let mut buf = [0; NUM_GRAVE_GOODS_BYTES];
203 data.read_exact(&mut buf)?;
204 let num_grave_good = NumGraveGoods::from_be_bytes(buf);
205
206 let mut supported_protocol_versions = ProtocolVersions::new();
207
208 for _ in 0..num_protocol_versions {
209 let mut buf = [0; PROTOCOL_VERSION_SEGMENT_BYTES];
210 data.read_exact(&mut buf)?;
211 let major = ProtocolVersionSegment::from_be_bytes(buf);
212
213 let mut buf = [0; PROTOCOL_VERSION_SEGMENT_BYTES];
214 data.read_exact(&mut buf)?;
215 let minor = ProtocolVersionSegment::from_be_bytes(buf);
216
217 supported_protocol_versions.push(ProtocolVersion { major, minor });
218 }
219
220 let mut last_will_key_value_lengths = Vec::new();
221
222 for _ in 0..num_last_will {
223 let mut buf = [0; KEY_LENGTH_BYTES];
224 data.read_exact(&mut buf)?;
225 let key_length = KeyLength::from_be_bytes(buf);
226
227 let mut buf = [0; VALUE_LENGTH_BYTES];
228 data.read_exact(&mut buf)?;
229 let value_length = ValueLength::from_be_bytes(buf);
230
231 last_will_key_value_lengths.push((key_length, value_length));
232 }
233
234 let mut grave_goods_key_lengths = Vec::new();
235
236 for _ in 0..num_grave_good {
237 let mut buf = [0; KEY_LENGTH_BYTES];
238 data.read_exact(&mut buf)?;
239 let key_length = KeyLength::from_be_bytes(buf);
240
241 grave_goods_key_lengths.push(key_length);
242 }
243
244 let mut last_will = LastWill::new();
245
246 for (key_length, value_length) in last_will_key_value_lengths {
247 let mut buf = vec![0; key_length as usize];
248 data.read_exact(&mut buf)?;
249 let key = Key::from_utf8(buf)?;
250
251 let mut buf = vec![0; value_length as usize];
252 data.read_exact(&mut buf)?;
253 let value = Value::from_utf8_lossy(&buf).to_string();
254
255 last_will.push((key, value).into());
256 }
257
258 let mut grave_goods = GraveGoods::new();
259
260 for key_length in grave_goods_key_lengths {
261 let mut buf = vec![0; key_length as usize];
262 data.read_exact(&mut buf)?;
263 let key = Key::from_utf8(buf)?;
264
265 grave_goods.push(key);
266 }
267
268 Ok(HandshakeRequest {
269 supported_protocol_versions,
270 last_will,
271 grave_goods,
272 })
273}
274
// Decoder tests. Each fixture is a complete message as `read_client_message`
// consumes it: length prefix, type byte, then the type-specific payload with
// big-endian integers. The arrays are grouped by field for readability.
#[cfg(test)]
mod test {

    use super::*;
    use crate::encode_set_message;

    // GET: transaction id, key length, key.
    #[test]
    fn get_message_is_read_correctly() {
        let data = [
            // length prefix: 16
            0b00000000, 0b00000000, 0b00000000, 0b00010000,
            // message type
            GET,
            // transaction id: 4
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00000100,
            // key length: 5
            0b00000000, 0b00000101,
            // key: "trolo"
            b't', b'r', b'o', b'l', b'o',
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::Get(Get {
                transaction_id: 4,
                key: "trolo".to_owned()
            })
        )
    }

    // PGET: transaction id, request pattern length, request pattern.
    #[test]
    fn pget_message_is_read_correctly() {
        let data = [
            // length prefix: 16
            0b00000000, 0b00000000, 0b00000000, 0b00010000,
            // message type
            PGET,
            // transaction id: 4
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00000100,
            // request pattern length: 5
            0b00000000, 0b00000101,
            // request pattern: "trolo"
            b't', b'r', b'o', b'l', b'o',
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::PGet(PGet {
                transaction_id: 4,
                request_pattern: "trolo".to_owned()
            })
        )
    }

    // SET: transaction id, key length, value length, key, value.
    #[test]
    fn set_message_is_read_correctly() {
        let data = [
            // length prefix: 25
            0b00000000, 0b00000000, 0b00000000, 0b00011001,
            // message type
            SET,
            // transaction id: 0
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00000000,
            // key length: 7
            0b00000000, 0b00000111,
            // value length: 3
            0b00000000, 0b00000000, 0b00000000,
            0b00000011,
            // key: "yo/mama"
            b'y', b'o', b'/', b'm', b'a', b'm', b'a',
            // value: "fat"
            b'f', b'a', b't',
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::Set(Set {
                transaction_id: 0,
                key: "yo/mama".to_owned(),
                value: "fat".to_owned()
            })
        )
    }

    // SUB: transaction id, key length, key, unique flag (zero => false).
    #[test]
    fn subscribe_message_is_read_correctly() {
        let data = [
            // length prefix: 37
            0b00000000, 0b00000000, 0b00000000, 0b00100101,
            // message type
            SUB,
            // transaction id: 5536684732567
            0b00000000, 0b00000000, 0b00000101, 0b00001001, 0b00011100, 0b00100000,
            0b01110000, 0b10010111,
            // key length: 25
            0b00000000, 0b00011001,
            // key: "let/me/?/you/its/features"
            b'l', b'e', b't', b'/', b'm', b'e',
            b'/', b'?', b'/', b'y', b'o', b'u', b'/', b'i', b't', b's', b'/', b'f', b'e', b'a',
            b't', b'u', b'r', b'e', b's',
            // unique flag: not set
            0b00000000,
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::Subscribe(Subscribe {
                transaction_id: 5536684732567,
                key: "let/me/?/you/its/features".to_owned(),
                unique: false
            })
        )
    }

    // PSUB: transaction id, pattern length, pattern, unique flag (non-zero => true).
    #[test]
    fn psubscribe_message_is_read_correctly() {
        let data = [
            // length prefix: 37
            0b00000000, 0b00000000, 0b00000000, 0b00100101,
            // message type
            PSUB,
            // transaction id: 5536684732567
            0b00000000, 0b00000000, 0b00000101, 0b00001001, 0b00011100, 0b00100000,
            0b01110000, 0b10010111,
            // request pattern length: 25
            0b00000000, 0b00011001,
            // request pattern: "let/me/?/you/its/features"
            b'l', b'e', b't', b'/', b'm', b'e',
            b'/', b'?', b'/', b'y', b'o', b'u', b'/', b'i', b't', b's', b'/', b'f', b'e', b'a',
            b't', b'u', b'r', b'e', b's',
            // unique flag: set
            0b00000001,
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::PSubscribe(PSubscribe {
                transaction_id: 5536684732567,
                request_pattern: "let/me/?/you/its/features".to_owned(),
                unique: true
            })
        )
    }

    // EXP: transaction id, path length, path.
    #[test]
    fn export_message_is_read_correctly() {
        let data = [
            // length prefix: 24
            0b00000000, 0b00000000, 0b00000000, 0b00011000,
            // message type
            EXP,
            // transaction id: 42
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00101010,
            // path length: 13
            0b00000000, 0b00001101,
            // path: "/path/to/file"
            b'/', b'p', b'a', b't', b'h', b'/',
            b't', b'o', b'/', b'f', b'i', b'l', b'e',
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::Export(Export {
                transaction_id: 42,
                path: "/path/to/file".to_owned(),
            })
        )
    }

    // IMP: transaction id, path length, path.
    #[test]
    fn import_message_is_read_correctly() {
        let data = [
            // length prefix: 24
            0b00000000, 0b00000000, 0b00000000, 0b00011000,
            // message type
            IMP,
            // transaction id: 42
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00101010,
            // path length: 13
            0b00000000, 0b00001101,
            // path: "/path/to/file"
            b'/', b'p', b'a', b't', b'h', b'/',
            b't', b'o', b'/', b'f', b'i', b'l', b'e',
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::Import(Import {
                transaction_id: 42,
                path: "/path/to/file".to_owned(),
            })
        )
    }

    // USUB: the payload is just the transaction id.
    #[test]
    fn unsubscribe_message_is_read_correctly() {
        let data = [
            // length prefix: 9
            0b00000000, 0b00000000, 0b00000000, 0b00001001,
            // message type
            USUB,
            // transaction id: 42
            0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000, 0b00000000,
            0b00000000, 0b00101010,
        ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(result, CM::Unsubscribe(Unsubscribe { transaction_id: 42 }))
    }

    // Handshake request: counts, then protocol versions, then every length
    // field, then every string, in the order the decoder reads them.
    #[test]
    fn handshake_request_message_is_read_correctly() {
        let data = [
            // length prefix: 65
            0b00000000, 0b00000000, 0b00000000, 0b01000001,
            // message type
            HSHKR,
            // counts: 3 protocol versions, 1 last will entry, 2 grave goods
            0b00000011, 0b00000001, 0b00000010,
            // protocol version 0.1
            0b00000000, 0b00000000, 0b00000000, 0b00000001,
            // protocol version 0.5
            0b00000000, 0b00000000, 0b00000000, 0b00000101,
            // protocol version 1.0
            0b00000000, 0b00000001, 0b00000000, 0b00000000,
            // last will entry: key length 9
            0b00000000, 0b00001001,
            // last will entry: value length 4
            0b00000000, 0b00000000, 0b00000000, 0b00000100,
            // grave goods key lengths: 13 and 13
            0b00000000, 0b00001101, 0b00000000, 0b00001101,
            // last will key: "last/will"
            b'l', b'a', b's', b't', b'/', b'w', b'i', b'l', b'l',
            // last will value: "test"
            b't', b'e', b's', b't',
            // grave good: "grave/goods/1"
            b'g', b'r', b'a', b'v', b'e', b'/', b'g', b'o', b'o', b'd', b's', b'/',
            b'1',
            // grave good: "grave/goods/2"
            b'g', b'r', b'a', b'v', b'e', b'/', b'g', b'o', b'o', b'd', b's', b'/',
            b'2', ];

        let result = read_client_message(&data[..]).unwrap();

        assert_eq!(
            result,
            CM::HandshakeRequest(HandshakeRequest {
                supported_protocol_versions: vec![
                    ProtocolVersion { major: 0, minor: 1 },
                    ProtocolVersion { major: 0, minor: 5 },
                    ProtocolVersion { major: 1, minor: 0 },
                ],
                last_will: vec![("last/will", "test").into(),],
                grave_goods: vec!["grave/goods/1".into(), "grave/goods/2".into(),]
            })
        )
    }

    // Round trip through the encoder with multi-byte UTF-8 characters in both
    // key and value; the decoded message must equal the one that was encoded.
    #[test]
    fn utf_message_roundtrip_is_successful() {
        let msg = Set {
            transaction_id: 42,
            key: "🦀/🕸/😅".to_owned(),
            value: "…".to_owned(),
        };

        let data = encode_set_message(&msg).unwrap();

        let decoded = read_client_message(&data[..]).unwrap();

        assert_eq!(CM::Set(msg), decoded);
    }
}