1use anyhow::{anyhow, bail, Result};
42use bytes::{Bytes, BytesMut};
43use crc::{Crc, CRC_32_ISO_HDLC};
44use crc32c::crc32c;
45use indexmap::IndexMap;
46
47use crate::protocol::{
48 buf::{gap, ByteBuf, ByteBufMut},
49 types, Decoder, Encoder, StrBytes,
50};
51
52use super::compression::{self as cmpr, Compressor, Decompressor};
53use std::cmp::Ordering;
54use std::convert::TryFrom;
/// CRC-32 calculator configured with the `CRC_32_ISO_HDLC` parameters.
///
/// In this file it checksums legacy (version 0 and 1) messages; version 2 batches use
/// `crc32c` instead.
pub const IEEE: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);
57
/// Compression algorithm applied to the records of a batch.
///
/// The discriminant of each variant is the numeric code written into (and read back from)
/// the low three bits of the batch/message attributes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Compression {
    /// Records are stored uncompressed.
    None = 0,
    /// Records are compressed with gzip.
    Gzip = 1,
    /// Records are compressed with Snappy.
    Snappy = 2,
    /// Records are compressed with LZ4.
    Lz4 = 3,
    /// Records are compressed with Zstandard.
    Zstd = 4,
}
72
/// Meaning of a record's timestamp, decoded from bit 3 of the attributes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimestampType {
    /// Attribute bit 3 is clear.
    Creation = 0,
    /// Attribute bit 3 is set.
    LogAppend = 1,
}
81
/// Options controlling how [`RecordBatchEncoder`] serializes records.
pub struct RecordEncodeOptions {
    /// Record format version: 0 and 1 select the legacy message encoding, 2 selects the
    /// record batch encoding. Any other value is rejected by the encoder.
    pub version: i8,

