fluvio_protocol_core/
decoder.rs

1use std::cmp::Ord;
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Read;
6use std::marker::PhantomData;
7
8use bytes::Buf;
9use bytes::BufMut;
10use log::trace;
11
12use super::varint::varint_decode;
13use crate::Version;
14
15// trait for encoding and decoding using Kafka Protocol
16pub trait Decoder: Sized + Default {
17    /// decode Kafka compliant protocol values from buf
18    fn decode_from<T>(src: &mut T, version: Version) -> Result<Self, Error>
19    where
20        T: Buf,
21        Self: Default,
22    {
23        let mut decoder = Self::default();
24        decoder.decode(src, version)?;
25        Ok(decoder)
26    }
27
28    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
29    where
30        T: Buf;
31}
32
33pub trait DecoderVarInt {
34    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
35    where
36        T: Buf;
37}
38
39impl<M> Decoder for Vec<M>
40where
41    M: Default + Decoder,
42{
43    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
44    where
45        T: Buf,
46    {
47        let mut len: i32 = 0;
48        len.decode(src, version)?;
49
50        trace!("decoding Vec len:{}", len);
51
52        if len < 1 {
53            trace!("negative length, skipping");
54            return Ok(());
55        }
56
57        decode_vec(len, self, src, version)?;
58
59        Ok(())
60    }
61}
62
63fn decode_vec<T, M>(len: i32, item: &mut Vec<M>, src: &mut T, version: Version) -> Result<(), Error>
64where
65    T: Buf,
66    M: Default + Decoder,
67{
68    for _ in 0..len {
69        let mut value = <M>::default();
70        value.decode(src, version)?;
71        item.push(value);
72    }
73
74    Ok(())
75}
76
77impl<M> Decoder for Option<M>
78where
79    M: Default + Decoder,
80{
81    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
82    where
83        T: Buf,
84    {
85        let mut some = false;
86        some.decode(src, version)?;
87        if some {
88            let mut value = <M>::default();
89            value.decode(src, version)?;
90            *self = Some(value)
91        } else {
92            *self = None
93        }
94        Ok(())
95    }
96}
97
98impl<M> Decoder for PhantomData<M>
99where
100    M: Default + Decoder,
101{
102    fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), Error>
103    where
104        T: Buf,
105    {
106        Ok(())
107    }
108}
109
110impl<K, V> Decoder for BTreeMap<K, V>
111where
112    K: Decoder + Ord,
113    V: Decoder,
114{
115    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
116    where
117        T: Buf,
118    {
119        let mut len: u16 = 0;
120        len.decode(src, version)?;
121
122        let mut map: BTreeMap<K, V> = BTreeMap::new();
123        for _i in 0..len {
124            let mut key = K::default();
125            key.decode(src, version)?;
126            let mut value = V::default();
127            value.decode(src, version)?;
128            map.insert(key, value);
129        }
130
131        *self = map;
132        Ok(())
133    }
134}
135
136impl Decoder for bool {
137    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
138    where
139        T: Buf,
140    {
141        if src.remaining() < 1 {
142            return Err(Error::new(
143                ErrorKind::UnexpectedEof,
144                "not enough buf for bool",
145            ));
146        }
147        let value = src.get_u8();
148
149        match value {
150            0 => *self = false,
151            1 => *self = true,
152            _ => {
153                return Err(Error::new(ErrorKind::InvalidData, "not valid bool value"));
154            }
155        };
156
157        Ok(())
158    }
159}
160
161impl Decoder for i8 {
162    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
163    where
164        T: Buf,
165    {
166        if src.remaining() < 1 {
167            return Err(Error::new(
168                ErrorKind::UnexpectedEof,
169                "not enough buf for i8",
170            ));
171        }
172        let value = src.get_i8();
173        *self = value;
174        Ok(())
175    }
176}
177
178impl Decoder for u8 {
179    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
180    where
181        T: Buf,
182    {
183        if src.remaining() < 1 {
184            return Err(Error::new(
185                ErrorKind::UnexpectedEof,
186                "not enough buf for u8",
187            ));
188        }
189        let value = src.get_u8();
190        *self = value;
191        Ok(())
192    }
193}
194
195impl Decoder for i16 {
196    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
197    where
198        T: Buf,
199    {
200        if src.remaining() < 2 {
201            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i16"));
202        }
203        let value = src.get_i16();
204        *self = value;
205        Ok(())
206    }
207}
208
209impl Decoder for u16 {
210    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
211    where
212        T: Buf,
213    {
214        if src.remaining() < 2 {
215            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u16"));
216        }
217        let value = src.get_u16();
218        *self = value;
219        Ok(())
220    }
221}
222
223impl Decoder for i32 {
224    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
225    where
226        T: Buf,
227    {
228        if src.remaining() < 4 {
229            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i32"));
230        }
231        let value = src.get_i32();
232        trace!("i32: {:#x} => {}", &value, &value);
233        *self = value;
234        Ok(())
235    }
236}
237
238impl Decoder for u32 {
239    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
240    where
241        T: Buf,
242    {
243        if src.remaining() < 4 {
244            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read u32"));
245        }
246        let value = src.get_u32();
247        trace!("u32: {:#x} => {}", &value, &value);
248        *self = value;
249        Ok(())
250    }
251}
252
253impl Decoder for i64 {
254    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
255    where
256        T: Buf,
257    {
258        if src.remaining() < 8 {
259            return Err(Error::new(ErrorKind::UnexpectedEof, "can't read i64"));
260        }
261        let value = src.get_i64();
262        trace!("i64: {:#x} => {}", &value, &value);
263        *self = value;
264        Ok(())
265    }
266}
267
268impl DecoderVarInt for i64 {
269    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
270    where
271        T: Buf,
272    {
273        let (value, _) = varint_decode(src)?;
274        *self = value;
275        Ok(())
276    }
277}
278
279fn decode_string<T>(len: i16, src: &mut T) -> Result<String, Error>
280where
281    T: Buf,
282{
283    let mut value = String::default();
284    let read_size = src.take(len as usize).reader().read_to_string(&mut value)?;
285
286    if read_size != len as usize {
287        return Err(Error::new(ErrorKind::UnexpectedEof, "not enough string"));
288    }
289    Ok(value)
290}
291
292impl Decoder for String {
293    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
294    where
295        T: Buf,
296    {
297        if src.remaining() < 2 {
298            return Err(Error::new(
299                ErrorKind::UnexpectedEof,
300                "can't read string length",
301            ));
302        }
303        let len = src.get_i16();
304        if len <= 0 {
305            return Ok(());
306        }
307
308        let value = decode_string(len, src)?;
309        *self = value;
310        Ok(())
311    }
312}
313
314impl DecoderVarInt for Vec<u8> {
315    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
316    where
317        T: Buf,
318    {
319        let mut len: i64 = 0;
320        len.decode_varint(src)?;
321
322        if len < 1 {
323            return Ok(());
324        }
325
326        let mut buf = src.take(len as usize);
327        self.put(&mut buf);
328        if self.len() != len as usize {
329            return Err(Error::new(
330                ErrorKind::UnexpectedEof,
331                format!(
332                    "varint: Vec<u8>>, expecting {} but received: {}",
333                    len,
334                    self.len()
335                ),
336            ));
337        }
338
339        Ok(())
340    }
341}
342
343fn decode_option_vec_u<T>(array: &mut Option<Vec<u8>>, src: &mut T, len: isize) -> Result<(), Error>
344where
345    T: Buf,
346{
347    if len < 0 {
348        *array = None;
349        return Ok(());
350    }
351
352    if len == 0 {
353        *array = Some(Vec::new());
354        return Ok(());
355    }
356
357    let mut buf = src.take(len as usize);
358    let mut value: Vec<u8> = Vec::new();
359    value.put(&mut buf);
360    if value.len() != len as usize {
361        return Err(Error::new(
362            ErrorKind::UnexpectedEof,
363            format!(
364                "Option<Vec<u8>>>, expecting {} but received: {}",
365                len,
366                value.len()
367            ),
368        ));
369    }
370
371    *array = Some(value);
372
373    Ok(())
374}
375
376impl DecoderVarInt for Option<Vec<u8>> {
377    fn decode_varint<T>(&mut self, src: &mut T) -> Result<(), Error>
378    where
379        T: Buf,
380    {
381        let mut len: i64 = 0;
382        len.decode_varint(src)?;
383
384        decode_option_vec_u(self, src, len as isize)
385    }
386}
387
#[cfg(test)]
mod test {
    //! Unit tests for the `Decoder` and `DecoderVarInt` implementations.

