rskafka/protocol/record.rs

1//! The actual payload message which is also the on-disk format for Kafka.
2//!
//! The format evolved twice, via [KIP-32] and [KIP-98]. We only support the latest generation (message version 2).
4//!
5//!
6//! # CRC
7//! The CRC used to check payload data is `CRC-32C` / iSCSI as documented by the following sources:
8//!
9//! - <https://kafka.apache.org/documentation/#recordbatch>
10//! - <https://docs.oracle.com/javase/9/docs/api/java/util/zip/CRC32C.html>
11//! - <https://reveng.sourceforge.io/crc-catalogue/all.htm>
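//!
//! A minimal sketch of what that means in practice, assuming the `crc32c` crate that this module already uses
//! (the payload bytes are made up):
//!
//! ```ignore
//! // CRC-32C ("Castagnoli"), NOT the plain CRC-32 used by e.g. gzip.
//! let payload = b"some record batch bytes";
//! let checksum = crc32c::crc32c(payload);
//!
//! // The same checksum can also be computed incrementally over chunks.
//! let incremental = crc32c::crc32c_append(crc32c::crc32c(&payload[..4]), &payload[4..]);
//! assert_eq!(checksum, incremental);
//! ```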
12//!
13//!
14//! # References
15//! - [KIP-32]
16//! - [KIP-98]
17//! - <https://kafka.apache.org/documentation/#messageformat>
18//!
19//!
20//! [KIP-32]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
21//! [KIP-98]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
22use std::io::{Cursor, Read, Write};
23
24#[cfg(test)]
25use proptest::prelude::*;
26
27use super::{
28    primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
29    traits::{ReadError, ReadType, WriteError, WriteType},
30    vec_builder::VecBuilder,
31};
32
33/// Record Header
34///
35/// # References
36/// - <https://kafka.apache.org/documentation/#recordheader>
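///
/// On the wire both fields are length-prefixed with a signed varint, i.e.
/// `headerKeyLength: varint, headerKey: String, headerValueLength: varint, headerValue: Bytes`
/// (see the `ReadType`/`WriteType` impls below).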
37#[derive(Debug, PartialEq, Eq)]
38#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
39pub struct RecordHeader {
40    pub key: String,
41    pub value: Vec<u8>,
42}
43
44impl<R> ReadType<R> for RecordHeader
45where
46    R: Read,
47{
48    fn read(reader: &mut R) -> Result<Self, ReadError> {
49        // key
50        let len = Varint::read(reader)?;
51        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
52        let mut buf = VecBuilder::new(len);
53        buf = buf.read_exact(reader)?;
54        let key = String::from_utf8(buf.into()).map_err(|e| ReadError::Malformed(Box::new(e)))?;
55
56        // value
57        let len = Varint::read(reader)?;
58        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
59        let mut value = VecBuilder::new(len);
60        value = value.read_exact(reader)?;
61        let value = value.into();
62
63        Ok(Self { key, value })
64    }
65}
66
67impl<W> WriteType<W> for RecordHeader
68where
69    W: Write,
70{
71    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
72        // key
73        let l = i32::try_from(self.key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
74        Varint(l).write(writer)?;
75        writer.write_all(self.key.as_bytes())?;
76
77        // value
78        let l = i32::try_from(self.value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
79        Varint(l).write(writer)?;
80        writer.write_all(&self.value)?;
81
82        Ok(())
83    }
84}
85
86/// Record
87///
88/// # References
89/// - <https://kafka.apache.org/documentation/#record>
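///
/// The wire layout handled by the `ReadType`/`WriteType` impls below is roughly (a descriptive sketch, not the
/// authoritative schema):
///
/// ```text
/// length: varint
/// attributes: int8 (currently unused, written as 0)
/// timestampDelta: varlong
/// offsetDelta: varint
/// keyLength: varint (-1 encodes a null key)
/// key: byte[keyLength]
/// valueLength: varint (-1 encodes a null value)
/// value: byte[valueLength]
/// headerCount: varint
/// headers: RecordHeader[headerCount]
/// ```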
90#[derive(Debug, PartialEq, Eq)]
91#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
92pub struct Record {
93    pub timestamp_delta: i64,
94    pub offset_delta: i32,
95    pub key: Option<Vec<u8>>,
96    pub value: Option<Vec<u8>>,
97    pub headers: Vec<RecordHeader>,
98}
99
100impl<R> ReadType<R> for Record
101where
102    R: Read,
103{
104    fn read(reader: &mut R) -> Result<Self, ReadError> {
105        // length
106        let len = Varint::read(reader)?;
107        let len = u64::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
108        let reader = &mut reader.take(len);
109
110        // attributes
111        Int8::read(reader)?;
112
113        // timestampDelta
114        let timestamp_delta = Varlong::read(reader)?.0;
115
116        // offsetDelta
117        let offset_delta = Varint::read(reader)?.0;
118
119        // key
120        let len = Varint::read(reader)?.0;
121        let key = if len == -1 {
122            None
123        } else {
124            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
125            let mut key = VecBuilder::new(len);
126            key = key.read_exact(reader)?;
127            Some(key.into())
128        };
129
130        // value
131        let len = Varint::read(reader)?.0;
132        let value = if len == -1 {
133            None
134        } else {
135            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
136            let mut value = VecBuilder::new(len);
137            value = value.read_exact(reader)?;
138            Some(value.into())
139        };
140
141        // headers
        // Note: this is NOT a normal protocol array (Int32-length-prefixed) but uses a Varint count instead.
143        let n_headers = Varint::read(reader)?;
144        let n_headers =
145            usize::try_from(n_headers.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
146        let mut headers = VecBuilder::new(n_headers);
147        for _ in 0..n_headers {
148            headers.push(RecordHeader::read(reader)?);
149        }
150
        // check that there is no trailing data; leftover bytes likely indicate a bug
152        if reader.limit() != 0 {
153            return Err(ReadError::Malformed(
154                format!("Found {} trailing bytes after Record", reader.limit()).into(),
155            ));
156        }
157
158        Ok(Self {
159            timestamp_delta,
160            offset_delta,
161            key,
162            value,
163            headers: headers.into(),
164        })
165    }
166}
167
168impl<W> WriteType<W> for Record
169where
170    W: Write,
171{
172    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
173        // ============================================================================================
174        // ======================================== inner data ========================================
175        // write data to buffer because we need to prepend the length
176        let mut data = vec![];
177
178        // attributes
179        Int8(0).write(&mut data)?;
180
181        // timestampDelta
182        Varlong(self.timestamp_delta).write(&mut data)?;
183
184        // offsetDelta
185        Varint(self.offset_delta).write(&mut data)?;
186
187        // key
188        match &self.key {
189            Some(key) => {
190                let l = i32::try_from(key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
191                Varint(l).write(&mut data)?;
192                data.write_all(key)?;
193            }
194            None => {
195                Varint(-1).write(&mut data)?;
196            }
197        }
198
199        // value
200        match &self.value {
201            Some(value) => {
202                let l =
203                    i32::try_from(value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
204                Varint(l).write(&mut data)?;
205                data.write_all(value)?;
206            }
207            None => {
208                Varint(-1).write(&mut data)?;
209            }
210        }
211
212        // headers
        // Note: this is NOT a normal protocol array (Int32-length-prefixed) but uses a Varint count instead.
214        let l =
215            i32::try_from(self.headers.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
216        Varint(l).write(&mut data)?;
217        for header in &self.headers {
218            header.write(&mut data)?;
219        }
220
221        // ============================================================================================
222        // ============================================================================================
223
224        // now write accumulated data
225        let l = i32::try_from(data.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
226        Varint(l).write(writer)?;
227        writer.write_all(&data)?;
228
229        Ok(())
230    }
231}
232
233/// Control Batch Record
234///
235/// # References
236/// - <https://kafka.apache.org/documentation/#controlbatch>
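///
/// The value of a control record consists of two big-endian `Int16`s: a version (only version `0` is supported
/// here) followed by a type, where `0` means abort and `1` means commit (see the impls below).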
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
239pub enum ControlBatchRecord {
240    Abort,
241    Commit,
242}
243
244impl<R> ReadType<R> for ControlBatchRecord
245where
246    R: Read,
247{
248    fn read(reader: &mut R) -> Result<Self, ReadError> {
249        // version
250        let version = Int16::read(reader)?.0;
251        if version != 0 {
252            return Err(ReadError::Malformed(
253                format!("Unknown control batch record version: {}", version).into(),
254            ));
255        }
256
257        // type
258        let t = Int16::read(reader)?.0;
259        match t {
260            0 => Ok(Self::Abort),
261            1 => Ok(Self::Commit),
262            _ => Err(ReadError::Malformed(
263                format!("Unknown control batch record type: {}", t).into(),
264            )),
265        }
266    }
267}
268
269impl<W> WriteType<W> for ControlBatchRecord
270where
271    W: Write,
272{
273    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
274        // version
275        Int16(0).write(writer)?;
276
277        // type
278        let t = match self {
279            Self::Abort => 0,
280            Self::Commit => 1,
281        };
282        Int16(t).write(writer)?;
283
284        Ok(())
285    }
286}
287
288#[derive(Debug, PartialEq, Eq)]
289#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
290pub enum ControlBatchOrRecords {
291    ControlBatch(ControlBatchRecord),
292
293    // tell proptest to only generate small vectors, otherwise tests take forever
294    #[cfg_attr(
295        test,
296        proptest(
297            strategy = "prop::collection::vec(any::<Record>(), 0..2).prop_map(ControlBatchOrRecords::Records)"
298        )
299    )]
300    Records(Vec<Record>),
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq)]
304#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
305pub enum RecordBatchCompression {
306    NoCompression,
307    Gzip,
308    Snappy,
309    Lz4,
310    Zstd,
311}
312
313/// Record batch timestamp type.
314///
315/// # References
316/// - <https://kafka.apache.org/documentation/#messageset> (this is the old message format, but the flag is the same)
317#[derive(Debug, Clone, Copy, PartialEq, Eq)]
318#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
319pub enum RecordBatchTimestampType {
320    CreateTime,
321    LogAppendTime,
322}
323
324/// Record Batch
325///
326/// # References
327/// - <https://kafka.apache.org/documentation/#recordbatch>
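///
/// On the wire a batch starts with a fixed header followed by the length- and CRC-protected body (a descriptive
/// sketch of what the impls below read and write):
///
/// ```text
/// baseOffset: int64
/// batchLength: int32  (covers everything after this field)
/// partitionLeaderEpoch: int32
/// magic: int8  (must be 2)
/// crc: int32  (CRC-32C of everything after this field)
/// <body, see RecordBatchBody>
/// ```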
328#[derive(Debug, PartialEq, Eq)]
329#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
330pub struct RecordBatch {
331    pub base_offset: i64,
332    pub partition_leader_epoch: i32,
333    pub last_offset_delta: i32,
334    pub first_timestamp: i64,
335    pub max_timestamp: i64,
336    pub producer_id: i64,
337    pub producer_epoch: i16,
338    pub base_sequence: i32,
339    pub records: ControlBatchOrRecords,
340    pub compression: RecordBatchCompression,
341    pub is_transactional: bool,
342    pub timestamp_type: RecordBatchTimestampType,
343}
344
345impl<R> ReadType<R> for RecordBatch
346where
347    R: Read,
348{
349    fn read(reader: &mut R) -> Result<Self, ReadError> {
350        // baseOffset
351        let base_offset = Int64::read(reader)?.0;
352
353        // batchLength
354        //
355        // Contains all fields AFTER the length field (so excluding `baseOffset` and `batchLength`). To determine the
        // size of the CRC-checked part we must subtract from it the sizes of all fields up to and including the CRC field.
357        let len = Int32::read(reader)?;
358        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
359        let len = len
360            .checked_sub(
361                4 // partitionLeaderEpoch
362            + 1 // magic
363            + 4, // crc
364            )
365            .ok_or_else(|| {
366                ReadError::Malformed(format!("Record batch len too small: {}", len).into())
367            })?;
368
369        // partitionLeaderEpoch
370        let partition_leader_epoch = Int32::read(reader)?.0;
371
372        // magic
373        let magic = Int8::read(reader)?.0;
374        if magic != 2 {
375            return Err(ReadError::Malformed(
376                format!("Invalid magic number in record batch: {}", magic).into(),
377            ));
378        }
379
380        // crc
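        //
        // The wire format stores the checksum as a signed Int32; reinterpret the raw bits as `u32` so that it can
        // be compared against the output of `crc32c::crc32c` below.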
381        let crc = Int32::read(reader)?.0;
382        let crc = u32::from_be_bytes(crc.to_be_bytes());
383
384        // data
385        let mut data = VecBuilder::new(len);
386        data = data.read_exact(reader)?;
387        let data: Vec<u8> = data.into();
388        let actual_crc = crc32c::crc32c(&data);
389        if crc != actual_crc {
390            return Err(ReadError::Malformed(
391                format!("CRC error, got 0x{:x}, expected 0x{:x}", actual_crc, crc).into(),
392            ));
393        }
394
395        // ==========================================================================================
396        // ======================================== CRC data ========================================
397        let mut data = Cursor::new(data);
398        let body = RecordBatchBody::read(&mut data)?;
399
        // check that there is no trailing data; leftover bytes likely indicate a bug
401        let bytes_read = data.position();
402        let bytes_total = u64::try_from(data.into_inner().len()).map_err(ReadError::Overflow)?;
403        let bytes_left = bytes_total - bytes_read;
404        if bytes_left != 0 {
405            return Err(ReadError::Malformed(
406                format!("Found {} trailing bytes after RecordBatch", bytes_left).into(),
407            ));
408        }
409
410        // ==========================================================================================
411        // ==========================================================================================
412
413        Ok(Self {
414            base_offset,
415            partition_leader_epoch,
416            last_offset_delta: body.last_offset_delta,
417            first_timestamp: body.first_timestamp,
418            max_timestamp: body.max_timestamp,
419            producer_id: body.producer_id,
420            producer_epoch: body.producer_epoch,
421            base_sequence: body.base_sequence,
422            compression: body.compression,
423            timestamp_type: body.timestamp_type,
424            is_transactional: body.is_transactional,
425            records: body.records,
426        })
427    }
428}
429
430impl<W> WriteType<W> for RecordBatch
431where
432    W: Write,
433{
434    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
435        // ==========================================================================================
436        // ======================================== CRC data ========================================
437        // collect everything that should be part of the CRC calculation
438        let mut data = vec![];
439        let body_ref = RecordBatchBodyRef {
440            last_offset_delta: self.last_offset_delta,
441            first_timestamp: self.first_timestamp,
442            max_timestamp: self.max_timestamp,
443            producer_id: self.producer_id,
444            producer_epoch: self.producer_epoch,
445            base_sequence: self.base_sequence,
446            records: &self.records,
447            compression: self.compression,
448            is_transactional: self.is_transactional,
449            timestamp_type: self.timestamp_type,
450        };
451        body_ref.write(&mut data)?;
452
453        // ==========================================================================================
454        // ==========================================================================================
455
456        // baseOffset
457        Int64(self.base_offset).write(writer)?;
458
459        // batchLength
460        //
461        // Contains all fields AFTER the length field (so excluding `baseOffset` and `batchLength`, but including
        // `partitionLeaderEpoch`, `magic`, and `crc`).
463        //
464        // See
465        // https://github.com/kafka-rust/kafka-rust/blob/657202832806cda77d0a1801d618dc6c382b4d79/src/protocol/produce.rs#L224-L226
466        let l = i32::try_from(
467            data.len()
468            + 4 // partitionLeaderEpoch
469            + 1 // magic
470            + 4, // crc
471        )
472        .map_err(|e| WriteError::Malformed(Box::new(e)))?;
473        Int32(l).write(writer)?;
474
475        // partitionLeaderEpoch
476        Int32(self.partition_leader_epoch).write(writer)?;
477
478        // magic
479        Int8(2).write(writer)?;
480
481        // crc
482        // See
483        // https://github.com/kafka-rust/kafka-rust/blob/a551b6231a7adc9b715552b635a69ac2856ec8a1/src/protocol/mod.rs#L161-L163
484        // WARNING: the range in the code linked above is correct but the polynomial is wrong!
485        let crc = crc32c::crc32c(&data);
486        let crc = i32::from_be_bytes(crc.to_be_bytes());
487        Int32(crc).write(writer)?;
488
489        // the actual CRC-checked data
490        writer.write_all(&data)?;
491
492        Ok(())
493    }
494}
495
496/// Inner part of a [`RecordBatch`] that is protected by a header containing its length and a CRC checksum.
497#[derive(Debug, PartialEq, Eq)]
498#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
499pub struct RecordBatchBody {
500    pub last_offset_delta: i32,
501    pub first_timestamp: i64,
502    pub max_timestamp: i64,
503    pub producer_id: i64,
504    pub producer_epoch: i16,
505    pub base_sequence: i32,
506    pub records: ControlBatchOrRecords,
507    pub compression: RecordBatchCompression,
508    pub is_transactional: bool,
509    pub timestamp_type: RecordBatchTimestampType,
510}
511
512impl RecordBatchBody {
513    fn read_records<R>(
514        reader: &mut R,
515        is_control: bool,
516        n_records: usize,
517    ) -> Result<ControlBatchOrRecords, ReadError>
518    where
519        R: Read,
520    {
521        if is_control {
522            if n_records != 1 {
523                return Err(ReadError::Malformed(
524                    format!("Expected 1 control record but got {}", n_records).into(),
525                ));
526            }
527
528            let record = ControlBatchRecord::read(reader)?;
529            Ok(ControlBatchOrRecords::ControlBatch(record))
530        } else {
531            let mut records = VecBuilder::new(n_records);
532            for _ in 0..n_records {
533                records.push(Record::read(reader)?);
534            }
535            Ok(ControlBatchOrRecords::Records(records.into()))
536        }
537    }
538}
539
540impl<R> ReadType<R> for RecordBatchBody
541where
542    R: Read,
543{
544    fn read(reader: &mut R) -> Result<Self, ReadError> {
545        // attributes
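        //
        // Bit layout (see <https://kafka.apache.org/documentation/#recordbatch>):
        //   bits 0..=2: compression codec
        //   bit  3:     timestampType
        //   bit  4:     isTransactional
        //   bit  5:     isControlBatch
        //   remaining bits are not interpreted by this implementation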
546        let attributes = Int16::read(reader)?.0;
547        let compression = match attributes & 0x7 {
548            0 => RecordBatchCompression::NoCompression,
549            1 => RecordBatchCompression::Gzip,
550            2 => RecordBatchCompression::Snappy,
551            3 => RecordBatchCompression::Lz4,
552            4 => RecordBatchCompression::Zstd,
553            other => {
554                return Err(ReadError::Malformed(
555                    format!("Invalid compression type: {}", other).into(),
556                ));
557            }
558        };
559        let timestamp_type = if ((attributes >> 3) & 0x1) == 0 {
560            RecordBatchTimestampType::CreateTime
561        } else {
562            RecordBatchTimestampType::LogAppendTime
563        };
564        let is_transactional = ((attributes >> 4) & 0x1) == 1;
565        let is_control = ((attributes >> 5) & 0x1) == 1;
566
567        // lastOffsetDelta
568        let last_offset_delta = Int32::read(reader)?.0;
569
570        // firstTimestamp
571        let first_timestamp = Int64::read(reader)?.0;
572
573        // maxTimestamp
574        let max_timestamp = Int64::read(reader)?.0;
575
576        // producerId
577        let producer_id = Int64::read(reader)?.0;
578
579        // producerEpoch
580        let producer_epoch = Int16::read(reader)?.0;
581
582        // baseSequence
583        let base_sequence = Int32::read(reader)?.0;
584
585        // records
586        let n_records = match Int32::read(reader)?.0 {
587            -1 => 0,
588            n => usize::try_from(n)?,
589        };
590        let records = match compression {
591            RecordBatchCompression::NoCompression => {
592                Self::read_records(reader, is_control, n_records)?
593            }
594            #[cfg(feature = "compression-gzip")]
595            RecordBatchCompression::Gzip => {
596                use flate2::read::GzDecoder;
597
598                let mut decoder = GzDecoder::new(reader);
599                let records = Self::read_records(&mut decoder, is_control, n_records)?;
600
601                ensure_eof(&mut decoder, "Data left in gzip block")?;
602
603                records
604            }
605            #[cfg(feature = "compression-lz4")]
606            RecordBatchCompression::Lz4 => {
607                use lz4::Decoder;
608
609                let mut decoder = Decoder::new(reader)?;
610                let records = Self::read_records(&mut decoder, is_control, n_records)?;
611
612                // the lz4 decoder requires us to consume the whole inner stream until we reach EOF
613                ensure_eof(&mut decoder, "Data left in LZ4 block")?;
614
615                let (_reader, res) = decoder.finish();
616                res?;
617
618                records
619            }
620            #[cfg(feature = "compression-snappy")]
621            RecordBatchCompression::Snappy => {
622                use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
623
624                // Construct the input for the raw decoder.
625                let mut input = vec![];
626                reader.read_to_end(&mut input)?;
627
628                const JAVA_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];
629
630                // There are "normal" compression libs, and there is Java
631                // See https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L307-L318
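                //
                // The Java ("xerial") framing handled below consists of: the 8-byte magic above, a 4-byte version,
                // a 4-byte "compatible" field, and then a sequence of (4-byte big-endian chunk length, raw snappy
                // chunk) pairs.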
632                let output = if input.starts_with(JAVA_MAGIC) {
633                    let cursor_content = &input[JAVA_MAGIC.len()..];
634                    let mut cursor = Cursor::new(cursor_content);
635
636                    let mut buf_version = [0u8; 4];
637                    cursor.read_exact(&mut buf_version)?;
638                    if buf_version != [0, 0, 0, 1] {
639                        return Err(ReadError::Malformed(
640                            format!("Detected Java-specific Snappy compression, but got unknown version: {buf_version:?}").into(),
641                        ));
642                    }
643
644                    let mut buf_compatible = [0u8; 4];
645                    cursor.read_exact(&mut buf_compatible)?;
646                    if buf_compatible != [0, 0, 0, 1] {
647                        return Err(ReadError::Malformed(
648                            format!("Detected Java-specific Snappy compression, but got unknown compat flags: {buf_compatible:?}").into(),
649                        ));
650                    }
651
652                    let mut output = vec![];
653                    while cursor.position() < cursor.get_ref().len() as u64 {
654                        let mut buf_chunk_length = [0u8; 4];
655                        cursor.read_exact(&mut buf_chunk_length)?;
656                        let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize;
657                        let bytes_left = cursor_content.len() - (cursor.position() as usize);
658                        if chunk_length > bytes_left {
                            // do NOT try to allocate a massive buffer for `chunk_data` but instead fail early
660                            return Err(ReadError::Malformed(format!("Java-specific Snappy-compressed data has illegal chunk length, got {chunk_length} bytes but only {bytes_left} bytes are left.").into()));
661                        }
662
663                        let mut chunk_data = vec![0u8; chunk_length];
664                        cursor.read_exact(&mut chunk_data)?;
665
666                        let mut buf = carefully_decompress_snappy(&chunk_data, DEFAULT_BLOCK_SIZE)?;
667                        output.append(&mut buf);
668                    }
669
670                    output
671                } else {
672                    carefully_decompress_snappy(&input, DEFAULT_BLOCK_SIZE)?
673                };
674
675                // Read uncompressed records.
676                let mut decoder = Cursor::new(output);
677                let records = Self::read_records(&mut decoder, is_control, n_records)?;
678
679                // Check that there's no data left within the uncompressed block.
680                ensure_eof(&mut decoder, "Data left in Snappy block")?;
681
682                records
683            }
684            #[cfg(feature = "compression-zstd")]
685            RecordBatchCompression::Zstd => {
686                use zstd::Decoder;
687
688                let mut decoder = Decoder::new(reader)?;
689                let records = Self::read_records(&mut decoder, is_control, n_records)?;
690
691                ensure_eof(&mut decoder, "Data left in zstd block")?;
692
693                records
694            }
695            #[allow(unreachable_patterns)]
696            _ => {
697                return Err(ReadError::Malformed(
698                    format!("Unimplemented compression: {:?}", compression).into(),
699                ));
700            }
701        };
702
703        Ok(Self {
704            last_offset_delta,
705            first_timestamp,
706            max_timestamp,
707            producer_id,
708            producer_epoch,
709            base_sequence,
710            compression,
711            timestamp_type,
712            is_transactional,
713            records,
714        })
715    }
716}
717
718impl<W> WriteType<W> for RecordBatchBody
719where
720    W: Write,
721{
722    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
723        let body_ref = RecordBatchBodyRef {
724            last_offset_delta: self.last_offset_delta,
725            first_timestamp: self.first_timestamp,
726            max_timestamp: self.max_timestamp,
727            producer_id: self.producer_id,
728            producer_epoch: self.producer_epoch,
729            base_sequence: self.base_sequence,
730            records: &self.records,
731            compression: self.compression,
732            is_transactional: self.is_transactional,
733            timestamp_type: self.timestamp_type,
734        };
735        body_ref.write(writer)
736    }
737}
738
739/// Same as [`RecordBatchBody`] but contains referenced data.
740///
741/// This only supports writing.
742#[derive(Debug)]
743struct RecordBatchBodyRef<'a> {
744    pub last_offset_delta: i32,
745    pub first_timestamp: i64,
746    pub max_timestamp: i64,
747    pub producer_id: i64,
748    pub producer_epoch: i16,
749    pub base_sequence: i32,
750    pub records: &'a ControlBatchOrRecords,
751    pub compression: RecordBatchCompression,
752    pub is_transactional: bool,
753    pub timestamp_type: RecordBatchTimestampType,
754}
755
756impl<'a> RecordBatchBodyRef<'a> {
757    fn write_records<W>(writer: &mut W, records: &ControlBatchOrRecords) -> Result<(), WriteError>
758    where
759        W: Write,
760    {
761        match records {
762            ControlBatchOrRecords::ControlBatch(control_batch) => control_batch.write(writer),
763            ControlBatchOrRecords::Records(records) => {
764                for record in records {
765                    record.write(writer)?;
766                }
767                Ok(())
768            }
769        }
770    }
771}
772
773impl<'a, W> WriteType<W> for RecordBatchBodyRef<'a>
774where
775    W: Write,
776{
777    fn write(&self, writer: &mut W) -> Result<(), WriteError> {
778        // attributes
779        let mut attributes: i16 = match self.compression {
780            RecordBatchCompression::NoCompression => 0,
781            RecordBatchCompression::Gzip => 1,
782            RecordBatchCompression::Snappy => 2,
783            RecordBatchCompression::Lz4 => 3,
784            RecordBatchCompression::Zstd => 4,
785        };
786        match self.timestamp_type {
787            RecordBatchTimestampType::CreateTime => (),
788            RecordBatchTimestampType::LogAppendTime => {
789                attributes |= 1 << 3;
790            }
791        }
792        if self.is_transactional {
793            attributes |= 1 << 4;
794        }
795        if matches!(self.records, ControlBatchOrRecords::ControlBatch(_)) {
796            attributes |= 1 << 5;
797        }
798        Int16(attributes).write(writer)?;
799
800        // lastOffsetDelta
801        Int32(self.last_offset_delta).write(writer)?;
802
803        // firstTimestamp
804        Int64(self.first_timestamp).write(writer)?;
805
806        // maxTimestamp
807        Int64(self.max_timestamp).write(writer)?;
808
809        // producerId
810        Int64(self.producer_id).write(writer)?;
811
812        // producerEpoch
813        Int16(self.producer_epoch).write(writer)?;
814
815        // baseSequence
816        Int32(self.base_sequence).write(writer)?;
817
818        // records
819        let n_records = match &self.records {
820            ControlBatchOrRecords::ControlBatch(_) => 1,
821            ControlBatchOrRecords::Records(records) => records.len(),
822        };
823        Int32(i32::try_from(n_records)?).write(writer)?;
824        match self.compression {
825            RecordBatchCompression::NoCompression => {
826                Self::write_records(writer, self.records)?;
827            }
828            #[cfg(feature = "compression-gzip")]
829            RecordBatchCompression::Gzip => {
830                use flate2::{write::GzEncoder, Compression};
831
832                let mut encoder = GzEncoder::new(writer, Compression::default());
833                Self::write_records(&mut encoder, self.records)?;
834                encoder.finish()?;
835            }
836            #[cfg(feature = "compression-lz4")]
837            RecordBatchCompression::Lz4 => {
838                use lz4::{liblz4::BlockMode, EncoderBuilder};
839
840                let mut encoder = EncoderBuilder::new()
841                    .block_mode(
842                        // the only one supported by Kafka
843                        BlockMode::Independent,
844                    )
845                    .build(writer)?;
846                Self::write_records(&mut encoder, self.records)?;
847                let (_writer, res) = encoder.finish();
848                res?;
849            }
850            #[cfg(feature = "compression-snappy")]
851            RecordBatchCompression::Snappy => {
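                // Note: the write path always emits the plain "raw" snappy format, never the Java-specific framing
                // that the read path additionally accepts.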
852                use snap::raw::{max_compress_len, Encoder};
853
854                let mut input = vec![];
855                Self::write_records(&mut input, self.records)?;
856
857                let mut encoder = Encoder::new();
858                let mut output = vec![0; max_compress_len(input.len())];
859                let len = encoder
860                    .compress(&input, &mut output)
861                    .map_err(|e| WriteError::Malformed(Box::new(e)))?;
862
863                writer.write_all(&output[..len])?;
864            }
865            #[cfg(feature = "compression-zstd")]
866            RecordBatchCompression::Zstd => {
867                use zstd::Encoder;
868
869                let mut encoder = Encoder::new(writer, 0)?;
870                Self::write_records(&mut encoder, self.records)?;
871                encoder.finish()?;
872            }
873            #[allow(unreachable_patterns)]
874            _ => {
875                return Err(WriteError::Malformed(
876                    format!("Unimplemented compression: {:?}", self.compression).into(),
877                ));
878            }
879        }
880
881        Ok(())
882    }
883}
884
/// Ensure that the given reader is at EOF.
#[allow(dead_code)] // only used by some features
887fn ensure_eof<R>(reader: &mut R, msg: &str) -> Result<(), ReadError>
888where
889    R: Read,
890{
891    let mut buf = [0u8; 1];
892    match reader.read(&mut buf) {
893        Ok(0) => Ok(()),
894        Ok(_) => Err(ReadError::Malformed(msg.to_string().into())),
895        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
896        Err(e) => Err(ReadError::IO(e)),
897    }
898}
899
900/// Try to decompress a snappy message without blindly believing the uncompressed size encoded at the start of the
901/// message (and therefore potentially OOMing).
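///
/// The strategy: decode into an output buffer capped at `start_block_size` bytes and double that cap whenever the
/// failure looks like "destination buffer too small", until the claimed uncompressed size is reached.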
902#[cfg(feature = "compression-snappy")]
903fn carefully_decompress_snappy(
904    input: &[u8],
905    start_block_size: usize,
906) -> Result<Vec<u8>, ReadError> {
907    use crate::protocol::primitives::UnsignedVarint;
908    use snap::raw::{decompress_len, Decoder};
909
910    // early exit, otherwise `uncompressed_size_encoded_length` will be 1 even though there was no input
911    if input.is_empty() {
912        return Err(ReadError::Malformed(Box::new(snap::Error::Empty)));
913    }
914
915    // The snappy compression used here is unframed aka "raw". So we first need to figure out the
916    // uncompressed length. See
917    //
918    // - https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L345-L348
919    // - https://github.com/edenhill/librdkafka/blob/747f77c98fbddf7dc6508f76398e0fc9ee91450f/src/snappy.c#L779
920    let uncompressed_size = decompress_len(input).map_err(|e| ReadError::Malformed(Box::new(e)))?;
921
    // figure out how many bytes were used to encode the uncompressed size
923    let uncompressed_size_encoded_length = {
924        let mut buf = Vec::with_capacity(100);
925        UnsignedVarint(uncompressed_size as u64)
926            .write(&mut buf)
927            .expect("this write should never fail");
928        buf.len()
929    };
930
931    // Decode snappy payload.
    // The uncompressed length is unchecked and can be up to 2^32-1 bytes. To avoid an OOM-based DoS vector we start
    // with a small buffer size and double it whenever decompression fails because the buffer was too small.
934    let mut max_uncompressed_size = start_block_size;
935
936    // Try to decode the message with growing output buffers.
937    loop {
938        let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);
939
        // We need to lie to the snap decoder about the target length, otherwise it will reject our shortened attempt
        // straight away. Luckily that's rather easy: we just need to fake the length stored right at the beginning of
        // the message.
943        let try_input = {
944            let mut buf = Cursor::new(Vec::with_capacity(input.len()));
945            UnsignedVarint(try_uncompressed_size as u64)
946                .write(&mut buf)
947                .expect("this write should never fail");
948            buf.write_all(&input[uncompressed_size_encoded_length..])
949                .expect("this write should never fail");
950            buf.into_inner()
951        };
952
953        let mut decoder = Decoder::new();
954        let mut output = vec![0; try_uncompressed_size];
955        let actual_uncompressed_size = match decoder.decompress(&try_input, &mut output) {
956            Ok(size) => size,
957            Err(e) => {
958                let looks_like_dst_too_small = match e {
959                    // `CopyWrite` only occurs when the dst buffer is too small.
960                    snap::Error::CopyWrite { .. } => true,
961
                    // `Literal` may occur due to src or dst errors, so we need to check
963                    snap::Error::Literal {
964                        len,
965                        dst_len,
966                        src_len,
967                    } => (dst_len < len) && (src_len >= len),
968
969                    // `HeaderMismatch` may also occur when the output was smaller than we predicted, in which case the
970                    // header would actually be broken
971                    snap::Error::HeaderMismatch {
972                        expected_len,
973                        got_len,
974                    } => expected_len < got_len,
975
                    // `BufferTooSmall` cannot happen by construction, because we just allocated the right buffer
977                    snap::Error::BufferTooSmall { .. } => {
978                        unreachable!("Just allocated a correctly-sized output buffer.")
979                    }
980
                    // `Offset` does NOT occur due to an undersized dst but due to invalid offset calculations; an
                    // undersized dst would surface as `CopyWrite` instead.
983                    snap::Error::Offset { .. } => false,
984
985                    // All other errors are real errors
986                    _ => false,
987                };
988                let used_smaller_dst = max_uncompressed_size < uncompressed_size;
989
990                if looks_like_dst_too_small && used_smaller_dst {
991                    // try larger buffer
992                    max_uncompressed_size *= 2;
993                    continue;
994                } else {
995                    return Err(ReadError::Malformed(Box::new(e)));
996                }
997            }
998        };
999        if actual_uncompressed_size != uncompressed_size {
1000            return Err(ReadError::Malformed(
1001                "broken snappy data".to_string().into(),
1002            ));
1003        }
1004
1005        break Ok(output);
1006    }
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use std::io::Cursor;
1012
1013    use crate::protocol::test_utils::test_roundtrip;
1014
1015    use super::*;
1016
1017    use assert_matches::assert_matches;
1018
1019    test_roundtrip!(RecordHeader, test_record_header_roundtrip);
1020
1021    test_roundtrip!(Record, test_record_roundtrip);
1022
1023    test_roundtrip!(ControlBatchRecord, test_control_batch_record_roundtrip);
1024
1025    #[test]
1026    fn test_control_batch_record_unknown_version() {
1027        let mut buf = Cursor::new(Vec::<u8>::new());
1028        Int16(1).write(&mut buf).unwrap();
1029        Int16(0).write(&mut buf).unwrap();
1030        buf.set_position(0);
1031
1032        let err = ControlBatchRecord::read(&mut buf).unwrap_err();
1033        assert_matches!(err, ReadError::Malformed(_));
1034        assert_eq!(
1035            err.to_string(),
1036            "Malformed data: Unknown control batch record version: 1",
1037        );
1038    }
1039
1040    #[test]
1041    fn test_control_batch_record_unknown_type() {
1042        let mut buf = Cursor::new(Vec::<u8>::new());
1043        Int16(0).write(&mut buf).unwrap();
1044        Int16(2).write(&mut buf).unwrap();
1045        buf.set_position(0);
1046
1047        let err = ControlBatchRecord::read(&mut buf).unwrap_err();
1048        assert_matches!(err, ReadError::Malformed(_));
1049        assert_eq!(
1050            err.to_string(),
1051            "Malformed data: Unknown control batch record type: 2",
1052        );
1053    }
1054
1055    test_roundtrip!(RecordBatchBody, test_record_batch_body_roundtrip);
1056
1057    test_roundtrip!(RecordBatch, test_record_batch_roundtrip);
1058
1059    #[test]
1060    fn test_decode_fixture_nocompression() {
1061        // This data was obtained by watching rdkafka.
1062        let data = [
1063            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x4b\x00\x00\x00\x00".to_vec(),
1064            b"\x02\x27\x24\xfe\xcd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x61".to_vec(),
1065            b"\xd5\x9b\x77\x00\x00\x00\x00\x61\xd5\x9b\x77\xff\xff\xff\xff\xff".to_vec(),
1066            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x32\x00\x00".to_vec(),
1067            b"\x00\x00\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06".to_vec(),
1068            b"\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
1069        ]
1070        .concat();
1071
1072        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
1073        let expected = RecordBatch {
1074            base_offset: 0,
1075            partition_leader_epoch: 0,
1076            last_offset_delta: 0,
1077            first_timestamp: 1641388919,
1078            max_timestamp: 1641388919,
1079            producer_id: -1,
1080            producer_epoch: -1,
1081            base_sequence: -1,
1082            records: ControlBatchOrRecords::Records(vec![Record {
1083                timestamp_delta: 0,
1084                offset_delta: 0,
1085                key: Some(vec![]),
1086                value: Some(b"hello kafka".to_vec()),
1087                headers: vec![RecordHeader {
1088                    key: "foo".to_owned(),
1089                    value: b"bar".to_vec(),
1090                }],
1091            }]),
1092            compression: RecordBatchCompression::NoCompression,
1093            is_transactional: false,
1094            timestamp_type: RecordBatchTimestampType::CreateTime,
1095        };
1096        assert_eq!(actual, expected);
1097
1098        let mut data2 = vec![];
1099        actual.write(&mut data2).unwrap();
1100        assert_eq!(data, data2);
1101    }
1102
1103    #[cfg(feature = "compression-gzip")]
1104    #[test]
1105    fn test_decode_fixture_gzip() {
1106        // This data was obtained by watching rdkafka.
1107        let data = [
1108            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x64\x00\x00\x00\x00".to_vec(),
1109            b"\x02\xba\x41\x46\x65\x00\x01\x00\x00\x00\x00\x00\x00\x01\x7e\x90".to_vec(),
1110            b"\xb3\x34\x67\x00\x00\x01\x7e\x90\xb3\x34\x67\xff\xff\xff\xff\xff".to_vec(),
1111            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x1f\x8b\x08".to_vec(),
1112            b"\x00\x00\x00\x00\x00\x00\x03\xfb\xc3\xc8\xc0\xc0\x70\x82\xb1\x82".to_vec(),
1113            b"\x0e\x40\x2c\x23\x35\x27\x27\x5f\x21\x3b\x31\x2d\x3b\x91\x89\x2d".to_vec(),
1114            b"\x2d\x3f\x9f\x2d\x29\xb1\x08\x00\xe4\xcd\xba\x1f\x80\x00\x00\x00".to_vec(),
1115        ]
1116        .concat();
1117
1118        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1119        let expected = RecordBatch {
1120            base_offset: 0,
1121            partition_leader_epoch: 0,
1122            last_offset_delta: 0,
1123            first_timestamp: 1643105170535,
1124            max_timestamp: 1643105170535,
1125            producer_id: -1,
1126            producer_epoch: -1,
1127            base_sequence: -1,
1128            records: ControlBatchOrRecords::Records(vec![Record {
1129                timestamp_delta: 0,
1130                offset_delta: 0,
1131                key: Some(vec![b'x'; 100]),
1132                value: Some(b"hello kafka".to_vec()),
1133                headers: vec![RecordHeader {
1134                    key: "foo".to_owned(),
1135                    value: b"bar".to_vec(),
1136                }],
1137            }]),
1138            compression: RecordBatchCompression::Gzip,
1139            is_transactional: false,
1140            timestamp_type: RecordBatchTimestampType::CreateTime,
1141        };
1142        assert_eq!(actual, expected);
1143
1144        let mut data2 = vec![];
1145        actual.write(&mut data2).unwrap();
1146
        // don't check whether the bytes are identical because the compression encoder might work slightly
        // differently; do another read roundtrip instead
1149        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1150        assert_eq!(actual2, expected);
1151    }
1152
1153    #[cfg(feature = "compression-lz4")]
1154    #[test]
1155    fn test_decode_fixture_lz4() {
1156        // This data was obtained by watching rdkafka.
1157        let data = [
1158            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
1159            b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
1160            b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
1161            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
1162            b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
1163            b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
1164            b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
1165        ]
1166        .concat();
1167
1168        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1169        let expected = RecordBatch {
1170            base_offset: 0,
1171            partition_leader_epoch: 0,
1172            last_offset_delta: 0,
1173            first_timestamp: 1643649156900,
1174            max_timestamp: 1643649156900,
1175            producer_id: -1,
1176            producer_epoch: -1,
1177            base_sequence: -1,
1178            records: ControlBatchOrRecords::Records(vec![Record {
1179                timestamp_delta: 0,
1180                offset_delta: 0,
1181                key: Some(vec![b'x'; 100]),
1182                value: Some(b"hello kafka".to_vec()),
1183                headers: vec![RecordHeader {
1184                    key: "foo".to_owned(),
1185                    value: b"bar".to_vec(),
1186                }],
1187            }]),
1188            compression: RecordBatchCompression::Lz4,
1189            is_transactional: false,
1190            timestamp_type: RecordBatchTimestampType::CreateTime,
1191        };
1192        assert_eq!(actual, expected);
1193
1194        let mut data2 = vec![];
1195        actual.write(&mut data2).unwrap();
1196
        // don't check whether the bytes are identical because the compression encoder might work slightly
        // differently; do another read roundtrip instead
1199        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1200        assert_eq!(actual2, expected);
1201    }
1202
1203    #[cfg(feature = "compression-snappy")]
1204    mod snappy {
1205        use super::*;
1206
1207        #[test]
1208        fn test_decode_fixture_snappy() {
1209            // This data was obtained by watching rdkafka.
1210            let data = [
1211                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00\x00".to_vec(),
1212                b"\x02\xad\x86\xf4\xf4\x00\x02\x00\x00\x00\x00\x00\x00\x01\x7e\xb6".to_vec(),
1213                b"\x45\x0e\x52\x00\x00\x01\x7e\xb6\x45\x0e\x52\xff\xff\xff\xff\xff".to_vec(),
1214                b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x80\x01\x1c".to_vec(),
1215                b"\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a\x01\x00\x50\x16".to_vec(),
1216                b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
1217                b"\x06\x62\x61\x72".to_vec(),
1218            ]
1219            .concat();
1220
1221            let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1222            let expected = RecordBatch {
1223                base_offset: 0,
1224                partition_leader_epoch: 0,
1225                last_offset_delta: 0,
1226                first_timestamp: 1643735486034,
1227                max_timestamp: 1643735486034,
1228                producer_id: -1,
1229                producer_epoch: -1,
1230                base_sequence: -1,
1231                records: ControlBatchOrRecords::Records(vec![Record {
1232                    timestamp_delta: 0,
1233                    offset_delta: 0,
1234                    key: Some(vec![b'x'; 100]),
1235                    value: Some(b"hello kafka".to_vec()),
1236                    headers: vec![RecordHeader {
1237                        key: "foo".to_owned(),
1238                        value: b"bar".to_vec(),
1239                    }],
1240                }]),
1241                compression: RecordBatchCompression::Snappy,
1242                is_transactional: false,
1243                timestamp_type: RecordBatchTimestampType::CreateTime,
1244            };
1245            assert_eq!(actual, expected);
1246
1247            let mut data2 = vec![];
1248            actual.write(&mut data2).unwrap();
1249
            // don't check whether the bytes are identical because the compression encoder might work slightly
            // differently; do another read roundtrip instead
1252            let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1253            assert_eq!(actual2, expected);
1254        }
1255
1256        #[test]
1257        fn test_decode_fixture_snappy_java() {
            // This data was obtained by watching Kafka return a record batch to rskafka that was produced by the
            // official Java client.
1260            let data = [
1261                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8c\x00\x00\x00\x00".to_vec(),
1262                b"\x02\x79\x1e\x2d\xce\x00\x02\x00\x00\x00\x01\x00\x00\x01\x7f\x07".to_vec(),
1263                b"\x25\x7a\xb1\x00\x00\x01\x7f\x07\x25\x7a\xb1\xff\xff\xff\xff\xff".to_vec(),
1264                b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x02\x82\x53\x4e".to_vec(),
1265                b"\x41\x50\x50\x59\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00".to_vec(),
1266                b"\x47\xff\x01\x1c\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a".to_vec(),
1267                b"\x01\x00\x64\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02".to_vec(),
1268                b"\x06\x66\x6f\x6f\x06\x62\x61\x72\xfa\x01\x00\x00\x02\xfe\x80\x00".to_vec(),
1269                b"\x96\x80\x00\x4c\x14\x73\x6f\x6d\x65\x20\x76\x61\x6c\x75\x65\x02".to_vec(),
1270                b"\x06\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
1271            ]
1272            .concat();
1273
1274            let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1275            let expected = RecordBatch {
1276                base_offset: 0,
1277                partition_leader_epoch: 0,
1278                last_offset_delta: 1,
1279                first_timestamp: 1645092371121,
1280                max_timestamp: 1645092371121,
1281                producer_id: -1,
1282                producer_epoch: -1,
1283                base_sequence: -1,
1284                records: ControlBatchOrRecords::Records(vec![
1285                    Record {
1286                        timestamp_delta: 0,
1287                        offset_delta: 0,
1288                        key: Some(vec![b'x'; 100]),
1289                        value: Some(b"hello kafka".to_vec()),
1290                        headers: vec![RecordHeader {
1291                            key: "foo".to_owned(),
1292                            value: b"bar".to_vec(),
1293                        }],
1294                    },
1295                    Record {
1296                        timestamp_delta: 0,
1297                        offset_delta: 1,
1298                        key: Some(vec![b'x'; 100]),
1299                        value: Some(b"some value".to_vec()),
1300                        headers: vec![RecordHeader {
1301                            key: "foo".to_owned(),
1302                            value: b"bar".to_vec(),
1303                        }],
1304                    },
1305                ]),
1306                compression: RecordBatchCompression::Snappy,
1307                is_transactional: false,
1308                timestamp_type: RecordBatchTimestampType::CreateTime,
1309            };
1310            assert_eq!(actual, expected);
1311
1312            let mut data2 = vec![];
1313            actual.write(&mut data2).unwrap();
1314
            // don't check whether the bytes are identical because the compression encoder might work slightly
            // differently; do another read roundtrip instead
1317            let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1318            assert_eq!(actual2, expected);
1319        }
1320
1321        #[test]
1322        fn test_decode_java_specific_oom() {
1323            // Found by the fuzzer, this should return an error instead of OOM.
1324            let data = [
1325                0x0a, 0x0a, 0x83, 0x00, 0xd4, 0x00, 0x00, 0x22, 0x00, 0x4b, 0x08, 0xd2, 0x22, 0xfb,
1326                0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x9b, 0x00, 0x9b, 0x0a, 0x40,
1327                0x00, 0x00, 0x4b, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0xd3, 0x82, 0x53,
1328                0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
1329                0x00, 0x00, 0x00, 0x03, 0x01, 0x00, 0x00, 0xfc, 0x00, 0x09, 0x09, 0x09, 0x09, 0x09,
1330                0x09, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1331                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0x00, 0x80,
1332                0x00, 0x00, 0x00, 0x00, 0xb0, 0x9b, 0x00,
1333            ]
1334            .to_vec();
1335
1336            let err = RecordBatchBody::read(&mut Cursor::new(data)).unwrap_err();
1337            assert_matches!(err, ReadError::Malformed(_));
1338            assert_eq!(err.to_string(), "Malformed data: Java-specific Snappy-compressed data has illegal chunk length, got 4227860745 bytes but only 38 bytes are left.");
1339        }
1340
1341        #[test]
1342        fn test_carefully_decompress_snappy_empty_input() {
1343            let err = carefully_decompress_snappy(&[], 1).unwrap_err();
1344            assert_matches!(err, ReadError::Malformed(_));
1345        }
1346
1347        #[test]
1348        fn test_carefully_decompress_snappy_empty_payload() {
1349            let compressed = compress(&[]);
1350            let data = carefully_decompress_snappy(&compressed, 1).unwrap();
1351            assert!(data.is_empty());
1352        }
1353
1354        proptest! {
1355            #![proptest_config(ProptestConfig{cases: 200, ..Default::default()})]
1356            #[test]
1357            fn test_carefully_decompress_snappy(input in prop::collection::vec(any::<u8>(), 0..10_000)) {
1358                let compressed = compress(&input);
1359                let input2 = carefully_decompress_snappy(&compressed, 1).unwrap();
1360                assert_eq!(input, input2);
1361            }
1362        }
1363
1364        fn compress(data: &[u8]) -> Vec<u8> {
1365            use snap::raw::{max_compress_len, Encoder};
1366
1367            let mut encoder = Encoder::new();
1368            let mut output = vec![0; max_compress_len(data.len())];
1369            let l = encoder.compress(data, &mut output).unwrap();
1370
1371            output[..l].to_vec()
1372        }
1373    }
1374
1375    #[cfg(feature = "compression-zstd")]
1376    #[test]
1377    fn test_decode_fixture_zstd() {
1378        // This data was obtained by watching rdkafka.
1379        let data = [
1380            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5d\x00\x00\x00\x00".to_vec(),
1381            b"\x02\xa1\x6e\x4e\x95\x00\x04\x00\x00\x00\x00\x00\x00\x01\x7e\xbf".to_vec(),
1382            b"\x78\xf3\xad\x00\x00\x01\x7e\xbf\x78\xf3\xad\xff\xff\xff\xff\xff".to_vec(),
1383            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\xb5\x2f".to_vec(),
1384            b"\xfd\x00\x58\x1d\x01\x00\xe8\xfc\x01\x00\x00\x00\xc8\x01\x78\x16".to_vec(),
1385            b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
1386            b"\x06\x62\x61\x72\x01\x00\x20\x05\x5c".to_vec(),
1387        ]
1388        .concat();
1389
1390        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1391        let expected = RecordBatch {
1392            base_offset: 0,
1393            partition_leader_epoch: 0,
1394            last_offset_delta: 0,
1395            first_timestamp: 1643889882029,
1396            max_timestamp: 1643889882029,
1397            producer_id: -1,
1398            producer_epoch: -1,
1399            base_sequence: -1,
1400            records: ControlBatchOrRecords::Records(vec![Record {
1401                timestamp_delta: 0,
1402                offset_delta: 0,
1403                key: Some(vec![b'x'; 100]),
1404                value: Some(b"hello kafka".to_vec()),
1405                headers: vec![RecordHeader {
1406                    key: "foo".to_owned(),
1407                    value: b"bar".to_vec(),
1408                }],
1409            }]),
1410            compression: RecordBatchCompression::Zstd,
1411            is_transactional: false,
1412            timestamp_type: RecordBatchTimestampType::CreateTime,
1413        };
1414        assert_eq!(actual, expected);
1415
1416        let mut data2 = vec![];
1417        actual.write(&mut data2).unwrap();
1418
        // don't check whether the bytes are identical because the compression encoder might work slightly
        // differently; do another read roundtrip instead
1421        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1422        assert_eq!(actual2, expected);
1423    }
1424
1425    #[test]
1426    fn test_decode_fixture_null_key() {
1427        // This data was obtained by watching rdkafka driven by IOx.
1428        let data = [
1429            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x1a\x00\x00\x00\x00".to_vec(),
1430            b"\x02\x67\x98\xb9\x54\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7e\xbe".to_vec(),
1431            b"\xdc\x91\xf6\x00\x00\x01\x7e\xbe\xdc\x91\xf6\xff\xff\xff\xff\xff".to_vec(),
1432            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\xce\x03\x00".to_vec(),
1433            b"\x00\x00\x01\xce\x01\x0a\x65\x0a\x2f\x74\x65\x73\x74\x5f\x74\x6f".to_vec(),
1434            b"\x70\x69\x63\x5f\x33\x37\x33\x39\x38\x66\x38\x64\x2d\x39\x35\x66".to_vec(),
1435            b"\x38\x2d\x34\x34\x65\x65\x2d\x38\x33\x61\x34\x2d\x34\x64\x30\x63".to_vec(),
1436            b"\x35\x39\x32\x62\x34\x34\x36\x64\x12\x32\x0a\x03\x75\x70\x63\x12".to_vec(),
1437            b"\x17\x0a\x04\x75\x73\x65\x72\x10\x03\x1a\x0a\x12\x08\x00\x00\x00".to_vec(),
1438            b"\x00\x00\x00\xf0\x3f\x22\x01\x00\x12\x10\x0a\x04\x74\x69\x6d\x65".to_vec(),
1439            b"\x10\x04\x1a\x03\x0a\x01\x64\x22\x01\x00\x18\x01\x04\x18\x63\x6f".to_vec(),
1440            b"\x6e\x74\x65\x6e\x74\x2d\x74\x79\x70\x65\xa4\x01\x61\x70\x70\x6c".to_vec(),
1441            b"\x69\x63\x61\x74\x69\x6f\x6e\x2f\x78\x2d\x70\x72\x6f\x74\x6f\x62".to_vec(),
1442            b"\x75\x66\x3b\x20\x73\x63\x68\x65\x6d\x61\x3d\x22\x69\x6e\x66\x6c".to_vec(),
1443            b"\x75\x78\x64\x61\x74\x61\x2e\x69\x6f\x78\x2e\x77\x72\x69\x74\x65".to_vec(),
1444            b"\x5f\x62\x75\x66\x66\x65\x72\x2e\x76\x31\x2e\x57\x72\x69\x74\x65".to_vec(),
1445            b"\x42\x75\x66\x66\x65\x72\x50\x61\x79\x6c\x6f\x61\x64\x22\x1a\x69".to_vec(),
1446            b"\x6f\x78\x2d\x6e\x61\x6d\x65\x73\x70\x61\x63\x65\x12\x6e\x61\x6d".to_vec(),
1447            b"\x65\x73\x70\x61\x63\x65".to_vec(),
1448        ]
1449        .concat();
1450
1451        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
1452        let expected = RecordBatch {
1453            base_offset: 0,
1454            partition_leader_epoch: 0,
1455            last_offset_delta: 0,
1456            first_timestamp: 1643879633398,
1457            max_timestamp: 1643879633398,
1458            producer_id: -1,
1459            producer_epoch: -1,
1460            base_sequence: -1,
1461            records: ControlBatchOrRecords::Records(vec![Record {
1462                timestamp_delta: 0,
1463                offset_delta: 0,
1464                key: None,
1465                value: Some(vec![
1466                    10, 101, 10, 47, 116, 101, 115, 116, 95, 116, 111, 112, 105, 99, 95, 51, 55,
1467                    51, 57, 56, 102, 56, 100, 45, 57, 53, 102, 56, 45, 52, 52, 101, 101, 45, 56,
1468                    51, 97, 52, 45, 52, 100, 48, 99, 53, 57, 50, 98, 52, 52, 54, 100, 18, 50, 10,
1469                    3, 117, 112, 99, 18, 23, 10, 4, 117, 115, 101, 114, 16, 3, 26, 10, 18, 8, 0, 0,
1470                    0, 0, 0, 0, 240, 63, 34, 1, 0, 18, 16, 10, 4, 116, 105, 109, 101, 16, 4, 26, 3,
1471                    10, 1, 100, 34, 1, 0, 24, 1,
1472                ]),
1473                headers: vec![
1474                    RecordHeader {
1475                        key: "content-type".to_owned(),
1476                        value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#.to_vec(),
1477                    },
1478                    RecordHeader {
1479                        key: "iox-namespace".to_owned(),
1480                        value: b"namespace".to_vec(),
1481                    },
1482                ],
1483            }]),
1484            compression: RecordBatchCompression::NoCompression,
1485            is_transactional: false,
1486            timestamp_type: RecordBatchTimestampType::CreateTime,
1487        };
1488        assert_eq!(actual, expected);
1489
1490        let mut data2 = vec![];
1491        actual.write(&mut data2).unwrap();
1492        assert_eq!(data, data2);
1493    }
1494}