Skip to main content

crabka_protocol/records/
borrowed.rs

1//! Borrowed `RecordBatch<'a>`, `Record<'a>`, and `RecordHeader<'a>`.
2
3use bytes::Bytes;
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{get_varint, get_varlong};
7use crate::records::RecordsError;
8use crate::records::crc::{crc32c, crc32c_append};
9use crate::records::header::{Attributes, HEADER_LEN, RecordBatchHeader};
10
// `batch_length` counts the bytes that follow the field itself, so the size of
// the records payload is `batch_length - HEADER_TAIL_LEN`.
// Header tail = partition_leader_epoch(4) + magic(1) + crc(4) +
//   attributes(2) + last_offset_delta(4) + base_timestamp(8) +
//   max_timestamp(8) + producer_id(8) + producer_epoch(2) +
//   base_sequence(4) + records_count(4) = 49 bytes.
// Kept as `i32` so it can be subtracted from the header field with
// `checked_sub` before converting to `usize`.
const HEADER_TAIL_LEN: i32 = 49;
17
/// A decoded record batch that borrows its header and, when the batch is
/// uncompressed, its records payload.
///
/// Records are not parsed up front; they are parsed one by one when the batch
/// is iterated.
pub struct RecordBatch<'a> {
    /// Fixed-size batch header, referenced in place for `'a`.
    pub(crate) header: &'a RecordBatchHeader,
    /// Records payload: borrowed when uncompressed, owned after decompression.
    pub(crate) body: RecordBody<'a>,
}
22
/// Backing storage for the records payload of a `RecordBatch`.
pub(crate) enum RecordBody<'a> {
    /// Payload slice borrowed for `'a`; used by the decoder when the batch is
    /// not compressed.
    Borrowed(&'a [u8]),
    /// Payload held in an owned `Bytes`; used by the decoder for the
    /// decompressed output of a compressed batch.
    Owned(Bytes),
}
27
/// A single record parsed out of a batch body; all byte slices borrow from
/// the buffer the record was parsed from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record<'a> {
    /// First byte of the record body, reinterpreted as `i8`.
    pub attributes: i8,
    /// Timestamp delta, decoded from a varlong.
    pub timestamp_delta: i64,
    /// Offset delta, decoded from a varint.
    pub offset_delta: i32,
    /// Record key; `None` when the encoded key length is negative.
    pub key: Option<&'a [u8]>,
    /// Record value; `None` when the encoded value length is negative.
    pub value: Option<&'a [u8]>,
    /// Record headers, in the order they appear in the record body.
    pub headers: Vec<RecordHeader<'a>>,
}
37
/// A key/value header attached to a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordHeader<'a> {
    /// Header key; validated as UTF-8 while parsing.
    pub key: &'a str,
    /// Header value; `None` when the encoded value length is negative.
    pub value: Option<&'a [u8]>,
}
43
44impl RecordBatch<'_> {
45    #[must_use]
46    pub fn header(&self) -> &RecordBatchHeader {
47        self.header
48    }
49
50    #[must_use]
51    pub fn attributes(&self) -> Attributes {
52        Attributes(self.header.attributes.get())
53    }
54}
55
56impl<'a> Default for RecordBatch<'a> {
57    /// Returns an empty record batch with a zeroed header and no records body.
58    /// Intended for use in generated `Default` impls and round-trip tests; not
59    /// suitable for constructing a real Kafka record batch.
60    fn default() -> Self {
61        use zerocopy::FromZeros as _;
62        // RecordBatchHeader derives zerocopy::FromZeros (via FromBytes), so zeroing is safe.
63        let header: &'a RecordBatchHeader = Box::leak(Box::new(RecordBatchHeader::new_zeroed()));
64        Self {
65            header,
66            body: RecordBody::Owned(bytes::Bytes::new()),
67        }
68    }
69}
70
71// ── Decode ────────────────────────────────────────────────────────────────────
72
73impl<'de> crate::DecodeBorrow<'de> for RecordBatch<'de> {
74    fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
75        decode_borrow_impl(buf).map_err(Into::into)
76    }
77}
78
79fn decode_borrow_impl<'de>(buf: &mut &'de [u8]) -> Result<RecordBatch<'de>, RecordsError> {
80    if buf.len() < HEADER_LEN {
81        return Err(RecordsError::HeaderTooShort {
82            needed: HEADER_LEN - buf.len(),
83        });
84    }
85    // Split off the header slice — both slices remain tied to 'de.
86    let (hdr_slice, rest) = buf.split_at(HEADER_LEN);
87    let hdr: &'de RecordBatchHeader =
88        RecordBatchHeader::ref_from_bytes(hdr_slice).map_err(|_| RecordsError::ZerocopyFailure)?;
89    if hdr.magic != 2 {
90        return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
91    }
92
93    let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
94        .and_then(|n| usize::try_from(n).ok())
95        .ok_or_else(|| RecordsError::RecordParse("negative or oversized batch_length".into()))?;
96
97    if rest.len() < body_len {
98        return Err(RecordsError::BodyTooShort {
99            needed: body_len - rest.len(),
100        });
101    }
102    let (raw_body, after) = rest.split_at(body_len);
103    *buf = after;
104
105    // CRC: hash header[21..HEADER_LEN] (attributes through records_count)
106    // then append the raw_body bytes.
107    let expected = hdr.crc.get();
108    let mut computed = crc32c(&hdr_slice[21..HEADER_LEN]);
109    computed = crc32c_append(computed, raw_body);
110    if computed != expected {
111        return Err(RecordsError::CrcMismatch { expected, computed });
112    }
113
114    let attributes = Attributes(hdr.attributes.get());
115    let codec = attributes.compression();
116    let body = if codec == crabka_compression::CompressionType::None {
117        RecordBody::Borrowed(raw_body)
118    } else {
119        // Bound decompressed output: generous vs. legit ratios, but finite.
120        // A small compressed batch must not be able to expand to gigabytes and
121        // OOM the broker (decompression bomb).
122        const DECOMPRESS_MIN_CAP: usize = 16 * 1024 * 1024; // 16 MiB floor (small inputs)
123        const DECOMPRESS_MAX_RATIO: usize = 100; // ≤100x the compressed size
124        const DECOMPRESS_ABSOLUTE_CEILING: usize = 1024 * 1024 * 1024; // 1 GiB hard ceiling
125        let max_output = raw_body
126            .len()
127            .saturating_mul(DECOMPRESS_MAX_RATIO)
128            .clamp(DECOMPRESS_MIN_CAP, DECOMPRESS_ABSOLUTE_CEILING);
129        let decompressed = crabka_compression::decompress(codec, raw_body, max_output)?;
130        RecordBody::Owned(decompressed)
131    };
132
133    Ok(RecordBatch { header: hdr, body })
134}
135
136// ── Iteration ─────────────────────────────────────────────────────────────────
137
138impl RecordBatch<'_> {
139    /// Iterate over records, parsing each lazily.
140    ///
141    /// The returned `Record<'b>` items borrow from `self` (lifetime `'b`),
142    /// not from the original input buffer. For uncompressed batches the
143    /// backing memory is the input buffer; for compressed batches it is the
144    /// batch's internal decompressed `Bytes`.
145    pub fn iter(&self) -> RecordIter<'_> {
146        let body: &[u8] = match &self.body {
147            RecordBody::Borrowed(s) => s,
148            RecordBody::Owned(b) => b.as_ref(),
149        };
150        #[allow(clippy::cast_sign_loss)] // guarded by .max(0) above
151        let count = self.header.records_count.get().max(0) as usize;
152        RecordIter {
153            remaining: body,
154            count,
155            index: 0,
156        }
157    }
158}
159
160impl<'a> IntoIterator for &'a RecordBatch<'_> {
161    type Item = Result<Record<'a>, RecordsError>;
162    type IntoIter = RecordIter<'a>;
163
164    fn into_iter(self) -> Self::IntoIter {
165        self.iter()
166    }
167}
168
/// Lazy iterator over the records of a `RecordBatch`, created by
/// `RecordBatch::iter`.
pub struct RecordIter<'a> {
    /// Unparsed tail of the batch body; advanced as records are parsed.
    remaining: &'a [u8],
    /// Number of records to yield, taken from the header's `records_count`.
    count: usize,
    /// Number of records yielded so far.
    index: usize,
}
174
175impl<'a> Iterator for RecordIter<'a> {
176    type Item = Result<Record<'a>, RecordsError>;
177
178    fn next(&mut self) -> Option<Self::Item> {
179        if self.index >= self.count {
180            return None;
181        }
182        self.index += 1;
183        Some(parse_one_record(&mut self.remaining))
184    }
185}
186
187fn parse_one_record<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
188    let body_len =
189        get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
190    let body_len = usize::try_from(body_len).map_err(|_| {
191        RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
192    })?;
193    if buf.len() < body_len {
194        return Err(RecordsError::BodyTooShort {
195            needed: body_len - buf.len(),
196        });
197    }
198    let (body, rest) = buf.split_at(body_len);
199    *buf = rest;
200    let mut body_cur = body;
201    let r = parse_body(&mut body_cur)?;
202    if !body_cur.is_empty() {
203        return Err(RecordsError::RecordParse(format!(
204            "trailing bytes inside record (left={})",
205            body_cur.len()
206        )));
207    }
208    Ok(r)
209}
210
211fn parse_body<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
212    if buf.is_empty() {
213        return Err(RecordsError::RecordParse("record body empty".into()));
214    }
215    #[allow(clippy::cast_possible_wrap)] // intentional: Kafka attributes are i8 on the wire
216    let attributes = buf[0] as i8;
217    *buf = &buf[1..];
218    let timestamp_delta =
219        get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
220    let offset_delta =
221        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
222
223    let key = read_nullable_slice(buf, "key")?;
224    let value = read_nullable_slice(buf, "value")?;
225
226    let header_count =
227        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
228    if header_count < 0 {
229        return Err(RecordsError::RecordParse(format!(
230            "negative header count {header_count}"
231        )));
232    }
233    // Bound pre-allocation: a record header is at least 1 byte on the wire, so
234    // an honest `header_count` can never exceed the bytes left in the record
235    // body. Clamp the capacity hint to reject huge declared counts.
236    #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
237    let mut headers = Vec::with_capacity((header_count as usize).min(buf.len()));
238    for i in 0..header_count {
239        let key_len = get_varint(buf)
240            .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key length: {e}")))?;
241        if key_len < 0 {
242            return Err(RecordsError::RecordParse(format!(
243                "header[{i}] negative key length"
244            )));
245        }
246        #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
247        let n = key_len as usize;
248        if buf.len() < n {
249            return Err(RecordsError::BodyTooShort {
250                needed: n - buf.len(),
251            });
252        }
253        let (key_bytes, rest) = buf.split_at(n);
254        *buf = rest;
255        let key_str = std::str::from_utf8(key_bytes)
256            .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key utf-8: {e}")))?;
257
258        let value = read_nullable_slice(buf, &format!("header[{i}] value"))?;
259        headers.push(RecordHeader {
260            key: key_str,
261            value,
262        });
263    }
264
265    Ok(Record {
266        attributes,
267        timestamp_delta,
268        offset_delta,
269        key,
270        value,
271        headers,
272    })
273}
274
275fn read_nullable_slice<'a>(
276    buf: &mut &'a [u8],
277    label: &str,
278) -> Result<Option<&'a [u8]>, RecordsError> {
279    let len =
280        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
281    if len < 0 {
282        Ok(None)
283    } else {
284        #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
285        let n = len as usize;
286        if buf.len() < n {
287            return Err(RecordsError::BodyTooShort {
288                needed: n - buf.len(),
289            });
290        }
291        let (head, rest) = buf.split_at(n);
292        *buf = rest;
293        Ok(Some(head))
294    }
295}
296
297// ── to_owned bridge ───────────────────────────────────────────────────────────
298
299impl RecordBatch<'_> {
300    /// Materialise an owned `RecordBatch` by copying every byte slice into
301    /// `Bytes` / `String`.
302    pub fn to_owned(&self) -> Result<super::owned::RecordBatch, RecordsError> {
303        let mut records = Vec::new();
304        for r in self {
305            let r = r?;
306            records.push(super::owned::Record {
307                attributes: r.attributes,
308                timestamp_delta: r.timestamp_delta,
309                offset_delta: r.offset_delta,
310                key: r.key.map(Bytes::copy_from_slice),
311                value: r.value.map(Bytes::copy_from_slice),
312                headers: r
313                    .headers
314                    .into_iter()
315                    .map(|h| super::owned::RecordHeader {
316                        key: h.key.to_string(),
317                        value: h.value.map(Bytes::copy_from_slice),
318                    })
319                    .collect(),
320            });
321        }
322        Ok(super::owned::RecordBatch {
323            base_offset: self.header.base_offset.get(),
324            partition_leader_epoch: self.header.partition_leader_epoch.get(),
325            attributes: self.attributes(),
326            last_offset_delta: self.header.last_offset_delta.get(),
327            base_timestamp: self.header.base_timestamp.get(),
328            max_timestamp: self.header.max_timestamp.get(),
329            producer_id: self.header.producer_id.get(),
330            producer_epoch: self.header.producer_epoch.get(),
331            base_sequence: self.header.base_sequence.get(),
332            records,
333        })
334    }
335}
336
337// ── Debug / Clone / PartialEq / Eq ────────────────────────────────────────────
338//
339// RecordBatch<'a> holds a header reference and a body slice/bytes, so we can't
340// #[derive] these. We provide hand-rolled impls that delegate to the owned type.
341
342impl std::fmt::Debug for RecordBatch<'_> {
343    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344        match self.to_owned() {
345            Ok(o) => o.fmt(f),
346            Err(e) => write!(f, "RecordBatch(<decode error: {e}>)"),
347        }
348    }
349}
350
351impl Clone for RecordBatch<'_> {
352    /// Shallow clone: both `header` and `body` share the same underlying
353    /// data as `self`.  For a `Borrowed` body this is a reference copy;
354    /// for an `Owned` body, `Bytes::clone` is a cheap reference-count bump.
355    fn clone(&self) -> Self {
356        RecordBatch {
357            header: self.header,
358            body: match &self.body {
359                RecordBody::Borrowed(s) => RecordBody::Borrowed(s),
360                RecordBody::Owned(b) => RecordBody::Owned(b.clone()),
361            },
362        }
363    }
364}
365
366impl PartialEq for RecordBatch<'_> {
367    fn eq(&self, other: &Self) -> bool {
368        match (self.to_owned(), other.to_owned()) {
369            (Ok(a), Ok(b)) => a == b,
370            _ => false,
371        }
372    }
373}
374
375impl Eq for RecordBatch<'_> {}
376
377// ── Encode trait impl ─────────────────────────────────────────────────────────
378
379impl crate::Encode for RecordBatch<'_> {
380    fn encode<B: bytes::BufMut>(
381        &self,
382        buf: &mut B,
383        version: i16,
384    ) -> Result<(), crate::ProtocolError> {
385        let owned = self.to_owned().map_err(crate::ProtocolError::from)?;
386        crate::Encode::encode(&owned, buf, version)
387    }
388
389    fn encoded_len(&self, version: i16) -> usize {
390        match self.to_owned() {
391            Ok(o) => crate::Encode::encoded_len(&o, version),
392            Err(_) => 0,
393        }
394    }
395}
396
397// ── Tests ─────────────────────────────────────────────────────────────────────
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DecodeBorrow;
    use assert2::assert;
    use bytes::BytesMut;
    use crabka_compression::CompressionType;

    /// Encodes an owned batch into a fresh byte vector that the tests then
    /// decode through the borrowed path.
    fn encode_owned_then_borrow(b: &super::super::owned::RecordBatch) -> Vec<u8> {
        let mut buf = BytesMut::new();
        b.encode(&mut buf).unwrap();
        buf.to_vec()
    }

    // Generates one owned -> bytes -> borrowed -> owned round-trip test per
    // compression codec.
    macro_rules! borrowed_roundtrip {
        ($name:ident, $codec:expr) => {
            #[test]
            fn $name() {
                let mut owned = super::super::owned::RecordBatch::default();
                owned.attributes = owned.attributes.with_compression($codec);
                owned.records.push(super::super::owned::Record {
                    key: Some(Bytes::from_static(b"key")),
                    value: Some(Bytes::from_static(b"value")),
                    ..Default::default()
                });

                let encoded = encode_owned_then_borrow(&owned);
                let mut cur: &[u8] = &encoded[..];
                let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
                // The decoder must consume exactly one batch.
                assert!(cur.is_empty());
                assert!(borrowed.attributes() == owned.attributes);

                let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();
                assert!(records.len() == 1);
                assert!(records[0].key == Some(b"key".as_slice()));
                assert!(records[0].value == Some(b"value".as_slice()));

                let back_owned = borrowed.to_owned().unwrap();
                assert!(back_owned == owned);
            }
        };
    }

    borrowed_roundtrip!(roundtrip_none, CompressionType::None);
    borrowed_roundtrip!(roundtrip_gzip, CompressionType::Gzip);
    borrowed_roundtrip!(roundtrip_snappy, CompressionType::Snappy);
    borrowed_roundtrip!(roundtrip_lz4, CompressionType::Lz4);
    borrowed_roundtrip!(roundtrip_zstd, CompressionType::Zstd);

    #[test]
    fn zero_copy_for_uncompressed() {
        // Pointer-identity: record key/value slices must point into the
        // input buffer for uncompressed batches.
        let mut owned = super::super::owned::RecordBatch::default();
        owned.records.push(super::super::owned::Record {
            key: Some(Bytes::from_static(b"k")),
            value: Some(Bytes::from_static(b"v")),
            ..Default::default()
        });
        let encoded = encode_owned_then_borrow(&owned);
        let encoded_start = encoded.as_ptr() as usize;
        let encoded_end = encoded_start + encoded.len();
        let in_input = |ptr: usize| (encoded_start..encoded_end).contains(&ptr);

        let mut cur: &[u8] = &encoded[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
        let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();

        let k_ptr = records[0].key.unwrap().as_ptr() as usize;
        assert!(
            in_input(k_ptr),
            "key slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), key ptr {k_ptr:#x}",
        );

        let v_ptr = records[0].value.unwrap().as_ptr() as usize;
        assert!(
            v_ptr >= encoded_start && v_ptr < encoded_end,
            "value slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), value ptr {v_ptr:#x}",
        );
    }

    #[test]
    fn borrowed_encode_via_trait_roundtrips() {
        // Encoding a borrowed batch through the `Encode` trait must reproduce
        // the exact bytes it was decoded from.
        use crate::Encode as _;
        let owned_in = super::super::owned::RecordBatch {
            records: vec![super::super::owned::Record {
                key: Some(Bytes::from_static(b"x")),
                value: Some(Bytes::from_static(b"y")),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bytes_in = encode_owned_then_borrow(&owned_in);
        let mut cur: &[u8] = &bytes_in[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();

        let mut out = BytesMut::new();
        borrowed.encode(&mut out, 0).unwrap();
        assert!(&out[..] == &bytes_in[..]);
    }
}