1use std::cmp::Ord;
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::marker::PhantomData;
7
8use bytes::Buf;
9use bytes::BufMut;
10use log::trace;
11
12use super::varint::varint_decode;
13use crate::Version;
14
15pub trait Decoder: Sized + Default {
17 fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, Error>
19 where
20 T: Buf,
21 Self: Default,
22 {
23 let mut decoder = Self::default();
24 decoder.decode(src, version)?;
25 Ok(decoder)
26 }
27
28 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
29 where
30 T: Buf;
31}
32
33pub trait DecoderVarInt {
34 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
35 where
36 T: Buf;
37}
38
39impl<M> Decoder for Vec<M>
40where
41 M: Default + Decoder,
42{
43 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
44 where
45 T: Buf,
46 {
47 let mut len: i32 = 0;
48 len.decode(src, version)?;
49
50 trace!("decoding Vec len:{}", len);
51
52 if len < 1 {
53 trace!("negative length, skipping");
54 return Ok(());
55 }
56
57 decode_vec(len, self, src, version)?;
58
59 Ok(())
60 }
61}
62
63fn decode_vec<T, M>(len: i32, item: &mut Vec<M>, src: &mut T, version: Version) -> Result<(), Error>
64where
65 T: Buf,
66 M: Default + Decoder,
67{
68 for _ in 0..len {
69 let mut value = <M>::default();
70 value.decode(src, version)?;
71 item.push(value);
72 }
73
74 Ok(())
75}
76
77impl<M> Decoder for Option<M>
78where
79 M: Default + Decoder,
80{
81 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
82 where
83 T: Buf,
84 {
85 let mut some = false;
86 some.decode(src, version)?;
87 if some {
88 let mut value = <M>::default();
89 value.decode(src, version)?;
90 *self = Some(value)
91 } else {
92 *self = None
93 }
94 Ok(())
95 }
96}
97
98impl<M> Decoder for PhantomData<M>
99where
100 M: Default + Decoder,
101{
102 fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), Error>
103 where
104 T: Buf,
105 {
106 Ok(())
107 }
108}
109
110impl<K, V> Decoder for BTreeMap<K, V>
111where
112 K: Decoder + Ord,
113 V: Decoder,
114{
115 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
116 where
117 T: Buf,
118 {
119 let mut len: u16 = 0;
120 len.decode(src, version)?;
121
122 let mut map: BTreeMap<K, V> = BTreeMap::new();
123 for _i in 0..len {
124 let mut key = K::default();
125 key.decode(src, version)?;
126 let mut value = V::default();
127 value.decode(src, version)?;
128 map.insert(key, value);
129 }
130
131 *self = map;
132 Ok(())
133 }
134}
135
136impl Decoder for bool {
137 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
138 where
139 T: Buf,
140 {
141 if src.remaining() < 1 {
142 return Err(Error::new(
143 ErrorKind::UnexpectedEof,
144 "not enough buf for bool",
145 ));
146 }
147 let value = src.get_u8();
148
149 match value {
150 0 => *self = false,
151 1 => *self = true,
152 _ => {
153 return Err(Error::new(ErrorKind::InvalidData, "not valid bool value"));
154 }
155 };
156
157 Ok(())
158 }
159}
160
161impl Decoder for i8 {
162 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
163 where
164 T: Buf,
165 {
166 if src.remaining() < 1 {
167 return Err(Error::new(
168 ErrorKind::UnexpectedEof,
169 "not enough buf for i8",
170 ));
171 }
172 let value = src.get_i8();
173 *self = value;
174 Ok(())
175 }
176}
177
178impl Decoder for u8 {
179 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
180 where
181 T: Buf,
182 {
183 if src.remaining() < 1 {
184 return Err(Error::new(
185 ErrorKind::UnexpectedEof,
186 "not enough buf for u8",
187 ));
188 }
189 let value = src.get_u8();
190 *self = value;
191 Ok(())
192 }
193}
194
195impl Decoder for i16 {
196 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
197 where
198 T: Buf,
199 {
200 if src.remaining() < 2 {
201 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i16"));
202 }
203 let value = src.get_i16();
204 *self = value;
205 Ok(())
206 }
207}
208
209impl Decoder for u16 {
210 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
211 where
212 T: Buf,
213 {
214 if src.remaining() < 2 {
215 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u16"));
216 }
217 let value = src.get_u16();
218 *self = value;
219 Ok(())
220 }
221}
222
223impl Decoder for i32 {
224 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
225 where
226 T: Buf,
227 {
228 if src.remaining() < 4 {
229 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i32"));
230 }
231 let value = src.get_i32();
232 trace!("i32: {:#x} => {}", &value, &value);
233 *self = value;
234 Ok(())
235 }
236}
237
238impl Decoder for u32 {
239 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
240 where
241 T: Buf,
242 {
243 if src.remaining() < 4 {
244 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u32"));
245 }
246 let value = src.get_u32();
247 trace!("u32: {:#x} => {}", &value, &value);
248 *self = value;
249 Ok(())
250 }
251}
252
253impl Decoder for i64 {
254 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
255 where
256 T: Buf,
257 {
258 if src.remaining() < 8 {
259 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i64"));
260 }
261 let value = src.get_i64();
262 trace!("i64: {:#x} => {}", &value, &value);
263 *self = value;
264 Ok(())
265 }
266}
267
268impl DecoderVarInt for i64 {
269 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
270 where
271 T: Buf,
272 {
273 let (value, _) = varint_decode(src)?;
274 *self = value;
275 Ok(())
276 }
277}
278
279fn decode_string<T>(len: i16, src: &mut T) -> Result<String, Error>
280where
281 T: Buf,
282{
283 let mut value = String::default();
284 let read_size = src.take(len as usize).reader().read_to_string(&mut value)?;
285
286 if read_size != len as usize {
287 return Err(Error::new(ErrorKind::UnexpectedEof, "not enough string"));
288 }
289 Ok(value)
290}
291
292impl Decoder for String {
293 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
294 where
295 T: Buf,
296 {
297 if src.remaining() < 2 {
298 return Err(Error::new(
299 ErrorKind::UnexpectedEof,
300 "can't read string length",
301 ));
302 }
303 let len = src.get_i16();
304 if len <= 0 {
305 return Ok(());
306 }
307
308 let value = decode_string(len, src)?;
309 *self = value;
310 Ok(())
311 }
312}
313
314impl DecoderVarInt for Vec<u8> {
315 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
316 where
317 T: Buf,
318 {
319 let mut len: i64 = 0;
320 len.decode_varint(src)?;
321
322 if len < 1 {
323 return Ok(());
324 }
325
326 let mut buf = src.take(len as usize);
327 self.put(&mut buf);
328 if self.len() != len as usize {
329 return Err(Error::new(
330 ErrorKind::UnexpectedEof,
331 format!(
332 "varint: Vec<u8>>, expecting {} but received: {}",
333 len,
334 self.len()
335 ),
336 ));
337 }
338
339 Ok(())
340 }
341}
342
343fn decode_option_vec_u<T>(array: &mut Option<Vec<u8>>, src: &mut T, len: isize) -> Result<(), Error>
344where
345 T: Buf,
346{
347 if len < 0 {
348 *array = None;
349 return Ok(());
350 }
351
352 if len == 0 {
353 *array = Some(Vec::new());
354 return Ok(());
355 }
356
357 let mut buf = src.take(len as usize);
358 let mut value: Vec<u8> = Vec::new();
359 value.put(&mut buf);
360 if value.len() != len as usize {
361 return Err(Error::new(
362 ErrorKind::UnexpectedEof,
363 format!(
364 "Option<Vec<u8>>>, expecting {} but received: {}",
365 len,
366 value.len()
367 ),
368 ));
369 }
370
371 *array = Some(value);
372
373 Ok(())
374}
375
376impl DecoderVarInt for Option<Vec<u8>> {
377 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
378 where
379 T: Buf,
380 {
381 let mut len: i64 = 0;
382 len.decode_varint(src)?;
383
384 decode_option_vec_u(self, src, len as isize)
385 }
386}
387
/// Unit tests for the `Decoder` and `DecoderVarInt` implementations.
#[cfg(test)]
mod test {

