1use anyhow::{anyhow, bail, Result};
42use bytes::{Bytes, BytesMut};
43use crc::{Crc, CRC_32_ISO_HDLC};
44use crc32c::crc32c;
45use indexmap::IndexMap;
46
47use crate::protocol::{
48 buf::{gap, ByteBuf, ByteBufMut},
49 types, Decoder, Encoder, StrBytes,
50};
51
52use super::compression::{self as cmpr, Compressor, Decompressor};
53use std::cmp::Ordering;
54use std::convert::TryFrom;
/// CRC-32 calculator built from the `CRC_32_ISO_HDLC` parameter set of the `crc` crate.
///
/// Nothing in the visible part of this file references it; version 2 record batches are
/// checksummed with `crc32c` instead.
pub const IEEE: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);
57
/// Compression algorithm applied to the records of a batch.
///
/// The discriminant is the value carried in the low three bits of the batch
/// `attributes` field.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Compression {
    /// Records are stored without compression.
    None = 0,
    /// Records are compressed with gzip.
    Gzip = 1,
    /// Records are compressed with Snappy.
    Snappy = 2,
    /// Records are compressed with LZ4.
    Lz4 = 3,
    /// Records are compressed with Zstandard.
    Zstd = 4,
}
72
/// Indicates how the timestamps of a batch are to be interpreted.
///
/// The decoder derives this from bit 3 of the batch `attributes` field.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimestampType {
    /// Attribute bit 3 is clear (presumably the time the record was created — confirm).
    Creation = 0,
    /// Attribute bit 3 is set (presumably the time the record was appended to the log — confirm).
    LogAppend = 1,
}
81
/// Options that control how records are encoded into batches.
pub struct RecordEncodeOptions {
    /// Record batch version (magic byte) to write. The encoder only accepts `2`;
    /// `0` and `1` are rejected as unsupported message sets.
    pub version: i8,

    /// Compression to apply to the records and to advertise in the batch attributes.
    pub compression: Compression,
}
91
// NOTE(review): none of these constants are referenced in the visible part of this file;
// the descriptions below are inferred from the names — confirm against callers.

