Skip to main content

crabka_protocol/records/
owned.rs

1//! Owned `RecordBatch`, `Record`, and `RecordHeader` types.
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{
7    get_varint, get_varlong, put_varint, put_varlong, varint_len, varlong_len,
8};
9use crate::records::RecordsError;
10use crate::records::crc::{crc32c, crc32c_append};
11use crate::records::header::{Attributes, HEADER_LEN};
12
13#[derive(Debug, Clone, PartialEq, Eq, Default)]
14pub struct RecordHeader {
15    pub key: String,
16    pub value: Option<Bytes>,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
20pub struct Record {
21    pub attributes: i8,
22    pub timestamp_delta: i64,
23    pub offset_delta: i32,
24    pub key: Option<Bytes>,
25    pub value: Option<Bytes>,
26    pub headers: Vec<RecordHeader>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct RecordBatch {
31    pub base_offset: i64,
32    pub partition_leader_epoch: i32,
33    pub attributes: Attributes,
34    pub last_offset_delta: i32,
35    pub base_timestamp: i64,
36    pub max_timestamp: i64,
37    pub producer_id: i64,
38    pub producer_epoch: i16,
39    pub base_sequence: i32,
40    pub records: Vec<Record>,
41}
42
43impl Default for RecordBatch {
44    fn default() -> Self {
45        Self {
46            base_offset: 0,
47            partition_leader_epoch: 0,
48            attributes: Attributes::default(),
49            last_offset_delta: 0,
50            base_timestamp: 0,
51            max_timestamp: 0,
52            producer_id: -1, // sentinel: non-idempotent
53            producer_epoch: -1,
54            base_sequence: -1,
55            records: Vec::new(),
56        }
57    }
58}
59
60impl Record {
61    /// Encode a single record (varlong length prefix + fields) into `buf`.
62    pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
63        let body_len = self.body_len();
64        put_varlong(
65            buf,
66            i64::try_from(body_len)
67                .map_err(|_| RecordsError::RecordParse("record body length overflow".into()))?,
68        );
69        self.encode_body(buf)
70    }
71
72    /// Predicted total length of this record on the wire (length-prefix + body).
73    pub fn encoded_len(&self) -> usize {
74        let body = self.body_len();
75        #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
76        let body_i64 = body as i64;
77        varlong_len(body_i64) + body
78    }
79
80    fn body_len(&self) -> usize {
81        let mut n = 1; // attributes (i8)
82        n += varlong_len(self.timestamp_delta);
83        n += varint_len(self.offset_delta);
84        n += match &self.key {
85            None => varint_len(-1),
86            Some(k) => varint_len(i32::try_from(k.len()).unwrap_or(i32::MAX)) + k.len(),
87        };
88        n += match &self.value {
89            None => varint_len(-1),
90            Some(v) => varint_len(i32::try_from(v.len()).unwrap_or(i32::MAX)) + v.len(),
91        };
92        n += varint_len(i32::try_from(self.headers.len()).unwrap_or(i32::MAX));
93        for h in &self.headers {
94            let key_bytes = h.key.as_bytes();
95            n += varint_len(i32::try_from(key_bytes.len()).unwrap_or(i32::MAX)) + key_bytes.len();
96            n += match &h.value {
97                None => varint_len(-1),
98                Some(v) => varint_len(i32::try_from(v.len()).unwrap_or(i32::MAX)) + v.len(),
99            };
100        }
101        n
102    }
103
104    fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
105        buf.put_i8(self.attributes);
106        put_varlong(buf, self.timestamp_delta);
107        put_varint(buf, self.offset_delta);
108        match &self.key {
109            None => put_varint(buf, -1),
110            Some(k) => {
111                put_varint(
112                    buf,
113                    i32::try_from(k.len()).map_err(|_| {
114                        RecordsError::RecordParse("record key length overflow".into())
115                    })?,
116                );
117                buf.put_slice(k);
118            }
119        }
120        match &self.value {
121            None => put_varint(buf, -1),
122            Some(v) => {
123                put_varint(
124                    buf,
125                    i32::try_from(v.len()).map_err(|_| {
126                        RecordsError::RecordParse("record value length overflow".into())
127                    })?,
128                );
129                buf.put_slice(v);
130            }
131        }
132        put_varint(
133            buf,
134            i32::try_from(self.headers.len())
135                .map_err(|_| RecordsError::RecordParse("record header count overflow".into()))?,
136        );
137        for h in &self.headers {
138            let key_bytes = h.key.as_bytes();
139            put_varint(
140                buf,
141                i32::try_from(key_bytes.len())
142                    .map_err(|_| RecordsError::RecordParse("header key length overflow".into()))?,
143            );
144            buf.put_slice(key_bytes);
145            match &h.value {
146                None => put_varint(buf, -1),
147                Some(v) => {
148                    put_varint(
149                        buf,
150                        i32::try_from(v.len()).map_err(|_| {
151                            RecordsError::RecordParse("header value length overflow".into())
152                        })?,
153                    );
154                    buf.put_slice(v);
155                }
156            }
157        }
158        Ok(())
159    }
160
161    /// Decode a single record. `buf` must be positioned at the record's
162    /// varlong length prefix.
163    pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
164        let body_len = get_varlong(buf)
165            .map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
166        let body_len = usize::try_from(body_len).map_err(|_| {
167            RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
168        })?;
169        if buf.remaining() < body_len {
170            return Err(RecordsError::BodyTooShort {
171                needed: body_len - buf.remaining(),
172            });
173        }
174        // Restrict to body_len bytes so a malformed inner field doesn't run
175        // past the record boundary.
176        let mut body = buf.take(body_len);
177        let r = Self::decode_body(&mut body)?;
178        // Trailing bytes inside the record's claimed length — protocol corruption.
179        if body.has_remaining() {
180            return Err(RecordsError::RecordParse(format!(
181                "trailing bytes inside record (left={})",
182                body.remaining()
183            )));
184        }
185        Ok(r)
186    }
187
188    fn decode_body<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
189        if buf.remaining() == 0 {
190            return Err(RecordsError::RecordParse("record body empty".into()));
191        }
192        let attributes = buf.get_i8();
193        let timestamp_delta = get_varlong(buf)
194            .map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
195        let offset_delta =
196            get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
197
198        let key = decode_nullable_bytes(buf, "key")?;
199        let value = decode_nullable_bytes(buf, "value")?;
200
201        let header_count =
202            get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
203        if header_count < 0 {
204            return Err(RecordsError::RecordParse(format!(
205                "negative header count {header_count}"
206            )));
207        }
208        #[allow(clippy::cast_sign_loss)] // checked < 0 above
209        let header_count_usize = header_count as usize;
210        // Bound pre-allocation: a record header is at least 1 byte on the wire,
211        // so an honest `header_count` can never exceed the bytes left in the
212        // record body. Clamp the capacity hint to reject huge declared counts
213        // without affecting the loop (it stops at EOF anyway).
214        let mut headers = Vec::with_capacity(header_count_usize.min(buf.remaining()));
215        for i in 0..header_count {
216            headers.push(
217                decode_record_header(buf)
218                    .map_err(|e| RecordsError::RecordParse(format!("header[{i}]: {e}")))?,
219            );
220        }
221
222        Ok(Self {
223            attributes,
224            timestamp_delta,
225            offset_delta,
226            key,
227            value,
228            headers,
229        })
230    }
231}
232
233fn decode_nullable_bytes<B: Buf>(buf: &mut B, label: &str) -> Result<Option<Bytes>, RecordsError> {
234    let len =
235        get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
236    if len < 0 {
237        Ok(None)
238    } else {
239        #[allow(clippy::cast_sign_loss)] // checked < 0 above
240        let n = len as usize;
241        if buf.remaining() < n {
242            return Err(RecordsError::BodyTooShort {
243                needed: n - buf.remaining(),
244            });
245        }
246        let mut v = vec![0u8; n];
247        buf.copy_to_slice(&mut v);
248        Ok(Some(Bytes::from(v)))
249    }
250}
251
252fn decode_record_header<B: Buf>(buf: &mut B) -> Result<RecordHeader, String> {
253    let key_len = get_varint(buf).map_err(|e| format!("key length: {e}"))?;
254    if key_len < 0 {
255        return Err(format!("non-nullable key has negative length {key_len}"));
256    }
257    #[allow(clippy::cast_sign_loss)] // checked < 0 above
258    let n = key_len as usize;
259    if buf.remaining() < n {
260        return Err(format!("key truncated (need {} more)", n - buf.remaining()));
261    }
262    let mut kv = vec![0u8; n];
263    buf.copy_to_slice(&mut kv);
264    let key = String::from_utf8(kv).map_err(|e| format!("key utf-8: {e}"))?;
265
266    let value_len = get_varint(buf).map_err(|e| format!("value length: {e}"))?;
267    let value = if value_len < 0 {
268        None
269    } else {
270        #[allow(clippy::cast_sign_loss)] // checked < 0 above
271        let n = value_len as usize;
272        if buf.remaining() < n {
273            return Err(format!(
274                "value truncated (need {} more)",
275                n - buf.remaining()
276            ));
277        }
278        let mut vv = vec![0u8; n];
279        buf.copy_to_slice(&mut vv);
280        Some(Bytes::from(vv))
281    };
282
283    Ok(RecordHeader { key, value })
284}
285
#[cfg(test)]
mod record_tests {
    use super::*;
    use assert2::assert;
    use bytes::BytesMut;

