fluvio_protocol_core/
encoder.rs

// encode values
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Write;
6use std::marker::PhantomData;
7
8use bytes::BufMut;
9use bytes::Bytes;
10use bytes::BytesMut;
11use log::trace;
12
13use crate::Version;
14
15use super::varint::variant_encode;
16use super::varint::variant_size;
17
18// trait for encoding and decoding using Kafka Protocol
19pub trait Encoder {
20    /// size of this object in bytes
21    fn write_size(&self, version: Version) -> usize;
22
23    /// encoding contents for buffer
24    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
25    where
26        T: BufMut;
27
28    fn as_bytes(&self, version: Version) -> Result<Bytes, Error> {
29        trace!("encoding as bytes");
30        let mut out = vec![];
31        self.encode(&mut out, version)?;
32        let mut buf = BytesMut::with_capacity(out.len());
33        buf.put_slice(&out);
34        Ok(buf.freeze())
35    }
36}
37
38pub trait EncoderVarInt {
39    fn var_write_size(&self) -> usize;
40
41    /// encoding contents for buffer
42    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
43    where
44        T: BufMut;
45}
46
47impl<M> Encoder for Vec<M>
48where
49    M: Encoder,
50{
51    fn write_size(&self, version: Version) -> usize {
52        self.iter()
53            .fold(4, |sum, val| sum + val.write_size(version))
54    }
55
56    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
57    where
58        T: BufMut,
59    {
60        if dest.remaining_mut() < 4 {
61            return Err(Error::new(
62                ErrorKind::UnexpectedEof,
63                "not enough capacity for vec",
64            ));
65        }
66
67        dest.put_u32(self.len() as u32);
68
69        for ref v in self {
70            v.encode(dest, version)?;
71        }
72
73        Ok(())
74    }
75}
76
77impl<M> Encoder for Option<M>
78where
79    M: Encoder,
80{
81    fn write_size(&self, version: Version) -> usize {
82        match *self {
83            Some(ref value) => true.write_size(version) + value.write_size(version),
84            None => false.write_size(version),
85        }
86    }
87
88    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
89    where
90        T: BufMut,
91    {
92        match *self {
93            Some(ref value) => {
94                true.encode(dest, version)?;
95                value.encode(dest, version)
96            }
97            None => false.encode(dest, version),
98        }
99    }
100}
101
102impl<M> Encoder for PhantomData<M>
103where
104    M: Encoder,
105{
106    fn write_size(&self, _version: Version) -> usize {
107        0
108    }
109
110    fn encode<T>(&self, _dest: &mut T, _version: Version) -> Result<(), Error>
111    where
112        T: BufMut,
113    {
114        Ok(())
115    }
116}
117
118impl<K, V> Encoder for BTreeMap<K, V>
119where
120    K: Encoder,
121    V: Encoder,
122{
123    fn write_size(&self, version: Version) -> usize {
124        let mut len: usize = (0_u16).write_size(version);
125
126        for (key, value) in self.iter() {
127            len += key.write_size(version);
128            len += value.write_size(version);
129        }
130
131        len
132    }
133
134    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
135    where
136        T: BufMut,
137    {
138        let len = self.len() as u16;
139        len.encode(dest, version)?;
140
141        for (key, value) in self.iter() {
142            key.encode(dest, version)?;
143            value.encode(dest, version)?;
144        }
145
146        Ok(())
147    }
148}
149
150impl Encoder for bool {
151    fn write_size(&self, _version: Version) -> usize {
152        1
153    }
154
155    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
156    where
157        T: BufMut,
158    {
159        if dest.remaining_mut() < 1 {
160            return Err(Error::new(
161                ErrorKind::UnexpectedEof,
162                "not enough capacity for bool",
163            ));
164        }
165        if *self {
166            dest.put_i8(1);
167        } else {
168            dest.put_i8(0);
169        }
170        Ok(())
171    }
172}
173
174impl Encoder for i8 {
175    fn write_size(&self, _version: Version) -> usize {
176        1
177    }
178
179    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
180    where
181        T: BufMut,
182    {
183        if dest.remaining_mut() < 1 {
184            return Err(Error::new(
185                ErrorKind::UnexpectedEof,
186                "not enough capacity for i8",
187            ));
188        }
189        dest.put_i8(*self);
190        Ok(())
191    }
192}
193
194impl Encoder for u8 {
195    fn write_size(&self, _version: Version) -> usize {
196        1
197    }
198
199    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
200    where
201        T: BufMut,
202    {
203        if dest.remaining_mut() < 1 {
204            return Err(Error::new(
205                ErrorKind::UnexpectedEof,
206                "not enough capacity for i8",
207            ));
208        }
209        dest.put_u8(*self);
210        Ok(())
211    }
212}
213
214impl Encoder for i16 {
215    fn write_size(&self, _version: Version) -> usize {
216        2
217    }
218
219    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
220    where
221        T: BufMut,
222    {
223        if dest.remaining_mut() < 2 {
224            return Err(Error::new(
225                ErrorKind::UnexpectedEof,
226                "not enough capacity for i16",
227            ));
228        }
229        dest.put_i16(*self);
230        trace!("encoding i16: {:#x}", *self);
231        Ok(())
232    }
233}
234
235impl Encoder for u16 {
236    fn write_size(&self, _version: Version) -> usize {
237        2
238    }
239
240    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
241    where
242        T: BufMut,
243    {
244        if dest.remaining_mut() < 2 {
245            return Err(Error::new(
246                ErrorKind::UnexpectedEof,
247                "not enough capacity for u16",
248            ));
249        }
250        dest.put_u16(*self);
251        trace!("encoding u16: {:#x}", *self);
252        Ok(())
253    }
254}
255
256impl Encoder for i32 {
257    fn write_size(&self, _version: Version) -> usize {
258        4
259    }
260
261    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
262    where
263        T: BufMut,
264    {
265        if dest.remaining_mut() < 4 {
266            return Err(Error::new(
267                ErrorKind::UnexpectedEof,
268                "not enough capacity for i32",
269            ));
270        }
271        dest.put_i32(*self);
272        trace!("encoding i32: {:#x}", *self);
273        Ok(())
274    }
275}
276
277impl Encoder for u32 {
278    fn write_size(&self, _version: Version) -> usize {
279        4
280    }
281
282    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
283    where
284        T: BufMut,
285    {
286        if dest.remaining_mut() < 4 {
287            return Err(Error::new(
288                ErrorKind::UnexpectedEof,
289                "not enough capacity for u32",
290            ));
291        }
292        dest.put_u32(*self);
293        Ok(())
294    }
295}
296
297impl Encoder for i64 {
298    fn write_size(&self, _version: Version) -> usize {
299        8
300    }
301
302    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
303    where
304        T: BufMut,
305    {
306        if dest.remaining_mut() < 8 {
307            return Err(Error::new(
308                ErrorKind::UnexpectedEof,
309                "not enough capacity for i164",
310            ));
311        }
312        dest.put_i64(*self);
313        Ok(())
314    }
315}
316
317impl EncoderVarInt for i64 {
318    fn var_write_size(&self) -> usize {
319        variant_size(*self)
320    }
321
322    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
323    where
324        T: BufMut,
325    {
326        variant_encode(dest, *self)?;
327        Ok(())
328    }
329}
330
331impl Encoder for String {
332    fn write_size(&self, _version: Version) -> usize {
333        2 + self.len()
334    }
335
336    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
337    where
338        T: BufMut,
339    {
340        if dest.remaining_mut() < 2 + self.len() {
341            return Err(Error::new(
342                ErrorKind::UnexpectedEof,
343                "not enough capacity for string",
344            ));
345        }
346
347        dest.put_u16(self.len() as u16);
348
349        let mut writer = dest.writer();
350        let bytes_written = writer.write(self.as_bytes())?;
351
352        if bytes_written != self.len() {
353            return Err(Error::new(
354                ErrorKind::UnexpectedEof,
355                format!(
356                    "out of {} bytes, {} not written",
357                    self.len(),
358                    self.len() - bytes_written
359                ),
360            ));
361        }
362
363        Ok(())
364    }
365}
366
367impl<M> Encoder for &M
368where
369    M: Encoder,
370{
371    fn write_size(&self, version: Version) -> usize {
372        (*self).write_size(version)
373    }
374
375    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
376    where
377        T: BufMut,
378    {
379        (*self).encode(dest, version)
380    }
381}
382
#[cfg(test)]
mod test {

