1use bytes::{Buf, BufMut, Bytes, BytesMut};
4use zerocopy::FromBytes as _;
5
6use crate::primitives::varint::{
7 get_varint, get_varlong, put_varint, put_varlong, varint_len, varlong_len,
8};
9use crate::records::RecordsError;
10use crate::records::crc::{crc32c, crc32c_append};
11use crate::records::header::{Attributes, HEADER_LEN};
12
13#[derive(Debug, Clone, PartialEq, Eq, Default)]
14pub struct RecordHeader {
15 pub key: String,
16 pub value: Option<Bytes>,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Default)]
20pub struct Record {
21 pub attributes: i8,
22 pub timestamp_delta: i64,
23 pub offset_delta: i32,
24 pub key: Option<Bytes>,
25 pub value: Option<Bytes>,
26 pub headers: Vec<RecordHeader>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct RecordBatch {
31 pub base_offset: i64,
32 pub partition_leader_epoch: i32,
33 pub attributes: Attributes,
34 pub last_offset_delta: i32,
35 pub base_timestamp: i64,
36 pub max_timestamp: i64,
37 pub producer_id: i64,
38 pub producer_epoch: i16,
39 pub base_sequence: i32,
40 pub records: Vec<Record>,
41}
42
43impl Default for RecordBatch {
44 fn default() -> Self {
45 Self {
46 base_offset: 0,
47 partition_leader_epoch: 0,
48 attributes: Attributes::default(),
49 last_offset_delta: 0,
50 base_timestamp: 0,
51 max_timestamp: 0,
52 producer_id: -1, producer_epoch: -1,
54 base_sequence: -1,
55 records: Vec::new(),
56 }
57 }
58}
59
60impl Record {
61 pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
63 let body_len = self.body_len();
64 put_varlong(
65 buf,
66 i64::try_from(body_len)
67 .map_err(|_| RecordsError::RecordParse("record body length overflow".into()))?,
68 );
69 self.encode_body(buf)
70 }
71
72 pub fn encoded_len(&self) -> usize {
74 let body = self.body_len();
75 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
76 let body_i64 = body as i64;
77 varlong_len(body_i64) + body
78 }
79
80 fn body_len(&self) -> usize {
81 let mut n = 1; n += varlong_len(self.timestamp_delta);
83 n += varint_len(self.offset_delta);
84 n += match &self.key {
85 None => varint_len(-1),
86 Some(k) => varint_len(i32::try_from(k.len()).unwrap_or(i32::MAX)) + k.len(),
87 };
88 n += match &self.value {
89 None => varint_len(-1),
90 Some(v) => varint_len(i32::try_from(v.len()).unwrap_or(i32::MAX)) + v.len(),
91 };
92 n += varint_len(i32::try_from(self.headers.len()).unwrap_or(i32::MAX));
93 for h in &self.headers {
94 let key_bytes = h.key.as_bytes();
95 n += varint_len(i32::try_from(key_bytes.len()).unwrap_or(i32::MAX)) + key_bytes.len();
96 n += match &h.value {
97 None => varint_len(-1),
98 Some(v) => varint_len(i32::try_from(v.len()).unwrap_or(i32::MAX)) + v.len(),
99 };
100 }
101 n
102 }
103
104 fn encode_body<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
105 buf.put_i8(self.attributes);
106 put_varlong(buf, self.timestamp_delta);
107 put_varint(buf, self.offset_delta);
108 match &self.key {
109 None => put_varint(buf, -1),
110 Some(k) => {
111 put_varint(
112 buf,
113 i32::try_from(k.len()).map_err(|_| {
114 RecordsError::RecordParse("record key length overflow".into())
115 })?,
116 );
117 buf.put_slice(k);
118 }
119 }
120 match &self.value {
121 None => put_varint(buf, -1),
122 Some(v) => {
123 put_varint(
124 buf,
125 i32::try_from(v.len()).map_err(|_| {
126 RecordsError::RecordParse("record value length overflow".into())
127 })?,
128 );
129 buf.put_slice(v);
130 }
131 }
132 put_varint(
133 buf,
134 i32::try_from(self.headers.len())
135 .map_err(|_| RecordsError::RecordParse("record header count overflow".into()))?,
136 );
137 for h in &self.headers {
138 let key_bytes = h.key.as_bytes();
139 put_varint(
140 buf,
141 i32::try_from(key_bytes.len())
142 .map_err(|_| RecordsError::RecordParse("header key length overflow".into()))?,
143 );
144 buf.put_slice(key_bytes);
145 match &h.value {
146 None => put_varint(buf, -1),
147 Some(v) => {
148 put_varint(
149 buf,
150 i32::try_from(v.len()).map_err(|_| {
151 RecordsError::RecordParse("header value length overflow".into())
152 })?,
153 );
154 buf.put_slice(v);
155 }
156 }
157 }
158 Ok(())
159 }
160
161 pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
164 let body_len = get_varlong(buf)
165 .map_err(|e| RecordsError::RecordParse(format!("record length: {e}")))?;
166 let body_len = usize::try_from(body_len).map_err(|_| {
167 RecordsError::RecordParse(format!("record length negative or too large: {body_len}"))
168 })?;
169 if buf.remaining() < body_len {
170 return Err(RecordsError::BodyTooShort {
171 needed: body_len - buf.remaining(),
172 });
173 }
174 let mut body = buf.take(body_len);
177 let r = Self::decode_body(&mut body)?;
178 if body.has_remaining() {
180 return Err(RecordsError::RecordParse(format!(
181 "trailing bytes inside record (left={})",
182 body.remaining()
183 )));
184 }
185 Ok(r)
186 }
187
188 fn decode_body<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
189 if buf.remaining() == 0 {
190 return Err(RecordsError::RecordParse("record body empty".into()));
191 }
192 let attributes = buf.get_i8();
193 let timestamp_delta = get_varlong(buf)
194 .map_err(|e| RecordsError::RecordParse(format!("timestamp_delta: {e}")))?;
195 let offset_delta =
196 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("offset_delta: {e}")))?;
197
198 let key = decode_nullable_bytes(buf, "key")?;
199 let value = decode_nullable_bytes(buf, "value")?;
200
201 let header_count =
202 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("header_count: {e}")))?;
203 if header_count < 0 {
204 return Err(RecordsError::RecordParse(format!(
205 "negative header count {header_count}"
206 )));
207 }
208 #[allow(clippy::cast_sign_loss)] let header_count_usize = header_count as usize;
210 let mut headers = Vec::with_capacity(header_count_usize.min(buf.remaining()));
215 for i in 0..header_count {
216 headers.push(
217 decode_record_header(buf)
218 .map_err(|e| RecordsError::RecordParse(format!("header[{i}]: {e}")))?,
219 );
220 }
221
222 Ok(Self {
223 attributes,
224 timestamp_delta,
225 offset_delta,
226 key,
227 value,
228 headers,
229 })
230 }
231}
232
233fn decode_nullable_bytes<B: Buf>(buf: &mut B, label: &str) -> Result<Option<Bytes>, RecordsError> {
234 let len =
235 get_varint(buf).map_err(|e| RecordsError::RecordParse(format!("{label} length: {e}")))?;
236 if len < 0 {
237 Ok(None)
238 } else {
239 #[allow(clippy::cast_sign_loss)] let n = len as usize;
241 if buf.remaining() < n {
242 return Err(RecordsError::BodyTooShort {
243 needed: n - buf.remaining(),
244 });
245 }
246 let mut v = vec![0u8; n];
247 buf.copy_to_slice(&mut v);
248 Ok(Some(Bytes::from(v)))
249 }
250}
251
252fn decode_record_header<B: Buf>(buf: &mut B) -> Result<RecordHeader, String> {
253 let key_len = get_varint(buf).map_err(|e| format!("key length: {e}"))?;
254 if key_len < 0 {
255 return Err(format!("non-nullable key has negative length {key_len}"));
256 }
257 #[allow(clippy::cast_sign_loss)] let n = key_len as usize;
259 if buf.remaining() < n {
260 return Err(format!("key truncated (need {} more)", n - buf.remaining()));
261 }
262 let mut kv = vec![0u8; n];
263 buf.copy_to_slice(&mut kv);
264 let key = String::from_utf8(kv).map_err(|e| format!("key utf-8: {e}"))?;
265
266 let value_len = get_varint(buf).map_err(|e| format!("value length: {e}"))?;
267 let value = if value_len < 0 {
268 None
269 } else {
270 #[allow(clippy::cast_sign_loss)] let n = value_len as usize;
272 if buf.remaining() < n {
273 return Err(format!(
274 "value truncated (need {} more)",
275 n - buf.remaining()
276 ));
277 }
278 let mut vv = vec![0u8; n];
279 buf.copy_to_slice(&mut vv);
280 Some(Bytes::from(vv))
281 };
282
283 Ok(RecordHeader { key, value })
284}
285
#[cfg(test)]
mod record_tests {
    use super::*;
    use assert2::assert;
    use bytes::BytesMut;