    /// Record with every field at its zero / null / empty value.
    fn fixture_minimal_record() -> Record {
        Record::default()
    }

    /// Record with a key, a value, and two headers (one with a null value).
    fn fixture_keyed_record() -> Record {
        let trace = RecordHeader {
            key: String::from("trace-id"),
            value: Some(Bytes::from_static(b"abc")),
        };
        let null_valued = RecordHeader {
            key: String::from("null-val"),
            value: None,
        };
        Record {
            timestamp_delta: 17,
            offset_delta: 2,
            key: Some(Bytes::from_static(b"the-key")),
            value: Some(Bytes::from_static(b"hello kafka")),
            headers: vec![trace, null_valued],
            ..Record::default()
        }
    }

    /// Record whose key, value, and deltas each need a multi-byte varint.
    fn fixture_large_payload_record() -> Record {
        Record {
            timestamp_delta: 1_000_000,
            offset_delta: 999,
            key: Some(Bytes::from(vec![b'k'; 128])),
            value: Some(Bytes::from(vec![b'v'; 4096])),
            ..Record::default()
        }
    }

    /// Generates a test that encodes a fixture, checks the predicted length,
    /// decodes it back, and expects an identical record with no leftovers.
    macro_rules! roundtrip {
        ($name:ident, $fixture:ident) => {
            #[test]
            fn $name() {
                let original = $fixture();
                let mut wire = BytesMut::new();
                original.encode(&mut wire).unwrap();
                assert!(wire.len() == original.encoded_len(), "predicted len mismatch");

                let mut rest: &[u8] = &wire[..];
                let parsed = Record::decode(&mut rest).unwrap();
                assert!(parsed == original);
                assert!(rest.is_empty(), "trailing bytes after decode");
            }
        };
    }

