crabka_protocol/primitives/
string_bytes.rs1use bytes::{Buf, BufMut, Bytes};
2
3use crate::ProtocolError;
4use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
5use crate::primitives::varint::{get_uvarint, put_uvarint, uvarint_len};
6
7pub fn put_string<B: BufMut>(buf: &mut B, s: &str) {
11 let len = i16::try_from(s.len()).expect("string length must fit in i16");
12 put_i16(buf, len);
13 buf.put_slice(s.as_bytes());
14}
15
16pub fn put_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
17 match s {
18 None => put_i16(buf, -1),
19 Some(s) => put_string(buf, s),
20 }
21}
22
23pub fn get_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
24 match get_nullable_string_owned(buf)? {
25 Some(s) => Ok(s),
26 None => Err(ProtocolError::InvalidValue("non-nullable STRING was null")),
27 }
28}
29
30pub fn get_nullable_string_owned<B: Buf>(buf: &mut B) -> Result<Option<String>, ProtocolError> {
31 let len = get_i16(buf)?;
32 if len < 0 {
33 return Ok(None);
34 }
35 #[allow(clippy::cast_sign_loss)]
36 let n = len as usize;
37 if buf.remaining() < n {
38 return Err(ProtocolError::UnexpectedEof {
39 needed: n - buf.remaining(),
40 });
41 }
42 let mut v = vec![0u8; n];
43 buf.copy_to_slice(&mut v);
44 let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
45 Ok(Some(s))
46}
47
48#[must_use]
49pub fn string_len(s: &str) -> usize {
50 2 + s.len()
51}
52#[must_use]
53pub fn nullable_string_len(s: Option<&str>) -> usize {
54 2 + s.map_or(0, str::len)
55}
56
57pub fn put_compact_string<B: BufMut>(buf: &mut B, s: &str) {
61 let len = u32::try_from(s.len() + 1).expect("string length too large");
62 put_uvarint(buf, len);
63 buf.put_slice(s.as_bytes());
64}
65
66pub fn put_compact_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
67 match s {
68 None => put_uvarint(buf, 0),
69 Some(s) => put_compact_string(buf, s),
70 }
71}
72
73pub fn get_compact_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
74 match get_compact_nullable_string_owned(buf)? {
75 Some(s) => Ok(s),
76 None => Err(ProtocolError::InvalidValue(
77 "non-nullable COMPACT_STRING was null",
78 )),
79 }
80}
81
82pub fn get_compact_nullable_string_owned<B: Buf>(
83 buf: &mut B,
84) -> Result<Option<String>, ProtocolError> {
85 let raw = get_uvarint(buf)?;
86 if raw == 0 {
87 return Ok(None);
88 }
89 let n = (raw - 1) as usize;
90 if buf.remaining() < n {
91 return Err(ProtocolError::UnexpectedEof {
92 needed: n - buf.remaining(),
93 });
94 }
95 let mut v = vec![0u8; n];
96 buf.copy_to_slice(&mut v);
97 let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
98 Ok(Some(s))
99}
100
101#[must_use]
102pub fn compact_string_len(s: &str) -> usize {
103 uvarint_len(u32::try_from(s.len() + 1).unwrap()) + s.len()
104}
105#[must_use]
106pub fn compact_nullable_string_len(s: Option<&str>) -> usize {
107 match s {
108 None => uvarint_len(0),
109 Some(s) => compact_string_len(s),
110 }
111}
112
113pub fn put_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
118 let len = i32::try_from(b.len()).expect("bytes length must fit in i32");
119 put_i32(buf, len);
120 buf.put_slice(b);
121}
122
123pub fn put_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
124 match b {
125 None => put_i32(buf, -1),
126 Some(b) => put_bytes(buf, b),
127 }
128}
129
130pub fn get_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
131 match get_nullable_bytes_owned(buf)? {
132 Some(b) => Ok(b),
133 None => Err(ProtocolError::InvalidValue("non-nullable BYTES was null")),
134 }
135}
136
137pub fn get_nullable_bytes_owned<B: Buf>(buf: &mut B) -> Result<Option<Bytes>, ProtocolError> {
138 let len = get_i32(buf)?;
139 if len < 0 {
140 return Ok(None);
141 }
142 #[allow(clippy::cast_sign_loss)]
143 let n = len as usize;
144 if buf.remaining() < n {
145 return Err(ProtocolError::UnexpectedEof {
146 needed: n - buf.remaining(),
147 });
148 }
149 let mut v = vec![0u8; n];
150 buf.copy_to_slice(&mut v);
151 Ok(Some(Bytes::from(v)))
152}
153
154#[must_use]
155pub fn bytes_len(b: &[u8]) -> usize {
156 4 + b.len()
157}
158#[must_use]
159pub fn nullable_bytes_len(b: Option<&[u8]>) -> usize {
160 4 + b.map_or(0, <[u8]>::len)
161}
162
163pub fn put_compact_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
164 let len = u32::try_from(b.len() + 1).expect("bytes length too large");
165 put_uvarint(buf, len);
166 buf.put_slice(b);
167}
168
169pub fn put_compact_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
170 match b {
171 None => put_uvarint(buf, 0),
172 Some(b) => put_compact_bytes(buf, b),
173 }
174}
175
176#[must_use]
177pub fn compact_bytes_len(b: &[u8]) -> usize {
178 uvarint_len(u32::try_from(b.len() + 1).unwrap()) + b.len()
179}
180
181#[must_use]
184pub fn compact_bytes_len_from_size(n: usize) -> usize {
185 uvarint_len(u32::try_from(n + 1).unwrap()) + n
186}
187#[must_use]
188pub fn compact_nullable_bytes_len(b: Option<&[u8]>) -> usize {
189 match b {
190 None => uvarint_len(0),
191 Some(b) => compact_bytes_len(b),
192 }
193}
194
195pub fn get_compact_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
196 match get_compact_nullable_bytes_owned(buf)? {
197 Some(b) => Ok(b),
198 None => Err(ProtocolError::InvalidValue(
199 "non-nullable COMPACT_BYTES was null",
200 )),
201 }
202}
203
204pub fn get_compact_nullable_bytes_owned<B: Buf>(
205 buf: &mut B,
206) -> Result<Option<Bytes>, ProtocolError> {
207 let raw = get_uvarint(buf)?;
208 if raw == 0 {
209 return Ok(None);
210 }
211 let n = (raw - 1) as usize;
212 if buf.remaining() < n {
213 return Err(ProtocolError::UnexpectedEof {
214 needed: n - buf.remaining(),
215 });
216 }
217 let mut v = vec![0u8; n];
218 buf.copy_to_slice(&mut v);
219 Ok(Some(Bytes::from(v)))
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use assert2::assert;
226 use bytes::BytesMut;
227
228 #[test]
229 fn string_roundtrip() {
230 let mut buf = BytesMut::new();
231 put_string(&mut buf, "kafka");
232 assert!(&buf[..] == &[0x00, 0x05, b'k', b'a', b'f', b'k', b'a']);
234 let mut cur = &buf[..];
235 assert!(get_string_owned(&mut cur).unwrap() == "kafka");
236 }
237
238 #[test]
239 fn nullable_string_null() {
240 let mut buf = BytesMut::new();
241 put_nullable_string(&mut buf, None);
242 assert!(&buf[..] == &[0xFF, 0xFF]);
243 let mut cur = &buf[..];
244 assert!(get_nullable_string_owned(&mut cur).unwrap() == None);
245 }
246
247 #[test]
248 fn compact_string_roundtrip() {
249 let mut buf = BytesMut::new();
250 put_compact_string(&mut buf, "kafka");
251 assert!(&buf[..] == &[0x06, b'k', b'a', b'f', b'k', b'a']);
253 let mut cur = &buf[..];
254 assert!(get_compact_string_owned(&mut cur).unwrap() == "kafka");
255 }
256
257 #[test]
258 fn compact_nullable_string_null() {
259 let mut buf = BytesMut::new();
260 put_compact_nullable_string(&mut buf, None);
261 assert!(&buf[..] == &[0x00]);
262 let mut cur = &buf[..];
263 assert!(get_compact_nullable_string_owned(&mut cur).unwrap() == None);
264 }
265
266 #[test]
267 fn empty_compact_string() {
268 let mut buf = BytesMut::new();
269 put_compact_string(&mut buf, "");
270 assert!(&buf[..] == &[0x01]); let mut cur = &buf[..];
272 assert!(get_compact_string_owned(&mut cur).unwrap() == "");
273 }
274
275 #[test]
276 fn bytes_roundtrip() {
277 let mut buf = BytesMut::new();
278 put_bytes(&mut buf, &[1, 2, 3]);
279 let mut cur = &buf[..];
280 let out = get_bytes_owned(&mut cur).unwrap();
281 assert!(out.as_ref() == &[1, 2, 3]);
282 }
283
284 #[test]
285 fn invalid_utf8_is_rejected() {
286 let bytes = [0x00, 0x02, 0xC3, 0x28];
288 let mut cur = &bytes[..];
289 assert!(matches!(
290 get_string_owned(&mut cur),
291 Err(ProtocolError::InvalidUtf8(_))
292 ));
293 }
294}