1use std::cmp::Ord;
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::marker::PhantomData;
7
8use bytes::Buf;
9use bytes::BufMut;
10use bytes::buf::ext::BufExt;
11use log::trace;
12
13use crate::Version;
14use super::varint::varint_decode;
15
16pub trait Decoder: Sized + Default {
18 fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, Error>
20 where
21 T: Buf,
22 Self: Default,
23 {
24 let mut decoder = Self::default();
25 decoder.decode(src, version)?;
26 Ok(decoder)
27 }
28
29 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
30 where
31 T: Buf;
32}
33
34pub trait DecoderVarInt {
35 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
36 where
37 T: Buf;
38}
39
40impl<M> Decoder for Vec<M>
41where
42 M: Default + Decoder,
43{
44 default fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
45 where
46 T: Buf,
47 {
48 let mut len: i32 = 0;
49 len.decode(src, version)?;
50
51 trace!("decoding Vec len:{}", len);
52
53 if len < 1 {
54 trace!("negative length, skipping");
55 return Ok(());
56 }
57
58 decode_vec(len, self, src, version)?;
59
60 Ok(())
61 }
62}
63
64fn decode_vec<T, M>(len: i32, item: &mut Vec<M>, src: &mut T, version: Version) -> Result<(), Error>
65where
66 T: Buf,
67 M: Default + Decoder,
68{
69 for _ in 0..len {
70 let mut value = <M>::default();
71 value.decode(src, version)?;
72 item.push(value);
73 }
74
75 Ok(())
76}
77
78impl<M> Decoder for Option<Vec<M>>
79where
80 M: Default + Decoder,
81{
82 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
83 where
84 T: Buf,
85 {
86 let mut len: i32 = 0;
87 len.decode(src, version)?;
88
89 trace!("decoding Vec len:{}", len);
90
91 if len < 0 {
92 *self = None;
93 return Ok(());
94 }
95
96 let mut item: Vec<M> = vec![];
97
98 decode_vec(len, &mut item, src, version)?;
99 *self = Some(item);
100 Ok(())
101 }
102}
103
104impl<M> Decoder for Option<M>
105where
106 M: Default + Decoder,
107{
108 default fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
109 where
110 T: Buf,
111 {
112 let mut some = false;
113 some.decode(src, version)?;
114 if some {
115 let mut value = <M>::default();
116 value.decode(src, version)?;
117 *self = Some(value)
118 } else {
119 *self = None
120 }
121 Ok(())
122 }
123}
124
125impl<M> Decoder for PhantomData<M>
126where
127 M: Default + Decoder,
128{
129 default fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), Error>
130 where
131 T: Buf,
132 {
133 Ok(())
134 }
135}
136
137impl<K, V> Decoder for BTreeMap<K, V>
138where
139 K: Decoder + Ord,
140 V: Decoder,
141{
142 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
143 where
144 T: Buf,
145 {
146 let mut len: u16 = 0;
147 len.decode(src, version)?;
148
149 let mut map: BTreeMap<K, V> = BTreeMap::new();
150 for _i in 0..len {
151 let mut key = K::default();
152 key.decode(src, version)?;
153 let mut value = V::default();
154 value.decode(src, version)?;
155 map.insert(key, value);
156 }
157
158 *self = map;
159 Ok(())
160 }
161}
162
163impl Decoder for bool {
164 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
165 where
166 T: Buf,
167 {
168 if src.remaining() < 1 {
169 return Err(Error::new(
170 ErrorKind::UnexpectedEof,
171 "not enough buf for bool",
172 ));
173 }
174 let value = src.get_u8();
175
176 match value {
177 0 => *self = false,
178 1 => *self = true,
179 _ => {
180 return Err(Error::new(ErrorKind::InvalidData, "not valid bool value"));
181 }
182 };
183
184 Ok(())
185 }
186}
187
188impl Decoder for i8 {
189 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
190 where
191 T: Buf,
192 {
193 if src.remaining() < 1 {
194 return Err(Error::new(
195 ErrorKind::UnexpectedEof,
196 "not enough buf for i8",
197 ));
198 }
199 let value = src.get_i8();
200 *self = value;
201 Ok(())
202 }
203}
204
205impl Decoder for u8 {
206 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
207 where
208 T: Buf,
209 {
210 if src.remaining() < 1 {
211 return Err(Error::new(
212 ErrorKind::UnexpectedEof,
213 "not enough buf for u8",
214 ));
215 }
216 let value = src.get_u8();
217 *self = value;
218 Ok(())
219 }
220}
221
222impl Decoder for i16 {
223 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
224 where
225 T: Buf,
226 {
227 if src.remaining() < 2 {
228 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i16"));
229 }
230 let value = src.get_i16();
231 *self = value;
232 Ok(())
233 }
234}
235
236impl Decoder for u16 {
237 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
238 where
239 T: Buf,
240 {
241 if src.remaining() < 2 {
242 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u16"));
243 }
244 let value = src.get_u16();
245 *self = value;
246 Ok(())
247 }
248}
249
250impl Decoder for Option<u16> {
251 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
252 where
253 T: Buf,
254 {
255 if src.remaining() < 1 {
256 return Err(Error::new(
257 ErrorKind::UnexpectedEof,
258 "can't read option flag for u16",
259 ));
260 }
261 let some_or_none = src.get_i8();
262 if some_or_none == 0 {
263 *self = None;
264 return Ok(());
265 }
266
267 if src.remaining() < 2 {
268 return Err(Error::new(
269 ErrorKind::UnexpectedEof,
270 "can't read Option<u16>",
271 ));
272 }
273 let value = src.get_u16();
274 *self = Some(value);
275 Ok(())
276 }
277}
278
279impl Decoder for i32 {
280 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
281 where
282 T: Buf,
283 {
284 if src.remaining() < 4 {
285 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i32"));
286 }
287 let value = src.get_i32();
288 trace!("i32: {:#x} => {}", &value, &value);
289 *self = value;
290 Ok(())
291 }
292}
293
294impl Decoder for u32 {
295 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
296 where
297 T: Buf,
298 {
299 if src.remaining() < 4 {
300 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u32"));
301 }
302 let value = src.get_u32();
303 trace!("u32: {:#x} => {}", &value, &value);
304 *self = value;
305 Ok(())
306 }
307}
308
309impl Decoder for i64 {
310 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
311 where
312 T: Buf,
313 {
314 if src.remaining() < 4 {
315 return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i64"));
316 }
317 let value = src.get_i64();
318 trace!("i64: {:#x} => {}", &value, &value);
319 *self = value;
320 Ok(())
321 }
322}
323
324impl DecoderVarInt for i64 {
325 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
326 where
327 T: Buf,
328 {
329 let (value, _) = varint_decode(src)?;
330 *self = value;
331 Ok(())
332 }
333}
334
335impl Decoder for Option<String> {
336 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
337 where
338 T: Buf,
339 {
340 let mut len: i16 = 0;
341 len.decode(src, version)?;
342 if len < 0 {
343 *self = None;
344 return Ok(());
345 }
346
347 if len == 0 {
348 *self = Some(String::default());
349 }
350
351 let value = decode_string(len, src)?;
352 *self = Some(value);
353 Ok(())
354 }
355}
356
357fn decode_string<T>(len: i16, src: &mut T) -> Result<String, Error>
358where
359 T: Buf,
360{
361 let mut value = String::default();
362 let read_size = src.take(len as usize).reader().read_to_string(&mut value)?;
363
364 if read_size != len as usize {
365 return Err(Error::new(ErrorKind::UnexpectedEof, "not enough string"));
366 }
367 Ok(value)
368}
369
370impl Decoder for String {
371 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
372 where
373 T: Buf,
374 {
375 if src.remaining() < 2 {
376 return Err(Error::new(
377 ErrorKind::UnexpectedEof,
378 "can't read string length",
379 ));
380 }
381 let len = src.get_i16();
382 if len <= 0 {
383 return Ok(());
384 }
385
386 let value = decode_string(len, src)?;
387 *self = value;
388 Ok(())
389 }
390}
391
392impl Decoder for Vec<u8> {
393 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
394 where
395 T: Buf,
396 {
397 let mut len: i32 = 0;
398 len.decode(src, version)?;
399
400 trace!("decoding Vec len:{}", len);
401
402 if len < 0 {
403 trace!("negative length, treat as empty values");
404 return Ok(());
405 }
406
407 if src.remaining() < len as usize {
408 return Err(Error::new(ErrorKind::UnexpectedEof, "not enought bytes"));
409 }
410
411 let mut buf = src.take(len as usize);
412 self.put(&mut buf);
413 if self.len() != len as usize {
414 return Err(Error::new(
415 ErrorKind::UnexpectedEof,
416 format!(
417 "varint: Vec<u8>>, expecting {} but received: {}",
418 len,
419 self.len()
420 ),
421 ));
422 }
423
424 Ok(())
425 }
426}
427
428impl DecoderVarInt for Vec<u8> {
429 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
430 where
431 T: Buf,
432 {
433 let mut len: i64 = 0;
434 len.decode_varint(src)?;
435
436 if len < 1 {
437 return Ok(());
438 }
439
440 let mut buf = src.take(len as usize);
441 self.put(&mut buf);
442 if self.len() != len as usize {
443 return Err(Error::new(
444 ErrorKind::UnexpectedEof,
445 format!(
446 "varint: Vec<u8>>, expecting {} but received: {}",
447 len,
448 self.len()
449 ),
450 ));
451 }
452
453 Ok(())
454 }
455}
456
457fn decode_option_vec_u<T>(array: &mut Option<Vec<u8>>, src: &mut T, len: isize) -> Result<(), Error>
458where
459 T: Buf,
460{
461 if len < 0 {
462 *array = None;
463 return Ok(());
464 }
465
466 if len == 0 {
467 *array = Some(Vec::new());
468 return Ok(());
469 }
470
471 let mut buf = src.take(len as usize);
472 let mut value: Vec<u8> = Vec::new();
473 value.put(&mut buf);
474 if value.len() != len as usize {
475 return Err(Error::new(
476 ErrorKind::UnexpectedEof,
477 format!(
478 "Option<Vec<u8>>>, expecting {} but received: {}",
479 len,
480 value.len()
481 ),
482 ));
483 }
484
485 *array = Some(value);
486
487 Ok(())
488}
489
490impl DecoderVarInt for Option<Vec<u8>> {
491 fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
492 where
493 T: Buf,
494 {
495 let mut len: i64 = 0;
496 len.decode_varint(src)?;
497
498 decode_option_vec_u(self, src, len as isize)
499 }
500}
501
502#[cfg(test)]
503mod test {
504
505 use crate::Decoder;
506 use crate::DecoderVarInt;
507 use crate::Version;
508 use bytes::Buf;
509 use std::io::Cursor;
510 use std::io::Error;
511
512 #[test]
513 fn test_decode_i18_not_enough() {
514 let data = []; let mut value: i8 = 0;
516 let result = value.decode(&mut Cursor::new(&data), 0);
517 assert!(result.is_err());
518 }
519
520 #[test]
521 fn test_decode_i8() {
522 let data = [0x12];
523
524 let mut value: i8 = 0;
525 let result = value.decode(&mut Cursor::new(&data), 0);
526 assert!(result.is_ok());
527 assert_eq!(value, 18);
528 }
529
530 #[test]
531 fn test_decode_u18_not_enough() {
532 let data = []; let mut value: u8 = 0;
534 let result = value.decode(&mut Cursor::new(&data), 0);
535 assert!(result.is_err());
536 }
537
538 #[test]
539 fn test_decode_u8() {
540 let data = [0x12];
541
542 let mut value: u8 = 0;
543 let result = value.decode(&mut Cursor::new(&data), 0);
544 assert!(result.is_ok());
545 assert_eq!(value, 18);
546 }
547
548 #[test]
549 fn test_decode_i16_not_enough() {
550 let data = [0x11]; let mut value: i16 = 0;
553 let result = value.decode(&mut Cursor::new(&data), 0);
554 assert!(result.is_err());
555 }
556
557 #[test]
558 fn test_decode_i16() {
559 let data = [0x00, 0x05];
560
561 let mut value: i16 = 0;
562 let result = value.decode(&mut Cursor::new(&data), 0);
563 assert!(result.is_ok());
564 assert_eq!(value, 5);
565 }
566
567 #[test]
568 fn test_decode_u16_not_enough() {
569 let data = [0x11]; let mut value: i16 = 0;
572 let result = value.decode(&mut Cursor::new(&data), 0);
573 assert!(result.is_err());
574 }
575
576 #[test]
577 fn test_decode_u16() {
578 let data = [0x00, 0x05];
579
580 let mut value: u16 = 0;
581 let result = value.decode(&mut Cursor::new(&data), 0);
582 assert!(result.is_ok());
583 assert_eq!(value, 5);
584 }
585
586 #[test]
587 fn test_decode_option_u16_none() {
588 let data = [0x00];
589
590 let mut value: Option<u16> = None;
591 let result = value.decode(&mut Cursor::new(&data), 0);
592 assert!(result.is_ok());
593 assert_eq!(value, None);
594 }
595
596 #[test]
597 fn test_decode_option_u16_val() {
598 let data = [0x01, 0x00, 0x10];
599
600 let mut value: Option<u16> = None;
601 let result = value.decode(&mut Cursor::new(&data), 0);
602 assert!(result.is_ok());
603 assert_eq!(value, Some(16));
604 }
605
606 #[test]
607 fn test_decode_i32_not_enough() {
608 let data = [0x11, 0x11, 0x00]; let mut value: i32 = 0;
611 let result = value.decode(&mut Cursor::new(&data), 0);
612 assert!(result.is_err());
613 }
614
615 #[test]
616 fn test_decode_i32() {
617 let data = [0x00, 0x00, 0x00, 0x10];
618
619 let mut value: i32 = 0;
620 let result = value.decode(&mut Cursor::new(&data), 0);
621 assert!(result.is_ok());
622 assert_eq!(value, 16);
623 }
624
625 #[test]
626 fn test_decode_i32_2() {
627 let data = [0x00, 0x00, 0x00, 0x01];
628
629 let mut value: i32 = 0;
630 let result = value.decode(&mut Cursor::new(&data), 0);
631 assert!(result.is_ok());
632 assert_eq!(value, 1);
633 }
634
635 #[test]
636 fn test_decode_i64_not_enough() {
637 let data = [0x11, 0x11, 0x00]; let mut value: i64 = 0;
640 let result = value.decode(&mut Cursor::new(&data), 0);
641 assert!(result.is_err());
642 }
643
644 #[test]
645 fn test_decode_i64() {
646 let data = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];
647
648 let mut value: i64 = 0;
649 let result = value.decode(&mut Cursor::new(&data), 0);
650 assert!(result.is_ok());
651 assert_eq!(value, 32);
652 }
653
654 #[test]
655 fn test_decode_invalid_string_not_len() {
656 let data = [0x11]; let mut value = String::from("");
659 let result = value.decode(&mut Cursor::new(&data), 0);
660 assert!(result.is_err());
661 }
662
663 #[test]
664 fn test_decode_invalid_string() {
665 let data = [0x00, 0x0a, 0x63]; let mut value = String::from("");
668 let result = value.decode(&mut Cursor::new(&data), 0);
669 assert!(result.is_err());
670 }
671
672 #[test]
673 fn test_decode_null_option_string() {
674 let data = [0xff, 0xff]; let mut value: Option<String> = Some(String::from("test"));
677 let result = value.decode(&mut Cursor::new(&data), 0);
678 assert!(result.is_ok());
679 assert!(value.is_none());
680 }
681
682 #[test]
683 fn test_decode_some_option_string() {
684 let data = [0x00, 0x02, 0x77, 0x6f]; let mut value: Option<String> = None;
687 let result = value.decode(&mut Cursor::new(&data), 0);
688 assert!(result.is_ok());
689 assert!(value.is_some());
690 assert_eq!(value.unwrap(), "wo");
691 }
692
693 #[test]
694 fn test_decode_string_existing_value() {
695 let src = [0x0, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30];
696 let mut decode_target = "123".to_string();
697 let result = decode_target.decode(&mut Cursor::new(&src), 0);
698 assert!(result.is_ok());
699 assert_eq!(decode_target, "0.0.0.0".to_string());
700 }
701
702 #[test]
703 fn test_decode_string() {
704 let data = [
705 0x00, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x2d, 0x31,
706 ];
707
708 let mut value = String::from("");
709 let result = value.decode(&mut Cursor::new(&data), 0);
710 assert!(result.is_ok());
711 assert_eq!(value, "consumer-1");
712 }
713
714 #[test]
715 fn test_decode_bool_not_enough() {
716 let data = []; let mut value: bool = false;
719 let result = value.decode(&mut Cursor::new(&data), 0);
720 assert!(result.is_err());
721 }
722
723 #[test]
724 fn test_decode_bool() {
725 let data = [0x1];
726
727 let mut value: bool = false;
728 let result = value.decode(&mut Cursor::new(&data), 0);
729 assert!(result.is_ok());
730 assert_eq!(value, true);
731 }
732
733 #[test]
734 fn test_decode_bool_invalid_value() {
735 let data = [0x23]; let mut value: bool = false;
738 let result = value.decode(&mut Cursor::new(&data), 0);
739 assert!(result.is_err());
740 }
741
742 #[test]
743 fn test_decode_valid_string_vectors() {
744 let data = [0x00, 0x00, 0x00, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74];
746
747 let mut values: Vec<String> = Vec::new();
748 let result = values.decode(&mut Cursor::new(&data), 0);
749 assert!(result.is_ok());
750 assert_eq!(values.len(), 1);
751 let first_str = &values[0];
752 assert_eq!(first_str, "test");
753 }
754
755 #[test]
756 fn test_decode_varint_trait() {
757 let data = [0x7e];
758
759 let mut value: i64 = 0;
760 let result = value.decode_varint(&mut Cursor::new(&data));
761 assert!(result.is_ok());
762 assert_eq!(value, 63);
763 }
764
765 #[test]
766 fn test_decode_varint_vec8() {
767 let data = [0x06, 0x64, 0x6f, 0x67];
768
769 let mut value: Vec<u8> = Vec::new();
770 let result = value.decode_varint(&mut Cursor::new(&data));
771 assert!(result.is_ok());
772 assert_eq!(value.len(), 3);
773 assert_eq!(value[0], 0x64);
774 }
775
776 #[test]
777 fn test_decode_varint_vec8_fail() {
778 let data = [0x06, 0x64, 0x6f];
779
780 let mut value: Vec<u8> = Vec::new();
781 let result = value.decode_varint(&mut Cursor::new(&data));
782 assert!(result.is_err());
783 }
784
785 #[test]
786 fn test_decode_varint_array_option_vec8_null() {
787 let data = [0x01];
788
789 let mut value: Option<Vec<u8>> = Some(Vec::new());
790 let result = value.decode_varint(&mut Cursor::new(&data));
791 assert!(result.is_ok());
792 assert!(value.is_none());
793 }
794
795 #[test]
796 fn test_varint_decode_array_opton_vec8_simple_array() {
797 let data = [0x06, 0x64, 0x6f, 0x67, 0x00]; let mut value: Option<Vec<u8>> = Some(Vec::new());
800 let result = value.decode_varint(&mut Cursor::new(&data));
801 assert!(result.is_ok());
802 assert!(value.is_some());
803 let array = value.unwrap();
804 assert_eq!(array.len(), 3);
805 assert_eq!(array[0], 0x64);
806 }
807
808 #[derive(Default)]
809 struct TestRecord {
810 value: i8,
811 value2: i8,
812 }
813
814 impl Decoder for TestRecord {
815 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
816 where
817 T: Buf,
818 {
819 self.value.decode(src, 0)?;
820 if version > 1 {
821 self.value2.decode(src, 0)?;
822 }
823 Ok(())
824 }
825 }
826
827 #[test]
828 fn test_decoding_struct() {
829 let data = [0x06];
830
831 let result = TestRecord::decode_from(&mut Cursor::new(&data), 0);
833 assert!(result.is_ok());
834 let record = result.unwrap();
835 assert_eq!(record.value, 6);
836 assert_eq!(record.value2, 0);
837
838 let data2 = [0x06, 0x09];
840 let record2 = TestRecord::decode_from(&mut Cursor::new(&data2), 2).expect("decode");
841 assert_eq!(record2.value, 6);
842 assert_eq!(record2.value2, 9);
843 }
844}