    roundtrip!(minimal, fixture_minimal_record);
    roundtrip!(keyed_with_headers, fixture_keyed_record);
    roundtrip!(large_payload, fixture_large_payload_record);

    #[test]
    fn decode_rejects_negative_header_count() {
        // Six one-byte fields make up the body:
        // attributes, timestamp_delta, offset_delta, key=-1, value=-1, headers=-1.
        let mut wire = BytesMut::new();
        put_varlong(&mut wire, 6); // body length
        wire.put_i8(0); // attributes
        put_varlong(&mut wire, 0); // timestamp_delta
        put_varint(&mut wire, 0); // offset_delta
        put_varint(&mut wire, -1); // null key
        put_varint(&mut wire, -1); // null value
        put_varint(&mut wire, -1); // negative header count

        let mut rest: &[u8] = &wire[..];
        let outcome = Record::decode(&mut rest);
        match outcome {
            Err(RecordsError::RecordParse(msg)) => {
                assert!(msg.contains("negative header count"), "got: {msg}");
            }
            other => panic!("expected RecordParse, got {other:?}"),
        }
    }

    #[test]
    fn decode_huge_header_count_does_not_overallocate() {
        // The body announces about a billion headers and then simply ends.
        // The decoder has to size its vector from the few bytes that remain
        // and report an error at end-of-input, not try to reserve gigabytes.
        let mut body = BytesMut::new();
        body.put_i8(0); // attributes
        put_varlong(&mut body, 0); // timestamp_delta
        put_varint(&mut body, 0); // offset_delta
        put_varint(&mut body, -1); // null key
        put_varint(&mut body, -1); // null value
        put_varint(&mut body, 1_000_000_000); // absurd header count

        let mut wire = BytesMut::new();
        let body_len = i64::try_from(body.len()).unwrap();
        put_varlong(&mut wire, body_len);
        wire.extend_from_slice(&body);

        let mut rest: &[u8] = &wire[..];
        let outcome = Record::decode(&mut rest);
        // An error is the expected outcome; an OOM abort would be the bug.
        assert!(outcome.is_err());
    }
}
400
401impl RecordBatch {
402    /// Decode a complete v2 record batch from `buf`. Reads from the start of
403    /// the header.
404    pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
405        // batch_length field semantics: bytes after itself.
406        // Header tail = partition_leader_epoch(4) + magic(1) + crc(4) +
407        //   attributes(2) + last_offset_delta(4) + base_timestamp(8) +
408        //   max_timestamp(8) + producer_id(8) + producer_epoch(2) +
409        //   base_sequence(4) + records_count(4) = 49 bytes.
410        const HEADER_TAIL_LEN: i32 = 49;
411
412        // Need the full header before doing anything.
413        if buf.remaining() < HEADER_LEN {
414            return Err(RecordsError::HeaderTooShort {
415                needed: HEADER_LEN - buf.remaining(),
416            });
417        }
418        // Copy out the header to a stack buffer so we can use zerocopy.
419        let mut hdr_bytes = [0u8; HEADER_LEN];
420        buf.copy_to_slice(&mut hdr_bytes);
421
422        let hdr = crate::records::header::RecordBatchHeader::ref_from_bytes(&hdr_bytes[..])
423            .map_err(|_| RecordsError::ZerocopyFailure)?;
424
425        if hdr.magic != 2 {
426            return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
427        }
428
429        // body_len = batch_length - HEADER_TAIL_LEN
430        let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
431            .and_then(|n| usize::try_from(n).ok())
432            .ok_or_else(|| {
433                RecordsError::RecordParse("negative or oversized batch_length".into())
434            })?;
435
436        if buf.remaining() < body_len {
437            return Err(RecordsError::BodyTooShort {
438                needed: body_len - buf.remaining(),
439            });
440        }
441
442        // Read the (possibly compressed) body.
443        let mut body = vec![0u8; body_len];
444        buf.copy_to_slice(&mut body);
445
446        // CRC is computed over: header bytes 21..HEADER_LEN (attributes through
447        // records_count), then the body bytes.
448        let expected_crc = hdr.crc.get();
449        let mut computed = crc32c(&hdr_bytes[21..HEADER_LEN]);
450        computed = crc32c_append(computed, &body);
451        if computed != expected_crc {
452            return Err(RecordsError::CrcMismatch {
453                expected: expected_crc,
454                computed,
455            });
456        }
457
458        let attributes = Attributes(hdr.attributes.get());
459        let codec = attributes.compression();
460
461        // Decompress body if needed.
462        let body_for_records: Bytes = if codec == crabka_compression::CompressionType::None {
463            Bytes::from(body)
464        } else {
465            // Bound decompressed output: generous vs. legit ratios, but finite.
466            // A small compressed batch must not be able to expand to gigabytes
467            // and OOM the broker (decompression bomb).
468            const DECOMPRESS_MIN_CAP: usize = 16 * 1024 * 1024; // 16 MiB floor (small inputs)
469            const DECOMPRESS_MAX_RATIO: usize = 100; // ≤100x the compressed size
470            const DECOMPRESS_ABSOLUTE_CEILING: usize = 1024 * 1024 * 1024; // 1 GiB hard ceiling
471            let max_output = body
472                .len()
473                .saturating_mul(DECOMPRESS_MAX_RATIO)
474                .clamp(DECOMPRESS_MIN_CAP, DECOMPRESS_ABSOLUTE_CEILING);
475            crabka_compression::decompress(codec, &body, max_output)?
476        };
477
478        // Parse records.
479        let count = hdr.records_count.get();
480        if count < 0 {
481            return Err(RecordsError::RecordParse(format!(
482                "negative records_count {count}"
483            )));
484        }
485        let mut body_cur: &[u8] = &body_for_records[..];
486        // Bound pre-allocation: each record is at least 1 byte in the
487        // decompressed body, so an honest `records_count` can never exceed the
488        // body length. Clamp the capacity hint to reject huge declared counts.
489        #[allow(clippy::cast_sign_loss)] // checked < 0 above
490        let mut records = Vec::with_capacity((count as usize).min(body_for_records.len()));
491        for i in 0..count {
492            records.push(
493                Record::decode(&mut body_cur)
494                    .map_err(|e| RecordsError::RecordParse(format!("record[{i}]: {e}")))?,
495            );
496        }
497        if !body_cur.is_empty() {
498            return Err(RecordsError::RecordParse(format!(
499                "trailing bytes after records (left={})",
500                body_cur.len()
501            )));
502        }
503
504        Ok(Self {
505            base_offset: hdr.base_offset.get(),
506            partition_leader_epoch: hdr.partition_leader_epoch.get(),
507            attributes,
508            last_offset_delta: hdr.last_offset_delta.get(),
509            base_timestamp: hdr.base_timestamp.get(),
510            max_timestamp: hdr.max_timestamp.get(),
511            producer_id: hdr.producer_id.get(),
512            producer_epoch: hdr.producer_epoch.get(),
513            base_sequence: hdr.base_sequence.get(),
514            records,
515        })
516    }
517
518    /// Encode this batch into `buf`.
519    pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
520        const HEADER_TAIL_LEN: i32 = 49;
521
522        // 1. Encode records into a temporary buffer.
523        let mut raw_body =
524            BytesMut::with_capacity(self.records.iter().map(Record::encoded_len).sum());
525        for r in &self.records {
526            r.encode(&mut raw_body)?;
527        }
528        let raw_body = raw_body.freeze();
529
530        // 2. Compress if needed.
531        let codec = self.attributes.compression();
532        let body: Bytes = if codec == crabka_compression::CompressionType::None {
533            raw_body
534        } else {
535            crabka_compression::compress(codec, &raw_body)?
536        };
537
538        // 3. batch_length = HEADER_TAIL_LEN + body_len
539        let batch_length = HEADER_TAIL_LEN
540            + i32::try_from(body.len())
541                .map_err(|_| RecordsError::RecordParse("body length exceeds i32".into()))?;
542
543        // 4. Build the CRC-covered header portion (attributes through records_count = 40 bytes).
544        let mut covered = BytesMut::with_capacity(40);
545        covered.put_i16(self.attributes.0);
546        covered.put_i32(self.last_offset_delta);
547        covered.put_i64(self.base_timestamp);
548        covered.put_i64(self.max_timestamp);
549        covered.put_i64(self.producer_id);
550        covered.put_i16(self.producer_epoch);
551        covered.put_i32(self.base_sequence);
552        covered.put_i32(
553            i32::try_from(self.records.len())
554                .map_err(|_| RecordsError::RecordParse("records_count exceeds i32".into()))?,
555        );
556        let covered_head = covered.freeze();
557
558        // 5. Compute CRC over covered_head then body.
559        let mut crc = crc32c(&covered_head);
560        crc = crc32c_append(crc, &body);
561
562        // 6. Emit the full header then body.
563        buf.put_i64(self.base_offset);
564        buf.put_i32(batch_length);
565        buf.put_i32(self.partition_leader_epoch);
566        buf.put_i8(2); // magic v2
567        buf.put_u32(crc);
568        buf.put_slice(&covered_head);
569        buf.put_slice(&body);
570        Ok(())
571    }
572
573    /// Predicted total bytes that `encode` will write (uncompressed; for
574    /// compressed batches the actual size will differ).
575    pub fn encoded_len(&self) -> usize {
576        let body: usize = self.records.iter().map(Record::encoded_len).sum();
577        HEADER_LEN + body
578    }
579}
580
#[cfg(test)]
mod batch_tests {
    use super::*;
    use assert2::assert;
    use crabka_compression::CompressionType;

