Skip to main content

crabka_protocol/records/
borrowed.rs

1//! Borrowed `RecordBatch<'a>`, `Record<'a>`, and `RecordHeader<'a>`.
2
3use bytes::Bytes;
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{get_varint, get_varlong};
7use crate::records::RecordsError;
8use crate::records::crc::{crc32c, crc32c_append};
9use crate::records::header::{Attributes, HEADER_LEN, RecordBatchHeader};
10
// The `batch_length` header field counts the bytes that follow the
// `batch_length` field itself: the rest of the header plus the records body.
// Subtracting the header tail from `batch_length` therefore yields the body size.
//
// Header tail = partition_leader_epoch(4) + magic(1) + crc(4) +
//   attributes(2) + last_offset_delta(4) + base_timestamp(8) +
//   max_timestamp(8) + producer_id(8) + producer_epoch(2) +
//   base_sequence(4) + records_count(4) = 49 bytes.
const HEADER_TAIL_LEN: i32 = 49;
17
/// A record batch that borrows its header and, when the batch is not
/// compressed, its records body instead of copying them.
///
/// Records are not parsed up front; use [`RecordBatch::iter`] to walk them.
pub struct RecordBatch<'a> {
    /// Borrowed view of the fixed-size batch header.
    pub(crate) header: &'a RecordBatchHeader,
    /// Encoded records, either borrowed or held as an owned buffer.
    pub(crate) body: RecordBody<'a>,
}
22
/// Storage for the encoded records of a [`RecordBatch`].
pub(crate) enum RecordBody<'a> {
    /// Records live in a borrowed slice (used by the decoder for uncompressed batches).
    Borrowed(&'a [u8]),
    /// Records live in an owned buffer (used by the decoder for decompressed data).
    Owned(Bytes),
}
27
/// A single record parsed out of a batch body; all byte slices borrow from
/// the buffer the record was parsed from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record<'a> {
    /// Record-level attributes byte, reinterpreted as `i8`.
    pub attributes: i8,
    /// Timestamp delta, decoded from a varlong.
    pub timestamp_delta: i64,
    /// Offset delta, decoded from a varint.
    pub offset_delta: i32,
    /// Record key; `None` when the encoded length is negative.
    pub key: Option<&'a [u8]>,
    /// Record value; `None` when the encoded length is negative.
    pub value: Option<&'a [u8]>,
    /// Record headers, in the order they appear in the encoded record.
    pub headers: Vec<RecordHeader<'a>>,
}
37
/// A key/value header attached to a [`Record`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordHeader<'a> {
    /// Header key; validated as UTF-8 while parsing.
    pub key: &'a str,
    /// Header value; `None` when the encoded length is negative.
    pub value: Option<&'a [u8]>,
}
43
44impl RecordBatch<'_> {
45    #[must_use]
46    pub fn header(&self) -> &RecordBatchHeader {
47        self.header
48    }
49
50    #[must_use]
51    pub fn attributes(&self) -> Attributes {
52        Attributes(self.header.attributes.get())
53    }
54}
55
56impl<'a> Default for RecordBatch<'a> {
57    /// Returns an empty record batch with a zeroed header and no records body.
58    /// Intended for use in generated `Default` impls and round-trip tests; not
59    /// suitable for constructing a real Kafka record batch.
60    fn default() -> Self {
61        use zerocopy::FromZeros as _;
62        // RecordBatchHeader derives zerocopy::FromZeros (via FromBytes), so zeroing is safe.
63        let header: &'a RecordBatchHeader = Box::leak(Box::new(RecordBatchHeader::new_zeroed()));
64        Self {
65            header,
66            body: RecordBody::Owned(bytes::Bytes::new()),
67        }
68    }
69}
70
71// ── Decode ────────────────────────────────────────────────────────────────────
72
73impl<'de> crate::DecodeBorrow<'de> for RecordBatch<'de> {
74    fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
75        decode_borrow_impl(buf).map_err(Into::into)
76    }
77}
78
79fn decode_borrow_impl<'de>(buf: &mut &'de [u8]) -> Result<RecordBatch<'de>, RecordsError> {
80    if buf.len() < HEADER_LEN {
81        return Err(RecordsError::HeaderTooShort {
82            needed: HEADER_LEN - buf.len(),
83        });
84    }
85    // Split off the header slice — both slices remain tied to 'de.
86    let (hdr_slice, rest) = buf.split_at(HEADER_LEN);
87    let hdr: &'de RecordBatchHeader =
88        RecordBatchHeader::ref_from_bytes(hdr_slice).map_err(|_| RecordsError::ZerocopyFailure)?;
89    if hdr.magic != 2 {
90        return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
91    }
92
93    let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
94        .and_then(|n| usize::try_from(n).ok())
95        .ok_or_else(|| RecordsError::RecordParse("negative or oversized batch_length".into()))?;
96
97    if rest.len() < body_len {
98        return Err(RecordsError::BodyTooShort {
99            needed: body_len - rest.len(),
100        });
101    }
102    let (raw_body, after) = rest.split_at(body_len);
103    *buf = after;
104
105    // CRC: hash header[21..HEADER_LEN] (attributes through records_count)
106    // then append the raw_body bytes.
107    let expected = hdr.crc.get();
108    let mut computed = crc32c(&hdr_slice[21..HEADER_LEN]);
109    computed = crc32c_append(computed, raw_body);
110    if computed != expected {
111        return Err(RecordsError::CrcMismatch { expected, computed });
112    }
113
114    let attributes = Attributes(hdr.attributes.get());
115    let codec = attributes.compression();
116    let body = if codec == crabka_compression::CompressionType::None {
117        RecordBody::Borrowed(raw_body)
118    } else {
119        let decompressed = crabka_compression::decompress(codec, raw_body)?;
120        RecordBody::Owned(decompressed)
121    };
122
123    Ok(RecordBatch { header: hdr, body })
124}
125
126// ── Iteration ─────────────────────────────────────────────────────────────────
127
128impl RecordBatch<'_> {
129    /// Iterate over records, parsing each lazily.
130    ///
131    /// The returned `Record<'b>` items borrow from `self` (lifetime `'b`),
132    /// not from the original input buffer. For uncompressed batches the
133    /// backing memory is the input buffer; for compressed batches it is the
134    /// batch's internal decompressed `Bytes`.
135    pub fn iter(&self) -> RecordIter<'_> {
136        let body: &[u8] = match &self.body {
137            RecordBody::Borrowed(s) => s,
138            RecordBody::Owned(b) => b.as_ref(),
139        };
140        #[allow(clippy::cast_sign_loss)] // guarded by .max(0) above
141        let count = self.header.records_count.get().max(0) as usize;
142        RecordIter {
143            remaining: body,
144            count,
145            index: 0,
146        }
147    }
148}
149
150impl<'a> IntoIterator for &'a RecordBatch<'_> {
151    type Item = Result<Record<'a>, RecordsError>;
152    type IntoIter = RecordIter<'a>;
153
154    fn into_iter(self) -> Self::IntoIter {
155        self.iter()
156    }
157}
158
/// Lazy iterator over the records of a [`RecordBatch`].
pub struct RecordIter<'a> {
    /// Encoded records that have not been consumed yet.
    remaining: &'a [u8],
    /// Number of records the batch header declares (never negative).
    count: usize,
    /// Number of records already attempted.
    index: usize,
}
164
165impl<'a> Iterator for RecordIter<'a> {
166    type Item = Result<Record<'a>, RecordsError>;
167
168    fn next(&mut self) -> Option<Self::Item> {
169        if self.index >= self.count {
170            return None;
171        }
172        self.index += 1;
173        Some(parse_one_record(&mut self.remaining))
174    }
175}
176
177fn parse_one_record<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
178    let body_len =
179        get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
180    let body_len = usize::try_from(body_len).map_err(|_| {
181        RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
182    })?;
183    if buf.len() < body_len {
184        return Err(RecordsError::BodyTooShort {
185            needed: body_len - buf.len(),
186        });
187    }
188    let (body, rest) = buf.split_at(body_len);
189    *buf = rest;
190    let mut body_cur = body;
191    let r = parse_body(&mut body_cur)?;
192    if !body_cur.is_empty() {
193        return Err(RecordsError::RecordParse(format!(
194            "trailing bytes inside record (left={})",
195            body_cur.len()
196        )));
197    }
198    Ok(r)
199}
200
201fn parse_body<'a>(buf: &mut &'a [u8]) -> Result<Record<'a>, RecordsError> {
202    if buf.is_empty() {
203        return Err(RecordsError::RecordParse("record body empty".into()));
204    }
205    #[allow(clippy::cast_possible_wrap)] // intentional: Kafka attributes are i8 on the wire
206    let attributes = buf[0] as i8;
207    *buf = &buf[1..];
208    let timestamp_delta =
209        get_varlong(buf).map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
210    let offset_delta =
211        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
212
213    let key = read_nullable_slice(buf, "key")?;
214    let value = read_nullable_slice(buf, "value")?;
215
216    let header_count =
217        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
218    if header_count < 0 {
219        return Err(RecordsError::RecordParse(format!(
220            "negative header count {header_count}"
221        )));
222    }
223    #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
224    let mut headers = Vec::with_capacity(header_count as usize);
225    for i in 0..header_count {
226        let key_len = get_varint(buf)
227            .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key length: {e}")))?;
228        if key_len < 0 {
229            return Err(RecordsError::RecordParse(format!(
230                "header[{i}] negative key length"
231            )));
232        }
233        #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
234        let n = key_len as usize;
235        if buf.len() < n {
236            return Err(RecordsError::BodyTooShort {
237                needed: n - buf.len(),
238            });
239        }
240        let (key_bytes, rest) = buf.split_at(n);
241        *buf = rest;
242        let key_str = std::str::from_utf8(key_bytes)
243            .map_err(|e| RecordsError::RecordParse(format!("header[{i}] key utf-8: {e}")))?;
244
245        let value = read_nullable_slice(buf, &format!("header[{i}] value"))?;
246        headers.push(RecordHeader {
247            key: key_str,
248            value,
249        });
250    }
251
252    Ok(Record {
253        attributes,
254        timestamp_delta,
255        offset_delta,
256        key,
257        value,
258        headers,
259    })
260}
261
262fn read_nullable_slice<'a>(
263    buf: &mut &'a [u8],
264    label: &str,
265) -> Result<Option<&'a [u8]>, RecordsError> {
266    let len =
267        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
268    if len < 0 {
269        Ok(None)
270    } else {
271        #[allow(clippy::cast_sign_loss)] // guarded by < 0 check above
272        let n = len as usize;
273        if buf.len() < n {
274            return Err(RecordsError::BodyTooShort {
275                needed: n - buf.len(),
276            });
277        }
278        let (head, rest) = buf.split_at(n);
279        *buf = rest;
280        Ok(Some(head))
281    }
282}
283
284// ── to_owned bridge ───────────────────────────────────────────────────────────
285
286impl RecordBatch<'_> {
287    /// Materialise an owned `RecordBatch` by copying every byte slice into
288    /// `Bytes` / `String`.
289    pub fn to_owned(&self) -> Result<super::owned::RecordBatch, RecordsError> {
290        let mut records = Vec::new();
291        for r in self {
292            let r = r?;
293            records.push(super::owned::Record {
294                attributes: r.attributes,
295                timestamp_delta: r.timestamp_delta,
296                offset_delta: r.offset_delta,
297                key: r.key.map(Bytes::copy_from_slice),
298                value: r.value.map(Bytes::copy_from_slice),
299                headers: r
300                    .headers
301                    .into_iter()
302                    .map(|h| super::owned::RecordHeader {
303                        key: h.key.to_string(),
304                        value: h.value.map(Bytes::copy_from_slice),
305                    })
306                    .collect(),
307            });
308        }
309        Ok(super::owned::RecordBatch {
310            base_offset: self.header.base_offset.get(),
311            partition_leader_epoch: self.header.partition_leader_epoch.get(),
312            attributes: self.attributes(),
313            last_offset_delta: self.header.last_offset_delta.get(),
314            base_timestamp: self.header.base_timestamp.get(),
315            max_timestamp: self.header.max_timestamp.get(),
316            producer_id: self.header.producer_id.get(),
317            producer_epoch: self.header.producer_epoch.get(),
318            base_sequence: self.header.base_sequence.get(),
319            records,
320        })
321    }
322}
323
324// ── Debug / Clone / PartialEq / Eq ────────────────────────────────────────────
325//
326// RecordBatch<'a> holds a header reference and a body slice/bytes, so we can't
327// #[derive] these. We provide hand-rolled impls that delegate to the owned type.
328
329impl std::fmt::Debug for RecordBatch<'_> {
330    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331        match self.to_owned() {
332            Ok(o) => o.fmt(f),
333            Err(e) => write!(f, "RecordBatch(<decode error: {e}>)"),
334        }
335    }
336}
337
338impl Clone for RecordBatch<'_> {
339    /// Shallow clone: both `header` and `body` share the same underlying
340    /// data as `self`.  For a `Borrowed` body this is a reference copy;
341    /// for an `Owned` body, `Bytes::clone` is a cheap reference-count bump.
342    fn clone(&self) -> Self {
343        RecordBatch {
344            header: self.header,
345            body: match &self.body {
346                RecordBody::Borrowed(s) => RecordBody::Borrowed(s),
347                RecordBody::Owned(b) => RecordBody::Owned(b.clone()),
348            },
349        }
350    }
351}
352
353impl PartialEq for RecordBatch<'_> {
354    fn eq(&self, other: &Self) -> bool {
355        match (self.to_owned(), other.to_owned()) {
356            (Ok(a), Ok(b)) => a == b,
357            _ => false,
358        }
359    }
360}
361
362impl Eq for RecordBatch<'_> {}
363
364// ── Encode trait impl ─────────────────────────────────────────────────────────
365
366impl crate::Encode for RecordBatch<'_> {
367    fn encode<B: bytes::BufMut>(
368        &self,
369        buf: &mut B,
370        version: i16,
371    ) -> Result<(), crate::ProtocolError> {
372        let owned = self.to_owned().map_err(crate::ProtocolError::from)?;
373        crate::Encode::encode(&owned, buf, version)
374    }
375
376    fn encoded_len(&self, version: i16) -> usize {
377        match self.to_owned() {
378            Ok(o) => crate::Encode::encoded_len(&o, version),
379            Err(_) => 0,
380        }
381    }
382}
383
384// ── Tests ─────────────────────────────────────────────────────────────────────
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DecodeBorrow;
    use assert2::assert;
    use bytes::BytesMut;
    use crabka_compression::CompressionType;

    /// Encodes an owned batch to wire bytes, ready to be decoded with
    /// `RecordBatch::decode_borrow`.
    fn encode_owned_then_borrow(b: &super::super::owned::RecordBatch) -> Vec<u8> {
        let mut buf = BytesMut::new();
        b.encode(&mut buf).unwrap();
        buf.to_vec()
    }

    // Generates a test that encodes a one-record owned batch with the given
    // codec, decodes it as a borrowed batch, and checks that the records and
    // the `to_owned` result match the original.
    macro_rules! borrowed_roundtrip {
        ($name:ident, $codec:expr) => {
            #[test]
            fn $name() {
                let mut owned = super::super::owned::RecordBatch::default();
                owned.attributes = owned.attributes.with_compression($codec);
                owned.records.push(super::super::owned::Record {
                    key: Some(Bytes::from_static(b"key")),
                    value: Some(Bytes::from_static(b"value")),
                    ..Default::default()
                });

                let encoded = encode_owned_then_borrow(&owned);
                let mut cur: &[u8] = &encoded[..];
                let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
                assert!(cur.is_empty());
                assert!(borrowed.attributes() == owned.attributes);

                let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();
                assert!(records.len() == 1);
                assert!(records[0].key == Some(b"key".as_slice()));
                assert!(records[0].value == Some(b"value".as_slice()));

                let back_owned = borrowed.to_owned().unwrap();
                assert!(back_owned == owned);
            }
        };
    }

    borrowed_roundtrip!(roundtrip_none, CompressionType::None);
    borrowed_roundtrip!(roundtrip_gzip, CompressionType::Gzip);
    borrowed_roundtrip!(roundtrip_snappy, CompressionType::Snappy);
    borrowed_roundtrip!(roundtrip_lz4, CompressionType::Lz4);
    borrowed_roundtrip!(roundtrip_zstd, CompressionType::Zstd);

    #[test]
    fn zero_copy_for_uncompressed() {
        // Pointer-identity: record key/value slices must point into the
        // input buffer for uncompressed batches.
        let mut owned = super::super::owned::RecordBatch::default();
        owned.records.push(super::super::owned::Record {
            key: Some(Bytes::from_static(b"k")),
            value: Some(Bytes::from_static(b"v")),
            ..Default::default()
        });
        let encoded = encode_owned_then_borrow(&owned);
        let encoded_start = encoded.as_ptr() as usize;
        let encoded_end = encoded_start + encoded.len();

        let mut cur: &[u8] = &encoded[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();
        let records: Vec<_> = borrowed.iter().collect::<Result<_, _>>().unwrap();

        let k_ptr = records[0].key.unwrap().as_ptr() as usize;
        assert!(
            k_ptr >= encoded_start && k_ptr < encoded_end,
            "key slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), key ptr {k_ptr:#x}",
        );

        let v_ptr = records[0].value.unwrap().as_ptr() as usize;
        assert!(
            v_ptr >= encoded_start && v_ptr < encoded_end,
            "value slice does not point into the input buffer: \
             input range [{encoded_start:#x}, {encoded_end:#x}), value ptr {v_ptr:#x}",
        );
    }

    #[test]
    fn borrowed_encode_via_trait_roundtrips() {
        // Decoding and then re-encoding through the `Encode` trait must
        // reproduce the input bytes exactly.
        use crate::Encode as _;
        let owned_in = super::super::owned::RecordBatch {
            records: vec![super::super::owned::Record {
                key: Some(Bytes::from_static(b"x")),
                value: Some(Bytes::from_static(b"y")),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bytes_in = encode_owned_then_borrow(&owned_in);
        let mut cur: &[u8] = &bytes_in[..];
        let borrowed = RecordBatch::decode_borrow(&mut cur, 0).unwrap();

        let mut out = BytesMut::new();
        borrowed.encode(&mut out, 0).unwrap();
        assert!(&out[..] == &bytes_in[..]);
    }
}