kafka_protocol/protocol/
buf.rs

1//! Utilities for working with the [`bytes`] crate.
2use std::io::Cursor;
3use std::ops::Range;
4
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use std::error::Error;
7use std::fmt::{Display, Formatter};
8
/// Error indicating there are not enough remaining bytes in a buffer to perform a read.
///
/// Returned by the fallible `try_*` readers in this module when the requested length or
/// range extends past the bytes still available.
#[derive(Debug)]
pub struct NotEnoughBytesError;

impl Display for NotEnoughBytesError {
    /// Writes the fixed, single-line description of the error.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Deliberately no trailing newline: a `Display` message gets embedded in log lines
        // and wrapped by other errors, where a stray "\n" corrupts the output.
        f.write_str("Not enough bytes remaining in buffer!")
    }
}

impl Error for NotEnoughBytesError {}
20
21/// Extension for working with [`bytes::Buf`].
22pub trait ByteBuf: Buf {
23    /// Peek ahead in the buffer by the provided range.
24    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes;
25    /// Get `size` bytes from the underlying buffer.
26    fn get_bytes(&mut self, size: usize) -> Bytes;
27    /// Try to peek ahead in the buffer by the provided range, returning an error if there are less
28    /// bytes than the requested range.
29    fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
30        if self.remaining() < r.end {
31            Err(NotEnoughBytesError)
32        } else {
33            Ok(self.peek_bytes(r))
34        }
35    }
36    /// Try to get `size` bytes from the buffer, returning an error if there are less bytes than the
37    /// requested number.
38    fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
39        if self.remaining() < size {
40            Err(NotEnoughBytesError)
41        } else {
42            Ok(self.get_bytes(size))
43        }
44    }
45}
46
47impl ByteBuf for Bytes {
48    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
49        self.slice(r)
50    }
51    fn get_bytes(&mut self, size: usize) -> Bytes {
52        self.split_to(size)
53    }
54}
55
56impl ByteBuf for BytesMut {
57    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
58        Bytes::copy_from_slice(&self[r])
59    }
60    fn get_bytes(&mut self, size: usize) -> Bytes {
61        self.split_to(size).freeze()
62    }
63}
64
65impl<T: ByteBuf> ByteBuf for &mut T {
66    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
67        (**self).peek_bytes(r)
68    }
69    fn get_bytes(&mut self, size: usize) -> Bytes {
70        (**self).get_bytes(size)
71    }
72    fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
73        (**self).try_peek_bytes(r)
74    }
75    fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
76        (**self).try_get_bytes(size)
77    }
78}
79
80impl ByteBuf for &[u8] {
81    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
82        Bytes::copy_from_slice(&self[r])
83    }
84    fn get_bytes(&mut self, size: usize) -> Bytes {
85        let (a, b) = self.split_at(size);
86        *self = b;
87        Bytes::copy_from_slice(a)
88    }
89}
90
91impl<T: AsRef<[u8]>> ByteBuf for Cursor<T> {
92    fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
93        Bytes::copy_from_slice(&self.get_ref().as_ref()[r])
94    }
95    fn get_bytes(&mut self, size: usize) -> Bytes {
96        let pos = self.position() as usize;
97        self.set_position((pos + size) as u64);
98        Bytes::copy_from_slice(&self.get_ref().as_ref()[pos..(pos + size)])
99    }
100}
101
/// A reserved span inside a buffer: `len` bytes starting at byte `offset`.
#[derive(Clone, Copy, Debug)]
pub struct Gap {
    /// Index of the first byte of the span.
    offset: usize,
    /// Number of bytes in the span.
    len: usize,
}
108
/// Describes a value that can be written into a gap of a buffer after the fact.
pub trait GapType {
    /// The value that ends up stored in the gap.
    type Value;

    /// Number of bytes the gap occupies.
    fn size(&self) -> usize;