    /// Default batch with no records.
    fn fixture_empty_batch() -> RecordBatch {
        RecordBatch::default()
    }

    /// Default batch carrying one keyed record.
    fn fixture_single_record_batch() -> RecordBatch {
        let only = Record {
            key: Some(Bytes::from_static(b"k1")),
            value: Some(Bytes::from_static(b"v1")),
            ..Default::default()
        };
        RecordBatch {
            records: vec![only],
            ..RecordBatch::default()
        }
    }

    /// Batch with non-default header fields and three records, the last of
    /// which has a null key and one header.
    fn fixture_multi_record_batch() -> RecordBatch {
        let first = Record {
            offset_delta: 0,
            timestamp_delta: 0,
            key: Some(Bytes::from_static(b"a")),
            value: Some(Bytes::from_static(b"1")),
            ..Default::default()
        };
        let second = Record {
            offset_delta: 1,
            timestamp_delta: 100,
            key: Some(Bytes::from_static(b"b")),
            value: Some(Bytes::from_static(b"2")),
            ..Default::default()
        };
        let third = Record {
            offset_delta: 2,
            timestamp_delta: 500,
            key: None,
            value: Some(Bytes::from_static(b"3")),
            headers: vec![RecordHeader {
                key: String::from("h"),
                value: Some(Bytes::from_static(b"hv")),
            }],
            ..Default::default()
        };

        RecordBatch {
            base_offset: 42,
            partition_leader_epoch: 5,
            last_offset_delta: 2,
            base_timestamp: 1_700_000_000,
            max_timestamp: 1_700_000_500,
            producer_id: 100,
            producer_epoch: 3,
            base_sequence: 7,
            records: vec![first, second, third],
            ..RecordBatch::default()
        }
    }