    use crate::Decoder;
    use crate::DecoderVarInt;
    use crate::Version;
    use bytes::Buf;
    use std::io::Cursor;
    use std::io::Error;

    #[test]
    fn test_decode_i8_not_enough() {
        // Empty input: nothing to read.
        let data = [];
        let mut value: i8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i8() {
        let data = [0x12];

        let mut value: i8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 18);
    }

    #[test]
    fn test_decode_u8_not_enough() {
        // Empty input: nothing to read.
        let data = [];
        let mut value: u8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_u8() {
        let data = [0x12];

        let mut value: u8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 18);
    }

    #[test]
    fn test_decode_i16_not_enough() {
        // One byte where two are required.
        let data = [0x11];
        let mut value: i16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i16() {
        let data = [0x00, 0x05];

        let mut value: i16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 5);
    }

    #[test]
    fn test_decode_u16_not_enough() {
        // One byte where two are required. The target must be `u16` so the
        // `u16` implementation is the one under test.
        let data = [0x11];
        let mut value: u16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_u16() {
        let data = [0x00, 0x05];

        let mut value: u16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 5);
    }

    #[test]
    fn test_decode_option_u16_none() {
        let data = [0x00];

        let mut value: Option<u16> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, None);
    }

    #[test]
    fn test_decode_option_u16_val() {
        let data = [0x01, 0x00, 0x10];

        let mut value: Option<u16> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, Some(16));
    }

    #[test]
    fn test_decode_i32_not_enough() {
        // Three bytes where four are required.
        let data = [0x11, 0x11, 0x00];
        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i32() {
        let data = [0x00, 0x00, 0x00, 0x10];

        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 16);
    }

    #[test]
    fn test_decode_i32_2() {
        let data = [0x00, 0x00, 0x00, 0x01];

        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 1);
    }

    #[test]
    fn test_decode_i64_not_enough() {
        // Three bytes where eight are required.
        let data = [0x11, 0x11, 0x00];
        let mut value: i64 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i64() {
        let data = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];

        let mut value: i64 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 32);
    }

    #[test]
    fn test_decode_invalid_string_not_len() {
        // Only one byte: the two-byte length prefix is incomplete.
        let data = [0x11];
        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_invalid_string() {
        // Length prefix announces 10 bytes but only one follows.
        let data = [0x00, 0x0a, 0x63];
        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_null_option_string() {
        // Presence flag is 0, so the existing value must be replaced by None.
        let data = [0x00];
        let mut value: Option<String> = Some(String::from("test"));
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value.is_none());
    }

    #[test]
    fn test_decode_some_option_string() {
        // Presence flag 1, length 2, then "wo".
        let data = [0x01, 0x00, 0x02, 0x77, 0x6f];
        let mut value: Option<String> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value.is_some());
        assert_eq!(value.unwrap(), "wo");
    }

    #[test]
    fn test_decode_string_existing_value() {
        let src = [0x0, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30];
        let mut decode_target = "123".to_string();
        let result = decode_target.decode(&mut Cursor::new(&src), 0);
        assert!(result.is_ok());
        assert_eq!(decode_target, "0.0.0.0".to_string());
    }

    #[test]
    fn test_decode_string() {
        let data = [
            0x00, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x2d, 0x31,
        ];

        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, "consumer-1");
    }

    #[test]
    fn test_decode_bool_not_enough() {
        // Empty input: nothing to read.
        let data = [];
        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_bool() {
        let data = [0x1];

        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value);
    }

    #[test]
    fn test_decode_bool_invalid_value() {
        // Only 0 and 1 are accepted.
        let data = [0x23];
        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_valid_string_vectors() {
        // i32 count of 1, then one string: i16 length 4 + "test".
        let data = [0, 0, 0, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74];

        let mut values: Vec<String> = Vec::new();
        let result = values.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(values.len(), 1);
        let first_str = &values[0];
        assert_eq!(first_str, "test");
    }

    #[test]
    fn test_decode_varint_trait() {
        let data = [0x7e];

        let mut value: i64 = 0;
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert_eq!(value, 63);
    }

    #[test]
    fn test_decode_varint_vec8() {
        let data = [0x06, 0x64, 0x6f, 0x67];

        let mut value: Vec<u8> = Vec::new();
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert_eq!(value.len(), 3);
        assert_eq!(value[0], 0x64);
    }

    #[test]
    fn test_vec8_encode_and_decode() {
        use crate::encoder::Encoder;
        let in_vec: Vec<u8> = vec![1, 2, 3];
        let mut out: Vec<u8> = vec![];
        let ret = in_vec.encode(&mut out, 0);
        assert!(ret.is_ok());
    }

    #[test]
    fn test_decode_varint_vec8_fail() {
        // The varint announces more bytes than the two that follow.
        let data = [0x06, 0x64, 0x6f];

        let mut value: Vec<u8> = Vec::new();
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_err());
    }

    #[test]
    fn test_varint_decode_array_option_vec8_simple_array() {
        // Trailing 0x00 must not be consumed as part of the array.
        let data = [0x06, 0x64, 0x6f, 0x67, 0x00];
        let mut value: Option<Vec<u8>> = Some(Vec::new());
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert!(value.is_some());
        let array = value.unwrap();
        assert_eq!(array.len(), 3);
        assert_eq!(array[0], 0x64);
    }

    /// Record whose second field is only present on the wire for versions
    /// greater than 1.
    #[derive(Default)]
    struct TestRecord {
        value: i8,
        value2: i8,
    }

    impl Decoder for TestRecord {
        fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
        where
            T: Buf,
        {
            self.value.decode(src, 0)?;
            if version > 1 {
                self.value2.decode(src, 0)?;
            }
            Ok(())
        }
    }

    #[test]
    fn test_decoding_struct() {
        let data = [0x06];

        // Version 0 reads only the first field.
        let result = TestRecord::decode_from(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        let record = result.unwrap();
        assert_eq!(record.value, 6);
        assert_eq!(record.value2, 0);

        let data2 = [0x06, 0x09];
        // Version 2 reads both fields.
        let record2 = TestRecord::decode_from(&mut Cursor::new(&data2), 2).expect("decode");
        assert_eq!(record2.value, 6);
        assert_eq!(record2.value2, 9);
    }
}