fluvio_protocol/core/
encoder.rs

// encode values
2use std::collections::BTreeMap;
3use std::io::Error;
4use std::io::ErrorKind;
5use std::io::Write;
6use std::marker::PhantomData;
7use std::time::Duration;
8
9use bytes::BufMut;
10use bytes::Bytes;
11use bytes::BytesMut;
12use tracing::trace;
13
14use crate::Version;
15
16use super::varint::variant_encode;
17use super::varint::variant_size;
18
19// trait for encoding and decoding using Fluvio Protocol
20pub trait Encoder {
21    /// size of this object in bytes
22    fn write_size(&self, version: Version) -> usize;
23
24    /// encoding contents for buffer
25    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
26    where
27        T: BufMut;
28
29    fn as_bytes(&self, version: Version) -> Result<Bytes, Error> {
30        let len = self.write_size(version);
31        let mut out = Vec::with_capacity(len);
32        self.encode(&mut out, version)?;
33        let mut buf = BytesMut::with_capacity(out.len());
34        buf.put_slice(&out);
35        trace!(len = buf.len(), "encoding as bytes");
36        Ok(buf.freeze())
37    }
38}
39
40pub trait EncoderVarInt {
41    fn var_write_size(&self) -> usize;
42
43    /// encoding contents for buffer
44    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
45    where
46        T: BufMut;
47}
48
49impl<M> Encoder for Vec<M>
50where
51    M: Encoder,
52{
53    fn write_size(&self, version: Version) -> usize {
54        self.iter()
55            .fold(4, |sum, val| sum + val.write_size(version))
56    }
57
58    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
59    where
60        T: BufMut,
61    {
62        if dest.remaining_mut() < 4 {
63            return Err(Error::new(
64                ErrorKind::UnexpectedEof,
65                "not enough capacity for vec",
66            ));
67        }
68
69        dest.put_u32(self.len() as u32);
70
71        for ref v in self {
72            v.encode(dest, version)?;
73        }
74
75        Ok(())
76    }
77}
78
79impl<M> Encoder for Option<M>
80where
81    M: Encoder,
82{
83    fn write_size(&self, version: Version) -> usize {
84        match *self {
85            Some(ref value) => true.write_size(version) + value.write_size(version),
86            None => false.write_size(version),
87        }
88    }
89
90    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
91    where
92        T: BufMut,
93    {
94        match *self {
95            Some(ref value) => {
96                true.encode(dest, version)?;
97                value.encode(dest, version)
98            }
99            None => false.encode(dest, version),
100        }
101    }
102}
103
104impl<M> Encoder for PhantomData<M>
105where
106    M: Encoder,
107{
108    fn write_size(&self, _version: Version) -> usize {
109        0
110    }
111
112    fn encode<T>(&self, _dest: &mut T, _version: Version) -> Result<(), Error>
113    where
114        T: BufMut,
115    {
116        Ok(())
117    }
118}
119
120impl<K, V> Encoder for BTreeMap<K, V>
121where
122    K: Encoder,
123    V: Encoder,
124{
125    fn write_size(&self, version: Version) -> usize {
126        let mut len: usize = (0_u16).write_size(version);
127
128        for (key, value) in self.iter() {
129            len += key.write_size(version);
130            len += value.write_size(version);
131        }
132
133        len
134    }
135
136    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
137    where
138        T: BufMut,
139    {
140        let len = self.len() as u16;
141        len.encode(dest, version)?;
142
143        for (key, value) in self.iter() {
144            key.encode(dest, version)?;
145            value.encode(dest, version)?;
146        }
147
148        Ok(())
149    }
150}
151
152impl Encoder for bool {
153    fn write_size(&self, _version: Version) -> usize {
154        1
155    }
156
157    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
158    where
159        T: BufMut,
160    {
161        if dest.remaining_mut() < 1 {
162            return Err(Error::new(
163                ErrorKind::UnexpectedEof,
164                "not enough capacity for bool",
165            ));
166        }
167        if *self {
168            dest.put_i8(1);
169        } else {
170            dest.put_i8(0);
171        }
172        Ok(())
173    }
174}
175
176impl Encoder for i8 {
177    fn write_size(&self, _version: Version) -> usize {
178        1
179    }
180
181    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
182    where
183        T: BufMut,
184    {
185        if dest.remaining_mut() < 1 {
186            return Err(Error::new(
187                ErrorKind::UnexpectedEof,
188                "not enough capacity for i8",
189            ));
190        }
191        dest.put_i8(*self);
192        Ok(())
193    }
194}
195
196impl Encoder for u8 {
197    fn write_size(&self, _version: Version) -> usize {
198        1
199    }
200
201    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
202    where
203        T: BufMut,
204    {
205        if dest.remaining_mut() < 1 {
206            return Err(Error::new(
207                ErrorKind::UnexpectedEof,
208                "not enough capacity for i8",
209            ));
210        }
211        dest.put_u8(*self);
212        Ok(())
213    }
214}
215
216impl Encoder for i16 {
217    fn write_size(&self, _version: Version) -> usize {
218        2
219    }
220
221    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
222    where
223        T: BufMut,
224    {
225        if dest.remaining_mut() < 2 {
226            return Err(Error::new(
227                ErrorKind::UnexpectedEof,
228                "not enough capacity for i16",
229            ));
230        }
231        dest.put_i16(*self);
232        trace!("encoding i16: {:#x}", *self);
233        Ok(())
234    }
235}
236
237impl Encoder for u16 {
238    fn write_size(&self, _version: Version) -> usize {
239        2
240    }
241
242    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
243    where
244        T: BufMut,
245    {
246        if dest.remaining_mut() < 2 {
247            return Err(Error::new(
248                ErrorKind::UnexpectedEof,
249                "not enough capacity for u16",
250            ));
251        }
252        dest.put_u16(*self);
253        trace!("encoding u16: {:#x}", *self);
254        Ok(())
255    }
256}
257
258impl Encoder for i32 {
259    fn write_size(&self, _version: Version) -> usize {
260        4
261    }
262
263    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
264    where
265        T: BufMut,
266    {
267        if dest.remaining_mut() < 4 {
268            return Err(Error::new(
269                ErrorKind::UnexpectedEof,
270                "not enough capacity for i32",
271            ));
272        }
273        dest.put_i32(*self);
274        trace!("encoding i32: {:#x}", *self);
275        Ok(())
276    }
277}
278
279impl Encoder for u32 {
280    fn write_size(&self, _version: Version) -> usize {
281        4
282    }
283
284    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
285    where
286        T: BufMut,
287    {
288        if dest.remaining_mut() < 4 {
289            return Err(Error::new(
290                ErrorKind::UnexpectedEof,
291                "not enough capacity for u32",
292            ));
293        }
294        dest.put_u32(*self);
295        Ok(())
296    }
297}
298
299impl Encoder for f32 {
300    fn write_size(&self, _version: Version) -> usize {
301        4
302    }
303
304    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
305    where
306        T: BufMut,
307    {
308        if dest.remaining_mut() < 4 {
309            return Err(Error::new(
310                ErrorKind::UnexpectedEof,
311                "not enough capacity for f32",
312            ));
313        }
314        dest.put_f32(*self);
315        Ok(())
316    }
317}
318
319impl Encoder for u64 {
320    fn write_size(&self, _version: Version) -> usize {
321        8
322    }
323
324    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
325    where
326        T: BufMut,
327    {
328        if dest.remaining_mut() < 8 {
329            return Err(Error::new(
330                ErrorKind::UnexpectedEof,
331                "not enough capacity for u64",
332            ));
333        }
334        dest.put_u64(*self);
335        Ok(())
336    }
337}
338
339impl Encoder for i64 {
340    fn write_size(&self, _version: Version) -> usize {
341        8
342    }
343
344    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
345    where
346        T: BufMut,
347    {
348        if dest.remaining_mut() < 8 {
349            return Err(Error::new(
350                ErrorKind::UnexpectedEof,
351                "not enough capacity for i64",
352            ));
353        }
354        dest.put_i64(*self);
355        Ok(())
356    }
357}
358
359impl EncoderVarInt for i64 {
360    fn var_write_size(&self) -> usize {
361        variant_size(*self)
362    }
363
364    fn encode_varint<T>(&self, dest: &mut T) -> Result<(), Error>
365    where
366        T: BufMut,
367    {
368        variant_encode(dest, *self)?;
369        Ok(())
370    }
371}
372
373impl Encoder for f64 {
374    fn write_size(&self, _version: Version) -> usize {
375        8
376    }
377
378    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
379    where
380        T: BufMut,
381    {
382        if dest.remaining_mut() < 8 {
383            return Err(Error::new(
384                ErrorKind::UnexpectedEof,
385                "not enough capacity for f64",
386            ));
387        }
388        dest.put_f64(*self);
389        Ok(())
390    }
391}
392
393impl Encoder for Duration {
394    fn write_size(&self, _version: Version) -> usize {
395        12
396    }
397
398    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
399    where
400        T: BufMut,
401    {
402        if dest.remaining_mut() < 12 {
403            return Err(Error::new(
404                ErrorKind::UnexpectedEof,
405                "not enough capacity for u64+u32",
406            ));
407        }
408        dest.put_u64(self.as_secs());
409        dest.put_u32(self.subsec_nanos());
410        Ok(())
411    }
412}
413
414impl Encoder for String {
415    fn write_size(&self, _version: Version) -> usize {
416        2 + self.len()
417    }
418
419    fn encode<T>(&self, dest: &mut T, _version: Version) -> Result<(), Error>
420    where
421        T: BufMut,
422    {
423        if dest.remaining_mut() < 2 + self.len() {
424            return Err(Error::new(
425                ErrorKind::UnexpectedEof,
426                "not enough capacity for string",
427            ));
428        }
429
430        dest.put_u16(self.len() as u16);
431
432        let mut writer = dest.writer();
433        let bytes_written = writer.write(self.as_bytes())?;
434
435        if bytes_written != self.len() {
436            return Err(Error::new(
437                ErrorKind::UnexpectedEof,
438                format!(
439                    "out of {} bytes, {} not written",
440                    self.len(),
441                    self.len() - bytes_written
442                ),
443            ));
444        }
445
446        Ok(())
447    }
448}
449
450impl<M> Encoder for &M
451where
452    M: Encoder,
453{
454    fn write_size(&self, version: Version) -> usize {
455        (*self).write_size(version)
456    }
457
458    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
459    where
460        T: BufMut,
461    {
462        (*self).encode(dest, version)
463    }
464}
465
#[cfg(test)]
mod test {

