fluvio_protocol/core/
encoder.rs

// encode values
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Write;
6use std::marker::PhantomData;
7use std::time::Duration;
8
9use bytes::BufMut;
10use bytes::Bytes;
11use bytes::BytesMut;
12use tracing::trace;
13
14use crate::Version;
15
16use super::varint::variant_encode;
17use super::varint::variant_size;
18
19// trait for encoding and decoding using Fluvio Protocol
20pub trait Encoder {
21    /// size of this object in bytes
22    fn write_size(&self, version: Version) -> usize;
23
24    /// encoding contents for buffer
25    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
26    where
27        T: BufMut;
28
29    fn as_bytes(&self, version: Version) -> Result<Bytes, Error> {
30        let len = self.write_size(version);
31        let mut out = Vec::with_capacity(len);
32        self.encode(&mut out, version)?;
33        let mut buf = BytesMut::with_capacity(out.len());
34        buf.put_slice(&out);
35        trace!(len = buf.len(), "encoding as bytes");
36        Ok(buf.freeze())
37    }
38}
39
40pub trait EncoderVarInt {
41    fn var_write_size(&self) -> usize;
42
43    /// encoding contents for buffer
44    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
45    where
46        T: BufMut;
47}
48
49impl<M> Encoder for Vec<M>
50where
51    M: Encoder,
52{
53    fn write_size(&self, version: Version) -> usize {
54        self.iter()
55            .fold(4, |sum, val| sum + val.write_size(version))
56    }
57
58    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
59    where
60        T: BufMut,
61    {
62        if dest.remaining_mut() < 4 {
63            return Err(Error::new(
64                ErrorKind::UnexpectedEof,
65                "not enough capacity for vec",
66            ));
67        }
68
69        dest.put_u32(self.len() as u32);
70
71        for ref v in self {
72            v.encode(dest, version)?;
73        }
74
75        Ok(())
76    }
77}
78
79impl<M> Encoder for Option<M>
80where
81    M: Encoder,
82{
83    fn write_size(&self, version: Version) -> usize {
84        match *self {
85            Some(ref value) => true.write_size(version) + value.write_size(version),
86            None => false.write_size(version),
87        }
88    }
89
90    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
91    where
92        T: BufMut,
93    {
94        match *self {
95            Some(ref value) => {
96                true.encode(dest, version)?;
97                value.encode(dest, version)
98            }
99            None => false.encode(dest, version),
100        }
101    }
102}
103
104impl<M> Encoder for PhantomData<M>
105where
106    M: Encoder,
107{
108    fn write_size(&self, _version: Version) -> usize {
109        0
110    }
111
112    fn encode<T>(&self, _dest: &mut T, _version: Version) -> Result<(), Error>
113    where
114        T: BufMut,
115    {
116        Ok(())
117    }
118}
119
120impl<K, V> Encoder for BTreeMap<K, V>
121where
122    K: Encoder,
123    V: Encoder,
124{
125    fn write_size(&self, version: Version) -> usize {
126        let mut len: usize = (0_u16).write_size(version);
127
128        for (key, value) in self.iter() {
129            len += key.write_size(version);
130            len += value.write_size(version);
131        }
132
133        len
134    }
135
136    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
137    where
138        T: BufMut,
139    {
140        let len = self.len() as u16;
141        len.encode(dest, version)?;
142
143        for (key, value) in self.iter() {
144            key.encode(dest, version)?;
145            value.encode(dest, version)?;
146        }
147
148        Ok(())
149    }
150}
151
152impl Encoder for bool {
153    fn write_size(&self, _version: Version) -> usize {
154        1
155    }
156
157    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
158    where
159        T: BufMut,
160    {
161        if dest.remaining_mut() < 1 {
162            return Err(Error::new(
163                ErrorKind::UnexpectedEof,
164                "not enough capacity for bool",
165            ));
166        }
167        if *self {
168            dest.put_i8(1);
169        } else {
170            dest.put_i8(0);
171        }
172        Ok(())
173    }
174}
175
176impl Encoder for i8 {
177    fn write_size(&self, _version: Version) -> usize {
178        1
179    }
180
181    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
182    where
183        T: BufMut,
184    {
185        if dest.remaining_mut() < 1 {
186            return Err(Error::new(
187                ErrorKind::UnexpectedEof,
188                "not enough capacity for i8",
189            ));
190        }
191        dest.put_i8(*self);
192        Ok(())
193    }
194}
195
196impl Encoder for u8 {
197    fn write_size(&self, _version: Version) -> usize {
198        1
199    }
200
201    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
202    where
203        T: BufMut,
204    {
205        if dest.remaining_mut() < 1 {
206            return Err(Error::new(
207                ErrorKind::UnexpectedEof,
208                "not enough capacity for i8",
209            ));
210        }
211        dest.put_u8(*self);
212        Ok(())
213    }
214}
215
216impl Encoder for i16 {
217    fn write_size(&self, _version: Version) -> usize {
218        2
219    }
220
221    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
222    where
223        T: BufMut,
224    {
225        if dest.remaining_mut() < 2 {
226            return Err(Error::new(
227                ErrorKind::UnexpectedEof,
228                "not enough capacity for i16",
229            ));
230        }
231        dest.put_i16(*self);
232        trace!("encoding i16: {:#x}", *self);
233        Ok(())
234    }
235}
236
237impl Encoder for u16 {
238    fn write_size(&self, _version: Version) -> usize {
239        2
240    }
241
242    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
243    where
244        T: BufMut,
245    {
246        if dest.remaining_mut() < 2 {
247            return Err(Error::new(
248                ErrorKind::UnexpectedEof,
249                "not enough capacity for u16",
250            ));
251        }
252        dest.put_u16(*self);
253        trace!("encoding u16: {:#x}", *self);
254        Ok(())
255    }
256}
257
258impl Encoder for i32 {
259    fn write_size(&self, _version: Version) -> usize {
260        4
261    }
262
263    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
264    where
265        T: BufMut,
266    {
267        if dest.remaining_mut() < 4 {
268            return Err(Error::new(
269                ErrorKind::UnexpectedEof,
270                "not enough capacity for i32",
271            ));
272        }
273        dest.put_i32(*self);
274        trace!("encoding i32: {:#x}", *self);
275        Ok(())
276    }
277}
278
279impl Encoder for u32 {
280    fn write_size(&self, _version: Version) -> usize {
281        4
282    }
283
284    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
285    where
286        T: BufMut,
287    {
288        if dest.remaining_mut() < 4 {
289            return Err(Error::new(
290                ErrorKind::UnexpectedEof,
291                "not enough capacity for u32",
292            ));
293        }
294        dest.put_u32(*self);
295        Ok(())
296    }
297}
298
299impl Encoder for f32 {
300    fn write_size(&self, _version: Version) -> usize {
301        4
302    }
303
304    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
305    where
306        T: BufMut,
307    {
308        if dest.remaining_mut() < 4 {
309            return Err(Error::new(
310                ErrorKind::UnexpectedEof,
311                "not enough capacity for f32",
312            ));
313        }
314        dest.put_f32(*self);
315        Ok(())
316    }
317}
318
319impl Encoder for u64 {
320    fn write_size(&self, _version: Version) -> usize {
321        8
322    }
323
324    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
325    where
326        T: BufMut,
327    {
328        if dest.remaining_mut() < 8 {
329            return Err(Error::new(
330                ErrorKind::UnexpectedEof,
331                "not enough capacity for u64",
332            ));
333        }
334        dest.put_u64(*self);
335        Ok(())
336    }
337}
338
339impl Encoder for i64 {
340    fn write_size(&self, _version: Version) -> usize {
341        8
342    }
343
344    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
345    where
346        T: BufMut,
347    {
348        if dest.remaining_mut() < 8 {
349            return Err(Error::new(
350                ErrorKind::UnexpectedEof,
351                "not enough capacity for i64",
352            ));
353        }
354        dest.put_i64(*self);
355        Ok(())
356    }
357}
358
359impl EncoderVarInt for i64 {
360    fn var_write_size(&self) -> usize {
361        variant_size(*self)
362    }
363
364    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
365    where
366        T: BufMut,
367    {
368        variant_encode(dest, *self)?;
369        Ok(())
370    }
371}
372
373impl Encoder for f64 {
374    fn write_size(&self, _version: Version) -> usize {
375        8
376    }
377
378    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
379    where
380        T: BufMut,
381    {
382        if dest.remaining_mut() < 8 {
383            return Err(Error::new(
384                ErrorKind::UnexpectedEof,
385                "not enough capacity for f64",
386            ));
387        }
388        dest.put_f64(*self);
389        Ok(())
390    }
391}
392
393impl Encoder for Duration {
394    fn write_size(&self, _version: Version) -> usize {
395        12
396    }
397
398    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
399    where
400        T: BufMut,
401    {
402        if dest.remaining_mut() < 12 {
403            return Err(Error::new(
404                ErrorKind::UnexpectedEof,
405                "not enough capacity for u64+u32",
406            ));
407        }
408        dest.put_u64(self.as_secs());
409        dest.put_u32(self.subsec_nanos());
410        Ok(())
411    }
412}
413
414impl Encoder for String {
415    fn write_size(&self, _version: Version) -> usize {
416        2 + self.len()
417    }
418
419    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
420    where
421        T: BufMut,
422    {
423        if dest.remaining_mut() < 2 + self.len() {
424            return Err(Error::new(
425                ErrorKind::UnexpectedEof,
426                "not enough capacity for string",
427            ));
428        }
429
430        dest.put_u16(self.len() as u16);
431
432        let mut writer = dest.writer();
433        let bytes_written = writer.write(self.as_bytes())?;
434
435        if bytes_written != self.len() {
436            return Err(Error::new(
437                ErrorKind::UnexpectedEof,
438                format!(
439                    "out of {} bytes, {} not written",
440                    self.len(),
441                    self.len() - bytes_written
442                ),
443            ));
444        }
445
446        Ok(())
447    }
448}
449
450impl<M> Encoder for &M
451where
452    M: Encoder,
453{
454    fn write_size(&self, version: Version) -> usize {
455        (*self).write_size(version)
456    }
457
458    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
459    where
460        T: BufMut,
461    {
462        (*self).encode(dest, version)
463    }
464}
465
466impl Encoder for &str {
467    fn write_size(&self, _version: Version) -> usize {
468        2 + self.len()
469    }
470
471    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
472    where
473        T: BufMut,
474    {
475        if dest.remaining_mut() < 2 + self.len() {
476            return Err(Error::new(
477                ErrorKind::UnexpectedEof,
478                "not enough capacity for string",
479            ));
480        }
481
482        dest.put_u16(self.len() as u16);
483
484        let mut writer = dest.writer();
485        let bytes_written = writer.write(<str>::as_bytes(self))?;
486
487        if bytes_written != self.len() {
488            return Err(Error::new(
489                ErrorKind::UnexpectedEof,
490                format!(
491                    "out of {} bytes, {} not written",
492                    self.len(),
493                    self.len() - bytes_written
494                ),
495            ));
496        }
497
498        Ok(())
499    }
500}
501
#[cfg(test)]
mod test {

