1use std::io::{Cursor, Read, Write};
23
24#[cfg(test)]
25use proptest::prelude::*;
26
27use super::{
28 primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
29 traits::{ReadError, ReadType, WriteError, WriteType},
30 vec_builder::VecBuilder,
31};
32
/// A key/value header attached to an individual Kafka record.
///
/// Wire format (see the `ReadType`/`WriteType` impls below): varint length
/// prefix + UTF-8 key bytes, then varint length prefix + raw value bytes.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordHeader {
    pub key: String,
    pub value: Vec<u8>,
}
43
44impl<R> ReadType<R> for RecordHeader
45where
46 R: Read,
47{
48 fn read(reader: &mut R) -> Result<Self, ReadError> {
49 let len = Varint::read(reader)?;
51 let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
52 let mut buf = VecBuilder::new(len);
53 buf = buf.read_exact(reader)?;
54 let key = String::from_utf8(buf.into()).map_err(|e| ReadError::Malformed(Box::new(e)))?;
55
56 let len = Varint::read(reader)?;
58 let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
59 let mut value = VecBuilder::new(len);
60 value = value.read_exact(reader)?;
61 let value = value.into();
62
63 Ok(Self { key, value })
64 }
65}
66
67impl<W> WriteType<W> for RecordHeader
68where
69 W: Write,
70{
71 fn write(&self, writer: &mut W) -> Result<(), WriteError> {
72 let l = i32::try_from(self.key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
74 Varint(l).write(writer)?;
75 writer.write_all(self.key.as_bytes())?;
76
77 let l = i32::try_from(self.value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
79 Varint(l).write(writer)?;
80 writer.write_all(&self.value)?;
81
82 Ok(())
83 }
84}
85
/// A single Kafka record inside a record batch.
///
/// Timestamp and offset are stored as deltas (relative values); `key` and
/// `value` are nullable and encoded with a length of -1 when `None`.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct Record {
    pub timestamp_delta: i64,
    pub offset_delta: i32,
    pub key: Option<Vec<u8>>,
    pub value: Option<Vec<u8>>,
    pub headers: Vec<RecordHeader>,
}
99
100impl<R> ReadType<R> for Record
101where
102 R: Read,
103{
104 fn read(reader: &mut R) -> Result<Self, ReadError> {
105 let len = Varint::read(reader)?;
107 let len = u64::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
108 let reader = &mut reader.take(len);
109
110 Int8::read(reader)?;
112
113 let timestamp_delta = Varlong::read(reader)?.0;
115
116 let offset_delta = Varint::read(reader)?.0;
118
119 let len = Varint::read(reader)?.0;
121 let key = if len == -1 {
122 None
123 } else {
124 let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
125 let mut key = VecBuilder::new(len);
126 key = key.read_exact(reader)?;
127 Some(key.into())
128 };
129
130 let len = Varint::read(reader)?.0;
132 let value = if len == -1 {
133 None
134 } else {
135 let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
136 let mut value = VecBuilder::new(len);
137 value = value.read_exact(reader)?;
138 Some(value.into())
139 };
140
141 let n_headers = Varint::read(reader)?;
144 let n_headers =
145 usize::try_from(n_headers.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
146 let mut headers = VecBuilder::new(n_headers);
147 for _ in 0..n_headers {
148 headers.push(RecordHeader::read(reader)?);
149 }
150
151 if reader.limit() != 0 {
153 return Err(ReadError::Malformed(
154 format!("Found {} trailing bytes after Record", reader.limit()).into(),
155 ));
156 }
157
158 Ok(Self {
159 timestamp_delta,
160 offset_delta,
161 key,
162 value,
163 headers: headers.into(),
164 })
165 }
166}
167
168impl<W> WriteType<W> for Record
169where
170 W: Write,
171{
172 fn write(&self, writer: &mut W) -> Result<(), WriteError> {
173 let mut data = vec![];
177
178 Int8(0).write(&mut data)?;
180
181 Varlong(self.timestamp_delta).write(&mut data)?;
183
184 Varint(self.offset_delta).write(&mut data)?;
186
187 match &self.key {
189 Some(key) => {
190 let l = i32::try_from(key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
191 Varint(l).write(&mut data)?;
192 data.write_all(key)?;
193 }
194 None => {
195 Varint(-1).write(&mut data)?;
196 }
197 }
198
199 match &self.value {
201 Some(value) => {
202 let l =
203 i32::try_from(value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
204 Varint(l).write(&mut data)?;
205 data.write_all(value)?;
206 }
207 None => {
208 Varint(-1).write(&mut data)?;
209 }
210 }
211
212 let l =
215 i32::try_from(self.headers.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
216 Varint(l).write(&mut data)?;
217 for header in &self.headers {
218 header.write(&mut data)?;
219 }
220
221 let l = i32::try_from(data.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
226 Varint(l).write(writer)?;
227 writer.write_all(&data)?;
228
229 Ok(())
230 }
231}
232
/// Payload of a control batch: a transaction marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum ControlBatchRecord {
    // Encoded as type 0 on the wire.
    Abort,
    // Encoded as type 1 on the wire.
    Commit,
}
243
244impl<R> ReadType<R> for ControlBatchRecord
245where
246 R: Read,
247{
248 fn read(reader: &mut R) -> Result<Self, ReadError> {
249 let version = Int16::read(reader)?.0;
251 if version != 0 {
252 return Err(ReadError::Malformed(
253 format!("Unknown control batch record version: {}", version).into(),
254 ));
255 }
256
257 let t = Int16::read(reader)?.0;
259 match t {
260 0 => Ok(Self::Abort),
261 1 => Ok(Self::Commit),
262 _ => Err(ReadError::Malformed(
263 format!("Unknown control batch record type: {}", t).into(),
264 )),
265 }
266 }
267}
268
269impl<W> WriteType<W> for ControlBatchRecord
270where
271 W: Write,
272{
273 fn write(&self, writer: &mut W) -> Result<(), WriteError> {
274 Int16(0).write(writer)?;
276
277 let t = match self {
279 Self::Abort => 0,
280 Self::Commit => 1,
281 };
282 Int16(t).write(writer)?;
283
284 Ok(())
285 }
286}
287
/// Payload of a [`RecordBatch`]: either a single control record or a list of
/// regular records, selected by the control-batch bit in the batch attributes.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum ControlBatchOrRecords {
    ControlBatch(ControlBatchRecord),

    // Under proptest, generate at most one record (range 0..2) to keep
    // generated cases small.
    #[cfg_attr(
        test,
        proptest(
            strategy = "prop::collection::vec(any::<Record>(), 0..2).prop_map(ControlBatchOrRecords::Records)"
        )
    )]
    Records(Vec<Record>),
}
302
/// Compression codec applied to the records section of a batch.
///
/// Encoded in the low 3 bits of the batch attributes word
/// (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum RecordBatchCompression {
    NoCompression,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
312
/// Interpretation of the batch timestamps, encoded in bit 3 of the batch
/// attributes word (0 = create time, 1 = log-append time).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum RecordBatchTimestampType {
    CreateTime,
    LogAppendTime,
}
323
/// A Kafka record batch (message format version 2, magic byte = 2).
///
/// Flattens the batch-level header fields (`base_offset`,
/// `partition_leader_epoch`) together with the fields of the CRC-covered body
/// ([`RecordBatchBody`]) into one public struct.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordBatch {
    pub base_offset: i64,
    pub partition_leader_epoch: i32,
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    pub records: ControlBatchOrRecords,
    pub compression: RecordBatchCompression,
    pub is_transactional: bool,
    pub timestamp_type: RecordBatchTimestampType,
}
344
impl<R> ReadType<R> for RecordBatch
where
    R: Read,
{
    /// Deserialize a record batch: header fields, magic/CRC validation, then
    /// the CRC-covered body. Fails with `Malformed` on bad magic, CRC
    /// mismatch, or trailing bytes.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        let base_offset = Int64::read(reader)?.0;

        // The batch length covers everything after the length field itself.
        // Subtract the fields read eagerly below — partition leader epoch (4),
        // magic (1), CRC (4) — to get the size of the CRC-covered body.
        let len = Int32::read(reader)?;
        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
        let len = len
            .checked_sub(
                // partition leader epoch (4) + magic (1) + CRC (4)
                4 + 1 + 4,
            )
            .ok_or_else(|| {
                ReadError::Malformed(format!("Record batch len too small: {}", len).into())
            })?;

        let partition_leader_epoch = Int32::read(reader)?.0;

        // Only message format version 2 is supported.
        let magic = Int8::read(reader)?.0;
        if magic != 2 {
            return Err(ReadError::Malformed(
                format!("Invalid magic number in record batch: {}", magic).into(),
            ));
        }

        // The CRC travels as a signed Int32; reinterpret the bits as u32.
        let crc = Int32::read(reader)?.0;
        let crc = u32::from_be_bytes(crc.to_be_bytes());

        // Read the whole body and verify its CRC-32C checksum before
        // decoding anything from it.
        let mut data = VecBuilder::new(len);
        data = data.read_exact(reader)?;
        let data: Vec<u8> = data.into();
        let actual_crc = crc32c::crc32c(&data);
        if crc != actual_crc {
            return Err(ReadError::Malformed(
                format!("CRC error, got 0x{:x}, expected 0x{:x}", actual_crc, crc).into(),
            ));
        }

        // Decode the body and make sure it consumed the buffer exactly.
        let mut data = Cursor::new(data);
        let body = RecordBatchBody::read(&mut data)?;

        let bytes_read = data.position();
        let bytes_total = u64::try_from(data.into_inner().len()).map_err(ReadError::Overflow)?;
        let bytes_left = bytes_total - bytes_read;
        if bytes_left != 0 {
            return Err(ReadError::Malformed(
                format!("Found {} trailing bytes after RecordBatch", bytes_left).into(),
            ));
        }

        // Flatten the header fields and the decoded body into `Self`.
        Ok(Self {
            base_offset,
            partition_leader_epoch,
            last_offset_delta: body.last_offset_delta,
            first_timestamp: body.first_timestamp,
            max_timestamp: body.max_timestamp,
            producer_id: body.producer_id,
            producer_epoch: body.producer_epoch,
            base_sequence: body.base_sequence,
            compression: body.compression,
            timestamp_type: body.timestamp_type,
            is_transactional: body.is_transactional,
            records: body.records,
        })
    }
}
429
impl<W> WriteType<W> for RecordBatch
where
    W: Write,
{
    /// Serialize a record batch. The CRC-covered body is built in a temporary
    /// buffer first because both its length and its checksum must be written
    /// before it.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        let mut data = vec![];
        let body_ref = RecordBatchBodyRef {
            last_offset_delta: self.last_offset_delta,
            first_timestamp: self.first_timestamp,
            max_timestamp: self.max_timestamp,
            producer_id: self.producer_id,
            producer_epoch: self.producer_epoch,
            base_sequence: self.base_sequence,
            records: &self.records,
            compression: self.compression,
            is_transactional: self.is_transactional,
            timestamp_type: self.timestamp_type,
        };
        body_ref.write(&mut data)?;

        Int64(self.base_offset).write(writer)?;

        // Batch length covers everything after this field: partition leader
        // epoch (4) + magic (1) + CRC (4) + body.
        let l = i32::try_from(
            data.len()
                + 4 + 1 + 4,
        )
        .map_err(|e| WriteError::Malformed(Box::new(e)))?;
        Int32(l).write(writer)?;

        Int32(self.partition_leader_epoch).write(writer)?;

        // Magic byte: message format version 2.
        Int8(2).write(writer)?;

        // CRC-32C over the body, written as a signed Int32 with the same
        // big-endian bit pattern (mirrors the reinterpretation in `read`).
        let crc = crc32c::crc32c(&data);
        let crc = i32::from_be_bytes(crc.to_be_bytes());
        Int32(crc).write(writer)?;

        writer.write_all(&data)?;

        Ok(())
    }
}
495
/// The CRC-covered part of a [`RecordBatch`]: everything after the
/// batch-level length, partition leader epoch, magic, and CRC fields.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct RecordBatchBody {
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    pub records: ControlBatchOrRecords,
    pub compression: RecordBatchCompression,
    pub is_transactional: bool,
    pub timestamp_type: RecordBatchTimestampType,
}
511
512impl RecordBatchBody {
513 fn read_records<R>(
514 reader: &mut R,
515 is_control: bool,
516 n_records: usize,
517 ) -> Result<ControlBatchOrRecords, ReadError>
518 where
519 R: Read,
520 {
521 if is_control {
522 if n_records != 1 {
523 return Err(ReadError::Malformed(
524 format!("Expected 1 control record but got {}", n_records).into(),
525 ));
526 }
527
528 let record = ControlBatchRecord::read(reader)?;
529 Ok(ControlBatchOrRecords::ControlBatch(record))
530 } else {
531 let mut records = VecBuilder::new(n_records);
532 for _ in 0..n_records {
533 records.push(Record::read(reader)?);
534 }
535 Ok(ControlBatchOrRecords::Records(records.into()))
536 }
537 }
538}
539
impl<R> ReadType<R> for RecordBatchBody
where
    R: Read,
{
    /// Deserialize the CRC-covered batch body, decompressing the records
    /// section with the codec selected by the attributes word.
    fn read(reader: &mut R) -> Result<Self, ReadError> {
        // Attributes bitfield layout (mirrored in `RecordBatchBodyRef::write`):
        //   bits 0-2: compression codec
        //   bit 3:    timestamp type
        //   bit 4:    transactional flag
        //   bit 5:    control-batch flag
        let attributes = Int16::read(reader)?.0;
        let compression = match attributes & 0x7 {
            0 => RecordBatchCompression::NoCompression,
            1 => RecordBatchCompression::Gzip,
            2 => RecordBatchCompression::Snappy,
            3 => RecordBatchCompression::Lz4,
            4 => RecordBatchCompression::Zstd,
            other => {
                return Err(ReadError::Malformed(
                    format!("Invalid compression type: {}", other).into(),
                ));
            }
        };
        let timestamp_type = if ((attributes >> 3) & 0x1) == 0 {
            RecordBatchTimestampType::CreateTime
        } else {
            RecordBatchTimestampType::LogAppendTime
        };
        let is_transactional = ((attributes >> 4) & 0x1) == 1;
        let is_control = ((attributes >> 5) & 0x1) == 1;

        let last_offset_delta = Int32::read(reader)?.0;

        let first_timestamp = Int64::read(reader)?.0;

        let max_timestamp = Int64::read(reader)?.0;

        let producer_id = Int64::read(reader)?.0;

        let producer_epoch = Int16::read(reader)?.0;

        let base_sequence = Int32::read(reader)?.0;

        // A record count of -1 is treated as "no records".
        let n_records = match Int32::read(reader)?.0 {
            -1 => 0,
            n => usize::try_from(n)?,
        };
        // The records section is compressed as one unit. For each codec we
        // also verify that no data is left over after the last record.
        let records = match compression {
            RecordBatchCompression::NoCompression => {
                Self::read_records(reader, is_control, n_records)?
            }
            #[cfg(feature = "compression-gzip")]
            RecordBatchCompression::Gzip => {
                use flate2::read::GzDecoder;

                let mut decoder = GzDecoder::new(reader);
                let records = Self::read_records(&mut decoder, is_control, n_records)?;

                ensure_eof(&mut decoder, "Data left in gzip block")?;

                records
            }
            #[cfg(feature = "compression-lz4")]
            RecordBatchCompression::Lz4 => {
                use lz4::Decoder;

                let mut decoder = Decoder::new(reader)?;
                let records = Self::read_records(&mut decoder, is_control, n_records)?;

                ensure_eof(&mut decoder, "Data left in LZ4 block")?;

                // Surface any error the decoder deferred until finish.
                let (_reader, res) = decoder.finish();
                res?;

                records
            }
            #[cfg(feature = "compression-snappy")]
            RecordBatchCompression::Snappy => {
                use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;

                let mut input = vec![];
                reader.read_to_end(&mut input)?;

                // Magic prefix of the Java-specific "SNAPPY" framing, which
                // wraps raw snappy chunks; plain raw snappy has no prefix.
                const JAVA_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];

                let output = if input.starts_with(JAVA_MAGIC) {
                    let cursor_content = &input[JAVA_MAGIC.len()..];
                    let mut cursor = Cursor::new(cursor_content);

                    // Framing version: only [0, 0, 0, 1] is accepted.
                    let mut buf_version = [0u8; 4];
                    cursor.read_exact(&mut buf_version)?;
                    if buf_version != [0, 0, 0, 1] {
                        return Err(ReadError::Malformed(
                            format!("Detected Java-specific Snappy compression, but got unknown version: {buf_version:?}").into(),
                        ));
                    }

                    // Compatibility flags: only [0, 0, 0, 1] is accepted.
                    let mut buf_compatible = [0u8; 4];
                    cursor.read_exact(&mut buf_compatible)?;
                    if buf_compatible != [0, 0, 0, 1] {
                        return Err(ReadError::Malformed(
                            format!("Detected Java-specific Snappy compression, but got unknown compat flags: {buf_compatible:?}").into(),
                        ));
                    }

                    // Decompress each length-prefixed chunk and concatenate.
                    let mut output = vec![];
                    while cursor.position() < cursor.get_ref().len() as u64 {
                        let mut buf_chunk_length = [0u8; 4];
                        cursor.read_exact(&mut buf_chunk_length)?;
                        let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize;
                        // Guard against a chunk length pointing past the end
                        // of the input (would otherwise over-allocate).
                        let bytes_left = cursor_content.len() - (cursor.position() as usize);
                        if chunk_length > bytes_left {
                            return Err(ReadError::Malformed(format!("Java-specific Snappy-compressed data has illegal chunk length, got {chunk_length} bytes but only {bytes_left} bytes are left.").into()));
                        }

                        let mut chunk_data = vec![0u8; chunk_length];
                        cursor.read_exact(&mut chunk_data)?;

                        let mut buf = carefully_decompress_snappy(&chunk_data, DEFAULT_BLOCK_SIZE)?;
                        output.append(&mut buf);
                    }

                    output
                } else {
                    carefully_decompress_snappy(&input, DEFAULT_BLOCK_SIZE)?
                };

                let mut decoder = Cursor::new(output);
                let records = Self::read_records(&mut decoder, is_control, n_records)?;

                ensure_eof(&mut decoder, "Data left in Snappy block")?;

                records
            }
            #[cfg(feature = "compression-zstd")]
            RecordBatchCompression::Zstd => {
                use zstd::Decoder;

                let mut decoder = Decoder::new(reader)?;
                let records = Self::read_records(&mut decoder, is_control, n_records)?;

                ensure_eof(&mut decoder, "Data left in zstd block")?;

                records
            }
            // Reached only when the matching compression feature is disabled
            // at compile time.
            #[allow(unreachable_patterns)]
            _ => {
                return Err(ReadError::Malformed(
                    format!("Unimplemented compression: {:?}", compression).into(),
                ));
            }
        };

        Ok(Self {
            last_offset_delta,
            first_timestamp,
            max_timestamp,
            producer_id,
            producer_epoch,
            base_sequence,
            compression,
            timestamp_type,
            is_transactional,
            records,
        })
    }
}
717
718impl<W> WriteType<W> for RecordBatchBody
719where
720 W: Write,
721{
722 fn write(&self, writer: &mut W) -> Result<(), WriteError> {
723 let body_ref = RecordBatchBodyRef {
724 last_offset_delta: self.last_offset_delta,
725 first_timestamp: self.first_timestamp,
726 max_timestamp: self.max_timestamp,
727 producer_id: self.producer_id,
728 producer_epoch: self.producer_epoch,
729 base_sequence: self.base_sequence,
730 records: &self.records,
731 compression: self.compression,
732 is_transactional: self.is_transactional,
733 timestamp_type: self.timestamp_type,
734 };
735 body_ref.write(writer)
736 }
737}
738
/// Borrowing counterpart of [`RecordBatchBody`], used to serialize a batch
/// body without taking ownership of (or cloning) the records.
#[derive(Debug)]
struct RecordBatchBodyRef<'a> {
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
    pub records: &'a ControlBatchOrRecords,
    pub compression: RecordBatchCompression,
    pub is_transactional: bool,
    pub timestamp_type: RecordBatchTimestampType,
}
755
756impl<'a> RecordBatchBodyRef<'a> {
757 fn write_records<W>(writer: &mut W, records: &ControlBatchOrRecords) -> Result<(), WriteError>
758 where
759 W: Write,
760 {
761 match records {
762 ControlBatchOrRecords::ControlBatch(control_batch) => control_batch.write(writer),
763 ControlBatchOrRecords::Records(records) => {
764 for record in records {
765 record.write(writer)?;
766 }
767 Ok(())
768 }
769 }
770 }
771}
772
impl<'a, W> WriteType<W> for RecordBatchBodyRef<'a>
where
    W: Write,
{
    /// Serialize the batch body: attributes bitfield, fixed header fields,
    /// record count, then the (possibly compressed) records section.
    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
        // Assemble the attributes bitfield (layout mirrors
        // `RecordBatchBody::read`): bits 0-2 codec, bit 3 timestamp type,
        // bit 4 transactional, bit 5 control batch.
        let mut attributes: i16 = match self.compression {
            RecordBatchCompression::NoCompression => 0,
            RecordBatchCompression::Gzip => 1,
            RecordBatchCompression::Snappy => 2,
            RecordBatchCompression::Lz4 => 3,
            RecordBatchCompression::Zstd => 4,
        };
        match self.timestamp_type {
            RecordBatchTimestampType::CreateTime => (),
            RecordBatchTimestampType::LogAppendTime => {
                attributes |= 1 << 3;
            }
        }
        if self.is_transactional {
            attributes |= 1 << 4;
        }
        if matches!(self.records, ControlBatchOrRecords::ControlBatch(_)) {
            attributes |= 1 << 5;
        }
        Int16(attributes).write(writer)?;

        Int32(self.last_offset_delta).write(writer)?;

        Int64(self.first_timestamp).write(writer)?;

        Int64(self.max_timestamp).write(writer)?;

        Int64(self.producer_id).write(writer)?;

        Int16(self.producer_epoch).write(writer)?;

        Int32(self.base_sequence).write(writer)?;

        // Record count (written uncompressed, ahead of the records section).
        let n_records = match &self.records {
            ControlBatchOrRecords::ControlBatch(_) => 1,
            ControlBatchOrRecords::Records(records) => records.len(),
        };
        Int32(i32::try_from(n_records)?).write(writer)?;
        match self.compression {
            RecordBatchCompression::NoCompression => {
                Self::write_records(writer, self.records)?;
            }
            #[cfg(feature = "compression-gzip")]
            RecordBatchCompression::Gzip => {
                use flate2::{write::GzEncoder, Compression};

                let mut encoder = GzEncoder::new(writer, Compression::default());
                Self::write_records(&mut encoder, self.records)?;
                encoder.finish()?;
            }
            #[cfg(feature = "compression-lz4")]
            RecordBatchCompression::Lz4 => {
                use lz4::{liblz4::BlockMode, EncoderBuilder};

                let mut encoder = EncoderBuilder::new()
                    .block_mode(
                        // NOTE(review): blocks are encoded independently
                        // (no inter-block references) — presumably required
                        // by the Kafka LZ4 framing; confirm against the spec.
                        BlockMode::Independent,
                    )
                    .build(writer)?;
                Self::write_records(&mut encoder, self.records)?;
                // Surface any error the encoder deferred until finish.
                let (_writer, res) = encoder.finish();
                res?;
            }
            #[cfg(feature = "compression-snappy")]
            RecordBatchCompression::Snappy => {
                use snap::raw::{max_compress_len, Encoder};

                // Raw snappy encoding needs the complete payload up front.
                let mut input = vec![];
                Self::write_records(&mut input, self.records)?;

                let mut encoder = Encoder::new();
                let mut output = vec![0; max_compress_len(input.len())];
                let len = encoder
                    .compress(&input, &mut output)
                    .map_err(|e| WriteError::Malformed(Box::new(e)))?;

                writer.write_all(&output[..len])?;
            }
            #[cfg(feature = "compression-zstd")]
            RecordBatchCompression::Zstd => {
                use zstd::Encoder;

                // Level 0 selects zstd's default compression level.
                let mut encoder = Encoder::new(writer, 0)?;
                Self::write_records(&mut encoder, self.records)?;
                encoder.finish()?;
            }
            // Reached only when the matching compression feature is disabled
            // at compile time.
            #[allow(unreachable_patterns)]
            _ => {
                return Err(WriteError::Malformed(
                    format!("Unimplemented compression: {:?}", self.compression).into(),
                ));
            }
        }

        Ok(())
    }
}
884
885#[allow(dead_code)] fn ensure_eof<R>(reader: &mut R, msg: &str) -> Result<(), ReadError>
888where
889 R: Read,
890{
891 let mut buf = [0u8; 1];
892 match reader.read(&mut buf) {
893 Ok(0) => Ok(()),
894 Ok(_) => Err(ReadError::Malformed(msg.to_string().into())),
895 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
896 Err(e) => Err(ReadError::IO(e)),
897 }
898}
899
/// Decompress raw snappy data without trusting the size claimed in its
/// header.
///
/// The snappy stream starts with a varint-encoded uncompressed size that is
/// part of the (potentially untrusted) input, so allocating that much up
/// front would allow memory exhaustion. Instead, start with a buffer of
/// `start_block_size` bytes, splice a correspondingly smaller size claim into
/// the input, and double the buffer on decompression errors that look like
/// "output buffer too small" until decompression succeeds or a genuine data
/// error is hit.
#[cfg(feature = "compression-snappy")]
fn carefully_decompress_snappy(
    input: &[u8],
    start_block_size: usize,
) -> Result<Vec<u8>, ReadError> {
    use crate::protocol::primitives::UnsignedVarint;
    use snap::raw::{decompress_len, Decoder};

    if input.is_empty() {
        return Err(ReadError::Malformed(Box::new(snap::Error::Empty)));
    }

    // Uncompressed size as claimed by the snappy header (untrusted).
    let uncompressed_size = decompress_len(input).map_err(|e| ReadError::Malformed(Box::new(e)))?;

    // How many bytes the original size header occupies, so it can be
    // stripped and replaced below.
    let uncompressed_size_encoded_length = {
        let mut buf = Vec::with_capacity(100);
        UnsignedVarint(uncompressed_size as u64)
            .write(&mut buf)
            .expect("this write should never fail");
        buf.len()
    };

    // Current cap on the output buffer; grows geometrically on retry.
    let mut max_uncompressed_size = start_block_size;

    loop {
        let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);

        // Rebuild the input with the (possibly smaller) claimed size so the
        // decoder accepts our smaller output buffer.
        let try_input = {
            let mut buf = Cursor::new(Vec::with_capacity(input.len()));
            UnsignedVarint(try_uncompressed_size as u64)
                .write(&mut buf)
                .expect("this write should never fail");
            buf.write_all(&input[uncompressed_size_encoded_length..])
                .expect("this write should never fail");
            buf.into_inner()
        };

        let mut decoder = Decoder::new();
        let mut output = vec![0; try_uncompressed_size];
        let actual_uncompressed_size = match decoder.decompress(&try_input, &mut output) {
            Ok(size) => size,
            Err(e) => {
                // Classify the failure: was the output buffer merely too
                // small (retryable with a bigger buffer), or is the data
                // actually broken?
                let looks_like_dst_too_small = match e {
                    snap::Error::CopyWrite { .. } => true,

                    snap::Error::Literal {
                        len,
                        dst_len,
                        src_len,
                    } => (dst_len < len) && (src_len >= len),

                    snap::Error::HeaderMismatch {
                        expected_len,
                        got_len,
                    } => expected_len < got_len,

                    snap::Error::BufferTooSmall { .. } => {
                        unreachable!("Just allocated a correctly-sized output buffer.")
                    }

                    snap::Error::Offset { .. } => false,

                    _ => false,
                };
                // Only retry if we actually used a reduced buffer; at full
                // size the same error would just repeat.
                let used_smaller_dst = max_uncompressed_size < uncompressed_size;

                if looks_like_dst_too_small && used_smaller_dst {
                    max_uncompressed_size *= 2;
                    continue;
                } else {
                    return Err(ReadError::Malformed(Box::new(e)));
                }
            }
        };
        if actual_uncompressed_size != uncompressed_size {
            return Err(ReadError::Malformed(
                "broken snappy data".to_string().into(),
            ));
        }

        break Ok(output);
    }
}
1008
1009#[cfg(test)]
1010mod tests {
1011 use std::io::Cursor;
1012
1013 use crate::protocol::test_utils::test_roundtrip;
1014
1015 use super::*;
1016
1017 use assert_matches::assert_matches;
1018
1019 test_roundtrip!(RecordHeader, test_record_header_roundtrip);
1020
1021 test_roundtrip!(Record, test_record_roundtrip);
1022
1023 test_roundtrip!(ControlBatchRecord, test_control_batch_record_roundtrip);
1024
1025 #[test]
1026 fn test_control_batch_record_unknown_version() {
1027 let mut buf = Cursor::new(Vec::<u8>::new());
1028 Int16(1).write(&mut buf).unwrap();
1029 Int16(0).write(&mut buf).unwrap();
1030 buf.set_position(0);
1031
1032 let err = ControlBatchRecord::read(&mut buf).unwrap_err();
1033 assert_matches!(err, ReadError::Malformed(_));
1034 assert_eq!(
1035 err.to_string(),
1036 "Malformed data: Unknown control batch record version: 1",
1037 );
1038 }
1039
1040 #[test]
1041 fn test_control_batch_record_unknown_type() {
1042 let mut buf = Cursor::new(Vec::<u8>::new());
1043 Int16(0).write(&mut buf).unwrap();
1044 Int16(2).write(&mut buf).unwrap();
1045 buf.set_position(0);
1046
1047 let err = ControlBatchRecord::read(&mut buf).unwrap_err();
1048 assert_matches!(err, ReadError::Malformed(_));
1049 assert_eq!(
1050 err.to_string(),
1051 "Malformed data: Unknown control batch record type: 2",
1052 );
1053 }
1054
1055 test_roundtrip!(RecordBatchBody, test_record_batch_body_roundtrip);
1056
1057 test_roundtrip!(RecordBatch, test_record_batch_roundtrip);
1058
1059 #[test]
1060 fn test_decode_fixture_nocompression() {
1061 let data = [
1063 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x4b\x00\x00\x00\x00".to_vec(),
1064 b"\x02\x27\x24\xfe\xcd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x61".to_vec(),
1065 b"\xd5\x9b\x77\x00\x00\x00\x00\x61\xd5\x9b\x77\xff\xff\xff\xff\xff".to_vec(),
1066 b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x32\x00\x00".to_vec(),
1067 b"\x00\x00\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06".to_vec(),
1068 b"\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
1069 ]
1070 .concat();
1071
1072 let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
1073 let expected = RecordBatch {
1074 base_offset: 0,
1075 partition_leader_epoch: 0,
1076 last_offset_delta: 0,
1077 first_timestamp: 1641388919,
1078 max_timestamp: 1641388919,
1079 producer_id: -1,
1080 producer_epoch: -1,
1081 base_sequence: -1,
1082 records: ControlBatchOrRecords::Records(vec![Record {
1083 timestamp_delta: 0,
1084 offset_delta: 0,
1085 key: Some(vec![]),
1086 value: Some(b"hello kafka".to_vec()),
1087 headers: vec![RecordHeader {
1088 key: "foo".to_owned(),
1089 value: b"bar".to_vec(),
1090 }],
1091 }]),
1092 compression: RecordBatchCompression::NoCompression,
1093 is_transactional: false,
1094 timestamp_type: RecordBatchTimestampType::CreateTime,
1095 };
1096 assert_eq!(actual, expected);
1097
1098 let mut data2 = vec![];
1099 actual.write(&mut data2).unwrap();
1100 assert_eq!(data, data2);
1101 }
1102
1103 #[cfg(feature = "compression-gzip")]
1104 #[test]
1105 fn test_decode_fixture_gzip() {
1106 let data = [
1108 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x00\x00\x00\x00".to_vec(),
1109 b"\x02\xba\x41\x46\x65\x00\x01\x00\x00\x00\x00\x00\x00\x01\x7e\x90".to_vec(),
1110 b"\xb3\x34\x67\x00\x00\x01\x7e\x90\xb3\x34\x67\xff\xff\xff\xff\xff".to_vec(),
1111 b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x1f\x8b\x08".to_vec(),
1112 b"\x00\x00\x00\x00\x00\x00\x03\xfb\xc3\xc8\xc0\xc0\x70\x82\xb1\x82".to_vec(),
1113 b"\x0e\x40\x2c\x23\x35\x27\x27\x5f\x21\x3b\x31\x2d\x3b\x91\x89\x2d".to_vec(),
1114 b"\x2d\x3f\x9f\x2d\x29\xb1\x08\x00\xe4\xcd\xba\x1f\x80\x00\x00\x00".to_vec(),
1115 ]
1116 .concat();
1117
1118 let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1119 let expected = RecordBatch {
1120 base_offset: 0,
1121 partition_leader_epoch: 0,
1122 last_offset_delta: 0,
1123 first_timestamp: 1643105170535,
1124 max_timestamp: 1643105170535,
1125 producer_id: -1,
1126 producer_epoch: -1,
1127 base_sequence: -1,
1128 records: ControlBatchOrRecords::Records(vec![Record {
1129 timestamp_delta: 0,
1130 offset_delta: 0,
1131 key: Some(vec![b'x'; 100]),
1132 value: Some(b"hello kafka".to_vec()),
1133 headers: vec![RecordHeader {
1134 key: "foo".to_owned(),
1135 value: b"bar".to_vec(),
1136 }],
1137 }]),
1138 compression: RecordBatchCompression::Gzip,
1139 is_transactional: false,
1140 timestamp_type: RecordBatchTimestampType::CreateTime,
1141 };
1142 assert_eq!(actual, expected);
1143
1144 let mut data2 = vec![];
1145 actual.write(&mut data2).unwrap();
1146
1147 let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1150 assert_eq!(actual2, expected);
1151 }
1152
1153 #[cfg(feature = "compression-lz4")]
1154 #[test]
1155 fn test_decode_fixture_lz4() {
1156 let data = [
1158 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
1159 b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
1160 b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
1161 b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
1162 b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
1163 b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
1164 b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
1165 ]
1166 .concat();
1167
1168 let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1169 let expected = RecordBatch {
1170 base_offset: 0,
1171 partition_leader_epoch: 0,
1172 last_offset_delta: 0,
1173 first_timestamp: 1643649156900,
1174 max_timestamp: 1643649156900,
1175 producer_id: -1,
1176 producer_epoch: -1,
1177 base_sequence: -1,
1178 records: ControlBatchOrRecords::Records(vec![Record {
1179 timestamp_delta: 0,
1180 offset_delta: 0,
1181 key: Some(vec![b'x'; 100]),
1182 value: Some(b"hello kafka".to_vec()),
1183 headers: vec![RecordHeader {
1184 key: "foo".to_owned(),
1185 value: b"bar".to_vec(),
1186 }],
1187 }]),
1188 compression: RecordBatchCompression::Lz4,
1189 is_transactional: false,
1190 timestamp_type: RecordBatchTimestampType::CreateTime,
1191 };
1192 assert_eq!(actual, expected);
1193
1194 let mut data2 = vec![];
1195 actual.write(&mut data2).unwrap();
1196
1197 let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1200 assert_eq!(actual2, expected);
1201 }
1202
1203 #[cfg(feature = "compression-snappy")]
1204 mod snappy {
1205 use super::*;
1206
1207 #[test]
1208 fn test_decode_fixture_snappy() {
1209 let data = [
1211 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00\x00".to_vec(),
1212 b"\x02\xad\x86\xf4\xf4\x00\x02\x00\x00\x00\x00\x00\x00\x01\x7e\xb6".to_vec(),
1213 b"\x45\x0e\x52\x00\x00\x01\x7e\xb6\x45\x0e\x52\xff\xff\xff\xff\xff".to_vec(),
1214 b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x80\x01\x1c".to_vec(),
1215 b"\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a\x01\x00\x50\x16".to_vec(),
1216 b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
1217 b"\x06\x62\x61\x72".to_vec(),
1218 ]
1219 .concat();
1220
1221 let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1222 let expected = RecordBatch {
1223 base_offset: 0,
1224 partition_leader_epoch: 0,
1225 last_offset_delta: 0,
1226 first_timestamp: 1643735486034,
1227 max_timestamp: 1643735486034,
1228 producer_id: -1,
1229 producer_epoch: -1,
1230 base_sequence: -1,
1231 records: ControlBatchOrRecords::Records(vec![Record {
1232 timestamp_delta: 0,
1233 offset_delta: 0,
1234 key: Some(vec![b'x'; 100]),
1235 value: Some(b"hello kafka".to_vec()),
1236 headers: vec![RecordHeader {
1237 key: "foo".to_owned(),
1238 value: b"bar".to_vec(),
1239 }],
1240 }]),
1241 compression: RecordBatchCompression::Snappy,
1242 is_transactional: false,
1243 timestamp_type: RecordBatchTimestampType::CreateTime,
1244 };
1245 assert_eq!(actual, expected);
1246
1247 let mut data2 = vec![];
1248 actual.write(&mut data2).unwrap();
1249
1250 let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1253 assert_eq!(actual2, expected);
1254 }
1255
        #[test]
        fn test_decode_fixture_snappy_java() {
            // Fixture: a RecordBatch captured from the Java Kafka client with
            // snappy compression. Note the `\x82SNAPPY\x00` marker embedded in
            // the payload (visible as `\x82\x53\x4e\x41\x50\x50\x59` below) —
            // the Java client wraps raw snappy blocks in its own chunked
            // framing, which the decoder must detect and unwrap.
            let data = [
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8c\x00\x00\x00\x00".to_vec(),
                b"\x02\x79\x1e\x2d\xce\x00\x02\x00\x00\x00\x01\x00\x00\x01\x7f\x07".to_vec(),
                b"\x25\x7a\xb1\x00\x00\x01\x7f\x07\x25\x7a\xb1\xff\xff\xff\xff\xff".to_vec(),
                b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x02\x82\x53\x4e".to_vec(),
                b"\x41\x50\x50\x59\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00".to_vec(),
                b"\x47\xff\x01\x1c\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a".to_vec(),
                b"\x01\x00\x64\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02".to_vec(),
                b"\x06\x66\x6f\x6f\x06\x62\x61\x72\xfa\x01\x00\x00\x02\xfe\x80\x00".to_vec(),
                b"\x96\x80\x00\x4c\x14\x73\x6f\x6d\x65\x20\x76\x61\x6c\x75\x65\x02".to_vec(),
                b"\x06\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
            ]
            .concat();

            // Decode the fixture and compare against the expected
            // two-record batch.
            let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
            let expected = RecordBatch {
                base_offset: 0,
                partition_leader_epoch: 0,
                last_offset_delta: 1,
                first_timestamp: 1645092371121,
                max_timestamp: 1645092371121,
                producer_id: -1,
                producer_epoch: -1,
                base_sequence: -1,
                records: ControlBatchOrRecords::Records(vec![
                    Record {
                        timestamp_delta: 0,
                        offset_delta: 0,
                        key: Some(vec![b'x'; 100]),
                        value: Some(b"hello kafka".to_vec()),
                        headers: vec![RecordHeader {
                            key: "foo".to_owned(),
                            value: b"bar".to_vec(),
                        }],
                    },
                    Record {
                        timestamp_delta: 0,
                        offset_delta: 1,
                        key: Some(vec![b'x'; 100]),
                        value: Some(b"some value".to_vec()),
                        headers: vec![RecordHeader {
                            key: "foo".to_owned(),
                            value: b"bar".to_vec(),
                        }],
                    },
                ]),
                compression: RecordBatchCompression::Snappy,
                is_transactional: false,
                timestamp_type: RecordBatchTimestampType::CreateTime,
            };
            assert_eq!(actual, expected);

            // Round-trip: re-encode and decode again. Only semantic equality
            // is asserted (not byte equality) — presumably our writer does not
            // reproduce the Java-specific snappy framing byte-for-byte.
            let mut data2 = vec![];
            actual.write(&mut data2).unwrap();

            let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
            assert_eq!(actual2, expected);
        }
1320
        #[test]
        fn test_decode_java_specific_oom() {
            // Regression fixture (fuzzer-style input): a Java-framed snappy
            // payload (`\x82SNAPPY\x00` marker at offset 40) declaring a chunk
            // length of ~4.2 GB while only 38 bytes of input remain. The
            // decoder must reject this up front instead of attempting a huge
            // allocation (OOM).
            let data = [
                0x0a, 0x0a, 0x83, 0x00, 0xd4, 0x00, 0x00, 0x22, 0x00, 0x4b, 0x08, 0xd2, 0x22, 0xfb,
                0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x9b, 0x00, 0x9b, 0x0a, 0x40,
                0x00, 0x00, 0x4b, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0xd3, 0x82, 0x53,
                0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
                0x00, 0x00, 0x00, 0x03, 0x01, 0x00, 0x00, 0xfc, 0x00, 0x09, 0x09, 0x09, 0x09, 0x09,
                0x09, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0x00, 0x80,
                0x00, 0x00, 0x00, 0x00, 0xb0, 0x9b, 0x00,
            ]
            .to_vec();

            // The exact error text is pinned so the user-facing diagnostic
            // (illegal chunk length vs. remaining bytes) does not regress.
            let err = RecordBatchBody::read(&mut Cursor::new(data)).unwrap_err();
            assert_matches!(err, ReadError::Malformed(_));
            assert_eq!(err.to_string(), "Malformed data: Java-specific Snappy-compressed data has illegal chunk length, got 4227860745 bytes but only 38 bytes are left.");
        }
1340
1341 #[test]
1342 fn test_carefully_decompress_snappy_empty_input() {
1343 let err = carefully_decompress_snappy(&[], 1).unwrap_err();
1344 assert_matches!(err, ReadError::Malformed(_));
1345 }
1346
1347 #[test]
1348 fn test_carefully_decompress_snappy_empty_payload() {
1349 let compressed = compress(&[]);
1350 let data = carefully_decompress_snappy(&compressed, 1).unwrap();
1351 assert!(data.is_empty());
1352 }
1353
        proptest! {
            #![proptest_config(ProptestConfig{cases: 200, ..Default::default()})]

            // Round-trip property: any byte vector up to 10 kB must survive
            // `compress` -> `carefully_decompress_snappy` unchanged.
            #[test]
            fn test_carefully_decompress_snappy(input in prop::collection::vec(any::<u8>(), 0..10_000)) {
                let compressed = compress(&input);
                let input2 = carefully_decompress_snappy(&compressed, 1).unwrap();
                assert_eq!(input, input2);
            }
        }
1363
1364 fn compress(data: &[u8]) -> Vec<u8> {
1365 use snap::raw::{max_compress_len, Encoder};
1366
1367 let mut encoder = Encoder::new();
1368 let mut output = vec![0; max_compress_len(data.len())];
1369 let l = encoder.compress(data, &mut output).unwrap();
1370
1371 output[..l].to_vec()
1372 }
1373 }
1374
    #[cfg(feature = "compression-zstd")]
    #[test]
    fn test_decode_fixture_zstd() {
        // Fixture: a single-record RecordBatch with a zstd-compressed
        // payload. The zstd frame magic (`\x28\xb5\x2f\xfd`) is visible at
        // the start of the compressed records section below.
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5d\x00\x00\x00\x00".to_vec(),
            b"\x02\xa1\x6e\x4e\x95\x00\x04\x00\x00\x00\x00\x00\x00\x01\x7e\xbf".to_vec(),
            b"\x78\xf3\xad\x00\x00\x01\x7e\xbf\x78\xf3\xad\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\xb5\x2f".to_vec(),
            b"\xfd\x00\x58\x1d\x01\x00\xe8\xfc\x01\x00\x00\x00\xc8\x01\x78\x16".to_vec(),
            b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
            b"\x06\x62\x61\x72\x01\x00\x20\x05\x5c".to_vec(),
        ]
        .concat();

        // Decode the fixture and compare against the expected batch.
        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643889882029,
            max_timestamp: 1643889882029,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                key: Some(vec![b'x'; 100]),
                value: Some(b"hello kafka".to_vec()),
                headers: vec![RecordHeader {
                    key: "foo".to_owned(),
                    value: b"bar".to_vec(),
                }],
            }]),
            compression: RecordBatchCompression::Zstd,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        // Round-trip: re-encode and decode again. Only semantic equality is
        // asserted — presumably the re-compressed bytes need not match the
        // fixture exactly, so we compare decoded batches instead.
        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();

        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
        assert_eq!(actual2, expected);
    }