    use std::collections::BTreeMap;
    use std::io::Error as IoError;
    use std::time::Duration;

    use bytes::BufMut;

    use crate::Encoder;
    use crate::Version;

    /// Encodes `value` into a fresh `Vec<u8>` and checks that `write_size`
    /// reports exactly the number of bytes that were written.
    fn encode_to_vec<E: Encoder>(value: &E, version: Version) -> Vec<u8> {
        let mut dest = vec![];
        value
            .encode(&mut dest, version)
            .expect("encoding into a Vec should succeed");
        assert_eq!(
            value.write_size(version),
            dest.len(),
            "write_size must match the number of encoded bytes"
        );
        dest
    }

    #[test]
    fn test_encode_i8() {
        assert_eq!(encode_to_vec(&5_i8, 0), vec![0x05_u8]);
    }

    #[test]
    fn test_encode_u8() {
        assert_eq!(encode_to_vec(&8_u8, 0), vec![0x08_u8]);
    }

    #[test]
    fn test_encode_i16() {
        assert_eq!(encode_to_vec(&5_i16, 0), vec![0x00_u8, 0x05]);
    }

    #[test]
    fn test_encode_u16() {
        assert_eq!(encode_to_vec(&16_u16, 0), vec![0x00_u8, 0x10]);
    }