/// Sentinel value, presumably meaning "no producer id".
pub const NO_PRODUCER_ID: i64 = -1;
/// Sentinel value, presumably meaning "no producer epoch".
pub const NO_PRODUCER_EPOCH: i16 = -1;
/// Sentinel value, presumably meaning "no partition leader epoch".
pub const NO_PARTITION_LEADER_EPOCH: i32 = -1;
/// Sentinel value, presumably meaning "no sequence number".
pub const NO_SEQUENCE: i32 = -1;
/// Sentinel value, presumably meaning "no timestamp".
pub const NO_TIMESTAMP: i64 = -1;
102
/// Stateless namespace for the functions that encode [`Record`]s into record batches.
#[derive(Debug, Clone)]
pub struct RecordBatchEncoder;
106
/// Stateless namespace for the functions that decode record batches into [`RecordSet`]s.
#[derive(Debug, Clone)]
pub struct RecordBatchDecoder;
110
/// Batch-level header values that are needed while decoding the individual records
/// of that batch.
struct BatchDecodeInfo {
    // Number of records announced by the batch header.
    record_count: usize,
    // Taken from bit 3 of the batch attributes.
    timestamp_type: TimestampType,
    // Base offset of the batch; record offsets are stored as deltas from it.
    min_offset: i64,
    // Base timestamp of the batch; record timestamps are stored as deltas from it.
    min_timestamp: i64,
    // Sequence number that corresponds to an offset delta of zero.
    base_sequence: i32,
    // Taken from bit 4 of the batch attributes.
    transactional: bool,
    // Taken from bit 5 of the batch attributes.
    control: bool,
    partition_leader_epoch: i32,
    producer_id: i64,
    producer_epoch: i16,
}
123
/// The result of decoding one record batch: its records plus the batch-level
/// compression and version that were read from the header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSet {
    /// Compression announced in the batch attributes.
    pub compression: Compression,
    /// Batch version (magic byte).
    pub version: i8,
    /// The decoded records, in wire order.
    pub records: Vec<Record>,
}
134
/// A single record together with the batch-level properties it was (or will be)
/// stored with.
///
/// When encoding, consecutive records that agree on the batch-level properties are
/// grouped into the same batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    // Batch-level properties.
    /// Whether the containing batch is transactional (attribute bit 4).
    pub transactional: bool,
    /// Whether the containing batch is a control batch (attribute bit 5).
    pub control: bool,
    /// Partition leader epoch stored in the batch header.
    pub partition_leader_epoch: i32,
    /// Producer id stored in the batch header.
    pub producer_id: i64,
    /// Producer epoch stored in the batch header.
    pub producer_epoch: i16,

    // Record-level properties.
    /// Timestamp type of the containing batch; on decode it is taken from attribute bit 3.
    pub timestamp_type: TimestampType,
    /// Absolute offset; stored on the wire as a delta from the batch's base offset.
    pub offset: i64,
    /// Sequence number; on decode it is the batch's base sequence plus the offset delta
    /// (wrapping).
    pub sequence: i32,
    /// Absolute timestamp; stored on the wire as a delta from the batch's base timestamp.
    pub timestamp: i64,
    /// Record key; `None` is written as a length of `-1`.
    pub key: Option<Bytes>,
    /// Record value; `None` is written as a length of `-1`.
    pub value: Option<Bytes>,
    /// Record headers in insertion order; a `None` value is written as a length of `-1`.
    pub headers: IndexMap<StrBytes, Option<Bytes>>,
}
166
// Byte offset of the version (magic) byte from the start of a batch: it follows the
// 8-byte base offset, the 4-byte batch length and the 4-byte partition leader epoch.
const MAGIC_BYTE_OFFSET: usize = 16;
168
169impl RecordBatchEncoder {
170 pub fn encode<'a, B, I>(buf: &mut B, records: I, options: &RecordEncodeOptions) -> Result<()>
173 where
174 B: ByteBufMut,
175 I: IntoIterator<Item = &'a Record>,
176 I::IntoIter: Clone,
177 {
178 Self::encode_with_custom_compression(
179 buf,
180 records,
181 options,
182 None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
183 )
184 }
185
186 pub fn encode_with_custom_compression<'a, B, I, CF>(
193 buf: &mut B,
194 records: I,
195 options: &RecordEncodeOptions,
196 compressor: Option<CF>,
197 ) -> Result<()>
198 where
199 B: ByteBufMut,
200 I: IntoIterator<Item = &'a Record>,
201 I::IntoIter: Clone,
202 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
203 {
204 let records = records.into_iter();
205 match options.version {
206 0..=1 => bail!("message sets v{} are unsupported", options.version),
207 2 => Self::encode_new(buf, records, options, compressor),
208 _ => bail!("Unknown record batch version"),
209 }
210 }
211
212 fn encode_new_records<'a, B, I>(
213 buf: &mut B,
214 records: I,
215 min_offset: i64,
216 min_timestamp: i64,
217 options: &RecordEncodeOptions,
218 ) -> Result<()>
219 where
220 B: ByteBufMut,
221 I: Iterator<Item = &'a Record>,
222 {
223 for record in records {
224 record.encode_new(buf, min_offset, min_timestamp, options)?;
225 }
226 Ok(())
227 }
228
229 fn encode_new_batch<'a, B, I, CF>(
230 buf: &mut B,
231 records: &mut I,
232 options: &RecordEncodeOptions,
233 compressor: Option<&CF>,
234 ) -> Result<bool>
235 where
236 B: ByteBufMut,
237 I: Iterator<Item = &'a Record> + Clone,
238 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
239 {
240 let mut record_peeker = records.clone();
241
242 let first_record = match record_peeker.next() {
244 Some(record) => record,
245 None => return Ok(false),
246 };
247
248 let num_records = record_peeker
250 .take_while(|record| {
251 record.transactional == first_record.transactional
252 && record.control == first_record.control
253 && record.partition_leader_epoch == first_record.partition_leader_epoch
254 && record.producer_id == first_record.producer_id
255 && record.producer_epoch == first_record.producer_epoch
256 && (record.offset as i32).wrapping_sub(record.sequence)
257 == (first_record.offset as i32).wrapping_sub(first_record.sequence)
258 })
259 .count()
260 + 1;
261
262 let min_offset = records
264 .clone()
265 .take(num_records)
266 .map(|r| r.offset)
267 .min()
268 .expect("Batch contains at least one element");
269 let max_offset = records
270 .clone()
271 .take(num_records)
272 .map(|r| r.offset)
273 .max()
274 .expect("Batch contains at least one element");
275 let min_timestamp = records
276 .clone()
277 .take(num_records)
278 .map(|r| r.timestamp)
279 .min()
280 .expect("Batch contains at least one element");
281 let max_timestamp = records
282 .clone()
283 .take(num_records)
284 .map(|r| r.timestamp)
285 .max()
286 .expect("Batch contains at least one element");
287 let base_sequence = first_record
288 .sequence
289 .wrapping_sub((first_record.offset - min_offset) as i32);
290
291 types::Int64.encode(buf, min_offset)?;
293
294 let size_gap = buf.put_typed_gap(gap::I32);
296 let batch_start = buf.offset();
297
298 types::Int32.encode(buf, first_record.partition_leader_epoch)?;
300
301 types::Int8.encode(buf, options.version)?;
303
304 let crc_gap = buf.put_typed_gap(gap::U32);
306 let content_start = buf.offset();
307
308 let mut attributes = options.compression as i16;
310 if first_record.transactional {
311 attributes |= 1 << 4;
312 }
313 if first_record.control {
314 attributes |= 1 << 5;
315 }
316 types::Int16.encode(buf, attributes)?;
317
318 types::Int32.encode(buf, (max_offset - min_offset) as i32)?;
320
321 types::Int64.encode(buf, min_timestamp)?;
323
324 types::Int64.encode(buf, max_timestamp)?;
326
327 types::Int64.encode(buf, first_record.producer_id)?;
329
330 types::Int16.encode(buf, first_record.producer_epoch)?;
332
333 types::Int32.encode(buf, base_sequence)?;
335
336 if num_records > i32::MAX as usize {
338 bail!(
339 "Too many records to encode in one batch ({} records)",
340 num_records
341 );
342 }
343 types::Int32.encode(buf, num_records as i32)?;
344
345 let records = records.take(num_records);
347
348 if let Some(compressor) = compressor {
349 let mut record_buf = BytesMut::new();
350 Self::encode_new_records(&mut record_buf, records, min_offset, min_timestamp, options)?;
351 compressor(&mut record_buf, buf, options.compression)?;
352 } else {
353 match options.compression {
354 Compression::None => cmpr::None::compress(buf, |buf| {
355 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
356 })?,
357 #[cfg(feature = "snappy")]
358 Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
359 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
360 })?,
361 #[cfg(feature = "gzip")]
362 Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
363 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
364 })?,
365 #[cfg(feature = "lz4")]
366 Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
367 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
368 })?,
369 #[cfg(feature = "zstd")]
370 Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
371 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
372 })?,
373 #[allow(unreachable_patterns)]
374 c => {
375 return Err(anyhow!(
376 "Support for {c:?} is not enabled as a cargo feature"
377 ))
378 }
379 }
380 }
381 let batch_end = buf.offset();
382
383 let batch_size = batch_end - batch_start;
385 if batch_size > i32::MAX as usize {
386 bail!(
387 "Record batch was too large to encode ({} bytes)",
388 batch_size
389 );
390 }
391
392 buf.fill_typed_gap(size_gap, batch_size as i32);
393
394 let crc = crc32c(buf.range(content_start..batch_end));
396 buf.fill_typed_gap(crc_gap, crc);
397
398 Ok(true)
399 }
400
401 fn encode_new<'a, B, I, CF>(
402 buf: &mut B,
403 mut records: I,
404 options: &RecordEncodeOptions,
405 compressor: Option<CF>,
406 ) -> Result<()>
407 where
408 B: ByteBufMut,
409 I: Iterator<Item = &'a Record> + Clone,
410 CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
411 {
412 while Self::encode_new_batch(buf, &mut records, options, compressor.as_ref())? {}
413 Ok(())
414 }
415}
416
417impl RecordBatchDecoder {
418 pub fn decode_with_custom_compression<B: ByteBuf, F>(
424 buf: &mut B,
425 decompressor: Option<F>,
426 ) -> Result<RecordSet>
427 where
428 F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
429 {
430 let mut records = Vec::new();
431 let (version, compression) =
432 Self::decode_into_vec(buf, &mut records, decompressor.as_ref())?;
433 Ok(RecordSet {
434 version,
435 compression,
436 records,
437 })
438 }
439
440 pub fn decode_all<B: ByteBuf>(buf: &mut B) -> Result<Vec<RecordSet>> {
442 let mut batches = Vec::new();
443 while buf.has_remaining() {
444 batches.push(Self::decode(buf)?);
445 }
446 Ok(batches)
447 }
448
449 pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<RecordSet> {
451 Self::decode_with_custom_compression(
452 buf,
453 None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>.as_ref(),
454 )
455 }
456
457 fn decode_into_vec<B: ByteBuf, F>(
458 buf: &mut B,
459 records: &mut Vec<Record>,
460 decompress_func: Option<&F>,
461 ) -> Result<(i8, Compression)>
462 where
463 F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
464 {
465 let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?[0] as i8;
466 let compression = match version {
467 0..=1 => bail!("message sets v{} are unsupported", version),
468 2 => Self::decode_new_batch(buf, version, records, decompress_func),
469 _ => {
470 bail!("Unknown record batch version ({})", version);
471 }
472 }?;
473 Ok((version, compression))
474 }
475 fn decode_new_records<B: ByteBuf>(
476 buf: &mut B,
477 batch_decode_info: &BatchDecodeInfo,
478 version: i8,
479 records: &mut Vec<Record>,
480 ) -> Result<()> {
481 records.reserve(batch_decode_info.record_count);
482 for _ in 0..batch_decode_info.record_count {
483 records.push(Record::decode_new(buf, batch_decode_info, version)?);
484 }
485 Ok(())
486 }
    /// Decodes one version 2 record batch from `buf` and appends its records to `records`.
    ///
    /// `version` is the magic byte the caller already peeked; it must match the one found
    /// in the header. When `decompress_func` is `Some`, it is handed the bytes that follow
    /// the batch header together with the batch's compression and must return a buffer
    /// containing the uncompressed records; otherwise the built-in implementations are
    /// used.
    ///
    /// Returns the compression announced in the batch attributes.
    ///
    /// # Errors
    ///
    /// Fails on a negative batch length or record count, a version mismatch, a CRC
    /// mismatch, an unknown compression id, a compression whose cargo feature is
    /// disabled, or any error while decompressing or decoding the records.
    fn decode_new_batch<B: ByteBuf, F>(
        buf: &mut B,
        version: i8,
        records: &mut Vec<Record>,
        decompress_func: Option<&F>,
    ) -> Result<Compression>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        // Base offset
        let min_offset = types::Int64.decode(buf)?;

        // Batch length
        let batch_length: i32 = types::Int32.decode(buf)?;
        if batch_length < 0 {
            bail!("Unexpected negative batch size: {}", batch_length);
        }

        // Split off exactly this batch; from here on `buf` is the batch's own bytes.
        let buf = &mut buf.try_get_bytes(batch_length as usize)?;

        // Partition leader epoch
        let partition_leader_epoch = types::Int32.decode(buf)?;

        // Magic byte
        let magic: i8 = types::Int8.decode(buf)?;
        if magic != version {
            bail!("Version mismatch ({} != {})", magic, version);
        }

        // CRC: checked against everything that is left in the batch after the CRC field.
        let supplied_crc: u32 = types::UInt32.decode(buf)?;
        let actual_crc = crc32c(buf);

        if supplied_crc != actual_crc {
            bail!(
                "Cyclic redundancy check failed ({} != {})",
                supplied_crc,
                actual_crc
            );
        }

        // Attributes: compression in bits 0-2, timestamp type in bit 3,
        // transactional in bit 4, control in bit 5.
        let attributes: i16 = types::Int16.decode(buf)?;
        let transactional = (attributes & (1 << 4)) != 0;
        let control = (attributes & (1 << 5)) != 0;
        let compression = match attributes & 0x7 {
            0 => Compression::None,
            1 => Compression::Gzip,
            2 => Compression::Snappy,
            3 => Compression::Lz4,
            4 => Compression::Zstd,
            other => {
                bail!("Unknown compression algorithm used: {}", other);
            }
        };
        let timestamp_type = if (attributes & (1 << 3)) != 0 {
            TimestampType::LogAppend
        } else {
            TimestampType::Creation
        };

        // Last offset delta (read to advance the buffer; the value is not used)
        let _max_offset_delta: i32 = types::Int32.decode(buf)?;

        // First timestamp
        let min_timestamp = types::Int64.decode(buf)?;

        // Max timestamp (read to advance the buffer; the value is not used)
        let _max_timestamp: i64 = types::Int64.decode(buf)?;

        // Producer id
        let producer_id = types::Int64.decode(buf)?;

        // Producer epoch
        let producer_epoch = types::Int16.decode(buf)?;

        // Base sequence
        let base_sequence = types::Int32.decode(buf)?;

        // Record count
        let record_count: i32 = types::Int32.decode(buf)?;
        if record_count < 0 {
            bail!("Unexpected negative record count ({})", record_count);
        }
        let record_count = record_count as usize;

        // Everything the per-record decoder needs from the batch header.
        let batch_decode_info = BatchDecodeInfo {
            record_count,
            timestamp_type,
            min_offset,
            min_timestamp,
            base_sequence,
            transactional,
            control,
            partition_leader_epoch,
            producer_id,
            producer_epoch,
        };

        if let Some(decompress_func) = decompress_func {
            // Custom decompression: the callback turns the rest of the batch into a
            // buffer of uncompressed records.
            let mut decompressed_buf = decompress_func(buf, compression)?;

            Self::decode_new_records(&mut decompressed_buf, &batch_decode_info, version, records)?;
        } else {
            match compression {
                Compression::None => cmpr::None::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "snappy")]
                Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "gzip")]
                Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "zstd")]
                Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "lz4")]
                Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                // Only reachable when at least one compression feature is disabled.
                #[allow(unreachable_patterns)]
                c => {
                    return Err(anyhow!(
                        "Support for {c:?} is not enabled as a cargo feature"
                    ))
                }
            };
        }

        Ok(compression)
    }