    /// Record with every field at its default: no key, no value, no headers.
    fn fixture_minimal_record() -> Record {
        Record::default()
    }

    /// Record with a key, a value, one header with a value and one without.
    fn fixture_keyed_record() -> Record {
        let traced = RecordHeader {
            key: "trace-id".to_string(),
            value: Some(Bytes::from_static(b"abc")),
        };
        let null_valued = RecordHeader {
            key: "null-val".to_string(),
            value: None,
        };
        Record {
            attributes: 0,
            timestamp_delta: 17,
            offset_delta: 2,
            key: Some(Bytes::from_static(b"the-key")),
            value: Some(Bytes::from_static(b"hello kafka")),
            headers: vec![traced, null_valued],
        }
    }

    /// Record whose key and value need multi-byte length prefixes.
    fn fixture_large_payload_record() -> Record {
        Record {
            attributes: 0,
            timestamp_delta: 1_000_000,
            offset_delta: 999,
            key: Some(Bytes::from(vec![b'k'; 128])),
            value: Some(Bytes::from(vec![b'v'; 4096])),
            headers: vec![],
        }
    }

    /// Encodes `original`, checks the predicted length, decodes the bytes back
    /// and requires an identical record with nothing left over.
    fn assert_roundtrip(original: Record) {
        let mut wire = BytesMut::new();
        original.encode(&mut wire).unwrap();
        assert!(wire.len() == original.encoded_len(), "predicted len mismatch");

        let mut rest: &[u8] = &wire[..];
        let decoded = Record::decode(&mut rest).unwrap();
        assert!(decoded == original);
        assert!(rest.is_empty(), "trailing bytes after decode");
    }

