Skip to main content

crabka_protocol/primitives/
string_bytes.rs

1use bytes::{Buf, BufMut, Bytes};
2
3use crate::ProtocolError;
4use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
5use crate::primitives::varint::{get_uvarint, put_uvarint, uvarint_len};
6
7// ---- STRING (non-flexible) ----
8// Wire: INT16 length (>=0), then `length` bytes UTF-8. -1 = null.
9
10pub fn put_string<B: BufMut>(buf: &mut B, s: &str) {
11    let len = i16::try_from(s.len()).expect("string length must fit in i16");
12    put_i16(buf, len);
13    buf.put_slice(s.as_bytes());
14}
15
16pub fn put_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
17    match s {
18        None => put_i16(buf, -1),
19        Some(s) => put_string(buf, s),
20    }
21}
22
23pub fn get_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
24    match get_nullable_string_owned(buf)? {
25        Some(s) => Ok(s),
26        None => Err(ProtocolError::InvalidValue("non-nullable STRING was null")),
27    }
28}
29
30pub fn get_nullable_string_owned<B: Buf>(buf: &mut B) -> Result<Option<String>, ProtocolError> {
31    let len = get_i16(buf)?;
32    if len < 0 {
33        return Ok(None);
34    }
35    #[allow(clippy::cast_sign_loss)]
36    let n = len as usize;
37    if buf.remaining() < n {
38        return Err(ProtocolError::UnexpectedEof {
39            needed: n - buf.remaining(),
40        });
41    }
42    let mut v = vec![0u8; n];
43    buf.copy_to_slice(&mut v);
44    let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
45    Ok(Some(s))
46}
47
48#[must_use]
49pub fn string_len(s: &str) -> usize {
50    2 + s.len()
51}
52#[must_use]
53pub fn nullable_string_len(s: Option<&str>) -> usize {
54    2 + s.map_or(0, str::len)
55}
56
57// ---- COMPACT_STRING (flexible) ----
58// Wire: UVARINT length+1 (0 = null), then `length` UTF-8 bytes.
59
60pub fn put_compact_string<B: BufMut>(buf: &mut B, s: &str) {
61    let len = u32::try_from(s.len() + 1).expect("string length too large");
62    put_uvarint(buf, len);
63    buf.put_slice(s.as_bytes());
64}
65
66pub fn put_compact_nullable_string<B: BufMut>(buf: &mut B, s: Option<&str>) {
67    match s {
68        None => put_uvarint(buf, 0),
69        Some(s) => put_compact_string(buf, s),
70    }
71}
72
73pub fn get_compact_string_owned<B: Buf>(buf: &mut B) -> Result<String, ProtocolError> {
74    match get_compact_nullable_string_owned(buf)? {
75        Some(s) => Ok(s),
76        None => Err(ProtocolError::InvalidValue(
77            "non-nullable COMPACT_STRING was null",
78        )),
79    }
80}
81
82pub fn get_compact_nullable_string_owned<B: Buf>(
83    buf: &mut B,
84) -> Result<Option<String>, ProtocolError> {
85    let raw = get_uvarint(buf)?;
86    if raw == 0 {
87        return Ok(None);
88    }
89    let n = (raw - 1) as usize;
90    if buf.remaining() < n {
91        return Err(ProtocolError::UnexpectedEof {
92            needed: n - buf.remaining(),
93        });
94    }
95    let mut v = vec![0u8; n];
96    buf.copy_to_slice(&mut v);
97    let s = String::from_utf8(v).map_err(|e| ProtocolError::InvalidUtf8(e.utf8_error()))?;
98    Ok(Some(s))
99}
100
101#[must_use]
102pub fn compact_string_len(s: &str) -> usize {
103    uvarint_len(u32::try_from(s.len() + 1).unwrap()) + s.len()
104}
105#[must_use]
106pub fn compact_nullable_string_len(s: Option<&str>) -> usize {
107    match s {
108        None => uvarint_len(0),
109        Some(s) => compact_string_len(s),
110    }
111}
112
113// ---- BYTES / COMPACT_BYTES ----
114// BYTES: INT32 length, `length` bytes. -1 = null.
115// COMPACT_BYTES: UVARINT length+1 (0=null), `length` bytes.
116
117pub fn put_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
118    let len = i32::try_from(b.len()).expect("bytes length must fit in i32");
119    put_i32(buf, len);
120    buf.put_slice(b);
121}
122
123pub fn put_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
124    match b {
125        None => put_i32(buf, -1),
126        Some(b) => put_bytes(buf, b),
127    }
128}
129
130pub fn get_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
131    match get_nullable_bytes_owned(buf)? {
132        Some(b) => Ok(b),
133        None => Err(ProtocolError::InvalidValue("non-nullable BYTES was null")),
134    }
135}
136
137pub fn get_nullable_bytes_owned<B: Buf>(buf: &mut B) -> Result<Option<Bytes>, ProtocolError> {
138    let len = get_i32(buf)?;
139    if len < 0 {
140        return Ok(None);
141    }
142    #[allow(clippy::cast_sign_loss)]
143    let n = len as usize;
144    if buf.remaining() < n {
145        return Err(ProtocolError::UnexpectedEof {
146            needed: n - buf.remaining(),
147        });
148    }
149    let mut v = vec![0u8; n];
150    buf.copy_to_slice(&mut v);
151    Ok(Some(Bytes::from(v)))
152}
153
154#[must_use]
155pub fn bytes_len(b: &[u8]) -> usize {
156    4 + b.len()
157}
158#[must_use]
159pub fn nullable_bytes_len(b: Option<&[u8]>) -> usize {
160    4 + b.map_or(0, <[u8]>::len)
161}
162
163pub fn put_compact_bytes<B: BufMut>(buf: &mut B, b: &[u8]) {
164    let len = u32::try_from(b.len() + 1).expect("bytes length too large");
165    put_uvarint(buf, len);
166    buf.put_slice(b);
167}
168
169pub fn put_compact_nullable_bytes<B: BufMut>(buf: &mut B, b: Option<&[u8]>) {
170    match b {
171        None => put_uvarint(buf, 0),
172        Some(b) => put_compact_bytes(buf, b),
173    }
174}
175
176#[must_use]
177pub fn compact_bytes_len(b: &[u8]) -> usize {
178    uvarint_len(u32::try_from(b.len() + 1).unwrap()) + b.len()
179}
180
181/// Like `compact_bytes_len` but takes the byte-count directly rather than a slice.
182/// Useful when the content size is known without materialising the buffer.
183#[must_use]
184pub fn compact_bytes_len_from_size(n: usize) -> usize {
185    uvarint_len(u32::try_from(n + 1).unwrap()) + n
186}
187#[must_use]
188pub fn compact_nullable_bytes_len(b: Option<&[u8]>) -> usize {
189    match b {
190        None => uvarint_len(0),
191        Some(b) => compact_bytes_len(b),
192    }
193}
194
195pub fn get_compact_bytes_owned<B: Buf>(buf: &mut B) -> Result<Bytes, ProtocolError> {
196    match get_compact_nullable_bytes_owned(buf)? {
197        Some(b) => Ok(b),
198        None => Err(ProtocolError::InvalidValue(
199            "non-nullable COMPACT_BYTES was null",
200        )),
201    }
202}
203
204pub fn get_compact_nullable_bytes_owned<B: Buf>(
205    buf: &mut B,
206) -> Result<Option<Bytes>, ProtocolError> {
207    let raw = get_uvarint(buf)?;
208    if raw == 0 {
209        return Ok(None);
210    }
211    let n = (raw - 1) as usize;
212    if buf.remaining() < n {
213        return Err(ProtocolError::UnexpectedEof {
214            needed: n - buf.remaining(),
215        });
216    }
217    let mut v = vec![0u8; n];
218    buf.copy_to_slice(&mut v);
219    Ok(Some(Bytes::from(v)))
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225    use bytes::BytesMut;
226
227    #[test]
228    fn string_roundtrip() {
229        let mut buf = BytesMut::new();
230        put_string(&mut buf, "kafka");
231        // INT16(5) + bytes
232        assert_eq!(&buf[..], &[0x00, 0x05, b'k', b'a', b'f', b'k', b'a']);
233        let mut cur = &buf[..];
234        assert_eq!(get_string_owned(&mut cur).unwrap(), "kafka");
235    }
236
237    #[test]
238    fn nullable_string_null() {
239        let mut buf = BytesMut::new();
240        put_nullable_string(&mut buf, None);
241        assert_eq!(&buf[..], &[0xFF, 0xFF]);
242        let mut cur = &buf[..];
243        assert_eq!(get_nullable_string_owned(&mut cur).unwrap(), None);
244    }
245
246    #[test]
247    fn compact_string_roundtrip() {
248        let mut buf = BytesMut::new();
249        put_compact_string(&mut buf, "kafka");
250        // UVARINT(6) + bytes
251        assert_eq!(&buf[..], &[0x06, b'k', b'a', b'f', b'k', b'a']);
252        let mut cur = &buf[..];
253        assert_eq!(get_compact_string_owned(&mut cur).unwrap(), "kafka");
254    }
255
256    #[test]
257    fn compact_nullable_string_null() {
258        let mut buf = BytesMut::new();
259        put_compact_nullable_string(&mut buf, None);
260        assert_eq!(&buf[..], &[0x00]);
261        let mut cur = &buf[..];
262        assert_eq!(get_compact_nullable_string_owned(&mut cur).unwrap(), None);
263    }
264
265    #[test]
266    fn empty_compact_string() {
267        let mut buf = BytesMut::new();
268        put_compact_string(&mut buf, "");
269        assert_eq!(&buf[..], &[0x01]); // length = 1 means "0 bytes"
270        let mut cur = &buf[..];
271        assert_eq!(get_compact_string_owned(&mut cur).unwrap(), "");
272    }
273
274    #[test]
275    fn bytes_roundtrip() {
276        let mut buf = BytesMut::new();
277        put_bytes(&mut buf, &[1, 2, 3]);
278        let mut cur = &buf[..];
279        let out = get_bytes_owned(&mut cur).unwrap();
280        assert_eq!(out.as_ref(), &[1, 2, 3]);
281    }
282
283    #[test]
284    fn invalid_utf8_is_rejected() {
285        // INT16(2) + invalid UTF-8 byte sequence
286        let bytes = [0x00, 0x02, 0xC3, 0x28];
287        let mut cur = &bytes[..];
288        assert!(matches!(
289            get_string_owned(&mut cur),
290            Err(ProtocolError::InvalidUtf8(_))
291        ));
292    }
293}