kafka_api/codec/
mod.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::error;
16use std::fmt::Display;
17use std::io;
18use std::mem::size_of;
19
20use byteorder::BigEndian;
21use byteorder::ReadBytesExt;
22use byteorder::WriteBytesExt;
23
24use crate::records::Header;
25use crate::records::Record;
26use crate::IoResult;
27
/// Builds the `InvalidData` I/O error used to report codec failures.
///
/// `message` may be anything convertible into a boxed error, such as a `&str`
/// or `String`.
pub(crate) fn err_codec_message<E>(message: E) -> io::Error
where
    E: Into<Box<dyn error::Error + Send + Sync>>,
{
    let cause: Box<dyn error::Error + Send + Sync> = message.into();
    io::Error::new(io::ErrorKind::InvalidData, cause)
}

/// Error for decoding a `schemata` at a `version` this codec cannot read.
pub(crate) fn err_decode_message_unsupported(version: i16, schemata: &str) -> io::Error {
    let message = format!("failed to read version {version} of {schemata}");
    err_codec_message(message)
}

/// Error for encoding a `schemata` at a `version` this codec cannot write.
pub(crate) fn err_encode_message_unsupported(version: i16, schemata: &str) -> io::Error {
    let message = format!("failed to write version {version} of {schemata}");
    err_codec_message(message)
}

/// Error for a non-nullable `field` that arrived on the wire as null.
pub(crate) fn err_decode_message_null(field: impl Display) -> io::Error {
    let message = format!("non-nullable field {field} was serialized as null");
    err_codec_message(message)
}

/// Error for an attempt to write a non-nullable `field` as null.
pub(crate) fn err_encode_message_null(field: impl Display) -> io::Error {
    let message = format!("non-nullable field {field} to be serialized as null");
    err_codec_message(message)
}
52
/// Field-level decoder: reads one value of type `T` from a byte source.
///
/// Implemented by small codec values (e.g. `Int32`, `VarInt`,
/// `NullableString(flexible)`) so that field codecs can be composed, as
/// `NullableArray` does with its element codec.
pub(crate) trait Decoder<T: Sized> {
    /// Decodes one `T` from `buf`, consuming the bytes it reads.
    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<T>;
}

/// Field-level encoder: writes values of type `T` and reports their encoded size.
pub(crate) trait Encoder<T> {
    /// Encodes `value` into `buf`.
    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: T) -> IoResult<()>;
    /// Returns the number of bytes `encode` writes for `value`; used, for
    /// example, to prefix tagged fields with their size.
    fn calculate_size(&self, value: T) -> usize;
}

/// Marker for encoders whose output has the same size for every value.
pub(crate) trait FixedSizeEncoder {
    /// Encoded size in bytes of any value.
    const SIZE: usize;
}

/// A type that can decode itself for a given message `version`.
pub trait Decodable: Sized {
    /// Decodes `Self` from `buf` using the layout of `version`.
    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self>;
}

/// A type that can encode itself for a given message `version`.
pub trait Encodable: Sized {
    /// Encodes `self` into `buf` using the layout of `version`.
    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()>;
    /// Returns the number of bytes `write` produces for `version`.
    fn calculate_size(&self, version: i16) -> usize;
}
74
/// Defines a unit-struct codec `$name` for the fixed-width numeric type `$ty`.
///
/// `$write` / `$read` name the byte-extension methods used to move the value;
/// the optional trailing `$endian` is passed to them as a type parameter
/// (`buf.$read::<$endian>()`). The single-byte codecs below omit it.
///
/// Each generated codec implements `Decoder<$ty>`, `Encoder<$ty>`,
/// `Encoder<&$ty>` (delegating to the by-value impl) and `FixedSizeEncoder`
/// with `SIZE = size_of::<$ty>()`.
macro_rules! define_ints_codec {
    ($name:ident, $ty:ty, $write:ident, $read:ident $(,)? $($endian:ident)?) => {
        #[derive(Debug, Copy, Clone)]
        pub(crate) struct $name;

        impl Decoder<$ty> for $name {
            fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<$ty> {
                // Expands to `buf.read_x()` or `buf.read_x::<Endian>()`.
                buf.$read$(::<$endian>)?()
            }
        }

        impl Encoder<$ty> for $name {
            fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: $ty) -> IoResult<()> {
                buf.$write$(::<$endian>)?(value)
            }

            // Fixed width: the value does not affect the size.
            #[inline]
            fn calculate_size(&self, _: $ty) -> usize {
                Self::SIZE
            }
        }

        // Lets callers pass `&field` without dereferencing at every call site.
        impl Encoder<&$ty> for $name {
            fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &$ty) -> IoResult<()> {
                self.encode(buf, *value)
            }

            #[inline]
            fn calculate_size(&self, _: &$ty) -> usize {
                Self::SIZE
            }
        }

        impl FixedSizeEncoder for $name {
            const SIZE: usize = size_of::<$ty>();
        }
    };
}

