// kafka_protocol/records.rs

//! Provides utilities for working with records (Kafka messages).
//!
//! [`FetchResponse`](crate::messages::fetch_response::FetchResponse) and the associated APIs for
//! reading and writing messages carry records in a raw format, allowing the user to implement
//! their own logic for interacting with those values.
//!
//! # Example
//!
//! Decoding a set of records from a [`FetchResponse`](crate::messages::fetch_response::FetchResponse):
//! ```rust
//! use kafka_protocol::messages::FetchResponse;
//! use kafka_protocol::protocol::Decodable;
//! use kafka_protocol::records::RecordBatchDecoder;
//! use bytes::Bytes;
//! use kafka_protocol::records::Compression;
//!
//! # const HEADER: [u8; 45] = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,];
//! # const RECORD: [u8; 79] = [ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x43, 0x0, 0x0, 0x0, 0x0, 0x2, 0x73, 0x6d, 0x29, 0x7b, 0x0, 0b00000000, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x22, 0x1, 0xd0, 0xf, 0x2, 0xa, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0xa, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x0,];
//! # let mut res = vec![];
//! # res.extend_from_slice(&HEADER[..]);
//! # res.extend_from_slice(&[0x00, 0x00, 0x00, 0x4f]);
//! # res.extend_from_slice(&RECORD[..]);
//! # let mut buf = Bytes::from(res);
//!
//! let res = FetchResponse::decode(&mut buf, 4).unwrap();
//!
//! for topic in res.responses {
//!     for partition in topic.partitions {
//!         let mut records = partition.records.unwrap();
//!         let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap();
//!     }
//! }
//!
//! fn decompress_record_batch_data(compressed_buffer: &mut bytes::Bytes, compression: Compression) -> anyhow::Result<Bytes> {
//!     match compression {
//!         Compression::None => Ok(compressed_buffer.to_vec().into()),
//!         _ => panic!("Compression not implemented"),
//!     }
//! }
//! ```
use anyhow::{anyhow, bail, Result};
use bytes::{Bytes, BytesMut};
use crc::{Crc, CRC_32_ISO_HDLC};
use crc32c::crc32c;
use indexmap::IndexMap;

use crate::protocol::{
    buf::{gap, ByteBuf, ByteBufMut},
    types, Decoder, Encoder, StrBytes,
};

use super::compression::{self as cmpr, Compressor, Decompressor};
use std::cmp::Ordering;
use std::convert::TryFrom;

/// IEEE (checksum) cyclic redundancy check.
pub const IEEE: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);

/// The different types of compression supported by Kafka.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None = 0,
    /// gzip compression library.
    Gzip = 1,
    /// Google's Snappy compression library.
    Snappy = 2,
    /// The LZ4 compression library.
    Lz4 = 3,
    /// Facebook's ZStandard compression library.
    Zstd = 4,
}

/// Indicates the meaning of the timestamp field on a record.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimestampType {
    /// The timestamp represents when the record was created by the client.
    Creation = 0,
    /// The timestamp represents when the record was appended to the log.
    LogAppend = 1,
}

/// Options for encoding and compressing a batch of records. Note that not all compression
/// algorithms are currently implemented by this library.
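///
/// # Example
///
/// A minimal sketch of building options for an uncompressed v2 record batch:
///
/// ```rust
/// use kafka_protocol::records::{Compression, RecordEncodeOptions};
///
/// // Version 2 is the modern "record batch" format; 0 and 1 are legacy message sets.
/// let options = RecordEncodeOptions {
///     version: 2,
///     compression: Compression::None,
/// };
/// ```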
pub struct RecordEncodeOptions {
    /// Record version, 0, 1, or 2.
    pub version: i8,

    /// The compression algorithm to use.
    pub compression: Compression,
}

/// Value to indicate missing producer id.
pub const NO_PRODUCER_ID: i64 = -1;
/// Value to indicate missing producer epoch.
pub const NO_PRODUCER_EPOCH: i16 = -1;
/// Value to indicate missing leader epoch.
pub const NO_PARTITION_LEADER_EPOCH: i32 = -1;
/// Value to indicate missing sequence id.
pub const NO_SEQUENCE: i32 = -1;
/// Value to indicate missing timestamp.
pub const NO_TIMESTAMP: i64 = -1;

#[derive(Debug, Clone)]
/// Batch encoder for Kafka records.
pub struct RecordBatchEncoder;

#[derive(Debug, Clone)]
/// Batch decoder for Kafka records.
pub struct RecordBatchDecoder;

struct BatchDecodeInfo {
    record_count: usize,
    timestamp_type: TimestampType,
    min_offset: i64,
    min_timestamp: i64,
    base_sequence: i32,
    transactional: bool,
    control: bool,
    partition_leader_epoch: i32,
    producer_id: i64,
    producer_epoch: i16,
}

/// Record compression for a set of records.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecordCompression {
    /// The set of records was a record batch with the given `Compression`.
    RecordBatch(Compression),
    /// The set of records was a message set and does not have a well-defined `Compression`.
    MessageSet,
}

/// Decoded records plus information about compression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSet {
    /// Compression used for this set of records.
    pub compression: RecordCompression,
    /// Version used to encode the set of records.
    pub version: i8,
    /// Records decoded in this set.
    pub records: Vec<Record>,
}

/// A Kafka message containing key, payload value, and all associated metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    // Batch properties
    /// Whether this record is transactional.
    pub transactional: bool,
    /// Whether this record is a control message, which should not be exposed to the client.
    pub control: bool,
    /// Epoch of the leader for this record's partition.
    pub partition_leader_epoch: i32,
    /// The identifier of the producer.
    pub producer_id: i64,
    /// Producer metadata used to implement transactional writes.
    pub producer_epoch: i16,

    // Record properties
    /// Indicates whether the timestamp represents record creation or appending to the log.
    pub timestamp_type: TimestampType,
    /// Message offset within a partition.
    pub offset: i64,
    /// Sequence identifier used for idempotent delivery.
    pub sequence: i32,
    /// Timestamp of the record. See also `timestamp_type`.
    pub timestamp: i64,
    /// The key of the record.
    pub key: Option<Bytes>,
    /// The payload of the record.
    pub value: Option<Bytes>,
    /// Headers associated with the record's payload.
    pub headers: IndexMap<StrBytes, Option<Bytes>>,
}

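// The magic (version) byte sits at offset 16 in both formats: a v2 batch starts with an 8-byte
// base offset, a 4-byte batch length, and a 4-byte partition leader epoch, while a legacy message
// starts with an 8-byte offset, a 4-byte size, and a 4-byte CRC. Peeking at this offset therefore
// works for version detection regardless of format.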
const MAGIC_BYTE_OFFSET: usize = 16;

impl RecordBatchEncoder {
    /// Encode records into the given buffer, using the provided options to select the encoding
    /// strategy based on version.
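    ///
    /// A minimal sketch of encoding a batch of records into a `BytesMut` (the names here are
    /// illustrative, not part of the API):
    ///
    /// ```rust
    /// use bytes::BytesMut;
    /// use kafka_protocol::records::{Compression, Record, RecordBatchEncoder, RecordEncodeOptions};
    ///
    /// let records: Vec<Record> = Vec::new(); // populated by the caller
    /// let options = RecordEncodeOptions { version: 2, compression: Compression::None };
    /// let mut buf = BytesMut::new();
    /// RecordBatchEncoder::encode(&mut buf, records.iter(), &options).unwrap();
    /// ```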
    pub fn encode<'a, B, I>(buf: &mut B, records: I, options: &RecordEncodeOptions) -> Result<()>
    where
        B: ByteBufMut,
        I: IntoIterator<Item = &'a Record>,
        I::IntoIter: Clone,
    {
        Self::encode_with_custom_compression(
            buf,
            records,
            options,
            None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
        )
    }

    /// Encode records into the given buffer, using the provided options to select the encoding
    /// strategy based on version.
    ///
    /// # Arguments
    /// * `compressor` - A function that compresses the given batch of records.
    ///
    /// If `None`, the right compression algorithm will automatically be selected and applied.
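    ///
    /// A minimal sketch of supplying a custom compressor (here an identity pass-through, shown
    /// only to illustrate the closure signature):
    ///
    /// ```rust
    /// use bytes::BytesMut;
    /// use kafka_protocol::records::{Compression, Record, RecordBatchEncoder, RecordEncodeOptions};
    ///
    /// let records: Vec<Record> = Vec::new(); // populated by the caller
    /// let options = RecordEncodeOptions { version: 2, compression: Compression::None };
    /// let mut buf = BytesMut::new();
    /// RecordBatchEncoder::encode_with_custom_compression(
    ///     &mut buf,
    ///     records.iter(),
    ///     &options,
    ///     Some(|input: &mut BytesMut, output: &mut BytesMut, _c: Compression| {
    ///         // A real implementation would compress `input` into `output`.
    ///         output.extend_from_slice(input);
    ///         Ok(())
    ///     }),
    /// ).unwrap();
    /// ```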
    pub fn encode_with_custom_compression<'a, B, I, CF>(
        buf: &mut B,
        records: I,
        options: &RecordEncodeOptions,
        compressor: Option<CF>,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: IntoIterator<Item = &'a Record>,
        I::IntoIter: Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        let records = records.into_iter();
        match options.version {
            0..=1 => Self::encode_legacy(buf, records, options, compressor),
            2 => Self::encode_new(buf, records, options, compressor),
            _ => bail!("Unknown record batch version"),
        }
    }

    fn encode_legacy_records<'a, B, I>(
        buf: &mut B,
        records: I,
        options: &RecordEncodeOptions,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
    {
        for record in records {
            record.encode_legacy(buf, options)?;
        }
        Ok(())
    }

    fn encode_legacy<'a, B, I, CF>(
        buf: &mut B,
        records: I,
        options: &RecordEncodeOptions,
        compressor: Option<CF>,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        if options.compression == Compression::None {
            // No wrapper needed
            Self::encode_legacy_records(buf, records, options)?;
        } else {
            // Need a "wrapper" message
            let inner_opts = RecordEncodeOptions {
                compression: Compression::None,
                version: options.version,
            };

            Record::encode_legacy_static(buf, options, |buf| {
                // Timestamp
                if options.version > 0 {
                    let min_timestamp = records
                        .clone()
                        .map(|r| r.timestamp)
                        .min()
                        .unwrap_or_default();
                    types::Int64.encode(buf, min_timestamp)?;
                };

                // Key
                buf.put_i32(-1);

                // Value (compressed MessageSet)
                let size_gap = buf.put_typed_gap(gap::I32);
                let value_start = buf.offset();
                if let Some(compressor) = compressor {
                    let mut encoded_buf = BytesMut::new();
                    Self::encode_legacy_records(&mut encoded_buf, records, &inner_opts)?;
                    compressor(&mut encoded_buf, buf, options.compression)?;
                } else {
                    match options.compression {
                        #[cfg(feature = "snappy")]
                        Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
                            Self::encode_legacy_records(buf, records, &inner_opts)
                        })?,
                        #[cfg(feature = "gzip")]
                        Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
                            Self::encode_legacy_records(buf, records, &inner_opts)
                        })?,
                        #[cfg(feature = "lz4")]
                        Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
                            Self::encode_legacy_records(buf, records, &inner_opts)
                        })?,
                        #[cfg(feature = "zstd")]
                        Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
                            Self::encode_legacy_records(buf, records, &inner_opts)
                        })?,
                        c => {
                            return Err(anyhow!(
                                "Support for {c:?} is not enabled as a cargo feature"
                            ))
                        }
                    }
                }

                let value_end = buf.offset();
                let value_size = value_end - value_start;
                if value_size > i32::MAX as usize {
                    bail!(
                        "Record batch was too large to encode ({} bytes)",
                        value_size
                    );
                }
                buf.fill_typed_gap(size_gap, value_size as i32);

                Ok(())
            })?;
        }
        Ok(())
    }

    fn encode_new_records<'a, B, I>(
        buf: &mut B,
        records: I,
        min_offset: i64,
        min_timestamp: i64,
        options: &RecordEncodeOptions,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record>,
    {
        for record in records {
            record.encode_new(buf, min_offset, min_timestamp, options)?;
        }
        Ok(())
    }

    fn encode_new_batch<'a, B, I, CF>(
        buf: &mut B,
        records: &mut I,
        options: &RecordEncodeOptions,
        compressor: Option<&CF>,
    ) -> Result<bool>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        let mut record_peeker = records.clone();

        // Get the first record
        let first_record = match record_peeker.next() {
            Some(record) => record,
            None => return Ok(false),
        };

        // Determine how many additional records can be included in the batch
        let num_records = record_peeker
            .take_while(|record| {
                record.transactional == first_record.transactional
                    && record.control == first_record.control
                    && record.partition_leader_epoch == first_record.partition_leader_epoch
                    && record.producer_id == first_record.producer_id
                    && record.producer_epoch == first_record.producer_epoch
                    && (record.offset as i32).wrapping_sub(record.sequence)
                        == (first_record.offset as i32).wrapping_sub(first_record.sequence)
            })
            .count()
            + 1;

        // Aggregate various record properties
        let min_offset = records
            .clone()
            .take(num_records)
            .map(|r| r.offset)
            .min()
            .expect("Batch contains at least one element");
        let max_offset = records
            .clone()
            .take(num_records)
            .map(|r| r.offset)
            .max()
            .expect("Batch contains at least one element");
        let min_timestamp = records
            .clone()
            .take(num_records)
            .map(|r| r.timestamp)
            .min()
            .expect("Batch contains at least one element");
        let max_timestamp = records
            .clone()
            .take(num_records)
            .map(|r| r.timestamp)
            .max()
            .expect("Batch contains at least one element");
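        // The batch stores a single base sequence; on decode, each record's sequence is
        // reconstructed as base_sequence + offset_delta, so the base is derived from the first
        // record's sequence and its distance from the smallest offset in the batch.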
        let base_sequence = first_record
            .sequence
            .wrapping_sub((first_record.offset - min_offset) as i32);

        // Base offset
        types::Int64.encode(buf, min_offset)?;

        // Batch length
        let size_gap = buf.put_typed_gap(gap::I32);
        let batch_start = buf.offset();

        // Partition leader epoch
        types::Int32.encode(buf, first_record.partition_leader_epoch)?;

        // Magic byte
        types::Int8.encode(buf, options.version)?;

        // CRC
        let crc_gap = buf.put_typed_gap(gap::U32);
        let content_start = buf.offset();

        // Attributes
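        // (v2 bit layout: bits 0-2 hold the compression codec, bit 3 the timestamp type,
        // bit 4 the transactional flag, and bit 5 the control flag.)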
        let mut attributes = options.compression as i16;
        if first_record.transactional {
            attributes |= 1 << 4;
        }
        if first_record.control {
            attributes |= 1 << 5;
        }
        types::Int16.encode(buf, attributes)?;

        // Last offset delta
        types::Int32.encode(buf, (max_offset - min_offset) as i32)?;

        // First timestamp
        types::Int64.encode(buf, min_timestamp)?;

        // Last timestamp
        types::Int64.encode(buf, max_timestamp)?;

        // Producer ID
        types::Int64.encode(buf, first_record.producer_id)?;

        // Producer epoch
        types::Int16.encode(buf, first_record.producer_epoch)?;

        // Base sequence
        types::Int32.encode(buf, base_sequence)?;

        // Record count
        if num_records > i32::MAX as usize {
            bail!(
                "Too many records to encode in one batch ({} records)",
                num_records
            );
        }
        types::Int32.encode(buf, num_records as i32)?;

        // Records
        let records = records.take(num_records);

        if let Some(compressor) = compressor {
            let mut record_buf = BytesMut::new();
            Self::encode_new_records(&mut record_buf, records, min_offset, min_timestamp, options)?;
            compressor(&mut record_buf, buf, options.compression)?;
        } else {
            match options.compression {
                Compression::None => cmpr::None::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "snappy")]
                Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "gzip")]
                Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "lz4")]
                Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "zstd")]
                Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[allow(unreachable_patterns)]
                c => {
                    return Err(anyhow!(
                        "Support for {c:?} is not enabled as a cargo feature"
                    ))
                }
            }
        }

        let batch_end = buf.offset();

        // Fill size gap
        let batch_size = batch_end - batch_start;
        if batch_size > i32::MAX as usize {
            bail!(
                "Record batch was too large to encode ({} bytes)",
                batch_size
            );
        }

        buf.fill_typed_gap(size_gap, batch_size as i32);

        // Fill CRC gap
        let crc = crc32c(buf.range(content_start..batch_end));
        buf.fill_typed_gap(crc_gap, crc);

        Ok(true)
    }

    fn encode_new<'a, B, I, CF>(
        buf: &mut B,
        mut records: I,
        options: &RecordEncodeOptions,
        compressor: Option<CF>,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        while Self::encode_new_batch(buf, &mut records, options, compressor.as_ref())? {}
        Ok(())
    }
}

impl RecordBatchDecoder {
    /// Decode one `RecordSet` from the provided buffer.
    ///
    /// # Arguments
    /// * `decompressor` - A function that decompresses the given batch of records.
    ///
    /// If `None`, the right decompression algorithm will automatically be selected and applied.
    pub fn decode_with_custom_compression<B: ByteBuf, F>(
        buf: &mut B,
        decompressor: Option<F>,
    ) -> Result<RecordSet>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        let mut records = Vec::new();
        let (version, compression) =
            Self::decode_into_vec(buf, &mut records, decompressor.as_ref())?;
        Ok(RecordSet {
            version,
            compression,
            records,
        })
    }

    /// Decode the entire buffer into a vec of `RecordSet`s.
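    ///
    /// A minimal sketch of draining a buffer that holds several batches back to back
    /// (`buf` is assumed to be a `bytes::Bytes` supplied by the caller):
    ///
    /// ```rust,ignore
    /// let sets = RecordBatchDecoder::decode_all(&mut buf)?;
    /// for set in &sets {
    ///     println!("version {} batch with {} records", set.version, set.records.len());
    /// }
    /// ```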
    pub fn decode_all<B: ByteBuf>(buf: &mut B) -> Result<Vec<RecordSet>> {
        let mut batches = Vec::new();
        while buf.has_remaining() {
            batches.push(Self::decode(buf)?);
        }
        Ok(batches)
    }

    /// Decode one `RecordSet` from the provided buffer.
    pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<RecordSet> {
        Self::decode_with_custom_compression(
            buf,
            None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>.as_ref(),
        )
    }

    fn decode_into_vec<B: ByteBuf, F>(
        buf: &mut B,
        records: &mut Vec<Record>,
        decompress_func: Option<&F>,
    ) -> Result<(i8, RecordCompression)>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?[0] as i8;
        let compression = match version {
            0..=1 => {
                Record::decode_legacy(buf, version, records).map(|()| RecordCompression::MessageSet)
            }
            2 => Self::decode_new_batch(buf, version, records, decompress_func)
                .map(RecordCompression::RecordBatch),
            _ => {
                bail!("Unknown record batch version ({})", version);
            }
        }?;
        Ok((version, compression))
    }

    fn decode_new_records<B: ByteBuf>(
        buf: &mut B,
        batch_decode_info: &BatchDecodeInfo,
        version: i8,
        records: &mut Vec<Record>,
    ) -> Result<()> {
        records.reserve(batch_decode_info.record_count);
        for _ in 0..batch_decode_info.record_count {
            records.push(Record::decode_new(buf, batch_decode_info, version)?);
        }
        Ok(())
    }

    fn decode_new_batch<B: ByteBuf, F>(
        buf: &mut B,
        version: i8,
        records: &mut Vec<Record>,
        decompress_func: Option<&F>,
    ) -> Result<Compression>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        // Base offset
        let min_offset = types::Int64.decode(buf)?;

        // Batch length
        let batch_length: i32 = types::Int32.decode(buf)?;
        if batch_length < 0 {
            bail!("Unexpected negative batch size: {}", batch_length);
        }

        // Convert buf to bytes
        let buf = &mut buf.try_get_bytes(batch_length as usize)?;

        // Partition leader epoch
        let partition_leader_epoch = types::Int32.decode(buf)?;

        // Magic byte
        let magic: i8 = types::Int8.decode(buf)?;
        if magic != version {
            bail!("Version mismatch ({} != {})", magic, version);
        }

        // CRC
        let supplied_crc: u32 = types::UInt32.decode(buf)?;
        let actual_crc = crc32c(buf);

        if supplied_crc != actual_crc {
            bail!(
                "Cyclic redundancy check failed ({} != {})",
                supplied_crc,
                actual_crc
            );
        }

        // Attributes
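        // (v2 bit layout, mirroring the encoder above: bits 0-2 compression codec,
        // bit 3 timestamp type, bit 4 transactional, bit 5 control.)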
        let attributes: i16 = types::Int16.decode(buf)?;
        let transactional = (attributes & (1 << 4)) != 0;
        let control = (attributes & (1 << 5)) != 0;
        let compression = match attributes & 0x7 {
            0 => Compression::None,
            1 => Compression::Gzip,
            2 => Compression::Snappy,
            3 => Compression::Lz4,
            4 => Compression::Zstd,
            other => {
                bail!("Unknown compression algorithm used: {}", other);
            }
        };
        let timestamp_type = if (attributes & (1 << 3)) != 0 {
            TimestampType::LogAppend
        } else {
            TimestampType::Creation
        };

        // Last offset delta
        let _max_offset_delta: i32 = types::Int32.decode(buf)?;

        // First timestamp
        let min_timestamp = types::Int64.decode(buf)?;

        // Last timestamp
        let _max_timestamp: i64 = types::Int64.decode(buf)?;

        // Producer ID
        let producer_id = types::Int64.decode(buf)?;

        // Producer epoch
        let producer_epoch = types::Int16.decode(buf)?;

        // Base sequence
        let base_sequence = types::Int32.decode(buf)?;

        // Record count
        let record_count: i32 = types::Int32.decode(buf)?;
        if record_count < 0 {
            bail!("Unexpected negative record count ({})", record_count);
        }
        let record_count = record_count as usize;

        let batch_decode_info = BatchDecodeInfo {
            record_count,
            timestamp_type,
            min_offset,
            min_timestamp,
            base_sequence,
            transactional,
            control,
            partition_leader_epoch,
            producer_id,
            producer_epoch,
        };

        if let Some(decompress_func) = decompress_func {
            let mut decompressed_buf = decompress_func(buf, compression)?;

            Self::decode_new_records(&mut decompressed_buf, &batch_decode_info, version, records)?;
        } else {
            match compression {
                Compression::None => cmpr::None::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "snappy")]
                Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "gzip")]
                Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "zstd")]
                Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "lz4")]
                Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[allow(unreachable_patterns)]
                c => {
                    return Err(anyhow!(
                        "Support for {c:?} is not enabled as a cargo feature"
                    ))
                }
            };
        }

        Ok(compression)
    }
}

impl Record {
    fn encode_legacy_static<B, F>(
        buf: &mut B,
        options: &RecordEncodeOptions,
        content_writer: F,
    ) -> Result<()>
    where
        B: ByteBufMut,
        F: FnOnce(&mut B) -> Result<()>,
    {
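        // Legacy (v0/v1) message layout: 8-byte offset, 4-byte message size, 4-byte CRC,
        // magic byte, attributes, timestamp (v1 only), then key and value. The offset is
        // written as 0 here; the broker assigns real offsets on append.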
        types::Int64.encode(buf, 0)?;
        let size_gap = buf.put_typed_gap(gap::I32);
        let message_start = buf.offset();
        let crc_gap = buf.put_typed_gap(gap::U32);
        let content_start = buf.offset();

        types::Int8.encode(buf, options.version)?;

        let compression = options.compression as i8;
        if compression > 2 + options.version {
            bail!(
                "Compression algorithm '{:?}' is unsupported for record version '{}'",
                options.compression,
                options.version
            );
        }
        types::Int8.encode(buf, compression)?;

        // Write content
        content_writer(buf)?;

        let message_end = buf.offset();

        let message_size = message_end - message_start;
        if message_size > i32::MAX as usize {
            bail!("Record was too large to encode ({} bytes)", message_size);
        }
        buf.fill_typed_gap(size_gap, message_size as i32);

        let crc = IEEE.checksum(buf.range(content_start..message_end));
        buf.fill_typed_gap(crc_gap, crc);

        Ok(())
    }

    fn encode_legacy<B: ByteBufMut>(
        &self,
        buf: &mut B,
        options: &RecordEncodeOptions,
    ) -> Result<()> {
        if self.transactional || self.control {
            bail!("Transactional and control records are not supported in this version of the protocol!");
        }

        if !self.headers.is_empty() {
            bail!("Record headers are not supported in this version of the protocol!");
        }

        Self::encode_legacy_static(buf, options, |buf| {
            if options.version > 0 {
                types::Int64.encode(buf, self.timestamp)?;
            }
            types::Bytes.encode(buf, &self.key)?;
            types::Bytes.encode(buf, &self.value)?;

            Ok(())
        })
    }

    fn encode_new<B: ByteBufMut>(
        &self,
        buf: &mut B,
        min_offset: i64,
        min_timestamp: i64,
        options: &RecordEncodeOptions,
    ) -> Result<()> {
        // Size
        let size = self.compute_size_new(min_offset, min_timestamp, options)?;
        if size > i32::MAX as usize {
            bail!("Record was too large to encode ({} bytes)", size);
        }
        types::VarInt.encode(buf, size as i32)?;

        // Attributes
        types::Int8.encode(buf, 0)?;

        // Timestamp delta
        let timestamp_delta = self.timestamp - min_timestamp;
        if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
            bail!(
                "Timestamps within batch are too far apart ({}, {})",
                min_timestamp,
                self.timestamp
            );
        }
        types::VarInt.encode(buf, timestamp_delta as i32)?;

        // Offset delta
        let offset_delta = self.offset - min_offset;
        if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
            bail!(
                "Offsets within batch are too far apart ({}, {})",
                min_offset,
                self.offset
            );
        }
        types::VarInt.encode(buf, offset_delta as i32)?;

        // Key
        if let Some(k) = self.key.as_ref() {
            if k.len() > i32::MAX as usize {
                bail!("Record key was too large to encode ({} bytes)", k.len());
            }
            types::VarInt.encode(buf, k.len() as i32)?;
            buf.put_slice(k);
        } else {
            types::VarInt.encode(buf, -1)?;
        }

        // Value
        if let Some(v) = self.value.as_ref() {
            if v.len() > i32::MAX as usize {
                bail!("Record value was too large to encode ({} bytes)", v.len());
            }
            types::VarInt.encode(buf, v.len() as i32)?;
            buf.put_slice(v);
        } else {
            types::VarInt.encode(buf, -1)?;
        }

        // Headers
        if self.headers.len() > i32::MAX as usize {
            bail!("Too many record headers to encode ({})", self.headers.len());
        }
        types::VarInt.encode(buf, self.headers.len() as i32)?;
        for (k, v) in &self.headers {
            // Key len
            if k.len() > i32::MAX as usize {
                bail!(
                    "Record header key was too large to encode ({} bytes)",
                    k.len()
                );
            }
            types::VarInt.encode(buf, k.len() as i32)?;

            // Key
            buf.put_slice(k.as_ref());

            // Value
            if let Some(v) = v.as_ref() {
                if v.len() > i32::MAX as usize {
                    bail!(
                        "Record header value was too large to encode ({} bytes)",
                        v.len()
                    );
                }
                types::VarInt.encode(buf, v.len() as i32)?;
                buf.put_slice(v);
            } else {
                types::VarInt.encode(buf, -1)?;
            }
        }

        Ok(())
    }

    fn compute_size_new(
        &self,
        min_offset: i64,
        min_timestamp: i64,
        _options: &RecordEncodeOptions,
    ) -> Result<usize> {
        let mut total_size = 0;

        // Attributes
        total_size += types::Int8.compute_size(0)?;

        // Timestamp delta
        let timestamp_delta = self.timestamp - min_timestamp;
        if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
            bail!(
                "Timestamps within batch are too far apart ({}, {})",
                min_timestamp,
                self.timestamp
            );
        }
        total_size += types::VarInt.compute_size(timestamp_delta as i32)?;

        // Offset delta
        let offset_delta = self.offset - min_offset;
        if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
            bail!(
                "Offsets within batch are too far apart ({}, {})",
                min_offset,
                self.offset
            );
        }
        total_size += types::VarInt.compute_size(offset_delta as i32)?;

        // Key
        if let Some(k) = self.key.as_ref() {
            if k.len() > i32::MAX as usize {
                bail!("Record key was too large to encode ({} bytes)", k.len());
            }
            total_size += types::VarInt.compute_size(k.len() as i32)?;
            total_size += k.len();
        } else {
            total_size += types::VarInt.compute_size(-1)?;
        }

        // Value len
        if let Some(v) = self.value.as_ref() {
            if v.len() > i32::MAX as usize {
                bail!("Record value was too large to encode ({} bytes)", v.len());
            }
            total_size += types::VarInt.compute_size(v.len() as i32)?;
            total_size += v.len();
        } else {
            total_size += types::VarInt.compute_size(-1)?;
        }

        // Headers
        if self.headers.len() > i32::MAX as usize {
            bail!("Too many record headers to encode ({})", self.headers.len());
        }
        total_size += types::VarInt.compute_size(self.headers.len() as i32)?;
        for (k, v) in &self.headers {
            // Key len
            if k.len() > i32::MAX as usize {
                bail!(
                    "Record header key was too large to encode ({} bytes)",
                    k.len()
                );
            }
            total_size += types::VarInt.compute_size(k.len() as i32)?;

            // Key
            total_size += k.len();

            // Value
            if let Some(v) = v.as_ref() {
                if v.len() > i32::MAX as usize {
                    bail!(
                        "Record header value was too large to encode ({} bytes)",
                        v.len()
                    );
                }
                total_size += types::VarInt.compute_size(v.len() as i32)?;
                total_size += v.len();
            } else {
                total_size += types::VarInt.compute_size(-1)?;
            }
        }

        Ok(total_size)
    }

    fn decode_legacy<B: ByteBuf>(
        buf: &mut B,
        version: i8,
        records: &mut Vec<Record>,
    ) -> Result<()> {
        let offset = types::Int64.decode(buf)?;
        let size: i32 = types::Int32.decode(buf)?;
        if size < 0 {
            bail!("Unexpected negative record size: {}", size);
        }

        // Ensure we don't over-read
        let buf = &mut buf.try_get_bytes(size as usize)?;

        // CRC
        let supplied_crc: u32 = types::UInt32.decode(buf)?;
        let actual_crc = IEEE.checksum(buf);

        if supplied_crc != actual_crc {
            bail!(
                "Cyclic redundancy check failed ({} != {})",
                supplied_crc,
                actual_crc
            );
        }

        // Magic
        let magic: i8 = types::Int8.decode(buf)?;
        if magic != version {
            bail!("Version mismatch ({} != {})", magic, version);
        }

        // Attributes
        let attributes: i8 = types::Int8.decode(buf)?;
        let compression = match attributes & 0x7 {
            0 => Compression::None,
            1 => Compression::Gzip,
            2 => Compression::Snappy,
            3 if version > 0 => Compression::Lz4,
            other => {
                bail!("Unknown compression algorithm used: {}", other);
            }
        };
        let timestamp_type = if (attributes & (1 << 3)) != 0 {
            TimestampType::LogAppend
        } else {
            TimestampType::Creation
        };

        // Content: timestamp (v1 only), key, value
        let timestamp = if version > 0 {
            types::Int64.decode(buf)?
        } else {
            NO_TIMESTAMP
        };
        let key = types::Bytes.decode(buf)?;
        let value = types::Bytes.decode(buf)?;

        if compression == Compression::None {
            // Uncompressed record
            records.push(Record {
                transactional: false,
                control: false,
                partition_leader_epoch: NO_PARTITION_LEADER_EPOCH,
                producer_id: NO_PRODUCER_ID,
                producer_epoch: NO_PRODUCER_EPOCH,
                sequence: NO_SEQUENCE,
                timestamp_type,
                offset,
                timestamp,
                key,
                value,
                headers: Default::default(),
            });
        } else {
            // Wrapper record around a compressed MessageSet
            let mut value = value
                .ok_or_else(|| anyhow!("Received compressed legacy record without a value"))?;

            while !value.is_empty() {
                Record::decode_legacy(&mut value, version, records)?;
            }
        }

        Ok(())
    }

    fn decode_new<B: ByteBuf>(
        buf: &mut B,
        batch_decode_info: &BatchDecodeInfo,
        _version: i8,
    ) -> Result<Self> {
        // Size
        let size: i32 = types::VarInt.decode(buf)?;
        if size < 0 {
            bail!("Unexpected negative record size: {}", size);
        }

        // Ensure we don't over-read
        let buf = &mut buf.try_get_bytes(size as usize)?;

        // Attributes
        let _attributes: i8 = types::Int8.decode(buf)?;

        // Timestamp delta
        let timestamp_delta: i32 = types::VarInt.decode(buf)?;
        let timestamp = batch_decode_info.min_timestamp + timestamp_delta as i64;

        // Offset delta
        let offset_delta: i32 = types::VarInt.decode(buf)?;
        let offset = batch_decode_info.min_offset + offset_delta as i64;
        let sequence = batch_decode_info.base_sequence.wrapping_add(offset_delta);

        // Key (a length of -1 encodes a null key)
        let key_len: i32 = types::VarInt.decode(buf)?;
        let key = match key_len.cmp(&-1) {
            Ordering::Less => {
                bail!("Unexpected negative record key length ({} bytes)", key_len);
            }
            Ordering::Equal => None,
            Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?),
        };

        // Value (a length of -1 encodes a null value)
        let value_len: i32 = types::VarInt.decode(buf)?;
        let value = match value_len.cmp(&-1) {
            Ordering::Less => {
                bail!(
                    "Unexpected negative record value length ({} bytes)",
                    value_len
                );
            }
            Ordering::Equal => None,
            Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
        };

        // Headers
        let num_headers: i32 = types::VarInt.decode(buf)?;
        if num_headers < 0 {
            bail!("Unexpected negative record header count: {}", num_headers);
        }
        let num_headers = num_headers as usize;

        let mut headers = IndexMap::with_capacity(num_headers);
        for _ in 0..num_headers {
            // Key len
            let key_len: i32 = types::VarInt.decode(buf)?;
            if key_len < 0 {
                bail!(
                    "Unexpected negative record header key length ({} bytes)",
                    key_len
                );
            }

            // Key
            let key = StrBytes::try_from(buf.try_get_bytes(key_len as usize)?)?;

            // Value len
            let value_len: i32 = types::VarInt.decode(buf)?;

            // Value (a length of -1 encodes a null value)
            let value = match value_len.cmp(&-1) {
                Ordering::Less => {
                    bail!(
                        "Unexpected negative record header value length ({} bytes)",
                        value_len
                    );
                }
                Ordering::Equal => None,
                Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
            };

            headers.insert(key, value);
        }

        Ok(Self {
            transactional: batch_decode_info.transactional,
            control: batch_decode_info.control,
            timestamp_type: batch_decode_info.timestamp_type,
            partition_leader_epoch: batch_decode_info.partition_leader_epoch,
            producer_id: batch_decode_info.producer_id,
            producer_epoch: batch_decode_info.producer_epoch,
            sequence,
            offset,
            timestamp,
            key,
            value,
            headers,
        })
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::{Record, TimestampType};

    #[test]
    fn lookup_header_via_u8_slice() {
        let record = Record {
            transactional: false,
            control: false,
            partition_leader_epoch: 0,
            producer_id: 0,
            producer_epoch: 0,
            sequence: 0,
            timestamp_type: TimestampType::Creation,
            offset: Default::default(),
            timestamp: Default::default(),
            key: Default::default(),
            value: Default::default(),
            headers: [
                ("some-key".into(), Some("some-value".into())),
                ("other-header".into(), None),
            ]
            .into(),
        };
        assert_eq!(
            Bytes::from("some-value"),
            record
                .headers
                // This relies on `impl Borrow<[u8]> for StrBytes`
                .get("some-key".as_bytes())
                .expect("key exists in headers")
                .as_ref()
                .expect("value is present")
        );
    }
}