1424
    #[test]
    fn test_decode_fixture_null_key() {
        // Fixture: an uncompressed RecordBatch whose single record has a null
        // key (`key: None`), a protobuf-looking value, and two headers. The
        // original input is kept (`data.clone()`) so the write path can be
        // checked byte-for-byte at the end.
        let data = [
            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x1a\x00\x00\x00\x00".to_vec(),
            b"\x02\x67\x98\xb9\x54\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7e\xbe".to_vec(),
            b"\xdc\x91\xf6\x00\x00\x01\x7e\xbe\xdc\x91\xf6\xff\xff\xff\xff\xff".to_vec(),
            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\xce\x03\x00".to_vec(),
            b"\x00\x00\x01\xce\x01\x0a\x65\x0a\x2f\x74\x65\x73\x74\x5f\x74\x6f".to_vec(),
            b"\x70\x69\x63\x5f\x33\x37\x33\x39\x38\x66\x38\x64\x2d\x39\x35\x66".to_vec(),
            b"\x38\x2d\x34\x34\x65\x65\x2d\x38\x33\x61\x34\x2d\x34\x64\x30\x63".to_vec(),
            b"\x35\x39\x32\x62\x34\x34\x36\x64\x12\x32\x0a\x03\x75\x70\x63\x12".to_vec(),
            b"\x17\x0a\x04\x75\x73\x65\x72\x10\x03\x1a\x0a\x12\x08\x00\x00\x00".to_vec(),
            b"\x00\x00\x00\xf0\x3f\x22\x01\x00\x12\x10\x0a\x04\x74\x69\x6d\x65".to_vec(),
            b"\x10\x04\x1a\x03\x0a\x01\x64\x22\x01\x00\x18\x01\x04\x18\x63\x6f".to_vec(),
            b"\x6e\x74\x65\x6e\x74\x2d\x74\x79\x70\x65\xa4\x01\x61\x70\x70\x6c".to_vec(),
            b"\x69\x63\x61\x74\x69\x6f\x6e\x2f\x78\x2d\x70\x72\x6f\x74\x6f\x62".to_vec(),
            b"\x75\x66\x3b\x20\x73\x63\x68\x65\x6d\x61\x3d\x22\x69\x6e\x66\x6c".to_vec(),
            b"\x75\x78\x64\x61\x74\x61\x2e\x69\x6f\x78\x2e\x77\x72\x69\x74\x65".to_vec(),
            b"\x5f\x62\x75\x66\x66\x65\x72\x2e\x76\x31\x2e\x57\x72\x69\x74\x65".to_vec(),
            b"\x42\x75\x66\x66\x65\x72\x50\x61\x79\x6c\x6f\x61\x64\x22\x1a\x69".to_vec(),
            b"\x6f\x78\x2d\x6e\x61\x6d\x65\x73\x70\x61\x63\x65\x12\x6e\x61\x6d".to_vec(),
            b"\x65\x73\x70\x61\x63\x65".to_vec(),
        ]
        .concat();

        // Decode the fixture and compare against the expected batch.
        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
        let expected = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: 0,
            first_timestamp: 1643879633398,
            max_timestamp: 1643879633398,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: ControlBatchOrRecords::Records(vec![Record {
                timestamp_delta: 0,
                offset_delta: 0,
                // The interesting part of this fixture: a record with no key.
                key: None,
                value: Some(vec![
                    10, 101, 10, 47, 116, 101, 115, 116, 95, 116, 111, 112, 105, 99, 95, 51, 55,
                    51, 57, 56, 102, 56, 100, 45, 57, 53, 102, 56, 45, 52, 52, 101, 101, 45, 56,
                    51, 97, 52, 45, 52, 100, 48, 99, 53, 57, 50, 98, 52, 52, 54, 100, 18, 50, 10,
                    3, 117, 112, 99, 18, 23, 10, 4, 117, 115, 101, 114, 16, 3, 26, 10, 18, 8, 0, 0,
                    0, 0, 0, 0, 240, 63, 34, 1, 0, 18, 16, 10, 4, 116, 105, 109, 101, 16, 4, 26, 3,
                    10, 1, 100, 34, 1, 0, 24, 1,
                ]),
                headers: vec![
                    RecordHeader {
                        key: "content-type".to_owned(),
                        value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#.to_vec(),
                    },
                    RecordHeader {
                        key: "iox-namespace".to_owned(),
                        value: b"namespace".to_vec(),
                    },
                ],
            }]),
            compression: RecordBatchCompression::NoCompression,
            is_transactional: false,
            timestamp_type: RecordBatchTimestampType::CreateTime,
        };
        assert_eq!(actual, expected);

        // With no compression involved, re-encoding must reproduce the
        // original wire bytes exactly (unlike the compressed fixtures, which
        // only check semantic equality).
        let mut data2 = vec![];
        actual.write(&mut data2).unwrap();
        assert_eq!(data, data2);
    }
1494}