kafka_protocol/protocol/
buf.rs

1//! Utilities for working with the [`bytes`] crate.
2use std::io::Cursor;
3use std::mem::size_of;
4use std::ops::Range;
5
6use bytes::{Buf, BufMut, Bytes, BytesMut};
7use std::error::Error;
8use std::fmt::{Display, Formatter};
9
10/// Error indicating there are not enough remaining bytes in a buffer to perform a read.
11#[derive(Debug)]
12pub struct NotEnoughBytesError;
13
14impl Display for NotEnoughBytesError {
15    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
16        writeln!(f, "Not enough bytes remaining in buffer!")
17    }
18}
19
20impl Error for NotEnoughBytesError {}
21
22/// Extension for working with [`bytes::Buf`].
23pub trait ByteBuf: Buf {
24    /// Peek ahead in the buffer by the provided range.
25    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes;
26    /// Get `size` bytes from the underlying buffer.
27    fn get_bytes(&mut self, size: usize) -> Bytes;
28    /// Try to peek ahead in the buffer by the provided range, returning an error if there are less
29    /// bytes than the requested range.
30    fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
31        if self.remaining() < r.end {
32            Err(NotEnoughBytesError)
33        } else {
34            Ok(self.peek_bytes(r))
35        }
36    }
37    /// Try to get `size` bytes from the buffer, returning an error if there are less bytes than the
38    /// requested number.
39    fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
40        if self.remaining() < size {
41            Err(NotEnoughBytesError)
42        } else {
43            Ok(self.get_bytes(size))
44        }
45    }
46    /// Attempt to copy from buffer into destination slice, returning an error if not enough space
47    /// remains.
48    fn try_copy_to_slice(&mut self, dst: &mut [u8]) -> Result<(), NotEnoughBytesError> {
49        if self.remaining() < dst.len() {
50            Err(NotEnoughBytesError)
51        } else {
52            self.copy_to_slice(dst);
53            Ok(())
54        }
55    }
56    /// Attempt to read a `u8` from the buffer, returning an error if not enough space remains.
57    fn try_get_u8(&mut self) -> Result<u8, NotEnoughBytesError> {
58        if self.remaining() < size_of::<u8>() {
59            Err(NotEnoughBytesError)
60        } else {
61            Ok(self.get_u8())
62        }
63    }
64    /// Attempt to read a `u16` from the buffer, returning an error if not enough space remains.
65    fn try_get_u16(&mut self) -> Result<u16, NotEnoughBytesError> {
66        if self.remaining() < size_of::<u16>() {
67            Err(NotEnoughBytesError)
68        } else {
69            Ok(self.get_u16())
70        }
71    }
72    /// Attempt to read a `u32` from the buffer, returning an error if not enough space remains.
73    fn try_get_u32(&mut self) -> Result<u32, NotEnoughBytesError> {
74        if self.remaining() < size_of::<u32>() {
75            Err(NotEnoughBytesError)
76        } else {
77            Ok(self.get_u32())
78        }
79    }
80    /// Attempt to read a `i8` from the buffer, returning an error if not enough space remains.
81    fn try_get_i8(&mut self) -> Result<i8, NotEnoughBytesError> {
82        if self.remaining() < size_of::<i8>() {
83            Err(NotEnoughBytesError)
84        } else {
85            Ok(self.get_i8())
86        }
87    }
88    /// Attempt to read a `i16` from the buffer, returning an error if not enough space remains.
89    fn try_get_i16(&mut self) -> Result<i16, NotEnoughBytesError> {
90        if self.remaining() < size_of::<i16>() {
91            Err(NotEnoughBytesError)
92        } else {
93            Ok(self.get_i16())
94        }
95    }
96    /// Attempt to read a `i32` from the buffer, returning an error if not enough space remains.
97    fn try_get_i32(&mut self) -> Result<i32, NotEnoughBytesError> {
98        if self.remaining() < size_of::<i32>() {
99            Err(NotEnoughBytesError)
100        } else {
101            Ok(self.get_i32())
102        }
103    }
104    /// Attempt to read a `i64` from the buffer, returning an error if not enough space remains.
105    fn try_get_i64(&mut self) -> Result<i64, NotEnoughBytesError> {
106        if self.remaining() < size_of::<i64>() {
107            Err(NotEnoughBytesError)
108        } else {
109            Ok(self.get_i64())
110        }
111    }
112    /// Attempt to read a `f32` from the buffer, returning an error if not enough space remains.
113    fn try_get_f64(&mut self) -> Result<f64, NotEnoughBytesError> {
114        if self.remaining() < size_of::<f64>() {
115            Err(NotEnoughBytesError)
116        } else {
117            Ok(self.get_f64())
118        }
119    }
120}
121
122impl ByteBuf for Bytes {
123    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
124        self.slice(r)
125    }
126    fn get_bytes(&mut self, size: usize) -> Bytes {
127        self.split_to(size)
128    }
129}
130
131impl ByteBuf for BytesMut {
132    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
133        Bytes::copy_from_slice(&self[r])
134    }
135    fn get_bytes(&mut self, size: usize) -> Bytes {
136        self.split_to(size).freeze()
137    }
138}
139
140impl<T: ByteBuf> ByteBuf for &mut T {
141    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
142        (**self).peek_bytes(r)
143    }
144    fn get_bytes(&mut self, size: usize) -> Bytes {
145        (**self).get_bytes(size)
146    }
147    fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
148        (**self).try_peek_bytes(r)
149    }
150    fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
151        (**self).try_get_bytes(size)
152    }
153}
154
155impl ByteBuf for &[u8] {
156    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
157        Bytes::copy_from_slice(&self[r])
158    }
159    fn get_bytes(&mut self, size: usize) -> Bytes {
160        let (a, b) = self.split_at(size);
161        *self = b;
162        Bytes::copy_from_slice(a)
163    }
164}
165
166impl<T: AsRef<[u8]>> ByteBuf for Cursor<T> {
167    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
168        Bytes::copy_from_slice(&self.get_ref().as_ref()[r])
169    }
170    fn get_bytes(&mut self, size: usize) -> Bytes {
171        let pos = self.position() as usize;
172        self.set_position((pos + size) as u64);
173        Bytes::copy_from_slice(&self.get_ref().as_ref()[pos..(pos + size)])
174    }
175}
176
177/// A gap of specified length at the specified offset.
178#[derive(Debug, Copy, Clone)]
179pub struct Gap {
180    offset: usize,
181    len: usize,
182}
183
184/// A type capable of being represented as a gap in a buffer.
185pub trait GapType {
186    /// The type of the gap.
187    type Value;
188    /// The size of the gap.
189    fn size(&self) -> usize;
190    /// Insert a value into the provided buffer.
191    fn put(&self, buf: &mut [u8], value: Self::Value);
192}
193
194/// A gap of type `T`.
195pub struct TypedGap<T> {
196    gap: Gap,
197    type_: T,
198}
199
200macro_rules! define_gap_types {
201    {$($n:ident => $f:ident($t:ty)),*$(,)*} => {
202        /// Types which implement `GapType`.
203        pub mod gap {
204            use super::*;
205            $(
206                #[derive(Copy, Clone, Debug)]
207                pub(crate) struct $n;
208
209                impl GapType for $n {
210                    type Value = $t;
211                    fn size(&self) -> usize {
212                        std::mem::size_of::<Self::Value>()
213                    }
214                    fn put(&self, mut buf: &mut [u8], value: Self::Value) {
215                        buf.$f(value);
216                    }
217                }
218            )*
219        }
220    };
221}
222
223define_gap_types! {
224    U8 => put_u8(u8),
225    I8 => put_i8(i8),
226    U16 => put_u16(u16),
227    U16Le => put_u16_le(u16),
228    I16 => put_i16(i16),
229    I16Le => put_i16_le(i16),
230    U32 => put_u32(u32),
231    U32Le => put_u32_le(u32),
232    I32 => put_i32(i32),
233    I32Le => put_i32_le(i32),
234    U64 => put_u64(u64),
235    U64Le => put_u64_le(u64),
236    I64 => put_i64(i64),
237    I64Le => put_i64_le(i64),
238    U128 => put_u128(u128),
239    U128Le => put_u128_le(u128),
240    I128 => put_i128(i128),
241    I128Le => put_i128_le(i128),
242    F32 => put_f32(f32),
243    F32Le => put_f32_le(f32),
244    F64 => put_f64(f64),
245    F64Le => put_f64_le(f64),
246}
247
248/// Extension for working with [`bytes::buf::BufMut`].
249pub trait ByteBufMut: BufMut {
250    /// Get the current offset of the buffer.
251    fn offset(&self) -> usize;
252
253    /// Seek to the provided offset in the buffer.
254    fn seek(&mut self, offset: usize);
255
256    /// Read a range from the buffer.
257    fn range(&mut self, r: Range<usize>) -> &mut [u8];
258
259    /// Put a gap of `len` at the current buffer offset.
260    fn put_gap(&mut self, len: usize) -> Gap {
261        let res = Gap {
262            offset: self.offset(),
263            len,
264        };
265        self.seek(res.offset + len);
266        res
267    }
268
269    /// Read a gap from the buffer.
270    fn gap_buf(&mut self, gap: Gap) -> &mut [u8] {
271        self.range(gap.offset..(gap.offset + gap.len))
272    }
273
274    /// Put a typed gap of type `T` at the current buffer offset.
275    fn put_typed_gap<T: GapType>(&mut self, type_: T) -> TypedGap<T> {
276        TypedGap {
277            gap: self.put_gap(type_.size()),
278            type_,
279        }
280    }
281
282    /// Insert a value of the [`TypedGap`] type at the current buffer offset.
283    fn fill_typed_gap<T: GapType>(&mut self, gap: TypedGap<T>, value: T::Value) {
284        gap.type_.put(self.gap_buf(gap.gap), value);
285    }
286}
287
288impl ByteBufMut for BytesMut {
289    fn offset(&self) -> usize {
290        self.len()
291    }
292    fn seek(&mut self, offset: usize) {
293        self.resize(offset, 0);
294    }
295    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
296        &mut self[r]
297    }
298}
299
300impl ByteBufMut for Vec<u8> {
301    fn offset(&self) -> usize {
302        self.len()
303    }
304    fn seek(&mut self, offset: usize) {
305        self.resize(offset, 0);
306    }
307    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
308        &mut self[r]
309    }
310}
311
312impl<T: ByteBufMut> ByteBufMut for &mut T {
313    fn offset(&self) -> usize {
314        (**self).offset()
315    }
316    fn seek(&mut self, offset: usize) {
317        (**self).seek(offset)
318    }
319    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
320        (**self).range(r)
321    }
322}