Skip to main content

crabka_protocol/primitives/
string_bytes.rs

1use bytes::{Buf, BufMut, Bytes};
2
3use crate::ProtocolError;
4use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
5use crate::primitives::varint::{get_uvarint, put_uvarint, uvarint_len};
6
7// ---- STRING (non-flexible) ----
8// Wire: INT16 length (>=0), then `length` bytes UTF-8. -1 = null.
9
10pub fn put_string<B: BufMut>(buf: &mut B, s: &str) {
11    let len = i16::try_from(s.len()).expect("string length must fit in i16");
12    put_i16(buf, len);
13    buf.put_slice(s.as_bytes());
14}
15
16pub fn put_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
17    match s {
18        None => put_i16(buf, -1),
19        Some(s) => put_string(buf, s),
20    }
21}
22
23pub fn get_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
24    match get_nullable_string_owned(buf)? {
25        Some(s) => Ok(s),
26        None => Err(ProtocolError::InvalidValue("non-nullable STRING was null")),
27    }
28}
29
30pub fn get_nullable_string_owned<B: Buf>(buf: &mut B) -> Result<Option<String>, ProtocolError> {
31    let len = get_i16(buf)?;
32    if len < 0 {
33        return Ok(None);
34    }
35    #[allow(clippy::cast_sign_loss)]
36    let n = len as usize;
37    if buf.remaining() < n {
38        return Err(ProtocolError::UnexpectedEof {
39            needed: n - buf.remaining(),
40        });
41    }
42    let mut v = vec![0u8; n];
43    buf.copy_to_slice(&mut v);
44    let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
45    Ok(Some(s))
46}
47
/// Returns the encoded size in bytes of `s` as a non-flexible STRING:
/// the 2-byte length prefix plus the UTF-8 payload.
#[must_use]
pub fn string_len(s: &str) -> usize {
    const LENGTH_PREFIX: usize = 2;
    LENGTH_PREFIX + s.len()
}
/// Returns the encoded size in bytes of an optional non-flexible STRING.
///
/// A null value occupies only the 2-byte length prefix.
#[must_use]
pub fn nullable_string_len(s: Option<&str>) -> usize {
    let payload = match s {
        Some(text) => text.len(),
        None => 0,
    };
    2 + payload
}
56
57// ---- COMPACT_STRING (flexible) ----
58// Wire: UVARINT length+1 (0 = null), then `length` UTF-8 bytes.
59
60pub fn put_compact_string<B: BufMut>(buf: &mut B, s: &str) {
61    let len = u32::try_from(s.len() + 1).expect("string length too large");
62    put_uvarint(buf, len);
63    buf.put_slice(s.as_bytes());
64}
65
66pub fn put_compact_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
67    match s {
68        None => put_uvarint(buf, 0),
69        Some(s) => put_compact_string(buf, s),
70    }
71}
72
73pub fn get_compact_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
74    match get_compact_nullable_string_owned(buf)? {
75        Some(s) => Ok(s),
76        None => Err(ProtocolError::InvalidValue(
77            "non-nullable COMPACT_STRING was null",
78        )),
79    }
80}
81
82pub fn get_compact_nullable_string_owned<B: Buf>(
83    buf: &mut B,
84) -> Result<Option<String>, ProtocolError> {
85    let raw = get_uvarint(buf)?;
86    if raw == 0 {
87        return Ok(None);
88    }
89    let n = (raw - 1) as usize;
90    if buf.remaining() < n {
91        return Err(ProtocolError::UnexpectedEof {
92            needed: n - buf.remaining(),
93        });
94    }
95    let mut v = vec![0u8; n];
96    buf.copy_to_slice(&mut v);
97    let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
98    Ok(Some(s))
99}
100
101#[must_use]
102pub fn compact_string_len(s: &str) -> usize {
103    uvarint_len(u32::try_from(s.len() + 1).unwrap()) + s.len()
104}
105#[must_use]
106pub fn compact_nullable_string_len(s: Option<&str>) -> usize {
107    match s {
108        None => uvarint_len(0),
109        Some(s) => compact_string_len(s),
110    }
111}
112
113// ---- BYTES / COMPACT_BYTES ----
114// BYTES: INT32 length, `length` bytes. -1 = null.
115// COMPACT_BYTES: UVARINT length+1 (0=null), `length` bytes.
116
117pub fn put_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
118    let len = i32::try_from(b.len()).expect("bytes length must fit in i32");
119    put_i32(buf, len);
120    buf.put_slice(b);
121}
122
123pub fn put_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
124    match b {
125        None => put_i32(buf, -1),
126        Some(b) => put_bytes(buf, b),
127    }
128}
129
130pub fn get_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
131    match get_nullable_bytes_owned(buf)? {
132        Some(b) => Ok(b),
133        None => Err(ProtocolError::InvalidValue("non-nullable BYTES was null")),
134    }
135}
136
137pub fn get_nullable_bytes_owned<B: Buf>(buf: &mut B) -> Result<Option<Bytes>, ProtocolError> {
138    let len = get_i32(buf)?;
139    if len < 0 {
140        return Ok(None);
141    }
142    #[allow(clippy::cast_sign_loss)]
143    let n = len as usize;
144    if buf.remaining() < n {
145        return Err(ProtocolError::UnexpectedEof {
146            needed: n - buf.remaining(),
147        });
148    }
149    let mut v = vec![0u8; n];
150    buf.copy_to_slice(&mut v);
151    Ok(Some(Bytes::from(v)))
152}
153
/// Returns the encoded size in bytes of `b` as non-flexible BYTES:
/// the 4-byte length prefix plus the payload.
#[must_use]
pub fn bytes_len(b: &[u8]) -> usize {
    const LENGTH_PREFIX: usize = 4;
    LENGTH_PREFIX + b.len()
}
/// Returns the encoded size in bytes of optional non-flexible BYTES.
///
/// A null value occupies only the 4-byte length prefix.
#[must_use]
pub fn nullable_bytes_len(b: Option<&[u8]>) -> usize {
    let payload = match b {
        Some(data) => data.len(),
        None => 0,
    };
    4 + payload
}
162
163pub fn put_compact_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
164    let len = u32::try_from(b.len() + 1).expect("bytes length too large");
165    put_uvarint(buf, len);
166    buf.put_slice(b);
167}
168
169pub fn put_compact_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
170    match b {
171        None => put_uvarint(buf, 0),
172        Some(b) => put_compact_bytes(buf, b),
173    }
174}
175
176#[must_use]
177pub fn compact_bytes_len(b: &[u8]) -> usize {
178    uvarint_len(u32::try_from(b.len() + 1).unwrap()) + b.len()
179}
180
181/// Like `compact_bytes_len` but takes the byte-count directly rather than a slice.
182/// Useful when the content size is known without materialising the buffer.
183#[must_use]
184pub fn compact_bytes_len_from_size(n: usize) -> usize {
185    uvarint_len(u32::try_from(n + 1).unwrap()) + n
186}
187#[must_use]
188pub fn compact_nullable_bytes_len(b: Option<&[u8]>) -> usize {
189    match b {
190        None => uvarint_len(0),
191        Some(b) => compact_bytes_len(b),
192    }
193}
194
195pub fn get_compact_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
196    match get_compact_nullable_bytes_owned(buf)? {
197        Some(b) => Ok(b),
198        None => Err(ProtocolError::InvalidValue(
199            "non-nullable COMPACT_BYTES was null",
200        )),
201    }
202}
203
204pub fn get_compact_nullable_bytes_owned<B: Buf>(
205    buf: &mut B,
206) -> Result<Option<Bytes>, ProtocolError> {
207    let raw = get_uvarint(buf)?;
208    if raw == 0 {
209        return Ok(None);
210    }
211    let n = (raw - 1) as usize;
212    if buf.remaining() < n {
213        return Err(ProtocolError::UnexpectedEof {
214            needed: n - buf.remaining(),
215        });
216    }
217    let mut v = vec![0u8; n];
218    buf.copy_to_slice(&mut v);
219    Ok(Some(Bytes::from(v)))
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::BytesMut;

    /// STRING is encoded as INT16 length + payload and decodes back unchanged.
    #[test]
    fn string_roundtrip() {
        let mut encoded = BytesMut::new();
        put_string(&mut encoded, "kafka");
        // INT16(5) + bytes
        let expected: &[u8] = &[0x00, 0x05, b'k', b'a', b'f', b'k', b'a'];
        assert!(&encoded[..] == expected);
        let mut reader = &encoded[..];
        let decoded = get_string_owned(&mut reader).unwrap();
        assert!(decoded == "kafka");
    }

    /// A null STRING is the INT16 value -1 and decodes to `None`.
    #[test]
    fn nullable_string_null() {
        let mut encoded = BytesMut::new();
        put_nullable_string(&mut encoded, None);
        let expected: &[u8] = &[0xFF, 0xFF];
        assert!(&encoded[..] == expected);
        let mut reader = &encoded[..];
        let decoded = get_nullable_string_owned(&mut reader).unwrap();
        assert!(decoded == None);
    }

    /// COMPACT_STRING is encoded as UVARINT(length + 1) + payload.
    #[test]
    fn compact_string_roundtrip() {
        let mut encoded = BytesMut::new();
        put_compact_string(&mut encoded, "kafka");
        // UVARINT(6) + bytes
        let expected: &[u8] = &[0x06, b'k', b'a', b'f', b'k', b'a'];
        assert!(&encoded[..] == expected);
        let mut reader = &encoded[..];
        let decoded = get_compact_string_owned(&mut reader).unwrap();
        assert!(decoded == "kafka");
    }

    /// A null COMPACT_STRING is the single UVARINT byte 0 and decodes to `None`.
    #[test]
    fn compact_nullable_string_null() {
        let mut encoded = BytesMut::new();
        put_compact_nullable_string(&mut encoded, None);
        let expected: &[u8] = &[0x00];
        assert!(&encoded[..] == expected);
        let mut reader = &encoded[..];
        let decoded = get_compact_nullable_string_owned(&mut reader).unwrap();
        assert!(decoded == None);
    }

    /// An empty COMPACT_STRING is distinct from null: its prefix is 1, not 0.
    #[test]
    fn empty_compact_string() {
        let mut encoded = BytesMut::new();
        put_compact_string(&mut encoded, "");
        // length = 1 means "0 bytes"
        let expected: &[u8] = &[0x01];
        assert!(&encoded[..] == expected);
        let mut reader = &encoded[..];
        let decoded = get_compact_string_owned(&mut reader).unwrap();
        assert!(decoded == "");
    }

    /// BYTES written with `put_bytes` decode back to the same payload.
    #[test]
    fn bytes_roundtrip() {
        let mut encoded = BytesMut::new();
        put_bytes(&mut encoded, &[1, 2, 3]);
        let mut reader = &encoded[..];
        let decoded = get_bytes_owned(&mut reader).unwrap();
        assert!(decoded.as_ref() == &[1, 2, 3]);
    }

    /// A STRING payload that is not valid UTF-8 yields `InvalidUtf8`.
    #[test]
    fn invalid_utf8_is_rejected() {
        // INT16(2) + invalid UTF-8 byte sequence
        let wire: [u8; 4] = [0x00, 0x02, 0xC3, 0x28];
        let mut reader = &wire[..];
        let outcome = get_string_owned(&mut reader);
        assert!(matches!(outcome, Err(ProtocolError::InvalidUtf8(_))));
    }
}