// Multi-byte codecs are big-endian; the floating-point codecs reuse the same
// macro despite its name.
define_ints_codec!(Int8, i8, write_i8, read_i8);
define_ints_codec!(Int16, i16, write_i16, read_i16, BigEndian);
define_ints_codec!(Int32, i32, write_i32, read_i32, BigEndian);
define_ints_codec!(Int64, i64, write_i64, read_i64, BigEndian);
define_ints_codec!(UInt8, u8, write_u8, read_u8);
define_ints_codec!(UInt16, u16, write_u16, read_u16, BigEndian);
define_ints_codec!(UInt32, u32, write_u32, read_u32, BigEndian);
define_ints_codec!(UInt64, u64, write_u64, read_u64, BigEndian);
define_ints_codec!(Float32, f32, write_f32, read_f32, BigEndian);
define_ints_codec!(Float64, f64, write_f64, read_f64, BigEndian);
124
125#[derive(Debug, Copy, Clone)]
126pub(crate) struct Bool;
127
128impl Decoder<bool> for Bool {
129    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<bool> {
130        Ok(buf.read_u8()? != 0)
131    }
132}
133
134impl Encoder<bool> for Bool {
135    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: bool) -> IoResult<()> {
136        buf.write_u8(if value { 1 } else { 0 })
137    }
138
139    #[inline]
140    fn calculate_size(&self, _: bool) -> usize {
141        Self::SIZE
142    }
143}
144
145impl FixedSizeEncoder for Bool {
146    const SIZE: usize = size_of::<bool>();
147}
148
149#[derive(Debug, Copy, Clone)]
150pub(crate) struct Uuid;
151
152impl Decoder<uuid::Uuid> for Uuid {
153    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<uuid::Uuid> {
154        read_uuid(buf)
155    }
156}
157
158impl Encoder<uuid::Uuid> for Uuid {
159    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: uuid::Uuid) -> IoResult<()> {
160        write_uuid(buf, value)
161    }
162
163    fn calculate_size(&self, _: uuid::Uuid) -> usize {
164        Self::SIZE
165    }
166}
167
168impl FixedSizeEncoder for Uuid {
169    const SIZE: usize = 16;
170}
171
172#[derive(Debug, Copy, Clone)]
173pub(crate) struct VarInt;
174
175impl Decoder<i32> for VarInt {
176    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<i32> {
177        read_unsigned_varint(buf)
178    }
179}
180
181impl Encoder<i32> for VarInt {
182    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: i32) -> IoResult<()> {
183        write_unsigned_varint(buf, value)
184    }
185
186    fn calculate_size(&self, value: i32) -> usize {
187        let mut res = 1;
188        let mut v = value;
189        while v >= 0x80 {
190            res += 1;
191            v >>= 7;
192        }
193        debug_assert!(res <= 5);
194        res
195    }
196}
197
198#[derive(Debug, Copy, Clone)]
199pub(crate) struct VarLong;
200
201impl Decoder<i64> for VarLong {
202    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<i64> {
203        read_unsigned_varlong(buf)
204    }
205}
206
207impl Encoder<i64> for VarLong {
208    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: i64) -> IoResult<()> {
209        write_unsigned_varlong(buf, value)
210    }
211
212    fn calculate_size(&self, value: i64) -> usize {
213        let mut res = 1;
214        let mut v = value;
215        while v >= 0x80 {
216            res += 1;
217            v >>= 7;
218        }
219        debug_assert!(res <= 10);
220        res
221    }
222}
223
224#[derive(Debug, Copy, Clone)]
225pub(crate) struct NullableString(pub bool /* flexible */);
226
227impl Decoder<Option<String>> for NullableString {
228    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<String>> {
229        let len = if self.0 {
230            VarInt.decode(buf)? - 1
231        } else {
232            Int16.decode(buf)? as i32
233        };
234        Ok(read_bytes(buf, len)?.map(|bs| String::from_utf8_lossy(&bs).into_owned()))
235    }
236}
237
238impl Encoder<Option<&str>> for NullableString {
239    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&str>) -> IoResult<()> {
240        write_str(buf, value, self.0)
241    }
242
243    fn calculate_size(&self, value: Option<&str>) -> usize {
244        let bytes = value.map(|s| s.as_bytes());
245        let len = bytes.map(|bs| bs.len()).unwrap_or(0);
246        if self.0 {
247            VarInt.calculate_size(len as i32 + 1) + len
248        } else {
249            Int16::SIZE + len
250        }
251    }
252}
253
254impl Encoder<&str> for NullableString {
255    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &str) -> IoResult<()> {
256        self.encode(buf, Some(value))
257    }
258
259    fn calculate_size(&self, value: &str) -> usize {
260        self.calculate_size(Some(value))
261    }
262}
263
264#[derive(Debug, Copy, Clone)]
265pub(crate) struct NullableBytes(pub bool /* flexible */);
266
267impl Decoder<Option<Vec<u8>>> for NullableBytes {
268    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<Vec<u8>>> {
269        let len = if self.0 {
270            VarInt.decode(buf)? - 1
271        } else {
272            Int32.decode(buf)?
273        };
274        read_bytes(buf, len)
275    }
276}
277
278impl<T: AsRef<[u8]>> Encoder<Option<&T>> for NullableBytes {
279    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&T>) -> IoResult<()> {
280        write_bytes(buf, value.map(|s| s.as_ref()), self.0)
281    }
282
283    fn calculate_size(&self, value: Option<&T>) -> usize {
284        let bytes = value.map(|s| s.as_ref());
285        let len = bytes.map(|bs| bs.len()).unwrap_or(0);
286        if self.0 {
287            VarInt.calculate_size(len as i32 + 1) + len
288        } else {
289            Int32::SIZE + len
290        }
291    }
292}
293
294impl<T: AsRef<[u8]>> Encoder<&T> for NullableBytes {
295    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &T) -> IoResult<()> {
296        self.encode(buf, Some(value))
297    }
298
299    fn calculate_size(&self, value: &T) -> usize {
300        self.calculate_size(Some(value))
301    }
302}
303
304#[derive(Debug, Copy, Clone)]
305pub(crate) struct Struct(pub i16 /* version */);
306
307impl<T: Decodable> Decoder<T> for Struct {
308    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<T> {
309        T::read(buf, self.0)
310    }
311}
312
313impl<T: Encodable> Encoder<&T> for Struct {
314    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &T) -> IoResult<()> {
315        value.write(buf, self.0)
316    }
317
318    fn calculate_size(&self, value: &T) -> usize {
319        value.calculate_size(self.0)
320    }
321}
322
323#[derive(Debug, Copy, Clone)]
324pub(crate) struct NullableArray<E>(pub E, pub bool /* flexible */);
325
326impl<T, E: Decoder<T>> Decoder<Option<Vec<T>>> for NullableArray<E> {
327    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Option<Vec<T>>> {
328        let len = if self.1 {
329            VarInt.decode(buf)? - 1
330        } else {
331            Int32.decode(buf)?
332        };
333        match len {
334            -1 => Ok(None),
335            n if n >= 0 => {
336                let n = n as usize;
337                let mut result = Vec::with_capacity(n);
338                for _ in 0..n {
339                    result.push(self.0.decode(buf)?);
340                }
341                Ok(Some(result))
342            }
343            n => Err(err_codec_message(format!("invalid length: {n}"))),
344        }
345    }
346}
347
348impl<T, E: for<'a> Encoder<&'a T>> Encoder<Option<&[T]>> for NullableArray<E> {
349    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: Option<&[T]>) -> IoResult<()> {
350        match value {
351            None => {
352                if self.1 {
353                    VarInt.encode(buf, 0)
354                } else {
355                    Int32.encode(buf, -1)
356                }
357            }
358            Some(s) => self.encode(buf, s),
359        }
360    }
361
362    fn calculate_size(&self, value: Option<&[T]>) -> usize {
363        match value {
364            None => {
365                if self.1 {
366                    VarInt.calculate_size(0)
367                } else {
368                    Int32::SIZE
369                }
370            }
371            Some(ns) => {
372                let mut res = 0;
373                res += if self.1 {
374                    VarInt.calculate_size(ns.len() as i32 + 1)
375                } else {
376                    Int32::SIZE
377                };
378                for n in ns {
379                    res += self.0.calculate_size(n);
380                }
381                res
382            }
383        }
384    }
385}
386
387impl<T, E: for<'a> Encoder<&'a T>> Encoder<&[T]> for NullableArray<E> {
388    fn encode<B: WriteBytesExt>(&self, buf: &mut B, value: &[T]) -> IoResult<()> {
389        if self.1 {
390            VarInt.encode(buf, value.len() as i32 + 1)?;
391        } else {
392            Int32.encode(buf, value.len() as i32)?;
393        }
394        for v in value {
395            self.0.encode(buf, v)?;
396        }
397        Ok(())
398    }
399
400    fn calculate_size(&self, value: &[T]) -> usize {
401        self.calculate_size(Some(value))
402    }
403}
404
405#[derive(Debug, Copy, Clone)]
406pub(crate) struct RawTaggedFieldList;
407
408impl Decoder<Vec<RawTaggedField>> for RawTaggedFieldList {
409    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Vec<RawTaggedField>> {
410        RawTaggedFieldList.decode_with(buf, |_, _, _| Ok(false))
411    }
412}
413
414impl Encoder<&[RawTaggedField]> for RawTaggedFieldList {
415    fn encode<B: WriteBytesExt>(&self, buf: &mut B, fields: &[RawTaggedField]) -> IoResult<()> {
416        self.encode_with(buf, 0, fields, |_| Ok(()))
417    }
418
419    fn calculate_size(&self, fields: &[RawTaggedField]) -> usize {
420        self.calculate_size_with(0, 0, fields)
421    }
422}
423
424impl RawTaggedFieldList {
425    pub(crate) fn decode_with<B: ReadBytesExt, F>(
426        &self,
427        buf: &mut B,
428        mut f: F,
429    ) -> IoResult<Vec<RawTaggedField>>
430    where
431        F: FnMut(&mut B, i32, usize) -> IoResult<bool>,
432    {
433        let n = VarInt.decode(buf)?;
434        let mut res = vec![];
435        for _ in 0..n {
436            let tag = VarInt.decode(buf)?;
437            let size = VarInt.decode(buf)? as usize;
438            let consumed = f(buf, tag, size)?;
439            if !consumed {
440                match read_bytes(buf, size as i32)? {
441                    None => return Err(err_codec_message("unexpected null data")),
442                    Some(data) => res.push(RawTaggedField { tag, data }),
443                }
444            }
445        }
446        Ok(res)
447    }
448
449    pub(crate) fn encode_with<B: WriteBytesExt, F>(
450        &self,
451        buf: &mut B,
452        n: usize, // extra fields
453        fields: &[RawTaggedField],
454        mut f: F,
455    ) -> IoResult<()>
456    where
457        F: FnMut(&mut B) -> IoResult<()>,
458    {
459        VarInt.encode(buf, (fields.len() + n) as i32)?;
460        f(buf)?;
461        for field in fields {
462            RawTaggedFieldWriter.write_byte_buffer(buf, field.tag, &field.data)?;
463        }
464        Ok(())
465    }
466
467    pub(crate) fn calculate_size_with(
468        &self,
469        n: usize,  // extra fields
470        bs: usize, // extra bytes
471        fields: &[RawTaggedField],
472    ) -> usize {
473        let mut res = 0;
474        res += VarInt.calculate_size((fields.len() + n) as i32);
475        for field in fields {
476            res += VarInt.calculate_size(field.tag);
477            res += VarInt.calculate_size(field.data.len() as i32);
478            res += field.data.len();
479        }
480        res + bs
481    }
482}
483
/// A tagged field kept as opaque bytes: its numeric tag and its still-encoded data.
#[derive(Debug, Default, Clone)]
pub struct RawTaggedField {
    /// The field's tag, carried on the wire as a varint.
    pub tag: i32,
    /// The field's encoded payload, excluding the tag and size prefix.
    pub data: Vec<u8>,
}
489
490#[derive(Debug, Copy, Clone)]
491pub(crate) struct RawTaggedFieldWriter;
492
493impl RawTaggedFieldWriter {
494    pub(crate) fn write_field<
495        B: WriteBytesExt,
496        T: Copy, // primitive or reference
497        E: Encoder<T>,
498    >(
499        &self,
500        buf: &mut B,
501        tag: i32,
502        encoder: E,
503        value: T,
504    ) -> IoResult<()> {
505        VarInt.encode(buf, tag)?;
506        VarInt.encode(buf, encoder.calculate_size(value) as i32)?;
507        encoder.encode(buf, value)?;
508        Ok(())
509    }
510
511    pub(crate) fn calculate_field_size<T, E: Encoder<T>>(
512        &self,
513        tag: i32,
514        encoder: E,
515        value: T,
516    ) -> usize {
517        let size = encoder.calculate_size(value);
518        let mut res = 0;
519        res += VarInt.calculate_size(tag);
520        res += VarInt.calculate_size(size as i32);
521        res + size
522    }
523
524    fn write_byte_buffer<B: WriteBytesExt>(
525        &self,
526        buf: &mut B,
527        tag: i32,
528        bs: &[u8],
529    ) -> IoResult<()> {
530        VarInt.encode(buf, tag)?;
531        VarInt.encode(buf, bs.len() as i32)?;
532        buf.write_all(bs)?;
533        Ok(())
534    }
535}
536
537fn read_uuid<B: ReadBytesExt>(buf: &mut B) -> IoResult<uuid::Uuid> {
538    let msb = buf.read_u64::<BigEndian>()?;
539    let lsb = buf.read_u64::<BigEndian>()?;
540    Ok(uuid::Uuid::from_u64_pair(msb, lsb))
541}
542
543fn write_uuid<B: WriteBytesExt>(buf: &mut B, n: uuid::Uuid) -> IoResult<()> {
544    buf.write_all(n.as_ref())
545}
546
547fn read_unsigned_varint<B: ReadBytesExt>(buf: &mut B) -> IoResult<i32> {
548    let mut res = 0;
549    for i in 0.. {
550        debug_assert!(i < 5); // no larger than i32
551        let next = buf.read_u8()? as i32;
552        res |= (next & 0x7F) << (i * 7);
553        if next < 0x80 {
554            break;
555        }
556    }
557    Ok(res)
558}
559
560fn read_unsigned_varlong<B: ReadBytesExt>(buf: &mut B) -> IoResult<i64> {
561    let mut res = 0;
562    for i in 0.. {
563        debug_assert!(i < 10); // no larger than i64
564        let next = buf.read_u8()? as i64;
565        res |= (next & 0x7F) << (i * 7);
566        if next < 0x80 {
567            break;
568        }
569    }
570    Ok(res)
571}
572
/// Undoes zigzag encoding for 32-bit values: 0, 1, 2, 3, ... map back to
/// 0, -1, 1, -2, ...
fn varint_zigzag(i: i32) -> i32 {
    let magnitude = ((i as u32) >> 1) as i32;
    // Moves the low bit into the sign bit, then smears it across the whole
    // word: all ones when odd, zero when even (same as `-(i & 1)`).
    let sign_mask = (i << 31) >> 31;
    magnitude ^ sign_mask
}