    #[test]
    fn minimal() {
        assert_roundtrip(fixture_minimal_record());
    }

    #[test]
    fn keyed_with_headers() {
        assert_roundtrip(fixture_keyed_record());
    }

    #[test]
    fn large_payload() {
        assert_roundtrip(fixture_large_payload_record());
    }

    #[test]
    fn decode_rejects_negative_header_count() {
        let mut wire = BytesMut::new();
        put_varlong(&mut wire, 6); // declared body length
        wire.put_i8(0); // attributes
        put_varlong(&mut wire, 0); // timestamp_delta
        put_varint(&mut wire, 0); // offset_delta
        put_varint(&mut wire, -1); // key: null
        put_varint(&mut wire, -1); // value: null
        put_varint(&mut wire, -1); // header count: negative, must be rejected

        let mut rest: &[u8] = &wire[..];
        let result = Record::decode(&mut rest);
        if let Err(RecordsError::RecordParse(msg)) = &result {
            assert!(msg.contains("negative header count"), "got: {msg}");
        } else {
            panic!("expected RecordParse, got {result:?}");
        }
    }

    #[test]
    fn decode_huge_header_count_does_not_overallocate() {
        // A body that announces a billion headers but contains none.
        let mut inner = BytesMut::new();
        inner.put_i8(0); // attributes
        put_varlong(&mut inner, 0); // timestamp_delta
        put_varint(&mut inner, 0); // offset_delta
        put_varint(&mut inner, -1); // key: null
        put_varint(&mut inner, -1); // value: null
        put_varint(&mut inner, 1_000_000_000); // header count

        let mut wire = BytesMut::new();
        put_varlong(&mut wire, i64::try_from(inner.len()).unwrap());
        wire.extend_from_slice(&inner);

        let mut rest: &[u8] = &wire[..];
        let outcome = Record::decode(&mut rest);
        assert!(outcome.is_err());
    }
}
400
401impl RecordBatch {
402 pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, RecordsError> {
405 const HEADER_TAIL_LEN: i32 = 49;
411
412 if buf.remaining() < HEADER_LEN {
414 return Err(RecordsError::HeaderTooShort {
415 needed: HEADER_LEN - buf.remaining(),
416 });
417 }
418 let mut hdr_bytes = [0u8; HEADER_LEN];
420 buf.copy_to_slice(&mut hdr_bytes);
421
422 let hdr = crate::records::header::RecordBatchHeader::ref_from_bytes(&hdr_bytes[..])
423 .map_err(|_| RecordsError::ZerocopyFailure)?;
424
425 if hdr.magic != 2 {
426 return Err(RecordsError::UnsupportedMagic { found: hdr.magic });
427 }
428
429 let body_len = i32::checked_sub(hdr.batch_length.get(), HEADER_TAIL_LEN)
431 .and_then(|n| usize::try_from(n).ok())
432 .ok_or_else(|| {
433 RecordsError::RecordParse("negative or oversized batch_length".into())
434 })?;
435
436 if buf.remaining() < body_len {
437 return Err(RecordsError::BodyTooShort {
438 needed: body_len - buf.remaining(),
439 });
440 }
441
442 let mut body = vec![0u8; body_len];
444 buf.copy_to_slice(&mut body);
445
446 let expected_crc = hdr.crc.get();
449 let mut computed = crc32c(&hdr_bytes[21..HEADER_LEN]);
450 computed = crc32c_append(computed, &body);
451 if computed != expected_crc {
452 return Err(RecordsError::CrcMismatch {
453 expected: expected_crc,
454 computed,
455 });
456 }
457
458 let attributes = Attributes(hdr.attributes.get());
459 let codec = attributes.compression();
460
461 let body_for_records: Bytes = if codec == crabka_compression::CompressionType::None {
463 Bytes::from(body)
464 } else {
465 const DECOMPRESS_MIN_CAP: usize = 16 * 1024 * 1024; const DECOMPRESS_MAX_RATIO: usize = 100; const DECOMPRESS_ABSOLUTE_CEILING: usize = 1024 * 1024 * 1024; let max_output = body
472 .len()
473 .saturating_mul(DECOMPRESS_MAX_RATIO)
474 .clamp(DECOMPRESS_MIN_CAP, DECOMPRESS_ABSOLUTE_CEILING);
475 crabka_compression::decompress(codec, &body, max_output)?
476 };
477
478 let count = hdr.records_count.get();
480 if count < 0 {
481 return Err(RecordsError::RecordParse(format!(
482 "negative records_count {count}"
483 )));
484 }
485 let mut body_cur: &[u8] = &body_for_records[..];
486 #[allow(clippy::cast_sign_loss)] let mut records = Vec::with_capacity((count as usize).min(body_for_records.len()));
491 for i in 0..count {
492 records.push(
493 Record::decode(&mut body_cur)
494 .map_err(|e| RecordsError::RecordParse(format!("record[{i}]: {e}")))?,
495 );
496 }
497 if !body_cur.is_empty() {
498 return Err(RecordsError::RecordParse(format!(
499 "trailing bytes after records (left={})",
500 body_cur.len()
501 )));
502 }
503
504 Ok(Self {
505 base_offset: hdr.base_offset.get(),
506 partition_leader_epoch: hdr.partition_leader_epoch.get(),
507 attributes,
508 last_offset_delta: hdr.last_offset_delta.get(),
509 base_timestamp: hdr.base_timestamp.get(),
510 max_timestamp: hdr.max_timestamp.get(),
511 producer_id: hdr.producer_id.get(),
512 producer_epoch: hdr.producer_epoch.get(),
513 base_sequence: hdr.base_sequence.get(),
514 records,
515 })
516 }
517
518 pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), RecordsError> {
520 const HEADER_TAIL_LEN: i32 = 49;
521
522 let mut raw_body =
524 BytesMut::with_capacity(self.records.iter().map(Record::encoded_len).sum());
525 for r in &self.records {
526 r.encode(&mut raw_body)?;
527 }
528 let raw_body = raw_body.freeze();
529
530 let codec = self.attributes.compression();
532 let body: Bytes = if codec == crabka_compression::CompressionType::None {
533 raw_body
534 } else {
535 crabka_compression::compress(codec, &raw_body)?
536 };
537
538 let batch_length = HEADER_TAIL_LEN
540 + i32::try_from(body.len())
541 .map_err(|_| RecordsError::RecordParse("body length exceeds i32".into()))?;
542
543 let mut covered = BytesMut::with_capacity(40);
545 covered.put_i16(self.attributes.0);
546 covered.put_i32(self.last_offset_delta);
547 covered.put_i64(self.base_timestamp);
548 covered.put_i64(self.max_timestamp);
549 covered.put_i64(self.producer_id);
550 covered.put_i16(self.producer_epoch);
551 covered.put_i32(self.base_sequence);
552 covered.put_i32(
553 i32::try_from(self.records.len())
554 .map_err(|_| RecordsError::RecordParse("records_count exceeds i32".into()))?,
555 );
556 let covered_head = covered.freeze();
557
558 let mut crc = crc32c(&covered_head);
560 crc = crc32c_append(crc, &body);
561
562 buf.put_i64(self.base_offset);
564 buf.put_i32(batch_length);
565 buf.put_i32(self.partition_leader_epoch);
566 buf.put_i8(2); buf.put_u32(crc);
568 buf.put_slice(&covered_head);
569 buf.put_slice(&body);
570 Ok(())
571 }
572
573 pub fn encoded_len(&self) -> usize {
576 let body: usize = self.records.iter().map(Record::encoded_len).sum();
577 HEADER_LEN + body
578 }
579}
580
#[cfg(test)]
mod batch_tests {
    use super::*;
    use assert2::assert;
    use crabka_compression::CompressionType;

