kf_protocol_core/
encoder.rs

1// decode values
2use std::io::Error;
3use std::io::ErrorKind;
4use std::io::Write;
5use std::collections::BTreeMap;
6use std::marker::PhantomData;
7
8use bytes::BufMut;
9use bytes::BytesMut;
10use bytes::Bytes;
11use bytes::buf::ext::BufMutExt;
12use log::trace;
13
14use crate::Version;
15
16use super::varint::variant_encode;
17use super::varint::variant_size;
18
19// trait for encoding and decoding using Kafka Protocol
20pub trait Encoder {
21    /// size of this object in bytes
22    fn write_size(&self, version: Version) -> usize;
23
24    /// encoding contents for buffer
25    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
26    where
27        T: BufMut;
28
29    fn as_bytes(&self, version: Version) -> Result<Bytes, Error> {
30        trace!("encoding as bytes");
31        let mut out = vec![];
32        self.encode(&mut out, version)?;
33        let mut buf = BytesMut::with_capacity(out.len());
34        buf.put_slice(&out);
35        Ok(buf.freeze())
36    }
37}
38
39pub trait EncoderVarInt {
40    fn var_write_size(&self) -> usize;
41
42    /// encoding contents for buffer
43    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
44    where
45        T: BufMut;
46}
47
48impl<M> Encoder for Vec<M>
49where
50    M: Encoder,
51{
52    fn write_size(&self, version: Version) -> usize {
53        self.iter()
54            .fold(4, |sum, val| sum + val.write_size(version))
55    }
56
57    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
58    where
59        T: BufMut,
60    {
61        if dest.remaining_mut() < 4 {
62            return Err(Error::new(
63                ErrorKind::UnexpectedEof,
64                "not enough capacity for vec",
65            ));
66        }
67
68        dest.put_u32(self.len() as u32);
69
70        for ref v in self {
71            v.encode(dest, version)?;
72        }
73
74        Ok(())
75    }
76}
77
78impl<M> Encoder for Option<M>
79where
80    M: Encoder,
81{
82    default fn write_size(&self, version: Version) -> usize {
83        match *self {
84            Some(ref value) => true.write_size(version) + value.write_size(version),
85            None => false.write_size(version),
86        }
87    }
88
89    default fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
90    where
91        T: BufMut,
92    {
93        match *self {
94            Some(ref value) => {
95                true.encode(dest, version)?;
96                value.encode(dest, version)
97            }
98            None => false.encode(dest, version),
99        }
100    }
101}
102
103impl<M> Encoder for PhantomData<M>
104where
105    M: Encoder,
106{
107    fn write_size(&self, _version: Version) -> usize {
108        0
109    }
110
111    fn encode<T>(&self, _dest: &mut T, _version: Version) -> Result<(), Error>
112    where
113        T: BufMut,
114    {
115        Ok(())
116    }
117}
118
119impl<M> Encoder for Option<Vec<M>>
120where
121    M: Encoder,
122{
123    fn write_size(&self, version: Version) -> usize {
124        match self {
125            Some(inner) => inner.write_size(version),
126            None => 4,
127        }
128    }
129
130    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
131    where
132        T: BufMut,
133    {
134        if self.is_none() {
135            let len: i32 = -1;
136            len.encode(dest, version)?;
137            trace!("Option<Vec>: None encode as: {:X?}", -1 as i32);
138            return Ok(());
139        }
140
141        let inner = self.as_ref().unwrap();
142        inner.encode(dest, version)
143    }
144}
145
146impl<K, V> Encoder for BTreeMap<K, V>
147where
148    K: Encoder,
149    V: Encoder,
150{
151    fn write_size(&self, version: Version) -> usize {
152        let mut len: usize = (0 as u16).write_size(version);
153
154        for (key, value) in self.iter() {
155            len += key.write_size(version);
156            len += value.write_size(version);
157        }
158
159        len
160    }
161
162    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
163    where
164        T: BufMut,
165    {
166        let len = self.len() as u16;
167        len.encode(dest, version)?;
168
169        for (key, value) in self.iter() {
170            key.encode(dest, version)?;
171            value.encode(dest, version)?;
172        }
173
174        Ok(())
175    }
176}
177
178impl Encoder for bool {
179    fn write_size(&self, _version: Version) -> usize {
180        1
181    }
182
183    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
184    where
185        T: BufMut,
186    {
187        if dest.remaining_mut() < 1 {
188            return Err(Error::new(
189                ErrorKind::UnexpectedEof,
190                "not enough capacity for bool",
191            ));
192        }
193        if *self {
194            dest.put_i8(1);
195        } else {
196            dest.put_i8(0);
197        }
198        Ok(())
199    }
200}
201
202impl Encoder for i8 {
203    fn write_size(&self, _version: Version) -> usize {
204        1
205    }
206
207    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
208    where
209        T: BufMut,
210    {
211        if dest.remaining_mut() < 1 {
212            return Err(Error::new(
213                ErrorKind::UnexpectedEof,
214                "not enough capacity for i8",
215            ));
216        }
217        dest.put_i8(*self);
218        Ok(())
219    }
220}
221
222impl Encoder for u8 {
223    fn write_size(&self, _version: Version) -> usize {
224        1
225    }
226
227    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
228    where
229        T: BufMut,
230    {
231        if dest.remaining_mut() < 1 {
232            return Err(Error::new(
233                ErrorKind::UnexpectedEof,
234                "not enough capacity for i8",
235            ));
236        }
237        dest.put_u8(*self);
238        Ok(())
239    }
240}
241
242impl Encoder for i16 {
243    fn write_size(&self, _version: Version) -> usize {
244        2
245    }
246
247    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
248    where
249        T: BufMut,
250    {
251        if dest.remaining_mut() < 2 {
252            return Err(Error::new(
253                ErrorKind::UnexpectedEof,
254                "not enough capacity for i16",
255            ));
256        }
257        dest.put_i16(*self);
258        trace!("encoding i16: {:#x}", *self);
259        Ok(())
260    }
261}
262
263impl Encoder for u16 {
264    fn write_size(&self, _version: Version) -> usize {
265        2
266    }
267
268    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
269    where
270        T: BufMut,
271    {
272        if dest.remaining_mut() < 2 {
273            return Err(Error::new(
274                ErrorKind::UnexpectedEof,
275                "not enough capacity for u16",
276            ));
277        }
278        dest.put_u16(*self);
279        trace!("encoding u16: {:#x}", *self);
280        Ok(())
281    }
282}
283
284impl Encoder for Option<u16> {
285    fn write_size(&self, _version: Version) -> usize {
286        if self.is_none() {
287            return 1;
288        } else {
289            3
290        }
291    }
292
293    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
294    where
295        T: BufMut,
296    {
297        if dest.remaining_mut() < 1 {
298            return Err(Error::new(
299                ErrorKind::UnexpectedEof,
300                "not enough capacity for len of 1",
301            ));
302        }
303        if self.is_none() {
304            dest.put_i8(0);
305            return Ok(());
306        }
307
308        dest.put_i8(1);
309        if dest.remaining_mut() < 2 {
310            return Err(Error::new(
311                ErrorKind::UnexpectedEof,
312                "not enough capacity for u16",
313            ));
314        }
315        let u16_value = self.as_ref().unwrap();
316        dest.put_u16(*u16_value);
317
318        Ok(())
319    }
320}
321
322impl Encoder for i32 {
323    fn write_size(&self, _version: Version) -> usize {
324        4
325    }
326
327    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
328    where
329        T: BufMut,
330    {
331        if dest.remaining_mut() < 4 {
332            return Err(Error::new(
333                ErrorKind::UnexpectedEof,
334                "not enough capacity for i32",
335            ));
336        }
337        dest.put_i32(*self);
338        trace!("encoding i32: {:#x}", *self);
339        Ok(())
340    }
341}
342
343impl Encoder for u32 {
344    fn write_size(&self, _version: Version) -> usize {
345        4
346    }
347
348    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
349    where
350        T: BufMut,
351    {
352        if dest.remaining_mut() < 4 {
353            return Err(Error::new(
354                ErrorKind::UnexpectedEof,
355                "not enough capacity for u32",
356            ));
357        }
358        dest.put_u32(*self);
359        Ok(())
360    }
361}
362
363impl Encoder for i64 {
364    fn write_size(&self, _version: Version) -> usize {
365        8
366    }
367
368    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
369    where
370        T: BufMut,
371    {
372        if dest.remaining_mut() < 8 {
373            return Err(Error::new(
374                ErrorKind::UnexpectedEof,
375                "not enough capacity for i164",
376            ));
377        }
378        dest.put_i64(*self);
379        Ok(())
380    }
381}
382
383impl EncoderVarInt for i64 {
384    fn var_write_size(&self) -> usize {
385        variant_size(*self)
386    }
387
388    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
389    where
390        T: BufMut,
391    {
392        variant_encode(dest, *self)?;
393        Ok(())
394    }
395}
396
397impl Encoder for Option<String> {
398    fn write_size(&self, _version: Version) -> usize {
399        if self.is_none() {
400            2
401        } else {
402            2 + self.as_ref().unwrap().len()
403        }
404    }
405
406    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
407    where
408        T: BufMut,
409    {
410        if self.is_none() {
411            let len: i16 = -1;
412            len.encode(dest, version)?;
413            trace!("Option<String>: None encode as: {:X?}", -1 as i16);
414            return Ok(());
415        }
416
417        let str_value = self.as_ref().unwrap();
418
419        str_value.encode(dest, version)
420    }
421}
422
423impl Encoder for String {
424    fn write_size(&self, _version: Version) -> usize {
425        2 + self.len()
426    }
427
428    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
429    where
430        T: BufMut,
431    {
432        if dest.remaining_mut() < 2 + self.len() {
433            return Err(Error::new(
434                ErrorKind::UnexpectedEof,
435                "not enough capacity for string",
436            ));
437        }
438
439        dest.put_u16(self.len() as u16);
440
441        let mut writer = dest.writer();
442        let bytes_written = writer.write(self.as_bytes())?;
443
444        if bytes_written != self.len() {
445            return Err(Error::new(
446                ErrorKind::UnexpectedEof,
447                format!(
448                    "out of {} bytes, {} not written",
449                    self.len(),
450                    self.len() - bytes_written
451                ),
452            ));
453        }
454
455        Ok(())
456    }
457}
458
459impl EncoderVarInt for Option<Vec<u8>> {
460    fn var_write_size(&self) -> usize {
461        if self.is_none() {
462            let len: i64 = -1;
463            return variant_size(len);
464        }
465
466        let b_values = self.as_ref().unwrap();
467
468        let len: i64 = b_values.len() as i64;
469        let bytes = variant_size(len);
470
471        bytes + b_values.len()
472    }
473
474    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
475    where
476        T: BufMut,
477    {
478        if self.is_none() {
479            let len: i64 = -1;
480            variant_encode(dest, len)?;
481            return Ok(());
482        }
483
484        let b_values = self.as_ref().unwrap();
485
486        let len: i64 = b_values.len() as i64;
487        len.encode_varint(dest)?;
488
489        if dest.remaining_mut() < b_values.len() {
490            return Err(Error::new(
491                ErrorKind::UnexpectedEof,
492                format!("not enough capacity for byte array: {}", b_values.len()),
493            ));
494        }
495
496        dest.put_slice(b_values);
497
498        Ok(())
499    }
500}
501
502#[cfg(test)]
503mod test {
504
505    use bytes::BufMut;
506    use std::io::Error as IoError;
507
508    use crate::Encoder;
509    use crate::EncoderVarInt;
510    use crate::Version;
511
512    #[test]
513    fn test_encode_i8() {
514        let mut dest = vec![];
515        let value: i8 = 5;
516        let result = value.encode(&mut dest, 0);
517        assert!(result.is_ok());
518        assert_eq!(dest.len(), 1);
519        assert_eq!(dest[0], 0x05);
520        assert_eq!(value.write_size(0), 1);
521    }
522
523    #[test]
524    fn test_encode_u8() {
525        let mut dest = vec![];
526        let value: u8 = 8;
527        let result = value.encode(&mut dest, 0);
528        assert!(result.is_ok());
529        assert_eq!(dest.len(), 1);
530        assert_eq!(dest[0], 0x08);
531        assert_eq!(value.write_size(0), 1);
532    }
533
534
535
536    #[test]
537    fn test_encode_i16() {
538        let mut dest = vec![];
539        let value: i16 = 5;
540        let result = value.encode(&mut dest, 0);
541        assert!(result.is_ok());
542        assert_eq!(dest.len(), 2);
543        assert_eq!(dest[0], 0x00);
544        assert_eq!(dest[1], 0x05);
545        assert_eq!(value.write_size(0), 2);
546    }
547
548    #[test]
549    fn test_encode_u16() {
550        let mut dest = vec![];
551        let value: u16 = 16;
552        let result = value.encode(&mut dest, 0);
553        assert!(result.is_ok());
554        assert_eq!(dest.len(), 2);
555        assert_eq!(dest[0], 0x00);
556        assert_eq!(dest[1], 0x10);
557        assert_eq!(value.write_size(0), 2);
558    }
559
560    #[test]
561    fn test_encode_option_u16_none() {
562        let mut dest = vec![];
563        let value: Option<u16> = None;
564        let result = value.encode(&mut dest, 0);
565        assert!(result.is_ok());
566        assert_eq!(dest.len(), 1);
567        assert_eq!(dest[0], 0x00);
568        assert_eq!(value.write_size(0), 1);
569    }
570
571    #[test]
572    fn test_encode_option_u16_with_val() {
573        let mut dest = vec![];
574        let value: Option<u16> = Some(16);
575        let result = value.encode(&mut dest, 0);
576        assert!(result.is_ok());
577        assert_eq!(dest.len(), 3);
578        assert_eq!(dest[0], 0x01);
579        assert_eq!(dest[1], 0x00);
580        assert_eq!(dest[2], 0x10);
581        assert_eq!(value.write_size(0), 3);
582    }
583
584    #[test]
585    fn test_encode_i32() {
586        let mut dest = vec![];
587        let value: i32 = 5;
588        let result = value.encode(&mut dest, 0);
589        assert!(result.is_ok());
590        assert_eq!(dest.len(), 4);
591        assert_eq!(dest[3], 0x05);
592        assert_eq!(value.write_size(0), 4);
593    }
594
595    #[test]
596    fn test_encode_i64() {
597        let mut dest = vec![];
598        let value: i64 = 5;
599        let result = value.encode(&mut dest, 0);
600        assert!(result.is_ok());
601        assert_eq!(dest.len(), 8);
602        assert_eq!(dest[0], 0x00);
603        assert_eq!(dest[7], 0x05);
604        assert_eq!(value.write_size(0), 8);
605    }
606
607    #[test]
608    fn test_encode_string_option_none() {
609        let mut dest = vec![];
610        let value: Option<String> = None;
611        let result = value.encode(&mut dest, 0);
612        assert!(result.is_ok());
613        assert_eq!(dest.len(), 2);
614        assert_eq!(dest[0], 0xff);
615        assert_eq!(dest[1], 0xff);
616        assert_eq!(value.write_size(0), 2);
617    }
618
619    #[test]
620    fn test_encode_string_option_some() {
621        let mut dest = vec![];
622        let value: Option<String> = Some(String::from("wo"));
623        let result = value.encode(&mut dest, 0);
624        assert!(result.is_ok());
625        assert_eq!(dest.len(), 4);
626        assert_eq!(dest[0], 0x00);
627        assert_eq!(dest[1], 0x02);
628        assert_eq!(dest[2], 0x77);
629        assert_eq!(dest[3], 0x6f);
630        assert_eq!(value.write_size(0), 4);
631    }
632
633    #[test]
634    fn test_encode_string() {
635        let mut dest = vec![];
636        let value = String::from("wo");
637        let result = value.encode(&mut dest, 0);
638        assert!(result.is_ok());
639        assert_eq!(dest.len(), 4);
640        assert_eq!(dest[0], 0x00);
641        assert_eq!(dest[1], 0x02);
642        assert_eq!(dest[2], 0x77);
643        assert_eq!(dest[3], 0x6f);
644        assert_eq!(value.write_size(0), 4);
645    }
646
647    #[test]
648    fn test_encode_bool() {
649        let mut dest = vec![];
650        let value = true;
651        let result = value.encode(&mut dest, 0);
652        assert!(result.is_ok());
653        assert_eq!(dest.len(), 1);
654        assert_eq!(dest[0], 0x01);
655        assert_eq!(value.write_size(0), 1);
656    }
657
658    #[test]
659    fn test_encode_string_vectors() {
660        let mut dest = vec![];
661        let value: Vec<String> = vec![String::from("test")];
662        let result = value.encode(&mut dest, 0);
663        assert!(result.is_ok());
664        assert_eq!(dest.len(), 10);
665        assert_eq!(dest[3], 0x01);
666        assert_eq!(dest[9], 0x74);
667        assert_eq!(value.write_size(0), 10); // vec len 4: string len: 2, string 4
668    }
669
670    #[test]
671    fn test_encode_u8_vectors() {
672        let mut dest = vec![];
673        let value: Vec<u8> = vec![0x10, 0x11];
674        let result = value.encode(&mut dest, 0);
675        assert!(result.is_ok());
676        assert_eq!(dest.len(), 6);
677        assert_eq!(dest[3], 0x02);
678        assert_eq!(dest[5], 0x11);
679        assert_eq!(value.write_size(0), 6);
680    }
681
682    #[test]
683    fn test_varint_encode_array_opton_vec8_none() {
684        let mut dest = vec![];
685        let value: Option<Vec<u8>> = None;
686        let result = value.encode_varint(&mut dest);
687        assert!(result.is_ok());
688        assert_eq!(dest.len(), 1);
689        assert_eq!(dest[0], 0x01);
690    }
691
692    #[test]
693    fn test_varint_encode_array_opton_vec8_simple_array() {
694        let mut dest = vec![];
695        let value: Option<Vec<u8>> = Some(vec![0x64, 0x6f, 0x67]);
696        let result = value.encode_varint(&mut dest);
697        assert!(result.is_ok());
698        assert_eq!(dest.len(), 4);
699    }
700
701    #[derive(Default)]
702    struct TestRecord {
703        value: i8,
704        value2: i8,
705    }
706
707    impl Encoder for TestRecord {
708        fn write_size(&self, version: Version) -> usize {
709            self.value.write_size(version) + {
710                if version > 1 {
711                    self.value2.write_size(version)
712                } else {
713                    0
714                }
715            }
716        }
717
718        fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
719        where
720            T: BufMut,
721        {
722            self.value.encode(dest, version)?;
723            if version > 1 {
724                self.value2.encode(dest, version)?;
725            }
726            Ok(())
727        }
728    }
729
730    #[test]
731    fn test_encoding_struct() {
732        // v1
733        let mut dest = vec![];
734        let mut record = TestRecord::default();
735        record.value = 20;
736        record.value2 = 10;
737        record.encode(&mut dest, 0).expect("encode");
738        assert_eq!(dest.len(), 1);
739        assert_eq!(dest[0], 20);
740        assert_eq!(record.write_size(0), 1);
741
742        let mut dest2 = vec![];
743        record.encode(&mut dest2, 2).expect("encodv2 encodee");
744        assert_eq!(dest2.len(), 2);
745        assert_eq!(dest2[1], 10);
746
747        // v2
748        /*
749        let data2 = [0x06,0x09];
750        let record2 = TestRecord::decode_from(&mut Cursor::new(&data2),2).expect("decode");
751        assert_eq!(record2.value, 6);
752        assert_eq!(record2.value2, 9);
753        */
754    }
755}