/// Undoes zigzag encoding for 64-bit values: 0, 1, 2, 3, ... map back to
/// 0, -1, 1, -2, ...
fn varlong_zigzag(i: i64) -> i64 {
    let magnitude = ((i as u64) >> 1) as i64;
    let sign_mask = (i << 63) >> 63;
    magnitude ^ sign_mask
}
580
581fn read_varint<B: ReadBytesExt>(buf: &mut B) -> IoResult<i32> {
582    read_unsigned_varint(buf).map(varint_zigzag)
583}
584
585fn read_varlong<B: ReadBytesExt>(buf: &mut B) -> IoResult<i64> {
586    read_unsigned_varlong(buf).map(varlong_zigzag)
587}
588
589fn write_unsigned_varint<B: WriteBytesExt>(buf: &mut B, n: i32) -> IoResult<()> {
590    let mut v = n;
591    while v >= 0x80 {
592        buf.write_u8((v as u8) | 0x80)?;
593        v >>= 7;
594    }
595    buf.write_u8(v as u8)
596}
597
598fn write_unsigned_varlong<B: WriteBytesExt>(buf: &mut B, n: i64) -> IoResult<()> {
599    let mut v = n;
600    while v >= 0x80 {
601        buf.write_u8((v as u8) | 0x80)?;
602        v >>= 7;
603    }
604    buf.write_u8(v as u8)
605}
606
607fn read_bytes<B: ReadBytesExt>(buf: &mut B, len: i32) -> IoResult<Option<Vec<u8>>> {
608    match len {
609        -1 => Ok(None),
610        n if n >= 0 => {
611            let n = n as usize;
612            let mut v = vec![0; n];
613            buf.read_exact(&mut v).map_err(|e| {
614                io::Error::new(
615                    io::ErrorKind::InvalidData,
616                    format!("failed to read {n} bytes: {e}"),
617                )
618            })?;
619            Ok(Some(v))
620        }
621        n => Err(io::Error::new(
622            io::ErrorKind::InvalidData,
623            format!("invalid length: {n}"),
624        )),
625    }
626}
627
628fn write_str<B: WriteBytesExt>(buf: &mut B, str: Option<&str>, flexible: bool) -> IoResult<()> {
629    match str {
630        None => {
631            if flexible {
632                VarInt.encode(buf, 0)?
633            } else {
634                Int16.encode(buf, -1)?
635            }
636        }
637        Some(bs) => {
638            let bs = bs.as_bytes();
639            let len = bs.len();
640            if flexible {
641                VarInt.encode(buf, len as i32 + 1)?;
642            } else {
643                Int16.encode(buf, len as i16)?;
644            }
645            buf.write_all(bs)?;
646        }
647    }
648    Ok(())
649}
650
651fn write_bytes<B: WriteBytesExt>(
652    buf: &mut B,
653    bytes: Option<&[u8]>,
654    flexible: bool,
655) -> IoResult<()> {
656    match bytes {
657        None => {
658            if flexible {
659                VarInt.encode(buf, 0)?
660            } else {
661                Int32.encode(buf, -1)?
662            }
663        }
664        Some(bs) => {
665            let len = bs.len() as i32;
666            if flexible {
667                VarInt.encode(buf, len + 1)?;
668            } else {
669                Int32.encode(buf, len)?;
670            }
671            buf.write_all(bs)?;
672        }
673    }
674    Ok(())
675}
676
677#[derive(Debug, Copy, Clone)]
678pub(crate) struct RecordList;
679
680impl Decoder<Vec<Record>> for RecordList {
681    fn decode<B: ReadBytesExt>(&self, buf: &mut B) -> IoResult<Vec<Record>> {
682        struct Entry(i32, Option<Vec<u8>>);
683        fn read_key_value<B: ReadBytesExt>(buf: &mut B) -> IoResult<(Entry, Entry)> {
684            let key = {
685                let len = read_varint(buf)?;
686                let payload = read_bytes(buf, len)?;
687                Entry(len, payload)
688            };
689
690            let value = {
691                let len = read_varint(buf)?;
692                let payload = read_bytes(buf, len)?;
693                Entry(len, payload)
694            };
695
696            Ok((key, value))
697        }
698
699        let cnt = Int32.decode(buf)?;
700        let mut records = vec![];
701        for _ in 0..cnt {
702            let mut record = Record {
703                len: read_varint(buf)?,
704                attributes: Int8.decode(buf)?,
705                timestamp_delta: read_varlong(buf)?,
706                offset_delta: read_varint(buf)?,
707                ..Default::default()
708            };
709
710            let (key, value) = read_key_value(buf)?;
711            record.key_len = key.0;
712            record.key = key.1;
713            record.value_len = value.0;
714            record.value = value.1;
715
716            let headers_cnt = read_varint(buf)?;
717            for _ in 0..headers_cnt {
718                let (key, value) = read_key_value(buf)?;
719                record.headers.push(Header {
720                    key_len: key.0,
721                    key: key.1,
722                    value_len: value.0,
723                    value: value.1,
724                });
725            }
726            records.push(record);
727        }
728        Ok(records)
729    }
730}