kf_protocol_core/
decoder.rs

1use std::cmp::Ord;
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::marker::PhantomData;
7
8use bytes::Buf;
9use bytes::BufMut;
10use bytes::buf::ext::BufExt;
11use log::trace;
12
13use crate::Version;
14use super::varint::varint_decode;
15
16// trait for encoding and decoding using Kafka Protocol
17pub trait Decoder: Sized + Default {
18    /// decode Kafka compliant protocol values from buf
19    fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, Error>
20    where
21        T: Buf,
22        Self: Default,
23    {
24        let mut decoder = Self::default();
25        decoder.decode(src, version)?;
26        Ok(decoder)
27    }
28
29    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
30    where
31        T: Buf;
32}
33
34pub trait DecoderVarInt {
35    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
36    where
37        T: Buf;
38}
39
40impl<M> Decoder for Vec<M>
41where
42    M: Default + Decoder,
43{
44    default fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
45    where
46        T: Buf,
47    {
48        let mut len: i32 = 0;
49        len.decode(src, version)?;
50
51        trace!("decoding Vec len:{}", len);
52
53        if len < 1 {
54            trace!("negative length, skipping");
55            return Ok(());
56        }
57
58        decode_vec(len, self, src, version)?;
59
60        Ok(())
61    }
62}
63
64fn decode_vec<T, M>(len: i32, item: &mut Vec<M>, src: &mut T, version: Version) -> Result<(), Error>
65where
66    T: Buf,
67    M: Default + Decoder,
68{
69    for _ in 0..len {
70        let mut value = <M>::default();
71        value.decode(src, version)?;
72        item.push(value);
73    }
74
75    Ok(())
76}
77
78impl<M> Decoder for Option<Vec<M>>
79where
80    M: Default + Decoder,
81{
82    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
83    where
84        T: Buf,
85    {
86        let mut len: i32 = 0;
87        len.decode(src, version)?;
88
89        trace!("decoding Vec len:{}", len);
90
91        if len < 0 {
92            *self = None;
93            return Ok(());
94        }
95
96        let mut item: Vec<M> = vec![];
97
98        decode_vec(len, &mut item, src, version)?;
99        *self = Some(item);
100        Ok(())
101    }
102}
103
104impl<M> Decoder for Option<M>
105where
106    M: Default + Decoder,
107{
108    default fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
109    where
110        T: Buf,
111    {
112        let mut some = false;
113        some.decode(src, version)?;
114        if some {
115            let mut value = <M>::default();
116            value.decode(src, version)?;
117            *self = Some(value)
118        } else {
119            *self = None
120        }
121        Ok(())
122    }
123}
124
125impl<M> Decoder for PhantomData<M>
126where
127    M: Default + Decoder,
128{
129    default fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), Error>
130    where
131        T: Buf,
132    {
133        Ok(())
134    }
135}
136
137impl<K, V> Decoder for BTreeMap<K, V>
138where
139    K: Decoder + Ord,
140    V: Decoder,
141{
142    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
143    where
144        T: Buf,
145    {
146        let mut len: u16 = 0;
147        len.decode(src, version)?;
148
149        let mut map: BTreeMap<K, V> = BTreeMap::new();
150        for _i in 0..len {
151            let mut key = K::default();
152            key.decode(src, version)?;
153            let mut value = V::default();
154            value.decode(src, version)?;
155            map.insert(key, value);
156        }
157
158        *self = map;
159        Ok(())
160    }
161}
162
163impl Decoder for bool {
164    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
165    where
166        T: Buf,
167    {
168        if src.remaining() < 1 {
169            return Err(Error::new(
170                ErrorKind::UnexpectedEof,
171                "not enough buf for bool",
172            ));
173        }
174        let value = src.get_u8();
175
176        match value {
177            0 => *self = false,
178            1 => *self = true,
179            _ => {
180                return Err(Error::new(ErrorKind::InvalidData, "not valid bool value"));
181            }
182        };
183
184        Ok(())
185    }
186}
187
188impl Decoder for i8 {
189    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
190    where
191        T: Buf,
192    {
193        if src.remaining() < 1 {
194            return Err(Error::new(
195                ErrorKind::UnexpectedEof,
196                "not enough buf for i8",
197            ));
198        }
199        let value = src.get_i8();
200        *self = value;
201        Ok(())
202    }
203}
204
205impl Decoder for u8 {
206    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
207    where
208        T: Buf,
209    {
210        if src.remaining() < 1 {
211            return Err(Error::new(
212                ErrorKind::UnexpectedEof,
213                "not enough buf for u8",
214            ));
215        }
216        let value = src.get_u8();
217        *self = value;
218        Ok(())
219    }
220}
221
222impl Decoder for i16 {
223    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
224    where
225        T: Buf,
226    {
227        if src.remaining() < 2 {
228            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i16"));
229        }
230        let value = src.get_i16();
231        *self = value;
232        Ok(())
233    }
234}
235
236impl Decoder for u16 {
237    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
238    where
239        T: Buf,
240    {
241        if src.remaining() < 2 {
242            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u16"));
243        }
244        let value = src.get_u16();
245        *self = value;
246        Ok(())
247    }
248}
249
250impl Decoder for Option<u16> {
251    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
252    where
253        T: Buf,
254    {
255        if src.remaining() < 1 {
256            return Err(Error::new(
257                ErrorKind::UnexpectedEof,
258                "can't read option flag for u16",
259            ));
260        }
261        let some_or_none = src.get_i8();
262        if some_or_none == 0 {
263            *self = None;
264            return Ok(());
265        }
266
267        if src.remaining() < 2 {
268            return Err(Error::new(
269                ErrorKind::UnexpectedEof,
270                "can't read Option<u16>",
271            ));
272        }
273        let value = src.get_u16();
274        *self = Some(value);
275        Ok(())
276    }
277}
278
279impl Decoder for i32 {
280    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
281    where
282        T: Buf,
283    {
284        if src.remaining() < 4 {
285            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i32"));
286        }
287        let value = src.get_i32();
288        trace!("i32: {:#x} => {}", &value, &value);
289        *self = value;
290        Ok(())
291    }
292}
293
294impl Decoder for u32 {
295    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
296    where
297        T: Buf,
298    {
299        if src.remaining() < 4 {
300            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u32"));
301        }
302        let value = src.get_u32();
303        trace!("u32: {:#x} => {}", &value, &value);
304        *self = value;
305        Ok(())
306    }
307}
308
309impl Decoder for i64 {
310    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
311    where
312        T: Buf,
313    {
314        if src.remaining() < 4 {
315            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i64"));
316        }
317        let value = src.get_i64();
318        trace!("i64: {:#x} => {}", &value, &value);
319        *self = value;
320        Ok(())
321    }
322}
323
324impl DecoderVarInt for i64 {
325    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
326    where
327        T: Buf,
328    {
329        let (value, _) = varint_decode(src)?;
330        *self = value;
331        Ok(())
332    }
333}
334
335impl Decoder for Option<String> {
336    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
337    where
338        T: Buf,
339    {
340        let mut len: i16 = 0;
341        len.decode(src, version)?;
342        if len < 0 {
343            *self = None;
344            return Ok(());
345        }
346
347        if len == 0 {
348            *self = Some(String::default());
349        }
350
351        let value = decode_string(len, src)?;
352        *self = Some(value);
353        Ok(())
354    }
355}
356
357fn decode_string<T>(len: i16, src: &mut T) -> Result<String, Error>
358where
359    T: Buf,
360{
361    let mut value = String::default();
362    let read_size = src.take(len as usize).reader().read_to_string(&mut value)?;
363
364    if read_size != len as usize {
365        return Err(Error::new(ErrorKind::UnexpectedEof, "not enough string"));
366    }
367    Ok(value)
368}
369
370impl Decoder for String {
371    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
372    where
373        T: Buf,
374    {
375        if src.remaining() < 2 {
376            return Err(Error::new(
377                ErrorKind::UnexpectedEof,
378                "can't read string length",
379            ));
380        }
381        let len = src.get_i16();
382        if len <= 0 {
383            return Ok(());
384        }
385
386        let value = decode_string(len, src)?;
387        *self = value;
388        Ok(())
389    }
390}
391
392impl Decoder for Vec<u8> {
393    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
394    where
395        T: Buf,
396    {
397        let mut len: i32 = 0;
398        len.decode(src, version)?;
399
400        trace!("decoding Vec len:{}", len);
401
402        if len < 0 {
403            trace!("negative length, treat as empty values");
404            return Ok(());
405        }
406
407        if src.remaining() < len as usize {
408            return Err(Error::new(ErrorKind::UnexpectedEof, "not enought bytes"));
409        }
410
411        let mut buf = src.take(len as usize);
412        self.put(&mut buf);
413        if self.len() != len as usize {
414            return Err(Error::new(
415                ErrorKind::UnexpectedEof,
416                format!(
417                    "varint: Vec<u8>>, expecting {} but received: {}",
418                    len,
419                    self.len()
420                ),
421            ));
422        }
423
424        Ok(())
425    }
426}
427
428impl DecoderVarInt for Vec<u8> {
429    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
430    where
431        T: Buf,
432    {
433        let mut len: i64 = 0;
434        len.decode_varint(src)?;
435
436        if len < 1 {
437            return Ok(());
438        }
439
440        let mut buf = src.take(len as usize);
441        self.put(&mut buf);
442        if self.len() != len as usize {
443            return Err(Error::new(
444                ErrorKind::UnexpectedEof,
445                format!(
446                    "varint: Vec<u8>>, expecting {} but received: {}",
447                    len,
448                    self.len()
449                ),
450            ));
451        }
452
453        Ok(())
454    }
455}
456
457fn decode_option_vec_u<T>(array: &mut Option<Vec<u8>>, src: &mut T, len: isize) -> Result<(), Error>
458where
459    T: Buf,
460{
461    if len < 0 {
462        *array = None;
463        return Ok(());
464    }
465
466    if len == 0 {
467        *array = Some(Vec::new());
468        return Ok(());
469    }
470
471    let mut buf = src.take(len as usize);
472    let mut value: Vec<u8> = Vec::new();
473    value.put(&mut buf);
474    if value.len() != len as usize {
475        return Err(Error::new(
476            ErrorKind::UnexpectedEof,
477            format!(
478                "Option<Vec<u8>>>, expecting {} but received: {}",
479                len,
480                value.len()
481            ),
482        ));
483    }
484
485    *array = Some(value);
486
487    Ok(())
488}
489
490impl DecoderVarInt for Option<Vec<u8>> {
491    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
492    where
493        T: Buf,
494    {
495        let mut len: i64 = 0;
496        len.decode_varint(src)?;
497
498        decode_option_vec_u(self, src, len as isize)
499    }
500}
501
502#[cfg(test)]
503mod test {
504
505    use crate::Decoder;
506    use crate::DecoderVarInt;
507    use crate::Version;
508    use bytes::Buf;
509    use std::io::Cursor;
510    use std::io::Error;
511
512    #[test]
513    fn test_decode_i18_not_enough() {
514        let data = []; // no values
515        let mut value: i8 = 0;
516        let result = value.decode(&mut Cursor::new(&data), 0);
517        assert!(result.is_err());
518    }
519
520    #[test]
521    fn test_decode_i8() {
522        let data = [0x12];
523
524        let mut value: i8 = 0;
525        let result = value.decode(&mut Cursor::new(&data), 0);
526        assert!(result.is_ok());
527        assert_eq!(value, 18);
528    }
529
530    #[test]
531    fn test_decode_u18_not_enough() {
532        let data = []; // no values
533        let mut value: u8 = 0;
534        let result = value.decode(&mut Cursor::new(&data), 0);
535        assert!(result.is_err());
536    }
537
538    #[test]
539    fn test_decode_u8() {
540        let data = [0x12];
541
542        let mut value: u8 = 0;
543        let result = value.decode(&mut Cursor::new(&data), 0);
544        assert!(result.is_ok());
545        assert_eq!(value, 18);
546    }
547
548    #[test]
549    fn test_decode_i16_not_enough() {
550        let data = [0x11]; // only one value
551
552        let mut value: i16 = 0;
553        let result = value.decode(&mut Cursor::new(&data), 0);
554        assert!(result.is_err());
555    }
556
557    #[test]
558    fn test_decode_i16() {
559        let data = [0x00, 0x05];
560
561        let mut value: i16 = 0;
562        let result = value.decode(&mut Cursor::new(&data), 0);
563        assert!(result.is_ok());
564        assert_eq!(value, 5);
565    }
566
567    #[test]
568    fn test_decode_u16_not_enough() {
569        let data = [0x11]; // only one value
570
571        let mut value: i16 = 0;
572        let result = value.decode(&mut Cursor::new(&data), 0);
573        assert!(result.is_err());
574    }
575
576    #[test]
577    fn test_decode_u16() {
578        let data = [0x00, 0x05];
579
580        let mut value: u16 = 0;
581        let result = value.decode(&mut Cursor::new(&data), 0);
582        assert!(result.is_ok());
583        assert_eq!(value, 5);
584    }
585
586    #[test]
587    fn test_decode_option_u16_none() {
588        let data = [0x00];
589
590        let mut value: Option<u16> = None;
591        let result = value.decode(&mut Cursor::new(&data), 0);
592        assert!(result.is_ok());
593        assert_eq!(value, None);
594    }
595
596    #[test]
597    fn test_decode_option_u16_val() {
598        let data = [0x01, 0x00, 0x10];
599
600        let mut value: Option<u16> = None;
601        let result = value.decode(&mut Cursor::new(&data), 0);
602        assert!(result.is_ok());
603        assert_eq!(value, Some(16));
604    }
605
606    #[test]
607    fn test_decode_i32_not_enough() {
608        let data = [0x11, 0x11, 0x00]; // still need one more
609
610        let mut value: i32 = 0;
611        let result = value.decode(&mut Cursor::new(&data), 0);
612        assert!(result.is_err());
613    }
614
615    #[test]
616    fn test_decode_i32() {
617        let data = [0x00, 0x00, 0x00, 0x10];
618
619        let mut value: i32 = 0;
620        let result = value.decode(&mut Cursor::new(&data), 0);
621        assert!(result.is_ok());
622        assert_eq!(value, 16);
623    }
624
625    #[test]
626    fn test_decode_i32_2() {
627        let data = [0x00, 0x00, 0x00, 0x01];
628
629        let mut value: i32 = 0;
630        let result = value.decode(&mut Cursor::new(&data), 0);
631        assert!(result.is_ok());
632        assert_eq!(value, 1);
633    }
634
635    #[test]
636    fn test_decode_i64_not_enough() {
637        let data = [0x11, 0x11, 0x00]; // still need one more
638
639        let mut value: i64 = 0;
640        let result = value.decode(&mut Cursor::new(&data), 0);
641        assert!(result.is_err());
642    }
643
644    #[test]
645    fn test_decode_i64() {
646        let data = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];
647
648        let mut value: i64 = 0;
649        let result = value.decode(&mut Cursor::new(&data), 0);
650        assert!(result.is_ok());
651        assert_eq!(value, 32);
652    }
653
654    #[test]
655    fn test_decode_invalid_string_not_len() {
656        let data = [0x11]; // doesn't have right bytes
657
658        let mut value = String::from("");
659        let result = value.decode(&mut Cursor::new(&data), 0);
660        assert!(result.is_err());
661    }
662
663    #[test]
664    fn test_decode_invalid_string() {
665        let data = [0x00, 0x0a, 0x63]; // len and string doesn't match
666
667        let mut value = String::from("");
668        let result = value.decode(&mut Cursor::new(&data), 0);
669        assert!(result.is_err());
670    }
671
672    #[test]
673    fn test_decode_null_option_string() {
674        let data = [0xff, 0xff]; // len and string doesn't match
675
676        let mut value: Option<String> = Some(String::from("test"));
677        let result = value.decode(&mut Cursor::new(&data), 0);
678        assert!(result.is_ok());
679        assert!(value.is_none());
680    }
681
682    #[test]
683    fn test_decode_some_option_string() {
684        let data = [0x00, 0x02, 0x77, 0x6f]; // len and string doesn't match
685
686        let mut value: Option<String> = None;
687        let result = value.decode(&mut Cursor::new(&data), 0);
688        assert!(result.is_ok());
689        assert!(value.is_some());
690        assert_eq!(value.unwrap(), "wo");
691    }
692
693    #[test]
694    fn test_decode_string_existing_value() {
695        let src = [0x0, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30];
696        let mut decode_target = "123".to_string();
697        let result = decode_target.decode(&mut Cursor::new(&src), 0);
698        assert!(result.is_ok());
699        assert_eq!(decode_target, "0.0.0.0".to_string());
700    }
701
702    #[test]
703    fn test_decode_string() {
704        let data = [
705            0x00, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x2d, 0x31,
706        ];
707
708        let mut value = String::from("");
709        let result = value.decode(&mut Cursor::new(&data), 0);
710        assert!(result.is_ok());
711        assert_eq!(value, "consumer-1");
712    }
713
714    #[test]
715    fn test_decode_bool_not_enough() {
716        let data = []; // no values
717
718        let mut value: bool = false;
719        let result = value.decode(&mut Cursor::new(&data), 0);
720        assert!(result.is_err());
721    }
722
723    #[test]
724    fn test_decode_bool() {
725        let data = [0x1];
726
727        let mut value: bool = false;
728        let result = value.decode(&mut Cursor::new(&data), 0);
729        assert!(result.is_ok());
730        assert_eq!(value, true);
731    }
732
733    #[test]
734    fn test_decode_bool_invalid_value() {
735        let data = [0x23]; // not bool
736
737        let mut value: bool = false;
738        let result = value.decode(&mut Cursor::new(&data), 0);
739        assert!(result.is_err());
740    }
741
742    #[test]
743    fn test_decode_valid_string_vectors() {
744        // array of strings with "test"
745        let data = [0x00, 0x00, 0x00, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74];
746
747        let mut values: Vec<String> = Vec::new();
748        let result = values.decode(&mut Cursor::new(&data), 0);
749        assert!(result.is_ok());
750        assert_eq!(values.len(), 1);
751        let first_str = &values[0];
752        assert_eq!(first_str, "test");
753    }
754
755    #[test]
756    fn test_decode_varint_trait() {
757        let data = [0x7e];
758
759        let mut value: i64 = 0;
760        let result = value.decode_varint(&mut Cursor::new(&data));
761        assert!(result.is_ok());
762        assert_eq!(value, 63);
763    }
764
765    #[test]
766    fn test_decode_varint_vec8() {
767        let data = [0x06, 0x64, 0x6f, 0x67];
768
769        let mut value: Vec<u8> = Vec::new();
770        let result = value.decode_varint(&mut Cursor::new(&data));
771        assert!(result.is_ok());
772        assert_eq!(value.len(), 3);
773        assert_eq!(value[0], 0x64);
774    }
775
776    #[test]
777    fn test_decode_varint_vec8_fail() {
778        let data = [0x06, 0x64, 0x6f];
779
780        let mut value: Vec<u8> = Vec::new();
781        let result = value.decode_varint(&mut Cursor::new(&data));
782        assert!(result.is_err());
783    }
784
785    #[test]
786    fn test_decode_varint_array_option_vec8_null() {
787        let data = [0x01];
788
789        let mut value: Option<Vec<u8>> = Some(Vec::new());
790        let result = value.decode_varint(&mut Cursor::new(&data));
791        assert!(result.is_ok());
792        assert!(value.is_none());
793    }
794
795    #[test]
796    fn test_varint_decode_array_opton_vec8_simple_array() {
797        let data = [0x06, 0x64, 0x6f, 0x67, 0x00]; // should only read first 3
798
799        let mut value: Option<Vec<u8>> = Some(Vec::new());
800        let result = value.decode_varint(&mut Cursor::new(&data));
801        assert!(result.is_ok());
802        assert!(value.is_some());
803        let array = value.unwrap();
804        assert_eq!(array.len(), 3);
805        assert_eq!(array[0], 0x64);
806    }
807
808    #[derive(Default)]
809    struct TestRecord {
810        value: i8,
811        value2: i8,
812    }
813
814    impl Decoder for TestRecord {
815        fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
816        where
817            T: Buf,
818        {
819            self.value.decode(src, 0)?;
820            if version > 1 {
821                self.value2.decode(src, 0)?;
822            }
823            Ok(())
824        }
825    }
826
827    #[test]
828    fn test_decoding_struct() {
829        let data = [0x06];
830
831        // v1
832        let result = TestRecord::decode_from(&mut Cursor::new(&data), 0);
833        assert!(result.is_ok());
834        let record = result.unwrap();
835        assert_eq!(record.value, 6);
836        assert_eq!(record.value2, 0);
837
838        // v2
839        let data2 = [0x06, 0x09];
840        let record2 = TestRecord::decode_from(&mut Cursor::new(&data2), 2).expect("decode");
841        assert_eq!(record2.value, 6);
842        assert_eq!(record2.value2, 9);
843    }
844}