kafka_protocol/protocol/
types.rs

1//! Raw types of the Kafka protocol, as defined by the protocol specification.
2//!
3//! In general, most types map closely to a corresponding rust type, with the exception of a number
4//! types that use zigzag encoded to represent length as a "compact" representation.
5//!
6//! It is unnecessary to interact directly with these types for most use cases.
7use super::{Decodable, Decoder, Encodable, Encoder, NewType, StrBytes};
8use crate::protocol::buf::{ByteBuf, ByteBufMut};
9use anyhow::{bail, Result};
10use std::convert::TryFrom;
11use std::string::String as StdString;
12
13macro_rules! define_copy_impl {
14    ($e:ident, $t:ty) => (
15        impl Encoder<$t> for $e {
16            fn encode<B: ByteBufMut>(&self, buf: &mut B, value: $t) -> Result<()> {
17                self.encode(buf, &value)
18            }
19            fn compute_size(&self, value: $t) -> Result<usize> {
20                self.compute_size(&value)
21            }
22            fn fixed_size(&self) -> Option<usize> {
23                <Self as Encoder<&$t>>::fixed_size(self)
24            }
25        }
26    )
27}
28
29/// A boolean value.
30#[derive(Debug, Copy, Clone, Default)]
31pub struct Boolean;
32
33impl<T: NewType<bool>> Encoder<&T> for Boolean {
34    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
35        buf.put_u8(if *value.borrow() { 1 } else { 0 });
36        Ok(())
37    }
38    fn compute_size(&self, _value: &T) -> Result<usize> {
39        Ok(1)
40    }
41    fn fixed_size(&self) -> Option<usize> {
42        Some(1)
43    }
44}
45
46impl<T: NewType<bool>> Decoder<T> for Boolean {
47    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
48        Ok((buf.try_get_u8()? != 0).into())
49    }
50}
51
52define_copy_impl!(Boolean, bool);
53
54macro_rules! define_simple_ints {
55    ($($name:ident: $t:ty [$put:ident, $get:ident],)*) => (
56        $(
57            /// A struct representing [`$ty`]
58            #[derive(Debug, Copy, Clone)]
59            pub struct $name;
60
61            impl<T: NewType<$t>> Encoder<&T> for $name {
62                fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
63                    Ok(buf.$put(*value.borrow()))
64                }
65                fn compute_size(&self, _value: &T) -> Result<usize> {
66                    Ok(std::mem::size_of::<$t>())
67                }
68                fn fixed_size(&self) -> Option<usize> {
69                    Some(std::mem::size_of::<$t>())
70                }
71            }
72
73            impl<T: NewType<$t>> Decoder<T> for $name {
74                fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
75                    Ok(buf.$get()?.into())
76                }
77            }
78
79            define_copy_impl!($name, $t);
80        )*
81    )
82}
83
84define_simple_ints! {
85    Int8: i8 [put_i8, try_get_i8],
86    Int16: i16 [put_i16, try_get_i16],
87    UInt16: u16 [put_u16, try_get_u16],
88    Int32: i32 [put_i32, try_get_i32],
89    Int64: i64 [put_i64, try_get_i64],
90    UInt32: u32 [put_u32, try_get_u32],
91    Float64: f64 [put_f64, try_get_f64],
92}
93
94/// An unsigned zigzag encoded int.
95#[derive(Debug, Copy, Clone, Default)]
96pub struct UnsignedVarInt;
97
98impl<T: NewType<u32>> Encoder<&T> for UnsignedVarInt {
99    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
100        let mut value = *value.borrow();
101        while value >= 0x80 {
102            buf.put_u8((value as u8) | 0x80);
103            value >>= 7;
104        }
105        buf.put_u8(value as u8);
106        Ok(())
107    }
108    fn compute_size(&self, value: &T) -> Result<usize> {
109        let value = *value.borrow();
110        Ok(match value {
111            0x0..=0x7f => 1,
112            0x80..=0x3fff => 2,
113            0x4000..=0x1fffff => 3,
114            0x200000..=0xfffffff => 4,
115            0x10000000..=0xffffffff => 5,
116        })
117    }
118}
119
120impl<T: NewType<u32>> Decoder<T> for UnsignedVarInt {
121    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
122        let mut value = 0;
123        for i in 0..5 {
124            let b = buf.try_get_u8()? as u32;
125            value |= (b & 0x7F) << (i * 7);
126            if b < 0x80 {
127                break;
128            }
129        }
130        Ok(value.into())
131    }
132}
133
134define_copy_impl!(UnsignedVarInt, u32);
135
136/// An unsigned zigzag encoded long.
137#[derive(Debug, Copy, Clone, Default)]
138pub struct UnsignedVarLong;
139
140impl<T: NewType<u64>> Encoder<&T> for UnsignedVarLong {
141    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
142        let mut value = *value.borrow();
143        while value >= 0x80 {
144            buf.put_u8((value as u8) | 0x80);
145            value >>= 7;
146        }
147        buf.put_u8(value as u8);
148        Ok(())
149    }
150    fn compute_size(&self, value: &T) -> Result<usize> {
151        let value = *value.borrow();
152        Ok(match value {
153            0x0..=0x7f => 1,
154            0x80..=0x3fff => 2,
155            0x4000..=0x1fffff => 3,
156            0x200000..=0xfffffff => 4,
157            0x10000000..=0x7ffffffff => 5,
158            0x800000000..=0x3ffffffffff => 6,
159            0x40000000000..=0x1ffffffffffff => 7,
160            0x2000000000000..=0xffffffffffffff => 8,
161            0x100000000000000..=0x7fffffffffffffff => 9,
162            0x8000000000000000..=0xffffffffffffffff => 10,
163        })
164    }
165}
166
167impl<T: NewType<u64>> Decoder<T> for UnsignedVarLong {
168    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
169        let mut value = 0;
170        for i in 0..10 {
171            let b = buf.try_get_u8()? as u64;
172            value |= (b & 0x7F) << (i * 7);
173            if b < 0x80 {
174                break;
175            }
176        }
177        Ok(value.into())
178    }
179}
180
181define_copy_impl!(UnsignedVarLong, u64);
182
183/// A zizag encoded int.
184#[derive(Debug, Copy, Clone, Default)]
185pub struct VarInt;
186
187impl<T: NewType<i32>> Encoder<&T> for VarInt {
188    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
189        let value = *value.borrow();
190        let zigzag = ((value << 1) ^ (value >> 31)) as u32;
191        UnsignedVarInt.encode(buf, zigzag)
192    }
193    fn compute_size(&self, value: &T) -> Result<usize> {
194        let value = *value.borrow();
195        let zigzag = ((value << 1) ^ (value >> 31)) as u32;
196        UnsignedVarInt.compute_size(zigzag)
197    }
198}
199
200impl<T: NewType<i32>> Decoder<T> for VarInt {
201    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
202        let zigzag: u32 = UnsignedVarInt.decode(buf)?;
203        Ok((((zigzag >> 1) as i32) ^ (-((zigzag & 1) as i32))).into())
204    }
205}
206
207define_copy_impl!(VarInt, i32);
208
209/// A zizag encoded long.
210#[derive(Debug, Copy, Clone, Default)]
211pub struct VarLong;
212
213impl<T: NewType<i64>> Encoder<&T> for VarLong {
214    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
215        let value = *value.borrow();
216        let zigzag = ((value << 1) ^ (value >> 63)) as u64;
217        UnsignedVarLong.encode(buf, &zigzag)
218    }
219    fn compute_size(&self, value: &T) -> Result<usize> {
220        let value = *value.borrow();
221        let zigzag = ((value << 1) ^ (value >> 63)) as u64;
222        UnsignedVarLong.compute_size(zigzag)
223    }
224}
225
226impl<T: NewType<i64>> Decoder<T> for VarLong {
227    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
228        let zigzag: u64 = UnsignedVarLong.decode(buf)?;
229        Ok((((zigzag >> 1) as i64) ^ (-((zigzag & 1) as i64))).into())
230    }
231}
232
233define_copy_impl!(VarLong, i64);
234
235/// A v4 UUID.
236#[derive(Debug, Copy, Clone, Default)]
237pub struct Uuid;
238
239impl<T: NewType<uuid::Uuid>> Encoder<&T> for Uuid {
240    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
241        buf.put_slice(value.borrow().as_bytes());
242        Ok(())
243    }
244    fn compute_size(&self, _value: &T) -> Result<usize> {
245        Ok(16)
246    }
247    fn fixed_size(&self) -> Option<usize> {
248        Some(16)
249    }
250}
251
252impl<T: NewType<uuid::Uuid>> Decoder<T> for Uuid {
253    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
254        let mut result = [0; 16];
255        buf.try_copy_to_slice(&mut result)?;
256        Ok(uuid::Uuid::from_bytes(result).into())
257    }
258}
259
260define_copy_impl!(Uuid, uuid::Uuid);
261
262/// A string of length up to [`i16::MAX`].
263#[derive(Debug, Copy, Clone, Default)]
264pub struct String;
265
266impl Encoder<Option<&str>> for String {
267    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&str>) -> Result<()> {
268        if let Some(s) = value {
269            if s.len() > i16::MAX as usize {
270                bail!("String is too long to encode ({} bytes)", s.len());
271            } else {
272                Int16.encode(buf, s.len() as i16)?;
273                buf.put_slice(s.as_bytes());
274                Ok(())
275            }
276        } else {
277            Int16.encode(buf, -1)?;
278            Ok(())
279        }
280    }
281    fn compute_size(&self, value: Option<&str>) -> Result<usize> {
282        if let Some(s) = value {
283            if s.len() > i16::MAX as usize {
284                bail!("String is too long to encode ({} bytes)", s.len());
285            } else {
286                Ok(2 + s.len())
287            }
288        } else {
289            Ok(2)
290        }
291    }
292}
293
294impl Encoder<Option<&StdString>> for String {
295    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&StdString>) -> Result<()> {
296        String.encode(buf, value.map(|s| s.as_str()))
297    }
298    fn compute_size(&self, value: Option<&StdString>) -> Result<usize> {
299        String.compute_size(value.map(|s| s.as_str()))
300    }
301}
302
303impl Encoder<&Option<StdString>> for String {
304    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<StdString>) -> Result<()> {
305        String.encode(buf, value.as_ref())
306    }
307    fn compute_size(&self, value: &Option<StdString>) -> Result<usize> {
308        String.compute_size(value.as_ref())
309    }
310}
311
312impl<T: NewType<StrBytes>> Encoder<Option<&T>> for String {
313    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&T>) -> Result<()> {
314        String.encode(buf, value.map(|s| &**s.borrow()))
315    }
316    fn compute_size(&self, value: Option<&T>) -> Result<usize> {
317        String.compute_size(value.map(|s| &**s.borrow()))
318    }
319}
320
321impl<T: NewType<StrBytes>> Encoder<&Option<T>> for String {
322    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<T>) -> Result<()> {
323        String.encode(buf, value.as_ref())
324    }
325    fn compute_size(&self, value: &Option<T>) -> Result<usize> {
326        String.compute_size(value.as_ref())
327    }
328}
329
330impl Encoder<&str> for String {
331    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &str) -> Result<()> {
332        String.encode(buf, Some(value))
333    }
334    fn compute_size(&self, value: &str) -> Result<usize> {
335        String.compute_size(Some(value))
336    }
337}
338
339impl Encoder<&StdString> for String {
340    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &StdString) -> Result<()> {
341        String.encode(buf, Some(value))
342    }
343    fn compute_size(&self, value: &StdString) -> Result<usize> {
344        String.compute_size(Some(value))
345    }
346}
347
348impl<T: NewType<StrBytes>> Encoder<&T> for String {
349    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
350        String.encode(buf, Some(value))
351    }
352    fn compute_size(&self, value: &T) -> Result<usize> {
353        String.compute_size(Some(value))
354    }
355}
356
357impl Decoder<Option<StdString>> for String {
358    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<StdString>> {
359        match Int16.decode(buf)? {
360            -1 => Ok(None),
361            n if n >= 0 => {
362                let mut strbuf = vec![0; n as usize];
363                buf.try_copy_to_slice(&mut strbuf)?;
364                Ok(Some(std::string::String::from_utf8(strbuf)?))
365            }
366            n => {
367                bail!("String length is negative ({})", n);
368            }
369        }
370    }
371}
372
373impl<T: NewType<StrBytes>> Decoder<Option<T>> for String {
374    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<T>> {
375        match Int16.decode(buf)? {
376            -1 => Ok(None),
377            n if n >= 0 => {
378                let strbuf = StrBytes::try_from(buf.try_get_bytes(n as usize)?)?;
379                Ok(Some(strbuf.into()))
380            }
381            n => {
382                bail!("String length is negative ({})", n);
383            }
384        }
385    }
386}
387
388impl Decoder<StdString> for String {
389    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<StdString> {
390        match String.decode(buf) {
391            Ok(None) => {
392                bail!("String length is negative (-1)");
393            }
394            Ok(Some(s)) => Ok(s),
395            Err(e) => Err(e),
396        }
397    }
398}
399
400impl<T: NewType<StrBytes>> Decoder<T> for String {
401    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
402        match String.decode(buf) {
403            Ok(None) => {
404                bail!("String length is negative (-1)");
405            }
406            Ok(Some(s)) => Ok(s),
407            Err(e) => Err(e),
408        }
409    }
410}
411
412/// A string whose length is encoded with a `u32` or varint.
413#[derive(Debug, Copy, Clone, Default)]
414pub struct CompactString;
415
416impl Encoder<Option<&str>> for CompactString {
417    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&str>) -> Result<()> {
418        if let Some(s) = value {
419            // Use >= because we're going to add one to the length
420            if s.len() >= u32::MAX as usize {
421                bail!("CompactString is too long to encode ({} bytes)", s.len());
422            } else {
423                UnsignedVarInt.encode(buf, (s.len() as u32) + 1)?;
424                buf.put_slice(s.as_bytes());
425                Ok(())
426            }
427        } else {
428            UnsignedVarInt.encode(buf, 0)?;
429            Ok(())
430        }
431    }
432    fn compute_size(&self, value: Option<&str>) -> Result<usize> {
433        if let Some(s) = value {
434            // Use >= because we're going to add one to the length
435            if s.len() >= u32::MAX as usize {
436                bail!("CompactString is too long to encode ({} bytes)", s.len());
437            } else {
438                Ok(UnsignedVarInt.compute_size((s.len() as u32) + 1)? + s.len())
439            }
440        } else {
441            Ok(1)
442        }
443    }
444}
445
446impl Encoder<Option<&StdString>> for CompactString {
447    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&StdString>) -> Result<()> {
448        CompactString.encode(buf, value.map(|s| s.as_str()))
449    }
450    fn compute_size(&self, value: Option<&StdString>) -> Result<usize> {
451        CompactString.compute_size(value.map(|s| s.as_str()))
452    }
453}
454
455impl<T: NewType<StrBytes>> Encoder<Option<&T>> for CompactString {
456    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&T>) -> Result<()> {
457        CompactString.encode(buf, value.map(|s| &**s.borrow()))
458    }
459    fn compute_size(&self, value: Option<&T>) -> Result<usize> {
460        CompactString.compute_size(value.map(|s| &**s.borrow()))
461    }
462}
463
464impl Encoder<&Option<StdString>> for CompactString {
465    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<StdString>) -> Result<()> {
466        CompactString.encode(buf, value.as_ref())
467    }
468    fn compute_size(&self, value: &Option<StdString>) -> Result<usize> {
469        CompactString.compute_size(value.as_ref())
470    }
471}
472
473impl<T: NewType<StrBytes>> Encoder<&Option<T>> for CompactString {
474    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<T>) -> Result<()> {
475        CompactString.encode(buf, value.as_ref())
476    }
477    fn compute_size(&self, value: &Option<T>) -> Result<usize> {
478        CompactString.compute_size(value.as_ref())
479    }
480}
481
482impl Encoder<&str> for CompactString {
483    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &str) -> Result<()> {
484        CompactString.encode(buf, Some(value))
485    }
486    fn compute_size(&self, value: &str) -> Result<usize> {
487        CompactString.compute_size(Some(value))
488    }
489}
490
491impl Encoder<&StdString> for CompactString {
492    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &StdString) -> Result<()> {
493        CompactString.encode(buf, Some(value))
494    }
495    fn compute_size(&self, value: &StdString) -> Result<usize> {
496        CompactString.compute_size(Some(value))
497    }
498}
499
500impl<T: NewType<StrBytes>> Encoder<&T> for CompactString {
501    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
502        CompactString.encode(buf, Some(value))
503    }
504    fn compute_size(&self, value: &T) -> Result<usize> {
505        CompactString.compute_size(Some(value))
506    }
507}
508
509impl Decoder<Option<StdString>> for CompactString {
510    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<StdString>> {
511        match UnsignedVarInt.decode(buf)? {
512            0 => Ok(None),
513            n => {
514                let mut strbuf = vec![0; (n - 1) as usize];
515                buf.try_copy_to_slice(&mut strbuf)?;
516                Ok(Some(std::string::String::from_utf8(strbuf)?))
517            }
518        }
519    }
520}
521
522impl<T: NewType<StrBytes>> Decoder<Option<T>> for CompactString {
523    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<T>> {
524        match UnsignedVarInt.decode(buf)? {
525            0 => Ok(None),
526            n => {
527                let strbuf = StrBytes::try_from(buf.try_get_bytes((n - 1) as usize)?)?;
528                Ok(Some(strbuf.into()))
529            }
530        }
531    }
532}
533
534impl Decoder<StdString> for CompactString {
535    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<StdString> {
536        match CompactString.decode(buf) {
537            Ok(None) => {
538                bail!("CompactString length is negative (-1)");
539            }
540            Ok(Some(s)) => Ok(s),
541            Err(e) => Err(e),
542        }
543    }
544}
545
546impl<T: NewType<StrBytes>> Decoder<T> for CompactString {
547    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
548        match CompactString.decode(buf) {
549            Ok(None) => {
550                bail!("CompactString length is negative (-1)");
551            }
552            Ok(Some(s)) => Ok(s),
553            Err(e) => Err(e),
554        }
555    }
556}
557
558/// A sequence of bytes, up to [`i32::MAX`] long.
559#[derive(Debug, Copy, Clone, Default)]
560pub struct Bytes;
561
562impl Encoder<Option<&[u8]>> for Bytes {
563    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&[u8]>) -> Result<()> {
564        if let Some(s) = value {
565            if s.len() > i32::MAX as usize {
566                bail!("Data is too long to encode ({} bytes)", s.len());
567            } else {
568                Int32.encode(buf, s.len() as i32)?;
569                buf.put_slice(s);
570                Ok(())
571            }
572        } else {
573            Int32.encode(buf, -1)?;
574            Ok(())
575        }
576    }
577    fn compute_size(&self, value: Option<&[u8]>) -> Result<usize> {
578        if let Some(s) = value {
579            if s.len() > i32::MAX as usize {
580                bail!("Data is too long to encode ({} bytes)", s.len());
581            } else {
582                Ok(4 + s.len())
583            }
584        } else {
585            Ok(4)
586        }
587    }
588}
589
590impl Encoder<Option<&Vec<u8>>> for Bytes {
591    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&Vec<u8>>) -> Result<()> {
592        Bytes.encode(buf, value.map(|s| s.as_slice()))
593    }
594    fn compute_size(&self, value: Option<&Vec<u8>>) -> Result<usize> {
595        Bytes.compute_size(value.map(|s| s.as_slice()))
596    }
597}
598
599impl Encoder<&Option<Vec<u8>>> for Bytes {
600    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<Vec<u8>>) -> Result<()> {
601        Bytes.encode(buf, value.as_ref())
602    }
603    fn compute_size(&self, value: &Option<Vec<u8>>) -> Result<usize> {
604        Bytes.compute_size(value.as_ref())
605    }
606}
607
608impl Encoder<Option<&bytes::Bytes>> for Bytes {
609    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&bytes::Bytes>) -> Result<()> {
610        Bytes.encode(buf, value.map(|s| &**s))
611    }
612    fn compute_size(&self, value: Option<&bytes::Bytes>) -> Result<usize> {
613        Bytes.compute_size(value.map(|s| &**s))
614    }
615}
616
617impl Encoder<&Option<bytes::Bytes>> for Bytes {
618    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<bytes::Bytes>) -> Result<()> {
619        Bytes.encode(buf, value.as_ref())
620    }
621    fn compute_size(&self, value: &Option<bytes::Bytes>) -> Result<usize> {
622        Bytes.compute_size(value.as_ref())
623    }
624}
625
626impl Encoder<&[u8]> for Bytes {
627    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &[u8]) -> Result<()> {
628        Bytes.encode(buf, Some(value))
629    }
630    fn compute_size(&self, value: &[u8]) -> Result<usize> {
631        Bytes.compute_size(Some(value))
632    }
633}
634
635impl Encoder<&Vec<u8>> for Bytes {
636    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Vec<u8>) -> Result<()> {
637        Bytes.encode(buf, Some(value))
638    }
639    fn compute_size(&self, value: &Vec<u8>) -> Result<usize> {
640        Bytes.compute_size(Some(value))
641    }
642}
643
644impl Encoder<&bytes::Bytes> for Bytes {
645    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &bytes::Bytes) -> Result<()> {
646        Bytes.encode(buf, Some(value))
647    }
648    fn compute_size(&self, value: &bytes::Bytes) -> Result<usize> {
649        Bytes.compute_size(Some(value))
650    }
651}
652
653impl Decoder<Option<Vec<u8>>> for Bytes {
654    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<Vec<u8>>> {
655        match Int32.decode(buf)? {
656            -1 => Ok(None),
657            n if n >= 0 => {
658                let mut data = vec![0; n as usize];
659                buf.try_copy_to_slice(&mut data)?;
660                Ok(Some(data))
661            }
662            n => {
663                bail!("Data length is negative ({})", n);
664            }
665        }
666    }
667}
668
669impl Decoder<Option<bytes::Bytes>> for Bytes {
670    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<bytes::Bytes>> {
671        match Int32.decode(buf)? {
672            -1 => Ok(None),
673            n if n >= 0 => Ok(Some(buf.try_get_bytes(n as usize)?)),
674            n => {
675                bail!("Data length is negative ({})", n);
676            }
677        }
678    }
679}
680
681impl Decoder<Vec<u8>> for Bytes {
682    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Vec<u8>> {
683        match Bytes.decode(buf) {
684            Ok(None) => {
685                bail!("Data length is negative (-1)");
686            }
687            Ok(Some(s)) => Ok(s),
688            Err(e) => Err(e),
689        }
690    }
691}
692
693impl Decoder<bytes::Bytes> for Bytes {
694    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<bytes::Bytes> {
695        match Bytes.decode(buf) {
696            Ok(None) => {
697                bail!("Data length is negative (-1)");
698            }
699            Ok(Some(s)) => Ok(s),
700            Err(e) => Err(e),
701        }
702    }
703}
704
705/// A sequence of bytes that is encoded with a `u32` or a varint, depending on size.
706#[derive(Debug, Copy, Clone, Default)]
707pub struct CompactBytes;
708
709impl Encoder<Option<&[u8]>> for CompactBytes {
710    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&[u8]>) -> Result<()> {
711        if let Some(s) = value {
712            // Use >= because we're going to add one to the length
713            if s.len() >= u32::MAX as usize {
714                bail!("CompactBytes is too long to encode ({} bytes)", s.len());
715            } else {
716                UnsignedVarInt.encode(buf, (s.len() as u32) + 1)?;
717                buf.put_slice(s);
718                Ok(())
719            }
720        } else {
721            UnsignedVarInt.encode(buf, 0)?;
722            Ok(())
723        }
724    }
725    fn compute_size(&self, value: Option<&[u8]>) -> Result<usize> {
726        if let Some(s) = value {
727            // Use >= because we're going to add one to the length
728            if s.len() >= u32::MAX as usize {
729                bail!("CompactBytes is too long to encode ({} bytes)", s.len());
730            } else {
731                Ok(UnsignedVarInt.compute_size((s.len() as u32) + 1)? + s.len())
732            }
733        } else {
734            Ok(1)
735        }
736    }
737}
738
739impl Encoder<Option<&Vec<u8>>> for CompactBytes {
740    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&Vec<u8>>) -> Result<()> {
741        CompactBytes.encode(buf, value.map(|s| s.as_slice()))
742    }
743    fn compute_size(&self, value: Option<&Vec<u8>>) -> Result<usize> {
744        CompactBytes.compute_size(value.map(|s| s.as_slice()))
745    }
746}
747
748impl Encoder<&Option<Vec<u8>>> for CompactBytes {
749    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<Vec<u8>>) -> Result<()> {
750        CompactBytes.encode(buf, value.as_ref())
751    }
752    fn compute_size(&self, value: &Option<Vec<u8>>) -> Result<usize> {
753        CompactBytes.compute_size(value.as_ref())
754    }
755}
756
757impl Encoder<Option<&bytes::Bytes>> for CompactBytes {
758    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&bytes::Bytes>) -> Result<()> {
759        CompactBytes.encode(buf, value.map(|s| &**s))
760    }
761    fn compute_size(&self, value: Option<&bytes::Bytes>) -> Result<usize> {
762        CompactBytes.compute_size(value.map(|s| &**s))
763    }
764}
765
766impl Encoder<&Option<bytes::Bytes>> for CompactBytes {
767    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<bytes::Bytes>) -> Result<()> {
768        CompactBytes.encode(buf, value.as_ref())
769    }
770    fn compute_size(&self, value: &Option<bytes::Bytes>) -> Result<usize> {
771        CompactBytes.compute_size(value.as_ref())
772    }
773}
774
775impl Encoder<&[u8]> for CompactBytes {
776    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &[u8]) -> Result<()> {
777        CompactBytes.encode(buf, Some(value))
778    }
779    fn compute_size(&self, value: &[u8]) -> Result<usize> {
780        CompactBytes.compute_size(Some(value))
781    }
782}
783
784impl Encoder<&Vec<u8>> for CompactBytes {
785    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Vec<u8>) -> Result<()> {
786        CompactBytes.encode(buf, Some(value))
787    }
788    fn compute_size(&self, value: &Vec<u8>) -> Result<usize> {
789        CompactBytes.compute_size(Some(value))
790    }
791}
792
793impl Encoder<&bytes::Bytes> for CompactBytes {
794    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &bytes::Bytes) -> Result<()> {
795        CompactBytes.encode(buf, Some(value))
796    }
797    fn compute_size(&self, value: &bytes::Bytes) -> Result<usize> {
798        CompactBytes.compute_size(Some(value))
799    }
800}
801
802impl Decoder<Option<Vec<u8>>> for CompactBytes {
803    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<Vec<u8>>> {
804        match UnsignedVarInt.decode(buf)? {
805            0 => Ok(None),
806            n => {
807                let mut data = vec![0; (n - 1) as usize];
808                buf.try_copy_to_slice(&mut data)?;
809                Ok(Some(data))
810            }
811        }
812    }
813}
814
815impl Decoder<Option<bytes::Bytes>> for CompactBytes {
816    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<bytes::Bytes>> {
817        match UnsignedVarInt.decode(buf)? {
818            0 => Ok(None),
819            n => Ok(Some(buf.try_get_bytes((n - 1) as usize)?)),
820        }
821    }
822}
823
824impl Decoder<Vec<u8>> for CompactBytes {
825    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Vec<u8>> {
826        match CompactBytes.decode(buf) {
827            Ok(None) => {
828                bail!("Data length is negative (-1)");
829            }
830            Ok(Some(s)) => Ok(s),
831            Err(e) => Err(e),
832        }
833    }
834}
835
836impl Decoder<bytes::Bytes> for CompactBytes {
837    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<bytes::Bytes> {
838        match CompactBytes.decode(buf) {
839            Ok(None) => {
840                bail!("Data length is negative (-1)");
841            }
842            Ok(Some(s)) => Ok(s),
843            Err(e) => Err(e),
844        }
845    }
846}
847
848/// A struct, which is encoded according to the type it represents.
849#[derive(Debug, Copy, Clone, Default)]
850pub struct Struct {
851    /// The version of the struct.
852    pub version: i16,
853}
854
855impl<T: Encodable> Encoder<&T> for Struct {
856    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &T) -> Result<()> {
857        value.encode(buf, self.version)
858    }
859    fn compute_size(&self, value: &T) -> Result<usize> {
860        value.compute_size(self.version)
861    }
862}
863
864impl<T: Decodable> Decoder<T> for Struct {
865    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<T> {
866        T::decode(buf, self.version)
867    }
868}
869
870/// An optional structure
871#[derive(Debug, Copy, Clone)]
872pub struct OptionStruct {
873    /// The version of the struct.
874    pub version: i16,
875}
876
877impl<T: Encodable> Encoder<&Option<T>> for OptionStruct {
878    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<T>) -> Result<()> {
879        if let Some(value) = value {
880            Int8.encode(buf, 1)?;
881            value.encode(buf, self.version)?;
882        } else {
883            Int8.encode(buf, -1)?;
884        }
885        Ok(())
886    }
887
888    fn compute_size(&self, value: &Option<T>) -> Result<usize> {
889        Ok(match value {
890            Some(value) => 1 + value.compute_size(self.version)?,
891            None => 1,
892        })
893    }
894}
895
896impl<T: Decodable> Decoder<Option<T>> for OptionStruct {
897    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<T>> {
898        let present: i8 = Int8.decode(buf)?;
899        if present == 1 {
900            Ok(Some(T::decode(buf, self.version)?))
901        } else {
902            Ok(None)
903        }
904    }
905}
906
907/// An array whose length is encoded with an `i32`.
908#[derive(Debug, Copy, Clone)]
909pub struct Array<E>(pub E);
910
911impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&[T]>> for Array<E> {
912    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&[T]>) -> Result<()> {
913        if let Some(a) = value {
914            if a.len() > i32::MAX as usize {
915                bail!("Array is too long to encode ({} items)", a.len());
916            } else {
917                Int32.encode(buf, a.len() as i32)?;
918                for item in a {
919                    self.0.encode(buf, item)?;
920                }
921                Ok(())
922            }
923        } else {
924            Int32.encode(buf, -1)?;
925            Ok(())
926        }
927    }
928    fn compute_size(&self, value: Option<&[T]>) -> Result<usize> {
929        if let Some(a) = value {
930            if a.len() > i32::MAX as usize {
931                bail!("Array is too long to encode ({} items)", a.len());
932            } else if let Some(fixed_size) = self.0.fixed_size() {
933                Ok(4 + a.len() * fixed_size)
934            } else {
935                let mut total_size = 4;
936                for item in a {
937                    total_size += self.0.compute_size(item)?;
938                }
939                Ok(total_size)
940            }
941        } else {
942            Ok(4)
943        }
944    }
945}
946
947impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&Vec<T>>> for Array<E> {
948    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&Vec<T>>) -> Result<()> {
949        self.encode(buf, value.map(|s| s.as_slice()))
950    }
951    fn compute_size(&self, value: Option<&Vec<T>>) -> Result<usize> {
952        self.compute_size(value.map(|s| s.as_slice()))
953    }
954}
955
956impl<T, E: for<'a> Encoder<&'a T>> Encoder<&Option<Vec<T>>> for Array<E> {
957    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<Vec<T>>) -> Result<()> {
958        self.encode(buf, value.as_ref())
959    }
960    fn compute_size(&self, value: &Option<Vec<T>>) -> Result<usize> {
961        self.compute_size(value.as_ref())
962    }
963}
964
965impl<T, E: for<'a> Encoder<&'a T>> Encoder<&[T]> for Array<E> {
966    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &[T]) -> Result<()> {
967        self.encode(buf, Some(value))
968    }
969    fn compute_size(&self, value: &[T]) -> Result<usize> {
970        self.compute_size(Some(value))
971    }
972}
973
974impl<T, E: for<'a> Encoder<&'a T>> Encoder<&Vec<T>> for Array<E> {
975    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Vec<T>) -> Result<()> {
976        self.encode(buf, Some(value))
977    }
978    fn compute_size(&self, value: &Vec<T>) -> Result<usize> {
979        self.compute_size(Some(value))
980    }
981}
982
983impl<T, E: Decoder<T>> Decoder<Option<Vec<T>>> for Array<E> {
984    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<Vec<T>>> {
985        match Int32.decode(buf)? {
986            -1 => Ok(None),
987            n if n >= 0 => {
988                let mut result = Vec::with_capacity(n as usize);
989                for _ in 0..n {
990                    result.push(self.0.decode(buf)?);
991                }
992                Ok(Some(result))
993            }
994            n => {
995                bail!("Array length is negative ({})", n);
996            }
997        }
998    }
999}
1000
1001impl<T, E: Decoder<T>> Decoder<Vec<T>> for Array<E> {
1002    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Vec<T>> {
1003        match self.decode(buf) {
1004            Ok(None) => {
1005                bail!("Array length is negative (-1)");
1006            }
1007            Ok(Some(s)) => Ok(s),
1008            Err(e) => Err(e),
1009        }
1010    }
1011}
1012
1013/// An array whose length is encoded with a varint.
1014#[derive(Debug, Copy, Clone)]
1015pub struct CompactArray<E>(pub E);
1016
1017impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&[T]>> for CompactArray<E> {
1018    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&[T]>) -> Result<()> {
1019        if let Some(a) = value {
1020            // Use >= because we're going to add one to the length
1021            if a.len() >= u32::MAX as usize {
1022                bail!("CompactArray is too long to encode ({} items)", a.len());
1023            } else {
1024                UnsignedVarInt.encode(buf, (a.len() as u32) + 1)?;
1025                for item in a {
1026                    self.0.encode(buf, item)?;
1027                }
1028                Ok(())
1029            }
1030        } else {
1031            UnsignedVarInt.encode(buf, 0)?;
1032            Ok(())
1033        }
1034    }
1035    fn compute_size(&self, value: Option<&[T]>) -> Result<usize> {
1036        if let Some(a) = value {
1037            // Use >= because we're going to add one to the length
1038            if a.len() >= u32::MAX as usize {
1039                bail!("CompactArray is too long to encode ({} items)", a.len());
1040            } else if let Some(fixed_size) = self.0.fixed_size() {
1041                Ok(UnsignedVarInt.compute_size((a.len() as u32) + 1)? + a.len() * fixed_size)
1042            } else {
1043                let mut total_size = UnsignedVarInt.compute_size((a.len() as u32) + 1)?;
1044                for item in a {
1045                    total_size += self.0.compute_size(item)?;
1046                }
1047                Ok(total_size)
1048            }
1049        } else {
1050            Ok(1)
1051        }
1052    }
1053}
1054
1055impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&Vec<T>>> for CompactArray<E> {
1056    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: Option<&Vec<T>>) -> Result<()> {
1057        self.encode(buf, value.map(|s| s.as_slice()))
1058    }
1059    fn compute_size(&self, value: Option<&Vec<T>>) -> Result<usize> {
1060        self.compute_size(value.map(|s| s.as_slice()))
1061    }
1062}
1063
1064impl<T, E: for<'a> Encoder<&'a T>> Encoder<&Option<Vec<T>>> for CompactArray<E> {
1065    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Option<Vec<T>>) -> Result<()> {
1066        self.encode(buf, value.as_ref())
1067    }
1068    fn compute_size(&self, value: &Option<Vec<T>>) -> Result<usize> {
1069        self.compute_size(value.as_ref())
1070    }
1071}
1072
1073impl<T, E: for<'a> Encoder<&'a T>> Encoder<&[T]> for CompactArray<E> {
1074    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &[T]) -> Result<()> {
1075        self.encode(buf, Some(value))
1076    }
1077    fn compute_size(&self, value: &[T]) -> Result<usize> {
1078        self.compute_size(Some(value))
1079    }
1080}
1081
1082impl<T, E: for<'a> Encoder<&'a T>> Encoder<&Vec<T>> for CompactArray<E> {
1083    fn encode<B: ByteBufMut>(&self, buf: &mut B, value: &Vec<T>) -> Result<()> {
1084        self.encode(buf, Some(value))
1085    }
1086    fn compute_size(&self, value: &Vec<T>) -> Result<usize> {
1087        self.compute_size(Some(value))
1088    }
1089}
1090
1091impl<T, E: Decoder<T>> Decoder<Option<Vec<T>>> for CompactArray<E> {
1092    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Option<Vec<T>>> {
1093        match UnsignedVarInt.decode(buf)? {
1094            0 => Ok(None),
1095            n => {
1096                let mut result = Vec::with_capacity((n - 1) as usize);
1097                for _ in 1..n {
1098                    result.push(self.0.decode(buf)?);
1099                }
1100                Ok(Some(result))
1101            }
1102        }
1103    }
1104}
1105
1106impl<T, E: Decoder<T>> Decoder<Vec<T>> for CompactArray<E> {
1107    fn decode<B: ByteBuf>(&self, buf: &mut B) -> Result<Vec<T>> {
1108        match self.decode(buf) {
1109            Ok(None) => {
1110                bail!("CompactArray length is negative (-1)");
1111            }
1112            Ok(Some(s)) => Ok(s),
1113            Err(e) => Err(e),
1114        }
1115    }
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120    use super::*;
1121
1122    use std::fmt::Debug;
1123
1124    fn test_encoder_decoder<V: PartialEq + Debug, E: for<'a> Encoder<&'a V> + Decoder<V>>(
1125        encoder: E,
1126        value: V,
1127        mut expected: &[u8],
1128    ) {
1129        let mut buf = vec![];
1130        encoder.encode(&mut buf, &value).unwrap();
1131        assert_eq!(buf, expected);
1132
1133        let decoded = encoder.decode(&mut expected).unwrap();
1134        assert_eq!(value, decoded);
1135    }
1136
1137    #[test]
1138    fn smoke_varint_encoder_decoder() {
1139        test_encoder_decoder(VarInt, 0, &[0]);
1140        test_encoder_decoder(VarInt, -1, &[1]);
1141        test_encoder_decoder(VarInt, 1, &[2]);
1142        test_encoder_decoder(VarInt, -2, &[3]);
1143        test_encoder_decoder(VarInt, 300, &[216, 4]);
1144        test_encoder_decoder(VarInt, i32::MAX, &[254, 255, 255, 255, 15]);
1145        test_encoder_decoder(VarInt, i32::MIN, &[255, 255, 255, 255, 15]);
1146    }
1147
1148    #[test]
1149    fn smoke_varlong_encoder_decoder() {
1150        test_encoder_decoder(VarLong, 0, &[0]);
1151        test_encoder_decoder(VarLong, -1, &[1]);
1152        test_encoder_decoder(VarLong, 1, &[2]);
1153        test_encoder_decoder(VarLong, -2, &[3]);
1154        test_encoder_decoder(VarLong, 300, &[216, 4]);
1155        test_encoder_decoder(
1156            VarLong,
1157            i64::MAX,
1158            &[254, 255, 255, 255, 255, 255, 255, 255, 255, 1],
1159        );
1160        test_encoder_decoder(
1161            VarLong,
1162            i64::MIN,
1163            &[255, 255, 255, 255, 255, 255, 255, 255, 255, 1],
1164        );
1165    }
1166
1167    #[test]
1168    fn smoke_string_encoder_decoder() {
1169        test_encoder_decoder(
1170            String,
1171            std::string::String::from("hello"),
1172            &[0, 5, 104, 101, 108, 108, 111],
1173        );
1174        test_encoder_decoder(String, None::<std::string::String>, &[255, 255]);
1175    }
1176
1177    #[test]
1178    fn smoke_compact_string_encoder_decoder() {
1179        test_encoder_decoder(
1180            CompactString,
1181            std::string::String::from("hello"),
1182            &[6, 104, 101, 108, 108, 111],
1183        );
1184        test_encoder_decoder(CompactString, None::<std::string::String>, &[0]);
1185    }
1186
1187    #[test]
1188    fn smoke_bytes_encoder_decoder() {
1189        test_encoder_decoder(Bytes, vec![1, 2, 3, 4], &[0, 0, 0, 4, 1, 2, 3, 4]);
1190        test_encoder_decoder(Bytes, None::<Vec<u8>>, &[255, 255, 255, 255]);
1191    }
1192
1193    #[test]
1194    fn smoke_compact_bytes_encoder_decoder() {
1195        test_encoder_decoder(CompactBytes, vec![1, 2, 3, 4], &[5, 1, 2, 3, 4]);
1196        test_encoder_decoder(CompactBytes, None::<Vec<u8>>, &[0]);
1197    }
1198}