    /// Compression algorithm to apply to the encoded records.
    pub compression: Compression,
}
91
/// Producer id assigned to records decoded from legacy messages, which carry none.
pub const NO_PRODUCER_ID: i64 = -1;
/// Producer epoch assigned to records decoded from legacy messages, which carry none.
pub const NO_PRODUCER_EPOCH: i16 = -1;
/// Partition leader epoch assigned to records decoded from legacy messages, which carry none.
pub const NO_PARTITION_LEADER_EPOCH: i32 = -1;
/// Sequence number assigned to records decoded from legacy messages, which carry none.
pub const NO_SEQUENCE: i32 = -1;
/// Timestamp assigned to records decoded from version 0 messages, which carry none.
pub const NO_TIMESTAMP: i64 = -1;
102
/// Stateless namespace for the record encoding entry points (see its `impl` block).
#[derive(Debug, Clone)]
pub struct RecordBatchEncoder;
106
/// Stateless namespace for the record decoding entry points (see its `impl` block).
#[derive(Debug, Clone)]
pub struct RecordBatchDecoder;
110
/// Batch-level header values of a version 2 record batch, decoded once and then applied to
/// every record decoded from that batch.
struct BatchDecodeInfo {
    /// Number of records announced by the batch header.
    record_count: usize,
    /// Timestamp type taken from the batch attributes.
    timestamp_type: TimestampType,
    /// Base offset of the batch; record offsets are stored as deltas from it.
    min_offset: i64,
    /// Base timestamp of the batch; record timestamps are stored as deltas from it.
    min_timestamp: i64,
    /// Base sequence of the batch; record sequences are derived from it and the offset delta.
    base_sequence: i32,
    /// Bit 4 of the batch attributes.
    transactional: bool,
    /// Bit 5 of the batch attributes.
    control: bool,
    /// Partition leader epoch from the batch header.
    partition_leader_epoch: i32,
    /// Producer id from the batch header.
    producer_id: i64,
    /// Producer epoch from the batch header.
    producer_epoch: i16,
}
123
/// Describes how a decoded [`RecordSet`] was compressed on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecordCompression {
    /// A version 2 record batch, with the compression named in its batch attributes.
    RecordBatch(Compression),
    /// A legacy (version 0 or 1) message set; the decoder reports no per-set algorithm here.
    MessageSet,
}
132
/// Result of decoding one batch (or one legacy message) from a buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSet {
    /// How the records were compressed on the wire.
    pub compression: RecordCompression,
    /// Format version read from the magic byte.
    pub version: i8,
    /// The decoded records, in the order they were read.
    pub records: Vec<Record>,
}
143
/// A single record, combining per-record data with the batch-level properties it was
/// decoded with (or should be encoded with).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    // Batch-level properties: consecutive records sharing these values are encoded into the
    // same version 2 batch.
    /// Whether the record is part of a transaction (batch attribute bit 4).
    pub transactional: bool,
    /// Whether the record is a control record (batch attribute bit 5).
    pub control: bool,
    /// Partition leader epoch written to / read from the batch header.
    pub partition_leader_epoch: i32,
    /// Producer id written to / read from the batch header.
    pub producer_id: i64,
    /// Producer epoch written to / read from the batch header.
    pub producer_epoch: i16,

    // Per-record properties.
    /// Timestamp type decoded from the attributes. The encoders in this file do not write it.
    pub timestamp_type: TimestampType,
    /// Offset of the record.
    pub offset: i64,
    /// Sequence number of the record.
    pub sequence: i32,
    /// Timestamp of the record.
    pub timestamp: i64,
    /// Optional record key (`None` is encoded as a null key).
    pub key: Option<Bytes>,
    /// Optional record value (`None` is encoded as a null value).
    pub value: Option<Bytes>,
    /// Record headers in insertion order; rejected when encoding legacy versions.
    pub headers: IndexMap<StrBytes, Option<Bytes>>,
}
175
/// Byte offset of the magic (version) byte from the start of a batch or legacy message.
///
/// The decoder peeks at this position to choose the format before consuming any bytes.
/// Both layouts read here put 16 bytes ahead of it: an `i64`, an `i32`, and a further 4-byte
/// field (leader epoch for version 2, CRC for legacy).
const MAGIC_BYTE_OFFSET: usize = 16;
177
178impl RecordBatchEncoder {
179 pub fn encode<'a, B, I>(buf: &mut B, records: I, options: &RecordEncodeOptions) -> Result<()>
182 where
183 B: ByteBufMut,
184 I: IntoIterator<Item = &'a Record>,
185 I::IntoIter: Clone,
186 {
187 Self::encode_with_custom_compression(
188 buf,
189 records,
190 options,
191 None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
192 )
193 }
194
195 pub fn encode_with_custom_compression<'a, B, I, CF>(
202 buf: &mut B,
203 records: I,
204 options: &RecordEncodeOptions,
205 compressor: Option<CF>,
206 ) -> Result<()>
207 where
208 B: ByteBufMut,
209 I: IntoIterator<Item = &'a Record>,
210 I::IntoIter: Clone,
211 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
212 {
213 let records = records.into_iter();
214 match options.version {
215 0..=1 => Self::encode_legacy(buf, records, options, compressor),
216 2 => Self::encode_new(buf, records, options, compressor),
217 _ => bail!("Unknown record batch version"),
218 }
219 }
220 fn encode_legacy_records<'a, B, I>(
221 buf: &mut B,
222 records: I,
223 options: &RecordEncodeOptions,
224 ) -> Result<()>
225 where
226 B: ByteBufMut,
227 I: Iterator<Item = &'a Record> + Clone,
228 {
229 for record in records {
230 record.encode_legacy(buf, options)?;
231 }
232 Ok(())
233 }
234 fn encode_legacy<'a, B, I, CF>(
235 buf: &mut B,
236 records: I,
237 options: &RecordEncodeOptions,
238 compressor: Option<CF>,
239 ) -> Result<()>
240 where
241 B: ByteBufMut,
242 I: Iterator<Item = &'a Record> + Clone,
243 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
244 {
245 if options.compression == Compression::None {
246 Self::encode_legacy_records(buf, records, options)?;
248 } else {
249 let inner_opts = RecordEncodeOptions {
251 compression: Compression::None,
252 version: options.version,
253 };
254
255 Record::encode_legacy_static(buf, options, |buf| {
256 if options.version > 0 {
258 let min_timestamp = records
259 .clone()
260 .map(|r| r.timestamp)
261 .min()
262 .unwrap_or_default();
263 types::Int64.encode(buf, min_timestamp)?;
264 };
265
266 buf.put_i32(-1);
268
269 let size_gap = buf.put_typed_gap(gap::I32);
271 let value_start = buf.offset();
272 if let Some(compressor) = compressor {
273 let mut encoded_buf = BytesMut::new();
274 Self::encode_legacy_records(&mut encoded_buf, records, &inner_opts)?;
275 compressor(&mut encoded_buf, buf, options.compression)?;
276 } else {
277 match options.compression {
278 #[cfg(feature = "snappy")]
279 Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
280 Self::encode_legacy_records(buf, records, &inner_opts)
281 })?,
282 #[cfg(feature = "gzip")]
283 Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
284 Self::encode_legacy_records(buf, records, &inner_opts)
285 })?,
286 #[cfg(feature = "lz4")]
287 Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
288 Self::encode_legacy_records(buf, records, &inner_opts)
289 })?,
290 #[cfg(feature = "zstd")]
291 Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
292 Self::encode_legacy_records(buf, records, &inner_opts)
293 })?,
294 c => {
295 return Err(anyhow!(
296 "Support for {c:?} is not enabled as a cargo feature"
297 ))
298 }
299 }
300 }
301
302 let value_end = buf.offset();
303 let value_size = value_end - value_start;
304 if value_size > i32::MAX as usize {
305 bail!(
306 "Record batch was too large to encode ({} bytes)",
307 value_size
308 );
309 }
310 buf.fill_typed_gap(size_gap, value_size as i32);
311
312 Ok(())
313 })?;
314 }
315 Ok(())
316 }
317
318 fn encode_new_records<'a, B, I>(
319 buf: &mut B,
320 records: I,
321 min_offset: i64,
322 min_timestamp: i64,
323 options: &RecordEncodeOptions,
324 ) -> Result<()>
325 where
326 B: ByteBufMut,
327 I: Iterator<Item = &'a Record>,
328 {
329 for record in records {
330 record.encode_new(buf, min_offset, min_timestamp, options)?;
331 }
332 Ok(())
333 }
334
335 fn encode_new_batch<'a, B, I, CF>(
336 buf: &mut B,
337 records: &mut I,
338 options: &RecordEncodeOptions,
339 compressor: Option<&CF>,
340 ) -> Result<bool>
341 where
342 B: ByteBufMut,
343 I: Iterator<Item = &'a Record> + Clone,
344 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
345 {
346 let mut record_peeker = records.clone();
347
348 let first_record = match record_peeker.next() {
350 Some(record) => record,
351 None => return Ok(false),
352 };
353
354 let num_records = record_peeker
356 .take_while(|record| {
357 record.transactional == first_record.transactional
358 && record.control == first_record.control
359 && record.partition_leader_epoch == first_record.partition_leader_epoch
360 && record.producer_id == first_record.producer_id
361 && record.producer_epoch == first_record.producer_epoch
362 && (record.offset as i32).wrapping_sub(record.sequence)
363 == (first_record.offset as i32).wrapping_sub(first_record.sequence)
364 })
365 .count()
366 + 1;
367
368 let min_offset = records
370 .clone()
371 .take(num_records)
372 .map(|r| r.offset)
373 .min()
374 .expect("Batch contains at least one element");
375 let max_offset = records
376 .clone()
377 .take(num_records)
378 .map(|r| r.offset)
379 .max()
380 .expect("Batch contains at least one element");
381 let min_timestamp = records
382 .clone()
383 .take(num_records)
384 .map(|r| r.timestamp)
385 .min()
386 .expect("Batch contains at least one element");
387 let max_timestamp = records
388 .clone()
389 .take(num_records)
390 .map(|r| r.timestamp)
391 .max()
392 .expect("Batch contains at least one element");
393 let base_sequence = first_record
394 .sequence
395 .wrapping_sub((first_record.offset - min_offset) as i32);
396
397 types::Int64.encode(buf, min_offset)?;
399
400 let size_gap = buf.put_typed_gap(gap::I32);
402 let batch_start = buf.offset();
403
404 types::Int32.encode(buf, first_record.partition_leader_epoch)?;
406
407 types::Int8.encode(buf, options.version)?;
409
410 let crc_gap = buf.put_typed_gap(gap::U32);
412 let content_start = buf.offset();
413
414 let mut attributes = options.compression as i16;
416 if first_record.transactional {
417 attributes |= 1 << 4;
418 }
419 if first_record.control {
420 attributes |= 1 << 5;
421 }
422 types::Int16.encode(buf, attributes)?;
423
424 types::Int32.encode(buf, (max_offset - min_offset) as i32)?;
426
427 types::Int64.encode(buf, min_timestamp)?;
429
430 types::Int64.encode(buf, max_timestamp)?;
432
433 types::Int64.encode(buf, first_record.producer_id)?;
435
436 types::Int16.encode(buf, first_record.producer_epoch)?;
438
439 types::Int32.encode(buf, base_sequence)?;
441
442 if num_records > i32::MAX as usize {
444 bail!(
445 "Too many records to encode in one batch ({} records)",
446 num_records
447 );
448 }
449 types::Int32.encode(buf, num_records as i32)?;
450
451 let records = records.take(num_records);
453
454 if let Some(compressor) = compressor {
455 let mut record_buf = BytesMut::new();
456 Self::encode_new_records(&mut record_buf, records, min_offset, min_timestamp, options)?;
457 compressor(&mut record_buf, buf, options.compression)?;
458 } else {
459 match options.compression {
460 Compression::None => cmpr::None::compress(buf, |buf| {
461 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
462 })?,
463 #[cfg(feature = "snappy")]
464 Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
465 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
466 })?,
467 #[cfg(feature = "gzip")]
468 Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
469 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
470 })?,
471 #[cfg(feature = "lz4")]
472 Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
473 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
474 })?,
475 #[cfg(feature = "zstd")]
476 Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
477 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
478 })?,
479 #[allow(unreachable_patterns)]
480 c => {
481 return Err(anyhow!(
482 "Support for {c:?} is not enabled as a cargo feature"
483 ))
484 }
485 }
486 }
487 let batch_end = buf.offset();
488
489 let batch_size = batch_end - batch_start;
491 if batch_size > i32::MAX as usize {
492 bail!(
493 "Record batch was too large to encode ({} bytes)",
494 batch_size
495 );
496 }
497
498 buf.fill_typed_gap(size_gap, batch_size as i32);
499
500 let crc = crc32c(buf.range(content_start..batch_end));
502 buf.fill_typed_gap(crc_gap, crc);
503
504 Ok(true)
505 }
506
507 fn encode_new<'a, B, I, CF>(
508 buf: &mut B,
509 mut records: I,
510 options: &RecordEncodeOptions,
511 compressor: Option<CF>,
512 ) -> Result<()>
513 where
514 B: ByteBufMut,
515 I: Iterator<Item = &'a Record> + Clone,
516 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
517 {
518 while Self::encode_new_batch(buf, &mut records, options, compressor.as_ref())? {}
519 Ok(())
520 }
521}
522
523impl RecordBatchDecoder {
524 pub fn decode_with_custom_compression<B: ByteBuf, F>(
530 buf: &mut B,
531 decompressor: Option<F>,
532 ) -> Result<RecordSet>
533 where
534 F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
535 {
536 let mut records = Vec::new();
537 let (version, compression) =
538 Self::decode_into_vec(buf, &mut records, decompressor.as_ref())?;
539 Ok(RecordSet {
540 version,
541 compression,
542 records,
543 })
544 }
545
546 pub fn decode_all<B: ByteBuf>(buf: &mut B) -> Result<Vec<RecordSet>> {
548 let mut batches = Vec::new();
549 while buf.has_remaining() {
550 batches.push(Self::decode(buf)?);
551 }
552 Ok(batches)
553 }
554
555 pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<RecordSet> {
557 Self::decode_with_custom_compression(
558 buf,
559 None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>.as_ref(),
560 )
561 }
562
563 fn decode_into_vec<B: ByteBuf, F>(
564 buf: &mut B,
565 records: &mut Vec<Record>,
566 decompress_func: Option<&F>,
567 ) -> Result<(i8, RecordCompression)>
568 where
569 F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
570 {
571 let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?[0] as i8;
572 let compression = match version {
573 0..=1 => {
574 Record::decode_legacy(buf, version, records).map(|()| RecordCompression::MessageSet)
575 }
576 2 => Self::decode_new_batch(buf, version, records, decompress_func)
577 .map(RecordCompression::RecordBatch),
578 _ => {
579 bail!("Unknown record batch version ({})", version);
580 }
581 }?;
582 Ok((version, compression))
583 }
584 fn decode_new_records<B: ByteBuf>(
585 buf: &mut B,
586 batch_decode_info: &BatchDecodeInfo,
587 version: i8,
588 records: &mut Vec<Record>,
589 ) -> Result<()> {
590 records.reserve(batch_decode_info.record_count);
591 for _ in 0..batch_decode_info.record_count {
592 records.push(Record::decode_new(buf, batch_decode_info, version)?);
593 }
594 Ok(())
595 }
596 fn decode_new_batch<B: ByteBuf, F>(
597 buf: &mut B,
598 version: i8,
599 records: &mut Vec<Record>,
600 decompress_func: Option<&F>,
601 ) -> Result<Compression>
602 where
603 F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
604 {
605 let min_offset = types::Int64.decode(buf)?;
607
608 let batch_length: i32 = types::Int32.decode(buf)?;
610 if batch_length < 0 {
611 bail!("Unexpected negative batch size: {}", batch_length);
612 }
613
614 let buf = &mut buf.try_get_bytes(batch_length as usize)?;
616
617 let partition_leader_epoch = types::Int32.decode(buf)?;
619
620 let magic: i8 = types::Int8.decode(buf)?;
622 if magic != version {
623 bail!("Version mismatch ({} != {})", magic, version);
624 }
625
626 let supplied_crc: u32 = types::UInt32.decode(buf)?;
628 let actual_crc = crc32c(buf);
629
630 if supplied_crc != actual_crc {
631 bail!(
632 "Cyclic redundancy check failed ({} != {})",
633 supplied_crc,
634 actual_crc
635 );
636 }
637
638 let attributes: i16 = types::Int16.decode(buf)?;
640 let transactional = (attributes & (1 << 4)) != 0;
641 let control = (attributes & (1 << 5)) != 0;
642 let compression = match attributes & 0x7 {
643 0 => Compression::None,
644 1 => Compression::Gzip,
645 2 => Compression::Snappy,
646 3 => Compression::Lz4,
647 4 => Compression::Zstd,
648 other => {
649 bail!("Unknown compression algorithm used: {}", other);
650 }
651 };
652 let timestamp_type = if (attributes & (1 << 3)) != 0 {
653 TimestampType::LogAppend
654 } else {
655 TimestampType::Creation
656 };
657
658 let _max_offset_delta: i32 = types::Int32.decode(buf)?;
660
661 let min_timestamp = types::Int64.decode(buf)?;
663
664 let _max_timestamp: i64 = types::Int64.decode(buf)?;
666
667 let producer_id = types::Int64.decode(buf)?;
669
670 let producer_epoch = types::Int16.decode(buf)?;
672
673 let base_sequence = types::Int32.decode(buf)?;
675
676 let record_count: i32 = types::Int32.decode(buf)?;
678 if record_count < 0 {
679 bail!("Unexpected negative record count ({})", record_count);
680 }
681 let record_count = record_count as usize;
682
683 let batch_decode_info = BatchDecodeInfo {
684 record_count,
685 timestamp_type,
686 min_offset,
687 min_timestamp,
688 base_sequence,
689 transactional,
690 control,
691 partition_leader_epoch,
692 producer_id,
693 producer_epoch,
694 };
695
696 if let Some(decompress_func) = decompress_func {
697 let mut decompressed_buf = decompress_func(buf, compression)?;
698
699 Self::decode_new_records(&mut decompressed_buf, &batch_decode_info, version, records)?;
700 } else {
701 match compression {
702 Compression::None => cmpr::None::decompress(buf, |buf| {
703 Self::decode_new_records(buf, &batch_decode_info, version, records)
704 })?,
705 #[cfg(feature = "snappy")]
706 Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| {
707 Self::decode_new_records(buf, &batch_decode_info, version, records)
708 })?,
709 #[cfg(feature = "gzip")]
710 Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
711 Self::decode_new_records(buf, &batch_decode_info, version, records)
712 })?,
713 #[cfg(feature = "zstd")]
714 Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
715 Self::decode_new_records(buf, &batch_decode_info, version, records)
716 })?,
717 #[cfg(feature = "lz4")]
718 Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
719 Self::decode_new_records(buf, &batch_decode_info, version, records)
720 })?,
721 #[allow(unreachable_patterns)]
722 c => {
723 return Err(anyhow!(
724 "Support for {c:?} is not enabled as a cargo feature"
725 ))
726 }
727 };
728 }
729
730 Ok(compression)
731 }
732}
733
734impl Record {
735 fn encode_legacy_static<B, F>(
736 buf: &mut B,
737 options: &RecordEncodeOptions,
738 content_writer: F,
739 ) -> Result<()>
740 where
741 B: ByteBufMut,
742 F: FnOnce(&mut B) -> Result<()>,
743 {
744 types::Int64.encode(buf, 0)?;
745 let size_gap = buf.put_typed_gap(gap::I32);
746 let message_start = buf.offset();
747 let crc_gap = buf.put_typed_gap(gap::U32);
748 let content_start = buf.offset();
749
750 types::Int8.encode(buf, options.version)?;
751
752 let compression = options.compression as i8;
753 if compression > 2 + options.version {
754 bail!(
755 "Compression algorithm '{:?}' is unsupported for record version '{}'",
756 options.compression,
757 options.version
758 );
759 }
760 types::Int8.encode(buf, compression)?;
761
762 content_writer(buf)?;
764
765 let message_end = buf.offset();
766
767 let message_size = message_end - message_start;
768 if message_start > i32::MAX as usize {
769 bail!("Record was too large to encode ({} bytes)", message_size);
770 }
771 buf.fill_typed_gap(size_gap, message_size as i32);
772
773 let crc = IEEE.checksum(buf.range(content_start..message_end));
774 buf.fill_typed_gap(crc_gap, crc);
775
776 Ok(())
777 }
778 fn encode_legacy<B: ByteBufMut>(
779 &self,
780 buf: &mut B,
781 options: &RecordEncodeOptions,
782 ) -> Result<()> {
783 if self.transactional || self.control {
784 bail!("Transactional and control records are not supported in this version of the protocol!");
785 }
786
787 if !self.headers.is_empty() {
788 bail!("Record headers are not supported in this version of the protocol!");
789 }
790
791 Self::encode_legacy_static(buf, options, |buf| {
792 if options.version > 0 {
793 types::Int64.encode(buf, self.timestamp)?;
794 }
795 types::Bytes.encode(buf, &self.key)?;
796 types::Bytes.encode(buf, &self.value)?;
797
798 Ok(())
799 })
800 }
801 fn encode_new<B: ByteBufMut>(
802 &self,
803 buf: &mut B,
804 min_offset: i64,
805 min_timestamp: i64,
806 options: &RecordEncodeOptions,
807 ) -> Result<()> {
808 let size = self.compute_size_new(min_offset, min_timestamp, options)?;
810 if size > i32::MAX as usize {
811 bail!("Record was too large to encode ({} bytes)", size);
812 }
813 types::VarInt.encode(buf, size as i32)?;
814
815 types::Int8.encode(buf, 0)?;
817
818 let timestamp_delta = self.timestamp - min_timestamp;
820 if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
821 bail!(
822 "Timestamps within batch are too far apart ({}, {})",
823 min_timestamp,
824 self.timestamp
825 );
826 }
827 types::VarInt.encode(buf, timestamp_delta as i32)?;
828
829 let offset_delta = self.offset - min_offset;
831 if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
832 bail!(
833 "Timestamps within batch are too far apart ({}, {})",
834 min_offset,
835 self.offset
836 );
837 }
838 types::VarInt.encode(buf, offset_delta as i32)?;
839
840 if let Some(k) = self.key.as_ref() {
842 if k.len() > i32::MAX as usize {
843 bail!("Record key was too large to encode ({} bytes)", k.len());
844 }
845 types::VarInt.encode(buf, k.len() as i32)?;
846 buf.put_slice(k);
847 } else {
848 types::VarInt.encode(buf, -1)?;
849 }
850
851 if let Some(v) = self.value.as_ref() {
853 if v.len() > i32::MAX as usize {
854 bail!("Record value was too large to encode ({} bytes)", v.len());
855 }
856 types::VarInt.encode(buf, v.len() as i32)?;
857 buf.put_slice(v);
858 } else {
859 types::VarInt.encode(buf, -1)?;
860 }
861
862 if self.headers.len() > i32::MAX as usize {
864 bail!("Too many record headers encode ({})", self.headers.len());
865 }
866 types::VarInt.encode(buf, self.headers.len() as i32)?;
867 for (k, v) in &self.headers {
868 if k.len() > i32::MAX as usize {
870 bail!(
871 "Record header key was too large to encode ({} bytes)",
872 k.len()
873 );
874 }
875 types::VarInt.encode(buf, k.len() as i32)?;
876
877 buf.put_slice(k.as_ref());
879
880 if let Some(v) = v.as_ref() {
882 if v.len() > i32::MAX as usize {
883 bail!(
884 "Record header value was too large to encode ({} bytes)",
885 v.len()
886 );
887 }
888 types::VarInt.encode(buf, v.len() as i32)?;
889 buf.put_slice(v);
890 } else {
891 types::VarInt.encode(buf, -1)?;
892 }
893 }
894
895 Ok(())
896 }
897 fn compute_size_new(
898 &self,
899 min_offset: i64,
900 min_timestamp: i64,
901 _options: &RecordEncodeOptions,
902 ) -> Result<usize> {
903 let mut total_size = 0;
904
905 total_size += types::Int8.compute_size(0)?;
907
908 let timestamp_delta = self.timestamp - min_timestamp;
910 if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
911 bail!(
912 "Timestamps within batch are too far apart ({}, {})",
913 min_timestamp,
914 self.timestamp
915 );
916 }
917 total_size += types::VarInt.compute_size(timestamp_delta as i32)?;
918
919 let offset_delta = self.offset - min_offset;
921 if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
922 bail!(
923 "Timestamps within batch are too far apart ({}, {})",
924 min_offset,
925 self.offset
926 );
927 }
928 total_size += types::VarInt.compute_size(offset_delta as i32)?;
929
930 if let Some(k) = self.key.as_ref() {
932 if k.len() > i32::MAX as usize {
933 bail!("Record key was too large to encode ({} bytes)", k.len());
934 }
935 total_size += types::VarInt.compute_size(k.len() as i32)?;
936 total_size += k.len();
937 } else {
938 total_size += types::VarInt.compute_size(-1)?;
939 }
940
941 if let Some(v) = self.value.as_ref() {
943 if v.len() > i32::MAX as usize {
944 bail!("Record value was too large to encode ({} bytes)", v.len());
945 }
946 total_size += types::VarInt.compute_size(v.len() as i32)?;
947 total_size += v.len();
948 } else {
949 total_size += types::VarInt.compute_size(-1)?;
950 }
951
952 if self.headers.len() > i32::MAX as usize {
954 bail!("Too many record headers encode ({})", self.headers.len());
955 }
956 total_size += types::VarInt.compute_size(self.headers.len() as i32)?;
957 for (k, v) in &self.headers {
958 if k.len() > i32::MAX as usize {
960 bail!(
961 "Record header key was too large to encode ({} bytes)",
962 k.len()
963 );
964 }
965 total_size += types::VarInt.compute_size(k.len() as i32)?;
966
967 total_size += k.len();
969
970 if let Some(v) = v.as_ref() {
972 if v.len() > i32::MAX as usize {
973 bail!(
974 "Record header value was too large to encode ({} bytes)",
975 v.len()
976 );
977 }
978 total_size += types::VarInt.compute_size(v.len() as i32)?;
979 total_size += v.len();
980 } else {
981 total_size += types::VarInt.compute_size(-1)?;
982 }
983 }
984
985 Ok(total_size)
986 }
987 fn decode_legacy<B: ByteBuf>(
988 buf: &mut B,
989 version: i8,
990 records: &mut Vec<Record>,
991 ) -> Result<()> {
992 let offset = types::Int64.decode(buf)?;
993 let size: i32 = types::Int32.decode(buf)?;
994 if size < 0 {
995 bail!("Unexpected negative record size: {}", size);
996 }
997
998 let buf = &mut buf.try_get_bytes(size as usize)?;
1000
1001 let supplied_crc: u32 = types::UInt32.decode(buf)?;
1003 let actual_crc = IEEE.checksum(buf);
1004
1005 if supplied_crc != actual_crc {
1006 bail!(
1007 "Cyclic redundancy check failed ({} != {})",
1008 supplied_crc,
1009 actual_crc
1010 );
1011 }
1012
1013 let magic: i8 = types::Int8.decode(buf)?;
1015 if magic != version {
1016 bail!("Version mismatch ({} != {})", magic, version);
1017 }
1018
1019 let attributes: i8 = types::Int8.decode(buf)?;
1021 let compression = match attributes & 0x7 {
1022 0 => Compression::None,
1023 1 => Compression::Gzip,
1024 2 => Compression::Snappy,
1025 3 if version > 0 => Compression::Lz4,
1026 other => {
1027 bail!("Unknown compression algorithm used: {}", other);
1028 }
1029 };
1030 let timestamp_type = if (attributes & (1 << 3)) != 0 {
1031 TimestampType::LogAppend
1032 } else {
1033 TimestampType::Creation
1034 };
1035
1036 let timestamp = if version > 0 {
1038 types::Int64.decode(buf)?
1039 } else {
1040 NO_TIMESTAMP
1041 };
1042 let key = types::Bytes.decode(buf)?;
1043 let value = types::Bytes.decode(buf)?;
1044
1045 if compression == Compression::None {
1046 records.push(Record {
1048 transactional: false,
1049 control: false,
1050 partition_leader_epoch: NO_PARTITION_LEADER_EPOCH,
1051 producer_id: NO_PRODUCER_ID,
1052 producer_epoch: NO_PRODUCER_EPOCH,
1053 sequence: NO_SEQUENCE,
1054 timestamp_type,
1055 offset,
1056 timestamp,
1057 key,
1058 value,
1059 headers: Default::default(),
1060 });
1061 } else {
1062 let mut value = value
1064 .ok_or_else(|| anyhow!("Received compressed legacy record without a value"))?;
1065
1066 while !value.is_empty() {
1067 Record::decode_legacy(&mut value, version, records)?;
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073 fn decode_new<B: ByteBuf>(
1074 buf: &mut B,
1075 batch_decode_info: &BatchDecodeInfo,
1076 _version: i8,
1077 ) -> Result<Self> {
1078 let size: i32 = types::VarInt.decode(buf)?;
1080 if size < 0 {
1081 bail!("Unexpected negative record size: {}", size);
1082 }
1083
1084 let buf = &mut buf.try_get_bytes(size as usize)?;
1086
1087 let _attributes: i8 = types::Int8.decode(buf)?;
1089
1090 let timestamp_delta: i32 = types::VarInt.decode(buf)?;
1092 let timestamp = batch_decode_info.min_timestamp + timestamp_delta as i64;
1093
1094 let offset_delta: i32 = types::VarInt.decode(buf)?;
1096 let offset = batch_decode_info.min_offset + offset_delta as i64;
1097 let sequence = batch_decode_info.base_sequence.wrapping_add(offset_delta);
1098
1099 let key_len: i32 = types::VarInt.decode(buf)?;
1101 let key = match key_len.cmp(&-1) {
1102 Ordering::Less => {
1103 bail!("Unexpected negative record key length ({} bytes)", key_len);
1104 }
1105 Ordering::Equal => None,
1106 Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?),
1107 };
1108
1109 let value_len: i32 = types::VarInt.decode(buf)?;
1111 let value = match value_len.cmp(&-1) {
1112 Ordering::Less => {
1113 bail!(
1114 "Unexpected negative record value length ({} bytes)",
1115 value_len
1116 );
1117 }
1118 Ordering::Equal => None,
1119 Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
1120 };
1121
1122 let num_headers: i32 = types::VarInt.decode(buf)?;
1124 if num_headers < 0 {
1125 bail!("Unexpected negative record header count: {}", num_headers);
1126 }
1127 let num_headers = num_headers as usize;
1128
1129 let mut headers = IndexMap::with_capacity(num_headers);
1130 for _ in 0..num_headers {
1131 let key_len: i32 = types::VarInt.decode(buf)?;
1133 if key_len < 0 {
1134 bail!(
1135 "Unexpected negative record header key length ({} bytes)",
1136 key_len
1137 );
1138 }
1139
1140 let key = StrBytes::try_from(buf.try_get_bytes(key_len as usize)?)?;
1142
1143 let value_len: i32 = types::VarInt.decode(buf)?;
1145
1146 let value = match value_len.cmp(&-2) {
1148 Ordering::Less => {
1149 bail!(
1150 "Unexpected negative record header value length ({} bytes)",
1151 value_len
1152 );
1153 }
1154 Ordering::Equal => None,
1155 Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
1156 };
1157
1158 headers.insert(key, value);
1159 }
1160
1161 Ok(Self {
1162 transactional: batch_decode_info.transactional,
1163 control: batch_decode_info.control,
1164 timestamp_type: batch_decode_info.timestamp_type,
1165 partition_leader_epoch: batch_decode_info.partition_leader_epoch,
1166 producer_id: batch_decode_info.producer_id,
1167 producer_epoch: batch_decode_info.producer_epoch,
1168 sequence,
1169 offset,
1170 timestamp,
1171 key,
1172 value,
1173 headers,
1174 })
1175 }
1176}
1177
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::{Record, TimestampType};

    /// A header value can be looked up with a plain byte-slice key.
    #[test]
    fn lookup_header_via_u8_slice() {
        let record = Record {
            headers: [
                ("some-key".into(), Some("some-value".into())),
                ("other-header".into(), None),
            ]
            .into(),
            key: Default::default(),
            value: Default::default(),
            offset: Default::default(),
            timestamp: Default::default(),
            timestamp_type: TimestampType::Creation,
            sequence: 0,
            producer_id: 0,
            producer_epoch: 0,
            partition_leader_epoch: 0,
            transactional: false,
            control: false,
        };

        // Look the header up through a `&[u8]` rather than the map's own key type.
        let found = record
            .headers
            .get("some-key".as_bytes())
            .expect("key exists in headers")
            .as_ref()
            .expect("value is present");

        assert_eq!(Bytes::from("some-value"), found);
    }
}