crabka_protocol/records/
payload.rs1use bytes::{Buf, BufMut, Bytes};
20
21use crate::records::RecordsError;
22use crate::records::borrowed::RecordBatch as RecordBatchBorrowed;
23use crate::records::owned::RecordBatch;
24
25#[derive(Debug, Clone, PartialEq, Eq)]
27pub enum RecordsPayload {
28 V2(Vec<RecordBatch>),
30 Raw(Bytes),
33 Legacy(Bytes),
36}
37
38impl RecordsPayload {
39 pub fn from_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
42 if looks_like_v2(&bytes) {
43 let mut cur: &[u8] = &bytes;
44 let mut batches = Vec::new();
45 while !cur.is_empty() {
46 batches.push(RecordBatch::decode(&mut cur)?);
47 }
48 Ok(Self::V2(batches))
49 } else {
50 Ok(Self::Legacy(bytes))
51 }
52 }
53
54 #[must_use]
56 pub fn payload_len(&self) -> usize {
57 match self {
58 Self::V2(batches) => batches.iter().map(RecordBatch::encoded_len).sum(),
59 Self::Raw(b) | Self::Legacy(b) => b.len(),
60 }
61 }
62
63 pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
65 match self {
66 Self::V2(batches) => {
67 for b in batches {
68 b.encode(buf)?;
69 }
70 Ok(())
71 }
72 Self::Raw(b) | Self::Legacy(b) => {
73 buf.put_slice(b);
74 Ok(())
75 }
76 }
77 }
78
79 #[must_use]
82 pub fn as_v2(&self) -> Option<&[RecordBatch]> {
83 match self {
84 Self::V2(batches) => Some(batches),
85 Self::Raw(_) | Self::Legacy(_) => None,
86 }
87 }
88
89 #[must_use]
91 pub fn as_legacy(&self) -> Option<&Bytes> {
92 match self {
93 Self::Legacy(b) => Some(b),
94 Self::V2(_) | Self::Raw(_) => None,
95 }
96 }
97
98 pub fn from_fetch_bytes(bytes: Bytes) -> Result<Self, RecordsError> {
113 if !looks_like_v2(&bytes) {
114 return Ok(Self::Legacy(bytes));
115 }
116 let mut cur: &[u8] = &bytes;
117 let mut batches = Vec::new();
118 while !cur.is_empty() {
119 match RecordBatch::decode(&mut cur) {
120 Ok(rb) => batches.push(rb),
121 Err(RecordsError::HeaderTooShort { .. } | RecordsError::BodyTooShort { .. }) => {
122 break;
123 }
124 Err(e) => return Err(e),
125 }
126 }
127 Ok(Self::V2(batches))
128 }
129
130 pub fn decode_lenient<B: Buf>(
135 buf: &mut B,
136 _version: i16,
137 ) -> Result<Self, crate::ProtocolError> {
138 let bytes = buf.copy_to_bytes(buf.remaining());
139 Self::from_fetch_bytes(bytes).map_err(Into::into)
140 }
141}
142
143impl From<RecordBatch> for RecordsPayload {
144 fn from(rb: RecordBatch) -> Self {
145 Self::V2(vec![rb])
146 }
147}
148
149impl From<Vec<RecordBatch>> for RecordsPayload {
150 fn from(v: Vec<RecordBatch>) -> Self {
151 Self::V2(v)
152 }
153}
154
155impl Default for RecordsPayload {
156 fn default() -> Self {
157 Self::V2(Vec::new())
158 }
159}
160
161impl crate::Encode for RecordsPayload {
162 fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
163 self.encode_to(buf).map_err(Into::into)
164 }
165
166 fn encoded_len(&self, _version: i16) -> usize {
167 self.payload_len()
168 }
169}
170
171impl crate::Decode<'_> for RecordsPayload {
172 fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
173 let bytes = buf.copy_to_bytes(buf.remaining());
177 Self::from_bytes(bytes).map_err(Into::into)
178 }
179}
180
181#[derive(Debug, Clone, PartialEq, Eq)]
183pub enum RecordsPayloadBorrowed<'a> {
184 V2(Vec<RecordBatchBorrowed<'a>>),
185 Legacy(&'a [u8]),
186}
187
188impl<'a> RecordsPayloadBorrowed<'a> {
189 pub fn from_slice(bytes: &'a [u8]) -> Result<Self, RecordsError> {
190 if looks_like_v2(bytes) {
191 let mut cur: &'a [u8] = bytes;
192 let mut batches = Vec::new();
193 while !cur.is_empty() {
194 let rb = <RecordBatchBorrowed<'a> as crate::DecodeBorrow<'a>>::decode_borrow(
195 &mut cur, 0,
196 )
197 .map_err(|e| RecordsError::RecordParse(format!("borrowed v2 decode: {e}")))?;
198 batches.push(rb);
199 }
200 Ok(Self::V2(batches))
201 } else {
202 Ok(Self::Legacy(bytes))
203 }
204 }
205
206 #[must_use]
207 pub fn payload_len(&self) -> usize {
208 match self {
209 Self::V2(batches) => batches
210 .iter()
211 .map(|rb| crate::Encode::encoded_len(rb, 0))
212 .sum(),
213 Self::Legacy(b) => b.len(),
214 }
215 }
216
217 pub fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
218 match self {
219 Self::V2(batches) => {
220 for rb in batches {
221 crate::Encode::encode(rb, buf, 0).map_err(|e| {
222 RecordsError::RecordParse(format!("borrowed v2 encode: {e}"))
223 })?;
224 }
225 Ok(())
226 }
227 Self::Legacy(b) => {
228 buf.put_slice(b);
229 Ok(())
230 }
231 }
232 }
233
234 pub fn to_owned(&self) -> Result<RecordsPayload, RecordsError> {
236 match self {
237 Self::V2(batches) => {
238 let mut owned = Vec::with_capacity(batches.len());
239 for rb in batches {
240 owned.push(rb.to_owned()?);
241 }
242 Ok(RecordsPayload::V2(owned))
243 }
244 Self::Legacy(b) => Ok(RecordsPayload::Legacy(Bytes::copy_from_slice(b))),
245 }
246 }
247}
248
249impl Default for RecordsPayloadBorrowed<'_> {
250 fn default() -> Self {
251 Self::V2(Vec::new())
252 }
253}
254
255impl crate::Encode for RecordsPayloadBorrowed<'_> {
256 fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
257 self.encode_to(buf).map_err(Into::into)
258 }
259
260 fn encoded_len(&self, _version: i16) -> usize {
261 self.payload_len()
262 }
263}
264
265impl<'de> crate::DecodeBorrow<'de> for RecordsPayloadBorrowed<'de> {
266 fn decode_borrow(buf: &mut &'de [u8], _version: i16) -> Result<Self, crate::ProtocolError> {
267 let bytes = std::mem::take(buf);
270 Self::from_slice(bytes).map_err(Into::into)
271 }
272}
273
274#[inline]
283fn looks_like_v2(bytes: &[u8]) -> bool {
284 const MAGIC_OFFSET: usize = 16;
289 bytes.len() > MAGIC_OFFSET && bytes[MAGIC_OFFSET] == 2
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use crate::records::{Record, RecordBatch};
296 use assert2::assert;
297 use bytes::BytesMut;
298
299 fn sample_v2() -> RecordBatch {
300 RecordBatch {
301 base_offset: 42,
302 records: vec![Record {
303 key: Some(Bytes::from_static(b"k")),
304 value: Some(Bytes::from_static(b"v")),
305 ..Default::default()
306 }],
307 ..RecordBatch::default()
308 }
309 }
310
311 #[test]
312 fn from_bytes_dispatches_v2() {
313 let rb = sample_v2();
314 let mut buf = BytesMut::new();
315 rb.encode(&mut buf).unwrap();
316 let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
317 match p {
318 RecordsPayload::V2(batches) => assert!(batches == vec![rb]),
319 RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
320 }
321 }
322
323 #[test]
324 fn from_bytes_parses_all_batches() {
325 let mut b0 = sample_v2();
327 b0.base_offset = 0;
328 let mut b1 = sample_v2();
329 b1.base_offset = 1;
330 let mut buf = BytesMut::new();
331 b0.encode(&mut buf).unwrap();
332 b1.encode(&mut buf).unwrap();
333 let p = RecordsPayload::from_bytes(buf.freeze()).unwrap();
334 let batches = p.as_v2().expect("v2");
335 assert!(batches.len() == 2);
336 assert!(batches[0].base_offset == 0);
337 assert!(batches[1].base_offset == 1);
338 }
339
340 #[test]
341 fn raw_passthrough_roundtrips() {
342 let mut b = sample_v2();
343 b.base_offset = 7;
344 let mut wire = BytesMut::new();
345 b.encode(&mut wire).unwrap();
346 let wire = wire.freeze();
347 let p = RecordsPayload::Raw(wire.clone());
348 assert!(p.payload_len() == wire.len());
349 let mut out = BytesMut::new();
350 p.encode_to(&mut out).unwrap();
351 assert!(&out[..] == &wire[..]); assert!(p.as_v2().is_none()); }
354
355 #[test]
356 fn from_bytes_dispatches_legacy() {
357 let mut buf = vec![0u8; 17];
360 buf[16] = 1;
361 let p = RecordsPayload::from_bytes(Bytes::from(buf.clone())).unwrap();
362 match p {
363 RecordsPayload::Legacy(b) => assert!(&b[..] == &buf[..]),
364 RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
365 }
366 }
367
368 #[test]
369 fn roundtrip_v2() {
370 let p: RecordsPayload = sample_v2().into();
371 let mut buf = BytesMut::new();
372 p.encode_to(&mut buf).unwrap();
373 let back = RecordsPayload::from_bytes(buf.freeze()).unwrap();
374 assert!(p == back);
375 assert!(p.payload_len() == back.payload_len());
376 }
377
378 #[test]
379 fn encode_decode_via_traits() {
380 let p: RecordsPayload = sample_v2().into();
381 let mut buf = BytesMut::new();
382 <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
383 let mut cur: &[u8] = &buf;
384 let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
385 assert!(p == back);
386 }
387
388 #[test]
389 fn borrowed_dispatches() {
390 let rb = sample_v2();
391 let mut buf = BytesMut::new();
392 rb.encode(&mut buf).unwrap();
393 let frozen = buf.freeze();
394 let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
395 assert!(matches!(p, RecordsPayloadBorrowed::V2(_)));
396 let owned = p.to_owned().unwrap();
397 match owned {
398 RecordsPayload::V2(batches) => assert!(batches[0].base_offset == 42),
399 RecordsPayload::Raw(_) | RecordsPayload::Legacy(_) => panic!("expected V2"),
400 }
401 }
402
403 #[test]
404 fn from_record_batch() {
405 let rb = sample_v2();
406 let p: RecordsPayload = rb.clone().into();
407 assert!(p.as_v2() == Some(&[rb][..]));
408 assert!(p.as_legacy().is_none());
409 }
410
411 fn legacy_bytes() -> Bytes {
412 let mut buf = vec![0u8; 24];
413 buf[16] = 1;
414 for (i, b) in (b'a'..=b'h').enumerate() {
415 buf[17 + i % 7] = b;
416 }
417 Bytes::from(buf)
418 }
419
420 #[test]
421 fn legacy_payload_len_and_encode_owned() {
422 let bytes = legacy_bytes();
423 let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
424 assert!(p.payload_len() == bytes.len());
425 assert!(p.as_v2().is_none());
426 assert!(p.as_legacy() == Some(&bytes));
427
428 let mut out = BytesMut::new();
429 p.encode_to(&mut out).unwrap();
430 assert!(&out[..] == &bytes[..]);
431 }
432
433 #[test]
434 fn legacy_roundtrip_via_traits() {
435 let bytes = legacy_bytes();
436 let p = RecordsPayload::from_bytes(bytes.clone()).unwrap();
437 let mut buf = BytesMut::new();
438 <RecordsPayload as crate::Encode>::encode(&p, &mut buf, 0).unwrap();
439 assert!(<RecordsPayload as crate::Encode>::encoded_len(&p, 0) == bytes.len());
440 let mut cur: &[u8] = &buf;
441 let back = <RecordsPayload as crate::Decode>::decode(&mut cur, 0).unwrap();
442 assert!(matches!(back, RecordsPayload::Legacy(_)));
443 assert!(back.as_legacy().unwrap() == &bytes);
444 }
445
446 #[test]
447 fn owned_default_is_empty_v2() {
448 let p = RecordsPayload::default();
449 assert!(matches!(p, RecordsPayload::V2(ref v) if v.is_empty()));
450 }
451
452 #[test]
453 fn looks_like_v2_rejects_short_buffer() {
454 let short = Bytes::from_static(&[0u8; 10]);
456 let p = RecordsPayload::from_bytes(short.clone()).unwrap();
457 assert!(p.as_legacy() == Some(&short));
458 }
459
460 #[test]
461 fn borrowed_legacy_roundtrip() {
462 let bytes = legacy_bytes();
463 let p = RecordsPayloadBorrowed::from_slice(&bytes).unwrap();
464 assert!(matches!(p, RecordsPayloadBorrowed::Legacy(_)));
465 assert!(p.payload_len() == bytes.len());
466
467 let mut out = BytesMut::new();
468 p.encode_to(&mut out).unwrap();
469 assert!(&out[..] == &bytes[..]);
470
471 let owned = p.to_owned().unwrap();
472 match owned {
473 RecordsPayload::Legacy(b) => assert!(&b[..] == &bytes[..]),
474 RecordsPayload::Raw(_) | RecordsPayload::V2(_) => panic!("expected Legacy"),
475 }
476 }
477
478 #[test]
479 fn borrowed_v2_payload_len_and_encode() {
480 let rb = sample_v2();
481 let mut buf = BytesMut::new();
482 rb.encode(&mut buf).unwrap();
483 let frozen = buf.freeze();
484 let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
485 assert!(p.payload_len() == frozen.len());
486
487 let mut out = BytesMut::new();
488 p.encode_to(&mut out).unwrap();
489 assert!(&out[..] == &frozen[..]);
490 }
491
492 #[test]
493 fn borrowed_encode_decode_via_traits() {
494 let rb = sample_v2();
495 let mut buf = BytesMut::new();
496 rb.encode(&mut buf).unwrap();
497 let frozen = buf.freeze();
498
499 let p = RecordsPayloadBorrowed::from_slice(&frozen).unwrap();
500 let mut out = BytesMut::new();
501 <RecordsPayloadBorrowed as crate::Encode>::encode(&p, &mut out, 0).unwrap();
502 assert!(<RecordsPayloadBorrowed as crate::Encode>::encoded_len(&p, 0) == frozen.len());
503
504 let mut cur: &[u8] = &out;
505 let back =
506 <RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(&mut cur, 0).unwrap();
507 assert!(matches!(back, RecordsPayloadBorrowed::V2(_)));
508 }
509
510 #[test]
511 fn borrowed_default_is_empty_v2() {
512 let p = RecordsPayloadBorrowed::default();
513 assert!(matches!(p, RecordsPayloadBorrowed::V2(ref v) if v.is_empty()));
514 }
515
516 #[test]
517 fn from_fetch_bytes_drops_incomplete_trailing_batch() {
518 let mut b0 = sample_v2();
523 b0.base_offset = 0;
524 let mut b1 = sample_v2();
525 b1.base_offset = 1;
526 let mut buf = BytesMut::new();
527 b0.encode(&mut buf).unwrap();
528 b1.encode(&mut buf).unwrap();
529 buf.extend_from_slice(&[0u8; 7]); let p = RecordsPayload::from_fetch_bytes(buf.freeze()).unwrap();
531 let batches = p.as_v2().expect("v2");
532 assert!(batches.len() == 2);
533 assert!(batches[0].base_offset == 0);
534 assert!(batches[1].base_offset == 1);
535 }
536
537 #[test]
538 fn from_fetch_bytes_still_errors_on_corrupt_batch() {
539 let rb = sample_v2();
542 let mut buf = BytesMut::new();
543 rb.encode(&mut buf).unwrap();
544 let mut bytes = buf.to_vec();
545 bytes[61] ^= 0xFF;
547 let err = RecordsPayload::from_fetch_bytes(Bytes::from(bytes)).unwrap_err();
548 assert!(matches!(err, RecordsError::CrcMismatch { .. }));
549 }
550
551 #[test]
552 fn from_fetch_bytes_legacy_passes_through() {
553 let bytes = legacy_bytes();
554 let p = RecordsPayload::from_fetch_bytes(bytes.clone()).unwrap();
555 assert!(p.as_legacy() == Some(&bytes));
556 }
557
558 #[test]
559 fn from_fetch_bytes_empty_is_empty_v2() {
560 let p = RecordsPayload::from_fetch_bytes(Bytes::new()).unwrap();
563 assert!(matches!(p, RecordsPayload::Legacy(ref b) if b.is_empty()));
564 }
565}