    /// Generates an uncompressed encode → decode round-trip test that also
    /// checks the predicted length.
    macro_rules! roundtrip_uncompressed {
        ($name:ident, $fixture:ident) => {
            #[test]
            fn $name() {
                let mut batch = $fixture();
                batch.attributes = batch.attributes.with_compression(CompressionType::None);

                let mut wire = BytesMut::new();
                batch.encode(&mut wire).unwrap();
                assert!(wire.len() == batch.encoded_len());

                let mut rest: &[u8] = &wire[..];
                let parsed = RecordBatch::decode(&mut rest).unwrap();
                assert!(parsed == batch);
                assert!(rest.is_empty());
            }
        };
    }

    roundtrip_uncompressed!(uncompressed_empty, fixture_empty_batch);
    roundtrip_uncompressed!(uncompressed_single, fixture_single_record_batch);
    roundtrip_uncompressed!(uncompressed_multi, fixture_multi_record_batch);

    #[test]
    fn rejects_pre_v2_magic() {
        let mut wire = BytesMut::new();
        wire.put_i64(0); // base_offset
        wire.put_i32(49); // batch_length
        wire.put_i32(0); // partition_leader_epoch
        wire.put_i8(1); // magic = 1 (v1, deprecated)
        wire.put_u32(0); // crc; never looked at because magic is checked first
        wire.put_slice(&[0u8; HEADER_LEN - 21]); // zero-fill the rest of the header

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        assert!(matches!(
            outcome,
            Err(RecordsError::UnsupportedMagic { found: 1 })
        ));
    }