    use bytes::BufMut;
    use std::io::Error as IoError;

    use crate::Encoder;
    use crate::Version;

    /// Encodes `value` at `version` into a fresh buffer and returns the bytes.
    ///
    /// Also checks that `write_size` agrees with the number of bytes that
    /// were actually written, so every test covers both trait methods.
    fn encode_to_vec<E: Encoder>(value: &E, version: Version) -> Vec<u8> {
        let mut dest: Vec<u8> = vec![];
        value.encode(&mut dest, version).expect("encode");
        assert_eq!(value.write_size(version), dest.len());
        dest
    }

    #[test]
    fn test_encode_i8() {
        let value: i8 = 5;
        assert_eq!(encode_to_vec(&value, 0), [0x05]);
    }

    #[test]
    fn test_encode_u8() {
        let value: u8 = 8;
        assert_eq!(encode_to_vec(&value, 0), [0x08]);
    }

    #[test]
    fn test_encode_i16() {
        let value: i16 = 5;
        assert_eq!(encode_to_vec(&value, 0), [0x00, 0x05]);
    }

    #[test]
    fn test_encode_u16() {
        let value: u16 = 16;
        assert_eq!(encode_to_vec(&value, 0), [0x00, 0x10]);
    }

    #[test]
    fn test_encode_option_u16_none() {
        let value: Option<u16> = None;
        assert_eq!(encode_to_vec(&value, 0), [0x00]);
    }