    /// Batch with default header fields and no records.
    fn fixture_empty_batch() -> RecordBatch {
        RecordBatch::default()
    }

    /// Default batch holding exactly one keyed record.
    fn fixture_single_record_batch() -> RecordBatch {
        let only = Record {
            key: Some(Bytes::from_static(b"k1")),
            value: Some(Bytes::from_static(b"v1")),
            ..Default::default()
        };
        RecordBatch {
            records: vec![only],
            ..RecordBatch::default()
        }
    }

    /// Batch with non-default header fields and three records, the last of
    /// which has no key but carries a header.
    fn fixture_multi_record_batch() -> RecordBatch {
        let first = Record {
            offset_delta: 0,
            timestamp_delta: 0,
            key: Some(Bytes::from_static(b"a")),
            value: Some(Bytes::from_static(b"1")),
            ..Default::default()
        };
        let second = Record {
            offset_delta: 1,
            timestamp_delta: 100,
            key: Some(Bytes::from_static(b"b")),
            value: Some(Bytes::from_static(b"2")),
            ..Default::default()
        };
        let third = Record {
            offset_delta: 2,
            timestamp_delta: 500,
            key: None,
            value: Some(Bytes::from_static(b"3")),
            headers: vec![RecordHeader {
                key: "h".to_string(),
                value: Some(Bytes::from_static(b"hv")),
            }],
            ..Default::default()
        };
        RecordBatch {
            base_offset: 42,
            partition_leader_epoch: 5,
            last_offset_delta: 2,
            base_timestamp: 1_700_000_000,
            max_timestamp: 1_700_000_500,
            producer_id: 100,
            producer_epoch: 3,
            base_sequence: 7,
            records: vec![first, second, third],
            ..RecordBatch::default()
        }
    }