623}
624
625impl Record {
626 fn encode_new<B: ByteBufMut>(
627 &self,
628 buf: &mut B,
629 min_offset: i64,
630 min_timestamp: i64,
631 options: &RecordEncodeOptions,
632 ) -> Result<()> {
633 let size = self.compute_size_new(min_offset, min_timestamp, options)?;
635 if size > i32::MAX as usize {
636 bail!("Record was too large to encode ({} bytes)", size);
637 }
638 types::VarInt.encode(buf, size as i32)?;
639
640 types::Int8.encode(buf, 0)?;
642
643 let timestamp_delta = self.timestamp - min_timestamp;
645 if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
646 bail!(
647 "Timestamps within batch are too far apart ({}, {})",
648 min_timestamp,
649 self.timestamp
650 );
651 }
652 types::VarInt.encode(buf, timestamp_delta as i32)?;
653
654 let offset_delta = self.offset - min_offset;
656 if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
657 bail!(
658 "Timestamps within batch are too far apart ({}, {})",
659 min_offset,
660 self.offset
661 );
662 }
663 types::VarInt.encode(buf, offset_delta as i32)?;
664
665 if let Some(k) = self.key.as_ref() {
667 if k.len() > i32::MAX as usize {
668 bail!("Record key was too large to encode ({} bytes)", k.len());
669 }
670 types::VarInt.encode(buf, k.len() as i32)?;
671 buf.put_slice(k);
672 } else {
673 types::VarInt.encode(buf, -1)?;
674 }
675
676 if let Some(v) = self.value.as_ref() {
678 if v.len() > i32::MAX as usize {
679 bail!("Record value was too large to encode ({} bytes)", v.len());
680 }
681 types::VarInt.encode(buf, v.len() as i32)?;
682 buf.put_slice(v);
683 } else {
684 types::VarInt.encode(buf, -1)?;
685 }
686
687 if self.headers.len() > i32::MAX as usize {
689 bail!("Too many record headers encode ({})", self.headers.len());
690 }
691 types::VarInt.encode(buf, self.headers.len() as i32)?;
692 for (k, v) in &self.headers {
693 if k.len() > i32::MAX as usize {
695 bail!(
696 "Record header key was too large to encode ({} bytes)",
697 k.len()
698 );
699 }
700 types::VarInt.encode(buf, k.len() as i32)?;
701
702 buf.put_slice(k.as_ref());
704
705 if let Some(v) = v.as_ref() {
707 if v.len() > i32::MAX as usize {
708 bail!(
709 "Record header value was too large to encode ({} bytes)",
710 v.len()
711 );
712 }
713 types::VarInt.encode(buf, v.len() as i32)?;
714 buf.put_slice(v);
715 } else {
716 types::VarInt.encode(buf, -1)?;
717 }
718 }
719
720 Ok(())
721 }
722 fn compute_size_new(
723 &self,
724 min_offset: i64,
725 min_timestamp: i64,
726 _options: &RecordEncodeOptions,
727 ) -> Result<usize> {
728 let mut total_size = 0;
729
730 total_size += types::Int8.compute_size(0)?;
732
733 let timestamp_delta = self.timestamp - min_timestamp;
735 if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
736 bail!(
737 "Timestamps within batch are too far apart ({}, {})",
738 min_timestamp,
739 self.timestamp
740 );
741 }
742 total_size += types::VarInt.compute_size(timestamp_delta as i32)?;
743
744 let offset_delta = self.offset - min_offset;
746 if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
747 bail!(
748 "Timestamps within batch are too far apart ({}, {})",
749 min_offset,
750 self.offset
751 );
752 }
753 total_size += types::VarInt.compute_size(offset_delta as i32)?;
754
755 if let Some(k) = self.key.as_ref() {
757 if k.len() > i32::MAX as usize {
758 bail!("Record key was too large to encode ({} bytes)", k.len());
759 }
760 total_size += types::VarInt.compute_size(k.len() as i32)?;
761 total_size += k.len();
762 } else {
763 total_size += types::VarInt.compute_size(-1)?;
764 }
765
766 if let Some(v) = self.value.as_ref() {
768 if v.len() > i32::MAX as usize {
769 bail!("Record value was too large to encode ({} bytes)", v.len());
770 }
771 total_size += types::VarInt.compute_size(v.len() as i32)?;
772 total_size += v.len();
773 } else {
774 total_size += types::VarInt.compute_size(-1)?;
775 }
776
777 if self.headers.len() > i32::MAX as usize {
779 bail!("Too many record headers encode ({})", self.headers.len());
780 }
781 total_size += types::VarInt.compute_size(self.headers.len() as i32)?;
782 for (k, v) in &self.headers {
783 if k.len() > i32::MAX as usize {
785 bail!(
786 "Record header key was too large to encode ({} bytes)",
787 k.len()
788 );
789 }
790 total_size += types::VarInt.compute_size(k.len() as i32)?;
791
792 total_size += k.len();
794
795 if let Some(v) = v.as_ref() {
797 if v.len() > i32::MAX as usize {
798 bail!(
799 "Record header value was too large to encode ({} bytes)",
800 v.len()
801 );
802 }
803 total_size += types::VarInt.compute_size(v.len() as i32)?;
804 total_size += v.len();
805 } else {
806 total_size += types::VarInt.compute_size(-1)?;
807 }
808 }
809
810 Ok(total_size)
811 }
812 fn decode_new<B: ByteBuf>(
813 buf: &mut B,
814 batch_decode_info: &BatchDecodeInfo,
815 _version: i8,
816 ) -> Result<Self> {
817 let size: i32 = types::VarInt.decode(buf)?;
819 if size < 0 {
820 bail!("Unexpected negative record size: {}", size);
821 }
822
823 let buf = &mut buf.try_get_bytes(size as usize)?;
825
826 let _attributes: i8 = types::Int8.decode(buf)?;
828
829 let timestamp_delta: i32 = types::VarInt.decode(buf)?;
831 let timestamp = batch_decode_info.min_timestamp + timestamp_delta as i64;
832
833 let offset_delta: i32 = types::VarInt.decode(buf)?;
835 let offset = batch_decode_info.min_offset + offset_delta as i64;
836 let sequence = batch_decode_info.base_sequence.wrapping_add(offset_delta);
837
838 let key_len: i32 = types::VarInt.decode(buf)?;
840 let key = match key_len.cmp(&-1) {
841 Ordering::Less => {
842 bail!("Unexpected negative record key length ({} bytes)", key_len);
843 }
844 Ordering::Equal => None,
845 Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?),
846 };
847
848 let value_len: i32 = types::VarInt.decode(buf)?;
850 let value = match value_len.cmp(&-1) {
851 Ordering::Less => {
852 bail!(
853 "Unexpected negative record value length ({} bytes)",
854 value_len
855 );
856 }
857 Ordering::Equal => None,
858 Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
859 };
860
861 let num_headers: i32 = types::VarInt.decode(buf)?;
863 if num_headers < 0 {
864 bail!("Unexpected negative record header count: {}", num_headers);
865 }
866 let num_headers = num_headers as usize;
867
868 let mut headers = IndexMap::with_capacity(num_headers);
869 for _ in 0..num_headers {
870 let key_len: i32 = types::VarInt.decode(buf)?;
872 if key_len < 0 {
873 bail!(
874 "Unexpected negative record header key length ({} bytes)",
875 key_len
876 );
877 }
878
879 let key = StrBytes::try_from(buf.try_get_bytes(key_len as usize)?)?;
881
882 let value_len: i32 = types::VarInt.decode(buf)?;
884
885 let value = match value_len.cmp(&-1) {
887 Ordering::Less => {
888 bail!(
889 "Unexpected negative record header value length ({} bytes)",
890 value_len
891 );
892 }
893 Ordering::Equal => None,
894 Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
895 };
896
897 headers.insert(key, value);
898 }
899
900 Ok(Self {
901 transactional: batch_decode_info.transactional,
902 control: batch_decode_info.control,
903 timestamp_type: batch_decode_info.timestamp_type,
904 partition_leader_epoch: batch_decode_info.partition_leader_epoch,
905 producer_id: batch_decode_info.producer_id,
906 producer_epoch: batch_decode_info.producer_epoch,
907 sequence,
908 offset,
909 timestamp,
910 key,
911 value,
912 headers,
913 })
914 }
915}
916
917#[cfg(test)]
918mod tests {
919 use bytes::Bytes;
920
921 use super::*;
922
923 #[test]
924 fn lookup_header_via_u8_slice() {
925 let record = Record {
926 transactional: false,
927 control: false,
928 partition_leader_epoch: 0,
929 producer_id: 0,
930 producer_epoch: 0,
931 sequence: 0,
932 timestamp_type: TimestampType::Creation,
933 offset: Default::default(),
934 timestamp: Default::default(),
935 key: Default::default(),
936 value: Default::default(),
937 headers: [
938 ("some-key".into(), Some("some-value".into())),
939 ("other-header".into(), None),
940 ]
941 .into(),
942 };
943 assert_eq!(
944 Bytes::from("some-value"),
945 record
946 .headers
947 .get("some-key".as_bytes())
949 .expect("key exists in headers")
950 .as_ref()
951 .expect("value is present")
952 );
953 }
954
955 #[test]
956 fn decode_record_header_no_value() {
957 let record = Record {
958 transactional: false,
959 control: false,
960 partition_leader_epoch: 0,
961 producer_id: 0,
962 producer_epoch: 0,
963 sequence: 0,
964 timestamp_type: TimestampType::Creation,
965 offset: Default::default(),
966 timestamp: Default::default(),
967 key: Default::default(),
968 value: Default::default(),
969 headers: [("other-header".into(), None)].into(),
970 };
971 let mut buf = &mut bytes::BytesMut::new();
972 record
973 .encode_new(
974 buf,
975 0,
976 0,
977 &RecordEncodeOptions {
978 version: 2,
979 compression: super::Compression::None,
980 },
981 )
982 .expect("encode works");
983
984 Record::decode_new(
985 &mut buf,
986 &BatchDecodeInfo {
987 record_count: 1,
988 timestamp_type: TimestampType::Creation,
989 min_offset: 0,
990 min_timestamp: 0,
991 base_sequence: 0,
992 transactional: false,
993 control: false,
994 partition_leader_epoch: 0,
995 producer_id: 0,
996 producer_epoch: 0,
997 },
998 2,
999 )
1000 .expect("decode works");
1001 }
1002}