Skip to main content

crabka_protocol/records/
payload.rs

1//! `RecordsPayload`: the wire-field type for any Kafka message whose schema
2//! declares a `records` field (`Fetch`, `Produce`, `FetchSnapshot`, `ShareFetch`).
3//!
4//! Kafka's records-field is "opaque bytes" at the protocol layer; the
5//! contents may be a v2 `RecordBatch` (current) or a v0/v1 `MessageSet`
6//! (legacy, used by old clients on down-conversion). The discriminator
7//! is the **magic byte at offset 16** of the first batch, which is at
8//! the same offset for both formats by coincidence of layout (v2:
9//! `base_offset+batch_length+leader_epoch+magic`; legacy: `offset+
10//! message_size+crc+magic`).
11//!
12//! Eagerly parsing the v2 form keeps the existing broker code paths
13//! unchanged: where they used `RecordBatch::encoded_len` and similar,
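//! they now call the equivalent method on `RecordsPayload`. Legacy
//! payloads are kept as raw [`Bytes`] and round-tripped verbatim — the
//! [`crabka-records-legacy`](../../records_legacy/index.html) crate
//! provides the codec when an old client actually appears on the wire.
//! A third variant, `RecordsPayload::Raw`, carries already-encoded v2
//! bytes that are forwarded without parsing (the fetch pass-through path).
//!
//! Inputs too short to expose the magic byte (fewer than 17 bytes, which
//! includes an empty records field) are classified as legacy.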
18
19use bytes::{Buf, BufMut, Bytes};
20
21use crate::records::RecordsError;
22use crate::records::borrowed::RecordBatch as RecordBatchBorrowed;
23use crate::records::owned::RecordBatch;
24
25/// Owned form of a records-field payload.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum RecordsPayload {
28    /// Zero or more parsed v2 batches (the records field is a *sequence*).
29    V2(Vec<RecordBatch>),
30    /// Verbatim, already-wire-format v2 bytes (one or more batches),
31    /// forwarded without parsing. Produced by the fetch pass-through path.
32    Raw(Bytes),
33    /// Opaque pre-v2 bytes (v0/v1 `MessageSet`). Decode with
34    /// `crabka_records_legacy::decode_message_set`.
35    Legacy(Bytes),
36}
37
38impl RecordsPayload {
39    /// Construct from raw records-field bytes. When the bytes look like v2,
40    /// decode *every* batch in the field; otherwise keep as opaque legacy.
41    pub fn from_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
42        if looks_like_v2(&bytes) {
43            let mut cur: &[u8] = &bytes;
44            let mut batches = Vec::new();
45            while !cur.is_empty() {
46                batches.push(RecordBatch::decode(&mut cur)?);
47            }
48            Ok(Self::V2(batches))
49        } else {
50            Ok(Self::Legacy(bytes))
51        }
52    }
53
54    /// Wire size of the records-field bytes (no outer length prefix).
55    #[must_use]
56    pub fn payload_len(&self) -> usize {
57        match self {
58            Self::V2(batches) => batches.iter().map(RecordBatch::encoded_len).sum(),
59            Self::Raw(b) | Self::Legacy(b) => b.len(),
60        }
61    }
62
63    /// Write the payload bytes into `buf` (caller owns the outer framing).
64    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
65        match self {
66            Self::V2(batches) => {
67                for b in batches {
68                    b.encode(buf)?;
69                }
70                Ok(())
71            }
72            Self::Raw(b) | Self::Legacy(b) => {
73                buf.put_slice(b);
74                Ok(())
75            }
76        }
77    }
78
79    /// Borrow the parsed v2 batches, if this is a parsed `V2` payload.
80    /// Returns `None` for `Raw` (intentionally unparsed) and `Legacy`.
81    #[must_use]
82    pub fn as_v2(&self) -> Option<&[RecordBatch]> {
83        match self {
84            Self::V2(batches) => Some(batches),
85            Self::Raw(_) | Self::Legacy(_) => None,
86        }
87    }
88
89    /// Borrow as raw legacy bytes, if that's what this payload is.
90    #[must_use]
91    pub fn as_legacy(&self) -> Option<&Bytes> {
92        match self {
93            Self::Legacy(b) => Some(b),
94            Self::V2(_) | Self::Raw(_) => None,
95        }
96    }
97
98    /// Decode a **response-side** records field, tolerating a truncated
99    /// trailing batch. Kafka returns a partial final `RecordBatch` when a
100    /// partition's fetch byte budget is hit mid-batch; the JVM consumer stops
101    /// at the first incomplete batch and re-fetches it from the next offset.
102    /// We mirror that: decode every complete batch, and on the first
103    /// `HeaderTooShort` / `BodyTooShort` stop and drop the remainder. A
104    /// *corrupt* complete batch (bad CRC/magic/content) still errors — leniency
105    /// forgives truncation only. Strict [`from_bytes`](Self::from_bytes) is
106    /// retained for Produce-request validation.
107    ///
108    /// Only `HeaderTooShort`/`BodyTooShort` are treated as truncation; a genuinely
109    /// invalid `batch_length` (`RecordParse`) is corruption and still errors —
110    /// legitimate Kafka truncation always preserves a valid `batch_length` prefix,
111    /// so it can only manifest as the too-short variants.
112    pub fn from_fetch_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
113        if !looks_like_v2(&bytes) {
114            return Ok(Self::Legacy(bytes));
115        }
116        let mut cur: &[u8] = &bytes;
117        let mut batches = Vec::new();
118        while !cur.is_empty() {
119            match RecordBatch::decode(&mut cur) {
120                Ok(rb) => batches.push(rb),
121                Err(RecordsError::HeaderTooShort { .. } | RecordsError::BodyTooShort { .. }) => {
122                    break;
123                }
124                Err(e) => return Err(e),
125            }
126        }
127        Ok(Self::V2(batches))
128    }
129
130    /// `Decode`-shaped lenient entry point the generated codec calls for
131    /// records fields in **response** messages. Consumes the whole sliced
132    /// field buffer (the caller has already framed it) and parses leniently
133    /// via [`from_fetch_bytes`](Self::from_fetch_bytes).
134    pub fn decode_lenient<B: Buf>(
135        buf: &mut B,
136        _version: i16,
137    ) -> Result<Self, crate::ProtocolError> {
138        let bytes = buf.copy_to_bytes(buf.remaining());
139        Self::from_fetch_bytes(bytes).map_err(Into::into)
140    }
141}
142
143impl From<RecordBatch> for RecordsPayload {
144    fn from(rb: RecordBatch) -> Self {
145        Self::V2(vec![rb])
146    }
147}
148
149impl From<Vec<RecordBatch>> for RecordsPayload {
150    fn from(v: Vec<RecordBatch>) -> Self {
151        Self::V2(v)
152    }
153}
154
155impl Default for RecordsPayload {
156    fn default() -> Self {
157        Self::V2(Vec::new())
158    }
159}
160
161impl crate::Encode for RecordsPayload {
162    fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
163        self.encode_to(buf).map_err(Into::into)
164    }
165
166    fn encoded_len(&self, _version: i16) -> usize {
167        self.payload_len()
168    }
169}
170
171impl crate::Decode<'_> for RecordsPayload {
172    fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
173        // The caller (generated codec) has already sliced the buffer to
174        // exactly the records-field bytes via `get_(nullable_)bytes_owned`,
175        // so we consume everything.
176        let bytes = buf.copy_to_bytes(buf.remaining());
177        Self::from_bytes(bytes).map_err(Into::into)
178    }
179}
180
181/// Borrowed form: zero-copy view into the input buffer.
182#[derive(Debug, Clone, PartialEq, Eq)]
183pub enum RecordsPayloadBorrowed<'a> {
184    V2(Vec<RecordBatchBorrowed<'a>>),
185    Legacy(&'a [u8]),
186}
187
188impl<'a> RecordsPayloadBorrowed<'a> {
189    pub fn from_slice(bytes: &'a [u8]) -> Result<Self, RecordsError> {
190        if looks_like_v2(bytes) {
191            let mut cur: &'a [u8] = bytes;
192            let mut batches = Vec::new();
193            while !cur.is_empty() {
194                let rb = <RecordBatchBorrowed<'a> as crate::DecodeBorrow<'a>>::decode_borrow(
195                    &mut cur, 0,
196                )
197                .map_err(|e| RecordsError::RecordParse(format!("borrowed v2 decode: {e}")))?;
198                batches.push(rb);
199            }
200            Ok(Self::V2(batches))
201        } else {
202            Ok(Self::Legacy(bytes))
203        }
204    }
205
206    #[must_use]
207    pub fn payload_len(&self) -> usize {
208        match self {
209            Self::V2(batches) => batches
210                .iter()
211                .map(|rb| crate::Encode::encoded_len(rb, 0))
212                .sum(),
213            Self::Legacy(b) => b.len(),
214        }
215    }
216
217    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
218        match self {
219            Self::V2(batches) => {
220                for rb in batches {
221                    crate::Encode::encode(rb, buf, 0).map_err(|e| {
222                        RecordsError::RecordParse(format!("borrowed v2 encode: {e}"))
223                    })?;
224                }
225                Ok(())
226            }
227            Self::Legacy(b) => {
228                buf.put_slice(b);
229                Ok(())
230            }
231        }
232    }
233
234    /// Convert to the owned flavor, performing any necessary buffer copies.
235    pub fn to_owned(&self) -> Result<RecordsPayload, RecordsError> {
236        match self {
237            Self::V2(batches) => {
238                let mut owned = Vec::with_capacity(batches.len());
239                for rb in batches {
240                    owned.push(rb.to_owned()?);
241                }
242                Ok(RecordsPayload::V2(owned))
243            }
244            Self::Legacy(b) => Ok(RecordsPayload::Legacy(Bytes::copy_from_slice(b))),
245        }
246    }
247}
248
249impl Default for RecordsPayloadBorrowed<'_> {
250    fn default() -> Self {
251        Self::V2(Vec::new())
252    }
253}
254
255impl crate::Encode for RecordsPayloadBorrowed<'_> {
256    fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
257        self.encode_to(buf).map_err(Into::into)
258    }
259
260    fn encoded_len(&self, _version: i16) -> usize {
261        self.payload_len()
262    }
263}
264
265impl<'de> crate::DecodeBorrow<'de> for RecordsPayloadBorrowed<'de> {
266    fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
267        // Caller has sliced the buffer to exactly the records-field bytes;
268        // consume everything by swapping in an empty tail.
269        let bytes = std::mem::take(buf);
270        Self::from_slice(bytes).map_err(Into::into)
271    }
272}
273
274/// True when `bytes` look like a v2 record batch (magic byte 2 at the
275/// well-known offset). v0 and v1 legacy `MessageSets` carry magic 0 or 1
276/// at the same offset, so this check distinguishes the two.
277///
278/// The threshold is `MAGIC_OFFSET + 1 = 17`, not the full `HEADER_LEN`:
279/// a truncated v2 batch still needs to land in the V2 arm so the
280/// downstream `RecordBatch::decode` can surface a precise error,
281/// rather than be silently misclassified as legacy.
282#[inline]
283fn looks_like_v2(bytes: &[u8]) -> bool {
284    // The magic byte sits at `base_offset(8) + batch_length(4) +
285    // partition_leader_epoch(4) = 16` in v2. Legacy MessageSets place
286    // the first message's magic byte at `offset(8) + message_size(4) +
287    // crc(4) = 16` — same index, different meaning.
288    const MAGIC_OFFSET: usize = 16;
289    bytes.len() > MAGIC_OFFSET && bytes[MAGIC_OFFSET] == 2
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use crate::records::{Record, RecordBatch};
296    use assert2::assert;
297    use bytes::BytesMut;
298
299    fn sample_v2() -> RecordBatch {
300        RecordBatch {
301            base_offset: 42,
302            records: vec![Record {
303                key: Some(Bytes::from_static(b"k")),
304                value: Some(Bytes::from_static(b"v")),
305                ..Default::default()
306            }],
307            ..RecordBatch::default()
308        }
309    }
310
311    #[test]
312    fn from_bytes_dispatches_v2() {
313        let rb = sample_v2();
314        let mut buf = BytesMut::new();
315        rb.encode(&mut buf).unwrap();
316        let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
317        match p {
318            RecordsPayload::V2(batches) => assert!(batches == vec![rb]),
319            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
320        }
321    }
322
323    #[test]
324    fn from_bytes_parses_all_batches() {
325        // Two v2 batches concatenated must both decode.
326        let mut b0 = sample_v2();
327        b0.base_offset = 0;
328        let mut b1 = sample_v2();
329        b1.base_offset = 1;
330        let mut buf = BytesMut::new();
331        b0.encode(&mut buf).unwrap();
332        b1.encode(&mut buf).unwrap();
333        let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
334        let batches = p.as_v2().expect("v2");
335        assert!(batches.len() == 2);
336        assert!(batches[0].base_offset == 0);
337        assert!(batches[1].base_offset == 1);
338    }
339
340    #[test]
341    fn raw_passthrough_roundtrips() {
342        let mut b = sample_v2();
343        b.base_offset = 7;
344        let mut wire = BytesMut::new();
345        b.encode(&mut wire).unwrap();
346        let wire = wire.freeze();
347        let p = RecordsPayload::Raw(wire.clone());
348        assert!(p.payload_len() == wire.len());
349        let mut out = BytesMut::new();
350        p.encode_to(&mut out).unwrap();
351        assert!(&out[..] == &wire[..]); // verbatim
352        assert!(p.as_v2().is_none()); // Raw is unparsed
353    }
354
355    #[test]
356    fn from_bytes_dispatches_legacy() {
357        // Build a fake legacy MessageSet entry: offset(8) + size(4) + crc(4) +
358        // magic(1)=1 + … . Just need byte 16 to be 1.
359        let mut buf = vec![0u8; 17];
360        buf[16] = 1;
361        let p = RecordsPayload::from_bytes(Bytes::from(buf.clone())).unwrap();
362        match p {
363            RecordsPayload::Legacy(b) => assert!(&b[..] == &buf[..]),
364            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
365        }
366    }
367
368    #[test]
369    fn roundtrip_v2() {
370        let p: RecordsPayload = sample_v2().into();
371        let mut buf = BytesMut::new();
372        p.encode_to(&mut buf).unwrap();
373        let back = RecordsPayload::from_bytes(buf.freeze()).unwrap();
374        assert!(p == back);
375        assert!(p.payload_len() == back.payload_len());
376    }
377
378    #[test]
379    fn encode_decode_via_traits() {
380        let p: RecordsPayload = sample_v2().into();
381        let mut buf = BytesMut::new();
382        <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
383        let mut cur: &[u8] = &buf;
384        let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
385        assert!(p == back);
386    }
387
388    #[test]
389    fn borrowed_dispatches() {
390        let rb = sample_v2();
391        let mut buf = BytesMut::new();
392        rb.encode(&mut buf).unwrap();
393        let frozen = buf.freeze();
394        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
395        assert!(matches!(p, RecordsPayloadBorrowed::V2(_)));
396        let owned = p.to_owned().unwrap();
397        match owned {
398            RecordsPayload::V2(batches) => assert!(batches[0].base_offset == 42),
399            RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
400        }
401    }
402
403    #[test]
404    fn from_record_batch() {
405        let rb = sample_v2();
406        let p: RecordsPayload = rb.clone().into();
407        assert!(p.as_v2() == Some(&[rb][..]));
408        assert!(p.as_legacy().is_none());
409    }
410
411    fn legacy_bytes() -> Bytes {
412        let mut buf = vec![0u8; 24];
413        buf[16] = 1;
414        for (i, b) in (b'a'..=b'h').enumerate() {
415            buf[17 + i % 7] = b;
416        }
417        Bytes::from(buf)
418    }
419
420    #[test]
421    fn legacy_payload_len_and_encode_owned() {
422        let bytes = legacy_bytes();
423        let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
424        assert!(p.payload_len() == bytes.len());
425        assert!(p.as_v2().is_none());
426        assert!(p.as_legacy() == Some(&bytes));
427
428        let mut out = BytesMut::new();
429        p.encode_to(&mut out).unwrap();
430        assert!(&out[..] == &bytes[..]);
431    }
432
433    #[test]
434    fn legacy_roundtrip_via_traits() {
435        let bytes = legacy_bytes();
436        let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
437        let mut buf = BytesMut::new();
438        <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
439        assert!(<RecordsPayload as crate::Encode>::encoded_len(&p, 0) == bytes.len());
440        let mut cur: &[u8] = &buf;
441        let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
442        assert!(matches!(back, RecordsPayload::Legacy(_)));
443        assert!(back.as_legacy().unwrap() == &bytes);
444    }
445
446    #[test]
447    fn owned_default_is_empty_v2() {
448        let p = RecordsPayload::default();
449        assert!(matches!(p, RecordsPayload::V2(ref v) if v.is_empty()));
450    }
451
452    #[test]
453    fn looks_like_v2_rejects_short_buffer() {
454        // Too short to peek the magic byte at offset 16; must fall through to Legacy.
455        let short = Bytes::from_static(&[0u8; 10]);
456        let p = RecordsPayload::from_bytes(short.clone()).unwrap();
457        assert!(p.as_legacy() == Some(&short));
458    }
459
460    #[test]
461    fn borrowed_legacy_roundtrip() {
462        let bytes = legacy_bytes();
463        let p = RecordsPayloadBorrowed::from_slice(&bytes).unwrap();
464        assert!(matches!(p, RecordsPayloadBorrowed::Legacy(_)));
465        assert!(p.payload_len() == bytes.len());
466
467        let mut out = BytesMut::new();
468        p.encode_to(&mut out).unwrap();
469        assert!(&out[..] == &bytes[..]);
470
471        let owned = p.to_owned().unwrap();
472        match owned {
473            RecordsPayload::Legacy(b) => assert!(&b[..] == &bytes[..]),
474            RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
475        }
476    }
477
478    #[test]
479    fn borrowed_v2_payload_len_and_encode() {
480        let rb = sample_v2();
481        let mut buf = BytesMut::new();
482        rb.encode(&mut buf).unwrap();
483        let frozen = buf.freeze();
484        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
485        assert!(p.payload_len() == frozen.len());
486
487        let mut out = BytesMut::new();
488        p.encode_to(&mut out).unwrap();
489        assert!(&out[..] == &frozen[..]);
490    }
491
492    #[test]
493    fn borrowed_encode_decode_via_traits() {
494        let rb = sample_v2();
495        let mut buf = BytesMut::new();
496        rb.encode(&mut buf).unwrap();
497        let frozen = buf.freeze();
498
499        let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
500        let mut out = BytesMut::new();
501        <RecordsPayloadBorrowed as crate::Encode>::encode(&p, &mut out, 0).unwrap();
502        assert!(<RecordsPayloadBorrowed as crate::Encode>::encoded_len(&p, 0) == frozen.len());
503
504        let mut cur: &[u8] = &out;
505        let back =
506            <RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut cur, 0).unwrap();
507        assert!(matches!(back, RecordsPayloadBorrowed::V2(_)));
508    }
509
510    #[test]
511    fn borrowed_default_is_empty_v2() {
512        let p = RecordsPayloadBorrowed::default();
513        assert!(matches!(p, RecordsPayloadBorrowed::V2(ref v) if v.is_empty()));
514    }
515
516    #[test]
517    fn from_fetch_bytes_drops_incomplete_trailing_batch() {
518        // Two complete batches followed by a truncated third (only a few
519        // bytes of its header). Kafka sends this when the partition byte
520        // budget cuts the final batch; the consumer must keep the two
521        // complete batches and drop the fragment.
522        let mut b0 = sample_v2();
523        b0.base_offset = 0;
524        let mut b1 = sample_v2();
525        b1.base_offset = 1;
526        let mut buf = BytesMut::new();
527        b0.encode(&mut buf).unwrap();
528        b1.encode(&mut buf).unwrap();
529        buf.extend_from_slice(&[0u8; 7]); // partial trailing batch header
530        let p = RecordsPayload::from_fetch_bytes(buf.freeze()).unwrap();
531        let batches = p.as_v2().expect("v2");
532        assert!(batches.len() == 2);
533        assert!(batches[0].base_offset == 0);
534        assert!(batches[1].base_offset == 1);
535    }
536
537    #[test]
538    fn from_fetch_bytes_still_errors_on_corrupt_batch() {
539        // A complete-looking batch whose CRC is wrong must still error, even
540        // leniently — leniency only forgives truncation, not corruption.
541        let rb = sample_v2();
542        let mut buf = BytesMut::new();
543        rb.encode(&mut buf).unwrap();
544        let mut bytes = buf.to_vec();
545        // Corrupt a body byte after the header (HEADER_LEN = 61) to break CRC.
546        bytes[61] ^= 0xFF;
547        let err = RecordsPayload::from_fetch_bytes(Bytes::from(bytes)).unwrap_err();
548        assert!(matches!(err, RecordsError::CrcMismatch { .. }));
549    }
550
551    #[test]
552    fn from_fetch_bytes_legacy_passes_through() {
553        let bytes = legacy_bytes();
554        let p = RecordsPayload::from_fetch_bytes(bytes.clone()).unwrap();
555        assert!(p.as_legacy() == Some(&bytes));
556    }
557
558    #[test]
559    fn from_fetch_bytes_empty_is_empty_v2() {
560        // Empty bytes do not carry a magic byte, so looks_like_v2 returns false
561        // and from_fetch_bytes yields an empty Legacy payload (no panic, no error).
562        let p = RecordsPayload::from_fetch_bytes(Bytes::new()).unwrap();
563        assert!(matches!(p, RecordsPayload::Legacy(ref b) if b.is_empty()));
564    }
565}