Skip to main content

crabka_protocol/records/
payload.rs

//! `RecordsPayload`: the wire-field type for any Kafka message whose schema
//! declares a `records` field (`Fetch`, `Produce`, `FetchSnapshot`, `ShareFetch`).
//!
//! Kafka's records field is "opaque bytes" at the protocol layer; the
//! contents may be a v2 `RecordBatch` (current) or a v0/v1 `MessageSet`
//! (legacy, used by old clients on down-conversion). The discriminator
//! is the **magic byte at offset 16** of the first batch, which sits at
//! the same offset in both formats by coincidence of layout (v2:
//! `base_offset + batch_length + partition_leader_epoch + magic`; legacy:
//! `offset + message_size + crc + magic`).
//!
//! Eagerly parsing the v2 form keeps the existing broker code paths
//! unchanged: where they used `RecordBatch::encoded_len` and similar,
//! they now call the equivalent method on `RecordsPayload`. Legacy
//! payloads are kept as raw [`Bytes`] and round-tripped verbatim — the
//! [`crabka-records-legacy`](../../records_legacy/index.html) crate
//! provides the codec when an old client actually appears on the wire.
18
19use bytes::{Buf, BufMut, Bytes};
20
21use crate::records::RecordsError;
22use crate::records::borrowed::RecordBatch as RecordBatchBorrowed;
23use crate::records::owned::RecordBatch;
24
25/// Owned form of a records-field payload.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum RecordsPayload {
28    /// Zero or more parsed v2 batches (the records field is a *sequence*).
29    V2(Vec<RecordBatch>),
30    /// Verbatim, already-wire-format v2 bytes (one or more batches),
31    /// forwarded without parsing. Produced by the fetch pass-through path.
32    Raw(Bytes),
33    /// Opaque pre-v2 bytes (v0/v1 `MessageSet`). Decode with
34    /// `crabka_records_legacy::decode_message_set`.
35    Legacy(Bytes),
36}
37
38impl RecordsPayload {
39    /// Construct from raw records-field bytes. When the bytes look like v2,
40    /// decode *every* batch in the field; otherwise keep as opaque legacy.
41    pub fn from_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
42        if looks_like_v2(&bytes) {
43            let mut cur: &[u8] = &bytes;
44            let mut batches = Vec::new();
45            while !cur.is_empty() {
46                batches.push(RecordBatch::decode(&mut cur)?);
47            }
48            Ok(Self::V2(batches))
49        } else {
50            Ok(Self::Legacy(bytes))
51        }
52    }
53
54    /// Wire size of the records-field bytes (no outer length prefix).
55    #[must_use]
56    pub fn payload_len(&self) -> usize {
57        match self {
58            Self::V2(batches) => batches.iter().map(RecordBatch::encoded_len).sum(),
59            Self::Raw(b) | Self::Legacy(b) => b.len(),
60        }
61    }
62
63    /// Write the payload bytes into `buf` (caller owns the outer framing).
64    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
65        match self {
66            Self::V2(batches) => {
67                for b in batches {
68                    b.encode(buf)?;
69                }
70                Ok(())
71            }
72            Self::Raw(b) | Self::Legacy(b) => {
73                buf.put_slice(b);
74                Ok(())
75            }
76        }
77    }
78
79    /// Borrow the parsed v2 batches, if this is a parsed `V2` payload.
80    /// Returns `None` for `Raw` (intentionally unparsed) and `Legacy`.
81    #[must_use]
82    pub fn as_v2(&self) -> Option<&[RecordBatch]> {
83        match self {
84            Self::V2(batches) => Some(batches),
85            Self::Raw(_) | Self::Legacy(_) => None,
86        }
87    }
88
89    /// Borrow as raw legacy bytes, if that's what this payload is.
90    #[must_use]
91    pub fn as_legacy(&self) -> Option<&Bytes> {
92        match self {
93            Self::Legacy(b) => Some(b),
94            Self::V2(_) | Self::Raw(_) => None,
95        }
96    }
97}
98
99impl From<RecordBatch> for RecordsPayload {
100    fn from(rb: RecordBatch) -> Self {
101        Self::V2(vec![rb])
102    }
103}
104
105impl From<Vec<RecordBatch>> for RecordsPayload {
106    fn from(v: Vec<RecordBatch>) -> Self {
107        Self::V2(v)
108    }
109}
110
111impl Default for RecordsPayload {
112    fn default() -> Self {
113        Self::V2(Vec::new())
114    }
115}
116
117impl crate::Encode for RecordsPayload {
118    fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
119        self.encode_to(buf).map_err(Into::into)
120    }
121
122    fn encoded_len(&self, _version: i16) -> usize {
123        self.payload_len()
124    }
125}
126
127impl crate::Decode<'_> for RecordsPayload {
128    fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
129        // The caller (generated codec) has already sliced the buffer to
130        // exactly the records-field bytes via `get_(nullable_)bytes_owned`,
131        // so we consume everything.
132        let bytes = buf.copy_to_bytes(buf.remaining());
133        Self::from_bytes(bytes).map_err(Into::into)
134    }
135}
136
137/// Borrowed form: zero-copy view into the input buffer.
138#[derive(Debug, Clone, PartialEq, Eq)]
139pub enum RecordsPayloadBorrowed<'a> {
140    V2(Vec<RecordBatchBorrowed<'a>>),
141    Legacy(&'a [u8]),
142}
143
144impl<'a> RecordsPayloadBorrowed<'a> {
145    pub fn from_slice(bytes: &'a [u8]) -> Result<Self, RecordsError> {
146        if looks_like_v2(bytes) {
147            let mut cur: &'a [u8] = bytes;
148            let mut batches = Vec::new();
149            while !cur.is_empty() {
150                let rb = <RecordBatchBorrowed<'a> as crate::DecodeBorrow<'a>>::decode_borrow(
151                    &mut cur, 0,
152                )
153                .map_err(|e| RecordsError::RecordParse(format!("borrowed v2 decode: {e}")))?;
154                batches.push(rb);
155            }
156            Ok(Self::V2(batches))
157        } else {
158            Ok(Self::Legacy(bytes))
159        }
160    }
161
162    #[must_use]
163    pub fn payload_len(&self) -> usize {
164        match self {
165            Self::V2(batches) => batches
166                .iter()
167                .map(|rb| crate::Encode::encoded_len(rb, 0))
168                .sum(),
169            Self::Legacy(b) => b.len(),
170        }
171    }
172
173    pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
174        match self {
175            Self::V2(batches) => {
176                for rb in batches {
177                    crate::Encode::encode(rb, buf, 0).map_err(|e| {
178                        RecordsError::RecordParse(format!("borrowed v2 encode: {e}"))
179                    })?;
180                }
181                Ok(())
182            }
183            Self::Legacy(b) => {
184                buf.put_slice(b);
185                Ok(())
186            }
187        }
188    }
189
190    /// Convert to the owned flavor, performing any necessary buffer copies.
191    pub fn to_owned(&self) -> Result<RecordsPayload, RecordsError> {
192        match self {
193            Self::V2(batches) => {
194                let mut owned = Vec::with_capacity(batches.len());
195                for rb in batches {
196                    owned.push(rb.to_owned()?);
197                }
198                Ok(RecordsPayload::V2(owned))
199            }
200            Self::Legacy(b) => Ok(RecordsPayload::Legacy(Bytes::copy_from_slice(b))),
201        }
202    }
203}
204
205impl Default for RecordsPayloadBorrowed<'_> {
206    fn default() -> Self {
207        Self::V2(Vec::new())
208    }
209}
210
211impl crate::Encode for RecordsPayloadBorrowed<'_> {
212    fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
213        self.encode_to(buf).map_err(Into::into)
214    }
215
216    fn encoded_len(&self, _version: i16) -> usize {
217        self.payload_len()
218    }
219}
220
221impl<'de> crate::DecodeBorrow<'de> for RecordsPayloadBorrowed<'de> {
222    fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
223        // Caller has sliced the buffer to exactly the records-field bytes;
224        // consume everything by swapping in an empty tail.
225        let bytes = std::mem::take(buf);
226        Self::from_slice(bytes).map_err(Into::into)
227    }
228}
229
/// True when `bytes` look like a v2 record batch, i.e. the byte at the
/// well-known magic offset is 2. v0 and v1 legacy `MessageSet`s carry magic
/// 0 or 1 at the same offset, so this check tells the two apart.
///
/// Only `MAGIC_OFFSET + 1 = 17` bytes are required, not a full batch header:
/// a truncated v2 batch must still be classified as v2 so the downstream
/// `RecordBatch::decode` can surface a precise error, rather than be silently
/// misclassified as legacy. Input shorter than that (including empty input)
/// has no magic byte to inspect and returns `false`.
#[inline]
fn looks_like_v2(bytes: &[u8]) -> bool {
    // The magic byte sits at `base_offset(8) + batch_length(4) +
    // partition_leader_epoch(4) = 16` in v2. Legacy MessageSets place
    // the first message's magic byte at `offset(8) + message_size(4) +
    // crc(4) = 16` — same index, different meaning.
    const MAGIC_OFFSET: usize = 16;
    // Magic value that identifies the v2 `RecordBatch` format.
    const V2_MAGIC: u8 = 2;

    // Checked access covers the "too short" case without a separate length
    // test and leaves no panicking index expression behind.
    bytes.get(MAGIC_OFFSET) == Some(&V2_MAGIC)
}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::records::{Record, RecordBatch};
    use bytes::BytesMut;

    /// A v2 batch at base offset 42 holding a single `k`/`v` record.
    fn sample_v2() -> RecordBatch {
        let record = Record {
            key: Some(Bytes::from_static(b"k")),
            value: Some(Bytes::from_static(b"v")),
            ..Default::default()
        };
        RecordBatch {
            base_offset: 42,
            records: vec![record],
            ..RecordBatch::default()
        }
    }

    /// Encode `batches` back to back, the way they sit in a records field.
    fn wire_bytes(batches: &[RecordBatch]) -> Bytes {
        let mut wire = BytesMut::new();
        for batch in batches {
            batch.encode(&mut wire).unwrap();
        }
        wire.freeze()
    }

    /// 24 bytes with magic 1 at offset 16, followed by seven filler bytes.
    fn legacy_bytes() -> Bytes {
        let mut raw = vec![0u8; 24];
        raw[16] = 1;
        raw[17..].copy_from_slice(b"hbcdefg");
        Bytes::from(raw)
    }

    #[test]
    fn from_bytes_dispatches_v2() {
        let rb = sample_v2();
        let wire = wire_bytes(std::slice::from_ref(&rb));
        let payload = RecordsPayload::from_bytes(wire).unwrap();
        if let RecordsPayload::V2(batches) = payload {
            assert_eq!(batches, vec![rb]);
        } else {
            panic!("expected V2");
        }
    }

    #[test]
    fn from_bytes_parses_all_batches() {
        // Two concatenated v2 batches must both be decoded.
        let first = RecordBatch {
            base_offset: 0,
            ..sample_v2()
        };
        let second = RecordBatch {
            base_offset: 1,
            ..sample_v2()
        };
        let payload = RecordsPayload::from_bytes(wire_bytes(&[first, second])).unwrap();
        let batches = payload.as_v2().expect("v2");
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].base_offset, 0);
        assert_eq!(batches[1].base_offset, 1);
    }

    #[test]
    fn raw_passthrough_roundtrips() {
        let batch = RecordBatch {
            base_offset: 7,
            ..sample_v2()
        };
        let wire = wire_bytes(&[batch]);
        let payload = RecordsPayload::Raw(wire.clone());
        assert_eq!(payload.payload_len(), wire.len());

        let mut out = BytesMut::new();
        payload.encode_to(&mut out).unwrap();
        // Raw bytes are written back verbatim and are never parsed.
        assert_eq!(&out[..], &wire[..]);
        assert!(payload.as_v2().is_none());
    }

    #[test]
    fn from_bytes_dispatches_legacy() {
        // Stand-in for a legacy MessageSet entry: offset(8) + size(4) +
        // crc(4) + magic(1). Only byte 16 (the magic, set to 1) matters.
        let mut raw = vec![0u8; 17];
        raw[16] = 1;
        let payload = RecordsPayload::from_bytes(Bytes::from(raw.clone())).unwrap();
        if let RecordsPayload::Legacy(kept) = payload {
            assert_eq!(&kept[..], &raw[..]);
        } else {
            panic!("expected Legacy");
        }
    }

    #[test]
    fn roundtrip_v2() {
        let original: RecordsPayload = sample_v2().into();
        let mut wire = BytesMut::new();
        original.encode_to(&mut wire).unwrap();
        let decoded = RecordsPayload::from_bytes(wire.freeze()).unwrap();
        assert_eq!(original, decoded);
        assert_eq!(original.payload_len(), decoded.payload_len());
    }

    #[test]
    fn encode_decode_via_traits() {
        let original: RecordsPayload = sample_v2().into();
        let mut wire = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&original, &mut wire, 0).unwrap();
        let mut cursor: &[u8] = &wire;
        let decoded = <RecordsPayload as crate::Decode>::decode(&mut cursor, 0).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn borrowed_dispatches() {
        let wire = wire_bytes(&[sample_v2()]);
        let view = RecordsPayloadBorrowed::from_slice(&wire).unwrap();
        assert!(matches!(view, RecordsPayloadBorrowed::V2(_)));
        if let RecordsPayload::V2(batches) = view.to_owned().unwrap() {
            assert_eq!(batches[0].base_offset, 42);
        } else {
            panic!("expected V2");
        }
    }

    #[test]
    fn from_record_batch() {
        let rb = sample_v2();
        let payload: RecordsPayload = rb.clone().into();
        assert_eq!(payload.as_v2(), Some(&[rb][..]));
        assert!(payload.as_legacy().is_none());
    }

    #[test]
    fn legacy_payload_len_and_encode_owned() {
        let bytes = legacy_bytes();
        let payload = RecordsPayload::from_bytes(bytes.clone()).unwrap();
        assert_eq!(payload.payload_len(), bytes.len());
        assert!(payload.as_v2().is_none());
        assert_eq!(payload.as_legacy(), Some(&bytes));

        let mut out = BytesMut::new();
        payload.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &bytes[..]);
    }

    #[test]
    fn legacy_roundtrip_via_traits() {
        let bytes = legacy_bytes();
        let payload = RecordsPayload::from_bytes(bytes.clone()).unwrap();
        let mut wire = BytesMut::new();
        <RecordsPayload as crate::Encode>::encode(&payload, &mut wire, 0).unwrap();
        assert_eq!(
            <RecordsPayload as crate::Encode>::encoded_len(&payload, 0),
            bytes.len()
        );
        let mut cursor: &[u8] = &wire;
        let decoded = <RecordsPayload as crate::Decode>::decode(&mut cursor, 0).unwrap();
        assert!(matches!(decoded, RecordsPayload::Legacy(_)));
        assert_eq!(decoded.as_legacy().unwrap(), &bytes);
    }

    #[test]
    fn owned_default_is_empty_v2() {
        let payload = RecordsPayload::default();
        assert!(matches!(payload, RecordsPayload::V2(ref v) if v.is_empty()));
    }

    #[test]
    fn looks_like_v2_rejects_short_buffer() {
        // Ten bytes cannot reach the magic byte at offset 16, so the input
        // must be classified as Legacy.
        let short = Bytes::from_static(&[0u8; 10]);
        let payload = RecordsPayload::from_bytes(short.clone()).unwrap();
        assert_eq!(payload.as_legacy(), Some(&short));
    }

    #[test]
    fn borrowed_legacy_roundtrip() {
        let bytes = legacy_bytes();
        let view = RecordsPayloadBorrowed::from_slice(&bytes).unwrap();
        assert!(matches!(view, RecordsPayloadBorrowed::Legacy(_)));
        assert_eq!(view.payload_len(), bytes.len());

        let mut out = BytesMut::new();
        view.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &bytes[..]);

        if let RecordsPayload::Legacy(kept) = view.to_owned().unwrap() {
            assert_eq!(&kept[..], &bytes[..]);
        } else {
            panic!("expected Legacy");
        }
    }

    #[test]
    fn borrowed_v2_payload_len_and_encode() {
        let wire = wire_bytes(&[sample_v2()]);
        let view = RecordsPayloadBorrowed::from_slice(&wire).unwrap();
        assert_eq!(view.payload_len(), wire.len());

        let mut out = BytesMut::new();
        view.encode_to(&mut out).unwrap();
        assert_eq!(&out[..], &wire[..]);
    }

    #[test]
    fn borrowed_encode_decode_via_traits() {
        let wire = wire_bytes(&[sample_v2()]);

        let view = RecordsPayloadBorrowed::from_slice(&wire).unwrap();
        let mut out = BytesMut::new();
        <RecordsPayloadBorrowed as crate::Encode>::encode(&view, &mut out, 0).unwrap();
        assert_eq!(
            <RecordsPayloadBorrowed as crate::Encode>::encoded_len(&view, 0),
            wire.len()
        );

        let mut cursor: &[u8] = &out;
        let decoded =
            <RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut cursor, 0)
                .unwrap();
        assert!(matches!(decoded, RecordsPayloadBorrowed::V2(_)));
    }

    #[test]
    fn borrowed_default_is_empty_v2() {
        let payload = RecordsPayloadBorrowed::default();
        assert!(matches!(payload, RecordsPayloadBorrowed::V2(ref v) if v.is_empty()));
    }
}