    #[test]
    fn rejects_bad_crc() {
        let batch = fixture_single_record_batch();
        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();
        // Flip every bit of the first CRC byte (the CRC lives at 17..21).
        wire[17] = !wire[17];

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        assert!(matches!(outcome, Err(RecordsError::CrcMismatch { .. })));
    }

    /// Generates an encode → decode round-trip test for one compression codec.
    macro_rules! roundtrip_compressed {
        ($name:ident, $codec:expr) => {
            #[test]
            fn $name() {
                let mut batch = fixture_multi_record_batch();
                batch.attributes = batch.attributes.with_compression($codec);

                let mut wire = BytesMut::new();
                batch.encode(&mut wire).unwrap();

                let mut rest: &[u8] = &wire[..];
                let parsed = RecordBatch::decode(&mut rest).unwrap();
                assert!(parsed == batch);
                assert!(rest.is_empty());
            }
        };
    }

    roundtrip_compressed!(compressed_gzip, CompressionType::Gzip);
    roundtrip_compressed!(compressed_snappy, CompressionType::Snappy);
    roundtrip_compressed!(compressed_lz4, CompressionType::Lz4);
    roundtrip_compressed!(compressed_zstd, CompressionType::Zstd);

    #[test]
    fn decode_huge_records_count_does_not_overallocate() {
        // Start from a valid empty uncompressed batch, patch records_count to
        // about a billion, and repair the CRC so decoding gets as far as the
        // record loop. The decoder has to size its vector from the (empty)
        // body and fail at end-of-input, not try to reserve gigabytes.
        let mut batch = fixture_empty_batch();
        batch.attributes = batch.attributes.with_compression(CompressionType::None);
        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();

        // records_count occupies the final four bytes of the fixed header.
        let count_at = HEADER_LEN - 4;
        wire[count_at..HEADER_LEN].copy_from_slice(&1_000_000_000i32.to_be_bytes());

        // CRC covers header[21..HEADER_LEN] followed by the (empty) body.
        let patched_crc = crc32c_append(crc32c(&wire[21..HEADER_LEN]), &wire[HEADER_LEN..]);
        wire[17..21].copy_from_slice(&patched_crc.to_be_bytes());

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        // An error is the expected outcome; an OOM abort would be the bug.
        assert!(outcome.is_err());
    }
}
746
747impl crate::Encode for RecordBatch {
748    fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
749        RecordBatch::encode(self, buf).map_err(Into::into)
750    }
751
752    fn encoded_len(&self, _version: i16) -> usize {
753        RecordBatch::encoded_len(self)
754    }
755}
756
757impl crate::Decode<'_> for RecordBatch {
758    fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
759        RecordBatch::decode(buf).map_err(Into::into)
760    }
761}