    use bytes::BufMut;
    use std::io::Error as IoError;

    use crate::Encoder;
    use crate::Version;

    /// Encodes `value` for `version` into a fresh `Vec<u8>`, panicking if
    /// encoding fails.
    fn encode_to_vec<E: Encoder>(value: &E, version: Version) -> Vec<u8> {
        let mut dest = vec![];
        value
            .encode(&mut dest, version)
            .expect("encoding into a Vec should succeed");
        dest
    }

    #[test]
    fn test_encode_i8() {
        let value: i8 = 5;
        assert_eq!(encode_to_vec(&value, 0), vec![0x05]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_u8() {
        let value: u8 = 8;
        assert_eq!(encode_to_vec(&value, 0), vec![0x08]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_i16() {
        let value: i16 = 5;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00, 0x05]);
        assert_eq!(value.write_size(0), 2);
    }

    #[test]
    fn test_encode_u16() {
        let value: u16 = 16;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00, 0x10]);
        assert_eq!(value.write_size(0), 2);
    }

    #[test]
    fn test_encode_option_u16_none() {
        let value: Option<u16> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_option_u16_with_val() {
        let value: Option<u16> = Some(16);
        assert_eq!(encode_to_vec(&value, 0), vec![0x01, 0x00, 0x10]);
        assert_eq!(value.write_size(0), 3);
    }

    #[test]
    fn test_encode_u32() {
        let value: u32 = 16;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00, 0x00, 0x00, 0x10]);
        assert_eq!(value.write_size(0), 4);
    }

    #[test]
    fn test_encode_option_u32_none() {
        let value: Option<u32> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_option_u32_with_val() {
        let value: Option<u32> = Some(16);
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01, 0x00, 0x00, 0x00, 0x10]
        );
        assert_eq!(value.write_size(0), 5);
    }

    #[test]
    fn test_encode_u64() {
        let value: u64 = 16;
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10]
        );
        assert_eq!(value.write_size(0), 8);
    }

    #[test]
    fn test_encode_option_u64_none() {
        let value: Option<u64> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_option_u64_with_val() {
        let value: Option<u64> = Some(16);
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10]
        );
        assert_eq!(value.write_size(0), 9);
    }

    #[test]
    fn test_encode_i32() {
        let value: i32 = 5;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00, 0x00, 0x00, 0x05]);
        assert_eq!(value.write_size(0), 4);
    }

    #[test]
    fn test_encode_i64() {
        let value: i64 = 5;
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05]
        );
        assert_eq!(value.write_size(0), 8);
    }

    #[test]
    fn test_encode_string_option_none() {
        let value: Option<String> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_string_option_some() {
        let value: Option<String> = Some(String::from("wo"));
        // presence flag, u16 length, then "wo"
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01, 0x00, 0x02, 0x77, 0x6f]
        );
        assert_eq!(value.write_size(0), 5);
    }

    #[test]
    fn test_encode_string() {
        let value = String::from("wo");
        // u16 length, then "wo"
        assert_eq!(encode_to_vec(&value, 0), vec![0x00, 0x02, 0x77, 0x6f]);
        assert_eq!(value.write_size(0), 4);
    }

    #[test]
    fn test_encode_bool() {
        let value = true;
        assert_eq!(encode_to_vec(&value, 0), vec![0x01]);
        assert_eq!(value.write_size(0), 1);
    }

    #[test]
    fn test_encode_string_vectors() {
        let value: Vec<String> = vec![String::from("test")];
        let dest = encode_to_vec(&value, 0);
        // u32 element count (4 bytes), u16 string length (2 bytes), "test" (4 bytes)
        assert_eq!(
            dest,
            vec![0x00, 0x00, 0x00, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74]
        );
        assert_eq!(value.write_size(0), dest.len());
    }

    #[test]
    fn test_encode_u8_vectors() {
        let value: Vec<u8> = vec![0x10, 0x11];
        let dest = encode_to_vec(&value, 0);
        assert_eq!(dest, vec![0x00, 0x00, 0x00, 0x02, 0x10, 0x11]);
        assert_eq!(value.write_size(0), dest.len());
    }

    #[test]
    fn test_encode_u8_vectors_big() {
        let value: Vec<u8> = vec![0x10; 257];
        let dest = encode_to_vec(&value, 0);
        assert_eq!(dest.len(), 257 + 4);
        // 257 == 0x0101 in the u32 element count
        assert_eq!(dest[..4], [0x00, 0x00, 0x01, 0x01]);
        assert_eq!(dest[4..257 + 4], vec![0x10; 257]);
        assert_eq!(value.write_size(0), dest.len());
    }

    /// Record whose second field is only encoded for versions above 1.
    #[derive(Default)]
    struct TestRecord {
        value: i8,
        value2: i8,
    }

    impl Encoder for TestRecord {
        fn write_size(&self, version: Version) -> usize {
            let versioned = if version > 1 {
                self.value2.write_size(version)
            } else {
                0
            };
            self.value.write_size(version) + versioned
        }

        fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
        where
            T: BufMut,
        {
            self.value.encode(dest, version)?;
            if version > 1 {
                self.value2.encode(dest, version)?;
            }
            Ok(())
        }
    }

    #[test]
    fn test_encoding_struct() {
        let record = TestRecord {
            value: 20,
            value2: 10,
        };

        // version 0: only `value` is encoded
        let mut dest = vec![];
        record.encode(&mut dest, 0).expect("encode v0");
        assert_eq!(dest.len(), 1);
        assert_eq!(dest[0], 20);
        assert_eq!(record.write_size(0), 1);

        // version 2: `value2` follows `value`
        let mut dest2 = vec![];
        record.encode(&mut dest2, 2).expect("encode v2");
        assert_eq!(dest2.len(), 2);
        assert_eq!(dest2[0], 20);
        assert_eq!(dest2[1], 10);
        assert_eq!(record.write_size(2), 2);
    }
}