    #[test]
    fn test_encode_option_u16_with_val() {
        let value: Option<u16> = Some(16);
        assert_eq!(encode_to_vec(&value, 0), [0x01, 0x00, 0x10]);
    }

    #[test]
    fn test_encode_i32() {
        let value: i32 = 5;
        assert_eq!(encode_to_vec(&value, 0), [0x00, 0x00, 0x00, 0x05]);
    }

    #[test]
    fn test_encode_i64() {
        let value: i64 = 5;
        assert_eq!(
            encode_to_vec(&value, 0),
            [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05]
        );
    }

    #[test]
    fn test_encode_string_option_none() {
        let value: Option<String> = None;
        assert_eq!(encode_to_vec(&value, 0), [0x00]);
    }

    #[test]
    fn test_encode_string_option_some() {
        let value: Option<String> = Some(String::from("wo"));
        // presence flag, u16 length, then the bytes of "wo"
        assert_eq!(encode_to_vec(&value, 0), [0x01, 0x00, 0x02, 0x77, 0x6f]);
    }

    #[test]
    fn test_encode_string() {
        let value = String::from("wo");
        // u16 length, then the bytes of "wo"
        assert_eq!(encode_to_vec(&value, 0), [0x00, 0x02, 0x77, 0x6f]);
    }

    #[test]
    fn test_encode_bool() {
        let value = true;
        assert_eq!(encode_to_vec(&value, 0), [0x01]);
    }

    #[test]
    fn test_encode_string_vectors() {
        let value: Vec<String> = vec![String::from("test")];
        // vec len: 4 bytes, string len: 2 bytes, string: 4 bytes
        assert_eq!(
            encode_to_vec(&value, 0),
            [0x00, 0x00, 0x00, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74]
        );
    }

    #[test]
    fn test_encode_u8_vectors() {
        let value: Vec<u8> = vec![0x10, 0x11];
        assert_eq!(
            encode_to_vec(&value, 0),
            [0x00, 0x00, 0x00, 0x02, 0x10, 0x11]
        );
    }

    #[test]
    fn test_encode_u8_vectors_big() {
        let value: Vec<u8> = vec![0x10; 257];
        let dest = encode_to_vec(&value, 0);
        assert_eq!(dest.len(), 257 + 4);
        // 257 == 0x0101, so the count needs more than one prefix byte
        assert_eq!(dest[..4], [0x00, 0x00, 0x01, 0x01]);
        assert_eq!(dest[4..], value[..]);
    }

    /// Record whose second field is only part of the wire format for
    /// versions greater than 1.
    #[derive(Default)]
    struct TestRecord {
        value: i8,
        value2: i8,
    }

    impl Encoder for TestRecord {
        fn write_size(&self, version: Version) -> usize {
            self.value.write_size(version) + {
                if version > 1 {
                    self.value2.write_size(version)
                } else {
                    0
                }
            }
        }

        fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
        where
            T: BufMut,
        {
            self.value.encode(dest, version)?;
            if version > 1 {
                self.value2.encode(dest, version)?;
            }
            Ok(())
        }
    }

    #[test]
    fn test_encoding_struct() {
        let record = TestRecord {
            value: 20,
            value2: 10,
        };

        // version 0: only the first field is written
        assert_eq!(encode_to_vec(&record, 0), [20]);

        // version 2: both fields are written
        assert_eq!(encode_to_vec(&record, 2), [20, 10]);
    }
}