    use crate::Decoder;
    use crate::DecoderVarInt;
    use crate::Version;
    use bytes::Buf;
    use std::io::Cursor;
    use std::io::Error;

    #[test]
    fn test_decode_i8_not_enough() {
        let data = []; // no values
        let mut value: i8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i8() {
        let data = [0x12];

        let mut value: i8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 18);
    }

    #[test]
    fn test_decode_u8_not_enough() {
        let data = []; // no values
        let mut value: u8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_u8() {
        let data = [0x12];

        let mut value: u8 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 18);
    }

    #[test]
    fn test_decode_i16_not_enough() {
        let data = [0x11]; // only one byte, two are required

        let mut value: i16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i16() {
        let data = [0x00, 0x05];

        let mut value: i16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 5);
    }

    #[test]
    fn test_decode_u16_not_enough() {
        let data = [0x11]; // only one byte, two are required

        // Must be `u16` so that the `u16` decoder (not the `i16` one) is exercised.
        let mut value: u16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_u16() {
        let data = [0x00, 0x05];

        let mut value: u16 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 5);
    }

    #[test]
    fn test_decode_option_u16_none() {
        let data = [0x00]; // presence flag is false

        let mut value: Option<u16> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, None);
    }

    #[test]
    fn test_decode_option_u16_val() {
        let data = [0x01, 0x00, 0x10]; // presence flag, then the u16 value

        let mut value: Option<u16> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, Some(16));
    }

    #[test]
    fn test_decode_i32_not_enough() {
        let data = [0x11, 0x11, 0x00]; // still need one more

        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i32() {
        let data = [0x00, 0x00, 0x00, 0x10];

        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 16);
    }

    #[test]
    fn test_decode_i32_2() {
        let data = [0x00, 0x00, 0x00, 0x01];

        let mut value: i32 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 1);
    }

    #[test]
    fn test_decode_i64_not_enough() {
        let data = [0x11, 0x11, 0x00]; // three bytes, eight are required

        let mut value: i64 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_i64() {
        let data = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];

        let mut value: i64 = 0;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, 32);
    }

    #[test]
    fn test_decode_invalid_string_not_len() {
        let data = [0x11]; // too short to hold the two-byte length prefix

        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_invalid_string() {
        let data = [0x00, 0x0a, 0x63]; // declared length and payload don't match

        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_null_option_string() {
        let data = [0x00]; // presence flag is false, so no string follows

        let mut value: Option<String> = Some(String::from("test"));
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value.is_none());
    }

    #[test]
    fn test_decode_some_option_string() {
        let data = [0x01, 0x00, 0x02, 0x77, 0x6f]; // presence flag, length 2, "wo"

        let mut value: Option<String> = None;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value.is_some());
        assert_eq!(value.unwrap(), "wo");
    }

    #[test]
    fn test_decode_string_existing_value() {
        // Decoding must replace, not append to, the previous contents.
        let src = [0x0, 0x7, 0x30, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x30];
        let mut decode_target = "123".to_string();
        let result = decode_target.decode(&mut Cursor::new(&src), 0);
        assert!(result.is_ok());
        assert_eq!(decode_target, "0.0.0.0".to_string());
    }

    #[test]
    fn test_decode_string() {
        let data = [
            0x00, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x2d, 0x31,
        ];

        let mut value = String::from("");
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(value, "consumer-1");
    }

    #[test]
    fn test_decode_bool_not_enough() {
        let data = []; // no values

        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_bool() {
        let data = [0x1];

        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert!(value);
    }

    #[test]
    fn test_decode_bool_invalid_value() {
        let data = [0x23]; // not bool

        let mut value: bool = false;
        let result = value.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_valid_string_vectors() {
        // array of strings with "test"
        let data = [0, 0, 0, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74];

        let mut values: Vec<String> = Vec::new();
        let result = values.decode(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        assert_eq!(values.len(), 1);
        let first_str = &values[0];
        assert_eq!(first_str, "test");
    }

    #[test]
    fn test_decode_varint_trait() {
        let data = [0x7e];

        let mut value: i64 = 0;
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert_eq!(value, 63);
    }

    #[test]
    fn test_decode_varint_vec8() {
        let data = [0x06, 0x64, 0x6f, 0x67];

        let mut value: Vec<u8> = Vec::new();
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert_eq!(value.len(), 3);
        assert_eq!(value[0], 0x64);
    }

    #[test]
    fn test_vec8_encode_and_decode() {
        // NOTE(review): despite the name, only the encode half is exercised here.
        use crate::encoder::Encoder;
        let in_vec: Vec<u8> = vec![1, 2, 3];
        let mut out: Vec<u8> = vec![];
        let ret = in_vec.encode(&mut out, 0);
        assert!(ret.is_ok());
    }

    #[test]
    fn test_decode_varint_vec8_fail() {
        let data = [0x06, 0x64, 0x6f]; // length prefix promises more bytes than follow

        let mut value: Vec<u8> = Vec::new();
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_err());
    }

    #[test]
    fn test_varint_decode_array_option_vec8_simple_array() {
        let data = [0x06, 0x64, 0x6f, 0x67, 0x00]; // should only read first 3

        let mut value: Option<Vec<u8>> = Some(Vec::new());
        let result = value.decode_varint(&mut Cursor::new(&data));
        assert!(result.is_ok());
        assert!(value.is_some());
        let array = value.unwrap();
        assert_eq!(array.len(), 3);
        assert_eq!(array[0], 0x64);
    }

    /// Two-field record whose second field is only decoded for versions above 1.
    #[derive(Default)]
    struct TestRecord {
        value: i8,
        value2: i8,
    }

    impl Decoder for TestRecord {
        fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
        where
            T: Buf,
        {
            self.value.decode(src, 0)?;
            if version > 1 {
                self.value2.decode(src, 0)?;
            }
            Ok(())
        }
    }

    #[test]
    fn test_decoding_struct() {
        let data = [0x06];

        // version 0: only `value` is read, `value2` keeps its default
        let result = TestRecord::decode_from(&mut Cursor::new(&data), 0);
        assert!(result.is_ok());
        let record = result.unwrap();
        assert_eq!(record.value, 6);
        assert_eq!(record.value2, 0);

        // version 2: both fields are read
        let data2 = [0x06, 0x09];
        let record2 = TestRecord::decode_from(&mut Cursor::new(&data2), 2).expect("decode");
        assert_eq!(record2.value, 6);
        assert_eq!(record2.value2, 9);
    }
}