    /// Round-trips `batch` without compression and also checks that
    /// `encoded_len` predicts the written size.
    fn assert_uncompressed_roundtrip(mut batch: RecordBatch) {
        batch.attributes = batch.attributes.with_compression(CompressionType::None);

        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();
        assert!(wire.len() == batch.encoded_len());

        let mut rest: &[u8] = &wire[..];
        let decoded = RecordBatch::decode(&mut rest).unwrap();
        assert!(decoded == batch);
        assert!(rest.is_empty());
    }

    /// Round-trips the multi-record fixture through `codec`.
    fn assert_compressed_roundtrip(codec: CompressionType) {
        let mut batch = fixture_multi_record_batch();
        batch.attributes = batch.attributes.with_compression(codec);

        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();

        let mut rest: &[u8] = &wire[..];
        let decoded = RecordBatch::decode(&mut rest).unwrap();
        assert!(decoded == batch);
        assert!(rest.is_empty());
    }

    #[test]
    fn uncompressed_empty() {
        assert_uncompressed_roundtrip(fixture_empty_batch());
    }

    #[test]
    fn uncompressed_single() {
        assert_uncompressed_roundtrip(fixture_single_record_batch());
    }

    #[test]
    fn uncompressed_multi() {
        assert_uncompressed_roundtrip(fixture_multi_record_batch());
    }

