kafka_protocol/protocol/
buf.rs1use std::io::Cursor;
3use std::ops::Range;
4
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use std::error::Error;
7use std::fmt::{Display, Formatter};
8
9#[derive(Debug)]
11pub struct NotEnoughBytesError;
12
13impl Display for NotEnoughBytesError {
14 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
15 writeln!(f, "Not enough bytes remaining in buffer!")
16 }
17}
18
19impl Error for NotEnoughBytesError {}
20
21pub trait ByteBuf: Buf {
23 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes;
25 fn get_bytes(&mut self, size: usize) -> Bytes;
27 fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
30 if self.remaining() < r.end {
31 Err(NotEnoughBytesError)
32 } else {
33 Ok(self.peek_bytes(r))
34 }
35 }
36 fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
39 if self.remaining() < size {
40 Err(NotEnoughBytesError)
41 } else {
42 Ok(self.get_bytes(size))
43 }
44 }
45}
46
47impl ByteBuf for Bytes {
48 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
49 self.slice(r)
50 }
51 fn get_bytes(&mut self, size: usize) -> Bytes {
52 self.split_to(size)
53 }
54}
55
56impl ByteBuf for BytesMut {
57 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
58 Bytes::copy_from_slice(&self[r])
59 }
60 fn get_bytes(&mut self, size: usize) -> Bytes {
61 self.split_to(size).freeze()
62 }
63}
64
65impl<T: ByteBuf> ByteBuf for &mut T {
66 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
67 (**self).peek_bytes(r)
68 }
69 fn get_bytes(&mut self, size: usize) -> Bytes {
70 (**self).get_bytes(size)
71 }
72 fn try_peek_bytes(&mut self, r: Range<usize>) -> Result<Bytes, NotEnoughBytesError> {
73 (**self).try_peek_bytes(r)
74 }
75 fn try_get_bytes(&mut self, size: usize) -> Result<Bytes, NotEnoughBytesError> {
76 (**self).try_get_bytes(size)
77 }
78}
79
80impl ByteBuf for &[u8] {
81 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
82 Bytes::copy_from_slice(&self[r])
83 }
84 fn get_bytes(&mut self, size: usize) -> Bytes {
85 let (a, b) = self.split_at(size);
86 *self = b;
87 Bytes::copy_from_slice(a)
88 }
89}
90
91impl<T: AsRef<[u8]>> ByteBuf for Cursor<T> {
92 fn peek_bytes(&mut self, r: Range<usize>) -> Bytes {
93 Bytes::copy_from_slice(&self.get_ref().as_ref()[r])
94 }
95 fn get_bytes(&mut self, size: usize) -> Bytes {
96 let pos = self.position() as usize;
97 self.set_position((pos + size) as u64);
98 Bytes::copy_from_slice(&self.get_ref().as_ref()[pos..(pos + size)])
99 }
100}
101
102#[derive(Debug, Copy, Clone)]
104pub struct Gap {
105 offset: usize,
106 len: usize,
107}
108
109pub trait GapType {
111 type Value;
113 fn size(&self) -> usize;
115 fn put(&self, buf: &mut [u8], value: Self::Value);
117}
118
119pub struct TypedGap<T> {
121 gap: Gap,
122 type_: T,
123}
124
125macro_rules! define_gap_types {
126 {$($n:ident => $f:ident($t:ty)),*$(,)*} => {
127 pub mod gap {
129 use super::*;
130 $(
131 #[derive(Copy, Clone, Debug)]
132 pub(crate) struct $n;
133
134 impl GapType for $n {
135 type Value = $t;
136 fn size(&self) -> usize {
137 std::mem::size_of::<Self::Value>()
138 }
139 fn put(&self, mut buf: &mut [u8], value: Self::Value) {
140 buf.$f(value);
141 }
142 }
143 )*
144 }
145 };
146}
147
148define_gap_types! {
149 U8 => put_u8(u8),
150 I8 => put_i8(i8),
151 U16 => put_u16(u16),
152 U16Le => put_u16_le(u16),
153 I16 => put_i16(i16),
154 I16Le => put_i16_le(i16),
155 U32 => put_u32(u32),
156 U32Le => put_u32_le(u32),
157 I32 => put_i32(i32),
158 I32Le => put_i32_le(i32),
159 U64 => put_u64(u64),
160 U64Le => put_u64_le(u64),
161 I64 => put_i64(i64),
162 I64Le => put_i64_le(i64),
163 U128 => put_u128(u128),
164 U128Le => put_u128_le(u128),
165 I128 => put_i128(i128),
166 I128Le => put_i128_le(i128),
167 F32 => put_f32(f32),
168 F32Le => put_f32_le(f32),
169 F64 => put_f64(f64),
170 F64Le => put_f64_le(f64),
171}
172
173pub trait ByteBufMut: BufMut {
175 fn offset(&self) -> usize;
177
178 fn seek(&mut self, offset: usize);
180
181 fn range(&mut self, r: Range<usize>) -> &mut [u8];
183
184 fn put_gap(&mut self, len: usize) -> Gap {
186 let res = Gap {
187 offset: self.offset(),
188 len,
189 };
190 self.seek(res.offset + len);
191 res
192 }
193
194 fn gap_buf(&mut self, gap: Gap) -> &mut [u8] {
196 self.range(gap.offset..(gap.offset + gap.len))
197 }
198
199 fn put_typed_gap<T: GapType>(&mut self, type_: T) -> TypedGap<T> {
201 TypedGap {
202 gap: self.put_gap(type_.size()),
203 type_,
204 }
205 }
206
207 fn fill_typed_gap<T: GapType>(&mut self, gap: TypedGap<T>, value: T::Value) {
209 gap.type_.put(self.gap_buf(gap.gap), value);
210 }
211}
212
213impl ByteBufMut for BytesMut {
214 fn offset(&self) -> usize {
215 self.len()
216 }
217 fn seek(&mut self, offset: usize) {
218 self.resize(offset, 0);
219 }
220 fn range(&mut self, r: Range<usize>) -> &mut [u8] {
221 &mut self[r]
222 }
223}
224
225impl ByteBufMut for Vec<u8> {
226 fn offset(&self) -> usize {
227 self.len()
228 }
229 fn seek(&mut self, offset: usize) {
230 self.resize(offset, 0);
231 }
232 fn range(&mut self, r: Range<usize>) -> &mut [u8] {
233 &mut self[r]
234 }
235}
236
237impl<T: ByteBufMut> ByteBufMut for &mut T {
238 fn offset(&self) -> usize {
239 (**self).offset()
240 }
241 fn seek(&mut self, offset: usize) {
242 (**self).seek(offset)
243 }
244 fn range(&mut self, r: Range<usize>) -> &mut [u8] {
245 (**self).range(r)
246 }
247}