fluvio_protocol/core/
bytebuf.rs

1use std::io::Error;
2use std::io::ErrorKind;
3use std::ops::Deref;
4
5use bytes::{Buf, Bytes, BufMut};
6
7use crate::{Encoder, Decoder, Version};
8
9/// Bytes Buffer with an optimized implementation for encoding and decoding.
10/// Useful for handling an immutable set of bytes.
11#[derive(Clone, Debug, Default, PartialEq, Eq)]
12pub struct ByteBuf {
13    inner: Bytes,
14}
15
16impl From<Bytes> for ByteBuf {
17    fn from(bytes: Bytes) -> Self {
18        ByteBuf { inner: bytes }
19    }
20}
21
22impl From<Vec<u8>> for ByteBuf {
23    fn from(bytes: Vec<u8>) -> Self {
24        ByteBuf {
25            inner: Bytes::from_iter(bytes),
26        }
27    }
28}
29
30impl From<ByteBuf> for Vec<u8> {
31    fn from(bytebuf: ByteBuf) -> Self {
32        bytebuf.inner.to_vec()
33    }
34}
35
36impl Deref for ByteBuf {
37    type Target = Bytes;
38
39    fn deref(&self) -> &Self::Target {
40        &self.inner
41    }
42}
43
44impl Decoder for ByteBuf {
45    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
46    where
47        T: Buf,
48    {
49        let mut len: u32 = 0;
50        len.decode(src, version)?;
51
52        if len < 1 {
53            return Ok(());
54        }
55
56        self.inner = src.copy_to_bytes(len as usize);
57
58        Ok(())
59    }
60}
61
62impl Encoder for ByteBuf {
63    fn write_size(&self, version: Version) -> usize {
64        0_u32.write_size(version) + self.inner.len()
65    }
66
67    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
68    where
69        T: BufMut,
70    {
71        let remaining = dest.remaining_mut();
72        let expected = self.write_size(version);
73
74        if remaining < expected {
75            return Err(Error::new(
76                ErrorKind::UnexpectedEof,
77                format!(
78                    "Not enough capacity for ByteBuf. Expected: {expected}, Remaining: {remaining}"
79                ),
80            ));
81        }
82
83        dest.put_u32(self.inner.len() as u32);
84        dest.put(self.inner.clone());
85
86        Ok(())
87    }
88
89    fn as_bytes(&self, _version: Version) -> Result<Bytes, Error> {
90        Ok(self.inner.clone())
91    }
92}
93
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::{Decoder, Encoder};
    use super::ByteBuf;

    /// A default-constructed buffer holds no bytes.
    #[test]
    fn test_is_empty_by_default() {
        let bytebuf = ByteBuf::default();

        assert!(bytebuf.is_empty());
        assert!(bytebuf.inner.is_empty());
    }

    /// `is_empty` through `Deref` agrees with the inner value for a non-empty buffer.
    #[test]
    fn test_is_empty_from_inner_vec() {
        let bytebuf = ByteBuf::from(vec![128, 129, 130, 131]);

        assert_eq!(bytebuf.is_empty(), bytebuf.inner.is_empty());
    }

    /// `is_empty` through `Deref` agrees with the inner value for an empty buffer.
    #[test]
    fn test_is_empty_from_inner_empty_vec() {
        let bytebuf = ByteBuf::from(Vec::default());

        assert_eq!(bytebuf.is_empty(), bytebuf.inner.is_empty());
    }

    /// `len` through `Deref` agrees with the inner value.
    #[test]
    fn test_len_from_inner_vec() {
        let bytebuf = ByteBuf::from(vec![128, 129, 130, 131]);

        assert_eq!(bytebuf.len(), bytebuf.inner.len());
    }

    /// Encoding writes a 4-byte length prefix followed by the payload bytes.
    #[test]
    fn test_encode_bytebuf() {
        let mut dest = Vec::default();
        let value: ByteBuf = ByteBuf::from(vec![0x10, 0x11]);
        let result = value.encode(&mut dest, 0);

        assert!(result.is_ok());

        // Length + Contents
        assert_eq!(dest.len(), 6);

        // Length Bytes
        assert_eq!(dest[0], 0x00);
        assert_eq!(dest[1], 0x00);
        assert_eq!(dest[2], 0x00);
        assert_eq!(dest[3], 0x02);

        // Actual Content
        assert_eq!(dest[4], 0x10);
        assert_eq!(dest[5], 0x11);

        // Length in 32bits (4 bytes) + Contents (2 elements)
        assert_eq!(value.write_size(0), dest.len());
    }

    /// A `ByteBuf` encodes to the same bytes as the equivalent `Vec<u8>`.
    #[test]
    fn test_bytebuf_encodes_transparent_with_vecu8() {
        let raw_data: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A];
        let mut encoded_data: Vec<u8> = Vec::default();

        Vec::<u8>::from(raw_data)
            .encode(&mut encoded_data, 0)
            .expect("Failed to encode Vec<u8>");

        assert_eq!(encoded_data[3], 0x0A, "the length value doesnt match");

        let mut bytebuf_encoded: Vec<u8> = Vec::default();
        let encode_bytebuf_res = ByteBuf::from(Vec::from(raw_data)).encode(&mut bytebuf_encoded, 0);

        assert!(encode_bytebuf_res.is_ok());
        assert_eq!(
            bytebuf_encoded,
            encoded_data.as_slice(),
            "encoded version doesn't match with expected"
        );
        assert_eq!(bytebuf_encoded[3], 0x0A, "the length value doesnt match");
    }

    /// Bytes produced by encoding a `Vec<u8>` decode to the same contents
    /// whether they are read back as a `Vec<u8>` or as a `ByteBuf`.
    #[test]
    fn test_bytebuf_decode_transparent_with_vecu8() {
        let raw_data: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A];

        let mut encoded_data: Vec<u8> = Vec::default();
        Vec::<u8>::from(raw_data)
            .encode(&mut encoded_data, 0)
            .expect("Failed to encode Vec<u8>");

        let mut bytebuf_encoded = Vec::default();
        ByteBuf::from(Vec::from(raw_data))
            .encode(&mut bytebuf_encoded, 0)
            .expect("Failed to encode ByteBuf");

        assert_eq!(
            bytebuf_encoded,
            encoded_data.as_slice(),
            "the encoded output doesn't match with the expected for Vec<u8>"
        );

        // The cursor only borrows the encoded bytes, so no copy is needed.
        let mut decoded_vecu8: Vec<u8> = Vec::default();
        decoded_vecu8
            .decode(&mut Cursor::new(&encoded_data), 0)
            .expect("Failed to decode Vec<u8>");

        assert_eq!(decoded_vecu8.len(), 10);

        let mut decoded_bytebuf: ByteBuf = ByteBuf::default();
        decoded_bytebuf
            .decode(&mut Cursor::new(&encoded_data), 0)
            .expect("Failed to decode ByteBuf");

        assert_eq!(decoded_bytebuf.inner.len(), 10);
        assert_eq!(decoded_bytebuf.inner, decoded_vecu8);
    }
}