    /// Write `value` into `buf`.
    fn put(&self, buf: &mut [u8], value: Self::Value);
}
118
119/// A gap of type `T`.
120pub struct TypedGap<T> {
121    gap: Gap,
122    type_: T,
123}
124
125macro_rules! define_gap_types {
126    {$($n:ident => $f:ident($t:ty)),*$(,)*} => {
127        /// Types which implement `GapType`.
128        pub mod gap {
129            use super::*;
130            $(
131                #[derive(Copy, Clone, Debug)]
132                pub(crate) struct $n;
133
134                impl GapType for $n {
135                    type Value = $t;
136                    fn size(&self) -> usize {
137                        std::mem::size_of::<Self::Value>()
138                    }
139                    fn put(&self, mut buf: &mut [u8], value: Self::Value) {
140                        buf.$f(value);
141                    }
142                }
143            )*
144        }
145    };
146}
147
148define_gap_types! {
149    U8 => put_u8(u8),
150    I8 => put_i8(i8),
151    U16 => put_u16(u16),
152    U16Le => put_u16_le(u16),
153    I16 => put_i16(i16),
154    I16Le => put_i16_le(i16),
155    U32 => put_u32(u32),
156    U32Le => put_u32_le(u32),
157    I32 => put_i32(i32),
158    I32Le => put_i32_le(i32),
159    U64 => put_u64(u64),
160    U64Le => put_u64_le(u64),
161    I64 => put_i64(i64),
162    I64Le => put_i64_le(i64),
163    U128 => put_u128(u128),
164    U128Le => put_u128_le(u128),
165    I128 => put_i128(i128),
166    I128Le => put_i128_le(i128),
167    F32 => put_f32(f32),
168    F32Le => put_f32_le(f32),
169    F64 => put_f64(f64),
170    F64Le => put_f64_le(f64),
171}
172
173/// Extension for working with [`bytes::buf::BufMut`].
174pub trait ByteBufMut: BufMut {
175    /// Get the current offset of the buffer.
176    fn offset(&self) -> usize;
177
178    /// Seek to the provided offset in the buffer.
179    fn seek(&mut self, offset: usize);
180
181    /// Read a range from the buffer.
182    fn range(&mut self, r: Range<usize>) -> &mut [u8];
183
184    /// Put a gap of `len` at the current buffer offset.
185    fn put_gap(&mut self, len: usize) -> Gap {
186        let res = Gap {
187            offset: self.offset(),
188            len,
189        };
190        self.seek(res.offset + len);
191        res
192    }
193
194    /// Read a gap from the buffer.
195    fn gap_buf(&mut self, gap: Gap) -> &mut [u8] {
196        self.range(gap.offset..(gap.offset + gap.len))
197    }
198
199    /// Put a typed gap of type `T` at the current buffer offset.
200    fn put_typed_gap<T: GapType>(&mut self, type_: T) -> TypedGap<T> {
201        TypedGap {
202            gap: self.put_gap(type_.size()),
203            type_,
204        }
205    }
206
207    /// Insert a value of the [`TypedGap`] type at the current buffer offset.
208    fn fill_typed_gap<T: GapType>(&mut self, gap: TypedGap<T>, value: T::Value) {
209        gap.type_.put(self.gap_buf(gap.gap), value);
210    }
211}
212
213impl ByteBufMut for BytesMut {
214    fn offset(&self) -> usize {
215        self.len()
216    }
217    fn seek(&mut self, offset: usize) {
218        self.resize(offset, 0);
219    }
220    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
221        &mut self[r]
222    }
223}
224
225impl ByteBufMut for Vec<u8> {
226    fn offset(&self) -> usize {
227        self.len()
228    }
229    fn seek(&mut self, offset: usize) {
230        self.resize(offset, 0);
231    }
232    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
233        &mut self[r]
234    }
235}
236
237impl<T: ByteBufMut> ByteBufMut for &mut T {
238    fn offset(&self) -> usize {
239        (**self).offset()
240    }
241    fn seek(&mut self, offset: usize) {
242        (**self).seek(offset)
243    }
244    fn range(&mut self, r: Range<usize>) -> &mut [u8] {
245        (**self).range(r)
246    }
247}