    #[test]
    fn rejects_pre_v2_magic() {
        let mut wire = BytesMut::new();
        wire.put_i64(0); // base_offset
        wire.put_i32(49); // batch_length: header tail only, empty body
        wire.put_i32(0); // partition_leader_epoch
        wire.put_i8(1); // magic: 1, which the decoder must refuse
        wire.put_u32(0); // crc
        // Zero-fill the remaining header bytes (21..HEADER_LEN).
        wire.resize(HEADER_LEN, 0);

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        assert!(matches!(
            outcome,
            Err(RecordsError::UnsupportedMagic { found: 1 })
        ));
    }

    #[test]
    fn rejects_bad_crc() {
        let batch = fixture_single_record_batch();
        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();
        // Byte 17 is the first byte of the stored checksum.
        wire[17] ^= 0xFF;

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        assert!(matches!(outcome, Err(RecordsError::CrcMismatch { .. })));
    }

    #[test]
    fn compressed_gzip() {
        assert_compressed_roundtrip(CompressionType::Gzip);
    }

    #[test]
    fn compressed_snappy() {
        assert_compressed_roundtrip(CompressionType::Snappy);
    }

    #[test]
    fn compressed_lz4() {
        assert_compressed_roundtrip(CompressionType::Lz4);
    }

    #[test]
    fn compressed_zstd() {
        assert_compressed_roundtrip(CompressionType::Zstd);
    }

    #[test]
    fn decode_huge_records_count_does_not_overallocate() {
        let mut batch = fixture_empty_batch();
        batch.attributes = batch.attributes.with_compression(CompressionType::None);
        let mut wire = BytesMut::new();
        batch.encode(&mut wire).unwrap();

        // Overwrite the record count (last four header bytes) with a huge value.
        let count_start = HEADER_LEN - 4;
        wire[count_start..HEADER_LEN].copy_from_slice(&1_000_000_000i32.to_be_bytes());

        // Re-seal the checksum so decoding gets past the CRC check.
        let resealed = crc32c_append(crc32c(&wire[21..HEADER_LEN]), &wire[HEADER_LEN..]);
        wire[17..21].copy_from_slice(&resealed.to_be_bytes());

        let mut rest: &[u8] = &wire[..];
        let outcome = RecordBatch::decode(&mut rest);
        assert!(outcome.is_err());
    }
}
746
747impl crate::Encode for RecordBatch {
748 fn encode<B: BufMut>(&self, buf: &mut B, _version: i16) -> Result<(), crate::ProtocolError> {
749 RecordBatch::encode(self, buf).map_err(Into::into)
750 }
751
752 fn encoded_len(&self, _version: i16) -> usize {
753 RecordBatch::encoded_len(self)
754 }
755}
756
757impl crate::Decode<'_> for RecordBatch {
758 fn decode<B: Buf>(buf: &mut B, _version: i16) -> Result<Self, crate::ProtocolError> {
759 RecordBatch::decode(buf).map_err(Into::into)
760 }
761}