    #[test]
    fn test_encode_option_u16_none() {
        let value: Option<u16> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8]);
    }

    #[test]
    fn test_encode_option_u16_with_val() {
        let value: Option<u16> = Some(16);
        assert_eq!(encode_to_vec(&value, 0), vec![0x01_u8, 0x00, 0x10]);
    }

    #[test]
    fn test_encode_u32() {
        assert_eq!(encode_to_vec(&16_u32, 0), vec![0x00_u8, 0x00, 0x00, 0x10]);
    }

    #[test]
    fn test_encode_option_u32_none() {
        let value: Option<u32> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8]);
    }

    #[test]
    fn test_encode_option_u32_with_val() {
        let value: Option<u32> = Some(16);
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01_u8, 0x00, 0x00, 0x00, 0x10]
        );
    }

    #[test]
    fn test_encode_u64() {
        assert_eq!(
            encode_to_vec(&16_u64, 0),
            vec![0x00_u8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10]
        );
    }

    #[test]
    fn test_encode_option_u64_none() {
        let value: Option<u64> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8]);
    }

    #[test]
    fn test_encode_option_u64_with_val() {
        let value: Option<u64> = Some(16);
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01_u8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10]
        );
    }

    #[test]
    fn test_encode_i32() {
        assert_eq!(encode_to_vec(&5_i32, 0), vec![0x00_u8, 0x00, 0x00, 0x05]);
    }

    #[test]
    fn test_encode_i64() {
        assert_eq!(
            encode_to_vec(&5_i64, 0),
            vec![0x00_u8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05]
        );
    }

    #[test]
    fn test_encode_string_option_none() {
        let value: Option<String> = None;
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8]);
    }

    #[test]
    fn test_encode_string_option_some() {
        let value: Option<String> = Some(String::from("wo"));
        // presence flag, u16 length, then the bytes of "wo"
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x01_u8, 0x00, 0x02, 0x77, 0x6f]
        );
    }

    #[test]
    fn test_encode_string() {
        let value = String::from("wo");
        // u16 length, then the bytes of "wo"
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8, 0x02, 0x77, 0x6f]);
    }

    #[test]
    fn test_encode_bool() {
        assert_eq!(encode_to_vec(&true, 0), vec![0x01_u8]);
    }

    #[test]
    fn test_encode_string_vectors() {
        let value: Vec<String> = vec![String::from("test")];
        // u32 element count (4 bytes), u16 string length (2 bytes), "test" (4 bytes)
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x00_u8, 0x00, 0x00, 0x01, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74]
        );
    }

    #[test]
    fn test_encode_u8_vectors() {
        let value: Vec<u8> = vec![0x10, 0x11];
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x00_u8, 0x00, 0x00, 0x02, 0x10, 0x11]
        );
    }

    #[test]
    fn test_encode_u8_vectors_big() {
        let value: Vec<u8> = vec![0x10; 257];
        let bytes = encode_to_vec(&value, 0);
        assert_eq!(bytes.len(), 257 + 4);
        // 257 == 0x0000_0101
        assert_eq!(bytes[..4], [0x00_u8, 0x00, 0x01, 0x01]);
        assert_eq!(bytes[4..], value[..]);
    }

    #[test]
    fn test_encode_duration() {
        let value = Duration::new(1, 2);
        // u64 seconds followed by u32 sub-second nanoseconds
        assert_eq!(
            encode_to_vec(&value, 0),
            vec![0x00_u8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02]
        );
    }

    #[test]
    fn test_encode_btreemap() {
        let mut value = BTreeMap::new();
        value.insert(1_u8, 2_u8);
        // u16 entry count, then key and value
        assert_eq!(encode_to_vec(&value, 0), vec![0x00_u8, 0x01, 0x01, 0x02]);
    }

    /// Record whose second field only exists on the wire for versions above 1.
    #[derive(Default)]
    struct TestRecord {
        value: i8,
        value2: i8,
    }

    impl Encoder for TestRecord {
        fn write_size(&self, version: Version) -> usize {
            let mut size = self.value.write_size(version);
            if version > 1 {
                size += self.value2.write_size(version);
            }
            size
        }

        fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
        where
            T: BufMut,
        {
            self.value.encode(dest, version)?;
            if version > 1 {
                self.value2.encode(dest, version)?;
            }
            Ok(())
        }
    }

    #[test]
    fn test_encoding_struct() {
        let record = TestRecord {
            value: 20,
            value2: 10,
        };

        // version 0: only `value` is written
        assert_eq!(encode_to_vec(&record, 0), vec![20_u8]);

        // version 2: `value2` follows `value`
        assert_eq!(encode_to_vec(&record, 2), vec![20_u8, 10]);
    }
}